Merge branch 'master' into async-await-test

This commit is contained in:
Tainan Felipe 2018-12-10 15:23:16 -02:00 committed by GitHub
commit ac9dff0c3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
297 changed files with 8602 additions and 6310 deletions

View File

@ -27,7 +27,7 @@ env:
- BBB_SERVER_URL=http://localhost/bigbluebutton/api
script:
- travis_wait bash ./build_script.sh $JOB_TYPE
- travis_wait 30 bash ./build_script.sh $JOB_TYPE
after_script:
- docker stop $docker

View File

@ -1,14 +1,19 @@
import org.bigbluebutton.build._
import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
import com.typesafe.sbt.SbtNativePackager.autoImport._
enablePlugins(JavaServerAppPackaging)
name := "bbb-apps-akka"
version := "0.0.3"
organization := "org.bigbluebutton"
val compileSettings = Seq(
organization := "org.bigbluebutton",
version := "0.0.2"
scalaVersion := "2.12.6"
scalacOptions ++= Seq(
scalacOptions ++= List(
"-unchecked",
"-deprecation",
"-Xlint",
@ -16,17 +21,13 @@ scalacOptions ++= Seq(
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
),
javacOptions ++= List(
"-Xlint:unchecked",
"-Xlint:deprecation"
)
resolvers ++= Seq(
"spray repo" at "http://repo.spray.io/",
"rediscala" at "http://dl.bintray.com/etaty/maven",
"blindside-repos" at "http://blindside.googlecode.com/svn/repository/"
)
resolvers += Resolver.sonatypeRepo("releases")
resolvers += Resolver.typesafeRepo("releases")
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath + "/dev/repo/maven-repo/releases")))
// We want to have our jar files in lib_managed dir.
@ -38,75 +39,15 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console",
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
val akkaVersion = "2.5.14"
val scalaTestVersion = "3.0.5"
lazy val bbbAppsAkka = (project in file(".")).settings(name := "bbb-apps-akka", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
libraryDependencies ++= {
Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime",
"junit" % "junit" % "4.11",
"commons-codec" % "commons-codec" % "1.11",
"org.apache.commons" % "commons-lang3" % "3.7"
)
}
scalariformAutoformat := false
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
libraryDependencies += "com.softwaremill.quicklens" %% "quicklens" % "1.4.11"
libraryDependencies += "com.google.code.gson" % "gson" % "2.8.5"
libraryDependencies += "joda-time" % "joda-time" % "2.10"
libraryDependencies += "io.spray" % "spray-json_2.12" % "1.3.4"
libraryDependencies += "org.parboiled" % "parboiled-scala_2.12" % "1.1.8"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12
libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.6"
// For generating test reports
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % akkaVersion % "test"
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test"
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
libraryDependencies += "org.scalatest" % "scalatest_2.12" % scalaTestVersion % "test"
libraryDependencies += "org.mockito" % "mockito-core" % "2.21.0" % "test"
import com.typesafe.sbt.SbtScalariform
import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
SbtScalariform.defaultScalariformSettings
ScalariformKeys.preferences := ScalariformKeys.preferences.value
scalariformPreferences := scalariformPreferences.value
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentClassDeclaration, true)
.setPreference(DoubleIndentConstructorArguments, true)
.setPreference(AlignParameters, true)
//-----------
// Packaging
//
@ -132,14 +73,7 @@ daemonUser in Linux := user
// group which will execute the application
daemonGroup in Linux := group
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move the application.conf so the user can override settings here
val appConf = src / "main" / "resources" / "application.conf"
appConf -> "conf/application.conf"
}
mappings in(Universal, packageBin) += file("src/main/resources/application.conf") -> "conf/application.conf"
mappings in(Universal, packageBin) += file("src/main/resources/logback.xml") -> "conf/logback.xml"
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move logback.xml so the user can override settings here
val logConf = src / "main" / "resources" / "logback.xml"
logConf -> "conf/logback.xml"
}
debianPackageDependencies in Debian ++= Seq("java8-runtime-headless", "bash")

View File

@ -0,0 +1,83 @@
package org.bigbluebutton.build
import sbt._
import Keys._
object Dependencies {
object Versions {
// Scala
val scala = "2.12.7"
val junit = "4.12"
val junitInterface = "0.11"
val scalactic = "3.0.3"
// Libraries
val akkaVersion = "2.5.17"
val gson = "2.8.5"
val jackson = "2.9.7"
val logback = "1.2.3"
val quicklens = "1.4.11"
val spray = "1.3.4"
// Apache Commons
val lang = "3.8.1"
val codec = "1.11"
// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"
// Test
val scalaTest = "3.0.5"
val mockito = "2.23.0"
val akkaTestKit = "2.5.18"
}
object Compile {
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
val akkaSl4fj = "com.typesafe.akka" % "akka-slf4j_2.12" % Versions.akkaVersion
val googleGson = "com.google.code.gson" % "gson" % Versions.gson
val jacksonModule = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson
val quicklens = "com.softwaremill.quicklens" %% "quicklens" % Versions.quicklens
val logback = "ch.qos.logback" % "logback-classic" % Versions.logback % "runtime"
val commonsCodec = "commons-codec" % "commons-codec" % Versions.codec
val sprayJson = "io.spray" % "spray-json_2.12" % Versions.spray
val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang
val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons
}
object Test {
val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalaTest % "test"
val junit = "junit" % "junit" % Versions.junit % "test"
val mockitoCore = "org.mockito" % "mockito-core" % Versions.mockito % "test"
val scalactic = "org.scalactic" % "scalactic_2.12" % Versions.scalactic % "test"
val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.akkaTestKit % "test"
}
val testing = Seq(
Test.scalaTest,
Test.junit,
Test.mockitoCore,
Test.scalactic,
Test.akkaTestKit)
val runtime = Seq(
Compile.scalaLibrary,
Compile.scalaCompiler,
Compile.akkaActor,
Compile.akkaSl4fj,
Compile.googleGson,
Compile.jacksonModule,
Compile.quicklens,
Compile.logback,
Compile.commonsCodec,
Compile.sprayJson,
Compile.apacheLang,
Compile.bbbCommons) ++ testing
}

View File

@ -1 +1 @@
sbt.version=0.13.8
sbt.version=1.2.6

View File

@ -1,11 +1,11 @@
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.12")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

View File

@ -1,3 +1 @@
sbt clean
sbt run
sbt clean run

View File

@ -10,7 +10,7 @@ akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
rediscala-publish-worker-dispatcher {
redis-publish-worker-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
@ -18,7 +18,7 @@ akka {
throughput = 512
}
rediscala-subscriber-worker-dispatcher {
redis-subscriber-worker-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.

View File

@ -1,12 +1,18 @@
package org.bigbluebutton
import akka.event.Logging
import akka.actor.ActorSystem
import org.bigbluebutton.endpoint.redis.{ AppsRedisSubscriberActor, KeepAliveRedisPublisher, RedisPublisher, RedisRecorderActor }
import org.bigbluebutton.common2.redis.RedisPublisher
import org.bigbluebutton.core._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core.pubsub.senders.ReceivedJsonMsgHandlerActor
import org.bigbluebutton.core2.{ AnalyticsActor, FromAkkaAppsMsgSenderActor }
import org.bigbluebutton.core2.AnalyticsActor
import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor
import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor
import org.bigbluebutton.endpoint.redis.RedisRecorderActor
import akka.actor.ActorSystem
import akka.event.Logging
import org.bigbluebutton.common2.redis.MessageSender
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
object Boot extends App with SystemConfiguration {
@ -22,7 +28,7 @@ object Boot extends App with SystemConfiguration {
val outGW = new OutMessageGatewayImp(outBus2)
val redisPublisher = new RedisPublisher(system)
val redisPublisher = new RedisPublisher(system, "BbbAppsAkkaPub")
val msgSender = new MessageSender(redisPublisher)
val redisRecorderActor = system.actorOf(RedisRecorderActor.props(system), "redisRecorderActor")
@ -46,8 +52,5 @@ object Boot extends App with SystemConfiguration {
val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus))
incomingJsonMessageBus.subscribe(redisMessageHandlerActor, toAkkaAppsJsonChannel)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(incomingJsonMessageBus), "redis-subscriber")
val keepAliveRedisPublisher = new KeepAliveRedisPublisher(system, redisPublisher)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, incomingJsonMessageBus), "redis-subscriber")
}

View File

