Merge pull request #4622 from ritzalam/screenshare-chapter-breaks

Screenshare chapter breaks
This commit is contained in:
Richard Alam 2017-11-01 16:04:30 -04:00 committed by GitHub
commit 0e8ec5f3b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 469 additions and 73 deletions

View File

@ -414,7 +414,7 @@ class MeetingActor(
val elapsedInMs = now - lastRecBreakSentOn
val elapsedInMin = TimeUtil.millisToMinutes(elapsedInMs)
if (elapsedInMin > 1) {
if (elapsedInMin > recordingChapterBreakLenghtInMinutes) {
lastRecBreakSentOn = now
val event = MsgBuilder.buildRecordingChapterBreakSysMsg(props.meetingProp.intId, TimeUtil.timeNowInMs())
outGW.send(event)

View File

@ -0,0 +1,44 @@
package org.bigbluebutton.app.screenshare;
import java.util.HashMap;
import java.util.Map;
public class Meeting {
public final String id;
private Map<String, VideoStream> videoStreams = new HashMap<String, VideoStream>();
public Meeting(String id) {
this.id = id;
}
public synchronized void addStream(VideoStream stream) {
videoStreams.put(stream.getStreamId(), stream);
}
public synchronized void removeStream(String streamId) {
VideoStream vs = videoStreams.remove(streamId);
}
public synchronized void streamBroadcastClose(String streamId) {
VideoStream vs = videoStreams.remove(streamId);
if (vs != null) {
vs.streamBroadcastClose();
}
}
public synchronized boolean hasVideoStreams() {
return !videoStreams.isEmpty();
}
public synchronized void stopStartRecording(String streamId) {
VideoStream vs = videoStreams.get(streamId);
if (vs != null) vs.stopStartRecording();
}
public synchronized void stopStartAllRecordings() {
for (VideoStream vs : videoStreams.values()) {
stopStartRecording(vs.getStreamId());
}
}
}

View File

@ -0,0 +1,53 @@
package org.bigbluebutton.app.screenshare;
import java.util.HashMap;
import java.util.Map;
public class MeetingManager {
private Map<String, Meeting> meetings = new HashMap<String, Meeting>();
private void add(Meeting m) {
meetings.put(m.id, m);
}
private void remove(String id) {
Meeting m = meetings.remove(id);
}
public void addStream(String meetingId, VideoStream vs) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.addStream(vs);
} else {
Meeting nm = new Meeting(meetingId);
nm.addStream(vs);
add(nm);
}
}
public void removeStream(String meetingId, String streamId) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.removeStream(streamId);
}
}
public void streamBroadcastClose(String meetingId, String streamId) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.streamBroadcastClose(streamId);
if (!m.hasVideoStreams()) {
remove(m.id);
}
}
}
public synchronized void stopStartAllRecordings(String meetingId) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.stopStartAllRecordings();
}
}
}

View File

@ -0,0 +1,65 @@
package org.bigbluebutton.app.screenshare;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IConnection;
import org.red5.server.api.Red5;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.stream.ClientBroadcastStream;
import org.slf4j.Logger;
public class VideoStream {
private static Logger log = Red5LoggerFactory.getLogger(VideoStream.class, "screenshare");
private VideoStreamListener videoStreamListener;
private IScope scope;
private String streamId;
private IBroadcastStream stream;
private String recordingStreamName;
private ClientBroadcastStream cstream;
public VideoStream(IBroadcastStream stream, VideoStreamListener videoStreamListener, ClientBroadcastStream cstream) {
this.stream = stream;
this.videoStreamListener = videoStreamListener;
stream.addStreamListener(videoStreamListener);
this.cstream = cstream;
}
public String getStreamId() {
return streamId;
}
public synchronized void startRecording() {
long now = System.currentTimeMillis();
recordingStreamName = stream.getPublishedName() + "-" + now;
try {
log.info("Recording stream " + recordingStreamName);
videoStreamListener.setStreamId(recordingStreamName);
cstream.saveAs(recordingStreamName, false);
} catch (Exception e) {
log.error("ERROR while recording stream " + e.getMessage());
e.printStackTrace();
}
}
public synchronized void stopRecording() {
if (cstream.isRecording()) {
cstream.stopRecording();
videoStreamListener.stopRecording();
videoStreamListener.reset();
}
}
public synchronized void stopStartRecording() {
stopRecording();
videoStreamListener.reset();
startRecording();
}
public synchronized void streamBroadcastClose() {
stopRecording();
videoStreamListener.streamStopped();
stream.removeStreamListener(videoStreamListener);
}
}

