send rtmp broadcast notification to viewers(flash has to handle)

This commit is contained in:
Anton Georgiev 2015-07-02 16:24:53 +00:00
parent 7192079977
commit 766fea281d
9 changed files with 178 additions and 5 deletions

View File

@ -297,10 +297,11 @@ class BigBlueButtonActor(val system: ActorSystem, outGW: MessageOutGateway, voic
private def handleDeskShareStartedRequest(msg: DeskShareStartedRequest) {
val DESKSHARE_CONFERENCE_NAME_SUFFIX = "-DESKSHARE"
println("\nBBBActor: handleDeskShareStartedRequest" + msg.conferenceName + msg.callerId + msg.callerIdName + "\n")
if (msg.conferenceName.endsWith(DESKSHARE_CONFERENCE_NAME_SUFFIX)) {
var originalConfId = msg.conferenceName.replace(DESKSHARE_CONFERENCE_NAME_SUFFIX, "")
println("\nBBBActor: handleDeskShareStartedRequest" + msg.conferenceName + msg.callerId
+ originalConfId + msg.callerIdName + "\n")
findMeetingWithVoiceConfId(originalConfId) foreach { m =>
{
m.actorRef ! msg

View File

@ -379,6 +379,11 @@ class MeetingActor(val mProps: MeetingProperties, val outGW: MessageOutGateway)
meetingModel.setRTMPBroadcastingUrl(msg.streamname)
meetingModel.broadcastingRTMPStarted()
// Notify viewers in the meeting that there's an rtmp stream to view
outGW.send(new DeskShareNotifyViewersRTMP(mProps.meetingID, meetingModel.getRTMPBroadcastingUrl(),
true, System.currentTimeMillis().toString()))
println("DESKSHARE_RTMP_BROADCAST_STARTED_MESSAGE1 " + meetingModel.getRTMPBroadcastingUrl())
}
private def handleDeskShareRTMPBroadcastStoppedRequest(msg: DeskShareRTMPBroadcastStoppedRequest) {
@ -388,6 +393,11 @@ class MeetingActor(val mProps: MeetingProperties, val outGW: MessageOutGateway)
meetingModel.broadcastingRTMPStoppped()
// TODO should I do: meetingModel.setRTMPBroadcastingUrl("") ?
// TODO notify viewers that RTMP broadcast stopped
outGW.send(new DeskShareNotifyViewersRTMP(mProps.meetingID, meetingModel.getRTMPBroadcastingUrl(),
false, System.currentTimeMillis().toString()))
println("DESKSHARE_RTMP_BROADCAST_STOPPED_MESSAGE")
}
}

View File

@ -117,6 +117,7 @@ case class DeskShareStartRecording(conferenceName: String, filename: String, tim
case class DeskShareStopRecording(conferenceName: String, filename: String, timestamp: String) extends IOutMessage
case class DeskShareStartRTMPBroadcast(conferenceName: String, streamPath: String, timestamp: String) extends IOutMessage
case class DeskShareStopRTMPBroadcast(conferenceName: String, streamPath: String, timestamp: String) extends IOutMessage
case class DeskShareNotifyViewersRTMP(meetingID: String, streamPath: String, broadcasting: Boolean, timestamp: String) extends IOutMessage
// Value Objects
case class MeetingVO(id: String, recorded: Boolean)

View File

@ -18,31 +18,38 @@ class DeskShareEventRedisPublisher(service: MessageSender) extends OutMessageLis
case msg: DeskShareStopRecording => handleDeskShareStopRecording(msg)
case msg: DeskShareStartRTMPBroadcast => handleDeskShareStartRTMPBroadcast(msg)
case msg: DeskShareStopRTMPBroadcast => handleDeskShareStopRTMPBroadcast(msg)
case msg: DeskShareNotifyViewersRTMP => handleDeskShareNotifyViewersRTMP(msg)
case _ => // do nothing
}
}
private def handleDeskShareStartRecording(msg: DeskShareStartRecording) {
println("_______handleDeskShareStartRecording____________")
println("_____publish to FS__handleDeskShareStartRecording____________")
val json = DeskShareMessageToJsonConverter.getDeskShareStartRecordingToJson(msg)
service.send(MessagingConstants.TO_VOICE_CONF_SYSTEM_CHAN, json)
}
private def handleDeskShareStopRecording(msg: DeskShareStopRecording) {
println("_______handleDeskShareStopRecording____________")
println("_____publish to FS__handleDeskShareStopRecording____________")
val json = DeskShareMessageToJsonConverter.getDeskShareStopRecordingToJson(msg)
service.send(MessagingConstants.TO_VOICE_CONF_SYSTEM_CHAN, json)
}
private def handleDeskShareStartRTMPBroadcast(msg: DeskShareStartRTMPBroadcast) {
println("_______handleDeskShareStartRTMPBroadcast____________")
println("_____publish to FS__handleDeskShareStartRTMPBroadcast____________")
val json = DeskShareMessageToJsonConverter.getDeskShareStartRTMPBroadcastToJson(msg)
service.send(MessagingConstants.TO_VOICE_CONF_SYSTEM_CHAN, json)
}
private def handleDeskShareStopRTMPBroadcast(msg: DeskShareStopRTMPBroadcast) {
println("_______handleDeskShareStopRTMPBroadcast____________")
println("_____publish to FS__handleDeskShareStopRTMPBroadcast____________")
val json = DeskShareMessageToJsonConverter.getDeskShareStopRTMPBroadcastToJson(msg)
service.send(MessagingConstants.TO_VOICE_CONF_SYSTEM_CHAN, json)
}
private def handleDeskShareNotifyViewersRTMP(msg: DeskShareNotifyViewersRTMP) {
println("_____publish to bigbluebutton-apps(red5) __handleDeskShareNotifyViewersRTMP____________")
val json = DeskShareMessageToJsonConverter.getDeskShareNotifyViewersRTMPToJson(msg)
service.send(MessagingConstants.FROM_DESK_SHARE_CHANNEL, json)
}
}

View File

@ -5,6 +5,7 @@ import org.bigbluebutton.common.messages.DeskShareStartRecordingEventMessage
import org.bigbluebutton.common.messages.DeskShareStopRecordingEventMessage
import org.bigbluebutton.common.messages.DeskShareStartRTMPBroadcastEventMessage
import org.bigbluebutton.common.messages.DeskShareStopRTMPBroadcastEventMessage
import org.bigbluebutton.common.messages.DeskShareNotifyViewersRTMPEventMessage
object DeskShareMessageToJsonConverter {
@ -33,4 +34,10 @@ object DeskShareMessageToJsonConverter {
val newMsg = new DeskShareStopRTMPBroadcastEventMessage(msg.conferenceName, msg.streamPath, msg.timestamp)
newMsg.toJson()
}
def getDeskShareNotifyViewersRTMPToJson(msg: DeskShareNotifyViewersRTMP): String = {
println("^^^^^getDeskShareNotifyViewersRTMPToJson in DeskShareMessageToJsonConverter")
val newMsg = new DeskShareNotifyViewersRTMPEventMessage(msg.meetingID, msg.streamPath, msg.broadcasting, msg.timestamp)
newMsg.toJson()
}
}

View File

@ -0,0 +1,70 @@
package org.bigbluebutton.common.messages;
import java.util.HashMap;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
//Message from bbb-akka-apps to bbb-apps-red5
public class DeskShareNotifyViewersRTMPEventMessage {
public static final String DESK_SHARE_NOTIFY_VIEWERS_RTMP = "desk_share_notify_viewers_rtmp";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meetingid";
public static final String STREAM_PATH = "stream_path";
public static final String BROADCASTING = "broadcasting";
public static final String TIMESTAMP = "timestamp";
public final String meetingId;
public final String streamPath;
public final boolean broadcasting;
public final String timestamp;
public DeskShareNotifyViewersRTMPEventMessage(String meetingId, String streamPath,
boolean broadcasting, String timestamp) {
this.meetingId = meetingId;
this.streamPath = streamPath;
this.broadcasting = broadcasting;
this.timestamp = timestamp;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(STREAM_PATH, streamPath);
payload.put(BROADCASTING, broadcasting);
payload.put(TIMESTAMP, timestamp);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(DESK_SHARE_NOTIFY_VIEWERS_RTMP, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static DeskShareNotifyViewersRTMPEventMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (DESK_SHARE_NOTIFY_VIEWERS_RTMP.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(BROADCASTING)
&& payload.has(TIMESTAMP)
&& payload.has(STREAM_PATH)) {
String meetingId = payload.get(MEETING_ID).getAsString();
String streamPath = payload.get(STREAM_PATH).getAsString();
boolean broadcasting = payload.get(BROADCASTING).getAsBoolean();
String timestamp = payload.get(TIMESTAMP).getAsString();
return new DeskShareNotifyViewersRTMPEventMessage(meetingId, streamPath, broadcasting, timestamp);
}
}
}
}
return null;
}
}

