Merge branch 'riadvice-akka-transcode-update'
This commit is contained in:
commit
4e32514e03
@ -7,13 +7,13 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
val junit = "4.12"
|
||||
val junitInterface = "0.11"
|
||||
val scalactic = "3.0.3"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.17"
|
||||
val akkaVersion = "2.5.19"
|
||||
val gson = "2.8.5"
|
||||
val jackson = "2.9.7"
|
||||
val logback = "1.2.3"
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -4,8 +4,8 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.12")
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
@ -1 +0,0 @@
|
||||
JAVA_OPTS="-Dconfig.file=${{chdir}}/conf/application.conf $JAVA_OPTS"
|
@ -3,10 +3,12 @@ import org.bigbluebutton.build._
|
||||
import scalariform.formatter.preferences._
|
||||
import com.typesafe.sbt.SbtScalariform
|
||||
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
|
||||
|
||||
import NativePackagerHelper._
|
||||
import com.typesafe.sbt.SbtNativePackager.autoImport._
|
||||
|
||||
enablePlugins(JavaServerAppPackaging)
|
||||
enablePlugins(UniversalPlugin)
|
||||
enablePlugins(DebianPlugin)
|
||||
|
||||
version := "0.0.2"
|
||||
|
||||
@ -81,4 +83,6 @@ daemonUser in Linux := user
|
||||
// group which will execute the application
|
||||
daemonGroup in Linux := group
|
||||
|
||||
javaOptions in Universal ++= Seq("-J-Xms130m", "-J-Xmx256m", "-Dconfig.file=conf/application.conf", "-Dlogback.configurationFile=conf/logback.xml")
|
||||
|
||||
debianPackageDependencies in Debian ++= Seq("java8-runtime-headless", "bash")
|
||||
|
@ -7,12 +7,12 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
val junitInterface = "0.11"
|
||||
val scalactic = "3.0.3"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.17"
|
||||
val akkaVersion = "2.5.19"
|
||||
val logback = "1.2.3"
|
||||
|
||||
// Apache Commons
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -4,8 +4,8 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.12")
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
@ -1 +1,6 @@
|
||||
sbt clean run
|
||||
#!/usr/bin/env bash
|
||||
|
||||
sbt clean stage
|
||||
sudo service bbb-fsesl-akka stop
|
||||
cd target/universal/stage
|
||||
./bin/bbb-fsesl-akka
|
||||
|
@ -1 +0,0 @@
|
||||
JAVA_OPTS="-Dconfig.file=${{chdir}}/conf/application.conf $JAVA_OPTS"
|
@ -1,42 +0,0 @@
|
||||
# #################################
|
||||
# ##### Default configuration #####
|
||||
# #################################
|
||||
|
||||
# Available replacements
|
||||
# ------------------------------------------------
|
||||
# ${{author}} debian author
|
||||
# ${{descr}} debian package description
|
||||
# ${{exec}} startup script name
|
||||
# ${{chdir}} app directory
|
||||
# ${{retries}} retries for startup
|
||||
# ${{retryTimeout}} retry timeout
|
||||
# ${{app_name}} normalized app name
|
||||
# ${{daemon_user}} daemon user
|
||||
# -------------------------------------------------
|
||||
# DEPRECATED, use -J-Xmx1024m instead
|
||||
# -mem 1024
|
||||
|
||||
# Setting -X directly (-J is stripped)
|
||||
# -J-X
|
||||
# -J-Xmx1024
|
||||
|
||||
# Add additional jvm parameters
|
||||
# -Dkey=val
|
||||
|
||||
# For play applications you may set
|
||||
# -Dpidfile.path=/var/run/${{app_name}}/play.pid
|
||||
|
||||
# Turn on JVM debugging, open at the given port
|
||||
# -jvm-debug <port>
|
||||
|
||||
# Don't run the java version check
|
||||
# -no-version-check
|
||||
|
||||
-J-Xms130m
|
||||
-J-Xmx256m
|
||||
|
||||
# With universal:packageBin:
|
||||
# - setup with a configuration tool after unzip
|
||||
# - use the path to the application.ini file
|
||||
# -Dconfig.file=${{path_to}}/conf/application.conf
|
||||
|
@ -1,28 +1,36 @@
|
||||
import org.bigbluebutton.build._
|
||||
|
||||
import scalariform.formatter.preferences._
|
||||
import com.typesafe.sbt.SbtScalariform
|
||||
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
|
||||
import NativePackagerHelper._
|
||||
import com.typesafe.sbt.SbtNativePackager.autoImport._
|
||||
|
||||
enablePlugins(JavaServerAppPackaging)
|
||||
enablePlugins(UniversalPlugin)
|
||||
enablePlugins(SystemdPlugin)
|
||||
enablePlugins(DebianPlugin)
|
||||
|
||||
name := "bbb-transcode-akka"
|
||||
|
||||
organization := "org.bigbluebutton"
|
||||
version := "0.0.3"
|
||||
|
||||
version := "0.0.2"
|
||||
val compileSettings = Seq(
|
||||
organization := "org.bigbluebutton",
|
||||
|
||||
scalaVersion := "2.12.6"
|
||||
|
||||
scalacOptions ++= Seq(
|
||||
"-unchecked",
|
||||
"-deprecation",
|
||||
"-Xlint",
|
||||
"-Ywarn-dead-code",
|
||||
"-language:_",
|
||||
"-target:jvm-1.8",
|
||||
"-encoding", "UTF-8"
|
||||
)
|
||||
|
||||
resolvers ++= Seq(
|
||||
"spray repo" at "http://repo.spray.io/",
|
||||
"rediscala" at "http://dl.bintray.com/etaty/maven",
|
||||
"blindside-repos" at "http://blindside.googlecode.com/svn/repository/"
|
||||
scalacOptions ++= List(
|
||||
"-unchecked",
|
||||
"-deprecation",
|
||||
"-Xlint",
|
||||
"-Ywarn-dead-code",
|
||||
"-language:_",
|
||||
"-target:jvm-1.8",
|
||||
"-encoding", "UTF-8"
|
||||
),
|
||||
javacOptions ++= List(
|
||||
"-Xlint:unchecked",
|
||||
"-Xlint:deprecation"
|
||||
)
|
||||
)
|
||||
|
||||
resolvers += Resolver.sonatypeRepo("releases")
|
||||
@ -34,66 +42,16 @@ publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/d
|
||||
// into eclipse.
|
||||
retrieveManaged := true
|
||||
|
||||
testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", "junitxml")
|
||||
Seq(Revolver.settings: _*)
|
||||
lazy val bbbAppsAkka = (project in file(".")).settings(name := "bbb-apps-akka", libraryDependencies ++= Dependencies.runtime).settings(compileSettings)
|
||||
|
||||
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
|
||||
scalariformAutoformat := false
|
||||
|
||||
val akkaVersion = "2.5.14"
|
||||
val scalaTestV = "3.0.5"
|
||||
|
||||
libraryDependencies ++= {
|
||||
Seq(
|
||||
"ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime",
|
||||
"junit" % "junit" % "4.11",
|
||||
"commons-codec" % "commons-codec" % "1.11",
|
||||
"joda-time" % "joda-time" % "2.3",
|
||||
"org.apache.commons" % "commons-lang3" % "3.7"
|
||||
)}
|
||||
|
||||
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
|
||||
|
||||
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
|
||||
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
|
||||
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
|
||||
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
|
||||
|
||||
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
|
||||
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion
|
||||
|
||||
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
|
||||
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion
|
||||
|
||||
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
|
||||
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
|
||||
|
||||
// For generating test reports
|
||||
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
|
||||
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
|
||||
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.1" % "test"
|
||||
|
||||
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
|
||||
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test"
|
||||
|
||||
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
|
||||
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test"
|
||||
|
||||
libraryDependencies += "org.mockito" % "mockito-core" % "2.7.22" % "test"
|
||||
|
||||
seq(Revolver.settings: _*)
|
||||
|
||||
import com.typesafe.sbt.SbtScalariform
|
||||
|
||||
import scalariform.formatter.preferences._
|
||||
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
|
||||
|
||||
SbtScalariform.defaultScalariformSettings
|
||||
|
||||
ScalariformKeys.preferences := ScalariformKeys.preferences.value
|
||||
scalariformPreferences := scalariformPreferences.value
|
||||
.setPreference(AlignSingleLineCaseStatements, true)
|
||||
.setPreference(DoubleIndentClassDeclaration, true)
|
||||
.setPreference(DoubleIndentConstructorArguments, true)
|
||||
.setPreference(AlignParameters, true)
|
||||
|
||||
|
||||
//-----------
|
||||
// Packaging
|
||||
//
|
||||
@ -119,16 +77,6 @@ daemonUser in Linux := user
|
||||
// group which will execute the application
|
||||
daemonGroup in Linux := group
|
||||
|
||||
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
|
||||
// Move the application.conf so the user can override settings here
|
||||
val appConf = src / "main" / "resources" / "application.conf"
|
||||
appConf -> "conf/application.conf"
|
||||
}
|
||||
|
||||
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
|
||||
// Move logback.xml so the user can override settings here
|
||||
val logConf = src / "main" / "resources" / "logback.xml"
|
||||
logConf -> "conf/logback.xml"
|
||||
}
|
||||
javaOptions in Universal ++= Seq("-J-Xms130m", "-J-Xmx256m", "-Dconfig.file=conf/application.conf", "-Dlogback.configurationFile=conf/logback.xml")
|
||||
|
||||
debianPackageDependencies in Debian ++= Seq("java7-runtime-headless", "bash")
|
||||
|
51
akka-bbb-transcode/project/Dependencies.scala
Normal file
51
akka-bbb-transcode/project/Dependencies.scala
Normal file
@ -0,0 +1,51 @@
|
||||
package org.bigbluebutton.build
|
||||
|
||||
import sbt._
|
||||
import Keys._
|
||||
|
||||
object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.8"
|
||||
val junitInterface = "0.11"
|
||||
val scalactic = "3.0.3"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.19"
|
||||
val logback = "1.2.3"
|
||||
|
||||
// Apache Commons
|
||||
val lang = "3.8.1"
|
||||
val codec = "1.11"
|
||||
|
||||
// BigBlueButton
|
||||
val bbbCommons = "0.0.20-SNAPSHOT"
|
||||
}
|
||||
|
||||
object Compile {
|
||||
val scalaLibrary = "org.scala-lang" % "scala-library" % Versions.scala
|
||||
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Versions.scala
|
||||
|
||||
val akkaActor = "com.typesafe.akka" % "akka-actor_2.12" % Versions.akkaVersion
|
||||
val akkaSl4fj = "com.typesafe.akka" % "akka-slf4j_2.12" % Versions.akkaVersion
|
||||
|
||||
val logback = "ch.qos.logback" % "logback-classic" % Versions.logback
|
||||
val commonsCodec = "commons-codec" % "commons-codec" % Versions.codec
|
||||
|
||||
val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang
|
||||
|
||||
val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons excludeAll (
|
||||
ExclusionRule(organization = "org.red5"))
|
||||
}
|
||||
|
||||
val runtime = Seq(
|
||||
Compile.scalaLibrary,
|
||||
Compile.scalaCompiler,
|
||||
Compile.akkaActor,
|
||||
Compile.akkaSl4fj,
|
||||
Compile.logback,
|
||||
Compile.commonsCodec,
|
||||
Compile.apacheLang,
|
||||
Compile.bbbCommons)
|
||||
}
|
@ -1 +1 @@
|
||||
sbt.version=0.13.8
|
||||
sbt.version=1.2.7
|
@ -1,11 +1,11 @@
|
||||
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
6
akka-bbb-transcode/run.sh
Executable file
6
akka-bbb-transcode/run.sh
Executable file
@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
sbt clean stage
|
||||
sudo service bbb-transcode-akka stop
|
||||
cd target/universal/stage
|
||||
./bin/bbb-transcode-akka
|
@ -1,21 +1,24 @@
|
||||
package org.bigbluebutton
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor }
|
||||
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
|
||||
import org.bigbluebutton.common2.redis.RedisPublisher
|
||||
import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor
|
||||
import org.bigbluebutton.transcode.JsonMsgHdlrActor
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import org.bigbluebutton.common2.redis.MessageSender
|
||||
import org.bigbluebutton.transcode.core.TranscodingInGW
|
||||
import org.bigbluebutton.transcode.bus.InJsonMsgBus
|
||||
|
||||
object Boot extends App with SystemConfiguration {
|
||||
|
||||
implicit val system = ActorSystem("bigbluebutton-transcode-system")
|
||||
|
||||
val redisPublisher = new RedisPublisher(system)
|
||||
val redisPublisher = new RedisPublisher(system, "BbbTranscodeAkkaPub")
|
||||
val msgSender = new MessageSender(redisPublisher)
|
||||
|
||||
var inGW = new TranscodingInGW(system, redisPublisher)
|
||||
var inGW = new TranscodingInGW(system, msgSender)
|
||||
|
||||
val inJsonMsgBus = new InJsonMsgBus
|
||||
val inJsonMsgBus = new IncomingJsonMessageBus
|
||||
val redisMessageHandlerActor = system.actorOf(JsonMsgHdlrActor.props(inGW))
|
||||
inJsonMsgBus.subscribe(redisMessageHandlerActor, toAkkaTranscodeJsonChannel)
|
||||
|
||||
|
@ -1,25 +1,14 @@
|
||||
package org.bigbluebutton
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.util.Try
|
||||
|
||||
trait SystemConfiguration {
|
||||
|
||||
val config = ConfigFactory.load()
|
||||
|
||||
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
|
||||
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
|
||||
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
|
||||
import org.bigbluebutton.common2.redis.RedisConfiguration
|
||||
|
||||
trait SystemConfiguration extends RedisConfiguration {
|
||||
lazy val _ffmpegPath = Try(config.getString("transcoder.ffmpeg-path")).getOrElse("/usr/local/bin/ffmpeg")
|
||||
lazy val _ffprobePath = Try(config.getString("transcoder.ffprobe-path")).getOrElse("/usr/local/bin/ffprobe")
|
||||
|
||||
lazy val _videoconfLogoImagePath = Try(config.getString("videoconference.videoconf-logo-image-path")).getOrElse("")
|
||||
lazy val _enableUserVideoSubtitle = Try(config.getString("videoconference.enable-user-video-subtitle").toBoolean).getOrElse(false)
|
||||
lazy val _sipVideoResolution = Try(config.getString("videoconference.sip-video-resolution")).getOrElse("")
|
||||
|
||||
lazy val toAkkaTranscodeRedisChannel = Try(config.getString("redis.toAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:to-bbb-transcode:system")
|
||||
lazy val fromAkkaTranscodeRedisChannel = Try(config.getString("redis.fromAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:from-bbb-transcode:system")
|
||||
lazy val toAkkaTranscodeJsonChannel = Try(config.getString("eventBus.toAkkaTranscodeJsonChannel")).getOrElse("to-akka-transcode-json-channel")
|
||||
lazy val fromAkkaTranscodeJsonChannel = Try(config.getString("eventBus.fromAkkaTranscodeJsonChannel")).getOrElse("from-akka-transcode-json-channel")
|
||||
}
|
||||
|
@ -1,60 +1,38 @@
|
||||
package org.bigbluebutton.endpoint.redis
|
||||
|
||||
import java.io.PrintWriter
|
||||
import java.io.StringWriter
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration.DurationInt
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.Props
|
||||
import akka.actor.SupervisorStrategy.Resume
|
||||
|
||||
import redis.actors.RedisSubscriberActor
|
||||
import redis.api.pubsub.Message
|
||||
import redis.api.pubsub.PMessage
|
||||
import redis.api.servers.ClientSetname
|
||||
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
import org.bigbluebutton.transcode.bus.{ InJsonMsg, InJsonMsgBus, ReceivedJsonMsg }
|
||||
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
|
||||
import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider }
|
||||
|
||||
object AppsRedisSubscriberActor extends SystemConfiguration {
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Props
|
||||
|
||||
object AppsRedisSubscriberActor extends RedisSubscriber {
|
||||
|
||||
val channels = Seq(toAkkaTranscodeRedisChannel)
|
||||
val patterns = Seq("bigbluebutton:to-bbb-transcode:*")
|
||||
|
||||
def props(system: ActorSystem, msgBus: InJsonMsgBus): Props =
|
||||
Props(classOf[AppsRedisSubscriberActor], system, msgBus,
|
||||
def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props =
|
||||
Props(
|
||||
classOf[AppsRedisSubscriberActor],
|
||||
system, jsonMsgBus,
|
||||
redisHost, redisPort,
|
||||
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
|
||||
channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher")
|
||||
}
|
||||
|
||||
class AppsRedisSubscriberActor(
|
||||
val system: ActorSystem,
|
||||
msgBus: InJsonMsgBus, redisHost: String,
|
||||
system: ActorSystem,
|
||||
msgBus: IncomingJsonMessageBus, redisHost: String,
|
||||
redisPort: Int,
|
||||
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
|
||||
extends RedisSubscriberActor(
|
||||
new InetSocketAddress(redisHost, redisPort),
|
||||
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) with SystemConfiguration {
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
case e: Exception => {
|
||||
val sw: StringWriter = new StringWriter()
|
||||
sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n")
|
||||
e.printStackTrace(new PrintWriter(sw))
|
||||
log.error(sw.toString())
|
||||
Resume
|
||||
}
|
||||
}
|
||||
extends RedisSubscriberProvider(system, "BbbTranscodeAkkaSub", channels, patterns, msgBus) with SystemConfiguration {
|
||||
|
||||
var lastPongReceivedOn = 0L
|
||||
system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage())
|
||||
|
||||
write(ClientSetname("BbbTranscodeAkkaSub").encodedRequest)
|
||||
|
||||
def checkPongMessage() {
|
||||
val now = System.currentTimeMillis()
|
||||
|
||||
@ -63,19 +41,6 @@ class AppsRedisSubscriberActor(
|
||||
}
|
||||
}
|
||||
|
||||
def onMessage(message: Message) {
|
||||
if (message.channel == toAkkaTranscodeRedisChannel) {
|
||||
val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String)
|
||||
log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
|
||||
msgBus.publish(InJsonMsg(toAkkaTranscodeJsonChannel, receivedJsonMessage))
|
||||
}
|
||||
}
|
||||
|
||||
def onPMessage(pmessage: PMessage) {
|
||||
// log.debug(s"pattern message received: $pmessage")
|
||||
}
|
||||
|
||||
def handleMessage(msg: String) {
|
||||
log.warning("**** TODO: Handle pubsub messages. ****")
|
||||
}
|
||||
addListener(toAkkaTranscodeJsonChannel)
|
||||
subscribe()
|
||||
}
|
||||
|
@ -1,31 +0,0 @@
|
||||
package org.bigbluebutton.endpoint.redis
|
||||
|
||||
import redis.RedisClient
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import akka.actor.ActorSystem
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
//import org.bigbluebutton.common.converters.ToJsonEncoder
|
||||
|
||||
class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
|
||||
|
||||
val redis = RedisClient(redisHost, redisPort)(system)
|
||||
|
||||
// Set the name of this client to be able to distinguish when doing
|
||||
// CLIENT LIST on redis-cli
|
||||
redis.clientSetname("BbbTranscodeAkkaPub")
|
||||
|
||||
//val encoder = new ToJsonEncoder()
|
||||
//def sendPingMessage() {
|
||||
// val json = encoder.encodePubSubPingMessage("BbbTranscode", System.currentTimeMillis())
|
||||
// redis.publish("bigbluebutton:to-bbb-apps:system", json)
|
||||
//}
|
||||
|
||||
//system.scheduler.schedule(10 seconds, 10 seconds)(sendPingMessage())
|
||||
|
||||
def publish(channel: String, data: String) {
|
||||
//println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
|
||||
redis.publish(channel, data)
|
||||
}
|
||||
|
||||
}
|
@ -1,13 +1,15 @@
|
||||
package org.bigbluebutton.transcode
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
import org.bigbluebutton.common2.bus.ReceivedJsonMessage
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.transcode.core.TranscodingInGW
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.transcode.bus.ReceivedJsonMsg
|
||||
import org.bigbluebutton.transcode.core.TranscodingInGW
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.Props
|
||||
|
||||
object JsonMsgHdlrActor {
|
||||
def props(inGW: TranscodingInGW): Props = Props(classOf[JsonMsgHdlrActor], inGW)
|
||||
@ -16,13 +18,13 @@ object JsonMsgHdlrActor {
|
||||
class JsonMsgHdlrActor(val inGW: TranscodingInGW) extends Actor with ActorLogging
|
||||
with SystemConfiguration with JsonMsgDeserializer {
|
||||
def receive = {
|
||||
case msg: ReceivedJsonMsg =>
|
||||
case msg: ReceivedJsonMessage =>
|
||||
log.debug("handling {} - {}", msg.channel, msg.data)
|
||||
handleReceivedJsonMessage(msg)
|
||||
case _ => // do nothing
|
||||
}
|
||||
|
||||
def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = {
|
||||
def handleReceivedJsonMessage(msg: ReceivedJsonMessage): Unit = {
|
||||
for {
|
||||
envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data)
|
||||
} yield handle(envJsonNode.envelope, envJsonNode.core)
|
||||
|
@ -1,31 +0,0 @@
|
||||
package org.bigbluebutton.transcode.bus
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.event.{ EventBus, LookupClassification }
|
||||
|
||||
case class ReceivedJsonMsg(channel: String, data: String)
|
||||
case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg)
|
||||
|
||||
class InJsonMsgBus extends EventBus with LookupClassification {
|
||||
type Event = InJsonMsg
|
||||
type Classifier = String
|
||||
type Subscriber = ActorRef
|
||||
|
||||
// is used for extracting the classifier from the incoming events
|
||||
override protected def classify(event: Event): Classifier = event.topic
|
||||
|
||||
// will be invoked for each event for all subscribers which registered themselves
|
||||
// for the event’s classifier
|
||||
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
|
||||
subscriber ! event.payload
|
||||
}
|
||||
|
||||
// must define a full order over the subscribers, expressed as expected from
|
||||
// `java.lang.Comparable.compare`
|
||||
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
|
||||
a.compareTo(b)
|
||||
|
||||
// determines the initial size of the index data structure
|
||||
// used internally (i.e. the expected number of different classifiers)
|
||||
override protected def mapSize: Int = 128
|
||||
}
|
@ -4,16 +4,17 @@ import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.Props
|
||||
import org.bigbluebutton.transcode.api._
|
||||
import org.bigbluebutton.endpoint.redis.RedisPublisher
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.common2.util.JsonUtil
|
||||
import org.bigbluebutton.common2.redis.MessageSender
|
||||
|
||||
|
||||
object MessageSenderActor {
|
||||
def props(msgSender: RedisPublisher): Props =
|
||||
def props(msgSender: MessageSender): Props =
|
||||
Props(classOf[MessageSenderActor], msgSender)
|
||||
}
|
||||
|
||||
class MessageSenderActor(val msgSender: RedisPublisher)
|
||||
class MessageSenderActor(val msgSender: MessageSender)
|
||||
extends Actor with ActorLogging {
|
||||
|
||||
val fromBbbTranscodeRedisChannel = "bigbluebutton:from-bbb-transcode:system"
|
||||
@ -39,7 +40,7 @@ class MessageSenderActor(val msgSender: RedisPublisher)
|
||||
val evt = new StartTranscoderSysRespMsg(header, body)
|
||||
val msgEvent = BbbCommonEnvCoreMsg(envelope, evt)
|
||||
val json = JsonUtil.toJson(msgEvent)
|
||||
msgSender.publish(fromBbbTranscodeRedisChannel, json)
|
||||
msgSender.send(fromBbbTranscodeRedisChannel, json)
|
||||
}
|
||||
|
||||
private def handleStopTranscoderReply(msg: StopTranscoderReply) {
|
||||
@ -52,7 +53,7 @@ class MessageSenderActor(val msgSender: RedisPublisher)
|
||||
val evt = new StopTranscoderSysRespMsg(header, body)
|
||||
val msgEvent = BbbCommonEnvCoreMsg(envelope, evt)
|
||||
val json = JsonUtil.toJson(msgEvent)
|
||||
msgSender.publish(fromBbbTranscodeRedisChannel, json)
|
||||
msgSender.send(fromBbbTranscodeRedisChannel, json)
|
||||
}
|
||||
|
||||
private def handleUpdateTranscoderReply(msg: UpdateTranscoderReply) {
|
||||
@ -66,7 +67,7 @@ class MessageSenderActor(val msgSender: RedisPublisher)
|
||||
val evt = new UpdateTranscoderSysRespMsg(header, body)
|
||||
val msgEvent = BbbCommonEnvCoreMsg(envelope, evt)
|
||||
val json = JsonUtil.toJson(msgEvent)
|
||||
msgSender.publish(fromBbbTranscodeRedisChannel, json)
|
||||
msgSender.send(fromBbbTranscodeRedisChannel, json)
|
||||
}
|
||||
|
||||
private def handleStartProbingReply(msg: StartProbingReply) {
|
||||
@ -80,7 +81,7 @@ class MessageSenderActor(val msgSender: RedisPublisher)
|
||||
val evt = new StartProbingSysRespMsg(header, body)
|
||||
val msgEvent = BbbCommonEnvCoreMsg(envelope, evt)
|
||||
val json = JsonUtil.toJson(msgEvent)
|
||||
msgSender.publish(fromBbbTranscodeRedisChannel, json)
|
||||
msgSender.send(fromBbbTranscodeRedisChannel, json)
|
||||
}
|
||||
|
||||
private def handleTranscoderStatusUpdate(msg: TranscoderStatusUpdate) {
|
||||
|
@ -2,17 +2,17 @@ package org.bigbluebutton.transcode.core
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.ActorLogging
|
||||
import org.bigbluebutton.endpoint.redis.RedisPublisher
|
||||
import org.bigbluebutton.transcode.api._
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
import org.bigbluebutton.transcode.core.apps.TranscodingObserverApp
|
||||
import org.bigbluebutton.common2.redis.MessageSender
|
||||
|
||||
object TranscodingActor extends SystemConfiguration {
|
||||
def props(system: ActorSystem, messageSender: RedisPublisher): Props =
|
||||
def props(system: ActorSystem, messageSender: MessageSender): Props =
|
||||
Props(classOf[TranscodingActor], system, messageSender)
|
||||
}
|
||||
|
||||
class TranscodingActor(val system: ActorSystem, messageSender: RedisPublisher)
|
||||
class TranscodingActor(val system: ActorSystem, messageSender: MessageSender)
|
||||
extends Actor with ActorLogging with TranscodingObserverApp {
|
||||
val transcodersModel = new TranscodersModel()
|
||||
|
||||
|
@ -1,11 +1,11 @@
|
||||
package org.bigbluebutton.transcode.core
|
||||
|
||||
import org.bigbluebutton.common2.redis.MessageSender
|
||||
import org.bigbluebutton.transcode.api._
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
import org.bigbluebutton.endpoint.redis.RedisPublisher
|
||||
import org.bigbluebutton.transcode.api._
|
||||
|
||||
class TranscodingInGW(val system: ActorSystem, messageSender: RedisPublisher) {
|
||||
class TranscodingInGW(val system: ActorSystem, messageSender: MessageSender) {
|
||||
val log = system.log
|
||||
val transcodingActor = system.actorOf(TranscodingActor.props(system, messageSender), "bbb-transcoding-manager")
|
||||
|
||||
|
@ -8,7 +8,7 @@ akka {
|
||||
loglevel = INFO
|
||||
stdout-loglevel = "INFO"
|
||||
|
||||
rediscala-subscriber-worker-dispatcher {
|
||||
redis-subscriber-worker-dispatcher {
|
||||
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
# Throughput defines the maximum number of messages to be
|
||||
# processed per actor before the thread jumps to the next actor.
|
@ -21,6 +21,7 @@
|
||||
<logger name="akka" level="INFO" />
|
||||
<logger name="org.bigbluebutton" level="DEBUG" />
|
||||
<logger name="org.freeswitch.transcode" level="WARN" />
|
||||
<logger name="io.lettuce" level="INFO" />
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT"/>
|
@ -7,10 +7,10 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.17"
|
||||
val akkaVersion = "2.5.19"
|
||||
val gson = "2.8.5"
|
||||
val sl4j = "1.7.25"
|
||||
val quicklens = "1.4.11"
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -2,8 +2,8 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
@ -7,13 +7,13 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
val junit = "4.12"
|
||||
val junitInterface = "0.11"
|
||||
val scalactic = "3.0.3"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.17"
|
||||
val akkaVersion = "2.5.19"
|
||||
val gson = "2.8.5"
|
||||
val jackson = "2.9.7"
|
||||
val sl4j = "1.7.25"
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -2,8 +2,8 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
@ -24,4 +24,9 @@ trait RedisConfiguration {
|
||||
lazy val fromAkkaAppsPresRedisChannel = Try(config.getString("redis.fromAkkaAppsPresRedisChannel")).getOrElse("from-akka-apps-pres-redis-channel")
|
||||
|
||||
lazy val fromBbbWebRedisChannel = Try(config.getString("redis.fromBbbWebRedisChannel")).getOrElse("from-bbb-web-redis-channel")
|
||||
|
||||
lazy val toAkkaTranscodeRedisChannel = Try(config.getString("redis.toAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:to-bbb-transcode:system")
|
||||
lazy val fromAkkaTranscodeRedisChannel = Try(config.getString("redis.fromAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:from-bbb-transcode:system")
|
||||
lazy val toAkkaTranscodeJsonChannel = Try(config.getString("eventBus.toAkkaTranscodeJsonChannel")).getOrElse("to-akka-transcode-json-channel")
|
||||
lazy val fromAkkaTranscodeJsonChannel = Try(config.getString("eventBus.fromAkkaTranscodeJsonChannel")).getOrElse("from-akka-transcode-json-channel")
|
||||
}
|
||||
|
@ -7,13 +7,13 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
val junit = "4.12"
|
||||
val junitInterface = "0.11"
|
||||
val scalactic = "3.0.3"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.17"
|
||||
val akkaVersion = "2.5.19"
|
||||
val gson = "2.8.5"
|
||||
val jackson = "2.9.7"
|
||||
val freemaker = "2.3.28"
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -2,8 +2,8 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.7")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
@ -7,7 +7,7 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
|
||||
// Libraries
|
||||
val netty = "3.2.10.Final"
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -2,8 +2,8 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
|
||||
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
|
||||
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
@ -7,12 +7,12 @@ object Dependencies {
|
||||
|
||||
object Versions {
|
||||
// Scala
|
||||
val scala = "2.12.7"
|
||||
val scala = "2.12.8"
|
||||
val junitInterface = "0.11"
|
||||
val scalactic = "3.0.3"
|
||||
|
||||
// Libraries
|
||||
val akkaVersion = "2.5.17"
|
||||
val akkaVersion = "2.5.19"
|
||||
val gson = "2.8.5"
|
||||
val jackson = "2.9.7"
|
||||
val logback = "1.2.3"
|
||||
|
@ -1 +1 @@
|
||||
sbt.version=1.2.6
|
||||
sbt.version=1.2.7
|
@ -6,7 +6,7 @@ addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.12")
|
||||
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.8")
|
||||
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "0.2.9")
|
||||
|
||||
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user