Merge pull request #4051 from ritzalam/implement-bbb-web-msgs

handle messages from akka-apps
This commit is contained in:
Anton Georgiev 2017-07-04 17:06:42 -04:00 committed by GitHub
commit 80b18c212f
7 changed files with 154 additions and 125 deletions

View File

@ -142,18 +142,6 @@ class MeetingActor(val props: DefaultProps,
case msg: UserConnectedToGlobalAudio => handleUserConnectedToGlobalAudio(msg)
case msg: UserDisconnectedFromGlobalAudio => handleUserDisconnectedFromGlobalAudio(msg)
case msg: InitializeMeeting => handleInitializeMeeting(msg)
case msg: ClearPresentation => handleClearPresentation(msg)
case msg: PresentationConversionUpdate => handlePresentationConversionUpdate(msg)
case msg: PresentationPageCountError => handlePresentationPageCountError(msg)
case msg: PresentationSlideGenerated => handlePresentationSlideGenerated(msg)
case msg: PresentationConversionCompleted => handlePresentationConversionCompleted(msg)
case msg: RemovePresentation => handleRemovePresentation(msg)
case msg: GetPresentationInfo => handleGetPresentationInfo(msg)
case msg: ResizeAndMoveSlide => handleResizeAndMoveSlide(msg)
case msg: GotoSlide => handleGotoSlide(msg)
case msg: SharePresentation => handleSharePresentation(msg)
case msg: GetSlideInfo => handleGetSlideInfo(msg)
case msg: PreuploadedPresentations => handlePreuploadedPresentations(msg)
case msg: SetRecordingStatus => handleSetRecordingStatus(msg)
case msg: GetRecordingStatus => handleGetRecordingStatus(msg)
case msg: GetPollRequest => handleGetPollRequest(msg)

View File

@ -36,9 +36,12 @@ class BbbWebApiGWApp(val oldMessageReceivedGW: OldMessageReceivedGW) extends IBb
private val msgFromAkkaAppsEventBus = new MsgFromAkkaAppsEventBus
private val msgToAkkaAppsEventBus = new MsgToAkkaAppsEventBus
private val meetingManagerActorRef = system.actorOf(
MeetingsManagerActor.props(msgToAkkaAppsEventBus), "meetingManagerActor")
msgFromAkkaAppsEventBus.subscribe(meetingManagerActorRef, fromAkkaAppsChannel)
/**
* Not used for now as we will still user MeetingService for 2.0 (ralam july 4, 2017)
*/
//private val meetingManagerActorRef = system.actorOf(
// MeetingsManagerActor.props(msgToAkkaAppsEventBus), "meetingManagerActor")
//msgFromAkkaAppsEventBus.subscribe(meetingManagerActorRef, fromAkkaAppsChannel)
private val oldMeetingMsgHdlrActor = system.actorOf(
OldMeetingMsgHdlrActor.props(oldMessageReceivedGW), "oldMeetingMsgHdlrActor"
@ -125,11 +128,15 @@ class BbbWebApiGWApp(val oldMessageReceivedGW: OldMessageReceivedGW) extends IBb
}
def destroyMeeting (msg: DestroyMeetingMessage): Unit = {
val event = MsgBuilder.buildDestroyMeetingSysCmdMsg(msg)
println(event)
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
}
def endMeeting(msg: EndMeetingMessage): Unit = {
val event = MsgBuilder.buildEndMeetingSysCmdMsg(msg)
println(event)
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
}
def sendKeepAlive(system: String, timestamp: java.lang.Long): Unit = {

View File

@ -1,5 +1,6 @@
package org.bigbluebutton.api2
import org.bigbluebutton.api.messaging.converters.messages.{DestroyMeetingMessage, EndMeetingMessage}
import org.bigbluebutton.api2.meeting.RegisterUser
import org.bigbluebutton.common2.domain.{DefaultProps, PageVO, PresentationVO}
import org.bigbluebutton.common2.msgs._
@ -7,6 +8,24 @@ import org.bigbluebutton.presentation.messages._
object MsgBuilder {
def buildDestroyMeetingSysCmdMsg(msg: DestroyMeetingMessage): BbbCommonEnvCoreMsg = {
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
val envelope = BbbCoreEnvelope(DestroyMeetingSysCmdMsg.NAME, routing)
val header = BbbCoreBaseHeader(DestroyMeetingSysCmdMsg.NAME)
val body = DestroyMeetingSysCmdMsgBody(msg.meetingId)
val req = DestroyMeetingSysCmdMsg(header, body)
BbbCommonEnvCoreMsg(envelope, req)
}
def buildEndMeetingSysCmdMsg(msg: EndMeetingMessage): BbbCommonEnvCoreMsg = {
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
val envelope = BbbCoreEnvelope(EndMeetingSysCmdMsg.NAME, routing)
val header = BbbCoreBaseHeader(EndMeetingSysCmdMsg.NAME)
val body = EndMeetingSysCmdMsgBody(msg.meetingId)
val req = EndMeetingSysCmdMsg(header, body)
BbbCommonEnvCoreMsg(envelope, req)
}
def buildCreateMeetingRequestToAkkaApps(props: DefaultProps): BbbCommonEnvCoreMsg = {
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
val envelope = BbbCoreEnvelope(CreateMeetingReqMsg.NAME, routing)

View File

@ -1,16 +1,14 @@
package org.bigbluebutton.api2.bus
import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.common2.msgs.BbbCoreEnvelope
import org.bigbluebutton.common2.msgs.Deserializer
import org.bigbluebutton.common2.msgs.MeetingCreatedEvtMsg
import org.bigbluebutton.common2.msgs._
import com.fasterxml.jackson.databind.JsonNode
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import scala.reflect.runtime.universe._
object ReceivedJsonMsgHdlrActor {
def props(msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus): Props =
Props(classOf[ReceivedJsonMsgHdlrActor], msgFromAkkaAppsEventBus)
@ -24,6 +22,26 @@ class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEvent
object JsonDeserializer extends Deserializer
def deserialize[B <: BbbCoreMsg](jsonNode: JsonNode)(implicit tag: TypeTag[B]): Option[B] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[B](jsonNode)
result match {
case Some(msg) =>
Some(msg.asInstanceOf[B])
case None =>
log.error("Failed to deserialize message " + error)
None
}
}
def route[T <: BbbCoreMsg](envelope: BbbCoreEnvelope, jsonNode: JsonNode)(implicit tag: TypeTag[T]): Unit = {
for {
m <- deserialize[T](jsonNode)
} yield {
send(envelope, m)
}
}
def receive = {
case msg: JsonMsgFromAkkaApps => handleReceivedJsonMessage(msg)
@ -34,38 +52,44 @@ class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEvent
def handleReceivedJsonMessage(msg: JsonMsgFromAkkaApps): Unit = {
for {
envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data)
} yield route(envJsonNode.envelope, envJsonNode.core)
} yield handle(envJsonNode.envelope, envJsonNode.core)
}
def route(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
log.debug("*************** Route envelope name " + envelope.name)
envelope.name match {
case MeetingCreatedEvtMsg.NAME =>
log.debug("**************** Route MeetingCreatedEvtMsg")
for {
m <- routeMeetingCreatedEvtMsg(jsonNode)
} yield {
log.debug("************ Sending MeetingCreatedEvtMsg")
send(envelope, m)
}
route[MeetingCreatedEvtMsg](envelope, jsonNode)
case MeetingEndedEvtMsg.NAME =>
route[MeetingEndedEvtMsg](envelope, jsonNode)
case MeetingDestroyedEvtMsg.NAME =>
route[MeetingDestroyedEvtMsg](envelope, jsonNode)
case PubSubPongSysRespMsg.NAME =>
route[PubSubPongSysRespMsg](envelope, jsonNode)
case UserEmojiChangedEvtMsg.NAME =>
route[UserEmojiChangedEvtMsg](envelope, jsonNode)
case UserJoinedMeetingEvtMsg.NAME =>
route[UserJoinedMeetingEvtMsg](envelope, jsonNode)
case UserLeftMeetingEvtMsg.NAME =>
route[UserLeftMeetingEvtMsg](envelope, jsonNode)
case UserJoinedVoiceConfToClientEvtMsg.NAME =>
route[UserJoinedVoiceConfToClientEvtMsg](envelope, jsonNode)
case UserLeftVoiceConfToClientEvtMsg.NAME =>
route[UserLeftVoiceConfToClientEvtMsg](envelope, jsonNode)
case UserBroadcastCamStartedEvtMsg.NAME =>
route[UserBroadcastCamStartedEvtMsg](envelope, jsonNode)
case UserBroadcastCamStoppedEvtMsg.NAME =>
route[UserBroadcastCamStoppedEvtMsg](envelope, jsonNode)
case CreateBreakoutRoomEvtMsg.NAME =>
route[CreateBreakoutRoomEvtMsg](envelope, jsonNode)
case EndBreakoutRoomEvtMsg.NAME =>
route[EndBreakoutRoomEvtMsg](envelope, jsonNode)
case _ =>
log.debug("************ Cannot route envelope name " + envelope.name)
// do nothing
}
}
def send(msg: MsgFromAkkaApps): Unit = {
log.debug("******************** Routing " + msg.payload.envelope.name)
msgFromAkkaAppsEventBus.publish(msg)
}
def routeMeetingCreatedEvtMsg(jsonNode: JsonNode): Option[MeetingCreatedEvtMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[MeetingCreatedEvtMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[MeetingCreatedEvtMsg])
case None =>
log.error("Failed to ValidateAuthTokenReqMsg message with error: " + error)
None
}
}
}

View File

@ -1,8 +1,7 @@
package org.bigbluebutton.api2.bus
import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.common2.msgs.{BbbCommonEnvCoreMsg, BbbCoreEnvelope, MeetingCreatedEvtMsg}
import org.bigbluebutton.common2.msgs.{BbbCommonEnvCoreMsg, BbbCoreEnvelope, BbbCoreMsg}
trait ReceivedMessageRouter extends SystemConfiguration {
@ -12,7 +11,7 @@ trait ReceivedMessageRouter extends SystemConfiguration {
msgFromAkkaAppsEventBus.publish(msg)
}
def send(envelope: BbbCoreEnvelope, msg: MeetingCreatedEvtMsg): Unit = {
def send(envelope: BbbCoreEnvelope, msg: BbbCoreMsg): Unit = {
val event = MsgFromAkkaApps(fromAkkaAppsChannel, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}

View File

@ -14,34 +14,16 @@ case class CreateBreakoutRoomMsg(meetingId: String, parentMeetingId: String,
viewerPassword: String, moderatorPassword: String, duration: Int,
sourcePresentationId: String, sourcePresentationSlide: Int,
record: Boolean) extends ApiMsg
case class EndBreakoutRoomMsg() extends ApiMsg
case class KeepAliveReply() extends ApiMsg
case class MeetingDestoyedMsg() extends ApiMsg
case class MeetingStartedMsg() extends ApiMsg
case class AddUserSession(token: String, session: UserSession)
case class RegisterUser(meetingId: String, intUserId: String, name: String, role: String,
extUserId: String, authToken: String, avatarURL: String,
guest: Boolean, authed: Boolean)
case class GetUserSession(token: String)
case class RemoveUserSession(token: String)
case object PurgeRegisteredUsers
case class GetMeetings()
case class GetSessions()
case class CreateMeetingMsg(defaultProps: DefaultProps)
case class GetMeeting(meetingId: String)
case class GetMeetingsWithId(meetingId: String)
case class GetNotEndedMeetingWithId(meetingId: String)
case class IsMeetingWithVoiceBridgeExist(voiceBridge: String)
case class EndMeeting(meetingId: String)
case class AddUserCustomData(meetingId: String, userId: String, userCustomData: collection.immutable.Map[String, String])
case class UserJoinedVoice(msg: UserJoinedVoiceMessage)
case class UserLeftVoice(msg: UserLeftVoice)
case class UserListeningOnly(msg: UserListeningOnly)
case class UserSharedWebcam(msg: UserSharedWebcam)
case class UserUnsharedWebcam(msg: UserUnsharedWebcam)
object MeetingsManagerActor {
def props(msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus): Props =

View File

@ -1,6 +1,7 @@
package org.bigbluebutton.api2.meeting
import akka.actor.{Actor, ActorLogging, Props}
import org.bigbluebutton.api.messaging.messages._
import org.bigbluebutton.api2.bus.OldMessageReceivedGW
import org.bigbluebutton.common2.msgs._
@ -21,83 +22,92 @@ class OldMeetingMsgHdlrActor(val olgMsgGW: OldMessageReceivedGW)
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
msg.core match {
case m: MeetingCreatedEvtMsg => handleMeetingCreatedEvtMsg(m)
case m: MeetingEndedEvtMsg => handleMeetingEndedEvtMsg(m)
case m: MeetingDestroyedEvtMsg => handleMeetingDestroyedEvtMsg(m)
case m: PubSubPongSysRespMsg => handlePubSubPongSysRespMsg(m)
case m: UserEmojiChangedEvtMsg => handleUserEmojiChangedEvtMsg(m)
case m: UserJoinedMeetingEvtMsg => handleUserJoinedMeetingEvtMsg(m)
case m: UserLeftMeetingEvtMsg => handleUserLeftMeetingEvtMsg(m)
case m: UserJoinedVoiceConfToClientEvtMsg => handleUserJoinedVoiceConfToClientEvtMsg(m)
case m: UserLeftVoiceConfToClientEvtMsg => handleUserLeftVoiceConfToClientEvtMsg(m)
case m: UserBroadcastCamStartedEvtMsg => handleUserBroadcastCamStartedEvtMsg(m)
case m: UserBroadcastCamStoppedEvtMsg => handleUserBroadcastCamStoppedEvtMsg(m)
case m: CreateBreakoutRoomEvtMsg => handleCreateBreakoutRoomSysCmdMsg(m)
case m: EndBreakoutRoomEvtMsg => handleEndBreakoutRoomEvtMsg(m)
case _ => log.error("***** Cannot handle " + msg.envelope.name)
}
}
def handleMeetingCreatedEvtMsg(msg: MeetingCreatedEvtMsg): Unit = {
// listener.handle(new MeetingStarted(meetingId))
olgMsgGW.handle(new MeetingStarted(msg.body.props.meetingProp.intId))
}
def handleMeetingEndedEvtMsg(msg: MeetingEndedEvtMsg): Unit = {
// listener.handle(new MeetingEnded(meetingId))
olgMsgGW.handle(new MeetingEnded(msg.body.meetingId))
}
def handleMeetingDestroyedEvtMsg(msg: MeetingDestroyedEvtMsg): Unit = {
// listener.handle(new MeetingDestroyed(meetingId))
olgMsgGW.handle(new MeetingDestroyed(msg.body.meetingId))
}
// def handleCreateBreakoutRoomSysCmdMsg(msg: CreateBreakoutRoomSysCmdMsg): Unit = {
/* listener.handle(new CreateBreakoutRoom(
msg.payload.breakoutMeetingId,
msg.payload.parentMeetingId,
msg.payload.name,
msg.payload.sequence,
msg.payload.voiceConfId,
msg.payload.viewerPassword,
msg.payload.moderatorPassword,
msg.payload.durationInMinutes,
msg.payload.sourcePresentationId,
msg.payload.sourcePresentationSlide,
msg.payload.record
)
);
*/
// }
def handleCreateBreakoutRoomSysCmdMsg(msg: CreateBreakoutRoomEvtMsg): Unit = {
olgMsgGW.handle(new CreateBreakoutRoom(
msg.body.room.breakoutMeetingId,
msg.body.room.parentId,
msg.body.room.name,
msg.body.room.sequence,
msg.body.room.voiceConfId,
msg.body.room.viewerPassword,
msg.body.room.moderatorPassword,
msg.body.room.durationInMinutes,
msg.body.room.sourcePresentationId,
msg.body.room.sourcePresentationSlide,
msg.body.room.record
))
// def handleEndBreakoutRoomSysCmdMsg(msg: EndBreakoutRoomSysCmdMsg): Unit = {
//listener.handle(new EndBreakoutRoom(msg.payload.meetingId))
// }
}
// def handlePubSubPongSysRespMsg(msg: PubSubPongSysRespMsg): Unit = {
// new KeepAliveReply(m.payload.system, m.payload.timestamp)
// }
def handleEndBreakoutRoomEvtMsg(msg: EndBreakoutRoomEvtMsg): Unit = {
olgMsgGW.handle(new EndBreakoutRoom(msg.body.breakoutMeetingId))
}
// def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = {
// listener.handle(new UserJoined(meetingId, userid, externuserid, username, role, avatarURL, guest, waitingForAcceptance));
def handlePubSubPongSysRespMsg(msg: PubSubPongSysRespMsg): Unit = {
olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.timestamp))
}
// }
def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = {
olgMsgGW.handle(new UserJoined(msg.header.meetingId, msg.body.intId,
msg.body.extId, msg.body.name, msg.body.role, msg.body.avatar, msg.body.guest, msg.body.waitingForAcceptance))
// def handleUserStausChangedEvtMsg(msg: UserStatusChangedEvtMsg): Unit = {
//listener.handle(new UserStatusChanged(meetingId, userid, status, value))
// }
}
// def handleUserLeftMeetingEvtMsg(msg: UserLeftMeetingEvtMsg): Unit = {
// listener.handle(new UserLeft(meetingId, userid))
// }
def handleUserEmojiChangedEvtMsg(msg: UserEmojiChangedEvtMsg): Unit = {
//listener.handle(new UserStatusChanged(meetingId, userid, status, value))
}
/**
* for (MessageListener listener : listeners) {
listener.handle(new UserJoinedVoice(meetingId, userid));
def handleUserLeftMeetingEvtMsg(msg: UserLeftMeetingEvtMsg): Unit = {
olgMsgGW.handle(new UserLeft(msg.header.meetingId, msg.body.intId))
}
def handleUserJoinedVoiceConfToClientEvtMsg(msg: UserJoinedVoiceConfToClientEvtMsg): Unit = {
if (msg.body.listenOnly) {
olgMsgGW.handle(new UserListeningOnly(msg.header.meetingId, msg.body.intId, msg.body.listenOnly))
} else {
olgMsgGW.handle(new UserJoinedVoice(msg.header.meetingId, msg.body.intId))
}
for (MessageListener listener : listeners) {
listener.handle(new UserLeftVoice(meetingId, userid));
}
for (MessageListener listener : listeners) {
listener.handle(new UserListeningOnly(meetingId, userid, listenOnly));
}
for (MessageListener listener : listeners) {
listener.handle(new UserSharedWebcam(meetingId, userid, stream));
}
for (MessageListener listener : listeners) {
listener.handle(new UserUnsharedWebcam(meetingId, userid, stream));
}
for (MessageListener listener : listeners) {
listener.handle(new UserRoleChanged(meetingId, userid, role));
}
for (MessageListener listener : listeners) {
listener.handle(new StunTurnInfoRequested(meetingId, requesterId));
}
*/
}
def handleUserLeftVoiceConfToClientEvtMsg(msg: UserLeftVoiceConfToClientEvtMsg): Unit = {
olgMsgGW.handle(new UserLeftVoice(msg.header.meetingId, msg.body.intId))
}
def handleUserBroadcastCamStartedEvtMsg(msg: UserBroadcastCamStartedEvtMsg): Unit = {
olgMsgGW.handle(new UserSharedWebcam(msg.header.meetingId, msg.body.userId, msg.body.stream))
}
def handleUserBroadcastCamStoppedEvtMsg(msg: UserBroadcastCamStoppedEvtMsg): Unit = {
olgMsgGW.handle(new UserUnsharedWebcam(msg.header.meetingId, msg.body.userId, msg.body.stream))
}
}