- close stream if no video packet received within a few seconds

This commit is contained in:
Richard Alam 2015-07-02 14:41:17 +00:00
parent 721bcf9342
commit 4190ad3aa7
2 changed files with 81 additions and 21 deletions

View File

@ -178,13 +178,13 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
String streamId = stream.getPublishedName();
publisher.userSharedWebcamMessage(meetingId, userId, streamId);
if (recordVideoStream) {
recordStream(stream);
VideoStreamListener listener = new VideoStreamListener();
VideoStreamListener listener = new VideoStreamListener(conn.getScope(), stream, recordVideoStream);
listener.setEventRecordingService(recordingService);
stream.addStreamListener(listener);
streamListeners.put(conn.getScope().getName() + "-" + stream.getPublishedName(), listener);
if (recordVideoStream) {
recordStream(stream);
}
}
@ -212,13 +212,13 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
publisher.userUnshareWebcamRequestMessage(meetingId, userId, streamId);
if (recordVideoStream) {
IStreamListener listener = streamListeners.remove(scopeName + "-" + stream.getPublishedName());
if (listener != null) {
stream.removeStreamListener(listener);
}
if (recordVideoStream) {
long publishDuration = (System.currentTimeMillis() - stream.getCreationTime()) / 1000;
log.info("streamBroadcastClose " + stream.getPublishedName() + " " + System.currentTimeMillis() + " " + scopeName);
Map<String, String> event = new HashMap<String, String>();

View File

@ -21,14 +21,20 @@ package org.bigbluebutton.app.video;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.server.api.IConnection;
import org.red5.server.api.Red5;
import org.red5.server.api.scheduling.IScheduledJob;
import org.red5.server.api.scheduling.ISchedulingService;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.net.rtmp.event.VideoData;
import org.red5.server.scheduling.QuartzSchedulingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.red5.logging.Red5LoggerFactory;
/**
* Class to listen for the first video packet of the webcam.
@ -46,9 +52,41 @@ import org.red5.server.net.rtmp.event.VideoData;
*
*/
public class VideoStreamListener implements IStreamListener {
private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video");
private EventRecordingService recordingService;
private volatile boolean firstPacketReceived = false;
// Maximum time between video packets
private int videoTimeout = 10000;
// Last time video was received, not video timestamp
private long lastVideoTime;
// Stream being observed
private IBroadcastStream stream;
// if this stream is recorded or not
private boolean record;
// Scheduler
private QuartzSchedulingService scheduler;
// Event queue worker job name
private String timeoutJobName;
private IScope scope;
public VideoStreamListener(IScope scope, IBroadcastStream stream, Boolean record) {
this.scope = scope;
this.stream = stream;
this.record = record;
// get the scheduler
scheduler = (QuartzSchedulingService) scope.getParent().getContext().getBean(QuartzSchedulingService.BEAN_NAME);
}
private Long genTimestamp() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
@ -64,13 +102,20 @@ public class VideoStreamListener implements IStreamListener {
}
if (packet instanceof VideoData) {
// keep track of last time video was received
lastVideoTime = System.currentTimeMillis();
if (! firstPacketReceived) {
firstPacketReceived = true;
// start the worker
timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob());
if (record) {
IConnection conn = Red5.getConnectionLocal();
Map<String, String> event = new HashMap<String, String>();
event.put("module", "WEBCAM");
event.put("timestamp", genTimestamp().toString());
event.put("meetingId", conn.getScope().getName());
event.put("meetingId", scope.getName());
event.put("stream", stream.getPublishedName());
event.put("eventName", "StartWebcamShareEvent");
@ -78,9 +123,24 @@ public class VideoStreamListener implements IStreamListener {
}
}
}
}
public void setEventRecordingService(EventRecordingService s) {
recordingService = s;
}
private class TimeoutJob implements IScheduledJob {
public void execute(ISchedulingService service) {
if ((System.currentTimeMillis() - lastVideoTime) > videoTimeout) {
log.warn("No data received for stream[{}] in the last few seconds. Close stream.", stream.getPublishedName());
// remove the scheduled job
scheduler.removeScheduledJob(timeoutJobName);
// stop / clean up
stream.stop();
}
}
}
}