View File

@ -29,6 +29,7 @@ public class MessagingConstants {
public static final String FROM_USERS_CHANNEL = FROM_BBB_APPS_CHANNEL + ":users";
public static final String FROM_CHAT_CHANNEL = FROM_BBB_APPS_CHANNEL + ":chat";
public static final String FROM_WHITEBOARD_CHANNEL = FROM_BBB_APPS_CHANNEL + ":whiteboard";
public static final String FROM_DESK_SHARE_CHANNEL = FROM_BBB_APPS_CHANNEL + ":deskshare";
public static final String TO_BBB_APPS_CHANNEL = "bigbluebutton:to-bbb-apps";
public static final String TO_BBB_APPS_PATTERN = TO_BBB_APPS_CHANNEL + ":*";

View File

@ -0,0 +1,70 @@
package org.bigbluebutton.red5.client;
import java.util.HashMap;
import java.util.Map;
import org.bigbluebutton.common.messages.ChatKeyUtil;
import org.bigbluebutton.common.messages.DeskShareNotifyViewersRTMPEventMessage;
import org.bigbluebutton.red5.client.messaging.BroadcastClientMessage;
import org.bigbluebutton.red5.client.messaging.ConnectionInvokerService;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class DeskShareMessageSender {
private ConnectionInvokerService service;
public DeskShareMessageSender(ConnectionInvokerService service) {
this.service = service;
}
public void handleDeskShareMessage(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
switch (messageName) {
case DeskShareNotifyViewersRTMPEventMessage.DESK_SHARE_NOTIFY_VIEWERS_RTMP:
DeskShareNotifyViewersRTMPEventMessage rtmp = DeskShareNotifyViewersRTMPEventMessage.fromJson(message);
System.out.println("DESKSHARE_RTMP_BROADCAST_STARTED_MESSAGE:" + rtmp.toJson());
if (rtmp != null) {
processDeskShareNotifyViewersRTMPEventMessage(rtmp);
}
break;
}
}
}
}
private void processDeskShareNotifyViewersRTMPEventMessage(DeskShareNotifyViewersRTMPEventMessage msg) {
Map<String, Object> messageInfo = new HashMap<String, Object>();
// messageInfo.put("rtmpUrl", msg.streamPath);
// messageInfo.put("broadcasting", msg.broadcasting);
System.out.println("RedisPubSubMessageHandler - " + msg.toJson());
System.out.println("RedisPubSubMessageHandler - processDeskShareNotifyViewersRTMPEventMessage \n" +msg.streamPath+ "\n");
messageInfo.put(ChatKeyUtil.CHAT_TYPE, "PUBLIC_CHAT");
messageInfo.put(ChatKeyUtil.FROM_USERID, "enm6bgnif0d4_2");
messageInfo.put(ChatKeyUtil.FROM_USERNAME, "User 4526544");
messageInfo.put(ChatKeyUtil.TO_USERID, "public_chat_userid");
messageInfo.put(ChatKeyUtil.TO_USERNAME, "public_chat_username");
messageInfo.put(ChatKeyUtil.FROM_TIME, "1.435851039645E12");
messageInfo.put(ChatKeyUtil.FROM_TZ_OFFSET, "240");
messageInfo.put(ChatKeyUtil.FROM_COLOR, "0");
messageInfo.put(ChatKeyUtil.MESSAGE, "BROADCASTING_RTMP:"+msg.broadcasting + " " + msg.streamPath);
// BroadcastClientMessage m = new BroadcastClientMessage(msg.meetingId, "DeskShareRTMPBroadcastNotification", messageInfo);
BroadcastClientMessage m = new BroadcastClientMessage(msg.meetingId, "ChatReceivePublicMessageCommand", messageInfo);
service.sendMessage(m);
}
}