View File

@ -0,0 +1,236 @@
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
* <p>
* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below).
* <p>
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
* <p>
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* <p>
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*/
package org.bigbluebutton.app.screenshare;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.server.api.scheduling.IScheduledJob;
import org.red5.server.api.scheduling.ISchedulingService;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.net.rtmp.event.VideoData;
import org.red5.server.scheduling.QuartzSchedulingService;
import org.slf4j.Logger;
import org.red5.logging.Red5LoggerFactory;
import com.google.gson.Gson;
/**
* Class to listen for the first video packet of the webcam.
* We need to listen for the first packet and send a startWebcamEvent.
* The reason is that when starting the webcam, sometimes Flash Player
* needs to prompt the user for permission to access the webcam. However,
* while waiting for the user to click OK to the prompt, Red5 has already
* called the startBroadcast method which we take as the start of the recording.
* When the user finally clicks OK, the packets then start to flow through.
* This introduces a delay of when we assume the start of the recording and
* the webcam actually publishes video packets. When we do the ingest and
* processing of the video and multiplex the audio, the video and audio will
* be un-synched by at least this amount of delay.
* @author Richard Alam
*
*/
public class VideoStreamListener implements IStreamListener {
private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video");
private EventRecordingService recordingService;
private volatile boolean firstPacketReceived = false;
// Maximum time between video packets
private int videoTimeout = 10000;
private long firstPacketTime = 0L;
private long packetCount = 0L;
// Last time video was received, not video timestamp
private long lastVideoTime;
private String recordingDir;
// Stream being observed
private String streamId;
// if this stream is recorded or not
private boolean record;
// Scheduler
private QuartzSchedulingService scheduler;
// Event queue worker job name
private String timeoutJobName;
private volatile boolean publishing = false;
private volatile boolean streamPaused = false;
private String meetingId;
private long recordingStartTime;
private String filename;
public VideoStreamListener(String meetingId, String streamId, Boolean record,
String recordingDir, int packetTimeout,
QuartzSchedulingService scheduler,
EventRecordingService recordingService) {
this.meetingId = meetingId;
this.streamId = streamId;
this.record = record;
this.videoTimeout = packetTimeout;
this.recordingDir = recordingDir;
this.scheduler = scheduler;
this.recordingService = recordingService;
}
private Long genTimestamp() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
public void reset() {
firstPacketReceived = false;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
@Override
public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
IoBuffer buf = packet.getData();
if (buf != null)
buf.rewind();
if (buf == null || buf.remaining() == 0) {
return;
}
if (packet instanceof VideoData) {
// keep track of last time video was received
lastVideoTime = System.currentTimeMillis();
packetCount++;
if (!firstPacketReceived) {
firstPacketReceived = true;
publishing = true;
firstPacketTime = lastVideoTime;
// start the worker to monitor if we are still receiving video packets
timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob());
if (record) {
recordingStartTime = System.currentTimeMillis();
filename = recordingDir;
if (!filename.endsWith("/")) {
filename.concat("/");
}
filename = filename.concat(meetingId).concat("/").concat(streamId).concat(".flv");
recordingStartTime = System.currentTimeMillis();
Map<String, String> event = new HashMap<String, String>();
event.put("module", "Deskshare");
event.put("timestamp", genTimestamp().toString());
event.put("meetingId", meetingId);
event.put("file", filename);
event.put("stream", streamId);
event.put("eventName", "DeskshareStartedEvent");
recordingService.record(meetingId, event);
}
}
if (streamPaused) {
streamPaused = false;
long now = System.currentTimeMillis();
long numSeconds = (now - lastVideoTime) / 1000;
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("stream", streamId);
logData.put("packetCount", packetCount);
logData.put("publishing", publishing);
logData.put("pausedFor (sec)", numSeconds);
Gson gson = new Gson();
String logStr = gson.toJson(logData);
log.warn("Screenshare stream restarted. data={}", logStr);
}
}
}
public void stopRecording() {
if (record) {
long publishDuration = (System.currentTimeMillis() - recordingStartTime) / 1000;
Map<String, String> event = new HashMap<String, String>();
event.put("module", "Deskshare");
event.put("timestamp", genTimestamp().toString());
event.put("meetingId", meetingId);
event.put("stream", streamId);
event.put("file", filename);
event.put("duration", new Long(publishDuration).toString());
event.put("eventName", "DeskshareStoppedEvent");
recordingService.record(meetingId, event);
}
}
public void streamStopped() {
this.publishing = false;
}
private class TimeoutJob implements IScheduledJob {
private boolean streamStopped = false;
public void execute(ISchedulingService service) {
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("stream", streamId);
logData.put("packetCount", packetCount);
logData.put("publishing", publishing);
Gson gson = new Gson();
long now = System.currentTimeMillis();
if ((now - lastVideoTime) > videoTimeout && !streamPaused) {
streamPaused = true;
long numSeconds = (now - lastVideoTime) / 1000;
logData.put("lastPacketTime (sec)", numSeconds);
String logStr = gson.toJson(logData);
log.warn("Screenshare packet timeout. data={}", logStr);
}
String logStr = gson.toJson(logData);
if (!publishing) {
log.warn("Removing scheduled job. data={}", logStr);
// remove the scheduled job
scheduler.removeScheduledJob(timeoutJobName);
}
}
}
}

