diff --git a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/api/IBigBlueButtonInGW.java b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/api/IBigBlueButtonInGW.java index 4ec828cf8a..e5c8178e6c 100755 --- a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/api/IBigBlueButtonInGW.java +++ b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/api/IBigBlueButtonInGW.java @@ -125,4 +125,11 @@ public interface IBigBlueButtonInGW { void destroyAdditionalNotes(String meetingID, String requesterID, String noteID); void requestAdditionalNotesSet(String meetingID, String requesterID, int additionalNotesSetSize); void sharedNotesSyncNoteRequest(String meetingID, String requesterID, String noteID); + + //Transcode + void startTranscoderReply(String meetingId, String transcoderId, Map params); + void updateTranscoderReply(String meetingId, String transcoderId, Map params); + void stopTranscoderReply(String meetingId, String transcoderId); + void transcoderStatusUpdate(String meetingId, String transcoderId, Map params); + void startProbingReply(String meetingId, String transcoderId, Map params); } diff --git a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/UsersMessageReceiver.java b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/UsersMessageReceiver.java index a930ae4784..fbdd813457 100755 --- a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/UsersMessageReceiver.java +++ b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/UsersMessageReceiver.java @@ -24,6 +24,11 @@ import org.bigbluebutton.common.messages.RespondToGuestMessage; import org.bigbluebutton.common.messages.SetGuestPolicyMessage; import org.bigbluebutton.common.messages.SetRecordingStatusRequestMessage; import org.bigbluebutton.common.messages.SetUserStatusRequestMessage; +import org.bigbluebutton.common.messages.StartTranscoderReplyMessage; +import org.bigbluebutton.common.messages.StartProbingReplyMessage; +import org.bigbluebutton.common.messages.UpdateTranscoderReplyMessage; +import org.bigbluebutton.common.messages.StopTranscoderReplyMessage; +import org.bigbluebutton.common.messages.TranscoderStatusUpdateMessage; import org.bigbluebutton.common.messages.UserJoinedVoiceConfMessage; import org.bigbluebutton.common.messages.UserLeavingMessage; import org.bigbluebutton.common.messages.UserLeftVoiceConfMessage; @@ -171,6 +176,33 @@ public class UsersMessageReceiver implements MessageHandler{ } } } + } else if (channel.equalsIgnoreCase(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN)) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + switch (messageName) { + case StartTranscoderReplyMessage.START_TRANSCODER_REPLY: + processStartTranscoderReplyMessage(message); + break; + case UpdateTranscoderReplyMessage.UPDATE_TRANSCODER_REPLY: + processUpdateTranscoderReplyMessage(message); + break; + case StopTranscoderReplyMessage.STOP_TRANSCODER_REPLY: + processStopTranscoderReplyMessage(message); + break; + case TranscoderStatusUpdateMessage.TRANSCODER_STATUS_UPDATE: + processTranscoderStatusUpdateMessage(message); + break; + case StartProbingReplyMessage.START_PROBING_REPLY: + processStartProbingReplyMessage(message); + break; + } + } + } } } @@ -397,4 +429,39 @@ public class UsersMessageReceiver implements MessageHandler{ bbbInGW.logoutEndMeeting(lemm.meetingId, lemm.userId); } } + + private void processStartTranscoderReplyMessage(String message) { + StartTranscoderReplyMessage msg = StartTranscoderReplyMessage.fromJson(message); + if(msg !=null){ + bbbInGW.startTranscoderReply(msg.meetingId, msg.transcoderId, msg.params); + } + } + + private void processUpdateTranscoderReplyMessage(String message) { + UpdateTranscoderReplyMessage msg = UpdateTranscoderReplyMessage.fromJson(message); + if(msg !=null){ + bbbInGW.updateTranscoderReply(msg.meetingId, msg.transcoderId, msg.params); + } + } + + private void processStopTranscoderReplyMessage(String message) { + StopTranscoderReplyMessage msg = StopTranscoderReplyMessage.fromJson(message); + if(msg !=null){ + bbbInGW.stopTranscoderReply(msg.meetingId, msg.transcoderId); + } + } + + private void processTranscoderStatusUpdateMessage(String message) { + TranscoderStatusUpdateMessage msg = TranscoderStatusUpdateMessage.fromJson(message); + if(msg !=null){ + bbbInGW.transcoderStatusUpdate(msg.meetingId, msg.transcoderId, msg.params); + } + } + + private void processStartProbingReplyMessage(String message) { + StartProbingReplyMessage msg = StartProbingReplyMessage.fromJson(message); + if (msg != null){ + bbbInGW.startProbingReply(msg.meetingId, msg.transcoderId, msg.params); + } + } } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonInGW.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonInGW.scala index 504fd889f9..b865584e68 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonInGW.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonInGW.scala @@ -541,4 +541,30 @@ class BigBlueButtonInGW(val system: ActorSystem, recorderApp: RecorderApplicatio def sharedNotesSyncNoteRequest(meetingId: String, userId: String, noteId: String) { bbbActor ! new SharedNotesSyncNoteRequest(meetingId, userId, noteId) } + + /** + * ******************************************************************* + * Message Interface for transcode + * ***************************************************************** + */ + + def startTranscoderReply(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + bbbActor ! new StartTranscoderReply(meetingId, transcoderId, mapAsScalaMap(params).toMap) + } + + def updateTranscoderReply(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + bbbActor ! new UpdateTranscoderReply(meetingId, transcoderId, mapAsScalaMap(params).toMap) + } + + def stopTranscoderReply(meetingId: String, transcoderId: String) { + bbbActor ! new StopTranscoderReply(meetingId, transcoderId) + } + + def transcoderStatusUpdate(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + bbbActor ! new TranscoderStatusUpdate(meetingId, transcoderId, mapAsScalaMap(params).toMap) + } + + def startProbingReply(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + bbbActor ! new StartProbingReply(meetingId, transcoderId, mapAsScalaMap(params).toMap) + } } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MeetingActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MeetingActor.scala index f2b99df7ab..55b47e7e12 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MeetingActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MeetingActor.scala @@ -224,6 +224,16 @@ class MeetingActor(val mProps: MeetingProperties, val outGW: OutMessageGateway) handleSharedNotesSyncNoteRequest(msg) case msg: ReconnectionTimeout => handleReconnectionTimeout(msg) + case msg: StartTranscoderReply => + handleStartTranscoderReply(msg) + case msg: UpdateTranscoderReply => + handleUpdateTranscoderReply(msg) + case msg: StopTranscoderReply => + handleStopTranscoderReply(msg) + case msg: TranscoderStatusUpdate => + handleTranscoderStatusUpdate(msg) + case msg: StartProbingReply => + handleStartProbingReply(msg) case msg: EndMeeting => handleEndMeeting(msg) case StopMeetingActor => //exit diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala index 0c32da0b49..926acc876e 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala @@ -33,6 +33,9 @@ import org.bigbluebutton.common.messages.UserEjectedFromMeetingMessage import org.bigbluebutton.common.messages.LockLayoutMessage import org.bigbluebutton.core.pubsub.senders.WhiteboardMessageToJsonConverter import org.bigbluebutton.common.converters.ToJsonEncoder +import org.bigbluebutton.common.messages.StartTranscoderRequestMessage +import org.bigbluebutton.common.messages.UpdateTranscoderRequestMessage +import org.bigbluebutton.common.messages.StopTranscoderRequestMessage object MessageSenderActor { def props(meetingId: String, msgSender: MessageSender): Props = @@ -124,6 +127,9 @@ class MessageSenderActor(val meetingId: String, val service: MessageSender) case msg: UserLeftVoice => handleUserLeftVoice(msg) case msg: IsMeetingMutedReply => handleIsMeetingMutedReply(msg) case msg: UserListeningOnly => handleUserListeningOnly(msg) + case msg: StartTranscoderRequest => handleStartTranscoderRequest(msg) + case msg: UpdateTranscoderRequest => handleUpdateTranscoderRequest(msg) + case msg: StopTranscoderRequest => handleStopTranscoderRequest(msg) case msg: GetCurrentLayoutReply => handleGetCurrentLayoutReply(msg) case msg: BroadcastLayoutEvent => handleBroadcastLayoutEvent(msg) case msg: LockLayoutEvent => handleLockLayoutEvent(msg) @@ -669,6 +675,21 @@ class MessageSenderActor(val meetingId: String, val service: MessageSender) service.send(MessagingConstants.FROM_USERS_CHANNEL, json) } + private def handleStartTranscoderRequest(msg: StartTranscoderRequest) { + val str = new StartTranscoderRequestMessage(msg.meetingID, msg.transcoderId, mapAsJavaMap(msg.params)) + service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + + private def handleUpdateTranscoderRequest(msg: UpdateTranscoderRequest) { + val str = new UpdateTranscoderRequestMessage(msg.meetingID, msg.transcoderId, mapAsJavaMap(msg.params)) + service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + + private def handleStopTranscoderRequest(msg: StopTranscoderRequest) { + val str = new StopTranscoderRequestMessage(msg.meetingID, msg.transcoderId) + service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + private def handleGetWhiteboardShapesReply(msg: GetWhiteboardShapesReply) { val json = WhiteboardMessageToJsonConverter.getWhiteboardShapesReplyToJson(msg) service.send(MessagingConstants.FROM_WHITEBOARD_CHANNEL, json) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/InMessages.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/InMessages.scala index 9da320c856..732efb1143 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/InMessages.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/InMessages.scala @@ -135,4 +135,11 @@ case class GetCurrentDocumentRequest(meetingID: String, requesterID: String) ext case class CreateAdditionalNotesRequest(meetingID: String, requesterID: String, noteName: String) extends InMessage case class DestroyAdditionalNotesRequest(meetingID: String, requesterID: String, noteID: String) extends InMessage case class RequestAdditionalNotesSetRequest(meetingID: String, requesterID: String, additionalNotesSetSize: Int) extends InMessage -case class SharedNotesSyncNoteRequest(meetingID: String, requesterID: String, noteID: String) extends InMessage \ No newline at end of file +case class SharedNotesSyncNoteRequest(meetingID: String, requesterID: String, noteID: String) extends InMessage + +//Transcode +case class StartTranscoderReply(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage +case class UpdateTranscoderReply(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage +case class StopTranscoderReply(meetingID: String, transcoderId: String) extends InMessage +case class TranscoderStatusUpdate(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage +case class StartProbingReply(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/OutMessages.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/OutMessages.scala index 5aac8ce483..2dc7101d17 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/OutMessages.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/OutMessages.scala @@ -146,6 +146,12 @@ case class CreateAdditionalNotesReply(meetingID: String, recorded: Boolean, requ case class DestroyAdditionalNotesReply(meetingID: String, recorded: Boolean, requesterID: String, noteID: String) extends IOutMessage case class SharedNotesSyncNoteReply(meetingID: String, recorded: Boolean, requesterID: String, noteID: String, note: NoteReport) extends IOutMessage +//Transcode +case class StartTranscoderRequest(meetingID: String, transcoderId: String, params: scala.collection.mutable.HashMap[String, String]) extends IOutMessage +case class UpdateTranscoderRequest(meetingID: String, transcoderId: String, params: scala.collection.mutable.HashMap[String, String]) extends IOutMessage +case class StopTranscoderRequest(meetingID: String, transcoderId: String) extends IOutMessage +case class StartProbingRequest(meetingID: String, transcoderId: String, params: scala.collection.mutable.HashMap[String, String]) extends IOutMessage + // Value Objects case class MeetingVO(id: String, recorded: Boolean) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala index 6187fcd93c..f2381644c7 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala @@ -17,7 +17,7 @@ import redis.api.servers.ClientSetname object AppsRedisSubscriberActor extends SystemConfiguration { val channels = Seq("time") - val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*") + val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*") def props(msgReceiver: RedisMessageReceiver): Props = Props(classOf[AppsRedisSubscriberActor], msgReceiver, diff --git a/akka-bbb-transcode/.gitignore b/akka-bbb-transcode/.gitignore new file mode 100644 index 0000000000..1c0e395724 --- /dev/null +++ b/akka-bbb-transcode/.gitignore @@ -0,0 +1,49 @@ +.metadata +.project +.classpath +.settings +.history +.worksheet +gen +**/*.swp +**/*~.nib +**/build/ +**/*.pbxuser +**/*.perspective +**/*.perspectivev3 +*.xcworkspace +*.xcuserdatad +*.iml +project/*.ipr +project/*.iml +project/*.iws +project/out +project/*/target +project/target +project/*/bin +project/*/build +project/*.iml +project/*/*.iml +project/.idea +project/.idea/* +.idea/ +.DS_Store +project/.DS_Store +project/*/.DS_Store +tm.out +tmlog*.log +*.tm*.epoch +out/ +provisioning/.vagrant +provisioning/*/.vagrant +provisioning/*/*.known +/sbt/akka-patterns-store/ +/daemon/src/build/ +*.lock +logs/ +tmp/ +build/ +akka-patterns-store/ +lib_managed/ +.cache +bin/ diff --git a/akka-bbb-transcode/README.md b/akka-bbb-transcode/README.md new file mode 100644 index 0000000000..2c52138368 --- /dev/null +++ b/akka-bbb-transcode/README.md @@ -0,0 +1,12 @@ + +# akka-bbb-transcode + +This app implements a simple mechanism for media transcoding, acting as a relay between endpoints. This uses [FFmpeg](http://ffmpeg.org/) for transcoding between media formats and exchanging protocols. + +building and running +--- +```bash +cd akka-bbb-transcode/ +sbt clean +sbt run +``` diff --git a/akka-bbb-transcode/build.sbt b/akka-bbb-transcode/build.sbt new file mode 100755 index 0000000000..f715c03ec8 --- /dev/null +++ b/akka-bbb-transcode/build.sbt @@ -0,0 +1,97 @@ +enablePlugins(JavaServerAppPackaging) + +name := "bbb-transcode-akka" + +organization := "org.bigbluebutton" + +version := "0.0.1" + +scalaVersion := "2.11.6" + +scalacOptions ++= Seq( + "-unchecked", + "-deprecation", + "-Xlint", + "-Ywarn-dead-code", + "-language:_", + "-target:jvm-1.7", + "-encoding", "UTF-8" +) + +resolvers ++= Seq( + "spray repo" at "http://repo.spray.io/", + "rediscala" at "http://dl.bintray.com/etaty/maven", + "blindside-repos" at "http://blindside.googlecode.com/svn/repository/" +) + +publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/dev/repo/maven-repo/releases" )) ) + +// We want to have our jar files in lib_managed dir. +// This way we'll have the right path when we import +// into eclipse. +retrieveManaged := true + +testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", "junitxml") + +testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports") + +libraryDependencies ++= { + val akkaVersion = "2.3.11" + Seq( + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "ch.qos.logback" % "logback-classic" % "1.0.3", + "org.pegdown" % "pegdown" % "1.4.0", + "junit" % "junit" % "4.11", + "com.etaty.rediscala" %% "rediscala" % "1.4.0", + "commons-codec" % "commons-codec" % "1.8", + "joda-time" % "joda-time" % "2.3", + "com.google.code.gson" % "gson" % "1.7.1", + "redis.clients" % "jedis" % "2.1.0", + "org.apache.commons" % "commons-lang3" % "3.2", + "org.bigbluebutton" % "bbb-common-message" % "0.0.16" + )} + +seq(Revolver.settings: _*) + +scalariformSettings + +//----------- +// Packaging +// +// Reference: +// https://github.com/muuki88/sbt-native-packager-examples/tree/master/akka-server-app +// http://www.scala-sbt.org/sbt-native-packager/index.html +//----------- +mainClass := Some("org.bigbluebutton.Boot") + +maintainer in Linux := "Mario Gasparoni " + +packageSummary in Linux := "BigBlueButton Transcoder" + +packageDescription := """BigBlueButton FFmpeg transcoder.""" + +val user = "bigbluebutton" + +val group = "bigbluebutton" + +// user which will execute the application +daemonUser in Linux := user + +// group which will execute the application +daemonGroup in Linux := group + +mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) => + // Move the application.conf so the user can override settings here + val appConf = src / "main" / "resources" / "application.conf" + appConf -> "conf/application.conf" +} + +mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) => + // Move logback.xml so the user can override settings here + val logConf = src / "main" / "resources" / "logback.xml" + logConf -> "conf/logback.xml" +} + +debianPackageDependencies in Debian ++= Seq("java7-runtime-headless", "bash") diff --git a/akka-bbb-transcode/project/Build.scala b/akka-bbb-transcode/project/Build.scala new file mode 100755 index 0000000000..e69de29bb2 diff --git a/akka-bbb-transcode/project/build.properties b/akka-bbb-transcode/project/build.properties new file mode 100755 index 0000000000..a6e117b610 --- /dev/null +++ b/akka-bbb-transcode/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.8 diff --git a/akka-bbb-transcode/project/plugins.sbt b/akka-bbb-transcode/project/plugins.sbt new file mode 100755 index 0000000000..87ef644864 --- /dev/null +++ b/akka-bbb-transcode/project/plugins.sbt @@ -0,0 +1,7 @@ +addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2") + +addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0") + +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") + +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0") diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/DestroyVideoTranscoderReply.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/DestroyVideoTranscoderReply.java new file mode 100644 index 0000000000..5894a96dfd --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/DestroyVideoTranscoderReply.java @@ -0,0 +1,20 @@ +package org.bigbluebutton.transcode.api; + +public class DestroyVideoTranscoderReply extends InternalMessage { + private final String meetingId; + private final String transcoderId; + + public DestroyVideoTranscoderReply(String meetingId, String transcoderId) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + } + + public String getMeetingId() { + return meetingId; + } + + public String getTranscoderId() { + return transcoderId; + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/DestroyVideoTranscoderRequest.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/DestroyVideoTranscoderRequest.java new file mode 100644 index 0000000000..b11005906f --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/DestroyVideoTranscoderRequest.java @@ -0,0 +1,6 @@ +package org.bigbluebutton.transcode.api; + +public class DestroyVideoTranscoderRequest extends InternalMessage { + public DestroyVideoTranscoderRequest() {} + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/InternalMessage.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/InternalMessage.java new file mode 100644 index 0000000000..a0115ba6d6 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/InternalMessage.java @@ -0,0 +1,3 @@ +package org.bigbluebutton.transcode.api; + +public class InternalMessage {} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/RestartVideoTranscoderReply.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/RestartVideoTranscoderReply.java new file mode 100644 index 0000000000..e0c9d7702a --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/RestartVideoTranscoderReply.java @@ -0,0 +1,26 @@ +package org.bigbluebutton.transcode.api; + +public class RestartVideoTranscoderReply extends InternalMessage { + private final String meetingId; + private final String transcoderId; + private final String output; + + public RestartVideoTranscoderReply(String meetingId, String transcoderId, String output) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.output = output; + } + + public String getMeetingId() { + return meetingId; + } + + public String getTranscoderId() { + return transcoderId; + } + + public String getOutput() { + return output; + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/RestartVideoTranscoderRequest.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/RestartVideoTranscoderRequest.java new file mode 100644 index 0000000000..d3a821cc12 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/RestartVideoTranscoderRequest.java @@ -0,0 +1,5 @@ +package org.bigbluebutton.transcode.api; + +public class RestartVideoTranscoderRequest extends InternalMessage { + public RestartVideoTranscoderRequest() {} +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoProbingReply.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoProbingReply.java new file mode 100644 index 0000000000..6c5937d24e --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoProbingReply.java @@ -0,0 +1,27 @@ +package org.bigbluebutton.transcode.api; + +import java.util.Map; + +public class StartVideoProbingReply extends InternalMessage { + private final String meetingId; + private final String transcoderId; + private final Map probingData; + + public StartVideoProbingReply (String meetingId, String transcoderId, Map probingData) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.probingData = probingData; + } + + public String getMeetingId() { + return meetingId; + } + + public String getTranscoderId() { + return transcoderId; + } + + public Map getProbingData(){ + return probingData; + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoProbingRequest.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoProbingRequest.java new file mode 100644 index 0000000000..db4e21767e --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoProbingRequest.java @@ -0,0 +1,5 @@ +package org.bigbluebutton.transcode.api; + +public class StartVideoProbingRequest extends InternalMessage { + public StartVideoProbingRequest() {} +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoTranscoderReply.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoTranscoderReply.java new file mode 100644 index 0000000000..c840e22460 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoTranscoderReply.java @@ -0,0 +1,26 @@ +package org.bigbluebutton.transcode.api; + +public class StartVideoTranscoderReply extends InternalMessage { + private final String meetingId; + private final String transcoderId; + private final String output; + + public StartVideoTranscoderReply(String meetingId, String transcoderId, String output) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.output = output; + } + + public String getMeetingId() { + return meetingId; + } + + public String getTranscoderId() { + return transcoderId; + } + + public String getOutput() { + return output; + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoTranscoderRequest.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoTranscoderRequest.java new file mode 100644 index 0000000000..2beaf5163a --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StartVideoTranscoderRequest.java @@ -0,0 +1,5 @@ +package org.bigbluebutton.transcode.api; + +public class StartVideoTranscoderRequest extends InternalMessage { + public StartVideoTranscoderRequest() {} +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StopVideoTranscoderReply.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StopVideoTranscoderReply.java new file mode 100644 index 0000000000..8b7544010d --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StopVideoTranscoderReply.java @@ -0,0 +1,20 @@ +package org.bigbluebutton.transcode.api; + +public class StopVideoTranscoderReply extends InternalMessage { + private final String meetingId; + private final String transcoderId; + + public StopVideoTranscoderReply(String meetingId, String transcoderId) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + } + + public String getMeetingId() { + return meetingId; + } + + public String getTranscoderId() { + return transcoderId; + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StopVideoTranscoderRequest.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StopVideoTranscoderRequest.java new file mode 100644 index 0000000000..1902077ebd --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/StopVideoTranscoderRequest.java @@ -0,0 +1,5 @@ +package org.bigbluebutton.transcode.api; + +public class StopVideoTranscoderRequest extends InternalMessage { + public StopVideoTranscoderRequest() {} +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/TranscodingFinishedSuccessfully.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/TranscodingFinishedSuccessfully.java new file mode 100644 index 0000000000..98e75da0e9 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/TranscodingFinishedSuccessfully.java @@ -0,0 +1,13 @@ +package org.bigbluebutton.transcode.api; + +public class TranscodingFinishedSuccessfully extends InternalMessage { + private final String transcoderId; + + public TranscodingFinishedSuccessfully (String transcoderId) { + this.transcoderId = transcoderId; + } + + public String getTranscoderId() { + return transcoderId; + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/TranscodingFinishedUnsuccessfully.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/TranscodingFinishedUnsuccessfully.java new file mode 100644 index 0000000000..ae4add54b2 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/TranscodingFinishedUnsuccessfully.java @@ -0,0 +1,13 @@ +package org.bigbluebutton.transcode.api; + +public class TranscodingFinishedUnsuccessfully extends InternalMessage { + private final String transcoderId; + + public TranscodingFinishedUnsuccessfully (String transcoderId) { + this.transcoderId = transcoderId; + } + + public String getTranscoderId() { + return transcoderId; + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/UpdateVideoTranscoderReply.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/UpdateVideoTranscoderReply.java new file mode 100644 index 0000000000..8a2a5540c9 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/UpdateVideoTranscoderReply.java @@ -0,0 +1,26 @@ +package org.bigbluebutton.transcode.api; + +import java.util.Map; + +public class UpdateVideoTranscoderReply extends InternalMessage { + private final String meetingId; + private final String transcoderId; + private final String output; + + public UpdateVideoTranscoderReply(String meetingId, String transcoderId, String output) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.output = output; + } + + public String getMeetingId() { + return meetingId; + } + public String getTranscoderId() { + return transcoderId; + } + + public String getOutput() { + return output; + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/UpdateVideoTranscoderRequest.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/UpdateVideoTranscoderRequest.java new file mode 100644 index 0000000000..68341ceb47 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/api/UpdateVideoTranscoderRequest.java @@ -0,0 +1,15 @@ +package org.bigbluebutton.transcode.api; + +import java.util.Map; + +public class UpdateVideoTranscoderRequest extends InternalMessage { + private final Map params; + + public UpdateVideoTranscoderRequest(Map params) { + this.params = params; + } + + public Map getParams() { + return params; + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java new file mode 100644 index 0000000000..49c3d7975b --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java @@ -0,0 +1,766 @@ + +package org.bigbluebutton.transcode.core; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.bigbluebutton.transcode.core.ffprobe.FFProbeCommand; +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegCommand; +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants; +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegUtils; +import org.bigbluebutton.transcode.core.processmonitor.ProcessMonitor; +import org.bigbluebutton.transcode.core.processmonitor.ProcessMonitorObserver; +import org.bigbluebutton.common.messages.Constants; + +import akka.actor.UntypedActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; +import org.bigbluebutton.transcode.api.InternalMessage; +import org.bigbluebutton.transcode.api.DestroyVideoTranscoderRequest; +import org.bigbluebutton.transcode.api.DestroyVideoTranscoderReply; +import org.bigbluebutton.transcode.api.RestartVideoTranscoderRequest; +import org.bigbluebutton.transcode.api.RestartVideoTranscoderReply; +import org.bigbluebutton.transcode.api.StartVideoProbingRequest; +import org.bigbluebutton.transcode.api.StartVideoProbingReply; +import org.bigbluebutton.transcode.api.StartVideoTranscoderRequest; +import org.bigbluebutton.transcode.api.StartVideoTranscoderReply; +import org.bigbluebutton.transcode.api.StopVideoTranscoderRequest; +import org.bigbluebutton.transcode.api.StopVideoTranscoderReply; +import org.bigbluebutton.transcode.api.UpdateVideoTranscoderRequest; +import org.bigbluebutton.transcode.api.UpdateVideoTranscoderReply; +import org.bigbluebutton.transcode.api.TranscodingFinishedSuccessfully; +import org.bigbluebutton.transcode.api.TranscodingFinishedUnsuccessfully; + +public class VideoTranscoder extends UntypedActor implements ProcessMonitorObserver { + + public static enum Type{TRANSCODE_RTP_TO_RTMP, TRANSCODE_RTMP_TO_RTP,TRANSCODE_FILE_TO_RTP, TRANSCODE_FILE_TO_RTMP, PROBE_RTMP}; + public static enum Status{RUNNING, STOPPED, UPDATING} + public static final String VIDEO_CONF_LOGO_PATH = FFmpegUtils.videoconfLogoPath; + public static final String FFMPEG_PATH = FFmpegUtils.ffmpegPath; + public static final String FFPROBE_PATH = FFmpegUtils.ffprobePath; + + //if ffmpeg restarts 5 times in less than 5 seconds, we will not restart it anymore + //this is to prevent a infinite loop of ffmpeg restartings + private static final int MAX_RESTARTINGS_NUMBER = 5; + private static final long MIN_RESTART_TIME = 5000; //5 seconds + private int currentFFmpegRestartNumber = 0; + private long lastFFmpegRestartTime = 0; + + private ActorRef parentActor; + private Type type; + private Status status = Status.STOPPED; + private ProcessMonitor ffmpegProcessMonitor; + private ProcessMonitor ffprobeProcessMonitor; + private FFmpegCommand ffmpeg; + private String transcoderId; + private String username; + private String callername; //used to create rtp-> (any) SDP + private String videoStreamName; + private String input; + private String outputLive; + private String output; //output of transcoder + private String meetingId; + private String voiceBridge; + private String sourceIp; + private String destinationIp; + private String localVideoPort; + private String remoteVideoPort; + private String sdpPath; + private VideoTranscoderObserver observer; + private String globalVideoWidth = "640";// get this from properties (Stored in FFmpegUtils) + private String globalVideoHeight = "480";// get this from properties + public static final String FFMPEG_NAME = "FFMPEG"; + public static final String FFPROBE_NAME = "FFPROBE"; + + public static Props props(final ActorRef parentActor, final String meetingId, final String transcoderId, final Map params) { + return Props.create(new Creator() { + private static final long serialVersionUID = 1L; + + @Override + public VideoTranscoder create() throws Exception { + return new VideoTranscoder(parentActor, meetingId, transcoderId, params); + } + }); + } + + @Override + public void onReceive(Object msg) { + if (msg instanceof StartVideoTranscoderRequest) { + start(); + } else if (msg instanceof StopVideoTranscoderRequest) { + stop(); + } else if (msg instanceof UpdateVideoTranscoderRequest) { + UpdateVideoTranscoderRequest uvtr = (UpdateVideoTranscoderRequest) msg; + update(uvtr.getParams()); + } else if (msg instanceof DestroyVideoTranscoderRequest) { + destroyTranscoder(); + } else if (msg instanceof RestartVideoTranscoderRequest) { + restart(); + } else if (msg instanceof StartVideoProbingRequest) { + probeVideoStream(); + } + } + + private void stopActor() { + if (context() != null) + context().stop(getSelf()); + } + + private void sendMessage(InternalMessage msg) { + if ((parentActor != null) && (msg != null)) + parentActor.tell(msg,getSelf()); + } + + public VideoTranscoder(ActorRef parentActor, String meetingId, String transcoderId, Map params){ + this.parentActor = parentActor; + this.meetingId = meetingId; + this.transcoderId = transcoderId; + if (params != null) + switch (params.get(Constants.TRANSCODER_TYPE)){ + case Constants.TRANSCODE_RTP_TO_RTMP: + this.type = Type.TRANSCODE_RTP_TO_RTMP; + this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS); + this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT); + this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT); + this.destinationIp = params.get(Constants.SIP_HOST); + this.voiceBridge = params.get(Constants.VOICE_CONF); + this.callername = params.get(Constants.CALLERNAME); + break; + + case Constants.TRANSCODE_RTMP_TO_RTP: + this.type = Type.TRANSCODE_RTMP_TO_RTP; + this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS); + this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT); + this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT); + this.destinationIp = params.get(Constants.SIP_HOST); + this.voiceBridge = params.get(Constants.VOICE_CONF); + this.callername = params.get(Constants.CALLERNAME); + this.videoStreamName = params.get(Constants.INPUT); + break; + + case Constants.TRANSCODE_FILE_TO_RTP: + this.type = Type.TRANSCODE_FILE_TO_RTP; + this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS); + this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT); + this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT); + this.destinationIp = params.get(Constants.SIP_HOST); + this.voiceBridge = params.get(Constants.VOICE_CONF); + this.callername = params.get(Constants.CALLERNAME); + break; + + case Constants.TRANSCODE_FILE_TO_RTMP: + this.type = Type.TRANSCODE_FILE_TO_RTMP; + this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS); + this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT); + this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT); + this.destinationIp = params.get(Constants.SIP_HOST); + this.voiceBridge = params.get(Constants.VOICE_CONF); + this.callername = params.get(Constants.CALLERNAME); + break; + + case Constants.PROBE_RTMP: + this.type = Type.PROBE_RTMP; + this.videoStreamName = params.get(Constants.INPUT); + break; + + default: + break; + } + } + + private synchronized void start() { + switch (status){ + case RUNNING: + System.out.println(" > Transcoder already running, sending it's output"); + break; + case UPDATING: + System.out.println(" > Transcoder is being updated, returning it's current output"); + break; + default: + status = Status.RUNNING; + startTranscoder(); + break; + } + sendMessage(new StartVideoTranscoderReply(meetingId, transcoderId, output)); + } + + private boolean startTranscoder(){ + if ((ffmpegProcessMonitor != null) &&(ffmpeg != null)) { + return false; + } + + String[] command; + + if(!canFFmpegRun()) { + //log.debug("***TRANSCODER WILL NOT START: ffmpeg cannot run"); + return false; + } + + //log.debug("Starting Video Transcoder..."); + + switch(type){ + case TRANSCODE_RTMP_TO_RTP: + + if(!areRtmpToRtpParametersValid()) { + System.out.println(" > ***TRANSCODER WILL NOT START: Rtmp to Rtp Parameters are invalid"); + return false; + } + + + input = "rtmp://" + sourceIp + "/video/" + meetingId + "/" + + videoStreamName + " live=1"; //the full input is composed by the videoStreamName + outputLive = "rtp://" + destinationIp + ":" + remoteVideoPort + "?localport=" + localVideoPort; + output = ""; + + ffmpeg = new FFmpegCommand(); + ffmpeg.setFFmpegPath(FFMPEG_PATH); + ffmpeg.setInput(input); + ffmpeg.addRtmpInputConnectionParameter(meetingId); + ffmpeg.addRtmpInputConnectionParameter("transcoder-"+transcoderId); + ffmpeg.setFrameRate(15); + ffmpeg.setBufSize(1024); + ffmpeg.setGop(1); //MCU compatibility + ffmpeg.setCodec("libopenh264"); + ffmpeg.setMaxRate(1024); + ffmpeg.setSliceMode("dyn"); + ffmpeg.setMaxNalSize("1024"); + ffmpeg.setRtpFlags("h264_mode0"); //RTP's packetization mode 0 + ffmpeg.setProfile("baseline"); + ffmpeg.setFormat("rtp"); + ffmpeg.setPayloadType(FFmpegConstants.CODEC_ID_H264); + ffmpeg.setLoglevel("verbose"); + ffmpeg.setOutput(outputLive); + ffmpeg.setAnalyzeDuration("1000"); // 1ms + ffmpeg.setProbeSize("32"); // 1ms + System.out.println("Preparing FFmpeg process monitor"); + command = ffmpeg.getFFmpegCommand(true); + break; + + case TRANSCODE_RTP_TO_RTMP: + + if(!areRtpToRtmpParametersValid()) { + System.out.println(" > ***TRANSCODER WILL NOT START: Rtp to Rtmp Parameters are invalid"); + return false; + } + + //Create SDP FILE + sdpPath = FFmpegUtils.createSDPVideoFile(callername, sourceIp, localVideoPort, FFmpegConstants.CODEC_NAME_H264, FFmpegConstants.CODEC_ID_H264, FFmpegConstants.SAMPLE_RATE_H264, voiceBridge); + input = sdpPath; + + //Generate video stream name + videoStreamName = generateVideoStreamName(type); + outputLive = "rtmp://" + destinationIp + "/video/" + meetingId + "/" + + videoStreamName+" live=1"; + output = videoStreamName; + + ffmpeg = new FFmpegCommand(); + ffmpeg.setFFmpegPath(FFMPEG_PATH); + ffmpeg.setInput(input); + ffmpeg.setFormat("flv"); + ffmpeg.setLoglevel("verbose"); + ffmpeg.setOutput(outputLive); + ffmpeg.addRtmpOutputConnectionParameter(meetingId); + ffmpeg.addRtmpOutputConnectionParameter("transcoder-"+transcoderId); + ffmpeg.setVideoBitRate(1024); + ffmpeg.setBufSize(1024); + ffmpeg.setMaxRate(1024); + ffmpeg.setCodec("libopenh264"); + ffmpeg.setProfile("baseline"); + ffmpeg.setAnalyzeDuration("1000"); // 10ms + ffmpeg.addCustomParameter("-s", globalVideoWidth+"x"+globalVideoHeight); + ffmpeg.addCustomParameter("-filter:v","scale=iw*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih):ih*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih), pad="+globalVideoWidth+":"+globalVideoHeight+":("+globalVideoWidth+"-iw*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih))/2:("+globalVideoHeight+"-ih*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih))/2, fps=fps=15"); + command = ffmpeg.getFFmpegCommand(true); + break; + + case TRANSCODE_FILE_TO_RTP: + + if(!areFileToRtpParametersValid()) { + System.out.println(" > ***TRANSCODER WILL NOT START: File to Rtp Parameters are invalid"); + return false; + } + + input = VIDEO_CONF_LOGO_PATH; + outputLive = "rtp://" + destinationIp + ":" + remoteVideoPort + "?localport=" + localVideoPort; + output = ""; + username = callername; + + ffmpeg = new FFmpegCommand(); + ffmpeg.setFFmpegPath(FFMPEG_PATH); + ffmpeg.setIgnoreLoop(0); + ffmpeg.setInput(input); + ffmpeg.setInputLive(true); + ffmpeg.addCustomParameter("-s", globalVideoWidth+"x"+globalVideoHeight); + ffmpeg.setFrameRate(15); + ffmpeg.setPayloadType(FFmpegConstants.CODEC_ID_H264); + ffmpeg.setLoglevel("quiet"); + if (FFmpegUtils.isUserVideoSubtitleEnabled()) + ffmpeg.addCustomParameter("-vf","drawtext=fontfile=/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf:text="+username+":x="+globalVideoWidth+"-tw-20:y="+globalVideoHeight+"-th-20:fontcolor=white@0.9:shadowcolor=black:shadowx=2:shadowy=2:fontsize=20"); + ffmpeg.setGop(1); + ffmpeg.setCodec("libopenh264"); + ffmpeg.setSliceMode("dyn"); + ffmpeg.setMaxNalSize("1024"); + ffmpeg.setRtpFlags("h264_mode0"); //RTP's packetization mode 0 + ffmpeg.setProfile("baseline"); + ffmpeg.setFormat("rtp"); + ffmpeg.setOutput(outputLive); + command = ffmpeg.getFFmpegCommand(true); + break; + + case TRANSCODE_FILE_TO_RTMP: + if(!areFileToRtmpParametersValid()) { + System.out.println("***TRANSCODER WILL NOT START: File to Rtmp Parameters are invalid"); + return false; + } + videoStreamName = generateVideoStreamName(type); + input = VIDEO_CONF_LOGO_PATH; + outputLive = "rtmp://" + destinationIp + "/video/" + meetingId + "/" + + videoStreamName+" live=1"; + output = videoStreamName; + + ffmpeg = new FFmpegCommand(); + ffmpeg.setFFmpegPath(FFMPEG_PATH); + ffmpeg.setInput(input); + ffmpeg.setInputLive(true); + ffmpeg.setFrameSize("640x480"); + ffmpeg.setIgnoreLoop(0); + ffmpeg.setFormat("flv"); + ffmpeg.setLoglevel("verbose"); + ffmpeg.addRtmpOutputConnectionParameter(meetingId); + ffmpeg.addRtmpOutputConnectionParameter("transcoder-"+transcoderId); + ffmpeg.setOutput(outputLive); + ffmpeg.setCodec("libopenh264"); + ffmpeg.setProfile("baseline"); + command = ffmpeg.getFFmpegCommand(true); + break; + + default: command = null; + } + + if(command != null){ + this.ffmpegProcessMonitor = new ProcessMonitor(command,FFMPEG_NAME); + ffmpegProcessMonitor.setProcessMonitorObserver(this); + ffmpegProcessMonitor.start(); + return true; + } + return false; + } + + private synchronized void stop(){ + status = Status.STOPPED; + stopTranscoder(); + sendMessage(new StopVideoTranscoderReply(meetingId, transcoderId)); + } + + private void stopTranscoder() { + if (ffmpegProcessMonitor != null) { + ffmpegProcessMonitor.forceDestroy(); + clearData(); + } + } + + /** + * Clear monitor and ffmpeg data. + */ + private void clearData() { + ffmpegProcessMonitor = null; + ffmpeg = null; + switch (type) { + case TRANSCODE_RTP_TO_RTMP: + FFmpegUtils.removeSDPVideoFile(voiceBridge); + break; + default: + } + } + + private synchronized void destroyTranscoder() { + status = Status.STOPPED; + stopTranscoder(); + sendMessage(new DestroyVideoTranscoderReply(meetingId, transcoderId)); + stopActor(); + } + + private synchronized void update(Map params) { + switch (status) { + case UPDATING: + status = Status.RUNNING; + startTranscoder(); + sendMessage(new UpdateVideoTranscoderReply(meetingId, transcoderId, output)); + break; + default: + if (params != null) { + String transcoderType = params.get(Constants.TRANSCODER_TYPE); + String input = params.get(Constants.INPUT); + String sourceIp = params.get(Constants.LOCAL_IP_ADDRESS); + String localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT); + String remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT); + String destinationIp = params.get(Constants.SIP_HOST); + + setType(transcoderType); + setVideoStreamName(input); + setSourceIp(sourceIp); + setLocalVideoPort(localVideoPort); + setRemoteVideoPort(remoteVideoPort); + setDestinationIp(destinationIp); + + status = Status.UPDATING; //mark update status + stopTranscoder(); + } + break; + } + } + + + private synchronized void restart() { + if (!maxRestartsReached()) { + System.out.println(" > [Restart] Starting current transcoder " + transcoderId); + status = Status.RUNNING; + lastFFmpegRestartTime = System.currentTimeMillis(); + clearData(); + startTranscoder(); + sendMessage(new RestartVideoTranscoderReply(meetingId, transcoderId, output)); + } + } + + private boolean maxRestartsReached() { + currentFFmpegRestartNumber++; + if(currentFFmpegRestartNumber == MAX_RESTARTINGS_NUMBER) { + long timeInterval = System.currentTimeMillis() - lastFFmpegRestartTime; + if(timeInterval <= MIN_RESTART_TIME) { + System.out.println(" > Max number of ffmpeg restartings reached in " + timeInterval + " miliseconds for " + transcoderId + "'s Video Transcoder." + + " Not restating it anymore."); + return true; + } + else + currentFFmpegRestartNumber = 0; + } + return false; + } + + public synchronized void transcodingFinishedSuccessfully() { + sendMessage(new TranscodingFinishedSuccessfully(transcoderId)); //tell parent for clean up + stopActor(); + } + + public synchronized void probeVideoStream(){ + if (ffmpegProcessMonitor != null) { + FFProbeCommand ffprobe = new FFProbeCommand(outputLive); + String command[]; + + ffprobe.setFFprobepath(FFPROBE_PATH); + ffprobe.setInput(outputLive); + ffprobe.setAnalyzeDuration("1"); + ffprobe.setShowStreams(); + ffprobe.setLoglevel("quiet"); + ffprobe.getFFprobeCommand(true); + + command = ffprobe.getFFprobeCommand(true); + if(command != null){ + this.ffprobeProcessMonitor = new ProcessMonitor(command,FFPROBE_NAME); + ffprobeProcessMonitor.setProcessMonitorObserver(this); + ffprobeProcessMonitor.start(); + } + } else { + } + } + + private void updateGlobalStreamName(String streamName){ + this.videoStreamName = streamName; + String outputLive; + String[] newCommand; + outputLive = "rtmp://" + destinationIp + "/video/" + meetingId + "/" + + this.videoStreamName+" live=1"; + ffmpeg.setOutput(outputLive); //update ffmpeg's output + newCommand = ffmpeg.getFFmpegCommand(true); + ffmpegProcessMonitor.setCommand(newCommand); //update ffmpeg command + } + + public void setVideoTranscoderObserver(VideoTranscoderObserver observer){ + this.observer = observer; + } + + @Override + public void handleProcessFinishedUnsuccessfully(String processMonitorName,String processOutput) { + if ((processMonitorName == null)|| processMonitorName.isEmpty()){ + return; + } + + if (FFMPEG_NAME.equals(processMonitorName)){ + sendMessage(new TranscodingFinishedUnsuccessfully(transcoderId)); + }else if (FFPROBE_NAME.equals(processMonitorName)){ + System.out.println(" > Failed to probe video stream " + outputLive); + } + } + + @Override + public void handleProcessFinishedWithSuccess(String processMonitorName, String processOutput) { + if ((processMonitorName == null)|| processMonitorName.isEmpty()) { + System.out.println(" > Can't handle process process monitor finishing with success: UNKNOWN PROCESS"); + return; + } + + if (FFMPEG_NAME.equals(processMonitorName)){ + switch (status) { + case RUNNING: + System.out.println(" > Transcoder finished with success but wasn't closed by the user " + transcoderId + ". Informing parentActor"); + transcodingFinishedSuccessfully(); + break; + case STOPPED: + System.out.println(" > Transcoder closed by the user. Finished with success"); + break; + case UPDATING: + update(null); + break; + default: + break; + } + } + else if (FFPROBE_NAME.equals(processMonitorName)){ + String ffprobeOutput = processOutput; + Map ffprobeData = parseFFprobeOutput(ffprobeOutput); + sendMessage(new StartVideoProbingReply(meetingId, transcoderId, ffprobeData)); + }else{ + System.out.println("Can't handle process monitor finishing with success: UNKNOWN PROCESS"); + } + } + + public Map parseFFprobeOutput(String ffprobeOutput){ + Pattern pattern = Pattern.compile("(.*)=(.*)"); + Map ffprobeResult = new HashMap(); + + BufferedReader buf = new BufferedReader(new StringReader(ffprobeOutput)); + String line = null; + try { + while( (line=buf.readLine()) != null){ + Matcher matcher = pattern.matcher(line); + if(matcher.matches()) { + ffprobeResult.put(matcher.group(1), matcher.group(2)); + } + } + } catch (IOException e){ + //log.debug("Error when parsing FFprobe's output"); + } + return ffprobeResult; + } + + + public boolean canFFmpegRun() { + //log.debug("Checking if FFmpeg can run..."); + return validateIps() && isFFmpegPathValid(); + } + + public boolean validateIps(){ + if ((sourceIp == null || sourceIp.isEmpty()) + && (type == Type.TRANSCODE_RTMP_TO_RTP)) + return false; + + if ((destinationIp == null || destinationIp.isEmpty()) + && (type == Type.TRANSCODE_FILE_TO_RTMP + || type == Type.TRANSCODE_FILE_TO_RTP + || type == Type.TRANSCODE_RTMP_TO_RTP + || type == Type.TRANSCODE_FILE_TO_RTP)) + return false; + + return true; + } + + public boolean isFFmpegPathValid() { + /*if (!GlobalCall.ffmpegExists(FFMPEG_PATH)) { + //log.debug("***FFMPEG DOESN'T EXIST: check the FFMPEG path in bigbluebutton-sip.properties"); + return false; + }*/ + + return true; + } + + + public boolean areRtmpToRtpParametersValid() { + //log.debug("Checking Rtmp to Rtp Transcoder Parameters..."); + + if(meetingId == null || meetingId.isEmpty()) { + //log.debug("meetingId is null or empty"); + return false; + } + + if(videoStreamName == null || videoStreamName.isEmpty()) { + //log.debug("videoStreamName is null or empty"); + return false; + } + + return areVideoPortsValid(); + } + + public boolean areRtpToRtmpParametersValid() { + //log.debug("Checking Rtp to Rtmp Transcoder Parameters..."); + + if(meetingId == null || meetingId.isEmpty()) { + //log.debug("meetingId is null or empty"); + return false; + } + + return isSdpPathValid(); + } + + public boolean areFileToRtpParametersValid() { + //log.debug("Checking File to Rtp Transcoder Parameters..."); + return areVideoPortsValid() && isVideoConfLogoValid(); + } + + public boolean areFileToRtmpParametersValid() { + //log.debug("Checking File to Rtmp Transcoder Parameters..."); + + if(meetingId == null || meetingId.isEmpty()) { + //log.debug("meetingId is null or empty"); + return false; + } + + return isVideoConfLogoValid(); + } + + public boolean areVideoPortsValid() { + if(localVideoPort == null || localVideoPort.isEmpty()) { + //log.debug("localVideoPort is null or empty"); + return false; + } + + if(remoteVideoPort == null || remoteVideoPort.isEmpty()) { + //log.debug("remoteVideoPort is null or empty"); + return false; + } + + if(localVideoPort.equals("0")) { + //log.debug("localVideoPort is 0"); + return false; + } + + if(remoteVideoPort.equals("0")) { + //log.debug("remoteVideoPort is 0"); + return false; + } + + return true; + + } + + public boolean isVideoConfLogoValid() { + /*if(!GlobalCall.videoConfLogoExists(VIDEO_CONF_LOGO_PATH)) { + //log.debug("***IMAGE FOR VIDEOCONF-LOGO VIDEO DOESN'T EXIST: check the image path in bigbluebutton-sip.properties"); + return false; + }*/ + + return true; + } + + public boolean isSdpPathValid() { + /*if(!GlobalCall.sdpVideoExists(sdpPath)) { + //log.debug("***SDP FOR GLOBAL FFMPEG ({}) doesn't exist", sdpPath); + return false; + }*/ + + return true; + } + + public String getVideoStreamName(){ + return this.videoStreamName; + } + + public String getTranscoderId(){ + return this.transcoderId; + } + + public String getMeetingId(){ + return this.meetingId; + } + + public String getOutput(){ + return this.output; + } + + public String generateVideoStreamName(Type type){ + switch(type){ + case TRANSCODE_RTP_TO_RTMP: + return FFmpegUtils.GLOBAL_VIDEO_STREAM_NAME_PREFIX + voiceBridge + "_" + System.currentTimeMillis(); + case TRANSCODE_FILE_TO_RTMP: + return FFmpegUtils.VIDEOCONF_LOGO_STREAM_NAME_PREFIX + voiceBridge + "_" + System.currentTimeMillis(); + default: + return "unknown_stream_name"; + } + } + + public void setVideoStreamName(String videoStreamName) { + if (videoStreamName != null) this.videoStreamName = videoStreamName; + } + + public void setType(String type) { + if (type == null) return; + switch (type){ + case Constants.TRANSCODE_RTP_TO_RTMP: + this.type = Type.TRANSCODE_RTP_TO_RTMP; + break; + case Constants.TRANSCODE_RTMP_TO_RTP: + this.type = Type.TRANSCODE_RTMP_TO_RTP; + break; + case Constants.TRANSCODE_FILE_TO_RTP: + this.type = Type.TRANSCODE_FILE_TO_RTP; + break; + case Constants.TRANSCODE_FILE_TO_RTMP: + this.type = Type.TRANSCODE_FILE_TO_RTMP; + break; + default: + return; + } + } + + public String getType() { + switch (type){ + case TRANSCODE_RTP_TO_RTMP: + return Constants.TRANSCODE_RTP_TO_RTMP; + case TRANSCODE_RTMP_TO_RTP: + return Constants.TRANSCODE_RTMP_TO_RTP; + case TRANSCODE_FILE_TO_RTP: + return Constants.TRANSCODE_FILE_TO_RTP; + case TRANSCODE_FILE_TO_RTMP: + return Constants.TRANSCODE_FILE_TO_RTMP; + default: + return "UNKNOWN"; + } + } + + public void setSourceIp(String sourceIp) { + if (sourceIp != null) this.sourceIp = sourceIp; + } + + public String getSourceIp() { + return sourceIp; + } + + public void setLocalVideoPort(String localVideoPort) { + if (localVideoPort != null) this.localVideoPort = localVideoPort; + } + + public String getLocalVideoPort() { + return localVideoPort; + } + + public void setRemoteVideoPort(String remoteVideoPort) { + if (remoteVideoPort != null) this.remoteVideoPort = remoteVideoPort; + } + + public String getRemoteVideoPort() { + return remoteVideoPort; + } + + public void setDestinationIp(String destinationIp) { + if (destinationIp != null) this.destinationIp = destinationIp; + } + + public String getDestinationIp() { + return destinationIp; + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoderObserver.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoderObserver.java new file mode 100644 index 0000000000..052537b980 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoderObserver.java @@ -0,0 +1,9 @@ +package org.bigbluebutton.transcode.core; + +import java.util.Map; + +public interface VideoTranscoderObserver { + public void handleTranscodingFinishedUnsuccessfully(); + public void handleTranscodingFinishedWithSuccess(); + public void handleVideoProbingFinishedWithSuccess(Map ffprobeResult); +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java new file mode 100644 index 0000000000..ac9551e7dc --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java @@ -0,0 +1,11 @@ +package org.bigbluebutton.transcode.core.api; + +import java.util.Map; + + +public interface ITranscodingInGW { + void startTranscoder(String meetingId, String transcoderId, Map params); + void updateTranscoder(String meetingId, String transcoderId, Map params); + void stopTranscoder(String meetingId, String transcoderId); + void startProbing(String meetingId, String transcoderId, Map params); +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegCommand.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegCommand.java new file mode 100644 index 0000000000..d03e7ab245 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegCommand.java @@ -0,0 +1,387 @@ +package org.bigbluebutton.transcode.core.ffmpeg; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; + +public class FFmpegCommand { + private HashMap args; + private HashMap x264Params; + private List rtmpInputConnParams; + private List rtmpOutputConnParams; + + private String[] command; + + private String ffmpegPath; + private String input; + private String output; + private Boolean inputLive; + + /* Analyze duration is a special parameter that MUST come before the input */ + private String analyzeDuration; + private String probeSize; + + /* Parameters when the input is a loop image/file */ + private String loop; + private String ignoreLoop; + private int frameRate; + private String frameSize; + + public FFmpegCommand() { + this.args = new HashMap(); + this.x264Params = new HashMap(); + this.rtmpInputConnParams = new ArrayList(); + this.rtmpOutputConnParams = new ArrayList(); + + + this.ffmpegPath = null; + this.inputLive = false; + this.loop = null; + this.frameRate = 0; + } + + public String[] getFFmpegCommand(boolean shouldBuild) { + if(shouldBuild) + buildFFmpegCommand(); + + return this.command; + } + + public void buildFFmpegCommand() { + List comm = new ArrayList(); + + if(this.ffmpegPath == null) + this.ffmpegPath = "/usr/local/bin/ffmpeg"; + + comm.add(this.ffmpegPath); + + if (this.inputLive){ + comm.add("-re"); + } + + /* Analyze duration and probesize MUST come before the input */ + if(analyzeDuration != null && !analyzeDuration.isEmpty()) { + comm.add("-analyzeduration"); + comm.add(analyzeDuration); + } + + if(loop != null && !loop.isEmpty()){ + comm.add("-loop"); + comm.add(loop); + } + + if(ignoreLoop != null && !ignoreLoop.isEmpty()){ + comm.add("-ignore_loop"); + comm.add(ignoreLoop); + } + + if(probeSize != null && !probeSize.isEmpty()) { + comm.add("-probesize"); + comm.add(probeSize); + } + + buildRtmpInput(); + + comm.add("-i"); + comm.add(input); + + Iterator argsIter = this.args.entrySet().iterator(); + while (argsIter.hasNext()) { + Map.Entry pairs = (Map.Entry)argsIter.next(); + comm.add(pairs.getKey()); + comm.add(pairs.getValue()); + } + + if(!x264Params.isEmpty()) { + comm.add("-x264-params"); + String params = ""; + Iterator x264Iter = this.x264Params.entrySet().iterator(); + while (x264Iter.hasNext()) { + Map.Entry pairs = (Map.Entry)x264Iter.next(); + String argValue = pairs.getKey() + "=" + pairs.getValue(); + params += argValue; + // x264-params are separated by ':' + params += ":"; + } + // Remove trailing ':' + params.replaceAll(":+$", ""); + comm.add(params); + } + + buildRtmpOutput(); + + comm.add(this.output); + + this.command = new String[comm.size()]; + comm.toArray(this.command); + } + + /** + * Add rtmp parameters (if there are any) to the current input, + * if the input is rtmp. + */ + private void buildRtmpInput() { + if(!rtmpInputConnParams.isEmpty() && isRtmpInput()) { + StringBuilder sb = new StringBuilder(); + for (String s[] : rtmpInputConnParams){ + sb.append("conn="+s[0]+":"+s[1]+" "); + } + input+=" "+sb.toString().trim(); + } + } + + /** + * Add rtmp parameters (if there are any) to the current output, + * if the output is rtmp. + */ + private void buildRtmpOutput() { + if(!rtmpOutputConnParams.isEmpty() && isRtmpOutput()) { + StringBuilder sb = new StringBuilder(); + for (String s[] : rtmpOutputConnParams){ + sb.append("conn="+s[0]+":"+s[1]+" "); + } + output+=" "+sb.toString().trim(); + } + } + + + public void setFFmpegPath(String arg) { + this.ffmpegPath = arg; + } + + public void setInput(String arg) { + this.input = arg; + } + + public void setInputLive(Boolean live){ + this.inputLive = live; + } + + public void setOutput(String arg) { + this.output = arg; + } + + public void setCodec(String arg) { + this.args.put("-vcodec", arg); + } + + public void setLoop(String arg) { + this.loop = arg; + } + + /** + * Set ignore loop (valid for GIFs input, only) + * 0: means that input GIF will loop indefinitely + * @param arg + */ + public void setIgnoreLoop(int arg) { + this.ignoreLoop = Integer.toString(arg); + } + + public void setLevel(String arg) { + this.args.put("-level", arg); + } + + public void setPreset(String arg) { + this.args.put("-preset", arg); + } + + public void setProfile(String arg) { + this.args.put("-profile:v", arg); + } + + public void setFormat(String arg) { + this.args.put("-f", arg); + } + + public void setPayloadType(String arg) { + this.args.put("-payload_type", arg); + } + + public void setLoglevel(String arg) { + this.args.put("-loglevel", arg); + } + + public void setPixelFormat(String arg){ + this.args.put("-pix_fmt", arg); + } + + /** + * Set video bitrate, in Kbps. + * @param arg + */ + public void setVideoBitRate(int arg){ + this.args.put("-b:v", Integer.toString(arg)+"k"); + } + + /** + * Set bufsize, in Kb. + * @param arg + */ + public void setBufSize(int arg){ + this.args.put("-bufsize", Integer.toString(arg)+"k"); + } + + /** + * Set maximum bitrate, in Kbps. + * @param arg + */ + public void setMaxRate(int arg){ + this.args.put("-maxrate", Integer.toString(arg)+"k"); + } + + /** + * Set Group of images (GOP) + * @param arg + */ + public void setGop(int arg){ + this.args.put("-g", Integer.toString(arg)); + } + /** + * Set maximum NAL size, in bytes. + * This option works with libopenh264 encoder,only + * @param arg + */ + public void setMaxNalSize(String arg){ + this.args.put("-max_nal_size", arg); + } + + /** + * Set slice_mode for libopenh264 encoder. + * @param arg + */ + public void setSliceMode(String arg){ + this.args.put("-slice_mode", arg); + } + + /** + * Set rtpflags for RTP encoder. + * @param arg + */ + public void setRtpFlags(String arg){ + this.args.put("-rtpflags", arg); + } + + public void setSliceMaxSize(String arg) { + this.x264Params.put("slice-max-size", arg); + } + + public void setMaxKeyFrameInterval(String arg) { + this.x264Params.put("keyint", arg); + } + + public void addCustomParameter(String name, String value) { + this.args.put(name, value); + } + + /** + * Set how much time FFmpeg should analyze stream + * data to get stream information. Note that this + * affects directly the delay to start the stream. + * + * @param duration Analysis duration + */ + public void setAnalyzeDuration(String duration) { + this.analyzeDuration = duration; + } + + /** + * Probe size, in bytes. + * Minimum value: 32 + * Default: 5000000 + **/ + public void setProbeSize(String size) { + this.probeSize = size; + } + + /** + * Set frame rate of the input data + * @param value + */ + public void setFrameRate(int value){ + if (value>0) + this.args.put("-r",Integer.toString(value)); + } + + public void setFrameSize(String value){ + this.frameSize = value; + } + + /** + * Add parameters for rtmp connections. + * The order of parameters is the order they are added + * @param value + */ + public void addRtmpInputConnectionParameter(String value){ + //S: String + this.rtmpInputConnParams.add(new String[]{"S", value}); + } + + /** + * Add parameters for rtmp connections. + * The order of parameters is the order they are added + * @param value + */ + public void addRtmpInputConnectionParameter(boolean value){ + //B: Boolean + this.rtmpInputConnParams.add(new String[]{"B", value?"1":"0"}); + } + + /** + * Add parameters for rtmp connections. + * The order of parameters is the order they are added + * @param value + */ + public void addRtmpInputConnectionParameter(int value){ + //N : Number + this.rtmpInputConnParams.add(new String[]{"N", Integer.toString(value)}); + } + + /** + * Add parameters for rtmp connections. + * The order of parameters is the order they are added + * @param value + */ + public void addRtmpOutputConnectionParameter(String value){ + //S: String + this.rtmpOutputConnParams.add(new String[]{"S", value}); + } + + /** + * Add parameters for rtmp connections. + * The order of parameters is the order they are added + * @param value + */ + public void addRtmpOutputConnectionParameter(boolean value){ + //B: Boolean + this.rtmpOutputConnParams.add(new String[]{"B", value?"1":"0"}); + } + + /** + * Add parameters for rtmp connections. + * The order of parameters is the order they are added + * @param value + */ + public void addRtmpOutputConnectionParameter(int value){ + //N : Number + this.rtmpOutputConnParams.add(new String[]{"N", Integer.toString(value)}); + } + + /** + * Check if the current set intput is rtmp + * @return + */ + private boolean isRtmpInput(){ + return input.contains("rtmp"); + } + + /** + * Check if the current set output is rtmp + * @return + */ + private boolean isRtmpOutput(){ + return output.contains("rtmp"); + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegConstants.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegConstants.java new file mode 100644 index 0000000000..ac4da31e4a --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegConstants.java @@ -0,0 +1,45 @@ +package org.bigbluebutton.transcode.core.ffmpeg; + +public class FFmpegConstants { + + //exit codes (obtained from process exit code) + public static final int FATAL_ERROR_CODE = 128; + public static final int EXIT_WITH_SUCCESS_CODE = 0; + public static final int EXIT_WITH_NO_INPUT_CODE = 1; + public static final int EXIT_WITH_SIGKILL_CODE = FATAL_ERROR_CODE + 9; + public static final int ACCEPTABLE_EXIT_CODES[] = {EXIT_WITH_SUCCESS_CODE,EXIT_WITH_SIGKILL_CODE}; + + //status codes (obtained from sterr/out) + public static final int EXIT_WITH_SUCCESS_STATUS = 0; + public static final int EXIT_WITH_NO_INPUT_STATUS = 1; + public static final int RUNNING_STATUS = 2; + public static final int ACCEPTABLE_EXIT_STATUS[] = {EXIT_WITH_SUCCESS_STATUS,EXIT_WITH_NO_INPUT_STATUS}; + + //output constants (obtained from verbose stderr/out) + public static String FFMPEG_EXIT_WITH_NO_INPUT_OUTPUT = "Connection timed out"; + public static String WIDTH = "width"; + public static String HEIGHT = "height"; + + //Codecs + public static final String CODEC_ID_H264 = "96" ; + public static final String CODEC_NAME_H264 = "H264" ; + public static final String SAMPLE_RATE_H264 = "90000" ; + + public static boolean acceptableExitCode(int code){ + int i; + if ((ACCEPTABLE_EXIT_CODES == null) || (code < 0)) return false; + for(i=0;i(); + + if(this.ffprobePath == null) + this.ffprobePath = "/usr/local/bin/ffprobe"; + + comm.add(this.ffprobePath); + + /* Analyze duration MUST come before the input */ + if(analyzeDuration != null && !analyzeDuration.isEmpty()) { + comm.add("-analyzeduration"); + comm.add(analyzeDuration); + } + + comm.add("-i"); + comm.add(input); + + Iterator argsIter = this.args.entrySet().iterator(); + while (argsIter.hasNext()) { + Map.Entry pairs = (Map.Entry)argsIter.next(); + comm.add(pairs.getKey()); + if (pairs.getValue()!=null) + comm.add(pairs.getValue()); + } + + this.command = new String[comm.size()]; + comm.toArray(this.command); + } + + public void setFFprobepath(String arg) { + this.ffprobePath = arg; + } + + public void setAnalyzeDuration(String duration) { + this.analyzeDuration = duration; + } + + public void setInput(String arg){ + this.input = arg; + } + + public void setLoglevel(String arg){ + this.args.put("-loglevel", arg); + } + + public void setShowStreams(){ + this.args.put("-show_streams",null); + } + + public void selectStream(String arg){ + this.args.put("-select_streams", arg); + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitor.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitor.java new file mode 100644 index 0000000000..b39618c093 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitor.java @@ -0,0 +1,255 @@ +package org.bigbluebutton.transcode.core.processmonitor; + +import java.io.InputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants; + +public class ProcessMonitor { + + private String[] command; + private Process process; + private String name; + ProcessStream inputStreamMonitor; + ProcessStream errorStreamMonitor; + private String inputStreamMonitorOutput; + private String errorStreamMonitorOutput; + public static enum Status{RUNNING,CLOSED_BY_USER}; + private Status status; + + private Thread thread; + private ProcessMonitorObserver observer; + + public ProcessMonitor(String[] command,String name) { + this.command = command; + this.process = null; + this.thread = null; + this.inputStreamMonitor = null; + this.errorStreamMonitor = null; + this.name = name; + this.inputStreamMonitorOutput = null; + this.errorStreamMonitor = null; + } + + @Override + public String toString() { + if (this.command == null || this.command.length == 0) { + return ""; + } + + Pattern pattern = Pattern.compile("(.*) (.*)"); + StringBuffer result = new StringBuffer(); + String delim = ""; + for (String i : this.command) { + Matcher matcher = pattern.matcher(i); + if(matcher.matches()) { + result.append(delim).append("\""+matcher.group(1)+" "+matcher.group(2)+"\""); + }else result.append(delim).append(i); + delim = " "; + } + return removeLogLevelFlag(result.toString()); + } + + private String getCommandString(){ + //used by the process's thread instead of toString() + return this.toString(); + } + + public void setCommand(String[] command){ + this.command = command; + } + + private void notifyProcessMonitorObserverOnFinishedUnsuccessfully() { + if(observer != null){ + //log.debug("Notifying ProcessMonitorObserver that process finished unsuccessfully"); + observer.handleProcessFinishedUnsuccessfully(this.name,inputStreamMonitorOutput); + }else { + //log.debug("Cannot notify ProcessMonitorObserver that process finished unsuccessfully: ProcessMonitorObserver null"); + } + } + + private void notifyProcessMonitorObserverOnFinished() { + if(observer != null){ + //log.debug("Notifying ProcessMonitorObserver that {} successfully finished",this.name); + observer.handleProcessFinishedWithSuccess(this.name,inputStreamMonitorOutput); + }else { + //log.debug("Cannot notify ProcessMonitorObserver that {} finished: ProcessMonitorObserver null",this.name); + } + } + + public synchronized void start() { + if(this.thread == null){ + this.thread = new Thread( new Runnable(){ + public void run(){ + try { + System.out.println(" > Creating thread to execute " + name); + process = Runtime.getRuntime().exec(command); + System.out.println(" > Executing " + name + "( pid=" + getPid() + " ):\n " + getCommandString()); + + if(status == Status.CLOSED_BY_USER) { + System.out.println(" > ProcessMonitor closed by user. Closing " + name + " it immediatelly"); + forceDestroy(); + return; + } + + InputStream is = process.getInputStream(); + InputStream es = process.getErrorStream(); + + inputStreamMonitor = new ProcessStream(is,"STDOUT"); + errorStreamMonitor = new ProcessStream(es,"STDERR"); + + inputStreamMonitor.start(); + errorStreamMonitor.start(); + + process.waitFor(); + } + catch(SecurityException se) { + System.out.println("Security Exception"); + } + catch(IOException ioe) { + System.out.println("IO Exception"); + } + catch(NullPointerException npe) { + System.out.println("NullPointer Exception"); + } + catch(IllegalArgumentException iae) { + System.out.println("IllegalArgument Exception"); + } + catch(InterruptedException ie) { + System.out.println("Interrupted Exception"); + } + + int ret = process.exitValue(); + + if (FFmpegConstants.acceptableExitCode(ret) || errorStreamMonitor.acceptableExitStatus()){ + //log.debug("Exiting thread that executes {}. Exit value: {}, Exit status (from stdout): {} ",name,ret, errorStreamMonitor.getExitStatus()); + storeProcessOutputs(inputStreamMonitor.getOutput(), errorStreamMonitor.getOutput()); + clearData(); + notifyProcessMonitorObserverOnFinished(); + } + else{ + //log.debug("Exiting thread that executes {}. Exit value: {}, Exit status (from stdout): {}",name,ret,errorStreamMonitor.getExitStatus()); + storeProcessOutputs(inputStreamMonitor.getOutput(), errorStreamMonitor.getOutput()); + clearData(); + notifyProcessMonitorObserverOnFinishedUnsuccessfully(); + } + } + }); + status = Status.RUNNING; + this.thread.start(); + }else{ + //log.debug("Can't start a new process monitor: It is already running."); + } + } + + public synchronized void restart(){ + clearData(); + status = Status.CLOSED_BY_USER; + start(); + } + + private void clearData(){ + closeProcessStream(); + closeProcess(); + clearMonitorThread(); + } + + private void clearMonitorThread(){ + if (this.thread !=null) + this.thread=null; + } + + private void closeProcessStream(){ + if(this.inputStreamMonitor != null){ + this.inputStreamMonitor.close(); + this.inputStreamMonitor = null; + } + if (this.errorStreamMonitor != null) { + this.errorStreamMonitor.close(); + this.errorStreamMonitor = null; + } + } + + private void closeProcess(){ + if(this.process != null) { + status = Status.CLOSED_BY_USER; + //log.debug("Closing {} process",this.name); + this.process.destroy(); + this.process = null; + } + } + + private void storeProcessOutputs(String inputStreamOutput,String errorStreamOutput){ + this.inputStreamMonitorOutput = inputStreamOutput; + this.errorStreamMonitorOutput = errorStreamOutput; + } + + public synchronized void destroy() { + if (this.thread != null){ + status = Status.CLOSED_BY_USER; + clearData(); + //log.debug("ProcessMonitor successfully finished"); + }else{ + //log.debug("Can't destroy this process monitor: There's no process running."); + } + } + + public void setProcessMonitorObserver(ProcessMonitorObserver observer){ + if (observer==null){ + //log.debug("Cannot assign observer: ProcessMonitorObserver null"); + }else this.observer = observer; + } + + public int getPid(){ + Field f; + int pid; + try { + if (this.process == null) return -1; + f = this.process.getClass().getDeclaredField("pid"); + f.setAccessible(true); + pid = (int)f.get(this.process); + return pid; + } catch (IllegalArgumentException | IllegalAccessException + | NoSuchFieldException | SecurityException e) { + //log.debug("Error when obtaining {} PID",this.name); + return -1; + } + } + + public synchronized void forceDestroy(){ + if (this.thread != null) { + status = Status.CLOSED_BY_USER; + try { + int pid = getPid(); + if (pid < 0){ + //log.debug("Process doesn't exist. Not destroying it..."); + return; + }else { + Runtime.getRuntime().exec("kill -9 "+ getPid()); + } + } catch (IOException e) { + //log.debug("Failed to force-kill {} process",this.name); + e.printStackTrace(); + } + }else{ + //log.debug("Can't force-destroy this process monitor: There's no process running."); + } + } + + public boolean isFFmpegProcess(){ + return this.name.toLowerCase().contains("ffmpeg"); + } + + /** + * Removes loglevel flag of ffmpeg command. + * Usefull for faster debugging + */ + private String removeLogLevelFlag(String commandString){ + if (isFFmpegProcess()){ + return commandString.replaceAll("-loglevel \\w+", ""); + }else return commandString; + } + +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitorObserver.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitorObserver.java new file mode 100644 index 0000000000..c2b90869a1 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitorObserver.java @@ -0,0 +1,6 @@ +package org.bigbluebutton.transcode.core.processmonitor; + +public interface ProcessMonitorObserver { + public void handleProcessFinishedUnsuccessfully(String processName, String processOutput); + public void handleProcessFinishedWithSuccess(String processName, String processOutput); +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessStream.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessStream.java new file mode 100644 index 0000000000..47eb243a20 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessStream.java @@ -0,0 +1,95 @@ +package org.bigbluebutton.transcode.core.processmonitor; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +import java.io.IOException; +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants; + +public class ProcessStream { + private InputStream stream; + private Thread thread; + private String type; + private String output; + private int exitStatus = FFmpegConstants.RUNNING_STATUS; + + ProcessStream(InputStream stream, String type) { + if(stream != null) + this.stream = stream; + this.type = type; + this.output = ""; + } + + protected void start() { + exitStatus = FFmpegConstants.RUNNING_STATUS; + this.thread = new Thread( new Runnable(){ + public void run(){ + try { + String line; + InputStreamReader isr = new InputStreamReader(stream); + BufferedReader ibr = new BufferedReader(isr); + output = ""; + while ((line = ibr.readLine()) != null) { + ////log.debug("[{}]"+line,type); + updateCurrentStatusFromOutput(line); + output+=line+"\n"; + } + + close(); + } + catch(IOException ioe) { + //log.debug("Finishing process stream [type={}] because there's no more data to be read",type); + close(); + } + } + }); + this.thread.start(); + } + + protected void close() { + try { + if(this.stream != null) { + ////log.debug("Closing process stream"); + this.stream.close(); + this.stream = null; + } + } + catch(IOException ioe) { + //log.debug("IOException"); + } + } + + protected String getOutput(){ + return this.output; + } + + /** + * Update current process status based on the stdout. + * The exitStatus is mapped according to ffmpeg exit status + * @param outputLine + * Requires loglevel verbose of FFmpegCommand + */ + private void updateCurrentStatusFromOutput(String outputLine){ + if (outputLine != null){ + if (outputLine.contains(FFmpegConstants.FFMPEG_EXIT_WITH_NO_INPUT_OUTPUT)){ + ////log.debug("FFmpeg exited with no input status."); + exitStatus = FFmpegConstants.EXIT_WITH_NO_INPUT_STATUS; + } + /*else if outputLine.contains(FFmpegConstants....) + exitStatus = FFmpegConstants.... + */ + } + } + + public int getExitStatus(){ + return exitStatus; + } + + /** + * Validates exit status + */ + public boolean acceptableExitStatus(){ + return FFmpegConstants.acceptableExitStatus(exitStatus); + } +} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java new file mode 100644 index 0000000000..e1ca955837 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java @@ -0,0 +1,72 @@ +package org.bigbluebutton.transcode.pubsub.receivers; + +import org.bigbluebutton.transcode.core.api.ITranscodingInGW; + +import org.bigbluebutton.common.messages.StartTranscoderRequestMessage; +import org.bigbluebutton.common.messages.UpdateTranscoderRequestMessage; +import org.bigbluebutton.common.messages.StopTranscoderRequestMessage; +import org.bigbluebutton.common.messages.StartProbingRequestMessage; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +public class RedisMessageReceiver { + + public static final String TO_BBB_TRANSCODE_CHANNEL = "bigbluebutton:to-bbb-transcode"; + public static final String TO_BBB_TRANSCODE_PATTERN = TO_BBB_TRANSCODE_CHANNEL + ":*"; + public static final String TO_BBB_TRANSCODE_SYSTEM_CHAN = TO_BBB_TRANSCODE_CHANNEL + ":system"; + + private ITranscodingInGW transcodingInGW; + + public RedisMessageReceiver(ITranscodingInGW transcodingInGW) { + this.transcodingInGW = transcodingInGW; + } + + public void handleMessage(String pattern, String channel, String message) { + if (channel.equalsIgnoreCase(TO_BBB_TRANSCODE_SYSTEM_CHAN)) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + switch (messageName) { + case StartTranscoderRequestMessage.START_TRANSCODER_REQUEST: + processStartTranscoderRequestMessage(message); + break; + case UpdateTranscoderRequestMessage.UPDATE_TRANSCODER_REQUEST: + processUpdateTranscoderRequestMessage(message); + break; + case StopTranscoderRequestMessage.STOP_TRANSCODER_REQUEST: + processStopTranscoderRequestMessage(message); + break; + case StartProbingRequestMessage.START_PROBING_REQUEST: + processStartProbingRequestMessage(message); + } + } + } + } + } + + private void processStartTranscoderRequestMessage(String json) { + StartTranscoderRequestMessage msg = StartTranscoderRequestMessage.fromJson(json); + transcodingInGW.startTranscoder(msg.meetingId, msg.transcoderId, msg.params); + } + + private void processUpdateTranscoderRequestMessage(String json) { + UpdateTranscoderRequestMessage msg = UpdateTranscoderRequestMessage.fromJson(json); + transcodingInGW.updateTranscoder(msg.meetingId, msg.transcoderId, msg.params); + } + + private void processStopTranscoderRequestMessage(String json) { + StopTranscoderRequestMessage msg = StopTranscoderRequestMessage.fromJson(json); + transcodingInGW.stopTranscoder(msg.meetingId, msg.transcoderId); + } + + private void processStartProbingRequestMessage(String json) { + StartProbingRequestMessage msg = StartProbingRequestMessage.fromJson(json); + transcodingInGW.startProbing(msg.meetingId, msg.transcoderId, msg.params); + } +} diff --git a/akka-bbb-transcode/src/main/resources/application.conf b/akka-bbb-transcode/src/main/resources/application.conf new file mode 100644 index 0000000000..766ef0c84e --- /dev/null +++ b/akka-bbb-transcode/src/main/resources/application.conf @@ -0,0 +1,45 @@ +akka { + actor { + debug { + receive = on + } + } + loglevel = INFO + stdout-loglevel = "INFO" + + rediscala-subscriber-worker-dispatcher { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 512 + } +} + + +redis { + host="127.0.0.1" + port=6379 + password="" +} + +videoconference { + #The image to use in the videoconference window and/or when the webuser has no video + videoconf-logo-image-path = /usr/share/red5/webapps/sip/WEB-INF/mconf-videoconf-logo.gif + + #Enable username subtitle on video-conf-logo (the one shown in sip-phone when + #webconference's talker has no video ) + enable-user-video-subtitle = true + + #To change the sip video resolution, edit below: + #IMPORTANT: For now, we only accept these 3 resolutions: 160x120, 320x240, 640x480 + sip-video-resolution=640x480 +} + +transcoder { + #The path where ffmpeg is installed + ffmpeg-path = /usr/local/bin/ffmpeg + + #The path where ffprobe is installed + ffprobe-path = /usr/local/bin/ffprobe +} diff --git a/akka-bbb-transcode/src/main/resources/logback.xml b/akka-bbb-transcode/src/main/resources/logback.xml new file mode 100644 index 0000000000..73cb0571c1 --- /dev/null +++ b/akka-bbb-transcode/src/main/resources/logback.xml @@ -0,0 +1,29 @@ + + + + + %date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n + + + + + logs/bbb-transcode.log + + logs/bbb-transcode.%d{yyyy-MM-dd}.log + + 5 + + + %d{"yyyy-MM-dd HH:mm:ss,SSSXXX"} [%thread] %-5level %logger{35} - %msg%n + + + + + + + + + + + + diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala new file mode 100644 index 0000000000..d38af2c86d --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala @@ -0,0 +1,23 @@ +package org.bigbluebutton + +import akka.actor.{ ActorSystem, Props } +import scala.concurrent.duration._ +import redis.RedisClient +import scala.concurrent.{ Future, Await } +import scala.concurrent.ExecutionContext.Implicits.global +import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor } +import org.bigbluebutton.transcode.pubsub.receivers.RedisMessageReceiver +import org.bigbluebutton.transcode.core.TranscodingInGW + +object Boot extends App with SystemConfiguration { + + implicit val system = ActorSystem("bigbluebutton-transcode-system") + + val redisPublisher = new RedisPublisher(system) + + var transcodingInGW = new TranscodingInGW(system, redisPublisher); + + val redisMsgReceiver = new RedisMessageReceiver(transcodingInGW); + + val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver), "redis-subscriber") +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala new file mode 100644 index 0000000000..9bfb709886 --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala @@ -0,0 +1,20 @@ +package org.bigbluebutton + +import com.typesafe.config.ConfigFactory +import scala.util.Try + +trait SystemConfiguration { + + val config = ConfigFactory.load() + + lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1") + lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379) + lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("") + + lazy val _ffmpegPath = Try(config.getString("transcoder.ffmpeg-path")).getOrElse("/usr/local/bin/ffmpeg") + lazy val _ffprobePath = Try(config.getString("transcoder.ffprobe-path")).getOrElse("/usr/local/bin/ffprobe") + + lazy val _videoconfLogoImagePath = Try(config.getString("videoconference.videoconf-logo-image-path")).getOrElse("") + lazy val _enableUserVideoSubtitle = Try(config.getString("videoconference.enable-user-video-subtitle").toBoolean).getOrElse(false) + lazy val _sipVideoResolution = Try(config.getString("videoconference.sip-video-resolution")).getOrElse("") +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala new file mode 100755 index 0000000000..3fc85256ac --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala @@ -0,0 +1,79 @@ +package org.bigbluebutton.endpoint.redis + +import akka.actor.Props +import java.net.InetSocketAddress +import redis.actors.RedisSubscriberActor +import redis.api.pubsub.{ PMessage, Message } +import scala.concurrent.duration._ +import akka.actor.ActorRef +import akka.actor.actorRef2Scala +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.transcode.pubsub.receivers.RedisMessageReceiver +import redis.api.servers.ClientSetname +import org.bigbluebutton.common.converters.FromJsonDecoder +import org.bigbluebutton.common.messages.PubSubPongMessage +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +object AppsRedisSubscriberActor extends SystemConfiguration { + + val channels = Seq("time") + val patterns = Seq("bigbluebutton:to-bbb-transcode:*") + + def props(system: ActorSystem, msgReceiver: RedisMessageReceiver): Props = + Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, + redisHost, redisPort, + channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") +} + +class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, redisHost: String, + redisPort: Int, channels: Seq[String] = Nil, patterns: Seq[String] = Nil) + extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort), + channels, patterns) { + + val decoder = new FromJsonDecoder() + + var lastPongReceivedOn = 0L + system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) + + // Set the name of this client to be able to distinguish when doing + // CLIENT LIST on redis-cli + write(ClientSetname("BbbTranscodeAkkaSub").encodedRequest) + + def checkPongMessage() { + val now = System.currentTimeMillis() + + if (lastPongReceivedOn != 0 && (now - lastPongReceivedOn > 30000)) { + log.error("BBB-Transcode pubsub error!"); + } + } + + def onMessage(message: Message) { + log.debug(s"message received: $message") + } + + def onPMessage(pmessage: PMessage) { + log.debug(s"pattern message received: $pmessage") + + val msg = decoder.decodeMessage(pmessage.data) + + if (msg != null) { + msg match { + case m: PubSubPongMessage => { + if (m.payload.system == "BbbTranscode") { + lastPongReceivedOn = System.currentTimeMillis() + } + } + case _ => // do nothing + } + } else { + msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data) + } + + } + + def handleMessage(msg: String) { + log.warning("**** TODO: Handle pubsub messages. ****") + } +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala new file mode 100755 index 0000000000..1f491cef63 --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala @@ -0,0 +1,34 @@ +package org.bigbluebutton.endpoint.redis + +import akka.actor.Props +import redis.RedisClient +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global +import akka.actor.ActorSystem +import scala.concurrent.Await +import akka.actor.Actor +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.common.converters.ToJsonEncoder + +class RedisPublisher(val system: ActorSystem) extends SystemConfiguration { + + val redis = RedisClient(redisHost, redisPort)(system) + + // Set the name of this client to be able to distinguish when doing + // CLIENT LIST on redis-cli + redis.clientSetname("BbbTranscodeAkkaPub") + + val encoder = new ToJsonEncoder() + def sendPingMessage() { + val json = encoder.encodePubSubPingMessage("BbbTranscode", System.currentTimeMillis()) + redis.publish("bigbluebutton:to-bbb-apps:system", json) + } + + system.scheduler.schedule(10 seconds, 10 seconds)(sendPingMessage()) + + def publish(channel: String, data: String) { + //println("PUBLISH TO [" + channel + "]: \n [" + data + "]") + redis.publish(channel, data) + } + +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/TranscodingInGW.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/TranscodingInGW.scala new file mode 100644 index 0000000000..c49cb5a02a --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/TranscodingInGW.scala @@ -0,0 +1,31 @@ +package org.bigbluebutton.transcode.core + +import org.bigbluebutton.transcode.core.api.ITranscodingInGW +import org.bigbluebutton.endpoint.redis.RedisPublisher +import scala.collection.JavaConversions._ +import java.util.ArrayList +import scala.collection.mutable.ArrayBuffer +import akka.actor.ActorSystem +import org.bigbluebutton.transcode.api._ + +class TranscodingInGW(val system: ActorSystem, messageSender: RedisPublisher) extends ITranscodingInGW { + val log = system.log + val transcodingActor = system.actorOf(TranscodingActor.props(system, messageSender), "bbb-transcoding-manager") + + def startTranscoder(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + transcodingActor ! new StartTranscoderRequest(meetingId, transcoderId, params) + } + + def updateTranscoder(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + transcodingActor ! new UpdateTranscoderRequest(meetingId, transcoderId, params) + } + + def stopTranscoder(meetingId: String, transcoderId: String) { + transcodingActor ! new StopTranscoderRequest(meetingId, transcoderId) + } + + def startProbing(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { + transcodingActor ! new StartProbingRequest(meetingId, transcoderId, params) + } + +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/InMessages.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/InMessages.scala new file mode 100644 index 0000000000..e8ee89a6be --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/InMessages.scala @@ -0,0 +1,8 @@ +package org.bigbluebutton.transcode.api + +trait InMessage { val meetingId: String } + +case class StartTranscoderRequest(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) extends InMessage +case class UpdateTranscoderRequest(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) extends InMessage +case class StopTranscoderRequest(meetingId: String, transcoderId: String) extends InMessage +case class StartProbingRequest(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) extends InMessage diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala new file mode 100644 index 0000000000..fa650f308f --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala @@ -0,0 +1,12 @@ +package org.bigbluebutton.transcode.api + +import scala.collection.mutable.HashMap + +abstract class OutMessage + +case class StartTranscoderReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage +case class StopTranscoderReply(meetingId: String, transcoderId: String) extends OutMessage +case class UpdateTranscoderReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage +case class StartProbingReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage + +case class TranscoderStatusUpdate(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala new file mode 100644 index 0000000000..af2ce5e9f9 --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala @@ -0,0 +1,84 @@ +package org.bigbluebutton.transcode.core + +import akka.actor.Actor +import akka.actor.ActorContext +import akka.actor.ActorLogging +import akka.actor.Props +import org.bigbluebutton.transcode.api._ +import org.bigbluebutton.endpoint.redis.RedisPublisher + +import collection.JavaConverters._ +import scala.collection.JavaConversions._ + +import org.bigbluebutton.common.messages.StartTranscoderReplyMessage +import org.bigbluebutton.common.messages.StopTranscoderReplyMessage +import org.bigbluebutton.common.messages.TranscoderStatusUpdateMessage +import org.bigbluebutton.common.messages.UpdateTranscoderReplyMessage +import org.bigbluebutton.common.messages.StartProbingReplyMessage +import org.bigbluebutton.common.messages.MessagingConstants + +object MessageSenderActor { + def props(msgSender: RedisPublisher): Props = + Props(classOf[MessageSenderActor], msgSender) +} + +class MessageSenderActor(val msgSender: RedisPublisher) + extends Actor with ActorLogging { + + def receive = { + case msg: StartTranscoderReply => handleStartTranscoderReply(msg) + case msg: StopTranscoderReply => handleStopTranscoderReply(msg) + case msg: UpdateTranscoderReply => handleUpdateTranscoderReply(msg) + case msg: TranscoderStatusUpdate => handleTranscoderStatusUpdate(msg) + case msg: StartProbingReply => handleStartProbingReply(msg) + case _ => // do nothing + } + + private def handleStartTranscoderReply(msg: StartTranscoderReply) { + System.out.println("Sending StartTranscoderReplyMessage. Params: [\n" + + "meetingId = " + msg.meetingId + "\n" + + "transcoderId = " + msg.transcoderId + "\n" + + "params = " + msg.params.mkString(", ") + "\n]\n") + + val str = new StartTranscoderReplyMessage(msg.meetingId, msg.transcoderId, msg.params) + msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + + private def handleStopTranscoderReply(msg: StopTranscoderReply) { + System.out.println("Sending StopTranscoderReplyMessage. Params: [\n" + + "meetingId = " + msg.meetingId + "\n" + + "transcoderId = " + msg.transcoderId + "\n]\n") + val str = new StopTranscoderReplyMessage(msg.meetingId, msg.transcoderId) + msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + + private def handleUpdateTranscoderReply(msg: UpdateTranscoderReply) { + System.out.println("Sending UpdateTranscoderReplyMessage. Params: [\n" + + "meetingId = " + msg.meetingId + "\n" + + "transcoderId = " + msg.transcoderId + "\n" + + "params = " + msg.params.mkString(", ") + "\n]\n") + + val str = new UpdateTranscoderReplyMessage(msg.meetingId, msg.transcoderId, msg.params) + msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + + private def handleTranscoderStatusUpdate(msg: TranscoderStatusUpdate) { + System.out.println("Sending TranscoderStatusUpdateMessage. Params: [\n" + + "meetingId = " + msg.meetingId + "\n" + + "transcoderId = " + msg.transcoderId + "\n" + + "params = " + msg.params.mkString(", ") + "\n]\n") + + val str = new TranscoderStatusUpdateMessage(msg.meetingId, msg.transcoderId, msg.params) + msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + + private def handleStartProbingReply(msg: StartProbingReply) { + System.out.println("Sending StartProbingReplyMessage. Params: [\n" + + "meetingId = " + msg.meetingId + "\n" + + "transcoderId = " + msg.transcoderId + "\n" + + "params = " + msg.params.mkString(", ") + "\n]\n") + val str = new TranscoderStatusUpdateMessage(msg.meetingId, msg.transcoderId, msg.params) + msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + } + +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersModel.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersModel.scala new file mode 100644 index 0000000000..eee34eef24 --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersModel.scala @@ -0,0 +1,26 @@ + +package org.bigbluebutton.transcode.core + +import scala.collection.mutable.HashMap +import akka.actor.ActorRef + +class TranscodersModel { + + private var transcoders = new HashMap[String, ActorRef] + + def addTranscoder(transcoderId: String, transcoderActor: ActorRef) { + transcoders += transcoderId -> transcoderActor + } + + def removeTranscoder(transcoderId: String) { + transcoders -= transcoderId + } + + def getTranscoder(transcoderId: String): Option[ActorRef] = { + transcoders.get(transcoderId) + } + + def getTranscoders(): Array[ActorRef] = { + transcoders.values toArray + } +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala new file mode 100644 index 0000000000..1991583dee --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala @@ -0,0 +1,29 @@ +package org.bigbluebutton.transcode.core + +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegUtils + +class TranscodersService {} + +object TranscodersService extends SystemConfiguration { + + def ffmpegPath(): String = { + _ffmpegPath + } + + def ffprobePath(): String = { + _ffprobePath + } + + def videoconfLogoImagePath(): String = { + _videoconfLogoImagePath + } + + def enableUserVideoSubtitle(): Boolean = { + _enableUserVideoSubtitle + } + + def sipVideoResolution(): String = { + _sipVideoResolution + } +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala new file mode 100644 index 0000000000..3aed0cf7b4 --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala @@ -0,0 +1,106 @@ +package org.bigbluebutton.transcode.core + +import akka.actor._ +import akka.actor.ActorLogging +import scala.collection.mutable.HashMap +import org.bigbluebutton.endpoint.redis.RedisPublisher +import org.bigbluebutton.transcode.api._ +import org.bigbluebutton.SystemConfiguration +import scala.collection._ +import scala.collection.JavaConversions._ +import org.bigbluebutton.common.messages.Constants +import org.bigbluebutton.transcode.core.apps.{ TranscodingObserverApp } + +object TranscodingActor extends SystemConfiguration { + def props(system: ActorSystem, messageSender: RedisPublisher): Props = + Props(classOf[TranscodingActor], system, messageSender) +} + +class TranscodingActor(val system: ActorSystem, messageSender: RedisPublisher) + extends Actor with ActorLogging with TranscodingObserverApp { + val transcodersModel = new TranscodersModel() + + val messageSenderActor = context.actorOf(MessageSenderActor.props(messageSender), "bbb-sender-actor") + + def receive = { + case msg: StartTranscoderRequest => handleStartTranscoderRequest(msg) + case msg: UpdateTranscoderRequest => handleUpdateTranscoderRequest(msg) + case msg: StopTranscoderRequest => handleStopTranscoderRequest(msg) + case msg: StartProbingRequest => handleStartProbingRequest(msg) + + //internal messages + case msg: StartVideoTranscoderReply => handleStartVideoTranscoderReply(msg) + case msg: UpdateVideoTranscoderReply => handleUpdateVideoTranscoderReply(msg) + case msg: DestroyVideoTranscoderReply => handleDestroyVideoTranscoderReply(msg) + case msg: TranscodingFinishedUnsuccessfully => handleTranscodingFinishedUnsuccessfully(msg) + case msg: TranscodingFinishedSuccessfully => handleTranscodingFinishedSuccessfully(msg) + case msg: RestartVideoTranscoderReply => handleRestartVideoTranscoderReply(msg) + case msg: StartVideoProbingReply => handleStartVideoProbingReply(msg) + case _ => // do nothing + } + + private def handleStartTranscoderRequest(msg: StartTranscoderRequest) { + log.info("\n > Received StartTranscoderRequest. Params:\n" + + " meetingId = " + msg.meetingId + "\n" + + " transcoderId = " + msg.transcoderId + "\n" + + " params = " + msg.params.toString() + "\n") + + transcodersModel.getTranscoder(msg.transcoderId) match { + case Some(vt) => { + log.info("\n > Found a transcoder for this user {}", msg.transcoderId) + vt ! new StartVideoTranscoderRequest() + } + case None => { + val vt = context.actorOf(VideoTranscoder.props(self, msg.meetingId, msg.transcoderId, msg.params)) + transcodersModel.addTranscoder(msg.transcoderId, vt) + vt ! new StartVideoTranscoderRequest() + } + } + } + + private def handleUpdateTranscoderRequest(msg: UpdateTranscoderRequest) { + log.info("\n > Received UpdateTranscoderRequest. Params:\n" + + " meetingId = " + msg.meetingId + "\n" + + " transcoderId = " + msg.transcoderId + "\n" + + " params = " + msg.params.toString() + "\n") + + transcodersModel.getTranscoder(msg.transcoderId) match { + case Some(vt) => vt ! new UpdateVideoTranscoderRequest(msg.params) + case None => + log.info("\n > Video transcoder with id = {} not found (might be finished already or it is restarting).", msg.transcoderId) + } + } + + private def handleStopTranscoderRequest(msg: StopTranscoderRequest) { + log.info("\n > Received StopTranscoderRequest. Params:\n" + + " meetingId = " + msg.meetingId + "\n" + + " transcoderId = " + msg.transcoderId + "\n") + transcodersModel.getTranscoder(msg.transcoderId) match { + case Some(vt) => { + transcodersModel.removeTranscoder(msg.transcoderId) + vt ! new DestroyVideoTranscoderRequest() //stop transcoder and destroy it's actor + } + case None => { + log.info("\n > Transcoder with id = {} not found (might be finished already).", msg.transcoderId) + } + } + } + + private def handleStartProbingRequest(msg: StartProbingRequest) { + log.info("\n > Received StartProbingRequest. Params:\n" + + " meetingId = " + msg.meetingId + "\n" + + " transcoderId = " + msg.transcoderId + "\n") + transcodersModel.getTranscoder(msg.transcoderId) match { + case Some(vt) => { + log.info("\n > Found a transcoder for this user {}", msg.transcoderId) + vt ! new StartVideoProbingRequest() + } + case None => { + val vt = context.actorOf(VideoTranscoder.props(self, msg.meetingId, msg.transcoderId, msg.params)) + transcodersModel.addTranscoder(msg.transcoderId, vt) + vt ! new StartVideoProbingRequest() + } + } + } + +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala new file mode 100644 index 0000000000..cfa8be37f9 --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala @@ -0,0 +1,78 @@ +package org.bigbluebutton.transcode.core.apps + +import akka.actor.ActorRef +import org.bigbluebutton.transcode.core.TranscodingActor +import org.bigbluebutton.transcode.api._ +import org.bigbluebutton.common.messages.Constants +import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants +import scala.collection.JavaConversions._ + +trait TranscodingObserverApp { + + this: TranscodingActor => + + val messageSenderActor: ActorRef + + def handleTranscodingFinishedUnsuccessfully(msg: TranscodingFinishedUnsuccessfully) = { + transcodersModel.getTranscoder(msg.getTranscoderId()) match { + case Some(vt) => { + log.info("\n > Transcoder for this user {} stopped unsuccessfully, restarting it...", msg.getTranscoderId()) + vt ! new RestartVideoTranscoderRequest() + } + case None => { + log.info("\n > Video transcoder with id = {} not found (might be destroyed already).", msg.getTranscoderId()) + } + } + } + + def handleTranscodingFinishedSuccessfully(msg: TranscodingFinishedSuccessfully) = { + transcodersModel.getTranscoder(msg.getTranscoderId()) match { + case Some(vt) => { + log.info("\n > Transcoder for this user {} stopped with success, removing it from transcoder's list...", msg.getTranscoderId()) + transcodersModel.removeTranscoder(msg.getTranscoderId()) + } + case None => { + log.info("\n > Video transcoder with id = {} not found (might be destroyed already).", msg.getTranscoderId()) + } + } + } + + def handleStartVideoTranscoderReply(msg: StartVideoTranscoderReply) = { + log.info("\n > Transcoder with id = {} started", msg.getTranscoderId()) + val params = new scala.collection.mutable.HashMap[String, String] + params += Constants.OUTPUT -> msg.getOutput() + messageSenderActor ! new StartTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params) + } + + def handleUpdateVideoTranscoderReply(msg: UpdateVideoTranscoderReply) = { + log.info("\n > Transcoder with id = {} updated", msg.getTranscoderId()) + val params = new scala.collection.mutable.HashMap[String, String] + params += Constants.OUTPUT -> msg.getOutput() + messageSenderActor ! new UpdateTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params) + } + + def handleDestroyVideoTranscoderReply(msg: DestroyVideoTranscoderReply) = { + log.info("\n > Transcoder with id = {} stopped", msg.getTranscoderId()) + messageSenderActor ! new StopTranscoderReply(msg.getMeetingId(), msg.getTranscoderId()) + } + + def handleRestartVideoTranscoderReply(msg: RestartVideoTranscoderReply) = { + log.info("\n > Transcoder with id = {} restarted", msg.getTranscoderId()) + val params = new scala.collection.mutable.HashMap[String, String] + params += Constants.OUTPUT -> msg.getOutput() + messageSenderActor ! new TranscoderStatusUpdate(msg.getMeetingId(), msg.getTranscoderId(), params) + } + + def handleStartVideoProbingReply(msg: StartVideoProbingReply) = { + val ffprobeResult = mapAsScalaMap(msg.getProbingData()) + Option(ffprobeResult) match { + case Some(result) => + val params = new scala.collection.mutable.HashMap[String, String] + params += Constants.WIDTH_RATIO -> result.getOrElse(FFmpegConstants.WIDTH, "") + params += Constants.HEIGHT_RATIO -> result.getOrElse(FFmpegConstants.HEIGHT, "") + messageSenderActor ! new StartProbingReply(msg.getMeetingId(), msg.getTranscoderId(), params) + case _ => log.debug("Could not send ffprobe reply : failed to get the new resolution"); + } + } + +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java index b76f93f18e..54c7708321 100755 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java @@ -141,4 +141,16 @@ public class Constants { public static final String OPERATION = "operation"; public static final String NOTE = "note"; public static final String METADATA = "metadata"; + public static final String LOCAL_IP_ADDRESS = "local_ip_address"; + public static final String LOCAL_VIDEO_PORT = "local_video_port"; + public static final String REMOTE_VIDEO_PORT = "remote_video_port"; + public static final String SIP_HOST = "sip_host"; + public static final String TRANSCODER_TYPE = "transcoder_type"; + public static final String INPUT = "input"; + public static final String OUTPUT = "output"; + public static final String TRANSCODE_RTP_TO_RTMP = "transcode_rtp_to_rtmp"; + public static final String TRANSCODE_RTMP_TO_RTP = "transcode_rtmp_to_rtp"; + public static final String TRANSCODE_FILE_TO_RTP = "transcode_file_to_rtp"; + public static final String TRANSCODE_FILE_TO_RTMP = "transcode_file_to_rtmp"; + public static final String PROBE_RTMP = "probe_rtmp"; } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/MessagingConstants.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/MessagingConstants.java index d4b6bd727b..c9ca64617d 100755 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/MessagingConstants.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/MessagingConstants.java @@ -53,6 +53,14 @@ public class MessagingConstants { public static final String FROM_VOICE_CONF_SYSTEM_CHAN = FROM_VOICE_CONF_CHANNEL + ":system"; public static final String FROM_BBB_RECORDING_CHANNEL = "bigbluebutton:from-rap"; + + public static final String TO_BBB_TRANSCODE_CHANNEL = "bigbluebutton:to-bbb-transcode"; + public static final String TO_BBB_TRANSCODE_PATTERN = TO_BBB_TRANSCODE_CHANNEL + ":*"; + public static final String TO_BBB_TRANSCODE_SYSTEM_CHAN = TO_BBB_TRANSCODE_CHANNEL + ":system"; + public static final String FROM_BBB_TRANSCODE_CHANNEL = "bigbluebutton:from-bbb-transcode"; + public static final String FROM_BBB_TRANSCODE_PATTERN = FROM_BBB_TRANSCODE_CHANNEL + ":*"; + public static final String FROM_BBB_TRANSCODE_SYSTEM_CHAN = FROM_BBB_TRANSCODE_CHANNEL + ":system"; + public static final String DESTROY_MEETING_REQUEST_EVENT = "DestroyMeetingRequestEvent"; public static final String CREATE_MEETING_REQUEST_EVENT = "CreateMeetingRequestEvent"; diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java new file mode 100644 index 0000000000..44f4f3e235 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class StartProbingReplyMessage implements IBigBlueButtonMessage { + public static final String START_PROBING_REPLY = "start_probing_reply_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public StartProbingReplyMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(START_PROBING_REPLY, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static StartProbingReplyMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (START_PROBING_REPLY.equals(messageName)) { + if (payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new StartProbingReplyMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java new file mode 100644 index 0000000000..b9d9067283 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class StartProbingRequestMessage implements IBigBlueButtonMessage { + public static final String START_PROBING_REQUEST = "start_probing_request_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public StartProbingRequestMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(START_PROBING_REQUEST, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static StartProbingRequestMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (START_PROBING_REQUEST.equals(messageName)) { + if (payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new StartProbingRequestMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java new file mode 100644 index 0000000000..33d71db113 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class StartTranscoderReplyMessage implements IBigBlueButtonMessage { + public static final String START_TRANSCODER_REPLY = "start_transcoder_reply_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public StartTranscoderReplyMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(START_TRANSCODER_REPLY, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static StartTranscoderReplyMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (START_TRANSCODER_REPLY.equals(messageName)) { + if ( payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new StartTranscoderReplyMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java new file mode 100644 index 0000000000..728093dc22 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class StartTranscoderRequestMessage implements IBigBlueButtonMessage { + public static final String START_TRANSCODER_REQUEST = "start_transcoder_request_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public StartTranscoderRequestMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(START_TRANSCODER_REQUEST, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static StartTranscoderRequestMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (START_TRANSCODER_REQUEST.equals(messageName)) { + if (payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new StartTranscoderRequestMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java new file mode 100644 index 0000000000..d8aee64bb5 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java @@ -0,0 +1,58 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class StopTranscoderReplyMessage implements IBigBlueButtonMessage { + public static final String STOP_TRANSCODER_REPLY = "stop_transcoder_reply_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + + public final String meetingId; + public final String transcoderId; + + public StopTranscoderReplyMessage(String meetingId, String transcoderId) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + + java.util.HashMap header = MessageBuilder.buildHeader(STOP_TRANSCODER_REPLY, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static StopTranscoderReplyMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (STOP_TRANSCODER_REPLY.equals(messageName)) { + if ( payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + return new StopTranscoderReplyMessage(meetingId, transcoderId); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java new file mode 100644 index 0000000000..7ac74c1729 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java @@ -0,0 +1,58 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class StopTranscoderRequestMessage implements IBigBlueButtonMessage { + public static final String STOP_TRANSCODER_REQUEST = "stop_transcoder_request_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + + public final String meetingId; + public final String transcoderId; + + public StopTranscoderRequestMessage(String meetingId, String transcoderId) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + + java.util.HashMap header = MessageBuilder.buildHeader(STOP_TRANSCODER_REQUEST, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static StopTranscoderRequestMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (STOP_TRANSCODER_REQUEST.equals(messageName)) { + if (payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + return new StopTranscoderRequestMessage(meetingId, transcoderId); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java new file mode 100644 index 0000000000..a48a85db18 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class TranscoderStatusUpdateMessage implements IBigBlueButtonMessage { + public static final String TRANSCODER_STATUS_UPDATE = "transcoder_status_update"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public TranscoderStatusUpdateMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(TRANSCODER_STATUS_UPDATE, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static TranscoderStatusUpdateMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (TRANSCODER_STATUS_UPDATE.equals(messageName)) { + if (payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new TranscoderStatusUpdateMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java new file mode 100644 index 0000000000..a8c8ce5338 --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class UpdateTranscoderReplyMessage implements IBigBlueButtonMessage { + public static final String UPDATE_TRANSCODER_REPLY = "update_transcoder_reply_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public UpdateTranscoderReplyMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(UPDATE_TRANSCODER_REPLY, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static UpdateTranscoderReplyMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (UPDATE_TRANSCODER_REPLY.equals(messageName)) { + if ( payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new UpdateTranscoderReplyMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java new file mode 100644 index 0000000000..3fd6904e6a --- /dev/null +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java @@ -0,0 +1,64 @@ +package org.bigbluebutton.common.messages; + +import java.util.Map; +import java.util.HashMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + + +public class UpdateTranscoderRequestMessage implements IBigBlueButtonMessage { + public static final String UPDATE_TRANSCODER_REQUEST = "update_transcoder_request_message"; + public static final String VERSION = "0.0.1"; + + public static final String MEETING_ID = "meeting_id"; + public static final String TRANSCODER_ID = "transcoder_id"; + public static final String PARAMS = "params"; + + public final String meetingId; + public final String transcoderId; + public final Map params; + + public UpdateTranscoderRequestMessage(String meetingId, String transcoderId, Map params) { + this.meetingId = meetingId; + this.transcoderId = transcoderId; + this.params = params; + } + + public String toJson() { + HashMap payload = new HashMap(); + payload.put(MEETING_ID, meetingId); + payload.put(TRANSCODER_ID, transcoderId); + payload.put(PARAMS, params); + + java.util.HashMap header = MessageBuilder.buildHeader(UPDATE_TRANSCODER_REQUEST, VERSION, null); + + return MessageBuilder.buildJson(header, payload); + } + + public static UpdateTranscoderRequestMessage fromJson(String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has("header") && obj.has("payload")) { + JsonObject header = (JsonObject) obj.get("header"); + JsonObject payload = (JsonObject) obj.get("payload"); + + if (header.has("name")) { + String messageName = header.get("name").getAsString(); + if (UPDATE_TRANSCODER_REQUEST.equals(messageName)) { + if (payload.has(MEETING_ID) + && payload.has(TRANSCODER_ID) + && payload.has(PARAMS)){ + String meetingId = payload.get(MEETING_ID).getAsString(); + String transcoderId = payload.get(TRANSCODER_ID).getAsString(); + Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); + return new UpdateTranscoderRequestMessage(meetingId, transcoderId, params); + } + } + } + } + return null; + } +}