- redo how we deserialize messages

This commit is contained in:
Richard Alam 2017-06-01 09:47:14 -07:00
parent 7f9809459c
commit 1a58703fa4
7 changed files with 111 additions and 41 deletions

View File

@ -5,6 +5,8 @@ import org.bigbluebutton.client.bus.{JsonMsgFromAkkaApps, MsgFromAkkaApps, MsgFr
import org.bigbluebutton.common2.messages.BbbCommonEnvJsNodeMsg
import org.bigbluebutton.common2.util.JsonUtil
import scala.util.{Failure, Success}
object ReceivedJsonMsgHdlrActor {
def props(msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus): Props =
@ -22,8 +24,11 @@ class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEvent
}
def handleReceivedJsonMessage(msg: JsonMsgFromAkkaApps): Unit = {
println("****** Received JSON msg " + msg.data)
val serverMsg = JsonUtil.fromJson[BbbCommonEnvJsNodeMsg](msg.data)
msgFromAkkaAppsEventBus.publish(MsgFromAkkaApps(fromAkkaAppsChannel, serverMsg))
log.debug("****** Received JSON msg " + msg.data)
JsonUtil.fromJson[BbbCommonEnvJsNodeMsg](msg.data) match {
case Success(m) => msgFromAkkaAppsEventBus.publish(MsgFromAkkaApps(fromAkkaAppsChannel, m))
case Failure(ex) => log.error("Failed to deserialize message " + ex)
}
}
}

View File

@ -0,0 +1,13 @@
package org.bigbluebutton.client
import org.bigbluebutton.client.bus.{MsgFromAkkaApps, MsgFromAkkaAppsEventBus}
trait ReceivedMessageRouter {
val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus
def publish(msg: MsgFromAkkaApps): Unit = {
msgFromAkkaAppsEventBus.publish(msg)
}
def send()
}

View File

