diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala index dcebef7ed6..0c1623093a 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala @@ -16,6 +16,7 @@ import org.bigbluebutton.core.bus._ import org.bigbluebutton.core.JsonMessageSenderActor import org.bigbluebutton.core.pubsub.senders.ReceivedJsonMsgHandlerActor import org.bigbluebutton.core.recorder.RecorderActor +import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor object Boot extends App with SystemConfiguration { @@ -46,6 +47,9 @@ object Boot extends App with SystemConfiguration { val bbbMsgBus = new BbbMsgRouterEventBus + val fromAkkaAppsMsgSenderActorRef = system.actorOf(FromAkkaAppsMsgSenderActor.props(msgSender)) + outBus2.subscribe(fromAkkaAppsMsgSenderActorRef, outBbbMsgMsgChannel) + val bbbInGW = new BigBlueButtonInGW(system, eventBus, bbbMsgBus, outGW) val redisMsgReceiver = new RedisMessageReceiver(bbbInGW) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala index fd911b6664..7647d06489 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala @@ -13,7 +13,7 @@ import org.bigbluebutton.core.api._ import org.bigbluebutton.SystemConfiguration import java.util.concurrent.TimeUnit -import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg, CreateMeetingReqMsg } +import org.bigbluebutton.common2.messages._ import org.bigbluebutton.core.running.RunningMeeting object BigBlueButtonActor extends SystemConfiguration { @@ -78,6 +78,14 @@ class BigBlueButtonActor(val system: ActorSystem, private def handleCreateMeetingReqMsg(msg: CreateMeetingReqMsg): Unit = { log.debug("****** RECEIVED CreateMeetingReqMsg msg {}", msg) + + val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka") + val envelope = BbbCoreEnvelope(MeetingCreatedEvtMsg.NAME, routing) + val header = BbbCoreHeader(MeetingCreatedEvtMsg.NAME) + val body = MeetingCreatedEvtBody(msg.body.props) + val event = MeetingCreatedEvtMsg(header, body) + val msgEvent = BbbCommonEnvCoreMsg(envelope, event) + outGW.send(msgEvent) } private def findMeetingWithVoiceConfId(voiceConfId: String): Option[RunningMeeting] = { diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/OutMessageGateway.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/OutMessageGateway.scala index 937bb8c5f1..aecd107bb1 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/OutMessageGateway.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/OutMessageGateway.scala @@ -1,7 +1,7 @@ package org.bigbluebutton.core import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.common2.messages.{ BbbCommonEnvJsNodeMsg } +import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg } import org.bigbluebutton.core.bus.{ BbbOutMessage, BigBlueButtonOutMessage, OutEventBus2, OutgoingEventBus } import org.bigbluebutton.core.api.IOutMessage @@ -16,7 +16,8 @@ class OutMessageGateway(outgoingEventBus: OutgoingEventBus, outBus2: OutEventBus outgoingEventBus.publish(BigBlueButtonOutMessage(outMessageChannel, msg)) } - def send(msg: BbbCommonEnvJsNodeMsg): Unit = { + def send(msg: BbbCommonEnvCoreMsg): Unit = { + println("****** Sending to outBbbMsgMsgChannel " + msg.envelope.name) outBus2.publish(BbbOutMessage(outBbbMsgMsgChannel, msg)) } } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/bus/OutEventBus2.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/bus/OutEventBus2.scala index 5f0d4d3895..46dde1d369 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/bus/OutEventBus2.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/bus/OutEventBus2.scala @@ -2,9 +2,9 @@ package org.bigbluebutton.core.bus import akka.actor.ActorRef import akka.event.{ EventBus, LookupClassification } -import org.bigbluebutton.common2.messages.{ BbbCommonEnvJsNodeMsg } +import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg } -case class BbbOutMessage(val topic: String, val payload: BbbCommonEnvJsNodeMsg) +case class BbbOutMessage(val topic: String, val payload: BbbCommonEnvCoreMsg) class OutEventBus2 extends EventBus with LookupClassification { type Event = BbbOutMessage diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala new file mode 100755 index 0000000000..e2f1f326bb --- /dev/null +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala @@ -0,0 +1,26 @@ +package org.bigbluebutton.core2 + +import akka.actor.{ Actor, ActorLogging, Props } +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.common2.messages.BbbCommonEnvCoreMsg +import org.bigbluebutton.common2.util.JsonUtil +import org.bigbluebutton.core.MessageSender +import org.bigbluebutton.core.bus.BbbOutMessage + +object FromAkkaAppsMsgSenderActor { + def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender) +} + +class FromAkkaAppsMsgSenderActor(msgSender: MessageSender) extends Actor with ActorLogging with SystemConfiguration { + + def receive = { + case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg) + case _ => println("************* FromAkkaAppsMsgSenderActor Cannot handle message ") + } + + def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = { + val json = JsonUtil.toJson(msg) + println("****** Publishing " + json) + msgSender.send(fromAkkaAppsRedisChannel, json) + } +} diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala index 5c531beb0b..85843b0ee0 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala @@ -57,7 +57,7 @@ class AppsRedisSubscriberActor(msgReceiver: RedisMessageReceiver, jsonMsgBus: In } def onPMessage(pmessage: PMessage) { - log.debug(s"RECEIVED:\n ${pmessage.data.utf8String} \n") + //log.debug(s"RECEIVED:\n ${pmessage.data.utf8String} \n") msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String) } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala index 6af8a64ebb..17bc1aac87 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala @@ -14,7 +14,7 @@ class RedisPublisher(val system: ActorSystem) extends SystemConfiguration { redis.clientSetname("BbbAppsAkkaPub") def publish(channel: String, data: String) { - //println("PUBLISH TO [" + channel + "]: \n [" + data + "]") + println("PUBLISH TO [" + channel + "]: \n [" + data + "]") redis.publish(channel, ByteString(data)) } diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/BbbCoreEnvelope.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/BbbCoreEnvelope.scala index 2263ef2e01..2db0e463c2 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/BbbCoreEnvelope.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/BbbCoreEnvelope.scala @@ -33,5 +33,5 @@ case class CreateMeetingReqMsg(header: BbbCoreHeader, body: CreateMeetingReqMsgB case class CreateMeetingReqMsgBody(props: DefaultProps) object MeetingCreatedEvtMsg { val NAME = "MeetingCreatedEvtMsg"} -case class MeetingCreatedEvtMsg(header: BbbCoreHeader, body: MeetingCreatedEvtBody) +case class MeetingCreatedEvtMsg(header: BbbCoreHeader, body: MeetingCreatedEvtBody) extends BbbCoreMsg case class MeetingCreatedEvtBody(props: DefaultProps) \ No newline at end of file diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Deserializer.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Deserializer.scala index 72fd87ac41..dcd2d46bc6 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Deserializer.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Deserializer.scala @@ -42,4 +42,21 @@ trait Deserializer { } } + def toMeetingCreatedEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Option[MeetingCreatedEvtMsg] = { + def convertFromJson(json: String): Try[MeetingCreatedEvtMsg] = { + for { + msg <- Try(fromJson[MeetingCreatedEvtMsg](json)) + } yield msg + } + + val json = JsonUtil.toJson(jsonNode) + + convertFromJson(json) match { + case Success(msg) => Some(msg) + case Failure(ex) => println(s"************ Problem deserializing json: ${json}") + println(s"*********** Exception deserializing json: ${ex.getMessage}") + None + } + } + } diff --git a/bbb-common-web/build.sbt b/bbb-common-web/build.sbt index 5d643ca6c3..19560432ac 100755 --- a/bbb-common-web/build.sbt +++ b/bbb-common-web/build.sbt @@ -44,6 +44,8 @@ libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0" libraryDependencies += "com.softwaremill.quicklens" %% "quicklens" % "1.4.8" libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT" +// https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12 +libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.8.8" libraryDependencies += "redis.clients" % "jedis" % "2.7.2" libraryDependencies += "com.google.code.gson" % "gson" % "2.5" diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/MsgFromAkkaAppsEventBus.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/MsgFromAkkaAppsEventBus.scala index 7227c23bf5..bb66bd8b2e 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/MsgFromAkkaAppsEventBus.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/MsgFromAkkaAppsEventBus.scala @@ -2,9 +2,9 @@ package org.bigbluebutton.api2.bus import akka.actor.ActorRef import akka.event.{EventBus, LookupClassification} -import org.bigbluebutton.common2.messages.{ BbbCommonEnvJsNodeMsg} +import org.bigbluebutton.common2.messages.{BbbCommonEnvCoreMsg} -case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonEnvJsNodeMsg) +case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonEnvCoreMsg) class MsgFromAkkaAppsEventBus extends EventBus with LookupClassification { type Event = MsgFromAkkaApps diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/ReceivedJsonMsgHdlrActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/ReceivedJsonMsgHdlrActor.scala index a9e8cbf109..4f6b61a1e4 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/ReceivedJsonMsgHdlrActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/bus/ReceivedJsonMsgHdlrActor.scala @@ -2,9 +2,8 @@ package org.bigbluebutton.api2.bus import akka.actor.{Actor, ActorLogging, Props} import org.bigbluebutton.api2.SystemConfiguration -import org.bigbluebutton.common2.messages.BbbCommonEnvJsNodeMsg -import org.bigbluebutton.common2.util.JsonUtil - +import org.bigbluebutton.common2.messages._ +import com.fasterxml.jackson.databind.JsonNode object ReceivedJsonMsgHdlrActor { def props(msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus): Props = @@ -14,6 +13,8 @@ object ReceivedJsonMsgHdlrActor { class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus) extends Actor with ActorLogging with SystemConfiguration { + object JsonDeserializer extends Deserializer + def receive = { case msg: JsonMsgFromAkkaApps => handleReceivedJsonMessage(msg) @@ -22,7 +23,38 @@ class ReceivedJsonMsgHdlrActor(val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEvent } def handleReceivedJsonMessage(msg: JsonMsgFromAkkaApps): Unit = { - val serverMsg = JsonUtil.fromJson[BbbCommonEnvJsNodeMsg](msg.data) - msgFromAkkaAppsEventBus.publish(MsgFromAkkaApps(fromAkkaAppsChannel, serverMsg)) + for { + envJsonNode <- JsonDeserializer.toJBbbCommonEnvJsNodeMsg(msg.data) + } yield route(envJsonNode.envelope, envJsonNode.core) + } + + def route(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + println("*************** Route envelope name " + envelope.name) + envelope.name match { + case MeetingCreatedEvtMsg.NAME => + println("**************** Route MeetingCreatedEvtMsg") + for { + m <- routeMeetingCreatedEvtMsg(envelope, jsonNode) + } yield { + println("************ Sending MeetingCreatedEvtMsg") + send(m) + } + case _ => + println("************ Cannot route envelope name " + envelope.name) + // do nothing + } + } + + def send(msg: MsgFromAkkaApps): Unit = { + println("******************** Routing " + msg.payload.envelope.name) + msgFromAkkaAppsEventBus.publish(msg) + } + + def routeMeetingCreatedEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Option[MsgFromAkkaApps] = { + for { + msg <- JsonDeserializer.toMeetingCreatedEvtMsg(envelope, jsonNode) + } yield { + MsgFromAkkaApps(fromAkkaAppsChannel, BbbCommonEnvCoreMsg(envelope, msg)) + } } } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/AppsRedisSubscriberActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/AppsRedisSubscriberActor.scala index 4ede23fc90..32e4654c97 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/AppsRedisSubscriberActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/AppsRedisSubscriberActor.scala @@ -46,9 +46,11 @@ class AppsRedisSubscriberActor(jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEve write(ClientSetname("Red5AppsSub").encodedRequest) def onMessage(message: Message) { - log.error(s"SHOULD NOT BE RECEIVING: $message") - val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String) - jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage)) + //log.error(s"SHOULD NOT BE RECEIVING: $message") + if (message.channel == fromAkkaAppsRedisChannel) { + val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String) + jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage)) + } } def onPMessage(pmessage: PMessage) { diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/FromAkkaAppsHandlersTrait.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/FromAkkaAppsHandlersTrait.scala index b795ed03ff..38b6aecaf3 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/FromAkkaAppsHandlersTrait.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/FromAkkaAppsHandlersTrait.scala @@ -4,6 +4,6 @@ import org.bigbluebutton.common2.messages.MeetingCreatedEvtMsg trait FromAkkaAppsHandlersTrait { def handleMeetingCreatedEvtMsg(msg: MeetingCreatedEvtMsg): Unit = { - + println("************* HANDLING " + msg.header.name) } } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/MeetingsManagerActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/MeetingsManagerActor.scala index e3b93b08c0..1702687f2a 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/MeetingsManagerActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/MeetingsManagerActor.scala @@ -6,7 +6,7 @@ import org.bigbluebutton.api2.bus.MsgToAkkaAppsEventBus import org.bigbluebutton.common2.domain.DefaultProps import org.bigbluebutton.api2.util.Util2 import org.bigbluebutton.common.messages.UserJoinedVoiceMessage -import org.bigbluebutton.common2.messages.MeetingCreatedEvtMsg +import org.bigbluebutton.common2.messages.{BbbCommonEnvCoreMsg, MeetingCreatedEvtMsg} sealed trait ApiMsg @@ -58,7 +58,7 @@ class MeetingsManagerActor(val msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus) def receive = { case msg: CreateMeetingMsg => handleCreateMeeting(msg) - case msg: MeetingCreatedEvtMsg => handleMeetingCreatedEvtMsg(msg) + case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg) } def handleCreateMeeting(msg: CreateMeetingMsg): Unit = { @@ -88,4 +88,11 @@ class MeetingsManagerActor(val msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus) } + + private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = { + msg.core match { + case m: MeetingCreatedEvtMsg => handleMeetingCreatedEvtMsg(m) + case _ => println("***** Cannot handle " + msg.envelope.name) + } + } }