- Make sure that screenshare is synched when recording chapter breaks

This commit is contained in:
Richard Alam 2018-02-14 12:19:26 -08:00
parent d6d4459f35
commit 4511101ad1
5 changed files with 231 additions and 98 deletions

View File

@ -2,8 +2,12 @@ package org.bigbluebutton.app.screenshare;
import java.util.HashMap;
import java.util.Map;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
public class Meeting {
private static Logger log = Red5LoggerFactory.getLogger(Meeting.class, "screenshare");
public final String id;
private Map<String, VideoStream> videoStreams = new HashMap<String, VideoStream>();
@ -13,17 +17,21 @@ public class Meeting {
}
public synchronized void addStream(VideoStream stream) {
videoStreams.put(stream.getStreamId(), stream);
log.debug("Adding VideoStream {} to meeting {}", stream.getStreamId(), id);
videoStreams.put(stream.getStreamId(), stream);
}
public synchronized void removeStream(String streamId) {
VideoStream vs = videoStreams.remove(streamId);
log.debug("Removing VideoStream {} to meeting {}", streamId, id);
VideoStream vs = videoStreams.remove(streamId);
}
public synchronized void streamBroadcastClose(String streamId) {
VideoStream vs = videoStreams.remove(streamId);
log.debug("streamBroadcastClose VideoStream {} to meeting {}", streamId, id);
VideoStream vs = videoStreams.remove(streamId);
if (vs != null) {
vs.streamBroadcastClose();
log.debug("Closing VideoStream {} to meeting {}", streamId, id);
vs.streamBroadcastClose();
}
}

View File

@ -2,8 +2,11 @@ package org.bigbluebutton.app.screenshare;
import java.util.HashMap;
import java.util.Map;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
public class MeetingManager {
private static Logger log = Red5LoggerFactory.getLogger(MeetingManager.class, "screenshare");
private Map<String, Meeting> meetings = new HashMap<String, Meeting>();
@ -16,6 +19,8 @@ public class MeetingManager {
}
public void addStream(String meetingId, VideoStream vs) {
log.debug("Adding VideoStream {} to meeting {}", vs.getStreamId(), meetingId);
Meeting m = meetings.get(meetingId);
if (m != null) {
m.addStream(vs);
@ -27,16 +32,22 @@ public class MeetingManager {
}
public void removeStream(String meetingId, String streamId) {
log.debug("Removing VideoStream {} to meeting {}", streamId, meetingId);
Meeting m = meetings.get(meetingId);
if (m != null) {
log.debug("Removed VideoStream {} to meeting {}", streamId, meetingId);
m.removeStream(streamId);
}
}
public void streamBroadcastClose(String meetingId, String streamId) {
Meeting m = meetings.get(meetingId);
log.debug("streamBroadcastClose VideoStream {} to meeting {}", streamId, meetingId);
Meeting m = meetings.get(meetingId);
if (m != null) {
m.streamBroadcastClose(streamId);
log.debug("streamBroadcastClose 2 VideoStream {} to meeting {}", streamId, meetingId);
m.streamBroadcastClose(streamId);
if (!m.hasVideoStreams()) {
remove(m.id);
}

View File

@ -7,6 +7,9 @@ import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.stream.ClientBroadcastStream;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Map;
import com.google.gson.Gson;
public class VideoStream {
private static Logger log = Red5LoggerFactory.getLogger(VideoStream.class, "screenshare");
@ -26,7 +29,7 @@ public class VideoStream {
}
public String getStreamId() {
return streamId;
return stream.getPublishedName();
}
public synchronized void startRecording() {
@ -36,19 +39,38 @@ public class VideoStream {
log.info("Recording stream " + recordingStreamName);
videoStreamListener.setStreamId(recordingStreamName);
cstream.saveAs(recordingStreamName, false);
Map<String, Object> logData2 = new HashMap<String, Object>();
logData2.put("broadcastStream", stream.getPublishedName());
logData2.put("recordStreamId", recordingStreamName);
logData2.put("recording", cstream.isRecording());
logData2.put("event", "start_recording_stream");
logData2.put("description", "Start recording stream.");
Gson gson2 = new Gson();
String logStr2 = gson2.toJson(logData2);
log.info(logStr2);
} catch (Exception e) {
log.error("ERROR while recording stream " + e.getMessage());
e.printStackTrace();
}
}
public synchronized void stopRecording() {
if (cstream.isRecording()) {
cstream.stopRecording();
videoStreamListener.stopRecording();
videoStreamListener.reset();
}
}
public synchronized void stopRecording() {
log.debug("STOP RECORDING STREAM {} recording {}", stream.getPublishedName(), cstream.isRecording());
Map<String, Object> logData2 = new HashMap<String, Object>();
logData2.put("broadcastStream", stream.getPublishedName());
logData2.put("recordStreamId", recordingStreamName);
logData2.put("event", "stop_recording_stream");
logData2.put("description", "Stop recording stream.");
Gson gson2 = new Gson();
String logStr2 = gson2.toJson(logData2);
log.info(logStr2);
cstream.stopRecording();
videoStreamListener.stopRecording();
videoStreamListener.reset();
}
public synchronized void stopStartRecording() {
stopRecording();

View File

@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.red5.logging.Red5LoggerFactory;
import com.google.gson.Gson;
import java.text.SimpleDateFormat;
/**
* Class to listen for the first video packet of the webcam.
@ -51,7 +52,7 @@ import com.google.gson.Gson;
*
*/
public class VideoStreamListener implements IStreamListener {
private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video");
private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "screenshare");
private EventRecordingService recordingService;
private volatile boolean firstPacketReceived = false;
@ -60,6 +61,7 @@ public class VideoStreamListener implements IStreamListener {
private int videoTimeout = 10000;
private long firstPacketTime = 0L;
private long packetCount = 0L;
private int keyFrameCount = 0;
// Last time video was received, not video timestamp
private long lastVideoTime;
@ -81,12 +83,16 @@ public class VideoStreamListener implements IStreamListener {
private volatile boolean publishing = false;
private volatile boolean streamPaused = false;
private volatile boolean streamStarted = false;
private String meetingId;
private long recordingStartTime;
private String filename;
private final String DATE = "date";
private final String TIMESTAMP_UTC = "timestampUTC";
public VideoStreamListener(String meetingId, String streamId, Boolean record,
String recordingDir, int packetTimeout,
QuartzSchedulingService scheduler,
@ -98,6 +104,9 @@ public class VideoStreamListener implements IStreamListener {
this.recordingDir = recordingDir;
this.scheduler = scheduler;
this.recordingService = recordingService;
// start the worker to monitor if we are still receiving video packets
timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob());
}
private Long genTimestamp() {
@ -106,81 +115,112 @@ public class VideoStreamListener implements IStreamListener {
public void reset() {
firstPacketReceived = false;
keyFrameCount = 0;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
@Override
public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
IoBuffer buf = packet.getData();
if (buf != null)
buf.rewind();
@Override
public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
IoBuffer buf = packet.getData();
if (buf != null)
buf.rewind();
if (buf == null || buf.remaining() == 0) {
return;
}
if (buf == null || buf.remaining() == 0) {
return;
}
if (packet instanceof VideoData) {
// keep track of last time video was received
lastVideoTime = System.currentTimeMillis();
packetCount++;
if (packet instanceof VideoData) {
// keep track of last time video was received
lastVideoTime = System.currentTimeMillis();
packetCount++;
if (!firstPacketReceived) {
firstPacketReceived = true;
publishing = true;
firstPacketTime = lastVideoTime;
VideoData vidPkt = (VideoData) packet;
// start the worker to monitor if we are still receiving video packets
timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob());
if (!firstPacketReceived && vidPkt.getFrameType() == VideoData.FrameType.KEYFRAME) {
log.info("******* Receiving first screenshare KEYFRAME packet");
firstPacketReceived = true;
publishing = true;
firstPacketTime = lastVideoTime;
streamStarted = true;
if (record) {
recordingStartTime = System.currentTimeMillis();
filename = recordingDir;
if (!filename.endsWith("/")) {
filename.concat("/");
}
if (record) {
recordingStartTime = System.currentTimeMillis();
filename = recordingDir;
if (!filename.endsWith("/")) {
filename.concat("/");
}
filename = filename.concat(meetingId).concat("/").concat(streamId).concat(".flv");
recordingStartTime = System.currentTimeMillis();
Map<String, String> event = new HashMap<String, String>();
event.put("module", "Deskshare");
event.put("timestamp", genTimestamp().toString());
event.put("meetingId", meetingId);
event.put("file", filename);
event.put("stream", streamId);
event.put("eventName", "DeskshareStartedEvent");
filename = filename.concat(meetingId).concat("/").concat(streamId).concat(".flv");
recordingStartTime = System.currentTimeMillis();
Map<String, String> event = new HashMap<String, String>();
event.put("module", "Deskshare");
event.put("timestamp", genTimestamp().toString());
event.put("meetingId", meetingId);
event.put("file", filename);
event.put("stream", streamId);
event.put(TIMESTAMP_UTC, Long.toString(recordingStartTime));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
event.put(DATE, sdf.format(recordingStartTime));
event.put("eventName", "DeskshareStartedEvent");
recordingService.record(meetingId, event);
}
}
recordingService.record(meetingId, event);
if (streamPaused) {
streamPaused = false;
long now = System.currentTimeMillis();
long numSeconds = (now - lastVideoTime) / 1000;
Gson gson = new Gson();
String logStr = gson.toJson(event);
log.info("StartScreenShareEvent data={} timeoutJobName={}", logStr, timeoutJobName);
}
}
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("stream", streamId);
logData.put("packetCount", packetCount);
logData.put("publishing", publishing);
logData.put("pausedFor (sec)", numSeconds);
Gson gson = new Gson();
String logStr = gson.toJson(logData);
if (streamPaused) {
streamPaused = false;
long now = System.currentTimeMillis();
long numSeconds = (now - lastVideoTime) / 1000;
log.warn("Screenshare stream restarted. data={}", logStr);
}
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("stream", streamId);
logData.put("packetCount", packetCount);
logData.put("publishing", publishing);
logData.put("pausedFor (sec)", numSeconds);
Gson gson = new Gson();
String logStr = gson.toJson(logData);
log.warn("Screenshare stream restarted. data={}", logStr);
}
if (vidPkt.getFrameType() == VideoData.FrameType.KEYFRAME && keyFrameCount < 3) {
// Log first 3 keyframe packets to allow us to see interval between key frames. Helps
// to debug if there are any synch issues with recording playback. (ralam feb 12, 2018)
keyFrameCount++;
long now = System.currentTimeMillis();
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("stream", stream.getPublishedName());
logData.put("packetCount", packetCount);
logData.put("keyFrameCount", keyFrameCount);
logData.put("publishing", publishing);
logData.put(TIMESTAMP_UTC, Long.toString(now));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
logData.put(DATE, sdf.format(now));
Gson gson = new Gson();
String logStr = gson.toJson(logData);
log.warn("Video stream keyframe. data={}", logStr);
}
}
}
}
}
public void stopRecording() {
if (record) {
long publishDuration = (System.currentTimeMillis() - recordingStartTime) / 1000;
long now = System.currentTimeMillis();
long publishDuration = (now - recordingStartTime) / 1000;
Map<String, String> event = new HashMap<String, String>();
event.put("module", "Deskshare");
@ -189,13 +229,34 @@ public class VideoStreamListener implements IStreamListener {
event.put("stream", streamId);
event.put("file", filename);
event.put("duration", new Long(publishDuration).toString());
event.put("eventName", "DeskshareStoppedEvent");
event.put(TIMESTAMP_UTC, Long.toString(now));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
event.put(DATE, sdf.format(now));
event.put("eventName", "DeskshareStoppedEvent");
recordingService.record(meetingId, event);
Gson gson = new Gson();
String logStr = gson.toJson(event);
log.info("StopScreenShareEvent data={}", logStr);
}
}
public void streamStopped() {
this.publishing = false;
public void streamStopped() {
this.publishing = false;
if (!streamStarted) {
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("stream", streamId);
logData.put("packetCount", packetCount);
logData.put("publishing", publishing);
logData.put("timeoutJobName", timeoutJobName);
Gson gson = new Gson();
String logStr = gson.toJson(logData);
log.warn("Removing scheduled job.as stream hasn't started. data={}", logStr);
// remove the scheduled job
scheduler.removeScheduledJob(timeoutJobName);
}
}
private class TimeoutJob implements IScheduledJob {
@ -207,11 +268,12 @@ public class VideoStreamListener implements IStreamListener {
logData.put("stream", streamId);
logData.put("packetCount", packetCount);
logData.put("publishing", publishing);
logData.put("timeoutJobName", timeoutJobName);
Gson gson = new Gson();
long now = System.currentTimeMillis();
if ((now - lastVideoTime) > videoTimeout && !streamPaused) {
if ((now - lastVideoTime) > videoTimeout && !streamPaused && streamStarted) {
streamPaused = true;
long numSeconds = (now - lastVideoTime) / 1000;
@ -224,7 +286,7 @@ public class VideoStreamListener implements IStreamListener {
}
String logStr = gson.toJson(logData);
if (!publishing) {
if (!publishing && streamStarted) {
log.warn("Removing scheduled job. data={}", logStr);
// remove the scheduled job
scheduler.removeScheduledJob(timeoutJobName);

View File

@ -151,8 +151,24 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
String connId = conn.getSessionId();
String scopeName = stream.getScope().getName();
String connType = getConnectionType(Red5.getConnectionLocal().getType());
String streamId = stream.getPublishedName();
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", getMeetingId());
logData.put("userId", getUserId());
logData.put("connType", connType);
logData.put("connId", connId);
logData.put("stream", stream.getPublishedName());
logData.put("context", scopeName);
logData.put("event", "stream_broadcast_start");
logData.put("description", "Stream broadcast start.");
Gson gson = new Gson();
String logStr = gson.toJson(logData);
log.info(logStr);
Matcher matcher = STREAM_ID_PATTERN.matcher(stream.getPublishedName());
if (matcher.matches()) {
String meetingId = matcher.group(1).trim();
@ -162,25 +178,33 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
app.authorizeBroadcastStream(meetingId, streamId, connId, scopeName);
boolean recordVideoStream = app.recordStream(meetingId, streamId);
VideoStreamListener listener = new VideoStreamListener(meetingId, streamId,
recordVideoStream, recordingDirectory, packetTimeout, scheduler, recordingService);
ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName());
stream.addStreamListener(listener);
VideoStream vstream = new VideoStream(stream, listener, cstream);
vstream.startRecording();
if (recordVideoStream) {
Map<String, Object> logData2 = new HashMap<String, Object>();
logData2.put("meetingId", meetingId);
logData2.put("connType", connType);
logData2.put("connId", connId);
logData.put("streamId", streamId);
logData.put("url", url);
logData.put("recorded", recordVideoStream);
logData2.put("context", scopeName);
logData2.put("event", "stream_broadcast_record_start");
logData2.put("description", "Stream broadcast record start.");
meetingManager.addStream(meetingId, vstream);
Gson gson2 = new Gson();
String logStr2 = gson2.toJson(logData2);
log.info(logStr2);
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("streamId", streamId);
logData.put("url", url);
logData.put("recorded", recordVideoStream);
VideoStreamListener listener = new VideoStreamListener(meetingId, streamId,
recordVideoStream, recordingDirectory, packetTimeout, scheduler, recordingService);
ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName());
stream.addStreamListener(listener);
VideoStream vstream = new VideoStream(stream, listener, cstream);
vstream.startRecording();
Gson gson = new Gson();
String logStr = gson.toJson(logData);
meetingManager.addStream(meetingId, vstream);
}
log.info("ScreenShare broadcast started: data={}", logStr);
} else {
log.error("Invalid streamid format [{}]", streamId);
conn.close();
@ -195,8 +219,11 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
public void streamBroadcastClose(IBroadcastStream stream) {
super.streamBroadcastClose(stream);
log.info("streamBroadcastStop " + stream.getPublishedName() + "]");
String streamId = stream.getPublishedName();
String connType = getConnectionType(Red5.getConnectionLocal().getType());
String connId = Red5.getConnectionLocal().getSessionId();
String scopeName = stream.getScope().getName();
String streamId = stream.getPublishedName();
Matcher matcher = STREAM_ID_PATTERN.matcher(stream.getPublishedName());
if (matcher.matches()) {
String meetingId = matcher.group(1).trim();
@ -206,15 +233,18 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
meetingManager.streamBroadcastClose(meetingId, streamId);
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("streamId", streamId);
logData.put("recorded", recordVideoStream);
Map<String, Object> logData2 = new HashMap<String, Object>();
logData2.put("meetingId", meetingId);
logData2.put("connType", connType);
logData2.put("connId", connId);
logData2.put("stream", stream.getPublishedName());
logData2.put("context", scopeName);
logData2.put("event", "stream_broadcast_close");
logData2.put("description", "Stream broadcast close.");
Gson gson = new Gson();
String logStr = gson.toJson(logData);
log.info("ScreenShare broadcast stopped: data={}", logStr);
Gson gson2 = new Gson();
String logStr2 = gson2.toJson(logData2);
log.info(logStr2);
} else {
log.error("Invalid streamid format [{}]", streamId);
}