View File

@ -6,6 +6,7 @@ import org.bigbluebutton.red5.client.PresentationClientMessageSender;
import org.bigbluebutton.red5.client.UserClientMessageSender;
import org.bigbluebutton.red5.client.ChatClientMessageSender;
import org.bigbluebutton.red5.client.WhiteboardClientMessageSender;
import org.bigbluebutton.red5.client.DeskShareMessageSender;
import org.bigbluebutton.red5.client.messaging.ConnectionInvokerService;
import org.bigbluebutton.red5.monitoring.BbbAppsIsKeepAliveHandler;
@ -17,6 +18,7 @@ public class RedisPubSubMessageHandler implements MessageHandler {
private ChatClientMessageSender chatMessageSender;
private PresentationClientMessageSender presentationMessageSender;
private WhiteboardClientMessageSender whiteboardMessageSender;
private DeskShareMessageSender deskShareMessageSender;
private BbbAppsIsKeepAliveHandler bbbAppsIsKeepAliveHandler;
@ -27,6 +29,7 @@ public class RedisPubSubMessageHandler implements MessageHandler {
chatMessageSender = new ChatClientMessageSender(service);
presentationMessageSender = new PresentationClientMessageSender(service);
whiteboardMessageSender = new WhiteboardClientMessageSender(service);
deskShareMessageSender = new DeskShareMessageSender(service);
}
public void setBbbAppsIsKeepAliveHandler(BbbAppsIsKeepAliveHandler handler) {
@ -35,6 +38,7 @@ public class RedisPubSubMessageHandler implements MessageHandler {
@Override
public void handleMessage(String pattern, String channel, String message) {
// System.out.println("in red5 getting message: " + channel + " " + message);
if (channel.equalsIgnoreCase(MessagingConstants.FROM_CHAT_CHANNEL)) {
chatMessageSender.handleChatMessage(message);
} else if (channel.equalsIgnoreCase(MessagingConstants.FROM_PRESENTATION_CHANNEL)) {
@ -47,6 +51,8 @@ public class RedisPubSubMessageHandler implements MessageHandler {
whiteboardMessageSender.handleWhiteboardMessage(message);
} else if (channel.equalsIgnoreCase(MessagingConstants.BBB_APPS_KEEP_ALIVE_CHANNEL)) {
bbbAppsIsKeepAliveHandler.handleKeepAliveMessage(message);
} else if (channel.equalsIgnoreCase(MessagingConstants.FROM_DESK_SHARE_CHANNEL)) {
deskShareMessageSender.handleDeskShareMessage(message);
}
}