diff --git a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java index 7f1a650418..3c3731caa5 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java +++ b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java @@ -178,13 +178,13 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { String streamId = stream.getPublishedName(); publisher.userSharedWebcamMessage(meetingId, userId, streamId); - + VideoStreamListener listener = new VideoStreamListener(conn.getScope(), stream, recordVideoStream); + listener.setEventRecordingService(recordingService); + stream.addStreamListener(listener); + streamListeners.put(conn.getScope().getName() + "-" + stream.getPublishedName(), listener); + if (recordVideoStream) { recordStream(stream); - VideoStreamListener listener = new VideoStreamListener(); - listener.setEventRecordingService(recordingService); - stream.addStreamListener(listener); - streamListeners.put(conn.getScope().getName() + "-" + stream.getPublishedName(), listener); } } @@ -211,14 +211,14 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { String streamId = stream.getPublishedName(); publisher.userUnshareWebcamRequestMessage(meetingId, userId, streamId); - - if (recordVideoStream) { IStreamListener listener = streamListeners.remove(scopeName + "-" + stream.getPublishedName()); if (listener != null) { stream.removeStreamListener(listener); } + if (recordVideoStream) { + long publishDuration = (System.currentTimeMillis() - stream.getCreationTime()) / 1000; log.info("streamBroadcastClose " + stream.getPublishedName() + " " + System.currentTimeMillis() + " " + scopeName); Map event = new HashMap(); diff --git a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java old mode 100644 new mode 100755 index dffeed4774..aceb4012af --- a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java +++ b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java @@ -21,14 +21,20 @@ package org.bigbluebutton.app.video; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; - import org.apache.mina.core.buffer.IoBuffer; import org.red5.server.api.IConnection; import org.red5.server.api.Red5; +import org.red5.server.api.scheduling.IScheduledJob; +import org.red5.server.api.scheduling.ISchedulingService; +import org.red5.server.api.scope.IScope; import org.red5.server.api.stream.IBroadcastStream; import org.red5.server.api.stream.IStreamListener; import org.red5.server.api.stream.IStreamPacket; import org.red5.server.net.rtmp.event.VideoData; +import org.red5.server.scheduling.QuartzSchedulingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.red5.logging.Red5LoggerFactory; /** * Class to listen for the first video packet of the webcam. @@ -46,13 +52,45 @@ import org.red5.server.net.rtmp.event.VideoData; * */ public class VideoStreamListener implements IStreamListener { + private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video"); + private EventRecordingService recordingService; private volatile boolean firstPacketReceived = false; - private Long genTimestamp() { - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - } - + // Maximum time between video packets + private int videoTimeout = 10000; + + // Last time video was received, not video timestamp + private long lastVideoTime; + + // Stream being observed + private IBroadcastStream stream; + + // if this stream is recorded or not + private boolean record; + + // Scheduler + private QuartzSchedulingService scheduler; + + // Event queue worker job name + private String timeoutJobName; + + private IScope scope; + + public VideoStreamListener(IScope scope, IBroadcastStream stream, Boolean record) { + this.scope = scope; + this.stream = stream; + this.record = record; + + // get the scheduler + scheduler = (QuartzSchedulingService) scope.getParent().getContext().getBean(QuartzSchedulingService.BEAN_NAME); + + } + + private Long genTimestamp() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + } + @Override public void packetReceived(IBroadcastStream stream, IStreamPacket packet) { IoBuffer buf = packet.getData(); @@ -64,17 +102,25 @@ public class VideoStreamListener implements IStreamListener { } if (packet instanceof VideoData) { + // keep track of last time video was received + lastVideoTime = System.currentTimeMillis(); + if (! firstPacketReceived) { firstPacketReceived = true; - IConnection conn = Red5.getConnectionLocal(); - Map event = new HashMap(); - event.put("module", "WEBCAM"); - event.put("timestamp", genTimestamp().toString()); - event.put("meetingId", conn.getScope().getName()); - event.put("stream", stream.getPublishedName()); - event.put("eventName", "StartWebcamShareEvent"); - - recordingService.record(conn.getScope().getName(), event); + // start the worker + timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob()); + + if (record) { + IConnection conn = Red5.getConnectionLocal(); + Map event = new HashMap(); + event.put("module", "WEBCAM"); + event.put("timestamp", genTimestamp().toString()); + event.put("meetingId", scope.getName()); + event.put("stream", stream.getPublishedName()); + event.put("eventName", "StartWebcamShareEvent"); + + recordingService.record(conn.getScope().getName(), event); + } } } } @@ -82,5 +128,19 @@ public class VideoStreamListener implements IStreamListener { public void setEventRecordingService(EventRecordingService s) { recordingService = s; } + + private class TimeoutJob implements IScheduledJob { + + public void execute(ISchedulingService service) { + if ((System.currentTimeMillis() - lastVideoTime) > videoTimeout) { + log.warn("No data received for stream[{}] in the last few seconds. Close stream.", stream.getPublishedName()); + // remove the scheduled job + scheduler.removeScheduledJob(timeoutJobName); + // stop / clean up + stream.stop(); + } + } + + } }