Merge pull request #8223 from ritzalam/upgrade-vertx-akka

Prototyping client <-> server messaging
This commit is contained in:
Richard Alam 2019-10-20 13:42:05 -04:00 committed by GitHub
commit ade85da737
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 242 additions and 122 deletions

View File

@ -49,3 +49,5 @@ lib_managed/
bin/
.vertx/
target/
src/main/resources/

View File

@ -0,0 +1,31 @@
#alignArguments=false
alignParameters=true
alignSingleLineCaseStatements=true
#alignSingleLineCaseStatements.maxArrowIndent=40
#allowParamGroupsOnNewlines=false
#compactControlReadability=false
#compactStringConcatenation=false
danglingCloseParenthesis=Force
#doubleIndentClassDeclaration=false
doubleIndentConstructorArguments=true
doubleIndentMethodDeclaration=true
firstArgumentOnNewline=Force
firstParameterOnNewline=Force
#formatXml=true
#indentLocalDefs=false
#indentPackageBlocks=true
#indentSpaces=2
#indentWithTabs=false
#multilineScaladocCommentsStartOnFirstLine=false
#newlineAtEndOfFile=false
#placeScaladocAsterisksBeneathSecondAsterisk=false
#preserveSpaceBeforeArguments=false
#rewriteArrowSymbols=false
singleCasePatternOnNewline=false
#spaceBeforeColon=false
#spaceBeforeContextColon=false
#spaceInsideBrackets=false
#spaceInsideParentheses=false
#spacesAroundMultiImports=true
#spacesWithinPatternBinders=true

View File

@ -1,73 +1,47 @@
import org.bigbluebutton.build._
import NativePackagerHelper._
import com.typesafe.sbt.SbtNativePackager.autoImport._
enablePlugins(JavaServerAppPackaging)
enablePlugins(UniversalPlugin)
enablePlugins(DebianPlugin)
name := "vertx-akka"
val compileSettings = Seq(
organization := "org.bigbluebutton",
organization := "org.bigbluebutton"
version := "0.0.2"
scalaVersion := "2.12.2"
scalacOptions ++= Seq(
"-unchecked",
"-deprecation",
"-Xlint",
"-Ywarn-dead-code",
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
scalacOptions ++= List(
"-unchecked",
"-deprecation",
"-Xlint",
"-Ywarn-dead-code",
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
),
javacOptions ++= List(
"-Xlint:unchecked",
"-Xlint:deprecation"
)
)
resolvers ++= Seq(
"spray repo" at "http://repo.spray.io/",
"rediscala" at "http://dl.bintray.com/etaty/maven",
"blindside-repos" at "http://blindside.googlecode.com/svn/repository/"
)
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/dev/repo/maven-repo/releases" )) )
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath + "/dev/repo/maven-repo/releases")))
// We want to have our jar files in lib_managed dir.
// This way we'll have the right path when we import
// into eclipse.
retrieveManaged := true
unmanagedResourceDirectories in Compile += { baseDirectory.value / "src/main/webapp" }
testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", "junitxml")
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
libraryDependencies ++= {
val scalaV = "2.12.2"
val akkaVersion = "2.5.1"
val scalaTestV = "2.2.6"
val vertxV = "3.5.1"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.0.13" % "runtime",
"org.pegdown" % "pegdown" % "1.4.0",
"junit" % "junit" % "4.11",
"commons-codec" % "commons-codec" % "1.8",
"joda-time" % "joda-time" % "2.3",
"com.google.code.gson" % "gson" % "2.8.0",
"io.vertx" % "vertx-web" % vertxV,
"io.vertx" % "vertx-auth-common" % vertxV,
"io.vertx" % "vertx-auth-shiro" % vertxV,
"io.vertx" %% "vertx-web-scala" % vertxV,
"io.vertx" %% "vertx-lang-scala" % vertxV,
"com.github.etaty" % "rediscala_2.12" % "1.8.0",
"com.softwaremill.quicklens" %% "quicklens" % "1.4.8",
"org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT",
"redis.clients" % "jedis" % "2.9.0"
)}
seq(Revolver.settings: _*)
scalariformSettings
Seq(Revolver.settings: _*)
lazy val bbbVertxAkka = (project in file(".")).settings(name := "bbb-vertx-akka", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
// See https://github.com/scala-ide/scalariform
// Config file is in ./.scalariform.conf
scalariformAutoformat := true
//-----------
// Packaging
@ -80,30 +54,22 @@ mainClass := Some("org.bigbluebutton.Boot")
maintainer in Linux := "Richard Alam <ritzalam@gmail.com>"
packageSummary in Linux := "vertx akka example"
packageSummary in Linux := "BigBlueButton Vertx Akka"
packageDescription := """Vertx Akka Example."""
packageDescription := """BigBlueButton Core Vertx Akka."""
val user = "bigbluebutton"
val group = "bigbluebutton"
// user which will execute the application
daemonUser in Linux := user
daemonUser in Linux := user
// group which will execute the application
daemonGroup in Linux := group
daemonGroup in Linux := group
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move the application.conf so the user can override settings here
val appConf = src / "main" / "resources" / "application.conf"
appConf -> "conf/application.conf"
}
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move logback.xml so the user can override settings here
val logConf = src / "main" / "resources" / "logback.xml"
logConf -> "conf/logback.xml"
}
javaOptions in Universal ++= Seq("-J-Xms130m", "-J-Xmx256m", "-Dconfig.file=conf/application.conf", "-Dlogback.configurationFile=conf/logback.xml")
debianPackageDependencies in Debian ++= Seq("java8-runtime-headless", "bash")