View File

@ -0,0 +1,11 @@
package org.bigbluebutton.app.screenshare.events;
public class RecordChapterBreakMessage implements IEvent {
public final String meetingId;
public final Long timestamp;
public RecordChapterBreakMessage(String meetingId, Long timestamp) {
this.meetingId = meetingId;
this.timestamp = timestamp;
}
}

View File

@ -6,11 +6,13 @@ import org.bigbluebutton.app.screenshare.events.*;
import com.google.gson.Gson;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
import org.bigbluebutton.app.screenshare.MeetingManager;
public class EventListenerImp implements IEventListener {
private static Logger log = Red5LoggerFactory.getLogger(EventListenerImp.class, "screenshare");
private ConnectionInvokerService sender;
private MeetingManager meetingManager;
@Override
public void handleMessage(IEvent event) {
if (event instanceof ScreenShareStartedEvent) {
@ -27,6 +29,9 @@ public class EventListenerImp implements IEventListener {
sendIsScreenSharingResponse((IsScreenSharingResponse) event);
} else if (event instanceof ScreenShareClientPing) {
sendScreenShareClientPing((ScreenShareClientPing) event);
} else if (event instanceof RecordChapterBreakMessage) {
RecordChapterBreakMessage rcbm = (RecordChapterBreakMessage) event;
meetingManager.stopStartAllRecordings(rcbm.meetingId);
}
}
@ -202,7 +207,10 @@ public class EventListenerImp implements IEventListener {
}
public void setMeetingManager(MeetingManager meetingManager) {
this.meetingManager = meetingManager;
}
public void setMessageSender(ConnectionInvokerService sender) {
this.sender = sender;

View File

@ -34,9 +34,11 @@ import org.red5.server.api.stream.IServerStream;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.stream.ClientBroadcastStream;
import org.slf4j.Logger;
import org.red5.server.scheduling.QuartzSchedulingService;
import com.google.gson.Gson;
import org.bigbluebutton.app.screenshare.MeetingManager;
import org.bigbluebutton.app.screenshare.VideoStreamListener;
import org.bigbluebutton.app.screenshare.VideoStream;
import org.bigbluebutton.app.screenshare.EventRecordingService;
import org.bigbluebutton.app.screenshare.IScreenShareApplication;
import org.bigbluebutton.app.screenshare.ScreenshareStreamListener;
@ -44,21 +46,27 @@ import org.bigbluebutton.app.screenshare.ScreenshareStreamListener;
public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
private static Logger log = Red5LoggerFactory.getLogger(Red5AppAdapter.class, "screenshare");
private EventRecordingService recordingService;
private final Map<String, IStreamListener> streamListeners = new HashMap<String, IStreamListener>();
// Scheduler
private QuartzSchedulingService scheduler;
private EventRecordingService recordingService;
private IScreenShareApplication app;
private String streamBaseUrl;
private ConnectionInvokerService sender;
private String recordingDirectory;
private final Pattern STREAM_ID_PATTERN = Pattern.compile("(.*)-(.*)-(.*)$");
private MeetingManager meetingManager;
private int packetTimeout = 10000;
@Override
public boolean appStart(IScope app) {
super.appStart(app);
log.info("BBB Screenshare appStart");
sender.setAppScope(app);
// get the scheduler
scheduler = (QuartzSchedulingService) getContext().getBean(QuartzSchedulingService.BEAN_NAME);
return true;
}
@ -148,12 +156,14 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
app.streamStarted(meetingId, streamId, url);
boolean recordVideoStream = app.recordStream(meetingId, streamId);
if (recordVideoStream) {
recordStream(stream);
ScreenshareStreamListener listener = new ScreenshareStreamListener(recordingService, recordingDirectory);
stream.addStreamListener(listener);
streamListeners.put(conn.getScope().getName() + "-" + stream.getPublishedName(), listener);
}
VideoStreamListener listener = new VideoStreamListener(meetingId, streamId,
recordVideoStream, recordingDirectory, packetTimeout, scheduler, recordingService);
ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName());
stream.addStreamListener(listener);
VideoStream vstream = new VideoStream(stream, listener, cstream);
vstream.startRecording();
meetingManager.addStream(meetingId, vstream);
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
@ -182,43 +192,12 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
String streamId = stream.getPublishedName();
Matcher matcher = STREAM_ID_PATTERN.matcher(stream.getPublishedName());
if (matcher.matches()) {
String meetingId = matcher.group(1).trim();
app.streamStopped(meetingId, streamId);
String meetingId = matcher.group(1).trim();
app.streamStopped(meetingId, streamId);
boolean recordVideoStream = app.recordStream(meetingId, streamId);
if (recordVideoStream) {
IConnection conn = Red5.getConnectionLocal();
String scopeName;
if (conn != null) {
scopeName = conn.getScope().getName();
} else {
log.info("Connection local was null, using scope name from the stream: {}", stream);
scopeName = stream.getScope().getName();
}
IStreamListener listener = streamListeners.remove(scopeName + "-" + stream.getPublishedName());
if (listener != null) {
stream.removeStreamListener(listener);
}
boolean recordVideoStream = app.recordStream(meetingId, streamId);
meetingManager.streamBroadcastClose(meetingId, streamId);
String filename = recordingDirectory;
if (!filename.endsWith("/")) {
filename.concat("/");
}
filename = filename.concat(meetingId).concat("/").concat(stream.getPublishedName()).concat(".flv");
long publishDuration = (System.currentTimeMillis() - stream.getCreationTime()) / 1000;
Map<String, String> event = new HashMap<String, String>();
event.put("module", "Deskshare");
event.put("timestamp", genTimestamp().toString());
event.put("meetingId", scopeName);
event.put("stream", stream.getPublishedName());
event.put("file", filename);
event.put("duration", new Long(publishDuration).toString());
event.put("eventName", "DeskshareStoppedEvent");
recordingService.record(scopeName, event);
}
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
@ -234,27 +213,10 @@ public class Red5AppAdapter extends MultiThreadedApplicationAdapter {
}
}
/**
* A hook to record a stream. A file is written in webapps/video/streams/
* @param stream
*/
private void recordStream(IBroadcastStream stream) {
IConnection conn = Red5.getConnectionLocal();
long now = System.currentTimeMillis();
String recordingStreamName = stream.getPublishedName(); // + "-" + now; /** Comment out for now...forgot why I added this - ralam */
try {
log.info("Recording stream " + recordingStreamName );
ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName());
cstream.saveAs(recordingStreamName, false);
} catch(Exception e) {
log.error("ERROR while recording stream " + e.getMessage());
e.printStackTrace();
}
public void setMeetingManager(MeetingManager meetingManager) {
this.meetingManager = meetingManager;
}
public void setEventRecordingService(EventRecordingService s) {
recordingService = s;
}

View File

@ -1,8 +1,8 @@
package org.bigbluebutton.app.screenshare.redis
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.app.screenshare.server.sessions.messages.{MeetingCreated, MeetingEnded}
import org.bigbluebutton.app.screenshare.server.sessions.messages.{MeetingCreated, MeetingEnded, RecordingChapterBreak}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import scala.reflect.runtime.universe._
import org.bigbluebutton.common2.msgs._
@ -56,6 +56,12 @@ class ReceivedJsonMsgHandlerActor(screenshareManager: ActorRef)
} yield {
screenshareManager ! new MeetingEnded(m.body.meetingId)
}
case RecordingChapterBreakSysMsg.NAME =>
for {
m <- deserialize[RecordingChapterBreakSysMsg](jsonNode)
} yield {
screenshareManager ! new RecordingChapterBreak(m.body.meetingId, m.body.timestamp)
}
case _ =>
// log.error("Cannot route envelope name " + envelope.name)
// do nothing

View File

@ -18,11 +18,11 @@
*/
package org.bigbluebutton.app.screenshare.server.sessions
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import org.bigbluebutton.app.screenshare.StreamInfo
import org.bigbluebutton.app.screenshare.server.sessions.Session.StopSession
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import scala.collection.mutable.HashMap
import org.bigbluebutton.app.screenshare.events.{IEventsMessageBus, IsScreenSharingResponse, ScreenShareRequestTokenFailedResponse}
import org.bigbluebutton.app.screenshare.events.{IEventsMessageBus, IsScreenSharingResponse, RecordChapterBreakMessage, ScreenShareRequestTokenFailedResponse}
import org.bigbluebutton.app.screenshare.server.sessions.messages._
object ScreenshareManager {
@ -57,10 +57,15 @@ class ScreenshareManager(val aSystem: ActorSystem, val bus: IEventsMessageBus)
case msg: MeetingEnded => handleMeetingHasEnded(msg)
case msg: MeetingCreated => handleMeetingCreated(msg)
case msg: ClientPongMessage => handleClientPongMessage(msg)
case msg: RecordingChapterBreak => handleRecordingChapterBreak(msg)
case msg: Any => log.warning("Unknown message " + msg)
}
private def handleRecordingChapterBreak(msg: RecordingChapterBreak): Unit = {
bus.send(new RecordChapterBreakMessage(msg.meetingId, msg.timestamp))
}
private def handleClientPongMessage(msg: ClientPongMessage) {
if (log.isDebugEnabled) {
log.debug("Received ClientPongMessage message for meeting=[" + msg.meetingId + "]")

View File

@ -47,4 +47,6 @@ case class MeetingEnded(meetingId: String)
case class MeetingCreated(meetingId: String, record: Boolean)
case class ClientPongMessage(meetingId: String, userId: String, streamId: String, timestamp: Long)
case class ClientPongMessage(meetingId: String, userId: String, streamId: String, timestamp: Long)
case class RecordingChapterBreak(meetingId: String, timestamp: Long)

View File

@ -52,7 +52,10 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
<property name="recordingDirectory" value="${recordingDirectory}"/>
<property name="application" ref="screenShareApplication"/>
<property name="messageSender" ref="connectionInvokerService"/>
<property name="meetingManager" ref="meetingManager"/>
</bean>
<bean id="meetingManager" class="org.bigbluebutton.app.screenshare.MeetingManager"/>
<bean id="screenshare.service" class="org.bigbluebutton.app.screenshare.red5.Red5AppService">
<property name="appHandler" ref="red5AppHandler"/>
@ -77,6 +80,7 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
<bean id="eventListenerImp" class="org.bigbluebutton.app.screenshare.red5.EventListenerImp">
<property name="messageSender" ref="connectionInvokerService"/>
<property name="meetingManager" ref="meetingManager"/>
</bean>
<bean id="jnlpConfigurator" class="org.bigbluebutton.app.screenshare.server.servlet.JnlpConfigurator">