- try to detect if the flash client and the jws app are still alive. If one of them

if dead, end screen sharing.
This commit is contained in:
Richard Alam 2016-08-18 19:49:32 +00:00
parent 8103162d61
commit b591106aaa
23 changed files with 360 additions and 113 deletions

View File

@ -10,9 +10,11 @@ public class MeetingCreatedMessage implements ISubscribedMessage {
public final String VERSION = "0.0.1"; public final String VERSION = "0.0.1";
public final String meetingId; public final String meetingId;
public final Boolean record;
public MeetingCreatedMessage(String meetingID) { public MeetingCreatedMessage(String meetingId, Boolean record) {
this.meetingId = meetingID; this.meetingId = meetingId;
this.record = record;
} }
public String toJson() { public String toJson() {
@ -35,10 +37,10 @@ public class MeetingCreatedMessage implements ISubscribedMessage {
if (header.has("name")) { if (header.has("name")) {
String messageName = header.get("name").getAsString(); String messageName = header.get("name").getAsString();
if (MEETING_CREATED.equals(messageName)) { if (MEETING_CREATED.equals(messageName)) {
if (payload.has(Constants.MEETING_ID)) { if (payload.has(Constants.MEETING_ID) && payload.has(Constants.RECORDED)) {
String meetingId = payload.get(Constants.MEETING_ID).getAsString(); String meetingId = payload.get(Constants.MEETING_ID).getAsString();
Boolean record = payload.get(Constants.RECORDED).getAsBoolean();
return new MeetingCreatedMessage(meetingId); return new MeetingCreatedMessage(meetingId, record);
} }
} }
} }

View File

@ -23,5 +23,6 @@ public interface IScreenShareApplication {
void userDisconnected(String meetingId, String userId); void userDisconnected(String meetingId, String userId);
void userConnected(String meetingId, String userId); void userConnected(String meetingId, String userId);
void meetingHasEnded(String meetingId); void meetingHasEnded(String meetingId);
void meetingCreated(String meetingId); void meetingCreated(String meetingId, Boolean record);
void screenShareClientPongMessage(String meetingId, String userId, String streamId, Long timestamp);
} }

View File

@ -0,0 +1,16 @@
package org.bigbluebutton.app.screenshare.events;
public class ScreenShareClientPing implements IEvent {
public final String meetingId;
public final String userId;
public final String streamId;
public final Long timestamp;
public ScreenShareClientPing(String meetingId, String userId, String streamId, Long timestamp) {
this.meetingId = meetingId;
this.userId = userId;
this.streamId = streamId;
this.timestamp = timestamp;
}
}

View File

@ -29,7 +29,7 @@ public class RedisPubSubMessageHandler implements MessageHandler {
} else if (MeetingCreatedMessage.MEETING_CREATED.equals(messageName)) { } else if (MeetingCreatedMessage.MEETING_CREATED.equals(messageName)) {
MeetingCreatedMessage msg = MeetingCreatedMessage.fromJson(message); MeetingCreatedMessage msg = MeetingCreatedMessage.fromJson(message);
handler.meetingCreated(msg.meetingId); handler.meetingCreated(msg.meetingId, msg.record);
} }
} }

View File

@ -25,9 +25,27 @@ public class EventListenerImp implements IEventListener {
sendStartShareRequestResponse((StartShareRequestResponse) event); sendStartShareRequestResponse((StartShareRequestResponse) event);
} else if (event instanceof StartShareRequestFailedResponse) { } else if (event instanceof StartShareRequestFailedResponse) {
sendStartShareRequestFailedResponse((StartShareRequestFailedResponse) event); sendStartShareRequestFailedResponse((StartShareRequestFailedResponse) event);
} else if (event instanceof IsScreenSharingResponse) { } else if (event instanceof IsScreenSharingResponse) {
sendIsScreenSharingResponse((IsScreenSharingResponse) event); sendIsScreenSharingResponse((IsScreenSharingResponse) event);
} else if (event instanceof ScreenShareClientPing) {
sendScreenShareClientPing((ScreenShareClientPing) event);
} }
}
private void sendScreenShareClientPing(ScreenShareClientPing event) {
Map<String, Object> data = new HashMap<String, Object>();
data.put("meetingId", event.meetingId);
data.put("streamId", event.streamId);
data.put("timestamp", event.timestamp);
Map<String, Object> message = new HashMap<String, Object>();
Gson gson = new Gson();
message.put("msg", gson.toJson(data));
log.info("Sending ScreenShareClientPing to client, meetingId=" + event.meetingId + " userid=" + event.userId);
DirectClientMessage msg = new DirectClientMessage(event.meetingId, event.userId, "screenShareClientPingMessage", message);
sender.sendMessage(msg);
} }
private void sendIsScreenSharingResponse(IsScreenSharingResponse event) { private void sendIsScreenSharingResponse(IsScreenSharingResponse event) {

View File

@ -22,8 +22,8 @@ public class Red5AppHandler {
app.meetingHasEnded(meetingId); app.meetingHasEnded(meetingId);
} }
public void meetingCreated(String meetingId) { public void meetingCreated(String meetingId, Boolean record) {
app.meetingCreated(meetingId); app.meetingCreated(meetingId, record);
} }
public void userConnected(String meetingId, String userId) { public void userConnected(String meetingId, String userId) {
@ -57,6 +57,10 @@ public class Red5AppHandler {
app.stopShareRequest(meetingId, streamId); app.stopShareRequest(meetingId, streamId);
} }
} }
public void screenShareClientPongMessage(String meetingId, String userId, String streamId, Long timestamp) {
app.screenShareClientPongMessage(meetingId, userId, streamId, timestamp);
}
public void setApplication(IScreenShareApplication app) { public void setApplication(IScreenShareApplication app) {
this.app = app; this.app = app;

View File

@ -119,6 +119,17 @@ public class Red5AppService {
handler.stopShareRequest(meetingId, streamId); handler.stopShareRequest(meetingId, streamId);
} }
public void screenShareClientPongMessage(Map<String, Object> msg) {
String meetingId = Red5.getConnectionLocal().getScope().getName();
String streamId = (String) msg.get("streamId");
Double timestamp = (Double) msg.get("timestamp");
String userId = (String) Red5.getConnectionLocal().getAttribute("USERID");
log.debug("Received screenShareClientPongMessage for meeting=[{}]", meetingId);
handler.screenShareClientPongMessage(meetingId, userId, streamId, timestamp.longValue());
}
private Long genTimestamp() { private Long genTimestamp() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
} }