View File

@ -0,0 +1,99 @@
package org.bigbluebutton.build
import sbt._
import Keys._
object Dependencies {
object Versions {
// Scala
val scala = "2.12.8"
val junit = "4.12"
val junitInterface = "0.11"
val scalactic = "3.0.3"
// Libraries
val akkaVersion = "2.5.19"
val gson = "2.8.5"
val jackson = "2.9.7"
val logback = "1.2.3"
val quicklens = "1.4.11"
val spray = "1.3.4"
val vertxV = "3.5.1"
// Apache Commons
val lang = "3.8.1"
val codec = "1.11"
// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"
// Test
val scalaTest = "3.0.5"
val mockito = "2.23.0"
val akkaTestKit = "2.5.18"
}
object Compile {
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
val akkaSl4fj = "com.typesafe.akka" % "akka-slf4j_2.12" % Versions.akkaVersion
val vertxWeb = "io.vertx" % "vertx-web" % Versions.vertxV
val vertxAuthCommon = "io.vertx" % "vertx-auth-common" % Versions.vertxV
val vertxAuthShiro = "io.vertx" % "vertx-auth-shiro" % Versions.vertxV
val vertxWebScala = "io.vertx" %% "vertx-web-scala" % Versions.vertxV
val vertxLangScala = "io.vertx" %% "vertx-lang-scala" % Versions.vertxV
val googleGson = "com.google.code.gson" % "gson" % Versions.gson
val jacksonModule = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson
val quicklens = "com.softwaremill.quicklens" %% "quicklens" % Versions.quicklens
val logback = "ch.qos.logback" % "logback-classic" % Versions.logback
val commonsCodec = "commons-codec" % "commons-codec" % Versions.codec
val sprayJson = "io.spray" % "spray-json_2.12" % Versions.spray
val redisEtaty = "com.github.etaty" % "rediscala_2.12" % "1.8.0"
val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang
val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons excludeAll (
ExclusionRule(organization = "org.red5"))
}
object Test {
val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalaTest % "test"
val junit = "junit" % "junit" % Versions.junit % "test"
val mockitoCore = "org.mockito" % "mockito-core" % Versions.mockito % "test"
val scalactic = "org.scalactic" % "scalactic_2.12" % Versions.scalactic % "test"
val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.akkaTestKit % "test"
}
val testing = Seq(
Test.scalaTest,
Test.junit,
Test.mockitoCore,
Test.scalactic,
Test.akkaTestKit)
val runtime = Seq(
Compile.scalaLibrary,
Compile.scalaCompiler,
Compile.akkaActor,
Compile.akkaSl4fj,
Compile.vertxWeb,
Compile.vertxAuthCommon,
Compile.vertxAuthShiro,
Compile.vertxWebScala,
Compile.vertxWebScala,
Compile.googleGson,
Compile.jacksonModule,
Compile.quicklens,
Compile.logback,
Compile.commonsCodec,
Compile.sprayJson,
Compile.apacheLang,
Compile.redisEtaty,
Compile.bbbCommons) ++ testing
}

View File

@ -1 +1 @@
sbt.version=0.13.8
sbt.version=1.2.7

View File

@ -1,11 +1,11 @@
addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2")
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

6
labs/vertx-akka/run-dev.sh Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
rm -rf src/main/resources
cp -R src/universal/conf src/main/resources
sbt run

6
labs/vertx-akka/run.sh Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
sbt clean stage
sudo service bbb-vertx-akka stop
cd target/universal/stage
./bin/bbb-vertx-akka

View File