@ -1,15 +1,10 @@
package org.bigbluebutton
import com.typesafe.config.ConfigFactory
import scala.util.Try
trait SystemConfiguration {
import org.bigbluebutton.common2.redis.RedisConfiguration
val config = ConfigFactory.load()
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
trait SystemConfiguration extends RedisConfiguration {
lazy val bbbWebHost = Try(config.getString("services.bbbWebHost")).getOrElse("localhost")
lazy val bbbWebPort = Try(config.getInt("services.bbbWebPort")).getOrElse(8888)
@ -31,8 +26,6 @@ trait SystemConfiguration {
lazy val outBbbMsgMsgChannel = Try(config.getString("eventBus.outBbbMsgMsgChannel")).getOrElse("OutBbbMsgChannel")
lazy val recordServiceMessageChannel = Try(config.getString("eventBus.recordServiceMessageChannel")).getOrElse("RecordServiceMessageChannel")
lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")
lazy val toHTML5RedisChannel = Try(config.getString("redis.toHTML5RedisChannel")).getOrElse("to-html5-redis-channel")
lazy val fromAkkaAppsChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-channel")
lazy val toAkkaAppsChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-channel")
@ -41,21 +34,9 @@ trait SystemConfiguration {
lazy val toAkkaAppsJsonChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-json-channel")
lazy val fromAkkaAppsJsonChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-json-channel")
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-voice-conf-redis-channel")
lazy val fromAkkaAppsWbRedisChannel = Try(config.getString("redis.fromAkkaAppsWbRedisChannel")).getOrElse("from-akka-apps-wb-redis-channel")
lazy val fromAkkaAppsChatRedisChannel = Try(config.getString("redis.fromAkkaAppsChatRedisChannel")).getOrElse("from-akka-apps-chat-redis-channel")
lazy val fromAkkaAppsPresRedisChannel = Try(config.getString("redis.fromAkkaAppsPresRedisChannel")).getOrElse("from-akka-apps-pres-redis-channel")
lazy val maxNumberOfNotes = Try(config.getInt("sharedNotes.maxNumberOfNotes")).getOrElse(3)
lazy val maxNumberOfUndos = Try(config.getInt("sharedNotes.maxNumberOfUndos")).getOrElse(30)
lazy val httpInterface = Try(config.getString("http.interface")).getOrElse("")
lazy val httpPort = Try(config.getInt("http.port")).getOrElse(9090)
lazy val telizeHost = Try(config.getString("services.telizeHost")).getOrElse("")
lazy val telizePort = Try(config.getInt("services.telizePort")).getOrElse(80)
lazy val applyPermissionCheck = Try(config.getBoolean("apps.checkPermissions")).getOrElse(false)
lazy val voiceConfRecordPath = Try(config.getString("voiceConf.recordPath")).getOrElse("/var/freeswitch/meetings")

View File

@ -1,10 +0,0 @@
package org.bigbluebutton.core
import org.bigbluebutton.endpoint.redis.RedisPublisher
class MessageSender(publisher: RedisPublisher) {
def send(channel: String, data: String) {
publisher.publish(channel, data)
}
}

View File

@ -22,7 +22,6 @@ trait RegisterUserReqMsgHdlr {
BbbCommonEnvCoreMsg(envelope, event)
}
val guestPolicy = liveMeeting.guestsWaiting.getGuestPolicy().policy
val guestStatus = msg.body.guestStatus
val regUser = RegisteredUsers.create(msg.body.intUserId, msg.body.extUserId,

View File

@ -7,6 +7,8 @@ import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core2.ReceivedMessageRouter
import scala.reflect.runtime.universe._
import org.bigbluebutton.common2.bus.ReceivedJsonMessage
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
object ReceivedJsonMsgHandlerActor {
def props(eventBus: BbbMsgRouterEventBus, incomingJsonMessageBus: IncomingJsonMessageBus): Props =

View File

@ -21,7 +21,9 @@ package org.bigbluebutton.core.record.events
import java.text.SimpleDateFormat
import scala.collection.Map
import scala.collection.mutable.HashMap
import org.bigbluebutton.core.api.TimestampGenerator
trait RecordEvent {
@ -70,6 +72,7 @@ trait RecordEvent {
eventMap.put(EVENT, event)
}
// @fixme : not used anymore
/**
* Convert the event into a Map to be recorded.
* @return
@ -77,6 +80,7 @@ trait RecordEvent {
final def toMap(): Map[String, String] = {
eventMap.toMap
}
}
object RecordEvent extends RecordEvent {

View File

@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorLogging, Props }
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.core.MessageSender
import org.bigbluebutton.common2.redis.MessageSender
object FromAkkaAppsMsgSenderActor {
def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender)

View File

@ -0,0 +1,32 @@
package org.bigbluebutton.endpoint.redis
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider }
import akka.actor.ActorSystem
import akka.actor.Props
object AppsRedisSubscriberActor extends RedisSubscriber {
val channels = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel)
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*")
def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props =
Props(
classOf[AppsRedisSubscriberActor],
system, jsonMsgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher")
}
class AppsRedisSubscriberActor(
system: ActorSystem,
jsonMsgBus: IncomingJsonMessageBus,
redisHost: String, redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberProvider(system, "BbbAppsAkkaSub", channels, patterns, jsonMsgBus) with SystemConfiguration {
addListener(toAkkaAppsJsonChannel)
subscribe()
}

View File

@ -1,64 +0,0 @@
package org.bigbluebutton.endpoint.redis
import akka.actor.Props
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Resume
import java.io.{ PrintWriter, StringWriter }
import java.net.InetSocketAddress
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.{ Message, PMessage }
import scala.concurrent.duration._
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.core.bus.{ IncomingJsonMessage, IncomingJsonMessageBus, ReceivedJsonMessage }
import redis.api.servers.ClientSetname
object AppsRedisSubscriberActor extends SystemConfiguration {
val TO_AKKA_APPS = "bbb:to-akka-apps"
val channels = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel)
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*")
def props(jsonMsgBus: IncomingJsonMessageBus): Props =
Props(classOf[AppsRedisSubscriberActor], jsonMsgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
}
class AppsRedisSubscriberActor(jsonMsgBus: IncomingJsonMessageBus, redisHost: String,
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(
new InetSocketAddress(redisHost, redisPort),
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) with SystemConfiguration {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {
val sw: StringWriter = new StringWriter()
sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n")
e.printStackTrace(new PrintWriter(sw))
log.error(sw.toString())
Resume
}
}
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
write(ClientSetname("BbbAppsAkkaSub").encodedRequest)
def onMessage(message: Message) {
if (message.channel == toAkkaAppsRedisChannel || message.channel == fromVoiceConfRedisChannel) {
val receivedJsonMessage = new ReceivedJsonMessage(message.channel, message.data.utf8String)
//log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
jsonMsgBus.publish(IncomingJsonMessage(toAkkaAppsJsonChannel, receivedJsonMessage))
}
}
def onPMessage(pmessage: PMessage) {
// We don't use PSubscribe anymore, but an implementation of the method is required
//log.error("Should not be receiving a PMessage. It triggered on a match of pattern: " + pmessage.patternMatched)
//log.error(pmessage.data.utf8String)
}
}

View File

@ -1,16 +0,0 @@
package org.bigbluebutton.endpoint.redis
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import org.bigbluebutton.SystemConfiguration
class KeepAliveRedisPublisher(val system: ActorSystem, sender: RedisPublisher) extends SystemConfiguration {
val startedOn = System.currentTimeMillis()
system.scheduler.schedule(2 seconds, 5 seconds) {
//val msg = new BbbAppsIsAliveMessage(startedOn, System.currentTimeMillis())
// sender.publish("bigbluebutton:from-bbb-apps:keepalive", msg.toJson())
}
}

View File

@ -1,21 +0,0 @@
package org.bigbluebutton.endpoint.redis
import redis.RedisClient
import akka.actor.ActorSystem
import org.bigbluebutton.SystemConfiguration
import akka.util.ByteString
class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
val redis = RedisClient(redisHost, redisPort)(system)
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redis.clientSetname("BbbAppsAkkaPub")
def publish(channel: String, data: String) {
//println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
redis.publish(channel, ByteString(data))
}
}

View File

@ -1,40 +1,30 @@
package org.bigbluebutton.endpoint.redis
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import org.bigbluebutton.SystemConfiguration
import redis.RedisClient
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.immutable.StringOps
import scala.collection.JavaConverters._
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.record.events._
import org.bigbluebutton.common2.redis.RedisStorageProvider
import org.bigbluebutton.core.apps.groupchats.GroupChatApp
import org.bigbluebutton.core.record.events._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
object RedisRecorderActor {
def props(system: ActorSystem): Props = Props(classOf[RedisRecorderActor], system)
}
class RedisRecorderActor(val system: ActorSystem)
extends SystemConfiguration
class RedisRecorderActor(system: ActorSystem)
extends RedisStorageProvider(system, "BbbAppsAkkaRecorder")
with SystemConfiguration
with Actor with ActorLogging {
val redis = RedisClient(redisHost, redisPort)(system)
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redis.clientSetname("BbbAppsAkkaRecorder")
val COLON = ":"
private def record(session: String, message: collection.immutable.Map[String, String]): Unit = {
for {
msgid <- redis.incr("global:nextRecordedMsgId")
key = "recording" + COLON + session + COLON + msgid
_ <- redis.hmset(key.mkString, message)
_ <- redis.expire(key.mkString, keysExpiresInSec)
key2 = "meeting" + COLON + session + COLON + "recordings"
_ <- redis.rpush(key2.mkString, msgid.toString)
result <- redis.expire(key2.mkString, keysExpiresInSec)
} yield result
private def record(session: String, message: java.util.Map[java.lang.String, java.lang.String]): Unit = {
redis.recordAndExpire(session, message)
}
def receive = {
@ -121,7 +111,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMessage(msg.body.msg.message)
ev.setColor(msg.body.msg.color)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
}
@ -129,7 +119,7 @@ class RedisRecorderActor(val system: ActorSystem)
val ev = new ClearPublicChatRecordEvent()
ev.setMeetingId(msg.header.meetingId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handlePresentationConversionCompletedEvtMsg(msg: PresentationConversionCompletedEvtMsg) {
@ -139,7 +129,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPresentationName(msg.body.presentation.id)
ev.setOriginalFilename(msg.body.presentation.name)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
if (msg.body.presentation.current) {
recordSharePresentationEvent(msg.header.meetingId, msg.body.podId, msg.body.presentation.id)
@ -154,7 +144,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setSlide(getPageNum(msg.body.pageId))
ev.setId(msg.body.pageId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleResizeAndMovePageEvtMsg(msg: ResizeAndMovePageEvtMsg) {
@ -168,7 +158,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setWidthRatio(msg.body.widthRatio)
ev.setHeightRatio(msg.body.heightRatio)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleRemovePresentationEvtMsg(msg: RemovePresentationEvtMsg) {
@ -177,7 +167,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPodId(msg.body.podId)
ev.setPresentationName(msg.body.presentationId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleSetPresentationDownloadableEvtMsg(msg: SetPresentationDownloadableEvtMsg) {
@ -187,7 +177,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPresentationName(msg.body.presentationId)
ev.setDownloadable(msg.body.downloadable)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleSetCurrentPresentationEvtMsg(msg: SetCurrentPresentationEvtMsg) {
@ -200,7 +190,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPodId(msg.body.podId)
ev.setCurrentPresenter(msg.body.currentPresenterId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleRemovePresentationPodEvtMsg(msg: RemovePresentationPodEvtMsg) {
@ -208,7 +198,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMeetingId(msg.header.meetingId)
ev.setPodId(msg.body.podId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleSetPresenterInPodRespMsg(msg: SetPresenterInPodRespMsg) {
@ -217,7 +207,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPodId(msg.body.podId)
ev.setNextPresenterId(msg.body.nextPresenterId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def recordSharePresentationEvent(meetingId: String, podId: String, presentationId: String) {
@ -227,7 +217,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPresentationName(presentationId)
ev.setShare(true)
record(meetingId, ev.toMap)
record(meetingId, ev.toMap.asJava)
}
private def getPageNum(id: String): Integer = {
@ -266,7 +256,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPosition(annotation.position)
ev.addAnnotation(annotation.annotationInfo)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleSendCursorPositionEvtMsg(msg: SendCursorPositionEvtMsg) {
@ -279,7 +269,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setXPercent(msg.body.xPercent)
ev.setYPercent(msg.body.yPercent)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleClearWhiteboardEvtMsg(msg: ClearWhiteboardEvtMsg) {
@ -291,7 +281,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setUserId(msg.body.userId)
ev.setFullClear(msg.body.fullClear)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUndoWhiteboardEvtMsg(msg: UndoWhiteboardEvtMsg) {
@ -302,7 +292,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setWhiteboardId(msg.body.whiteboardId)
ev.setUserId(msg.body.userId)
ev.setShapeId(msg.body.annotationId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = {
@ -313,7 +303,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setName(msg.body.name)
ev.setRole(msg.body.role)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserLeftMeetingEvtMsg(msg: UserLeftMeetingEvtMsg): Unit = {
@ -321,7 +311,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMeetingId(msg.header.meetingId)
ev.setUserId(msg.body.intId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handlePresenterAssignedEvtMsg(msg: PresenterAssignedEvtMsg): Unit = {
@ -331,7 +321,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setName(msg.body.presenterName)
ev.setAssignedBy(msg.body.assignedBy)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserEmojiChangedEvtMsg(msg: UserEmojiChangedEvtMsg) {
handleUserStatusChange(msg.header.meetingId, msg.body.userId, "emojiStatus", msg.body.emoji)
@ -352,7 +342,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setStatus(statusName)
ev.setValue(statusValue)
record(meetingId, ev.toMap)
record(meetingId, ev.toMap.asJava)
}
private def handleUserJoinedVoiceConfToClientEvtMsg(msg: UserJoinedVoiceConfToClientEvtMsg) {
@ -365,7 +355,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMuted(msg.body.muted)
ev.setTalking(msg.body.talking)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserLeftVoiceConfToClientEvtMsg(msg: UserLeftVoiceConfToClientEvtMsg) {
@ -374,7 +364,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setBridge(msg.body.voiceConf)
ev.setParticipant(msg.body.intId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserMutedVoiceEvtMsg(msg: UserMutedVoiceEvtMsg) {
@ -384,7 +374,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setParticipant(msg.body.intId)
ev.setMuted(msg.body.muted)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserTalkingVoiceEvtMsg(msg: UserTalkingVoiceEvtMsg) {
@ -394,7 +384,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setParticipant(msg.body.intId)
ev.setTalking(msg.body.talking)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleVoiceRecordingStartedEvtMsg(msg: VoiceRecordingStartedEvtMsg) {
@ -404,7 +394,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setRecordingTimestamp(msg.body.timestamp)
ev.setFilename(msg.body.stream)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleVoiceRecordingStoppedEvtMsg(msg: VoiceRecordingStoppedEvtMsg) {
@ -414,7 +404,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setRecordingTimestamp(msg.body.timestamp)
ev.setFilename(msg.body.stream)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleEditCaptionHistoryEvtMsg(msg: EditCaptionHistoryEvtMsg) {
@ -426,7 +416,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setLocaleCode(msg.body.localeCode)
ev.setText(msg.body.text)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleScreenshareRtmpBroadcastStartedEvtMsg(msg: ScreenshareRtmpBroadcastStartedEvtMsg) {
@ -434,7 +424,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMeetingId(msg.header.meetingId)
ev.setStreamPath(msg.body.stream)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleScreenshareRtmpBroadcastStoppedEvtMsg(msg: ScreenshareRtmpBroadcastStoppedEvtMsg) {
@ -442,7 +432,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMeetingId(msg.header.meetingId)
ev.setStreamPath(msg.body.stream)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
/*
@ -462,7 +452,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setUserId(msg.body.setBy)
ev.setRecordingStatus(msg.body.recording)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleRecordStatusResetSysMsg(msg: RecordStatusResetSysMsg) {
@ -471,7 +461,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setUserId(msg.body.setBy)
ev.setRecordingStatus(msg.body.recording)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleWebcamsOnlyForModeratorChangedEvtMsg(msg: WebcamsOnlyForModeratorChangedEvtMsg) {
@ -480,14 +470,14 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setUserId(msg.body.setBy)
ev.setWebcamsOnlyForModerator(msg.body.webcamsOnlyForModerator)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleEndAndKickAllSysMsg(msg: EndAndKickAllSysMsg): Unit = {
val ev = new EndAndKickAllRecordEvent()
ev.setMeetingId(msg.header.meetingId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleRecordingChapterBreakSysMsg(msg: RecordingChapterBreakSysMsg): Unit = {
@ -495,7 +485,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setMeetingId(msg.header.meetingId)
ev.setChapterBreakTimestamp(msg.body.timestamp)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handlePollStartedEvtMsg(msg: PollStartedEvtMsg): Unit = {
@ -503,7 +493,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setPollId(msg.body.pollId)
ev.setAnswers(msg.body.poll.answers)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handleUserRespondedToPollRecordMsg(msg: UserRespondedToPollRecordMsg): Unit = {
@ -512,7 +502,7 @@ class RedisRecorderActor(val system: ActorSystem)
ev.setUserId(msg.header.userId)
ev.setAnswerId(msg.body.answerId)
record(msg.header.meetingId, ev.toMap)
record(msg.header.meetingId, ev.toMap.asJava)
}
private def handlePollStoppedEvtMsg(msg: PollStoppedEvtMsg): Unit = {
@ -527,6 +517,6 @@ class RedisRecorderActor(val system: ActorSystem)
val ev = new PollStoppedRecordEvent()
ev.setPollId(pollId)
record(meetingId, ev.toMap)
record(meetingId, ev.toMap.asJava)
}
}

View File

@ -15,10 +15,16 @@ trait AppsTestFixtures {
val meetingName = "test meeting"
val record = false
val voiceConfId = "85115"
val muteOnStart = true
val deskshareConfId = "85115-DESKSHARE"
val durationInMinutes = 10
val maxInactivityTimeoutMinutes = 120
val warnMinutesBeforeMax = 5
val warnMinutesBeforeMax = 30
val meetingExpireIfNoUserJoinedInMinutes = 5
val meetingExpireWhenLastUserLeftInMinutes = 10
val userInactivityInspectTimerInMinutes = 60
val userInactivityThresholdInMinutes = 10
val userActivitySignResponseDelayInMinutes = 5
val autoStartRecording = false
val allowStartStopRecording = false
val webcamsOnlyForModerator = false;
@ -38,24 +44,19 @@ trait AppsTestFixtures {
val red5DeskShareAppTestFixtures = "red5App"
val metadata: collection.immutable.Map[String, String] = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "foo")
val screenshareProps = ScreenshareProps("TODO", "TODO", "TODO")
val breakoutProps = BreakoutProps(parentMeetingId, sequence, Vector())
val breakoutProps = BreakoutProps(parentId = parentMeetingId, sequence = sequence, freeJoin = false, breakoutRooms = Vector())
val meetingProp = MeetingProp(name = meetingName, extId = externalMeetingId, intId = meetingId,
isBreakout = isBreakout.booleanValue())
val durationProps = DurationProps(
duration = durationInMinutes,
createdTime = createTime, createdDate = createDate,
maxInactivityTimeoutMinutes = maxInactivityTimeoutMinutes,
warnMinutesBeforeMax = warnMinutesBeforeMax,
meetingExpireIfNoUserJoinedInMinutes = 5,
meetingExpireWhenLastUserLeftInMinutes = 1
)
val durationProps = DurationProps(duration = durationInMinutes, createdTime = createTime, createdDate = createDate, maxInactivityTimeoutMinutes = maxInactivityTimeoutMinutes, warnMinutesBeforeMax = warnMinutesBeforeMax,
meetingExpireIfNoUserJoinedInMinutes = meetingExpireIfNoUserJoinedInMinutes, meetingExpireWhenLastUserLeftInMinutes = meetingExpireWhenLastUserLeftInMinutes,
userInactivityInspectTimerInMinutes = userInactivityInspectTimerInMinutes, userInactivityThresholdInMinutes = userInactivityInspectTimerInMinutes, userActivitySignResponseDelayInMinutes = userActivitySignResponseDelayInMinutes)
val password = PasswordProp(moderatorPass = moderatorPassword, viewerPass = viewerPassword)
val recordProp = RecordProp(record = record, autoStartRecording = autoStartRecording,
allowStartStopRecording = allowStartStopRecording)
val welcomeProp = WelcomeProp(welcomeMsgTemplate = welcomeMsgTemplate, welcomeMsg = welcomeMsg,
modOnlyMessage = modOnlyMessage)
val voiceProp = VoiceProp(telVoice = voiceConfId, voiceConf = voiceConfId, dialNumber = dialNumber)
val voiceProp = VoiceProp(telVoice = voiceConfId, voiceConf = voiceConfId, dialNumber = dialNumber, muteOnStart = muteOnStart)
val usersProp = UsersProp(maxUsers = maxUsers, webcamsOnlyForModerator = webcamsOnlyForModerator,
guestPolicy = guestPolicy)
val metadataProp = new MetadataProp(metadata)
@ -84,7 +85,6 @@ trait AppsTestFixtures {
val layouts = new Layouts()
val wbModel = new WhiteboardModel()
val presModel = new PresentationModel()
val breakoutRooms = new BreakoutRooms()
val captionModel = new CaptionModel()
val notesModel = new SharedNotesModel()
val registeredUsers = new RegisteredUsers

View File

@ -1,7 +1,6 @@
package org.bigbluebutton.core.domain
import org.bigbluebutton.core.UnitSpec
import org.bigbluebutton.core.running.MeetingExpiryTrackerHelper
import org.bigbluebutton.core.util.TimeUtil
class MeetingInactivityTrackerTests extends UnitSpec {

View File

@ -2,7 +2,6 @@ package org.bigbluebutton.core.models
import org.bigbluebutton.common2.msgs.{ GroupChatAccess, GroupChatUser }
import org.bigbluebutton.core.UnitSpec
import org.bigbluebutton.core.domain.BbbSystemConst
class GroupsChatTests extends UnitSpec {
@ -10,7 +9,7 @@ class GroupsChatTests extends UnitSpec {
val gcId = "gc-id"
val chatName = "Public"
val userId = "uid-1"
val createBy = GroupChatUser(BbbSystemConst.SYSTEM_USER, BbbSystemConst.SYSTEM_USER)
val createBy = GroupChatUser("groupId", "groupname")
val gc = GroupChatFactory.create(gcId, chatName, GroupChatAccess.PUBLIC, createBy, Vector.empty, Vector.empty)
val user = GroupChatUser(userId, "User 1")
val gc2 = gc.add(user)
@ -25,18 +24,16 @@ class GroupsChatTests extends UnitSpec {
}
"A GroupChat" should "be able to add, update, and remove msg" in {
val createBy = GroupChatUser(BbbSystemConst.SYSTEM_USER, BbbSystemConst.SYSTEM_USER)
val createBy = GroupChatUser("groupId", "groupname")
val gcId = "gc-id"
val chatName = "Public"
val userId = "uid-1"
val gc = GroupChatFactory.create(gcId, chatName, GroupChatAccess.PUBLIC, createBy, Vector.empty, Vector.empty)
val msgId1 = "msgid-1"
val ts = System.currentTimeMillis()
val hello = "Hello World!"
val msg1 = GroupChatMessage(id = msgId1, timestamp = ts, correlationId = "cordId1", createdOn = ts,
updatedOn = ts, sender = createBy,
font = "arial", size = 12, color = "red", message = hello)
updatedOn = ts, sender = createBy, color = "red", message = hello)
val gc2 = gc.add(msg1)
assert(gc2.msgs.size == 1)
@ -45,8 +42,7 @@ class GroupsChatTests extends UnitSpec {
val foo = "Foo bar"
val ts2 = System.currentTimeMillis()
val msg2 = GroupChatMessage(id = msgId2, timestamp = ts2, correlationId = "cordId2", createdOn = ts2,
updatedOn = ts2, sender = createBy,
font = "arial", size = 12, color = "red", message = foo)
updatedOn = ts2, sender = createBy, color = "red", message = foo)
val gc3 = gc2.add(msg2)
assert(gc3.msgs.size == 2)
@ -55,8 +51,7 @@ class GroupsChatTests extends UnitSpec {
val msgId3 = "msgid-3"
val ts3 = System.currentTimeMillis()
val msg3 = GroupChatMessage(id = msgId3, timestamp = ts3, correlationId = "cordId3", createdOn = ts3,
updatedOn = ts3, sender = createBy,
font = "arial", size = 12, color = "red", message = baz)
updatedOn = ts3, sender = createBy, color = "red", message = baz)
val gc4 = gc3.update(msg3)
gc4.findMsgWithId(msgId3) match {

View File

@ -3,7 +3,7 @@ package org.bigbluebutton.core.pubsub.sender
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.{ AppsTestFixtures, UnitSpec }
import org.bigbluebutton.core.bus.{ BbbMsgEvent, BbbMsgRouterEventBus, ReceivedJsonMessage }
import org.bigbluebutton.core.bus.{ BbbMsgEvent, BbbMsgRouterEventBus }
import org.bigbluebutton.core2.ReceivedMessageRouter
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar

View File

@ -46,9 +46,9 @@ object TestDataGen {
def createUserFor(liveMeeting: LiveMeeting, regUser: RegisteredUser, presenter: Boolean): UserState = {
val u = UserState(intId = regUser.id, extId = regUser.externId, name = regUser.name, role = regUser.role,
guest = regUser.guest, authed = regUser.authed, guestStatus = regUser.guestStatus,
emoji = "none", locked = false, presenter, avatar = regUser.avatarURL)
emoji = "none", locked = false, presenter = false, avatar = regUser.avatarURL, clientType = "unknown",
userLeftFlag = UserLeftFlag(false, 0))
Users2x.add(liveMeeting.users2x, u)
u
}
}

View File

@ -1,14 +1,19 @@
import org.bigbluebutton.build._
import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
import com.typesafe.sbt.SbtNativePackager.autoImport._
enablePlugins(JavaServerAppPackaging)
name := "bbb-fsesl-akka"
version := "0.0.2"
organization := "org.bigbluebutton"
val compileSettings = Seq(
organization := "org.bigbluebutton",
version := "0.0.1"
scalaVersion := "2.12.6"
scalacOptions ++= Seq(
scalacOptions ++= List(
"-unchecked",
"-deprecation",
"-Xlint",
@ -16,11 +21,15 @@ scalacOptions ++= Seq(
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
),
javacOptions ++= List(
"-Xlint:unchecked",
"-Xlint:deprecation"
)
)
resolvers ++= Seq(
"spray repo" at "http://repo.spray.io/",
"rediscala" at "http://dl.bintray.com/etaty/maven",
"blindside-repos" at "http://blindside.googlecode.com/svn/repository/"
)
@ -37,63 +46,14 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console",
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
val akkaVersion = "2.5.14"
val scalaTestV = "2.2.6"
Seq(Revolver.settings: _*)
lazy val bbbFseslAkka = (project in file(".")).settings(name := "bbb-fsesl-akka", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
scalariformAutoformat := false
libraryDependencies ++= {
Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime",
"junit" % "junit" % "4.11",
"commons-codec" % "commons-codec" % "1.11",
"joda-time" % "joda-time" % "2.10",
"org.apache.commons" % "commons-lang3" % "3.7"
)}
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
libraryDependencies += "org.bigbluebutton" % "bbb-fsesl-client" % "0.0.6"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
// For generating test reports
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.14" % "test"
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.5" % "test"
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.5" % "test"
libraryDependencies += "org.mockito" % "mockito-core" % "2.21.0" % "test"
seq(Revolver.settings: _*)
import com.typesafe.sbt.SbtScalariform
import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
SbtScalariform.defaultScalariformSettings
ScalariformKeys.preferences := ScalariformKeys.preferences.value
scalariformPreferences := scalariformPreferences.value
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentClassDeclaration, true)
.setPreference(DoubleIndentConstructorArguments, true)
.setPreference(AlignParameters, true)
//-----------
@ -121,16 +81,7 @@ daemonUser in Linux := user
// group which will execute the application
daemonGroup in Linux := group
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move the application.conf so the user can override settings here
val appConf = src / "main" / "resources" / "application.conf"
appConf -> "conf/application.conf"
}
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move logback.xml so the user can override settings here
val logConf = src / "main" / "resources" / "logback.xml"
logConf -> "conf/logback.xml"
}
mappings in(Universal, packageBin) += file("src/main/resources/application.conf") -> "conf/application.conf"
mappings in(Universal, packageBin) += file("src/main/resources/logback.xml") -> "conf/logback.xml"
debianPackageDependencies in Debian ++= Seq("java8-runtime-headless", "bash")

View File

@ -0,0 +1,71 @@
package org.bigbluebutton.build
import sbt._
import Keys._
object Dependencies {
object Versions {
// Scala
val scala = "2.12.7"
val junitInterface = "0.11"
val scalactic = "3.0.3"
// Libraries
val akkaVersion = "2.5.17"
val logback = "1.2.3"
// Apache Commons
val lang = "3.8.1"
val codec = "1.11"
// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"
val bbbFsesl = "0.0.7-SNAPSHOT"
// Test
val scalaTest = "3.0.5"
val akkaTestKit = "2.5.18"
val junit = "4.12"
}
object Compile {
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
val akkaSl4fj = "com.typesafe.akka" % "akka-slf4j_2.12" % Versions.akkaVersion
val logback = "ch.qos.logback" % "logback-classic" % Versions.logback
val commonsCodec = "commons-codec" % "commons-codec" % Versions.codec
val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang
val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons
val bbbFseslClient = "org.bigbluebutton" % "bbb-fsesl-client" % Versions.bbbFsesl
}
object Test {
val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalaTest % "test"
val junit = "junit" % "junit" % Versions.junit % "test"
val scalactic = "org.scalactic" % "scalactic_2.12" % Versions.scalactic % "test"
val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.akkaTestKit % "test"
}
val testing = Seq(
Test.scalaTest,
Test.junit,
Test.scalactic,
Test.akkaTestKit)
val runtime = Seq(
Compile.scalaLibrary,
Compile.scalaCompiler,
Compile.akkaActor,
Compile.akkaSl4fj,
Compile.logback,
Compile.commonsCodec,
Compile.apacheLang,
Compile.bbbCommons,
Compile.bbbFseslClient) ++ testing
}

View File

@ -1 +1 @@
sbt.version=0.13.8
sbt.version=1.2.6

View File

@ -1,11 +1,11 @@
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.12")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

View File

@ -1,3 +1 @@
sbt clean
sbt run
sbt clean run

View File

@ -8,7 +8,7 @@ akka {
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
rediscala-subscriber-worker-dispatcher {
redis-subscriber-worker-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.

View File

@ -1,19 +1,20 @@
package org.bigbluebutton
import akka.actor.{ ActorSystem }
import org.bigbluebutton.endpoint.redis.{ AppsRedisSubscriberActor, RedisPublisher }
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
import org.bigbluebutton.common2.redis.RedisPublisher
import org.bigbluebutton.endpoint.redis.FSESLRedisSubscriberActor
import org.bigbluebutton.freeswitch.{ RxJsonMsgHdlrActor, VoiceConferenceService }
import org.bigbluebutton.freeswitch.bus.InsonMsgBus
import org.bigbluebutton.freeswitch.voice.FreeswitchConferenceEventListener
import org.bigbluebutton.freeswitch.voice.freeswitch.{ ConnectionManager, ESLEventListener, FreeswitchApplication }
import org.freeswitch.esl.client.manager.DefaultManagerConnection
import akka.actor.ActorSystem
object Boot extends App with SystemConfiguration {
implicit val system = ActorSystem("bigbluebutton-fsesl-system")
val redisPublisher = new RedisPublisher(system)
val redisPublisher = new RedisPublisher(system, "BbbFsEslAkkaPub")
val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword)
@ -30,10 +31,9 @@ object Boot extends App with SystemConfiguration {
val fsApplication = new FreeswitchApplication(connManager, fsProfile)
fsApplication.start()
val inJsonMsgBus = new InsonMsgBus
val inJsonMsgBus = new IncomingJsonMessageBus
val redisMessageHandlerActor = system.actorOf(RxJsonMsgHdlrActor.props(fsApplication))
inJsonMsgBus.subscribe(redisMessageHandlerActor, toFsAppsJsonChannel)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, inJsonMsgBus), "redis-subscriber")
val redisSubscriberActor = system.actorOf(FSESLRedisSubscriberActor.props(system, inJsonMsgBus), "redis-subscriber")
}

View File

@ -1,23 +1,15 @@
package org.bigbluebutton
import com.typesafe.config.ConfigFactory
import scala.util.Try
trait SystemConfiguration {
val config = ConfigFactory.load()
import org.bigbluebutton.common2.redis.RedisConfiguration
trait SystemConfiguration extends RedisConfiguration {
lazy val eslHost = Try(config.getString("freeswitch.esl.host")).getOrElse("127.0.0.1")
lazy val eslPort = Try(config.getInt("freeswitch.esl.port")).getOrElse(8021)
lazy val eslPassword = Try(config.getString("freeswitch.esl.password")).getOrElse("ClueCon")
lazy val fsProfile = Try(config.getString("freeswitch.conf.profile")).getOrElse("cdquality")
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-voice-conf-redis-channel")
lazy val toFsAppsJsonChannel = Try(config.getString("eventBus.toFsAppsChannel")).getOrElse("to-fs-apps-json-channel")
lazy val fromFsAppsJsonChannel = Try(config.getString("eventBus.fromFsAppsChannel")).getOrElse("from-fs-apps-json-channel")
}

View File

@ -1,82 +0,0 @@
package org.bigbluebutton.endpoint.redis
import java.io.PrintWriter
import java.io.StringWriter
import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import org.bigbluebutton.SystemConfiguration
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Resume
import org.bigbluebutton.freeswitch.bus.{ InJsonMsg, InsonMsgBus, ReceivedJsonMsg }
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.Message
import redis.api.pubsub.PMessage
import redis.api.servers.ClientSetname
object AppsRedisSubscriberActor extends SystemConfiguration {
val channels = Seq(toVoiceConfRedisChannel)
val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*")
def props(system: ActorSystem, inJsonMgBus: InsonMsgBus): Props =
Props(classOf[AppsRedisSubscriberActor], system, inJsonMgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
}
class AppsRedisSubscriberActor(
val system: ActorSystem,
inJsonMgBus: InsonMsgBus, redisHost: String,
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(
new InetSocketAddress(redisHost, redisPort),
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) with SystemConfiguration {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {
val sw: StringWriter = new StringWriter()
sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n")
e.printStackTrace(new PrintWriter(sw))
log.error(sw.toString())
Resume
}
}
// val decoder = new FromJsonDecoder()
var lastPongReceivedOn = 0L
system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage())
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
write(ClientSetname("BbbFsEslAkkaSub").encodedRequest)
def checkPongMessage() {
val now = System.currentTimeMillis()
if (lastPongReceivedOn != 0 && (now - lastPongReceivedOn > 30000)) {
log.error("FSESL pubsub error!");
}
}
def onMessage(message: Message) {
if (message.channel == toVoiceConfRedisChannel) {
val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String)
log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
inJsonMgBus.publish(InJsonMsg(toFsAppsJsonChannel, receivedJsonMessage))
}
}
def onPMessage(pmessage: PMessage) {
// log.debug(s"pattern message received: $pmessage")
}
def handleMessage(msg: String) {
log.warning("**** TODO: Handle pubsub messages. ****")
}
}

View File

@ -0,0 +1,46 @@
package org.bigbluebutton.endpoint.redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider }
import akka.actor.ActorSystem
import akka.actor.Props
object FSESLRedisSubscriberActor extends RedisSubscriber {
val channels = Seq(toVoiceConfRedisChannel)
val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*")
def props(system: ActorSystem, inJsonMgBus: IncomingJsonMessageBus): Props =
Props(
classOf[FSESLRedisSubscriberActor],
system, inJsonMgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher")
}
class FSESLRedisSubscriberActor(
system: ActorSystem,
inJsonMgBus: IncomingJsonMessageBus,
redisHost: String, redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberProvider(system, "BbbFsEslAkkaSub", channels, patterns, inJsonMgBus) with SystemConfiguration {
var lastPongReceivedOn = 0L
system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage())
def checkPongMessage() {
val now = System.currentTimeMillis()
if (lastPongReceivedOn != 0 && (now - lastPongReceivedOn > 30000)) {
log.error("FSESL pubsub error!");
}
}
addListener(toFsAppsJsonChannel)
subscribe()
}

View File

@ -1,20 +0,0 @@
package org.bigbluebutton.endpoint.redis
import redis.RedisClient
import akka.actor.ActorSystem
import org.bigbluebutton.SystemConfiguration
class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
val redis = RedisClient(redisHost, redisPort)(system)
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redis.clientSetname("BbbFsEslAkkaPub")
def publish(channel: String, data: String) {
//println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
redis.publish(channel, data)
}
}

View File

@ -1,12 +1,16 @@
package org.bigbluebutton.freeswitch
import akka.actor.{ Actor, ActorLogging, Props }
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.bus.ReceivedJsonMessage
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.freeswitch.bus.ReceivedJsonMsg
import org.bigbluebutton.freeswitch.voice.freeswitch.FreeswitchApplication
import com.fasterxml.jackson.databind.JsonNode
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
object RxJsonMsgHdlrActor {
def props(fsApp: FreeswitchApplication): Props =
Props(classOf[RxJsonMsgHdlrActor], fsApp)
@ -15,13 +19,13 @@ object RxJsonMsgHdlrActor {
class RxJsonMsgHdlrActor(val fsApp: FreeswitchApplication) extends Actor with ActorLogging
with SystemConfiguration with RxJsonMsgDeserializer {
def receive = {
case msg: ReceivedJsonMsg =>
case msg: ReceivedJsonMessage =>
log.debug("handling {} - {}", msg.channel, msg.data)
handleReceivedJsonMessage(msg)
case _ => // do nothing
}
def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = {
def handleReceivedJsonMessage(msg: ReceivedJsonMessage): Unit = {
for {
envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data)
} yield handle(envJsonNode.envelope, envJsonNode.core)

View File

@ -2,9 +2,9 @@ package org.bigbluebutton.freeswitch
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.freeswitch.voice.IVoiceConferenceService
import org.bigbluebutton.endpoint.redis.RedisPublisher
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.common2.redis.RedisPublisher
class VoiceConferenceService(sender: RedisPublisher) extends IVoiceConferenceService with SystemConfiguration {

View File

@ -1,31 +0,0 @@
package org.bigbluebutton.freeswitch.bus
import akka.actor.ActorRef
import akka.event.{ EventBus, LookupClassification }
case class ReceivedJsonMsg(channel: String, data: String)
case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg)
class InsonMsgBus extends EventBus with LookupClassification {
type Event = InJsonMsg
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the events classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}

View File

@ -1,13 +1,11 @@
import org.bigbluebutton.build._
name := "bbb-apps-common"
version := "0.0.4-SNAPSHOT"
organization := "org.bigbluebutton"
val compileSettings = Seq(
organization := "org.bigbluebutton",
version := "0.0.3-SNAPSHOT"
scalaVersion := "2.12.6"
scalacOptions ++= Seq(
scalacOptions ++= List(
"-unchecked",
"-deprecation",
"-Xlint",
@ -15,6 +13,11 @@ scalacOptions ++= Seq(
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
),
javacOptions ++= List(
"-Xlint:unchecked",
"-Xlint:deprecation"
)
)
// We want to have our jar files in lib_managed dir.
@ -22,57 +25,8 @@ scalacOptions ++= Seq(
// into eclipse.
retrieveManaged := true
testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", "junitxml")
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
val akkaVersion = "2.5.14"
val scalaTestV = "2.2.6"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % "2.5.1"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % "2.5.1"
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
libraryDependencies += "com.softwaremill.quicklens" %% "quicklens" % "1.4.11"
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
libraryDependencies += "com.google.code.gson" % "gson" % "2.8.5"
libraryDependencies += "redis.clients" % "jedis" % "2.9.0"
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.7"
libraryDependencies += "commons-io" % "commons-io" % "2.4"
libraryDependencies += "org.apache.commons" % "commons-pool2" % "2.6.0"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.23" % "provided"
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"
// For generating test reports
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.1" % "test"
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test"
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test"
libraryDependencies += "org.mockito" % "mockito-core" % "2.7.22" % "test"
seq(Revolver.settings: _*)
Seq(Revolver.settings: _*)
lazy val appsCommons = (project in file(".")).settings(name := "bbb-apps-common", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
//-----------
// Packaging
@ -133,5 +87,3 @@ pomExtra := (
licenses := Seq("LGPL-3.0" -> url("http://opensource.org/licenses/LGPL-3.0"))
homepage := Some(url("http://www.bigbluebutton.org"))

View File

@ -1,2 +1 @@
sbt clean
sbt publish publishLocal
sbt clean publish publishLocal

View File

@ -0,0 +1,57 @@
package org.bigbluebutton.build
import sbt._
import Keys._
object Dependencies {
object Versions {
// Scala
val scala = "2.12.7"
// Libraries
val akkaVersion = "2.5.17"
val gson = "2.8.5"
val sl4j = "1.7.25"
val quicklens = "1.4.11"
// Apache Commons
val lang = "3.8.1"
val io = "2.6"
val pool = "2.6.0"
// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"
}
object Compile {
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
val akkaSl4fj = "com.typesafe.akka" % "akka-slf4j_2.12" % Versions.akkaVersion
val googleGson = "com.google.code.gson" % "gson" % Versions.gson
val quicklens = "com.softwaremill.quicklens" %% "quicklens" % Versions.quicklens
val sl4jApi = "org.slf4j" % "slf4j-api" % Versions.sl4j % "provided"
val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang
val apacheIo = "commons-io" % "commons-io" % Versions.io
val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool
val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons
}
val runtime = Seq(
Compile.scalaLibrary,
Compile.scalaCompiler,
Compile.akkaActor,
Compile.akkaSl4fj,
Compile.googleGson,
Compile.quicklens,
Compile.sl4jApi,
Compile.apacheLang,
Compile.apacheIo,
Compile.apachePool2,
Compile.bbbCommons)
}

View File

@ -1 +1 @@
sbt.version=0.13.8
sbt.version=1.2.6

View File

@ -1,9 +1,9 @@
addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2")
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

View File

@ -3,10 +3,14 @@ package org.bigbluebutton.client
import akka.actor.ActorSystem
import akka.event.Logging
import org.bigbluebutton.client.bus._
import org.bigbluebutton.client.endpoint.redis.{AppsRedisSubscriberActor, MessageSender, RedisPublisher}
import org.bigbluebutton.client.endpoint.redis.Red5AppsRedisSubscriberActor
import org.bigbluebutton.client.meeting.MeetingManagerActor
import org.bigbluebutton.common2.redis.RedisPublisher
import scala.concurrent.duration._
import org.bigbluebutton.common2.redis.MessageSender
import org.bigbluebutton.api2.bus.MsgFromAkkaAppsEventBus
import org.bigbluebutton.common2.bus.JsonMsgFromAkkaAppsBus
class ClientGWApplication(val msgToClientGW: MsgToClientGW) extends SystemConfiguration {
@ -20,7 +24,7 @@ class ClientGWApplication(val msgToClientGW: MsgToClientGW) extends SystemConfig
private val msgToRedisEventBus = new MsgToRedisEventBus
private val msgToClientEventBus = new MsgToClientEventBus
private val redisPublisher = new RedisPublisher(system)
private val redisPublisher = new RedisPublisher(system, "Red5AppsPub")
private val msgSender: MessageSender = new MessageSender(redisPublisher)
private val meetingManagerActorRef = system.actorOf(
@ -41,15 +45,13 @@ class ClientGWApplication(val msgToClientGW: MsgToClientGW) extends SystemConfig
msgToClientEventBus.subscribe(msgToClientJsonActor, toClientChannel)
private val appsRedisSubscriberActor = system.actorOf(
AppsRedisSubscriberActor.props(receivedJsonMsgBus), "appsRedisSubscriberActor")
private val appsRedisSubscriberActor = system.actorOf(Red5AppsRedisSubscriberActor.props(system, receivedJsonMsgBus), "appsRedisSubscriberActor")
private val receivedJsonMsgHdlrActor = system.actorOf(
ReceivedJsonMsgHdlrActor.props(msgFromAkkaAppsEventBus), "receivedJsonMsgHdlrActor")
receivedJsonMsgBus.subscribe(receivedJsonMsgHdlrActor, fromAkkaAppsJsonChannel)
/**
*
* External Interface for Gateway

View File

@ -3,8 +3,8 @@ package org.bigbluebutton.client
import akka.actor.{ Actor, ActorLogging, Props }
import org.bigbluebutton.common2.msgs.BbbCommonEnvJsNodeMsg
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.client.endpoint.redis.MessageSender
import org.bigbluebutton.common2.msgs.LookUpUserReqMsg
import org.bigbluebutton.common2.redis.MessageSender
object MsgToRedisActor {
def props(msgSender: MessageSender): Props =
@ -26,5 +26,4 @@ class MsgToRedisActor(msgSender: MessageSender)
case _ => msgSender.send(toAkkaAppsRedisChannel, json)
}
}
}

View File

@ -1,11 +1,13 @@
package org.bigbluebutton.client
import akka.actor.{Actor, ActorLogging, Props}
import org.bigbluebutton.client.bus.{JsonMsgFromAkkaApps, MsgFromAkkaApps, MsgFromAkkaAppsEventBus}
import org.bigbluebutton.common2.msgs.BbbCommonEnvJsNodeMsg
import org.bigbluebutton.common2.util.JsonUtil
import scala.util.{Failure, Success}
import org.bigbluebutton.common2.bus.JsonMsgFromAkkaApps
import org.bigbluebutton.api2.bus.MsgFromAkkaAppsEventBus
import org.bigbluebutton.api2.bus.MsgFromAkkaApps
object ReceivedJsonMsgHdlrActor {

View File

@ -1,6 +1,6 @@
package org.bigbluebutton.client
import org.bigbluebutton.client.bus.{MsgFromAkkaApps, MsgFromAkkaAppsEventBus}
import org.bigbluebutton.api2.bus.{ MsgFromAkkaApps, MsgFromAkkaAppsEventBus }
trait ReceivedMessageRouter {
val msgFromAkkaAppsEventBus: MsgFromAkkaAppsEventBus

View File

@ -1,17 +1,10 @@
package org.bigbluebutton.client
import scala.util.Try
import com.typesafe.config.ConfigFactory
trait SystemConfiguration {
val config = ConfigFactory.load()
import org.bigbluebutton.common2.redis.RedisConfiguration
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")
trait SystemConfiguration extends RedisConfiguration {
lazy val toThirdPartyRedisChannel = Try(config.getString("redis.toThirdPartyRedisChannel")).getOrElse("to-third-party-redis-channel")
lazy val fromThirdPartyRedisChannel = Try(config.getString("redis.fromThirdPartyRedisChannel")).getOrElse("from-third-party-redis-channel")
lazy val fromAkkaAppsChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-channel")
@ -19,8 +12,4 @@ trait SystemConfiguration {
lazy val fromClientChannel = Try(config.getString("eventBus.fromClientChannel")).getOrElse("from-client-channel")
lazy val toClientChannel = Try(config.getString("eventBus.toClientChannel")).getOrElse("to-client-channel")
lazy val fromAkkaAppsJsonChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-json-channel")
lazy val fromAkkaAppsWbRedisChannel = Try(config.getString("redis.fromAkkaAppsWbRedisChannel")).getOrElse("from-akka-apps-wb-redis-channel")
lazy val fromAkkaAppsChatRedisChannel = Try(config.getString("redis.fromAkkaAppsChatRedisChannel")).getOrElse("from-akka-apps-chat-redis-channel")
lazy val fromAkkaAppsPresRedisChannel = Try(config.getString("redis.fromAkkaAppsPresRedisChannel")).getOrElse("from-akka-apps-pres-redis-channel")
}

View File

@ -1,32 +0,0 @@
package org.bigbluebutton.client.bus
import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}
case class JsonMsgFromAkkaApps(name: String, data: String)
case class JsonMsgFromAkkaAppsEvent(val topic: String, val payload: JsonMsgFromAkkaApps)
class JsonMsgFromAkkaAppsBus extends EventBus with LookupClassification {
type Event = JsonMsgFromAkkaAppsEvent
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the events classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}

View File

@ -1,31 +0,0 @@
package org.bigbluebutton.client.bus
import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}
import org.bigbluebutton.common2.msgs.{ BbbCommonEnvJsNodeMsg}
case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonEnvJsNodeMsg)
class MsgFromAkkaAppsEventBus extends EventBus with LookupClassification {
type Event = MsgFromAkkaApps
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the events classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}

View File

@ -1,62 +0,0 @@
package org.bigbluebutton.client.endpoint.redis
import akka.actor.{ActorLogging, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Resume
import java.io.{PrintWriter, StringWriter}
import java.net.InetSocketAddress
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.{Message, PMessage}
import scala.concurrent.duration._
import org.bigbluebutton.client._
import org.bigbluebutton.client.bus.{JsonMsgFromAkkaApps, JsonMsgFromAkkaAppsBus, JsonMsgFromAkkaAppsEvent}
import redis.api.servers.ClientSetname
object AppsRedisSubscriberActor extends SystemConfiguration {
val channels = Seq(fromAkkaAppsRedisChannel, fromAkkaAppsWbRedisChannel, fromAkkaAppsChatRedisChannel, fromAkkaAppsPresRedisChannel, fromThirdPartyRedisChannel)
val patterns = Seq("bigbluebutton:from-bbb-apps:*")
def props(jsonMsgBus: JsonMsgFromAkkaAppsBus): Props =
Props(classOf[AppsRedisSubscriberActor], jsonMsgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
}
class AppsRedisSubscriberActor(jsonMsgBus: JsonMsgFromAkkaAppsBus, redisHost: String,
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort),
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") })
with SystemConfiguration with ActorLogging {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {
val sw: StringWriter = new StringWriter()
sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n")
e.printStackTrace(new PrintWriter(sw))
log.error(sw.toString())
Resume
}
}
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
write(ClientSetname("Red5AppsSub").encodedRequest)
def onMessage(message: Message) {
if (channels.contains(message.channel)) {
//log.debug(s"RECEIVED:\n ${message.data.utf8String} \n")
val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String)
jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage))
}
}
def onPMessage(pmessage: PMessage) {
// We don't use PSubscribe anymore, but an implementation of the method is required
log.error("Should not be receiving a PMessage. It triggered on a match of pattern: " + pmessage.patternMatched)
}
}

View File

@ -1,17 +0,0 @@
package org.bigbluebutton.client.endpoint.redis
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import org.bigbluebutton.client.SystemConfiguration
//import org.bigbluebutton.common.messages.BbbAppsIsAliveMessage
class KeepAliveRedisPublisher(val system: ActorSystem, sender: RedisPublisher) extends SystemConfiguration {
val startedOn = System.currentTimeMillis()
system.scheduler.schedule(2 seconds, 5 seconds) {
// val msg = new BbbAppsIsAliveMessage(startedOn, System.currentTimeMillis())
// sender.publish("bigbluebutton:from-bbb-apps:keepalive", msg.toJson())
}
}

View File

@ -1,8 +0,0 @@
package org.bigbluebutton.client.endpoint.redis
class MessageSender(publisher: RedisPublisher) {
def send(channel: String, data: String) {
publisher.publish(channel, data)
}
}

View File

@ -0,0 +1,50 @@
package org.bigbluebutton.client.endpoint.redis
import org.bigbluebutton.common2.redis.RedisSubscriberProvider
import io.lettuce.core.pubsub.RedisPubSubListener
import org.bigbluebutton.common2.bus.JsonMsgFromAkkaApps
import org.bigbluebutton.common2.redis.RedisConfiguration
import org.bigbluebutton.client.SystemConfiguration
import akka.actor.ActorSystem
import org.bigbluebutton.common2.redis.RedisSubscriber
import org.bigbluebutton.common2.bus.JsonMsgFromAkkaAppsBus
import akka.actor.Props
import org.bigbluebutton.common2.bus.JsonMsgFromAkkaAppsEvent
object Red5AppsRedisSubscriberActor extends RedisSubscriber with RedisConfiguration with SystemConfiguration {
val channels = Seq(fromAkkaAppsRedisChannel, fromAkkaAppsWbRedisChannel, fromAkkaAppsChatRedisChannel, fromAkkaAppsPresRedisChannel, fromThirdPartyRedisChannel)
val patterns = Seq("bigbluebutton:from-bbb-apps:*")
def props(system: ActorSystem, jsonMsgBus: JsonMsgFromAkkaAppsBus): Props =
Props(
classOf[Red5AppsRedisSubscriberActor],
system, jsonMsgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher")
}
class Red5AppsRedisSubscriberActor(system: ActorSystem, jsonMsgBus: JsonMsgFromAkkaAppsBus,
redisHost: String, redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberProvider(system, "Red5AppsSub", channels, patterns, null) with SystemConfiguration {
override def addListener(appChannel: String) {
connection.addListener(new RedisPubSubListener[String, String] {
def message(channel: String, message: String): Unit = {
if (channels.contains(channel)) {
val receivedJsonMessage = new JsonMsgFromAkkaApps(channel, message)
jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage))
}
}
def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
})
}
addListener(null)
subscribe()
}

View File

@ -1,24 +0,0 @@
package org.bigbluebutton.client.endpoint.redis
import redis.RedisClient
import akka.actor.ActorSystem
import akka.event.Logging
import org.bigbluebutton.client.SystemConfiguration
import akka.util.ByteString
class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
val redis = RedisClient(redisHost, redisPort)(system)
val log = Logging(system, getClass)
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redis.clientSetname("Red5AppsPub")
def publish(channel: String, data: String) {
//log.debug("PUBLISH TO [" + channel + "]: \n [" + data + "]")
redis.publish(channel, ByteString(data))
}
}

View File

@ -1,12 +1,11 @@
name := "bbb-common-message"
import org.bigbluebutton.build._
organization := "org.bigbluebutton"
version := "0.0.20-SNAPSHOT"
version := "0.0.19-SNAPSHOT"
val compileSettings = Seq(
organization := "org.bigbluebutton",
scalaVersion := "2.12.6"
scalacOptions ++= Seq(
scalacOptions ++= List(
"-unchecked",
"-deprecation",
"-Xlint",
@ -14,6 +13,11 @@ scalacOptions ++= Seq(
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
),
javacOptions ++= List(
"-Xlint:unchecked",
"-Xlint:deprecation"
)
)
resolvers += Resolver.sonatypeRepo("releases")
@ -26,32 +30,8 @@ retrieveManaged := true
testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", "junitxml")
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
libraryDependencies ++= {
Seq(
"com.google.code.gson" % "gson" % "2.5"
)}
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test"
// For generating test reports
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12
libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.6"
seq(Revolver.settings: _*)
Seq(Revolver.settings: _*)
lazy val commonMessage = (project in file(".")).settings(name := "bbb-common-message", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
//-----------
// Packaging
@ -112,4 +92,3 @@ pomExtra := (
licenses := Seq("LGPL-3.0" -> url("http://opensource.org/licenses/LGPL-3.0"))
homepage := Some(url("http://www.bigbluebutton.org"))

View File

@ -1,2 +1 @@
sbt clean
sbt publish publishLocal
sbt clean publish publishLocal

View File

@ -0,0 +1,68 @@
package org.bigbluebutton.build
import sbt._
import Keys._
object Dependencies {
object Versions {
// Scala
val scala = "2.12.7"
val junit = "4.12"
val junitInterface = "0.11"
val scalactic = "3.0.3"
// Libraries
val akkaVersion = "2.5.17"
val gson = "2.8.5"
val jackson = "2.9.7"
val sl4j = "1.7.25"
val red5 = "1.0.10-M5"
val pool = "2.6.0"
// Redis
val lettuce = "5.1.3.RELEASE"
// Test
val scalaTest = "3.0.5"
}
object Compile {
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
val googleGson = "com.google.code.gson" % "gson" % Versions.gson
val jacksonModule = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson
val sl4jApi = "org.slf4j" % "slf4j-api" % Versions.sl4j % "runtime"
val red5 = "org.red5" % "red5-server-common" % Versions.red5
val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool
val lettuceCore = "io.lettuce" % "lettuce-core" % Versions.lettuce
}
object Test {
val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalaTest % "test"
val junit = "junit" % "junit" % Versions.junit % "test"
val junitInteface = "com.novocode" % "junit-interface" % Versions.junitInterface % "test"
val scalactic = "org.scalactic" % "scalactic_2.12" % Versions.scalactic % "test"
}
val testing = Seq(
Test.scalaTest,
Test.junit,
Test.junitInteface,
Test.scalactic)
val runtime = Seq(
Compile.scalaLibrary,
Compile.scalaCompiler,
Compile.akkaActor,
Compile.googleGson,
Compile.jacksonModule,
Compile.sl4jApi,
Compile.red5,
Compile.apachePool2,
Compile.lettuceCore) ++ testing
}

View File

@ -1 +1 @@
sbt.version=0.13.8
sbt.version=1.2.6

View File

@ -2,8 +2,8 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

View File

@ -1,7 +1,7 @@
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
*
* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below).
* Copyright (c) 2018 BigBlueButton Inc. and by respective authors (see below).
*
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
@ -16,28 +16,13 @@
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
package org.bigbluebutton.app.screenshare;
package org.bigbluebutton.common2.redis;
import java.util.Map;
import redis.clients.jedis.Jedis;
public class EventRecordingService {
private static final String COLON = ":";
private final String host;
private final int port;
public EventRecordingService(String host, int port) {
this.host = host;
this.port = port;
}
public void record(String meetingId, Map<String, String> event) {
Jedis jedis = new Jedis(host, port);
Long msgid = jedis.incr("global:nextRecordedMsgId");
jedis.hmset("recording:" + meetingId + COLON + msgid, event);
jedis.rpush("meeting:" + meetingId + COLON + "recordings", msgid.toString());
}
public final class Keys {
public static final String MEETING = "meeting-";
public static final String MEETINGS = "meetings";
public static final String MEETING_INFO = "meeting:info:";
public static final String BREAKOUT_MEETING = "meeting:breakout:";
public static final String BREAKOUT_ROOMS = "meeting:breakout:rooms:";
}

View File

@ -0,0 +1,98 @@
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
*
* Copyright (c) 2018 BigBlueButton Inc. and by respective authors (see below).
*
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
*
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
package org.bigbluebutton.common2.redis;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import io.lettuce.core.RedisClient;
import io.lettuce.core.event.Event;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ConnectedEvent;
import io.lettuce.core.event.connection.ConnectionActivatedEvent;
import io.lettuce.core.event.connection.ConnectionDeactivatedEvent;
import io.lettuce.core.event.connection.DisconnectedEvent;
import reactor.core.Disposable;
public abstract class RedisAwareCommunicator {
protected RedisClient redisClient;
protected Disposable eventBusSubscription;
protected EventBus eventBus;
protected String host;
protected String password;
protected int port;
protected String clientName;
protected int expireKey;
public abstract void start();
public abstract void stop();
public void setPassword(String password) {
this.password = password;
}
protected void connectionStatusHandler(Event event, Logger log) {
if (event instanceof ConnectedEvent) {
log.info("Connected to redis");
} else if (event instanceof ConnectionActivatedEvent) {
log.info("Connected to redis activated");
} else if (event instanceof DisconnectedEvent) {
log.info("Disconnected from redis");
} else if (event instanceof ConnectionDeactivatedEvent) {
log.info("Connected to redis deactivated");
}
}
public void setClientName(String clientName) {
this.clientName = clientName;
}
public void setHost(String host) {
this.host = host;
}
public void setPort(int port) {
this.port = port;
}
public void setExpireKey(int expireKey) {
this.expireKey = expireKey;
}
protected GenericObjectPoolConfig createPoolingConfig() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(32);
config.setMaxIdle(8);
config.setMinIdle(1);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setNumTestsPerEvictionRun(12);
config.setMaxWaitMillis(5000);
config.setTimeBetweenEvictionRunsMillis(60000);
config.setBlockWhenExhausted(true);
return config;
}
}

View File

@ -0,0 +1,116 @@
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
*
* Copyright (c) 2018 BigBlueButton Inc. and by respective authors (see below).
*
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
*
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
package org.bigbluebutton.common2.redis;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
public class RedisStorageService extends RedisAwareCommunicator {
private static Logger log = LoggerFactory.getLogger(RedisStorageService.class);
StatefulRedisConnection<String, String> connection;
public void start() {
log.info("Starting RedisStorageService with client name: {}", clientName);
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName)
.withPassword(this.password).build();
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
eventBus = redisClient.getResources().eventBus();
eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
connection = redisClient.connect();
}
public void stop() {
eventBusSubscription.dispose();
connection.close();
redisClient.shutdown();
log.info("RedisStorageService Stopped");
}
public void recordMeetingInfo(String meetingId, Map<String, String> info) {
log.debug("Storing meeting {} metadata {}", meetingId, info);
recordMeeting(Keys.MEETING_INFO + meetingId, info);
}
public void recordBreakoutInfo(String meetingId, Map<String, String> breakoutInfo) {
log.debug("Saving breakout metadata in {}", meetingId);
recordMeeting(Keys.BREAKOUT_MEETING + meetingId, breakoutInfo);
}
public void addBreakoutRoom(String parentId, String breakoutId) {
log.debug("Saving breakout room for meeting {}", parentId);
RedisCommands<String, String> commands = connection.sync();
commands.sadd(Keys.BREAKOUT_ROOMS + parentId, breakoutId);
}
public void record(String meetingId, Map<String, String> event) {
log.debug("Recording meeting event {} inside a transaction", meetingId);
RedisCommands<String, String> commands = connection.sync();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":" + "recordings", Long.toString(msgid));
}
// @fixme: not used anywhere
public void removeMeeting(String meetingId) {
log.debug("Removing meeting meeting {} inside a transaction", meetingId);
RedisCommands<String, String> commands = connection.sync();
commands.del(Keys.MEETING + meetingId);
commands.srem(Keys.MEETINGS + meetingId);
}
public void recordAndExpire(String meetingId, Map<String, String> event) {
log.debug("Recording meeting event {} inside a transaction", meetingId);
RedisCommands<String, String> commands = connection.sync();
Long msgid = commands.incr("global:nextRecordedMsgId");
String key = "recording:" + meetingId + ":" + msgid;
commands.hmset(key, event);
/**
* We set the key to expire after 14 days as we are still recording the
* event into redis even if the meeting is not recorded. (ralam sept 23,
* 2015)
*/
commands.expire(key, expireKey);
key = "meeting:" + meetingId + ":recordings";
commands.rpush(key, Long.toString(msgid));
commands.expire(key, expireKey);
}
private String recordMeeting(String key, Map<String, String> info) {
log.debug("Storing metadata {}", info);
String result = "";
RedisCommands<String, String> commands = connection.sync();
result = commands.hmset(key, info);
return result;
}
}

View File

@ -1,4 +1,4 @@
package org.bigbluebutton.red5.pubsub;
package org.bigbluebutton.common2.redis.pubsub;
import java.util.Set;

View File

@ -16,7 +16,7 @@
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
package org.bigbluebutton.voiceconf.messaging;
package org.bigbluebutton.common2.redis.pubsub;
public interface MessageHandler {
void handleMessage(String pattern, String channel, String message);

View File

@ -0,0 +1,121 @@
package org.bigbluebutton.common2.redis.pubsub;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.bigbluebutton.common2.redis.RedisAwareCommunicator;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
public class MessageReceiver extends RedisAwareCommunicator {
private static Logger log = Red5LoggerFactory.getLogger(MessageReceiver.class, "video");
private ReceivedMessageHandler handler;
GenericObjectPool<StatefulRedisPubSubConnection<String, String>> connectionPool;
private final Executor runExec = Executors.newSingleThreadExecutor();
private volatile boolean receiveMessage = false;
private final String FROM_BBB_APPS_PATTERN = "from-akka-apps-redis-channel";
public void start() {
log.info("Ready to receive messages from Redis pubsub.");
receiveMessage = true;
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName).build();
if (!this.password.isEmpty()) {
redisUri.setPassword(this.password);
}
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
eventBus = redisClient.getResources().eventBus();
eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());
Runnable messageReceiver = new Runnable() {
public void run() {
if (receiveMessage) {
try (StatefulRedisPubSubConnection<String, String> connection = connectionPool.borrowObject()) {
if (receiveMessage) {
connection.addListener(new MessageListener());
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe(FROM_BBB_APPS_PATTERN);
}
} catch (Exception e) {
log.error("Error resubscribing to channels: ", e);
}
}
}
};
runExec.execute(messageReceiver);
}
public void stop() {
receiveMessage = false;
connectionPool.close();
redisClient.shutdown();
log.info("MessageReceiver Stopped");
}
public void setMessageHandler(ReceivedMessageHandler handler) {
this.handler = handler;
}
private class MessageListener implements RedisPubSubListener<String, String> {
@Override
public void message(String channel, String message) {
handler.handleMessage("", channel, message);
}
@Override
public void message(String pattern, String channel, String message) {
log.debug("RECEIVED onPMessage" + channel + " message=\n" + message);
Runnable task = new Runnable() {
public void run() {
handler.handleMessage(pattern, channel, message);
}
};
runExec.execute(task);
}
@Override
public void subscribed(String channel, long count) {
log.debug("Subscribed to the channel: " + channel);
}
@Override
public void psubscribed(String pattern, long count) {
log.debug("Subscribed to the pattern: " + pattern);
}
@Override
public void unsubscribed(String channel, long count) {
log.debug("Unsubscribed from the channel: " + channel);
}
@Override
public void punsubscribed(String pattern, long count) {
log.debug("Unsubscribed from the pattern: " + pattern);
}
}
}

View File

@ -0,0 +1,94 @@
package org.bigbluebutton.common2.redis.pubsub;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.bigbluebutton.common2.redis.RedisAwareCommunicator;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.support.ConnectionPoolSupport;
public class MessageSender extends RedisAwareCommunicator {
private static Logger log = Red5LoggerFactory.getLogger(MessageSender.class, "bigbluebutton");
GenericObjectPool<StatefulRedisPubSubConnection<String, String>> connectionPool;
private volatile boolean sendMessage = false;
private final Executor msgSenderExec = Executors.newSingleThreadExecutor();
private final Executor runExec = Executors.newSingleThreadExecutor();
private BlockingQueue<MessageToSend> messages = new LinkedBlockingQueue<MessageToSend>();
public void stop() {
sendMessage = false;
connectionPool.close();
redisClient.shutdown();
}
public void start() {
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName).build();
if (!this.password.isEmpty()) {
redisUri.setPassword(this.password);
}
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
eventBus = redisClient.getResources().eventBus();
eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());
log.info("Redis org.bigbluebutton.red5.pubsub.message publisher starting!");
try {
sendMessage = true;
Runnable messageSender = new Runnable() {
public void run() {
while (sendMessage) {
try {
MessageToSend msg = messages.take();
publish(msg.getChannel(), msg.getMessage());
} catch (InterruptedException e) {
log.warn("Failed to get org.bigbluebutton.common2.redis.pubsub from queue.");
}
}
}
};
msgSenderExec.execute(messageSender);
} catch (Exception e) {
log.error("Error subscribing to channels: " + e.getMessage());
}
}
public void send(String channel, String message) {
MessageToSend msg = new MessageToSend(channel, message);
messages.add(msg);
}
private void publish(final String channel, final String message) {
Runnable task = new Runnable() {
public void run() {
try (StatefulRedisPubSubConnection<String, String> connection = connectionPool.borrowObject()) {
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<Long> future = async.publish(channel, message);
} catch (Exception e) {
log.warn("Cannot publish the org.bigbluebutton.red5.pubsub.message to redis", e);
}
}
};
runExec.execute(task);
}
}

View File

@ -0,0 +1,19 @@
package org.bigbluebutton.common2.redis.pubsub;
public class MessageToSend {
private final String channel;
private final String message;
public MessageToSend(String channel, String message) {
this.channel = channel;
this.message = message;
}
public String getChannel() {
return channel;
}
public String getMessage() {
return message;
}
}

View File

@ -1,4 +1,4 @@
package org.bigbluebutton.red5.pubsub;
package org.bigbluebutton.common2.redis.pubsub;
public class ReceivedMessage {
private final String pattern;

View File

@ -1,15 +1,16 @@
package org.bigbluebutton.red5.pubsub;
package org.bigbluebutton.common2.redis.pubsub;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
public class ReceivedMessageHandler {
private static Logger log = Red5LoggerFactory.getLogger(ReceivedMessageHandler.class, "video");
private static Logger log = Red5LoggerFactory
.getLogger(ReceivedMessageHandler.class/* , "video" */);
private BlockingQueue<ReceivedMessage> receivedMessages = new LinkedBlockingQueue<ReceivedMessage>();

View File

@ -1,4 +1,4 @@
package org.bigbluebutton.core.bus
package org.bigbluebutton.common2.bus
import akka.actor.ActorRef
import akka.event.{ EventBus, LookupClassification }

View File

@ -1,4 +1,4 @@
package org.bigbluebutton.api2.bus
package org.bigbluebutton.common2.bus
import akka.actor.ActorRef
import akka.event.{ EventBus, LookupClassification }

View File

@ -2,9 +2,9 @@ package org.bigbluebutton.api2.bus
import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}
import org.bigbluebutton.common2.msgs.{BbbCommonEnvCoreMsg}
import org.bigbluebutton.common2.msgs.BbbCommonMsg
case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonEnvCoreMsg)
case class MsgFromAkkaApps(val topic: String, val payload: BbbCommonMsg)
class MsgFromAkkaAppsEventBus extends EventBus with LookupClassification {
type Event = MsgFromAkkaApps

View File

@ -1,7 +1,8 @@
package org.bigbluebutton.api2.bus
package org.bigbluebutton.common2.bus
import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}
import akka.actor.actorRef2Scala
case class OldReceivedJsonMessage(pattern: String, channel: String, msg: String)
case class OldIncomingJsonMessage(val topic: String, val payload: OldReceivedJsonMessage)
@ -11,7 +12,7 @@ class OldMessageEventBus extends EventBus with LookupClassification {
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
// is used for extracting te classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves

View File

@ -1,4 +1,4 @@
package org.bigbluebutton.api2.endpoint.redis
package org.bigbluebutton.common2.redis
class MessageSender(publisher: RedisPublisher) {

View File

@ -0,0 +1,15 @@
package org.bigbluebutton.common2.redis
import akka.actor.ActorSystem
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
abstract class RedisClientProvider(val system: ActorSystem, val clientName: String) extends RedisConfiguration {
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
val redisUri = RedisURI.Builder.redis(redisHost, redisPort).withClientName(clientName).withPassword(redisPassword).build()
var redis = RedisClient.create(redisUri)
redis.setOptions(ClientOptions.builder().autoReconnect(true).build())
}

View File

@ -0,0 +1,25 @@
package org.bigbluebutton.common2.redis
import scala.util.Try
import com.typesafe.config.ConfigFactory
trait RedisConfiguration {
val config = ConfigFactory.load()
// Redis server configuration
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
lazy val redisExpireKey = Try(config.getInt("redis.keyExpiry")).getOrElse(1209600)
// Redis channels
lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-voice-conf-redis-channel")
lazy val fromAkkaAppsWbRedisChannel = Try(config.getString("redis.fromAkkaAppsWbRedisChannel")).getOrElse("from-akka-apps-wb-redis-channel")
lazy val fromAkkaAppsChatRedisChannel = Try(config.getString("redis.fromAkkaAppsChatRedisChannel")).getOrElse("from-akka-apps-chat-redis-channel")
lazy val fromAkkaAppsPresRedisChannel = Try(config.getString("redis.fromAkkaAppsPresRedisChannel")).getOrElse("from-akka-apps-pres-redis-channel")
}

View File

@ -0,0 +1,29 @@
package org.bigbluebutton.common2.redis
import io.lettuce.core.RedisClient
import io.lettuce.core.event.Event
import io.lettuce.core.event.EventBus
import io.lettuce.core.event.connection.{ ConnectionDeactivatedEvent, ConnectionActivatedEvent, ConnectedEvent, DisconnectedEvent }
import reactor.core.Disposable
import akka.event.LoggingAdapter
trait RedisConnectionHandler {
def subscribeToEventBus(redis: RedisClient, log: LoggingAdapter) {
val eventBus: EventBus = redis.getResources().eventBus();
// @todo : unsubscribe when connection is closed
val eventBusSubscription: Disposable = eventBus.get().subscribe(e => connectionStatusHandler(e, log))
}
def connectionStatusHandler(event: Event, log: LoggingAdapter) {
if (event.isInstanceOf[ConnectedEvent]) {
log.info("Connected to redis");
} else if (event.isInstanceOf[ConnectionActivatedEvent]) {
log.info("Connection to redis activated");
} else if (event.isInstanceOf[DisconnectedEvent]) {
log.info("Disconnected from redis");
} else if (event.isInstanceOf[ConnectionDeactivatedEvent]) {
log.info("Connection to redis deactivated");
}
}
}

View File

@ -0,0 +1,21 @@
package org.bigbluebutton.common2.redis
import akka.actor.ActorSystem
import akka.event.Logging
class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) with RedisConnectionHandler {
val log = Logging(system, getClass)
subscribeToEventBus(redis, log)
val connection = redis.connectPubSub()
redis.connect()
def publish(channel: String, data: String) {
val async = connection.async();
async.publish(channel, data);
}
}

View File

@ -0,0 +1,13 @@
package org.bigbluebutton.common2.redis
import akka.actor.ActorSystem
abstract class RedisStorageProvider(system: ActorSystem, clientName: String) extends RedisConfiguration {
var redis = new RedisStorageService()
redis.setHost(redisHost)
redis.setPort(redisPort)
redis.setPassword(redisPassword)
redis.setExpireKey(redisExpireKey)
redis.setClientName(clientName)
redis.start();
}

View File

@ -0,0 +1,6 @@
package org.bigbluebutton.common2.redis
trait RedisSubscriber extends RedisConfiguration {
val channels: Seq[String]
val patterns: Seq[String]
}

View File

@ -0,0 +1,62 @@
package org.bigbluebutton.common2.redis
import akka.actor.ActorSystem
import org.bigbluebutton.common2.bus.ReceivedJsonMessage
import org.bigbluebutton.common2.bus.IncomingJsonMessage
import io.lettuce.core.pubsub.RedisPubSubListener
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
import akka.actor.ActorLogging
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Resume
import java.io.StringWriter
import scala.concurrent.duration._
import java.io.PrintWriter
abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
channels: Seq[String], patterns: Seq[String],
jsonMsgBus: IncomingJsonMessageBus)
extends RedisClientProvider(system, clientName) with RedisConnectionHandler with Actor with ActorLogging {
subscribeToEventBus(redis, log)
var connection = redis.connectPubSub()
def addListener(appChannel: String) {
connection.addListener(new RedisPubSubListener[String, String] {
def message(channel: String, message: String): Unit = {
if (channels.contains(channel)) {
val receivedJsonMessage = new ReceivedJsonMessage(channel, message)
jsonMsgBus.publish(IncomingJsonMessage(appChannel, receivedJsonMessage))
}
}
def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
})
}
def subscribe() {
val async = connection.async()
for (channel <- channels) async.subscribe(channel)
for (pattern <- patterns) async.psubscribe(pattern)
}
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {
val sw: StringWriter = new StringWriter()
sw.write("An exception has been thrown on " + getClass + ", exception message [" + e.getMessage + "] (full stacktrace below)\n")
e.printStackTrace(new PrintWriter(sw))
log.error(sw.toString())
Resume
}
}
def receive = {
case _ => // do nothing
}
}

View File

@ -2,7 +2,6 @@ package org.bigbluebutton.common2
import org.bigbluebutton.common2.domain._
trait TestFixtures {
val meetingId = "testMeetingId"
val externalMeetingId = "testExternalMeetingId"
@ -12,6 +11,15 @@ trait TestFixtures {
val record = false
val voiceConfId = "85115"
val durationInMinutes = 10
val maxInactivityTimeoutMinutes = 120
val warnMinutesBeforeMax = 30
val meetingExpireIfNoUserJoinedInMinutes = 5
val meetingExpireWhenLastUserLeftInMinutes = 10
val userInactivityInspectTimerInMinutes = 60
val userInactivityThresholdInMinutes = 10
val userActivitySignResponseDelayInMinutes = 5
val autoStartRecording = false
val allowStartStopRecording = false
val webcamsOnlyForModerator = false
@ -25,19 +33,23 @@ trait TestFixtures {
val modOnlyMessage = "Moderator only message"
val dialNumber = "613-555-1234"
val maxUsers = 25
val muteOnStart = false
val guestPolicy = "ALWAYS_ASK"
val metadata: collection.immutable.Map[String, String] = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "foo")
val meetingProp = MeetingProp(name = meetingName, extId = externalMeetingId, intId = meetingId,
isBreakout = isBreakout.booleanValue())
val breakoutProps = BreakoutProps(parentId = parentMeetingId, sequence = sequence, breakoutRooms = Vector())
val durationProps = DurationProps(duration = durationInMinutes, createdTime = createTime, createdDate = createDate)
val breakoutProps = BreakoutProps(parentId = parentMeetingId, sequence = sequence, freeJoin = false, breakoutRooms = Vector())
val durationProps = DurationProps(duration = durationInMinutes, createdTime = createTime, createdDate = createDate, maxInactivityTimeoutMinutes = maxInactivityTimeoutMinutes, warnMinutesBeforeMax = warnMinutesBeforeMax,
meetingExpireIfNoUserJoinedInMinutes = meetingExpireIfNoUserJoinedInMinutes, meetingExpireWhenLastUserLeftInMinutes = meetingExpireWhenLastUserLeftInMinutes,
userInactivityInspectTimerInMinutes = userInactivityInspectTimerInMinutes, userInactivityThresholdInMinutes = userInactivityInspectTimerInMinutes, userActivitySignResponseDelayInMinutes = userActivitySignResponseDelayInMinutes)
val password = PasswordProp(moderatorPass = moderatorPassword, viewerPass = viewerPassword)
val recordProp = RecordProp(record = record, autoStartRecording = autoStartRecording,
allowStartStopRecording = allowStartStopRecording)
val welcomeProp = WelcomeProp(welcomeMsgTemplate = welcomeMsgTemplate, welcomeMsg = welcomeMsg,
modOnlyMessage = modOnlyMessage)
val voiceProp = VoiceProp(telVoice = voiceConfId, voiceConf = voiceConfId, dialNumber = dialNumber)
val voiceProp = VoiceProp(telVoice = voiceConfId, voiceConf = voiceConfId, dialNumber = dialNumber, muteOnStart = muteOnStart)
val usersProp = UsersProp(maxUsers = maxUsers, webcamsOnlyForModerator = webcamsOnlyForModerator,
guestPolicy = guestPolicy)
val metadataProp = new MetadataProp(metadata)

View File

@ -1,13 +1,12 @@
package org.bigbluebutton.common2.messages
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.common2.messages.MessageBody.CreateMeetingReqMsgBody
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.common2.{ TestFixtures, UnitSpec2 }
import scala.util.{ Failure, Success }
class DeserializerTests extends UnitSpec2 with TestFixtures {
object Deserializer extends Deserializer
@ -46,9 +45,10 @@ class DeserializerTests extends UnitSpec2 with TestFixtures {
println(map)
map match {
case Success(envJsNodeMsg) => assert(envJsNodeMsg.core.isInstanceOf[JsonNode])
val createMeetingReqMsg = Deserializer.toCreateMeetingReqMsg(envJsNodeMsg.envelope, envJsNodeMsg.core)
createMeetingReqMsg match {
case Success(envJsNodeMsg) =>
assert(envJsNodeMsg.core.isInstanceOf[JsonNode])
val (msg, exception) = Deserializer.toBbbCommonMsg[CreateMeetingReqMsg](envJsNodeMsg.core)
msg match {
case Some(cmrq) => assert(cmrq.isInstanceOf[CreateMeetingReqMsg])
case None => fail("Failed to decode CreateMeetingReqMsg")
}
@ -71,7 +71,8 @@ class DeserializerTests extends UnitSpec2 with TestFixtures {
println(map)
map match {
case Success(envJsNodeMsg) => assert(envJsNodeMsg.core.isInstanceOf[JsonNode])
case Success(envJsNodeMsg) =>
assert(envJsNodeMsg.core.isInstanceOf[JsonNode])
val (msg, exception) = Deserializer.toBbbCommonMsg[CreateMeetingReqMsg](envJsNodeMsg.core)
msg match {
case Some(cmrq) => assert(cmrq.isInstanceOf[CreateMeetingReqMsg])

View File

@ -1,15 +1,13 @@
package org.bigbluebutton.common2.util
import org.bigbluebutton.common2.{ TestFixtures, UnitSpec2 }
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.common2.msgs._
import scala.collection.immutable.List
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.common2.messages.MessageBody.ValidateAuthTokenReqMsgBody
import scala.util.{ Failure, Success }
case class Person(name: String, age: Int)
case class Group(name: String, persons: Seq[Person], leader: Person)
@ -52,7 +50,7 @@ class JsonUtilTest extends UnitSpec2 with TestFixtures {
}
"JsonUtil" should "unmarshall a ValidateAuthTokenReq" in {
val header: BbbCoreHeaderWithMeetingId = new BbbCoreHeaderWithMeetingId("foo", "mId")
val header: BbbClientMsgHeader = new BbbClientMsgHeader("foo", "mId", "uId")
val body: ValidateAuthTokenReqMsgBody = new ValidateAuthTokenReqMsgBody(userId = "uId", authToken = "myToken")
val msg: ValidateAuthTokenReqMsg = new ValidateAuthTokenReqMsg(header, body)
val json = JsonUtil.toJson(msg)

View File

@ -1,12 +1,11 @@
name := "bbb-common-web"
import org.bigbluebutton.build._
organization := "org.bigbluebutton"
version := "0.0.3-SNAPSHOT"
version := "0.0.2-SNAPSHOT"
val compileSettings = Seq(
organization := "org.bigbluebutton",
scalaVersion := "2.12.6"
scalacOptions ++= Seq(
scalacOptions ++= List(
"-unchecked",
"-deprecation",
"-Xlint",
@ -14,6 +13,11 @@ scalacOptions ++= Seq(
"-language:_",
"-target:jvm-1.8",
"-encoding", "UTF-8"
),
javacOptions ++= List(
"-Xlint:unchecked",
"-Xlint:deprecation"
)
)
// We want to have our jar files in lib_managed dir.
@ -25,82 +29,8 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console",
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
val akkaVersion = "2.5.14"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
libraryDependencies += "com.softwaremill.quicklens" %% "quicklens" % "1.4.11"
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12
libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.6"
libraryDependencies += "redis.clients" % "jedis" % "2.9.0"
libraryDependencies += "com.google.code.gson" % "gson" % "2.8.5"
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.7"
libraryDependencies += "commons-io" % "commons-io" % "2.6"
libraryDependencies += "org.apache.commons" % "commons-pool2" % "2.6.0"
libraryDependencies += "com.zaxxer" % "nuprocess" % "1.2.4"
// https://mvnrepository.com/artifact/org.jodconverter/jodconverter-core
libraryDependencies += "org.jodconverter" % "jodconverter-local" % "4.2.0"
// https://mvnrepository.com/artifact/org.libreoffice/unoil
libraryDependencies += "org.libreoffice" % "unoil" % "5.4.2"
// https://mvnrepository.com/artifact/org.libreoffice/ridl
libraryDependencies += "org.libreoffice" % "ridl" % "5.4.2"
// https://mvnrepository.com/artifact/org.libreoffice/juh
libraryDependencies += "org.libreoffice" % "juh" % "5.4.2"
// https://mvnrepository.com/artifact/org.libreoffice/jurt
libraryDependencies += "org.libreoffice" % "jurt" % "5.4.2"
libraryDependencies += "org.apache.poi" % "poi-ooxml" % "3.17"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.25"
// https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.6"
// https://mvnrepository.com/artifact/org.apache.httpcomponents/httpasyncclient
libraryDependencies += "org.apache.httpcomponents" % "httpasyncclient" % "4.1.4"
libraryDependencies += "org.freemarker" % "freemarker" % "2.3.28"
libraryDependencies += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.9.6"
// https://mvnrepository.com/artifact/org.codehaus.woodstox/woodstox-core-asl
libraryDependencies += "org.codehaus.woodstox" % "woodstox-core-asl" % "4.4.1"
libraryDependencies += "org.pegdown" % "pegdown" % "1.4.0" % "test"
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"
// https://mvnrepository.com/artifact/org.mockito/mockito-core
libraryDependencies += "org.mockito" % "mockito-core" % "2.7.12" % "test"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.1" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % "test"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % akkaVersion % "test"
// https://mvnrepository.com/artifact/org.scala-lang.modules/scala-xml_2.12
libraryDependencies += "org.scala-lang.modules" % "scala-xml_2.12" % "1.1.0"
seq(Revolver.settings: _*)
Seq(Revolver.settings: _*)
lazy val commonWeb = (project in file(".")).settings(name := "bbb-common-web", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
//-----------
// Packaging
@ -161,5 +91,3 @@ pomExtra := (
licenses := Seq("LGPL-3.0" -> url("http://opensource.org/licenses/LGPL-3.0"))
homepage := Some(url("http://www.bigbluebutton.org"))

View File

@ -1,3 +1 @@
sbt clean
sbt publish publishLocal
sbt clean publish publishLocal

View File

@ -0,0 +1,103 @@
package org.bigbluebutton.build
import sbt._
import Keys._
object Dependencies {
object Versions {
// Scala
val scala = "2.12.7"
val junit = "4.12"
val junitInterface = "0.11"
val scalactic = "3.0.3"
// Libraries
val akkaVersion = "2.5.17"
val gson = "2.8.5"
val jackson = "2.9.7"
val freemaker = "2.3.28"
val apacheHttp = "4.5.6"
val apacheHttpAsync = "4.1.4"
// Office and document conversion
val apacheOffice = "4.0.0"
val jodConverter = "4.2.1"
val apachePoi = "3.17"
val nuProcess = "1.2.4"
val libreOffice = "5.4.2"
// Apache Commons
val lang = "3.8.1"
val io = "2.6"
val pool = "2.6.0"
// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"
// Test
val scalaTest = "3.0.5"
}
object Compile {
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
val akkaSl4fj = "com.typesafe.akka" % "akka-slf4j_2.12" % Versions.akkaVersion % "runtime"
val googleGson = "com.google.code.gson" % "gson" % Versions.gson
val jacksonModule = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson
val jacksonXml = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % Versions.jackson
val freeMaker = "org.freemarker" % "freemarker" % Versions.freemaker
val apacheHttp = "org.apache.httpcomponents" % "httpclient" % Versions.apacheHttp
val apacheHttpAsync = "org.apache.httpcomponents" % "httpasyncclient" % Versions.apacheHttpAsync
val poiXml = "org.apache.poi" % "poi-ooxml" % Versions.apachePoi
val jodConverter = "org.jodconverter" % "jodconverter-local" % Versions.jodConverter
val nuProcess = "com.zaxxer" % "nuprocess" % Versions.nuProcess
val officeUnoil = "org.libreoffice" % "unoil" % Versions.libreOffice
val officeRidl = "org.libreoffice" % "ridl" % Versions.libreOffice
val officeJuh = "org.libreoffice" % "juh" % Versions.libreOffice
val officejurt = "org.libreoffice" % "jurt" % Versions.libreOffice
val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang
val apacheIo = "commons-io" % "commons-io" % Versions.io
val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool
val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons
}
object Test {
val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalaTest % "test"
val junit = "junit" % "junit" % Versions.junit % "test"
val junitInteface = "com.novocode" % "junit-interface" % Versions.junitInterface % "test"
val scalactic = "org.scalactic" % "scalactic_2.12" % Versions.scalactic % "test"
}
val testing = Seq(
Test.scalaTest,
Test.junit,
Test.junitInteface,
Test.scalactic)
val runtime = Seq(
Compile.scalaLibrary,
Compile.scalaCompiler,
Compile.akkaActor,
Compile.akkaSl4fj,
Compile.googleGson,
Compile.jacksonModule,
Compile.jacksonXml,
Compile.freeMaker,
Compile.apacheHttp,
Compile.apacheHttpAsync,
Compile.poiXml,
Compile.jodConverter,
Compile.nuProcess,
Compile.apacheLang,
Compile.apacheIo,
Compile.apachePool2,
Compile.bbbCommons) ++ testing
}

View File

@ -1 +1 @@
sbt.version=0.13.8
sbt.version=1.2.6

View File

@ -2,9 +2,7 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.artima.supersafe" % "sbtplugin" % "1.1.7")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")

View File

@ -48,7 +48,6 @@ import org.bigbluebutton.api.domain.RegisteredUser;
import org.bigbluebutton.api.domain.User;
import org.bigbluebutton.api.domain.UserSession;
import org.bigbluebutton.api.messaging.MessageListener;
import org.bigbluebutton.api.messaging.RedisStorageService;
import org.bigbluebutton.api.messaging.converters.messages.DestroyMeetingMessage;
import org.bigbluebutton.api.messaging.converters.messages.EndMeetingMessage;
import org.bigbluebutton.api.messaging.messages.CreateBreakoutRoom;
@ -77,6 +76,7 @@ import org.bigbluebutton.api.messaging.messages.UserStatusChanged;
import org.bigbluebutton.api.messaging.messages.UserUnsharedWebcam;
import org.bigbluebutton.api2.IBbbWebApiGWApp;
import org.bigbluebutton.api2.domain.UploadedTrack;
import org.bigbluebutton.common2.redis.RedisStorageService;
import org.bigbluebutton.presentation.PresentationUrlDownloadService;
import org.bigbluebutton.web.services.RegisteredUserCleanupTimerTask;
import org.bigbluebutton.web.services.callback.CallbackUrlService;

View File

@ -1,91 +0,0 @@
package org.bigbluebutton.api.messaging;
import java.util.Map;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Protocol;
public class RedisStorageService {
private static Logger log = LoggerFactory.getLogger(RedisStorageService.class);
private JedisPool redisPool;
private String host;
private int port;
public void stop() {
}
public void start() {
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redisPool = new JedisPool(new GenericObjectPoolConfig<Object>(), host, port, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE, "BbbRed5AppsPub");
}
public void recordMeetingInfo(String meetingId, Map<String, String> info) {
Jedis jedis = redisPool.getResource();
try {
if (log.isDebugEnabled()) {
for (Map.Entry<String,String> entry : info.entrySet()) {
log.debug("Storing metadata {} = {}", entry.getKey(), entry.getValue());
}
}
log.debug("Saving metadata in {}", meetingId);
jedis.hmset("meeting:info:" + meetingId, info);
} catch (Exception e) {
log.warn("Cannot record the info meeting: {}", meetingId, e);
} finally {
jedis.close();
}
}
public void recordBreakoutInfo(String meetingId, Map<String, String> breakoutInfo) {
Jedis jedis = redisPool.getResource();
try {
log.debug("Saving breakout metadata in {}", meetingId);
jedis.hmset("meeting:breakout:" + meetingId, breakoutInfo);
} catch (Exception e) {
log.warn("Cannot record the info meeting: {}", meetingId, e);
} finally {
jedis.close();
}
}
public void addBreakoutRoom(String parentId, String breakoutId) {
Jedis jedis = redisPool.getResource();
try {
log.debug("Saving breakout room for meeting {}", parentId);
jedis.sadd("meeting:breakout:rooms:" + parentId, breakoutId);
} catch (Exception e) {
log.warn("Cannot record the info meeting:" + parentId, e);
} finally {
jedis.close();
}
}
public void removeMeeting(String meetingId) {
Jedis jedis = redisPool.getResource();
try {
jedis.del("meeting-" + meetingId);
jedis.srem("meetings", meetingId);
} finally {
jedis.close();
}
}
public void setHost(String host) {
this.host = host;
}
public void setPort(int port) {
this.port = port;
}
}

View File

@ -1,92 +0,0 @@
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
*
* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below).
*
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
*
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
package org.bigbluebutton.web.services;
import java.util.HashMap;
import java.util.Map;
import org.bigbluebutton.api.domain.Poll;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class RedisStorageService implements IStorageService{
JedisPool jedisPool;
private static final String SEPARATOR = ":";
private static final String ID_SEED = "nextID";
/* Meeting Patterns */
private static final String MEETING = "meeting";
private static final String POLL = "poll";
private static final String POLL_ANSWER = "answer";
private static final String POLL_RESULTS = "results";
/*
meeting:<id>:poll:list [1,2,3] <-- list
meeting:<id>:poll:<pollid> title, date <-- hash
meeting:<id>:poll:<pollid>:answer:list [1,2,3] <-- list
meeting:<id>:poll:<pollid>:answer:<answerid> answertext <-- key/value
meeting:<id>:poll:<pollid>:answer:<answerid>:results [<userid>|1] <-- Set
*/
public String generatePollID(String meetingID){
Jedis jedis = (Jedis) jedisPool.getResource();
String pattern = getPollRedisPattern(meetingID);
String pollID = Long.toString(jedis.incr(pattern + SEPARATOR + ID_SEED));
jedisPool.returnResource(jedis);
return pollID;
}
public String generatePollAnswerID(String meetingID){
Jedis jedis = jedisPool.getResource();
String pattern = getPollRedisPattern(meetingID);
String pollID = Long.toString(jedis.incr(pattern + SEPARATOR + POLL_ANSWER + SEPARATOR + ID_SEED));
jedisPool.returnResource(jedis);
return pollID;
}
public void storePoll(Poll p){
Jedis jedis = jedisPool.getResource();
String pattern = getPollRedisPattern(p.getMeetingID());
HashMap<String,String> pollMap = p.toMap();
jedis.hmset(pattern + SEPARATOR + p.getPollID(), pollMap);
jedisPool.returnResource(jedis);
}
public void storePollAnswers(String meetingID, String pollID, Map<String,String> answers){
Jedis jedis = jedisPool.getResource();
String pattern = getPollRedisPattern(meetingID);
//HashMap<String,String> pollMap = p.toMap();
//jedis.hmset(pattern + SEPARATOR + p.getPollID + SEPARATOR + POLL_ANSWER + SEPARATOR + ID_SEED, pollMap);
//jedisPool.returnResource(jedis);
}
private String getPollRedisPattern(String meetingID){
return MEETING + SEPARATOR + meetingID + SEPARATOR + POLL;
}
public void setJedisPool(JedisPool jedisPool){
this.jedisPool = jedisPool;
}
}

View File

@ -5,12 +5,14 @@ import akka.actor.ActorSystem
import akka.event.Logging
import org.bigbluebutton.api.messaging.converters.messages._
import org.bigbluebutton.api2.bus._
import org.bigbluebutton.api2.endpoint.redis.{ AppsRedisSubscriberActor, MessageSender, RedisPublisher }
import org.bigbluebutton.api2.endpoint.redis.{ WebRedisSubscriberActor }
import org.bigbluebutton.common2.redis.MessageSender
import org.bigbluebutton.api2.meeting.{ OldMeetingMsgHdlrActor, RegisterUser }
import org.bigbluebutton.common2.domain._
import org.bigbluebutton.presentation.messages._
import scala.concurrent.duration._
import org.bigbluebutton.common2.redis._
import org.bigbluebutton.common2.bus._
class BbbWebApiGWApp(
val oldMessageReceivedGW: OldMessageReceivedGW,
@ -24,10 +26,8 @@ class BbbWebApiGWApp(
val log = Logging(system, getClass)
log.debug("*********** meetingManagerChannel = " + meetingManagerChannel)
private val jsonMsgToAkkaAppsBus = new JsonMsgToAkkaAppsBus
private val redisPublisher = new RedisPublisher(system)
private val redisPublisher = new RedisPublisher(system, "BbbWebPub")
private val msgSender: MessageSender = new MessageSender(redisPublisher)
private val messageSenderActorRef = system.actorOf(MessageSenderActor.props(msgSender), "messageSenderActor")
@ -54,8 +54,7 @@ class BbbWebApiGWApp(
msgToAkkaAppsEventBus.subscribe(msgToAkkaAppsToJsonActor, toAkkaAppsChannel)
private val appsRedisSubscriberActor = system.actorOf(
AppsRedisSubscriberActor.props(receivedJsonMsgBus, oldMessageEventBus), "appsRedisSubscriberActor")
private val appsRedisSubscriberActor = system.actorOf(WebRedisSubscriberActor.props(system, receivedJsonMsgBus, oldMessageEventBus), "appsRedisSubscriberActor")
private val receivedJsonMsgHdlrActor = system.actorOf(
ReceivedJsonMsgHdlrActor.props(msgFromAkkaAppsEventBus), "receivedJsonMsgHdlrActor")

View File

@ -3,17 +3,11 @@ package org.bigbluebutton.api2
import com.typesafe.config.ConfigFactory
import scala.util.Try
import org.bigbluebutton.common2.redis.RedisConfiguration
trait SystemConfiguration {
val config = ConfigFactory.load("bbb-web")
trait SystemConfiguration extends RedisConfiguration {
override val config = ConfigFactory.load("bbb-web")
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")
lazy val meetingManagerChannel = Try(config.getString("eventBus.meetingManagerChannel")).getOrElse("FOOOOOOOOO")
lazy val fromAkkaAppsChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-channel")
lazy val toAkkaAppsChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-channel")
lazy val fromClientChannel = Try(config.getString("eventBus.fromClientChannel")).getOrElse("from-client-channel")

View File

@ -4,7 +4,7 @@ import java.io.{PrintWriter, StringWriter}
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import org.bigbluebutton.api2.endpoint.redis.MessageSender
import org.bigbluebutton.common2.redis.MessageSender
import scala.concurrent.duration._
object MessageSenderActor {

Some files were not shown because too many files have changed in this diff Show More