View File

@ -18,14 +18,15 @@
*/ */
package org.bigbluebutton.app.screenshare package org.bigbluebutton.app.screenshare
import scala.util.{Success, Failure}
import akka.util.Timeout import akka.util.Timeout
import org.bigbluebutton.app.screenshare.events.{IEventsMessageBus, IsScreenSharingResponse, StartShareRequestResponse} import akka.pattern.ask
import org.bigbluebutton.app.screenshare.events.{IEventsMessageBus}
import org.bigbluebutton.app.screenshare.server.sessions.ScreenshareManager import org.bigbluebutton.app.screenshare.server.sessions.ScreenshareManager
import org.bigbluebutton.app.screenshare.server.sessions.messages._ import org.bigbluebutton.app.screenshare.server.sessions.messages._
import org.bigbluebutton.app.screenshare.server.util.LogHelper import org.bigbluebutton.app.screenshare.server.util.LogHelper
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.pattern.ask import scala.concurrent.Future
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -33,28 +34,33 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
val streamBaseUrl: String) extends IScreenShareApplication with LogHelper { val streamBaseUrl: String) extends IScreenShareApplication with LogHelper {
implicit val system = ActorSystem("bigbluebutton-screenshare-system") implicit val system = ActorSystem("bigbluebutton-screenshare-system")
val screenshareManager = system.actorOf(ScreenshareManager.props(system, bus), "screenshare-manager") implicit val timeout = akka.util.Timeout(3 seconds)
val screenShareManager = system.actorOf(ScreenshareManager.props(system, bus), "screenshare-manager")
implicit def executionContext = system.dispatcher implicit def executionContext = system.dispatcher
val initError: Error = new Error("Uninitialized error.") val initError: Error = new Error("Uninitialized error.")
logger.info("Creating a new ScreenShareApplication") if (logger.isDebugEnabled()) {
logger.debug("ScreenShareApplication created.")
}
def meetingHasEnded(meetingId: String) { def meetingHasEnded(meetingId: String) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received meetingHasEnded on meeting=" + meetingId + "]") logger.debug("Received meetingHasEnded on meeting=" + meetingId + "]")
} }
screenshareManager ! new MeetingHasEnded(meetingId) screenShareManager ! new MeetingHasEnded(meetingId)
} }
def meetingCreated(meetingId: String) { def meetingCreated(meetingId: String, record: java.lang.Boolean) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received meetingCreated on meeting=" + meetingId + "]") logger.debug("Received meetingCreated on meeting=" + meetingId + "]")
} }
screenshareManager ! new MeetingCreated(meetingId) screenShareManager ! new MeetingCreated(meetingId, record)
} }
@ -63,15 +69,15 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
logger.debug("Received user connected on meeting=" + meetingId logger.debug("Received user connected on meeting=" + meetingId
+ "] userid=[" + userId + "]") + "] userid=[" + userId + "]")
} }
screenshareManager ! new UserConnected(meetingId, userId) screenShareManager ! new UserConnected(meetingId, userId)
} }
def userDisconnected(meetingId: String, userId: String) { def userDisconnected(meetingId: String, userId: String) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received user disconnected on meeting=" + meetingId logger.debug("Received user disconnected on meeting=" + meetingId
+ "] userid=[" + userId + "]") + "] userid=[" + userId + "]")
} }
screenshareManager ! new UserDisconnected(meetingId, userId) screenShareManager ! new UserDisconnected(meetingId, userId)
} }
def isScreenSharing(meetingId: String, userId: String) { def isScreenSharing(meetingId: String, userId: String) {
@ -79,15 +85,15 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
logger.debug("Received is screen sharing on meeting=" + meetingId + "]") logger.debug("Received is screen sharing on meeting=" + meetingId + "]")
} }
screenshareManager ! IsScreenSharing(meetingId, userId) screenShareManager ! IsScreenSharing(meetingId, userId)
} }
def getScreenShareInfo(meetingId: String, token: String):ScreenShareInfoResponse = { def getScreenShareInfo(meetingId: String, token: String):ScreenShareInfoResponse = {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received get screen sharing info on token=" + token + "]") logger.debug("Received get screen sharing info on token=" + token + "]")
} }
implicit val timeout = Timeout(3 seconds)
val future = screenshareManager ? ScreenShareInfoRequest(meetingId, token) val future = screenShareManager ? ScreenShareInfoRequest(meetingId, token)
val reply = Await.result(future, timeout.duration).asInstanceOf[ScreenShareInfoRequestReply] val reply = Await.result(future, timeout.duration).asInstanceOf[ScreenShareInfoRequestReply]
val publishUrl = streamBaseUrl + "/" + meetingId val publishUrl = streamBaseUrl + "/" + meetingId
@ -99,13 +105,40 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received record stream request on stream=" + streamId + "]") logger.debug("Received record stream request on stream=" + streamId + "]")
} }
var record = false
implicit val timeout = Timeout(3 seconds) //var recorded = false
val future = screenshareManager ? IsStreamRecorded(meetingId, streamId) //sendIsStreamRecored(meetingId, streamId).onComplete {
// case Success(record) => recorded = record
// case Failure(ex) => recorded = false
//}
// if (logger.isDebugEnabled()) {
// logger.debug("Received record stream response=" + recorded + "] *************************************")
// }
// recorded
var record = false
val future = screenShareManager ? IsStreamRecorded(meetingId, streamId)
val reply = Await.result(future, timeout.duration).asInstanceOf[IsStreamRecordedReply] val reply = Await.result(future, timeout.duration).asInstanceOf[IsStreamRecordedReply]
record = reply.record record = reply.record
record record
}
private def sendIsStreamRecored(meetingId: String, streamId: String): Future[Boolean] = {
//screenShareManager ? IsStreamRecorded(meetingId, streamId)
(screenShareManager ? IsStreamRecorded(meetingId, streamId))
.mapTo[IsStreamRecordedReply]
.map(result => result.record)
//.recover {
// case _ => false
//}
} }
def startShareRequest(meetingId: String, userId: String, record: java.lang.Boolean) { def startShareRequest(meetingId: String, userId: String, record: java.lang.Boolean) {
@ -113,7 +146,7 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
logger.debug("Received start share request on meeting=" + meetingId + "for user=" + userId + "]") logger.debug("Received start share request on meeting=" + meetingId + "for user=" + userId + "]")
} }
screenshareManager ! StartShareRequestMessage(meetingId, userId, jnlpFile, record) screenShareManager ! StartShareRequestMessage(meetingId, userId, jnlpFile, record)
} }
def restartShareRequest(meetingId: String, userId: String) { def restartShareRequest(meetingId: String, userId: String) {
@ -121,7 +154,7 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
logger.debug("Received restart share request on meeting=[" + meetingId logger.debug("Received restart share request on meeting=[" + meetingId
+ "] from userId=[" + userId + "]") + "] from userId=[" + userId + "]")
} }
screenshareManager ! new RestartShareRequestMessage(meetingId, userId) screenShareManager ! new RestartShareRequestMessage(meetingId, userId)
} }
def pauseShareRequest(meetingId: String, userId: String, streamId: String) { def pauseShareRequest(meetingId: String, userId: String, streamId: String) {
@ -129,67 +162,58 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
logger.debug("Received pause share request on meeting=[" + meetingId logger.debug("Received pause share request on meeting=[" + meetingId
+ "] for stream=[" + streamId + "]") + "] for stream=[" + streamId + "]")
} }
screenshareManager ! new PauseShareRequestMessage(meetingId, userId, streamId) screenShareManager ! new PauseShareRequestMessage(meetingId, userId, streamId)
} }
def stopShareRequest(meetingId: String, streamId: String) { def stopShareRequest(meetingId: String, streamId: String) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received stop share request on meeting=[" + meetingId logger.debug("Received stop share request on meeting=[" + meetingId
+ "] for stream=[" + streamId + "]") + "] for stream=[" + streamId + "]")
} }
screenshareManager ! new StopShareRequestMessage(meetingId, streamId) screenShareManager ! new StopShareRequestMessage(meetingId, streamId)
} }
def streamStarted(meetingId: String, streamId: String, url: String) { def streamStarted(meetingId: String, streamId: String, url: String) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received stream started on meeting=[" + meetingId logger.debug("Received stream started on meeting=[" + meetingId
+ "] for stream=[" + streamId + "]") + "] for stream=[" + streamId + "]")
} }
screenshareManager ! new StreamStartedMessage(meetingId, streamId, url) screenShareManager ! new StreamStartedMessage(meetingId, streamId, url)
} }
def streamStopped(meetingId: String, streamId: String) { def streamStopped(meetingId: String, streamId: String) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received stream stopped on meeting=[" + meetingId logger.debug("Received stream stopped on meeting=[" + meetingId
+ "] for stream=[" + streamId + "]") + "] for stream=[" + streamId + "]")
} }
screenshareManager ! new StreamStoppedMessage(meetingId, streamId) screenShareManager ! new StreamStoppedMessage(meetingId, streamId)
} }
def sharingStarted(meetingId: String, streamId: String, width: java.lang.Integer, height: java.lang.Integer) { def sharingStarted(meetingId: String, streamId: String, width: java.lang.Integer, height: java.lang.Integer) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received share started on meeting=[" + meetingId logger.debug("Received share started on meeting=[" + meetingId
+ "] for stream=[" + streamId + "] with region=[" + width + "x" + height + "]") + "] for stream=[" + streamId + "] with region=[" + width + "x" + height + "]")
} }
screenshareManager ! new SharingStartedMessage(meetingId, streamId, width, height) screenShareManager ! new SharingStartedMessage(meetingId, streamId, width, height)
} }
def sharingStopped(meetingId: String, streamId: String) { def sharingStopped(meetingId: String, streamId: String) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received sharing stopped on meeting=" + meetingId logger.debug("Received sharing stopped on meeting=" + meetingId
+ "for stream=" + streamId + "]") + "for stream=" + streamId + "]")
} }
screenshareManager ! new SharingStoppedMessage(meetingId, streamId) screenShareManager ! new SharingStoppedMessage(meetingId, streamId)
} }
def updateShareStatus(meetingId: String, streamId : String, seqNum: java.lang.Integer) { def updateShareStatus(meetingId: String, streamId : String, seqNum: java.lang.Integer) {
// if (logger.isDebugEnabled()) { screenShareManager ! new UpdateShareStatus(meetingId, streamId, seqNum)
// logger.debug("Received sharing status on meeting=" + meetingId
// + "for stream=" + streamId + "]")
// }
screenshareManager ! new UpdateShareStatus(meetingId, streamId, seqNum)
} }
def getSharingStatus(meetingId: String, streamId: String): SharingStatus = { def getSharingStatus(meetingId: String, streamId: String): SharingStatus = {
if (logger.isDebugEnabled()) {
logger.debug("Received sharing status on meeting=" + meetingId
+ "for stream=" + streamId + "]")
}
var stopped = false var stopped = false
implicit val timeout = Timeout(3 seconds) val future = screenShareManager ? GetSharingStatus(meetingId, streamId)
val future = screenshareManager ? GetSharingStatus(meetingId, streamId)
val reply = Await.result(future, timeout.duration).asInstanceOf[GetSharingStatusReply] val reply = Await.result(future, timeout.duration).asInstanceOf[GetSharingStatusReply]
reply.streamId match { reply.streamId match {
@ -199,4 +223,7 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
} }
def screenShareClientPongMessage (meetingId: String, userId: String, streamId: String, timestamp: java.lang.Long) {
screenShareManager ! new ClientPongMessage(meetingId, userId, streamId, timestamp)
}
} }

