- send and handle meeting created event

This commit is contained in:
Richard Alam 2017-05-25 14:34:46 -07:00
parent 72f7272eb7
commit 18cce3a818
15 changed files with 120 additions and 21 deletions

View File

@ -16,6 +16,7 @@ import org.bigbluebutton.core.bus._
import org.bigbluebutton.core.JsonMessageSenderActor
import org.bigbluebutton.core.pubsub.senders.ReceivedJsonMsgHandlerActor
import org.bigbluebutton.core.recorder.RecorderActor
import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor
object Boot extends App with SystemConfiguration {
@ -46,6 +47,9 @@ object Boot extends App with SystemConfiguration {
val bbbMsgBus = new BbbMsgRouterEventBus
val fromAkkaAppsMsgSenderActorRef = system.actorOf(FromAkkaAppsMsgSenderActor.props(msgSender))
outBus2.subscribe(fromAkkaAppsMsgSenderActorRef, outBbbMsgMsgChannel)
val bbbInGW = new BigBlueButtonInGW(system, eventBus, bbbMsgBus, outGW)
val redisMsgReceiver = new RedisMessageReceiver(bbbInGW)

View File

@ -13,7 +13,7 @@ import org.bigbluebutton.core.api._
import org.bigbluebutton.SystemConfiguration
import java.util.concurrent.TimeUnit
import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg, CreateMeetingReqMsg }
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.core.running.RunningMeeting
object BigBlueButtonActor extends SystemConfiguration {
@ -78,6 +78,14 @@ class BigBlueButtonActor(val system: ActorSystem,
private def handleCreateMeetingReqMsg(msg: CreateMeetingReqMsg): Unit = {
log.debug("****** RECEIVED CreateMeetingReqMsg msg {}", msg)
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
val envelope = BbbCoreEnvelope(MeetingCreatedEvtMsg.NAME, routing)
val header = BbbCoreHeader(MeetingCreatedEvtMsg.NAME)
val body = MeetingCreatedEvtBody(msg.body.props)
val event = MeetingCreatedEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, event)
outGW.send(msgEvent)
}
private def findMeetingWithVoiceConfId(voiceConfId: String): Option[RunningMeeting] = {

View File

@ -1,7 +1,7 @@
package org.bigbluebutton.core
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.messages.{ BbbCommonEnvJsNodeMsg }
import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg }
import org.bigbluebutton.core.bus.{ BbbOutMessage, BigBlueButtonOutMessage, OutEventBus2, OutgoingEventBus }
import org.bigbluebutton.core.api.IOutMessage
@ -16,7 +16,8 @@ class OutMessageGateway(outgoingEventBus: OutgoingEventBus, outBus2: OutEventBus
outgoingEventBus.publish(BigBlueButtonOutMessage(outMessageChannel, msg))
}
def send(msg: BbbCommonEnvJsNodeMsg): Unit = {
def send(msg: BbbCommonEnvCoreMsg): Unit = {
println("****** Sending to outBbbMsgMsgChannel " + msg.envelope.name)
outBus2.publish(BbbOutMessage(outBbbMsgMsgChannel, msg))
}
}

View File

@ -2,9 +2,9 @@ package org.bigbluebutton.core.bus
import akka.actor.ActorRef
import akka.event.{ EventBus, LookupClassification }
import org.bigbluebutton.common2.messages.{ BbbCommonEnvJsNodeMsg }
import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg }
case class BbbOutMessage(val topic: String, val payload: BbbCommonEnvJsNodeMsg)
case class BbbOutMessage(val topic: String, val payload: BbbCommonEnvCoreMsg)
class OutEventBus2 extends EventBus with LookupClassification {
type Event = BbbOutMessage

View File

@ -0,0 +1,26 @@
package org.bigbluebutton.core2
import akka.actor.{ Actor, ActorLogging, Props }
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.messages.BbbCommonEnvCoreMsg
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.core.MessageSender
import org.bigbluebutton.core.bus.BbbOutMessage
object FromAkkaAppsMsgSenderActor {
def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender)
}
class FromAkkaAppsMsgSenderActor(msgSender: MessageSender) extends Actor with ActorLogging with SystemConfiguration {
def receive = {
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
case _ => println("************* FromAkkaAppsMsgSenderActor Cannot handle message ")
}
def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
val json = JsonUtil.toJson(msg)
println("****** Publishing " + json)
msgSender.send(fromAkkaAppsRedisChannel, json)
}
}

View File

@ -57,7 +57,7 @@ class AppsRedisSubscriberActor(msgReceiver: RedisMessageReceiver, jsonMsgBus: In
}
def onPMessage(pmessage: PMessage) {
log.debug(s"RECEIVED:\n ${pmessage.data.utf8String} \n")
//log.debug(s"RECEIVED:\n ${pmessage.data.utf8String} \n")
msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String)
}

View File

@ -14,7 +14,7 @@ class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
redis.clientSetname("BbbAppsAkkaPub")
def publish(channel: String, data: String) {
//println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
redis.publish(channel, ByteString(data))
}

View File

@ -33,5 +33,5 @@ case class CreateMeetingReqMsg(header: BbbCoreHeader, body: CreateMeetingReqMsgB
case class CreateMeetingReqMsgBody(props: DefaultProps)
object MeetingCreatedEvtMsg { val NAME = "MeetingCreatedEvtMsg"}
case class MeetingCreatedEvtMsg(header: BbbCoreHeader, body: MeetingCreatedEvtBody)
case class MeetingCreatedEvtMsg(header: BbbCoreHeader, body: MeetingCreatedEvtBody) extends BbbCoreMsg
case class MeetingCreatedEvtBody(props: DefaultProps)

View File

@ -42,4 +42,21 @@ trait Deserializer {
}
}
def toMeetingCreatedEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Option[MeetingCreatedEvtMsg] = {
def convertFromJson(json: String): Try[MeetingCreatedEvtMsg] = {
for {
msg <- Try(fromJson[MeetingCreatedEvtMsg](json))
} yield msg
}
val json = JsonUtil.toJson(jsonNode)
convertFromJson(json) match {
case Success(msg) => Some(msg)
case Failure(ex) => println(s"************ Problem deserializing json: ${json}")
println(s"*********** Exception deserializing json: ${ex.getMessage}")
None
}
}
}