@ -3,8 +3,10 @@ package org.bigbluebutton.client.meeting
import akka.actor.{Actor, ActorLogging, Props}
import org.bigbluebutton.client.SystemConfiguration
import org.bigbluebutton.client.bus._
import org.bigbluebutton.common2.messages.{BbbCommonEnvJsNodeMsg, BbbCoreEnvelope, BbbCoreHeaderBody}
import org.bigbluebutton.common2.messages.{BbbCommonEnvJsNodeMsg, BbbCoreEnvelope}
import org.bigbluebutton.common2.util.JsonUtil
import com.fasterxml.jackson.databind.JsonNode
import scala.util.{Failure, Success}
object UserActor {
def props(userId: String,
@ -62,24 +64,43 @@ class UserActor(val userId: String,
}
def handleMsgFromClientMsg(msg: MsgFromClientMsg):Unit = {
println("**** UserActor handleMsgFromClient " + msg.json)
log.debug("Received MsgFromClientMsg " + msg)
val map = JsonUtil.toMap[Map[String, Any]](msg.json)
for {
header <- map.get("header")
name <- header.get("name")
meetingId <- header.get("meetingId")
} yield {
val meta = collection.immutable.HashMap[String, String](
"meetingId" -> msg.connInfo.meetingId,
"userId" -> msg.connInfo.userId
)
val envelope = new BbbCoreEnvelope(name.toString, meta)
val akkaMsg = BbbCommonEnvJsNodeMsg(envelope, JsonUtil.toJsonNode(msg.json))
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, akkaMsg))
def convertToJsonNode(json: String): Option[JsonNode] = {
JsonUtil.toJsonNode(json) match {
case Success(jsonNode) => Some(jsonNode)
case Failure(ex) => log.error("Failed to process client message body " + ex)
None
}
}
val msgAsMap = JsonUtil.toMap[Map[String, Any]](msg.json)
msgAsMap match {
case Success(map) =>
for {
header <- map.get("header")
name <- header.get("name")
meetingId <- header.get("meetingId")
} yield {
val meta = collection.immutable.HashMap[String, String](
"meetingId" -> msg.connInfo.meetingId,
"userId" -> msg.connInfo.userId
)
val envelope = new BbbCoreEnvelope(name.toString, meta)
for {
jsonNode <- convertToJsonNode(msg.json)
} yield {
val akkaMsg = BbbCommonEnvJsNodeMsg(envelope, jsonNode)
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, akkaMsg))
}
}
case Failure(ex) => log.error("Failed to process client message " + ex)
}
}
def handleBbbServerMsg(msg: BbbCommonEnvJsNodeMsg): Unit = {

View File

@ -71,15 +71,16 @@ class BbbWebApiGWApp(val oldMessageReceivedGW: OldMessageReceivedGW) extends IBb
val meetingProp = MeetingProp(name = meetingName, extId = extMeetingId, intId = meetingId,
isBreakout = isBreakout.booleanValue())
val durationProps = DurationProps(duration = duration, createdTime = createTime, createDate)
val durationProps = DurationProps(duration = duration.intValue(), createdTime = createTime.longValue(), createDate)
val password = PasswordProp(moderatorPass = moderatorPass, viewerPass = viewerPass)
val recordProp = RecordProp(record = recorded, autoStartRecording = autoStartRecording,
allowStartStopRecording = allowStartStopRecording)
val breakoutProps = BreakoutProps(parentId = parentMeetingId, sequence = sequence, breakoutRooms = Vector())
val recordProp = RecordProp(record = recorded.booleanValue(), autoStartRecording = autoStartRecording.booleanValue(),
allowStartStopRecording = allowStartStopRecording.booleanValue())
val breakoutProps = BreakoutProps(parentId = parentMeetingId, sequence = sequence.intValue(), breakoutRooms = Vector())
val welcomeProp = WelcomeProp(welcomeMsgTemplate = welcomeMsgTemplate, welcomeMsg = welcomeMsg,
modOnlyMessage = modOnlyMessage)
val voiceProp = VoiceProp(telVoice = voiceBridge, voiceConf = voiceBridge, dialNumber = dialNumber)
val usersProp = UsersProp(maxUsers = maxUsers, webcamsOnlyForModerator = webcamsOnlyForModerator,
val usersProp = UsersProp(maxUsers = maxUsers.intValue(), webcamsOnlyForModerator = webcamsOnlyForModerator.booleanValue(),
guestPolicy = guestPolicy)
val metadataProp = MetadataProp(mapAsScalaMap(metadata).toMap)
val screenshareProps = ScreenshareProps(screenshareConf = "FixMe!", red5ScreenshareIp = "fixMe!",

View File

@ -5,13 +5,18 @@ import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.common2.messages._
import com.fasterxml.jackson.databind.JsonNode
import scala.util.{Failure, Success}
object ReceivedJsonMsgHdlrActor {
def props(msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus): Props =
Props(classOf[ReceivedJsonMsgHdlrActor], msgFromAkkaAppsEventBus)
}
class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus)
extends Actor with ActorLogging with SystemConfiguration {
extends Actor
with ActorLogging
with SystemConfiguration
with ReceivedMessageRouter {
object JsonDeserializer extends Deserializer
@ -24,37 +29,38 @@ class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEvent
def handleReceivedJsonMessage(msg: JsonMsgFromAkkaApps): Unit = {
for {
envJsonNode <- JsonDeserializer.toJBbbCommonEnvJsNodeMsg(msg.data)
envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data)
} yield route(envJsonNode.envelope, envJsonNode.core)
}
def route(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
println("*************** Route envelope name " + envelope.name)
log.debug("*************** Route envelope name " + envelope.name)
envelope.name match {
case MeetingCreatedEvtMsg.NAME =>
println("**************** Route MeetingCreatedEvtMsg")
log.debug("**************** Route MeetingCreatedEvtMsg")
for {
m <- routeMeetingCreatedEvtMsg(envelope, jsonNode)
m <- routeMeetingCreatedEvtMsg(jsonNode)
} yield {
println("************ Sending MeetingCreatedEvtMsg")
send(m)
log.debug("************ Sending MeetingCreatedEvtMsg")
send(envelope, m)
}
case _ =>
println("************ Cannot route envelope name " + envelope.name)
log.debug("************ Cannot route envelope name " + envelope.name)
// do nothing
}
}
def send(msg: MsgFromAkkaApps): Unit = {
println("******************** Routing " + msg.payload.envelope.name)
log.debug("******************** Routing " + msg.payload.envelope.name)
msgFromAkkaAppsEventBus.publish(msg)
}
def routeMeetingCreatedEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Option[MsgFromAkkaApps] = {
for {
msg <- JsonDeserializer.toMeetingCreatedEvtMsg(envelope, jsonNode)
} yield {
MsgFromAkkaApps(fromAkkaAppsChannel, BbbCommonEnvCoreMsg(envelope, msg))
def routeMeetingCreatedEvtMsg(jsonNode: JsonNode): Option[MeetingCreatedEvtMsg] = {
JsonDeserializer.toBbbCommonMsg[MeetingCreatedEvtMsg](jsonNode) match {
case Success(msg) => Some(msg.asInstanceOf[MeetingCreatedEvtMsg])
case Failure(ex) =>
log.error("Failed to ValidateAuthTokenReqMsg message " + ex)
None
}
}
}

View File

@ -0,0 +1,19 @@
package org.bigbluebutton.api2.bus
import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.common2.messages.{BbbCommonEnvCoreMsg, BbbCoreEnvelope, MeetingCreatedEvtMsg}
trait ReceivedMessageRouter extends SystemConfiguration {
val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus
def publish(msg: MsgFromAkkaApps): Unit = {
msgFromAkkaAppsEventBus.publish(msg)
}
def send(envelope: BbbCoreEnvelope, msg: MeetingCreatedEvtMsg): Unit = {
val event = MsgFromAkkaApps(fromAkkaAppsChannel, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
}

View File

@ -63,12 +63,16 @@ class MeetingsManagerActor(val msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus)
}
def handleCreateMeeting(msg: CreateMeetingMsg): Unit = {
log.debug("Received create meeting request for {} {} ", msg.defaultProps.meetingProp.intId,
msg.defaultProps.meetingProp.name)
for {
mid <- Util2.getFirstPartOfMeetingId(msg.defaultProps.meetingProp.intId)
} yield {
MeetingsManager.findMeetingThatStartsWith(manager, mid) match {
case Some(m) => replyWithDuplicateMeeting()
case Some(m) => replyWithDuplicateMeeting(msg)
case None => createNewMeeting(msg)
log.debug("Received create meeting request for {} {} ", msg.defaultProps.meetingProp.intId,
msg.defaultProps.meetingProp.name)
sendCreateMeetingRequestToAkkaApps(msg.defaultProps)
replyWithCreatedMeeting()
}
@ -77,8 +81,9 @@ class MeetingsManagerActor(val msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus)
def replyWithDuplicateMeeting(): Unit = {
def replyWithDuplicateMeeting(msg: CreateMeetingMsg): Unit = {
log.warning("Duplicate create meeting request for {} {} ", msg.defaultProps.meetingProp.intId,
msg.defaultProps.meetingProp.name)
}
def createNewMeeting(msg: CreateMeetingMsg): RunningMeeting = {