View File

@ -6,10 +6,11 @@ import org.bigbluebutton.app.screenshare.events.IEventsMessageBus
object ActiveScreenshare { object ActiveScreenshare {
def apply(screenshareSessionManager: ScreenshareManager, bus: IEventsMessageBus, def apply(screenshareSessionManager: ScreenshareManager, bus: IEventsMessageBus,
meetingId:String)(implicit context: ActorContext) = new ActiveScreenshare(screenshareSessionManager, bus, meetingId)(context) meetingId:String, record: Boolean)(implicit context: ActorContext) =
new ActiveScreenshare(screenshareSessionManager, bus, meetingId, record)(context)
} }
class ActiveScreenshare(val screenshareSessionManager: ScreenshareManager, class ActiveScreenshare(val screenshareSessionManager: ScreenshareManager,
val bus: IEventsMessageBus, val meetingId:String)(implicit val context: ActorContext) { val bus: IEventsMessageBus, val meetingId:String, record: Boolean)(implicit val context: ActorContext) {
val actorRef = context.actorOf(Screenshare.props(screenshareSessionManager, bus, meetingId)) val actorRef = context.actorOf(Screenshare.props(screenshareSessionManager, bus, meetingId, record))
} }

View File

@ -29,13 +29,13 @@ import org.bigbluebutton.app.screenshare.server.sessions.messages._
import scala.collection.immutable.StringOps import scala.collection.immutable.StringOps
object Screenshare { object Screenshare {
def props(screenshareSessionManager: ScreenshareManager, bus: IEventsMessageBus, meetingId:String): Props = def props(screenshareSessionManager: ScreenshareManager, bus: IEventsMessageBus, meetingId:String, record: Boolean): Props =
Props(classOf[Screenshare], screenshareSessionManager, bus, meetingId) Props(classOf[Screenshare], screenshareSessionManager, bus, meetingId, record)
} }
class Screenshare(val sessionManager: ScreenshareManager, class Screenshare(val sessionManager: ScreenshareManager,
val bus: IEventsMessageBus, val bus: IEventsMessageBus,
val meetingId: String) extends Actor with ActorLogging { val meetingId: String, val record: Boolean) extends Actor with ActorLogging {
log.info("Creating a new Screenshare") log.info("Creating a new Screenshare")
private val sessions = new HashMap[String, ActiveSession] private val sessions = new HashMap[String, ActiveSession]
@ -58,7 +58,6 @@ class Screenshare(val sessionManager: ScreenshareManager,
private var currentStreamId:Option[String] = None private var currentStreamId:Option[String] = None
private var currentPresenterId:Option[String] = None private var currentPresenterId:Option[String] = None
private var record:Boolean = false
def receive = { def receive = {
@ -79,6 +78,7 @@ class Screenshare(val sessionManager: ScreenshareManager,
case msg: ScreenShareInfoRequest => handleScreenShareInfoRequest(msg) case msg: ScreenShareInfoRequest => handleScreenShareInfoRequest(msg)
case msg: MeetingHasEnded => handleMeetingHasEnded(msg) case msg: MeetingHasEnded => handleMeetingHasEnded(msg)
case msg: KeepAliveTimeout => handleKeepAliveTimeout(msg) case msg: KeepAliveTimeout => handleKeepAliveTimeout(msg)
case msg: ClientPongMessage => handleClientPongMessage(msg)
case m: Any => log.warning("Session: Unknown message [{}]", m) case m: Any => log.warning("Session: Unknown message [{}]", m)
} }
@ -90,6 +90,18 @@ class Screenshare(val sessionManager: ScreenshareManager,
sessions.values find (su => su.token == token) sessions.values find (su => su.token == token)
} }
private def handleClientPongMessage(msg: ClientPongMessage) {
if (log.isDebugEnabled) {
log.debug("Received ClientPongMessage message for streamId=[" + msg.streamId + "]")
}
activeSession foreach { session =>
if (session.streamId == msg.streamId) {
session.actorRef forward msg
}
}
}
private def handleMeetingHasEnded(msg: MeetingHasEnded) { private def handleMeetingHasEnded(msg: MeetingHasEnded) {
if (log.isDebugEnabled) { if (log.isDebugEnabled) {
log.debug("Received MeetingHasEnded for meetingId=[" + msg.meetingId + "]") log.debug("Received MeetingHasEnded for meetingId=[" + msg.meetingId + "]")
@ -113,6 +125,9 @@ class Screenshare(val sessionManager: ScreenshareManager,
if (userIdArray.length == 2) { if (userIdArray.length == 2) {
val userId = userIdArray(0) val userId = userIdArray(0)
currentPresenterId foreach { presId => currentPresenterId foreach { presId =>
if (log.isDebugEnabled) {
log.debug("UserDisconnected. curPresId=[" + presId + "] userId=[" + msg.userId + "]")
}
if (presId == userId) { if (presId == userId) {
// The user sharing the screen got disconnected. Stop the // The user sharing the screen got disconnected. Stop the
// screen sharing. // screen sharing.
@ -178,9 +193,10 @@ class Screenshare(val sessionManager: ScreenshareManager,
sessions.get(msg.streamId) match { sessions.get(msg.streamId) match {
case Some(session) => case Some(session) =>
session.actorRef forward msg sender ! new IsStreamRecordedReply(record)
case None => case None =>
log.info("IsStreamRecorded on a non-existing session=[" + msg.streamId + "]") log.info("IsStreamRecorded on a non-existing session=[" + msg.streamId + "]")
sender ! new IsStreamRecordedReply(false)
} }
} }
@ -338,7 +354,6 @@ class Screenshare(val sessionManager: ScreenshareManager,
} }
currentStreamId = Some(streamId) currentStreamId = Some(streamId)
record = msg.record
val session = ActiveSession(this, bus, meetingId, streamId, token, msg.record, msg.userId) val session = ActiveSession(this, bus, meetingId, streamId, token, msg.record, msg.userId)

View File

@ -60,11 +60,21 @@ class ScreenshareManager(val aSystem: ActorSystem, val bus: IEventsMessageBus)
case msg: UserConnected => handleUserConnected(msg) case msg: UserConnected => handleUserConnected(msg)
case msg: MeetingHasEnded => handleMeetingHasEnded(msg) case msg: MeetingHasEnded => handleMeetingHasEnded(msg)
case msg: MeetingCreated => handleMeetingCreated(msg) case msg: MeetingCreated => handleMeetingCreated(msg)
case msg: ClientPongMessage => handleClientPongMessage(msg)
case msg: Any => log.warning("Unknown message " + msg) case msg: Any => log.warning("Unknown message " + msg)
} }
private def handleClientPongMessage(msg: ClientPongMessage) {
if (log.isDebugEnabled) {
log.debug("Received ClientPongMessage message for meeting=[" + msg.meetingId + "]")
}
screenshares.get(msg.meetingId) foreach { screenshare =>
screenshare.actorRef ! msg
}
}
private def handleUserDisconnected(msg: UserDisconnected) { private def handleUserDisconnected(msg: UserDisconnected) {
if (log.isDebugEnabled) { if (log.isDebugEnabled) {
log.debug("Received UserDisconnected message for meeting=[" + msg.meetingId + "]") log.debug("Received UserDisconnected message for meeting=[" + msg.meetingId + "]")
@ -126,7 +136,7 @@ class ScreenshareManager(val aSystem: ActorSystem, val bus: IEventsMessageBus)
if (log.isDebugEnabled) { if (log.isDebugEnabled) {
log.debug("Creating screenshare=[" + msg.meetingId + "]") log.debug("Creating screenshare=[" + msg.meetingId + "]")
} }
val activeScreenshare = ActiveScreenshare(this, bus, msg.meetingId) val activeScreenshare = ActiveScreenshare(this, bus, msg.meetingId, msg.record)
screenshares += msg.meetingId -> activeScreenshare screenshares += msg.meetingId -> activeScreenshare
} }

View File

@ -46,8 +46,7 @@ class Session(parent: Screenshare,
val userId: String) extends Actor with ActorLogging { val userId: String) extends Actor with ActorLogging {
log.info("Creating of new Session") log.info("Creating of new Session")
private var timeOfLastKeepAliveUpdate:Long = TimeUtil.getCurrentMonoTime()
private val KEEP_ALIVE_TIMEOUT = 60000
// if ffmpeg is still broadcasting // if ffmpeg is still broadcasting
private var streamStopped = true private var streamStopped = true
@ -61,14 +60,26 @@ class Session(parent: Screenshare,
private var streamUrl: Option[String] = None private var streamUrl: Option[String] = None
private val IS_STREAM_ALIVE = "IsStreamAlive" private val SESSION_AUDIT_MESSAGE = "SessionAuditMessage"
private var lastStatusUpdate = 0L private var lastStatusUpdate = 0L
// Number of seconds before we assume that the JWS is dead.
private val LAST_STATUS_UPDATE_TIMEOUT = 20
private var sessionStartedTimestamp:Long = TimeUtil.currentMonoTimeInSeconds()
private val SESSION_START_TIMEOUT = 60
// The last time we received a pong response from the client.
// We need to check if the client is still alive. If the client
// crashed, we need to end the screen sharing as soon as we detect it.
private var lastPongReceivedTimestamp = 0L
private val PONG_TIMEOUT_SEC = 20
implicit def executionContext = parent.sessionManager.actorSystem.dispatcher implicit def executionContext = parent.sessionManager.actorSystem.dispatcher
def scheduleKeepAliveCheck() { def scheduleKeepAliveCheck() {
parent.sessionManager.actorSystem.scheduler.scheduleOnce(5.seconds, self, IS_STREAM_ALIVE) parent.sessionManager.actorSystem.scheduler.scheduleOnce(5.seconds, self, SESSION_AUDIT_MESSAGE)
} }
def receive = { def receive = {
@ -85,7 +96,8 @@ class Session(parent: Screenshare,
case msg: UpdateShareStatus => handleUpdateShareStatus(msg) case msg: UpdateShareStatus => handleUpdateShareStatus(msg)
case msg: UserDisconnected => handleUserDisconnected(msg) case msg: UserDisconnected => handleUserDisconnected(msg)
case msg: ScreenShareInfoRequest => handleScreenShareInfoRequest(msg) case msg: ScreenShareInfoRequest => handleScreenShareInfoRequest(msg)
case IS_STREAM_ALIVE => checkIfStreamIsAlive() case SESSION_AUDIT_MESSAGE => handleSessionAuditMessage()
case msg: ClientPongMessage => handleClientPongMessage(msg)
case m: Any => log.warning("Session: Unknown message [%s]", m) case m: Any => log.warning("Session: Unknown message [%s]", m)
} }
@ -181,6 +193,7 @@ class Session(parent: Screenshare,
} }
streamStopped = true streamStopped = true
bus.send(new StreamStoppedEvent(meetingId, streamId)) bus.send(new StreamStoppedEvent(meetingId, streamId))
} }
private def handleStopShareRequestMessage(msg: StopShareRequestMessage) { private def handleStopShareRequestMessage(msg: StopShareRequestMessage) {
@ -190,6 +203,7 @@ class Session(parent: Screenshare,
bus.send(new ShareStoppedEvent(meetingId, streamId)) bus.send(new ShareStoppedEvent(meetingId, streamId))
stopSession()
} }
private def handlePauseShareRequestMessage(msg: PauseShareRequestMessage) { private def handlePauseShareRequestMessage(msg: PauseShareRequestMessage) {
@ -212,16 +226,66 @@ class Session(parent: Screenshare,
} }
private def handleUpdateShareStatus(msg: UpdateShareStatus): Unit = { private def handleUpdateShareStatus(msg: UpdateShareStatus): Unit = {
timeOfLastKeepAliveUpdate = TimeUtil.getCurrentMonoTime() lastStatusUpdate = TimeUtil.currentMonoTimeInSeconds()
} }
private def checkIfStreamIsAlive() { private def handleSessionAuditMessage() {
if (TimeUtil.getCurrentMonoTime - timeOfLastKeepAliveUpdate > KEEP_ALIVE_TIMEOUT) { if (jwsStarted()) {
log.warning("Did not receive updates for more than 1 minute. Removing stream {}", streamId) if (jwsIsStillAlive()) {
context.parent ! new KeepAliveTimeout(streamId) if (clientIsStillAlive()) {
} else { scheduleKeepAliveCheck()
scheduleKeepAliveCheck() }
}
} }
} }
private def jwsStarted(): Boolean = {
val currentTimeInSec = TimeUtil.currentMonoTimeInSeconds()
if ((lastStatusUpdate == 0) && (currentTimeInSec - sessionStartedTimestamp > SESSION_START_TIMEOUT)) {
log.warning("JWS failed to start. streamId={}", streamId)
stopSession()
false
} else {
true
}
}
private def handleClientPongMessage(msg: ClientPongMessage) {
if (log.isDebugEnabled) {
log.debug("Received ClientPongMessage message for streamId=[" + msg.streamId + "]")
}
lastPongReceivedTimestamp = TimeUtil.currentMonoTimeInSeconds()
}
private def jwsIsStillAlive(): Boolean = {
val currentTimeInSec = TimeUtil.currentMonoTimeInSeconds()
if ((lastStatusUpdate > 0) && (currentTimeInSec - lastStatusUpdate > LAST_STATUS_UPDATE_TIMEOUT)) {
log.warning("Did not receive status update from JWS. Assume it is dead. streamId={}", streamId)
stopSession()
false
} else {
true
}
}
private def clientIsStillAlive(): Boolean = {
val currentTimeInSec = TimeUtil.currentMonoTimeInSeconds()
if ((lastPongReceivedTimestamp > 0) && (currentTimeInSec - lastPongReceivedTimestamp > PONG_TIMEOUT_SEC)) {
log.warning("Did not receive pong from client. Assume it is dead. streamId={}", streamId)
stopSession()
false
} else {
bus.send(new ScreenShareClientPing(meetingId, userId, streamId, currentTimeInSec))
true
}
}
private def stopSession(): Unit = {
context.parent ! new KeepAliveTimeout(streamId)
context.stop(self)
}
} }

View File

@ -43,4 +43,6 @@ case class UserConnected(meetingId: String, userId: String)
case class MeetingHasEnded(meetingId: String) case class MeetingHasEnded(meetingId: String)
case class MeetingCreated(meetingId: String) case class MeetingCreated(meetingId: String, record: Boolean)
case class ClientPongMessage(meetingId: String, userId: String, streamId: String, timestamp: Long)

View File

@ -11,7 +11,15 @@ object TimeUtil {
def getCurrentMonoTime():Long = { def getCurrentMonoTime():Long = {
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
} }
def millisToSeconds(millis: Long): Long = {
TimeUnit.MILLISECONDS.toSeconds(millis)
}
def currentMonoTimeInSeconds(): Long = {
millisToSeconds(getCurrentMonoTime())
}
def getCurrentTime():Long = { def getCurrentTime():Long = {
System.currentTimeMillis(); System.currentTimeMillis();
} }

View File

@ -98,8 +98,7 @@ public class FfmpegScreenshare {
ignoreDisconnect = false; ignoreDisconnect = false;
grabber.start(); grabber.start();
} catch (Exception e) { } catch (Exception e) {
// TODO Auto-generated catch block listener.networkConnectionException(ExitCode.CONNECTION_TO_DESKSHARE_SERVER_DROPPED, null);
e.printStackTrace();
} }
// useH264(recorder, codecOptions); // useH264(recorder, codecOptions);
@ -109,8 +108,7 @@ public class FfmpegScreenshare {
try { try {
mainRecorder.start(); mainRecorder.start();
} catch (Exception e) { } catch (Exception e) {
// TODO Auto-generated catch block listener.networkConnectionException(ExitCode.CONNECTION_TO_DESKSHARE_SERVER_DROPPED, null);
e.printStackTrace();
} }
} }
@ -215,9 +213,7 @@ public class FfmpegScreenshare {
// grabber.stop(); // grabber.stop();
System.out.println("End stop sequence."); System.out.println("End stop sequence.");
} catch (Exception e) { } catch (Exception e) {
// TODO Auto-generated catch block listener.networkConnectionException(ExitCode.CONNECTION_TO_DESKSHARE_SERVER_DROPPED, null);
System.out.println("Exception in Stopping screen capture." );
e.printStackTrace();
} }
} }
} }

View File

@ -0,0 +1,23 @@
package org.bigbluebutton.modules.screenshare.events
{
import flash.events.Event;
public class ScreenShareClientPingMessage extends Event
{
public static const CLIENT_PING:String = "screenshare client ping message";
public var streamId: String;
public var timestamp: Number;
public function ScreenShareClientPingMessage(streamId: String, timestamp: Number)
{
super(CLIENT_PING, true, false);
this.streamId = streamId;
this.timestamp = timestamp;
}
}
}

View File

@ -27,6 +27,7 @@ package org.bigbluebutton.modules.screenshare.managers {
import org.bigbluebutton.modules.screenshare.events.ShareStartRequestResponseEvent; import org.bigbluebutton.modules.screenshare.events.ShareStartRequestResponseEvent;
import org.bigbluebutton.modules.screenshare.events.StartShareRequestFailedEvent; import org.bigbluebutton.modules.screenshare.events.StartShareRequestFailedEvent;
import org.bigbluebutton.modules.screenshare.events.StartShareRequestSuccessEvent; import org.bigbluebutton.modules.screenshare.events.StartShareRequestSuccessEvent;
import org.bigbluebutton.modules.screenshare.events.ScreenShareClientPingMessage;
import org.bigbluebutton.modules.screenshare.events.StreamStartedEvent; import org.bigbluebutton.modules.screenshare.events.StreamStartedEvent;
import org.bigbluebutton.modules.screenshare.events.ViewStreamEvent; import org.bigbluebutton.modules.screenshare.events.ViewStreamEvent;
import org.bigbluebutton.modules.screenshare.model.ScreenshareModel; import org.bigbluebutton.modules.screenshare.model.ScreenshareModel;
@ -157,6 +158,11 @@ package org.bigbluebutton.modules.screenshare.managers {
service.requestStartSharing(); service.requestStartSharing();
} }
} }
public function handleScreenShareClientPingMessage(event: ScreenShareClientPingMessage):void {
LOGGER.debug("handleScreenShareClientPingMessage");
service.sendClientPongMessage(event.streamId, event.timestamp);
}
public function handleRequestPauseSharingEvent():void { public function handleRequestPauseSharingEvent():void {
service.requestPauseSharing(ScreenshareModel.getInstance().streamId); service.requestPauseSharing(ScreenshareModel.getInstance().streamId);

View File

@ -37,6 +37,7 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
import org.bigbluebutton.modules.screenshare.events.RequestToPauseSharing; import org.bigbluebutton.modules.screenshare.events.RequestToPauseSharing;
import org.bigbluebutton.modules.screenshare.events.ShareEvent; import org.bigbluebutton.modules.screenshare.events.ShareEvent;
import org.bigbluebutton.modules.screenshare.events.ShareStartRequestResponseEvent; import org.bigbluebutton.modules.screenshare.events.ShareStartRequestResponseEvent;
import org.bigbluebutton.modules.screenshare.events.ScreenShareClientPingMessage;
import org.bigbluebutton.modules.screenshare.events.ShareWindowEvent; import org.bigbluebutton.modules.screenshare.events.ShareWindowEvent;
import org.bigbluebutton.modules.screenshare.events.StartedViewingEvent; import org.bigbluebutton.modules.screenshare.events.StartedViewingEvent;
import org.bigbluebutton.modules.screenshare.events.StreamEvent; import org.bigbluebutton.modules.screenshare.events.StreamEvent;
@ -84,6 +85,11 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
<MethodInvoker generator="{ScreenshareManager}" method="handleShareStartRequestResponseEvent" arguments="{event}"/> <MethodInvoker generator="{ScreenshareManager}" method="handleShareStartRequestResponseEvent" arguments="{event}"/>
</EventHandlers> </EventHandlers>
<EventHandlers type="{ScreenShareClientPingMessage.CLIENT_PING}">
<MethodInvoker generator="{ScreenshareManager}" method="handleScreenShareClientPingMessage" arguments="{event}"/>
</EventHandlers>
<EventHandlers type="{MadePresenterEvent.SWITCH_TO_PRESENTER_MODE}"> <EventHandlers type="{MadePresenterEvent.SWITCH_TO_PRESENTER_MODE}">
<MethodInvoker generator="{ScreenshareManager}" method="handleMadePresenterEvent" arguments="{event}"/> <MethodInvoker generator="{ScreenshareManager}" method="handleMadePresenterEvent" arguments="{event}"/>
</EventHandlers> </EventHandlers>
@ -101,7 +107,7 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
</EventHandlers> </EventHandlers>
<EventHandlers type="{ShareWindowEvent.CLOSE}"> <EventHandlers type="{ShareWindowEvent.CLOSE}">
<MethodInvoker generator="{ScreenshareManager}" method="handleShareWindowCloseEvent"/> <MethodInvoker generator="{ScreenshareManager}" method="handleShareWindowCloseEvent"/>
</EventHandlers> </EventHandlers>

View File

@ -29,8 +29,10 @@ package org.bigbluebutton.modules.screenshare.services
import org.bigbluebutton.modules.screenshare.events.StreamStartedEvent; import org.bigbluebutton.modules.screenshare.events.StreamStartedEvent;
import org.bigbluebutton.modules.screenshare.events.StreamStoppedEvent; import org.bigbluebutton.modules.screenshare.events.StreamStoppedEvent;
import org.bigbluebutton.modules.screenshare.events.ScreenSharePausedEvent; import org.bigbluebutton.modules.screenshare.events.ScreenSharePausedEvent;
import org.bigbluebutton.modules.screenshare.events.ScreenShareClientPingMessage;
import org.bigbluebutton.modules.screenshare.services.red5.Connection; import org.bigbluebutton.modules.screenshare.services.red5.Connection;
import org.bigbluebutton.modules.screenshare.services.red5.IMessageListener; import org.bigbluebutton.modules.screenshare.services.red5.IMessageListener;
import org.bigbluebutton.modules.screenshare.model.ScreenshareModel;
public class MessageReceiver implements IMessageListener public class MessageReceiver implements IMessageListener
{ {
@ -72,11 +74,26 @@ package org.bigbluebutton.modules.screenshare.services
case "startShareRequestRejectedResponse": case "startShareRequestRejectedResponse":
handleStartShareRequestRejectedResponse(message); handleStartShareRequestRejectedResponse(message);
break; break;
case "screenShareClientPingMessage":
handleScreenShareClientPingMessage(message);
break;
default: default:
// LogUtil.warn("Cannot handle message [" + messageName + "]"); // LogUtil.warn("Cannot handle message [" + messageName + "]");
} }
} }
private function handleScreenShareClientPingMessage(message:Object):void {
LOGGER.debug("handleScreenShareClientPingMessage " + JSON.stringify(message));
var map:Object = JSON.parse(message.msg);
if (map.hasOwnProperty("meetingId") && map.hasOwnProperty("streamId") && map.hasOwnProperty("timestamp")) {
if (ScreenshareModel.getInstance().streamId == map.streamId) {
LOGGER.debug("handleScreenShareClientPingMessage - sending ping for streamId=[" + map.streamId + "]");
var sharePingEvent: ScreenShareClientPingMessage = new ScreenShareClientPingMessage(map.streamId, map.timestamp);
dispatcher.dispatchEvent(sharePingEvent);
}
}
}
private function handlePauseScreenSharingEvent(message:Object):void { private function handlePauseScreenSharingEvent(message:Object):void {
LOGGER.debug("handlePauseScreenSharingEvent " + JSON.stringify(message)); LOGGER.debug("handlePauseScreenSharingEvent " + JSON.stringify(message));
var map:Object = JSON.parse(message.msg); var map:Object = JSON.parse(message.msg);

View File

@ -21,33 +21,37 @@ package org.bigbluebutton.modules.screenshare.services
import org.bigbluebutton.modules.screenshare.services.red5.Connection; import org.bigbluebutton.modules.screenshare.services.red5.Connection;
public class MessageSender public class MessageSender
{ {
private static const LOG:String = "SC::MessageSender - "; private static const LOG:String = "SC::MessageSender - ";
private var conn: Connection; private var conn: Connection;
public function MessageSender(conn: Connection) { public function MessageSender(conn: Connection) {
this.conn = conn; this.conn = conn;
} }
public function isScreenSharing(meetingId: String):void { public function isScreenSharing(meetingId: String):void {
conn.isScreenSharing(meetingId); conn.isScreenSharing(meetingId);
} }
public function startShareRequest(meetingId: String, userId: String, record: Boolean):void { public function startShareRequest(meetingId: String, userId: String, record: Boolean):void {
conn.startShareRequest(meetingId, userId, record); conn.startShareRequest(meetingId, userId, record);
} }
public function stopShareRequest(meetingId: String, streamId: String):void { public function stopShareRequest(meetingId: String, streamId: String):void {
conn.stopShareRequest(meetingId, streamId); conn.stopShareRequest(meetingId, streamId);
} }
public function pauseShareRequest(meetingId: String, userId: String, streamId: String):void { public function pauseShareRequest(meetingId: String, userId: String, streamId: String):void {
conn.pauseShareRequest(meetingId, userId, streamId); conn.pauseShareRequest(meetingId, userId, streamId);
} }
public function restartShareRequest(meetingId: String, userId: String):void { public function restartShareRequest(meetingId: String, userId: String):void {
conn.restartShareRequest(meetingId, userId); conn.restartShareRequest(meetingId, userId);
}
public function sendClientPongMessage(meetingId: String, streamId: String, timestamp: Number):void {
conn.sendClientPongMessage(meetingId, streamId, timestamp);
}
} }
}
} }

View File

@ -96,6 +96,9 @@ package org.bigbluebutton.modules.screenshare.services {
sender.restartShareRequest(UsersUtil.getInternalMeetingID(), UsersUtil.getMyUserID()); sender.restartShareRequest(UsersUtil.getInternalMeetingID(), UsersUtil.getMyUserID());
} }
public function sendClientPongMessage(streamId: String, timestamp: Number):void {
LOGGER.debug("sendClientPongMessage");
sender.sendClientPongMessage(UsersUtil.getInternalMeetingID(), streamId, timestamp);
}
} }
} }

View File

@ -229,6 +229,19 @@ package org.bigbluebutton.modules.screenshare.services.red5 {
}, message); }, message);
} }
public function sendClientPongMessage(meetingId:String, streamId:String, timestamp: Number):void {
var message:Object = new Object();
message["meetingId"] = meetingId;
message["streamId"] = streamId;
message["timestamp"] = timestamp;
sendMessage("screenshare.screenShareClientPongMessage", function(result:String):void { // On successful result
LOGGER.debug(result);
}, function(status:String):void { // status - On error occurred
LOGGER.error(status);
}, message);
}
public function setURI(p_URI:String):void { public function setURI(p_URI:String):void {
uri = p_URI; uri = p_URI;
} }