View File

@ -44,6 +44,8 @@ libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
libraryDependencies += "com.softwaremill.quicklens" %% "quicklens" % "1.4.8"
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12
libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.8.8"
libraryDependencies += "redis.clients" % "jedis" % "2.7.2"
libraryDependencies += "com.google.code.gson" % "gson" % "2.5"

View File

@ -2,9 +2,9 @@ package org.bigbluebutton.api2.bus
import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}
import org.bigbluebutton.common2.messages.{ BbbCommonEnvJsNodeMsg}
import org.bigbluebutton.common2.messages.{BbbCommonEnvCoreMsg}
case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonEnvJsNodeMsg)
case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonEnvCoreMsg)
class MsgFromAkkaAppsEventBus extends EventBus with LookupClassification {
type Event = MsgFromAkkaApps

View File

@ -2,9 +2,8 @@ package org.bigbluebutton.api2.bus
import akka.actor.{Actor, ActorLogging, Props}
import org.bigbluebutton.api2.SystemConfiguration
import org.bigbluebutton.common2.messages.BbbCommonEnvJsNodeMsg
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.common2.messages._
import com.fasterxml.jackson.databind.JsonNode
object ReceivedJsonMsgHdlrActor {
def props(msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus): Props =
@ -14,6 +13,8 @@ object ReceivedJsonMsgHdlrActor {
class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus)
extends Actor with ActorLogging with SystemConfiguration {
object JsonDeserializer extends Deserializer
def receive = {
case msg: JsonMsgFromAkkaApps => handleReceivedJsonMessage(msg)
@ -22,7 +23,38 @@ class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEvent
}
def handleReceivedJsonMessage(msg: JsonMsgFromAkkaApps): Unit = {
val serverMsg = JsonUtil.fromJson[BbbCommonEnvJsNodeMsg](msg.data)
msgFromAkkaAppsEventBus.publish(MsgFromAkkaApps(fromAkkaAppsChannel, serverMsg))
for {
envJsonNode <- JsonDeserializer.toJBbbCommonEnvJsNodeMsg(msg.data)
} yield route(envJsonNode.envelope, envJsonNode.core)
}
def route(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
println("*************** Route envelope name " + envelope.name)
envelope.name match {
case MeetingCreatedEvtMsg.NAME =>
println("**************** Route MeetingCreatedEvtMsg")
for {
m <- routeMeetingCreatedEvtMsg(envelope, jsonNode)
} yield {
println("************ Sending MeetingCreatedEvtMsg")
send(m)
}
case _ =>
println("************ Cannot route envelope name " + envelope.name)
// do nothing
}
}
def send(msg: MsgFromAkkaApps): Unit = {
println("******************** Routing " + msg.payload.envelope.name)
msgFromAkkaAppsEventBus.publish(msg)
}
def routeMeetingCreatedEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Option[MsgFromAkkaApps] = {
for {
msg <- JsonDeserializer.toMeetingCreatedEvtMsg(envelope, jsonNode)
} yield {
MsgFromAkkaApps(fromAkkaAppsChannel, BbbCommonEnvCoreMsg(envelope, msg))
}
}
}

View File

@ -46,9 +46,11 @@ class AppsRedisSubscriberActor(jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEve
write(ClientSetname("Red5AppsSub").encodedRequest)
def onMessage(message: Message) {
log.error(s"SHOULD NOT BE RECEIVING: $message")
val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String)
jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage))
//log.error(s"SHOULD NOT BE RECEIVING: $message")
if (message.channel == fromAkkaAppsRedisChannel) {
val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String)
jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage))
}
}
def onPMessage(pmessage: PMessage) {

View File

@ -4,6 +4,6 @@ import org.bigbluebutton.common2.messages.MeetingCreatedEvtMsg
trait FromAkkaAppsHandlersTrait {
def handleMeetingCreatedEvtMsg(msg: MeetingCreatedEvtMsg): Unit = {
println("************* HANDLING " + msg.header.name)
}
}

View File

@ -6,7 +6,7 @@ import org.bigbluebutton.api2.bus.MsgToAkkaAppsEventBus
import org.bigbluebutton.common2.domain.DefaultProps
import org.bigbluebutton.api2.util.Util2
import org.bigbluebutton.common.messages.UserJoinedVoiceMessage
import org.bigbluebutton.common2.messages.MeetingCreatedEvtMsg
import org.bigbluebutton.common2.messages.{BbbCommonEnvCoreMsg, MeetingCreatedEvtMsg}
sealed trait ApiMsg
@ -58,7 +58,7 @@ class MeetingsManagerActor(val msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus)
def receive = {
case msg: CreateMeetingMsg => handleCreateMeeting(msg)
case msg: MeetingCreatedEvtMsg => handleMeetingCreatedEvtMsg(msg)
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
}
def handleCreateMeeting(msg: CreateMeetingMsg): Unit = {
@ -88,4 +88,11 @@ class MeetingsManagerActor(val msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus)
}
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
msg.core match {
case m: MeetingCreatedEvtMsg => handleMeetingCreatedEvtMsg(m)
case _ => println("***** Cannot handle " + msg.envelope.name)
}
}
}