@ -12,7 +12,7 @@ object AuthService {
}
class AuthService(gw: AkkaToVertxGateway)
extends Actor with ActorLogging {
extends Actor with ActorLogging {
def receive = {
case msg: String => {

View File

@ -14,8 +14,8 @@ import io.vertx.core.eventbus.Message
import akka.actor.ActorSystem
class VertxToAkkaGateway(system: ActorSystem, vertx: Vertx,
authService: ActorRef,
echoService: ActorRef) extends IAkkaToVertxGateway {
authService: ActorRef,
echoService: ActorRef) extends IAkkaToVertxGateway {
implicit def executionContext = system.dispatcher
val consumer: MessageConsumer[String] = vertx.eventBus().consumer("foofoofoo")

View File

@ -17,7 +17,7 @@ object ClientActor {
}
class ClientActor(clientId: String, connEventBus: InternalMessageBus)
extends Actor with ActorLogging with SystemConfiguration {
extends Actor with ActorLogging with SystemConfiguration {
private val conns = new Connections
private var authorized = false

View File

@ -11,7 +11,7 @@ object MsgToAkkaAppsToJsonActor {
}
class MsgToAkkaAppsToJsonActor(connEventBus: InternalMessageBus)
extends Actor with ActorLogging with SystemConfiguration {
extends Actor with ActorLogging with SystemConfiguration {
def receive = {
case msg: MsgToAkkaApps => handle(msg)

View File

@ -13,9 +13,9 @@ object MsgToClientJsonActor {
class MsgToClientJsonActor(msgToClientGW: MsgToClientGW) extends Actor with ActorLogging {
def receive = {
case msg: BroadcastMsgToMeeting => handleBroadcastMsg(msg)
case msg: DirectMsgToClient => handleDirectMsg(msg)
case msg: DisconnectClientMsg => handleDisconnectClientMsg(msg)
case msg: BroadcastMsgToMeeting => handleBroadcastMsg(msg)
case msg: DirectMsgToClient => handleDirectMsg(msg)
case msg: DisconnectClientMsg => handleDisconnectClientMsg(msg)
case msg: DisconnectAllMeetingClientsMsg => handleDisconnectAllMeetingClientsMsg(msg)
}

View File

@ -13,18 +13,18 @@ object ReceivedJsonMsgHdlrActor {
}
class ReceivedJsonMsgHdlrActor(val connEventBus: InternalMessageBus)
extends Actor with ActorLogging with SystemConfiguration {
extends Actor with ActorLogging with SystemConfiguration {
def receive = {
case msg: JsonMsgFromAkkaApps => handleReceivedJsonMessage(msg)
case _ => // do nothing
case _ => // do nothing
}
def handleReceivedJsonMessage(msg: JsonMsgFromAkkaApps): Unit = {
//log.debug("****** Received JSON msg " + msg.data)
JsonUtil.fromJson[BbbCommonEnvJsNodeMsg](msg.data) match {
case Success(m) => connEventBus.publish(MsgFromConnBusMsg(fromAkkaAppsChannel, MsgFromAkkaApps(m)))
case Success(m) => connEventBus.publish(MsgFromConnBusMsg(fromAkkaAppsChannel, MsgFromAkkaApps(m)))
case Failure(ex) => log.error("Failed to deserialize message " + ex)
}

View File

@ -25,11 +25,13 @@ object AppsRedisSubscriberActor extends SystemConfiguration {
}
class AppsRedisSubscriberActor(connEventBus: InternalMessageBus, redisHost: String,
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort),
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") })
with SystemConfiguration with ActorLogging {
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(
new InetSocketAddress(redisHost, redisPort),
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }
)
with SystemConfiguration with ActorLogging {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {

View File

@ -11,15 +11,15 @@ object MeetingActor {
}
class MeetingActor(val meetingId: String, connEventBus: InternalMessageBus)
extends Actor with ActorLogging
with SystemConfiguration {
extends Actor with ActorLogging
with SystemConfiguration {
private val userMgr = new UsersManager
def receive = {
case msg: ClientConnectedMsg => handleConnectMsg(msg)
case msg: ClientConnectedMsg => handleConnectMsg(msg)
case msg: ClientDisconnectedMsg => handleDisconnectMsg(msg)
case msg: MsgFromClientMsg => handleMsgFromClientMsg(msg)
case msg: MsgFromClientMsg => handleMsgFromClientMsg(msg)
case msg: BbbCommonEnvJsNodeMsg => handleBbbServerMsg(msg)
// TODO: Should keep track of user lifecycle so we can remove when user leaves the meeting.
}
@ -69,9 +69,9 @@ class MeetingActor(val meetingId: String, connEventBus: InternalMessageBus)
def handleServerMsg(msgType: String, msg: BbbCommonEnvJsNodeMsg): Unit = {
//log.debug("**** MeetingActor handleServerMsg " + msg.envelope.name)
msgType match {
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
}
}

View File

@ -10,15 +10,15 @@ object MeetingManagerActor {
}
class MeetingManagerActor(connEventBus: InternalMessageBus)
extends Actor with ActorLogging {
extends Actor with ActorLogging {
private val meetingMgr = new MeetingManager
def receive = {
case msg: ClientConnectedMsg => handleConnectMsg(msg)
case msg: ClientConnectedMsg => handleConnectMsg(msg)
case msg: ClientDisconnectedMsg => handleDisconnectMsg(msg)
case msg: MsgFromClientMsg => handleMsgFromClientMsg(msg)
case msg: MsgFromAkkaApps => handleBbbServerMsg(msg)
case msg: MsgFromClientMsg => handleMsgFromClientMsg(msg)
case msg: MsgFromAkkaApps => handleBbbServerMsg(msg)
// TODO we should monitor meeting lifecycle so we can remove when meeting ends.
}
@ -67,9 +67,9 @@ class MeetingManagerActor(connEventBus: InternalMessageBus)
def handleServerMsg(msgType: String, msg: BbbCommonEnvJsNodeMsg): Unit = {
//log.debug("**** MeetingManagerActor handleServerMsg " + msg.envelope.name)
msgType match {
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
}
}
@ -78,7 +78,7 @@ class MeetingManagerActor(connEventBus: InternalMessageBus)
case Some(meetingId2) => //log.debug("**** MeetingManagerActor forwardToMeeting. Found " + meetingId2)
MeetingManager.findWithMeetingId(meetingMgr, meetingId2) match {
case Some(meetingId2) => //log.debug("**** MeetingManagerActor forwardToMeeting. Found " + meetingId2.meetingId)
case None => //log.debug("**** MeetingManagerActor forwardToMeeting. Could not find meetingId")
case None => //log.debug("**** MeetingManagerActor forwardToMeeting. Could not find meetingId")
}
case None => log.debug("**** MeetingManagerActor forwardToMeeting. Could not find meetingId")
}

View File

@ -4,15 +4,19 @@ import akka.actor.ActorContext
import org.bigbluebutton.client.bus.{ InternalMessageBus }
object User {
def apply(userId: String,
connEventBus: InternalMessageBus,
meetingId: String)(implicit context: ActorContext): User =
def apply(
userId: String,
connEventBus: InternalMessageBus,
meetingId: String
)(implicit context: ActorContext): User =
new User(userId, connEventBus, meetingId)(context)
}
class User(val userId: String,
class User(
val userId: String,
connEventBus: InternalMessageBus,
meetingId: String)(implicit val context: ActorContext) {
meetingId: String
)(implicit val context: ActorContext) {
val actorRef = context.actorOf(UserActor.props(userId, connEventBus, meetingId), meetingId + "-" + userId)
}

View File

@ -10,27 +10,31 @@ import com.fasterxml.jackson.databind.JsonNode
import scala.util.{ Failure, Success }
object UserActor {
def props(userId: String,
connEventBus: InternalMessageBus,
meetingId: String): Props =
def props(
userId: String,
connEventBus: InternalMessageBus,
meetingId: String
): Props =
Props(classOf[UserActor], userId, connEventBus, meetingId)
}
class UserActor(val userId: String,
connEventBus: InternalMessageBus,
meetingId: String)
extends Actor with ActorLogging with SystemConfiguration {
class UserActor(
val userId: String,
connEventBus: InternalMessageBus,
meetingId: String
)
extends Actor with ActorLogging with SystemConfiguration {
private val conns = new Connections
private var authorized = false
def receive = {
case msg: ClientConnectedMsg => handleConnectMsg(msg)
case msg: ClientConnectedMsg => handleConnectMsg(msg)
case msg: ClientDisconnectedMsg => handleDisconnectMsg(msg)
case msg: MsgFromClientMsg => handleMsgFromClientMsg(msg, true)
case msg: MsgFromClientMsg => handleMsgFromClientMsg(msg, true)
case msg: BbbCommonEnvJsNodeMsg => handleBbbServerMsg(msg)
case _ => log.debug("***** UserActor cannot handle msg ")
case _ => log.debug("***** UserActor cannot handle msg ")
}
private def createConnection(id: String, sessionId: String, active: Boolean): Connection = {
@ -156,9 +160,9 @@ class UserActor(val userId: String,
// log.debug("**** UserActor handleServerMsg " + msg)
println("************* MESSAGE FROM SERVER *********** \n" + msg)
msgType match {
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
}
}