Introducing bbb-transcode-akka

Conflicts:
	akka-bbb-apps/src/main/java/org/bigbluebutton/core/api/IBigBlueButtonInGW.java
	akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/MeetingMessageReceiver.java
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonInGW.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MeetingActor.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MeetingModel.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/InMessages.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/OutMessages.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/api/ValueObjects.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/apps/UsersApp.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/apps/UsersModel.scala
	akka-bbb-apps/src/main/scala/org/bigbluebutton/core/apps/VoiceApp.scala
	akka-bbb-fsesl/src/main/java/org/bigbluebutton/freeswitch/voice/freeswitch/ESLEventListener.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoderObserver.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegCommand.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffmpeg/FFmpegConstants.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/ffprobe/FFProbeCommand.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitor.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessMonitorObserver.java
	akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/processmonitor/ProcessStream.java
	bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java
	bbb-common-message/src/main/java/org/bigbluebutton/common/messages/MessageFromJsonConverter.java
	bbb-common-message/src/main/java/org/bigbluebutton/common/messages/MessagingConstants.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/Constants.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/IMessagingService.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageReceiver.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/ReceivedMessageHandler.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/RedisMessagingService.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/Service.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/sip/CallAgent.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/sip/CallAgentObserver.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/sip/GlobalCall.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/sip/SipPeer.java
	bbb-voice/src/main/java/org/bigbluebutton/voiceconf/sip/SipPeerManager.java
	bbb-voice/src/main/webapp/WEB-INF/bbb-redis-messaging.xml
	bbb-voice/src/main/webapp/WEB-INF/red5-web.xml
This commit is contained in:
Mario Gasparoni 2016-05-27 12:48:38 -03:00 committed by Pedro Beschorner Marin
parent 654fd8c553
commit 16698f7489
65 changed files with 3593 additions and 2 deletions

View File

@ -125,4 +125,11 @@ public interface IBigBlueButtonInGW {
void destroyAdditionalNotes(String meetingID, String requesterID, String noteID);
void requestAdditionalNotesSet(String meetingID, String requesterID, int additionalNotesSetSize);
void sharedNotesSyncNoteRequest(String meetingID, String requesterID, String noteID);
//Transcode
void startTranscoderReply(String meetingId, String transcoderId, Map<String,String> params);
void updateTranscoderReply(String meetingId, String transcoderId, Map<String,String> params);
void stopTranscoderReply(String meetingId, String transcoderId);
void transcoderStatusUpdate(String meetingId, String transcoderId, Map<String,String> params);
void startProbingReply(String meetingId, String transcoderId, Map<String,String> params);
}

View File

@ -24,6 +24,11 @@ import org.bigbluebutton.common.messages.RespondToGuestMessage;
import org.bigbluebutton.common.messages.SetGuestPolicyMessage;
import org.bigbluebutton.common.messages.SetRecordingStatusRequestMessage;
import org.bigbluebutton.common.messages.SetUserStatusRequestMessage;
import org.bigbluebutton.common.messages.StartTranscoderReplyMessage;
import org.bigbluebutton.common.messages.StartProbingReplyMessage;
import org.bigbluebutton.common.messages.UpdateTranscoderReplyMessage;
import org.bigbluebutton.common.messages.StopTranscoderReplyMessage;
import org.bigbluebutton.common.messages.TranscoderStatusUpdateMessage;
import org.bigbluebutton.common.messages.UserJoinedVoiceConfMessage;
import org.bigbluebutton.common.messages.UserLeavingMessage;
import org.bigbluebutton.common.messages.UserLeftVoiceConfMessage;
@ -171,6 +176,33 @@ public class UsersMessageReceiver implements MessageHandler{
}
}
}
} else if (channel.equalsIgnoreCase(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN)) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
switch (messageName) {
case StartTranscoderReplyMessage.START_TRANSCODER_REPLY:
processStartTranscoderReplyMessage(message);
break;
case UpdateTranscoderReplyMessage.UPDATE_TRANSCODER_REPLY:
processUpdateTranscoderReplyMessage(message);
break;
case StopTranscoderReplyMessage.STOP_TRANSCODER_REPLY:
processStopTranscoderReplyMessage(message);
break;
case TranscoderStatusUpdateMessage.TRANSCODER_STATUS_UPDATE:
processTranscoderStatusUpdateMessage(message);
break;
case StartProbingReplyMessage.START_PROBING_REPLY:
processStartProbingReplyMessage(message);
break;
}
}
}
}
}
@ -397,4 +429,39 @@ public class UsersMessageReceiver implements MessageHandler{
bbbInGW.logoutEndMeeting(lemm.meetingId, lemm.userId);
}
}
private void processStartTranscoderReplyMessage(String message) {
StartTranscoderReplyMessage msg = StartTranscoderReplyMessage.fromJson(message);
if(msg !=null){
bbbInGW.startTranscoderReply(msg.meetingId, msg.transcoderId, msg.params);
}
}
private void processUpdateTranscoderReplyMessage(String message) {
UpdateTranscoderReplyMessage msg = UpdateTranscoderReplyMessage.fromJson(message);
if(msg !=null){
bbbInGW.updateTranscoderReply(msg.meetingId, msg.transcoderId, msg.params);
}
}
private void processStopTranscoderReplyMessage(String message) {
StopTranscoderReplyMessage msg = StopTranscoderReplyMessage.fromJson(message);
if(msg !=null){
bbbInGW.stopTranscoderReply(msg.meetingId, msg.transcoderId);
}
}
private void processTranscoderStatusUpdateMessage(String message) {
TranscoderStatusUpdateMessage msg = TranscoderStatusUpdateMessage.fromJson(message);
if(msg !=null){
bbbInGW.transcoderStatusUpdate(msg.meetingId, msg.transcoderId, msg.params);
}
}
private void processStartProbingReplyMessage(String message) {
StartProbingReplyMessage msg = StartProbingReplyMessage.fromJson(message);
if (msg != null){
bbbInGW.startProbingReply(msg.meetingId, msg.transcoderId, msg.params);
}
}
}

View File

@ -541,4 +541,30 @@ class BigBlueButtonInGW(val system: ActorSystem, recorderApp: RecorderApplicatio
def sharedNotesSyncNoteRequest(meetingId: String, userId: String, noteId: String) {
bbbActor ! new SharedNotesSyncNoteRequest(meetingId, userId, noteId)
}
/**
* *******************************************************************
* Message Interface for transcode
* *****************************************************************
*/
def startTranscoderReply(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
bbbActor ! new StartTranscoderReply(meetingId, transcoderId, mapAsScalaMap(params).toMap)
}
def updateTranscoderReply(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
bbbActor ! new UpdateTranscoderReply(meetingId, transcoderId, mapAsScalaMap(params).toMap)
}
def stopTranscoderReply(meetingId: String, transcoderId: String) {
bbbActor ! new StopTranscoderReply(meetingId, transcoderId)
}
def transcoderStatusUpdate(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
bbbActor ! new TranscoderStatusUpdate(meetingId, transcoderId, mapAsScalaMap(params).toMap)
}
def startProbingReply(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
bbbActor ! new StartProbingReply(meetingId, transcoderId, mapAsScalaMap(params).toMap)
}
}

View File

@ -224,6 +224,16 @@ class MeetingActor(val mProps: MeetingProperties, val outGW: OutMessageGateway)
handleSharedNotesSyncNoteRequest(msg)
case msg: ReconnectionTimeout =>
handleReconnectionTimeout(msg)
case msg: StartTranscoderReply =>
handleStartTranscoderReply(msg)
case msg: UpdateTranscoderReply =>
handleUpdateTranscoderReply(msg)
case msg: StopTranscoderReply =>
handleStopTranscoderReply(msg)
case msg: TranscoderStatusUpdate =>
handleTranscoderStatusUpdate(msg)
case msg: StartProbingReply =>
handleStartProbingReply(msg)
case msg: EndMeeting => handleEndMeeting(msg)
case StopMeetingActor => //exit

View File

@ -33,6 +33,9 @@ import org.bigbluebutton.common.messages.UserEjectedFromMeetingMessage
import org.bigbluebutton.common.messages.LockLayoutMessage
import org.bigbluebutton.core.pubsub.senders.WhiteboardMessageToJsonConverter
import org.bigbluebutton.common.converters.ToJsonEncoder
import org.bigbluebutton.common.messages.StartTranscoderRequestMessage
import org.bigbluebutton.common.messages.UpdateTranscoderRequestMessage
import org.bigbluebutton.common.messages.StopTranscoderRequestMessage
object MessageSenderActor {
def props(meetingId: String, msgSender: MessageSender): Props =
@ -124,6 +127,9 @@ class MessageSenderActor(val meetingId: String, val service: MessageSender)
case msg: UserLeftVoice => handleUserLeftVoice(msg)
case msg: IsMeetingMutedReply => handleIsMeetingMutedReply(msg)
case msg: UserListeningOnly => handleUserListeningOnly(msg)
case msg: StartTranscoderRequest => handleStartTranscoderRequest(msg)
case msg: UpdateTranscoderRequest => handleUpdateTranscoderRequest(msg)
case msg: StopTranscoderRequest => handleStopTranscoderRequest(msg)
case msg: GetCurrentLayoutReply => handleGetCurrentLayoutReply(msg)
case msg: BroadcastLayoutEvent => handleBroadcastLayoutEvent(msg)
case msg: LockLayoutEvent => handleLockLayoutEvent(msg)
@ -669,6 +675,21 @@ class MessageSenderActor(val meetingId: String, val service: MessageSender)
service.send(MessagingConstants.FROM_USERS_CHANNEL, json)
}
private def handleStartTranscoderRequest(msg: StartTranscoderRequest) {
val str = new StartTranscoderRequestMessage(msg.meetingID, msg.transcoderId, mapAsJavaMap(msg.params))
service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleUpdateTranscoderRequest(msg: UpdateTranscoderRequest) {
val str = new UpdateTranscoderRequestMessage(msg.meetingID, msg.transcoderId, mapAsJavaMap(msg.params))
service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleStopTranscoderRequest(msg: StopTranscoderRequest) {
val str = new StopTranscoderRequestMessage(msg.meetingID, msg.transcoderId)
service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleGetWhiteboardShapesReply(msg: GetWhiteboardShapesReply) {
val json = WhiteboardMessageToJsonConverter.getWhiteboardShapesReplyToJson(msg)
service.send(MessagingConstants.FROM_WHITEBOARD_CHANNEL, json)

View File

@ -135,4 +135,11 @@ case class GetCurrentDocumentRequest(meetingID: String, requesterID: String) ext
case class CreateAdditionalNotesRequest(meetingID: String, requesterID: String, noteName: String) extends InMessage
case class DestroyAdditionalNotesRequest(meetingID: String, requesterID: String, noteID: String) extends InMessage
case class RequestAdditionalNotesSetRequest(meetingID: String, requesterID: String, additionalNotesSetSize: Int) extends InMessage
case class SharedNotesSyncNoteRequest(meetingID: String, requesterID: String, noteID: String) extends InMessage
case class SharedNotesSyncNoteRequest(meetingID: String, requesterID: String, noteID: String) extends InMessage
//Transcode
case class StartTranscoderReply(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage
case class UpdateTranscoderReply(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage
case class StopTranscoderReply(meetingID: String, transcoderId: String) extends InMessage
case class TranscoderStatusUpdate(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage
case class StartProbingReply(meetingID: String, transcoderId: String, params: Map[String, String]) extends InMessage

View File

@ -146,6 +146,12 @@ case class CreateAdditionalNotesReply(meetingID: String, recorded: Boolean, requ
case class DestroyAdditionalNotesReply(meetingID: String, recorded: Boolean, requesterID: String, noteID: String) extends IOutMessage
case class SharedNotesSyncNoteReply(meetingID: String, recorded: Boolean, requesterID: String, noteID: String, note: NoteReport) extends IOutMessage
//Transcode
case class StartTranscoderRequest(meetingID: String, transcoderId: String, params: scala.collection.mutable.HashMap[String, String]) extends IOutMessage
case class UpdateTranscoderRequest(meetingID: String, transcoderId: String, params: scala.collection.mutable.HashMap[String, String]) extends IOutMessage
case class StopTranscoderRequest(meetingID: String, transcoderId: String) extends IOutMessage
case class StartProbingRequest(meetingID: String, transcoderId: String, params: scala.collection.mutable.HashMap[String, String]) extends IOutMessage
// Value Objects
case class MeetingVO(id: String, recorded: Boolean)

View File

@ -17,7 +17,7 @@ import redis.api.servers.ClientSetname
object AppsRedisSubscriberActor extends SystemConfiguration {
val channels = Seq("time")
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*")
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*")
def props(msgReceiver: RedisMessageReceiver): Props =
Props(classOf[AppsRedisSubscriberActor], msgReceiver,

49
akka-bbb-transcode/.gitignore vendored Normal file
View File

@ -0,0 +1,49 @@
.metadata
.project
.classpath
.settings
.history
.worksheet
gen
**/*.swp
**/*~.nib
**/build/
**/*.pbxuser
**/*.perspective
**/*.perspectivev3
*.xcworkspace
*.xcuserdatad
*.iml
project/*.ipr
project/*.iml
project/*.iws
project/out
project/*/target
project/target
project/*/bin
project/*/build
project/*.iml
project/*/*.iml
project/.idea
project/.idea/*
.idea/
.DS_Store
project/.DS_Store
project/*/.DS_Store
tm.out
tmlog*.log
*.tm*.epoch
out/
provisioning/.vagrant
provisioning/*/.vagrant
provisioning/*/*.known
/sbt/akka-patterns-store/
/daemon/src/build/
*.lock
logs/
tmp/
build/
akka-patterns-store/
lib_managed/
.cache
bin/

View File

@ -0,0 +1,12 @@
# akka-bbb-transcode
This app implements a simple mechanism for media transcoding, acting as a relay between endpoints. This uses [FFmpeg](http://ffmpeg.org/) for transcoding between media formats and exchanging protocols.
building and running
---
```bash
cd akka-bbb-transcode/
sbt clean
sbt run
```

97
akka-bbb-transcode/build.sbt Executable file
View File

@ -0,0 +1,97 @@
enablePlugins(JavaServerAppPackaging)
name := "bbb-transcode-akka"
organization := "org.bigbluebutton"
version := "0.0.1"
scalaVersion := "2.11.6"
scalacOptions ++= Seq(
"-unchecked",
"-deprecation",
"-Xlint",
"-Ywarn-dead-code",
"-language:_",
"-target:jvm-1.7",
"-encoding", "UTF-8"
)
resolvers ++= Seq(
"spray repo" at "http://repo.spray.io/",
"rediscala" at "http://dl.bintray.com/etaty/maven",
"blindside-repos" at "http://blindside.googlecode.com/svn/repository/"
)
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/dev/repo/maven-repo/releases" )) )
// We want to have our jar files in lib_managed dir.
// This way we'll have the right path when we import
// into eclipse.
retrieveManaged := true
testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", "junitxml")
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
libraryDependencies ++= {
val akkaVersion = "2.3.11"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.0.3",
"org.pegdown" % "pegdown" % "1.4.0",
"junit" % "junit" % "4.11",
"com.etaty.rediscala" %% "rediscala" % "1.4.0",
"commons-codec" % "commons-codec" % "1.8",
"joda-time" % "joda-time" % "2.3",
"com.google.code.gson" % "gson" % "1.7.1",
"redis.clients" % "jedis" % "2.1.0",
"org.apache.commons" % "commons-lang3" % "3.2",
"org.bigbluebutton" % "bbb-common-message" % "0.0.16"
)}
seq(Revolver.settings: _*)
scalariformSettings
//-----------
// Packaging
//
// Reference:
// https://github.com/muuki88/sbt-native-packager-examples/tree/master/akka-server-app
// http://www.scala-sbt.org/sbt-native-packager/index.html
//-----------
mainClass := Some("org.bigbluebutton.Boot")
maintainer in Linux := "Mario Gasparoni <mariogasparoni@gmail.com>"
packageSummary in Linux := "BigBlueButton Transcoder"
packageDescription := """BigBlueButton FFmpeg transcoder."""
val user = "bigbluebutton"
val group = "bigbluebutton"
// user which will execute the application
daemonUser in Linux := user
// group which will execute the application
daemonGroup in Linux := group
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move the application.conf so the user can override settings here
val appConf = src / "main" / "resources" / "application.conf"
appConf -> "conf/application.conf"
}
mappings in Universal <+= (packageBin in Compile, sourceDirectory ) map { (_, src) =>
// Move logback.xml so the user can override settings here
val logConf = src / "main" / "resources" / "logback.xml"
logConf -> "conf/logback.xml"
}
debianPackageDependencies in Debian ++= Seq("java7-runtime-headless", "bash")

View File

View File

@ -0,0 +1 @@
sbt.version=0.13.8

View File

@ -0,0 +1,7 @@
addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2")
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0")

View File

@ -0,0 +1,20 @@
package org.bigbluebutton.transcode.api;
public class DestroyVideoTranscoderReply extends InternalMessage {
private final String meetingId;
private final String transcoderId;
public DestroyVideoTranscoderReply(String meetingId, String transcoderId) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
}
public String getMeetingId() {
return meetingId;
}
public String getTranscoderId() {
return transcoderId;
}
}

View File

@ -0,0 +1,6 @@
package org.bigbluebutton.transcode.api;
public class DestroyVideoTranscoderRequest extends InternalMessage {
public DestroyVideoTranscoderRequest() {}
}

View File

@ -0,0 +1,3 @@
package org.bigbluebutton.transcode.api;
public class InternalMessage {}

View File

@ -0,0 +1,26 @@
package org.bigbluebutton.transcode.api;
public class RestartVideoTranscoderReply extends InternalMessage {
private final String meetingId;
private final String transcoderId;
private final String output;
public RestartVideoTranscoderReply(String meetingId, String transcoderId, String output) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.output = output;
}
public String getMeetingId() {
return meetingId;
}
public String getTranscoderId() {
return transcoderId;
}
public String getOutput() {
return output;
}
}

View File

@ -0,0 +1,5 @@
package org.bigbluebutton.transcode.api;
public class RestartVideoTranscoderRequest extends InternalMessage {
public RestartVideoTranscoderRequest() {}
}

View File

@ -0,0 +1,27 @@
package org.bigbluebutton.transcode.api;
import java.util.Map;
public class StartVideoProbingReply extends InternalMessage {
private final String meetingId;
private final String transcoderId;
private final Map<String,String> probingData;
public StartVideoProbingReply (String meetingId, String transcoderId, Map<String,String> probingData) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.probingData = probingData;
}
public String getMeetingId() {
return meetingId;
}
public String getTranscoderId() {
return transcoderId;
}
public Map<String,String> getProbingData(){
return probingData;
}
}

View File

@ -0,0 +1,5 @@
package org.bigbluebutton.transcode.api;
public class StartVideoProbingRequest extends InternalMessage {
public StartVideoProbingRequest() {}
}

View File

@ -0,0 +1,26 @@
package org.bigbluebutton.transcode.api;
public class StartVideoTranscoderReply extends InternalMessage {
private final String meetingId;
private final String transcoderId;
private final String output;
public StartVideoTranscoderReply(String meetingId, String transcoderId, String output) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.output = output;
}
public String getMeetingId() {
return meetingId;
}
public String getTranscoderId() {
return transcoderId;
}
public String getOutput() {
return output;
}
}

View File

@ -0,0 +1,5 @@
package org.bigbluebutton.transcode.api;
public class StartVideoTranscoderRequest extends InternalMessage {
public StartVideoTranscoderRequest() {}
}

View File

@ -0,0 +1,20 @@
package org.bigbluebutton.transcode.api;
public class StopVideoTranscoderReply extends InternalMessage {
private final String meetingId;
private final String transcoderId;
public StopVideoTranscoderReply(String meetingId, String transcoderId) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
}
public String getMeetingId() {
return meetingId;
}
public String getTranscoderId() {
return transcoderId;
}
}

View File

@ -0,0 +1,5 @@
package org.bigbluebutton.transcode.api;
public class StopVideoTranscoderRequest extends InternalMessage {
public StopVideoTranscoderRequest() {}
}

View File

@ -0,0 +1,13 @@
package org.bigbluebutton.transcode.api;
public class TranscodingFinishedSuccessfully extends InternalMessage {
private final String transcoderId;
public TranscodingFinishedSuccessfully (String transcoderId) {
this.transcoderId = transcoderId;
}
public String getTranscoderId() {
return transcoderId;
}
}

View File

@ -0,0 +1,13 @@
package org.bigbluebutton.transcode.api;
public class TranscodingFinishedUnsuccessfully extends InternalMessage {
private final String transcoderId;
public TranscodingFinishedUnsuccessfully (String transcoderId) {
this.transcoderId = transcoderId;
}
public String getTranscoderId() {
return transcoderId;
}
}

View File

@ -0,0 +1,26 @@
package org.bigbluebutton.transcode.api;
import java.util.Map;
public class UpdateVideoTranscoderReply extends InternalMessage {
private final String meetingId;
private final String transcoderId;
private final String output;
public UpdateVideoTranscoderReply(String meetingId, String transcoderId, String output) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.output = output;
}
public String getMeetingId() {
return meetingId;
}
public String getTranscoderId() {
return transcoderId;
}
public String getOutput() {
return output;
}
}

View File

@ -0,0 +1,15 @@
package org.bigbluebutton.transcode.api;
import java.util.Map;
public class UpdateVideoTranscoderRequest extends InternalMessage {
private final Map<String,String> params;
public UpdateVideoTranscoderRequest(Map<String,String> params) {
this.params = params;
}
public Map<String,String> getParams() {
return params;
}
}

View File

@ -0,0 +1,766 @@
package org.bigbluebutton.transcode.core;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.bigbluebutton.transcode.core.ffprobe.FFProbeCommand;
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegCommand;
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants;
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegUtils;
import org.bigbluebutton.transcode.core.processmonitor.ProcessMonitor;
import org.bigbluebutton.transcode.core.processmonitor.ProcessMonitorObserver;
import org.bigbluebutton.common.messages.Constants;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import org.bigbluebutton.transcode.api.InternalMessage;
import org.bigbluebutton.transcode.api.DestroyVideoTranscoderRequest;
import org.bigbluebutton.transcode.api.DestroyVideoTranscoderReply;
import org.bigbluebutton.transcode.api.RestartVideoTranscoderRequest;
import org.bigbluebutton.transcode.api.RestartVideoTranscoderReply;
import org.bigbluebutton.transcode.api.StartVideoProbingRequest;
import org.bigbluebutton.transcode.api.StartVideoProbingReply;
import org.bigbluebutton.transcode.api.StartVideoTranscoderRequest;
import org.bigbluebutton.transcode.api.StartVideoTranscoderReply;
import org.bigbluebutton.transcode.api.StopVideoTranscoderRequest;
import org.bigbluebutton.transcode.api.StopVideoTranscoderReply;
import org.bigbluebutton.transcode.api.UpdateVideoTranscoderRequest;
import org.bigbluebutton.transcode.api.UpdateVideoTranscoderReply;
import org.bigbluebutton.transcode.api.TranscodingFinishedSuccessfully;
import org.bigbluebutton.transcode.api.TranscodingFinishedUnsuccessfully;
public class VideoTranscoder extends UntypedActor implements ProcessMonitorObserver {
public static enum Type{TRANSCODE_RTP_TO_RTMP, TRANSCODE_RTMP_TO_RTP,TRANSCODE_FILE_TO_RTP, TRANSCODE_FILE_TO_RTMP, PROBE_RTMP};
public static enum Status{RUNNING, STOPPED, UPDATING}
public static final String VIDEO_CONF_LOGO_PATH = FFmpegUtils.videoconfLogoPath;
public static final String FFMPEG_PATH = FFmpegUtils.ffmpegPath;
public static final String FFPROBE_PATH = FFmpegUtils.ffprobePath;
//if ffmpeg restarts 5 times in less than 5 seconds, we will not restart it anymore
//this is to prevent a infinite loop of ffmpeg restartings
private static final int MAX_RESTARTINGS_NUMBER = 5;
private static final long MIN_RESTART_TIME = 5000; //5 seconds
private int currentFFmpegRestartNumber = 0;
private long lastFFmpegRestartTime = 0;
private ActorRef parentActor;
private Type type;
private Status status = Status.STOPPED;
private ProcessMonitor ffmpegProcessMonitor;
private ProcessMonitor ffprobeProcessMonitor;
private FFmpegCommand ffmpeg;
private String transcoderId;
private String username;
private String callername; //used to create rtp-> (any) SDP
private String videoStreamName;
private String input;
private String outputLive;
private String output; //output of transcoder
private String meetingId;
private String voiceBridge;
private String sourceIp;
private String destinationIp;
private String localVideoPort;
private String remoteVideoPort;
private String sdpPath;
private VideoTranscoderObserver observer;
private String globalVideoWidth = "640";// get this from properties (Stored in FFmpegUtils)
private String globalVideoHeight = "480";// get this from properties
public static final String FFMPEG_NAME = "FFMPEG";
public static final String FFPROBE_NAME = "FFPROBE";
public static Props props(final ActorRef parentActor, final String meetingId, final String transcoderId, final Map<String,String> params) {
return Props.create(new Creator<VideoTranscoder>() {
private static final long serialVersionUID = 1L;
@Override
public VideoTranscoder create() throws Exception {
return new VideoTranscoder(parentActor, meetingId, transcoderId, params);
}
});
}
@Override
public void onReceive(Object msg) {
if (msg instanceof StartVideoTranscoderRequest) {
start();
} else if (msg instanceof StopVideoTranscoderRequest) {
stop();
} else if (msg instanceof UpdateVideoTranscoderRequest) {
UpdateVideoTranscoderRequest uvtr = (UpdateVideoTranscoderRequest) msg;
update(uvtr.getParams());
} else if (msg instanceof DestroyVideoTranscoderRequest) {
destroyTranscoder();
} else if (msg instanceof RestartVideoTranscoderRequest) {
restart();
} else if (msg instanceof StartVideoProbingRequest) {
probeVideoStream();
}
}
private void stopActor() {
if (context() != null)
context().stop(getSelf());
}
private void sendMessage(InternalMessage msg) {
if ((parentActor != null) && (msg != null))
parentActor.tell(msg,getSelf());
}
public VideoTranscoder(ActorRef parentActor, String meetingId, String transcoderId, Map<String,String> params){
this.parentActor = parentActor;
this.meetingId = meetingId;
this.transcoderId = transcoderId;
if (params != null)
switch (params.get(Constants.TRANSCODER_TYPE)){
case Constants.TRANSCODE_RTP_TO_RTMP:
this.type = Type.TRANSCODE_RTP_TO_RTMP;
this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS);
this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT);
this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT);
this.destinationIp = params.get(Constants.SIP_HOST);
this.voiceBridge = params.get(Constants.VOICE_CONF);
this.callername = params.get(Constants.CALLERNAME);
break;
case Constants.TRANSCODE_RTMP_TO_RTP:
this.type = Type.TRANSCODE_RTMP_TO_RTP;
this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS);
this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT);
this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT);
this.destinationIp = params.get(Constants.SIP_HOST);
this.voiceBridge = params.get(Constants.VOICE_CONF);
this.callername = params.get(Constants.CALLERNAME);
this.videoStreamName = params.get(Constants.INPUT);
break;
case Constants.TRANSCODE_FILE_TO_RTP:
this.type = Type.TRANSCODE_FILE_TO_RTP;
this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS);
this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT);
this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT);
this.destinationIp = params.get(Constants.SIP_HOST);
this.voiceBridge = params.get(Constants.VOICE_CONF);
this.callername = params.get(Constants.CALLERNAME);
break;
case Constants.TRANSCODE_FILE_TO_RTMP:
this.type = Type.TRANSCODE_FILE_TO_RTMP;
this.sourceIp = params.get(Constants.LOCAL_IP_ADDRESS);
this.localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT);
this.remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT);
this.destinationIp = params.get(Constants.SIP_HOST);
this.voiceBridge = params.get(Constants.VOICE_CONF);
this.callername = params.get(Constants.CALLERNAME);
break;
case Constants.PROBE_RTMP:
this.type = Type.PROBE_RTMP;
this.videoStreamName = params.get(Constants.INPUT);
break;
default:
break;
}
}
private synchronized void start() {
switch (status){
case RUNNING:
System.out.println(" > Transcoder already running, sending it's output");
break;
case UPDATING:
System.out.println(" > Transcoder is being updated, returning it's current output");
break;
default:
status = Status.RUNNING;
startTranscoder();
break;
}
sendMessage(new StartVideoTranscoderReply(meetingId, transcoderId, output));
}
private boolean startTranscoder(){
if ((ffmpegProcessMonitor != null) &&(ffmpeg != null)) {
return false;
}
String[] command;
if(!canFFmpegRun()) {
//log.debug("***TRANSCODER WILL NOT START: ffmpeg cannot run");
return false;
}
//log.debug("Starting Video Transcoder...");
switch(type){
case TRANSCODE_RTMP_TO_RTP:
if(!areRtmpToRtpParametersValid()) {
System.out.println(" > ***TRANSCODER WILL NOT START: Rtmp to Rtp Parameters are invalid");
return false;
}
input = "rtmp://" + sourceIp + "/video/" + meetingId + "/"
+ videoStreamName + " live=1"; //the full input is composed by the videoStreamName
outputLive = "rtp://" + destinationIp + ":" + remoteVideoPort + "?localport=" + localVideoPort;
output = "";
ffmpeg = new FFmpegCommand();
ffmpeg.setFFmpegPath(FFMPEG_PATH);
ffmpeg.setInput(input);
ffmpeg.addRtmpInputConnectionParameter(meetingId);
ffmpeg.addRtmpInputConnectionParameter("transcoder-"+transcoderId);
ffmpeg.setFrameRate(15);
ffmpeg.setBufSize(1024);
ffmpeg.setGop(1); //MCU compatibility
ffmpeg.setCodec("libopenh264");
ffmpeg.setMaxRate(1024);
ffmpeg.setSliceMode("dyn");
ffmpeg.setMaxNalSize("1024");
ffmpeg.setRtpFlags("h264_mode0"); //RTP's packetization mode 0
ffmpeg.setProfile("baseline");
ffmpeg.setFormat("rtp");
ffmpeg.setPayloadType(FFmpegConstants.CODEC_ID_H264);
ffmpeg.setLoglevel("verbose");
ffmpeg.setOutput(outputLive);
ffmpeg.setAnalyzeDuration("1000"); // 1ms
ffmpeg.setProbeSize("32"); // 1ms
System.out.println("Preparing FFmpeg process monitor");
command = ffmpeg.getFFmpegCommand(true);
break;
case TRANSCODE_RTP_TO_RTMP:
if(!areRtpToRtmpParametersValid()) {
System.out.println(" > ***TRANSCODER WILL NOT START: Rtp to Rtmp Parameters are invalid");
return false;
}
//Create SDP FILE
sdpPath = FFmpegUtils.createSDPVideoFile(callername, sourceIp, localVideoPort, FFmpegConstants.CODEC_NAME_H264, FFmpegConstants.CODEC_ID_H264, FFmpegConstants.SAMPLE_RATE_H264, voiceBridge);
input = sdpPath;
//Generate video stream name
videoStreamName = generateVideoStreamName(type);
outputLive = "rtmp://" + destinationIp + "/video/" + meetingId + "/"
+ videoStreamName+" live=1";
output = videoStreamName;
ffmpeg = new FFmpegCommand();
ffmpeg.setFFmpegPath(FFMPEG_PATH);
ffmpeg.setInput(input);
ffmpeg.setFormat("flv");
ffmpeg.setLoglevel("verbose");
ffmpeg.setOutput(outputLive);
ffmpeg.addRtmpOutputConnectionParameter(meetingId);
ffmpeg.addRtmpOutputConnectionParameter("transcoder-"+transcoderId);
ffmpeg.setVideoBitRate(1024);
ffmpeg.setBufSize(1024);
ffmpeg.setMaxRate(1024);
ffmpeg.setCodec("libopenh264");
ffmpeg.setProfile("baseline");
ffmpeg.setAnalyzeDuration("1000"); // 10ms
ffmpeg.addCustomParameter("-s", globalVideoWidth+"x"+globalVideoHeight);
ffmpeg.addCustomParameter("-filter:v","scale=iw*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih):ih*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih), pad="+globalVideoWidth+":"+globalVideoHeight+":("+globalVideoWidth+"-iw*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih))/2:("+globalVideoHeight+"-ih*min("+globalVideoWidth+"/iw\\,"+globalVideoHeight+"/ih))/2, fps=fps=15");
command = ffmpeg.getFFmpegCommand(true);
break;
case TRANSCODE_FILE_TO_RTP:
if(!areFileToRtpParametersValid()) {
System.out.println(" > ***TRANSCODER WILL NOT START: File to Rtp Parameters are invalid");
return false;
}
input = VIDEO_CONF_LOGO_PATH;
outputLive = "rtp://" + destinationIp + ":" + remoteVideoPort + "?localport=" + localVideoPort;
output = "";
username = callername;
ffmpeg = new FFmpegCommand();
ffmpeg.setFFmpegPath(FFMPEG_PATH);
ffmpeg.setIgnoreLoop(0);
ffmpeg.setInput(input);
ffmpeg.setInputLive(true);
ffmpeg.addCustomParameter("-s", globalVideoWidth+"x"+globalVideoHeight);
ffmpeg.setFrameRate(15);
ffmpeg.setPayloadType(FFmpegConstants.CODEC_ID_H264);
ffmpeg.setLoglevel("quiet");
if (FFmpegUtils.isUserVideoSubtitleEnabled())
ffmpeg.addCustomParameter("-vf","drawtext=fontfile=/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf:text="+username+":x="+globalVideoWidth+"-tw-20:y="+globalVideoHeight+"-th-20:fontcolor=white@0.9:shadowcolor=black:shadowx=2:shadowy=2:fontsize=20");
ffmpeg.setGop(1);
ffmpeg.setCodec("libopenh264");
ffmpeg.setSliceMode("dyn");
ffmpeg.setMaxNalSize("1024");
ffmpeg.setRtpFlags("h264_mode0"); //RTP's packetization mode 0
ffmpeg.setProfile("baseline");
ffmpeg.setFormat("rtp");
ffmpeg.setOutput(outputLive);
command = ffmpeg.getFFmpegCommand(true);
break;
case TRANSCODE_FILE_TO_RTMP:
if(!areFileToRtmpParametersValid()) {
System.out.println("***TRANSCODER WILL NOT START: File to Rtmp Parameters are invalid");
return false;
}
videoStreamName = generateVideoStreamName(type);
input = VIDEO_CONF_LOGO_PATH;
outputLive = "rtmp://" + destinationIp + "/video/" + meetingId + "/"
+ videoStreamName+" live=1";
output = videoStreamName;
ffmpeg = new FFmpegCommand();
ffmpeg.setFFmpegPath(FFMPEG_PATH);
ffmpeg.setInput(input);
ffmpeg.setInputLive(true);
ffmpeg.setFrameSize("640x480");
ffmpeg.setIgnoreLoop(0);
ffmpeg.setFormat("flv");
ffmpeg.setLoglevel("verbose");
ffmpeg.addRtmpOutputConnectionParameter(meetingId);
ffmpeg.addRtmpOutputConnectionParameter("transcoder-"+transcoderId);
ffmpeg.setOutput(outputLive);
ffmpeg.setCodec("libopenh264");
ffmpeg.setProfile("baseline");
command = ffmpeg.getFFmpegCommand(true);
break;
default: command = null;
}
if(command != null){
this.ffmpegProcessMonitor = new ProcessMonitor(command,FFMPEG_NAME);
ffmpegProcessMonitor.setProcessMonitorObserver(this);
ffmpegProcessMonitor.start();
return true;
}
return false;
}
private synchronized void stop(){
status = Status.STOPPED;
stopTranscoder();
sendMessage(new StopVideoTranscoderReply(meetingId, transcoderId));
}
private void stopTranscoder() {
if (ffmpegProcessMonitor != null) {
ffmpegProcessMonitor.forceDestroy();
clearData();
}
}
/**
* Clear monitor and ffmpeg data.
*/
private void clearData() {
ffmpegProcessMonitor = null;
ffmpeg = null;
switch (type) {
case TRANSCODE_RTP_TO_RTMP:
FFmpegUtils.removeSDPVideoFile(voiceBridge);
break;
default:
}
}
private synchronized void destroyTranscoder() {
status = Status.STOPPED;
stopTranscoder();
sendMessage(new DestroyVideoTranscoderReply(meetingId, transcoderId));
stopActor();
}
private synchronized void update(Map<String,String> params) {
switch (status) {
case UPDATING:
status = Status.RUNNING;
startTranscoder();
sendMessage(new UpdateVideoTranscoderReply(meetingId, transcoderId, output));
break;
default:
if (params != null) {
String transcoderType = params.get(Constants.TRANSCODER_TYPE);
String input = params.get(Constants.INPUT);
String sourceIp = params.get(Constants.LOCAL_IP_ADDRESS);
String localVideoPort = params.get(Constants.LOCAL_VIDEO_PORT);
String remoteVideoPort = params.get(Constants.REMOTE_VIDEO_PORT);
String destinationIp = params.get(Constants.SIP_HOST);
setType(transcoderType);
setVideoStreamName(input);
setSourceIp(sourceIp);
setLocalVideoPort(localVideoPort);
setRemoteVideoPort(remoteVideoPort);
setDestinationIp(destinationIp);
status = Status.UPDATING; //mark update status
stopTranscoder();
}
break;
}
}
private synchronized void restart() {
if (!maxRestartsReached()) {
System.out.println(" > [Restart] Starting current transcoder " + transcoderId);
status = Status.RUNNING;
lastFFmpegRestartTime = System.currentTimeMillis();
clearData();
startTranscoder();
sendMessage(new RestartVideoTranscoderReply(meetingId, transcoderId, output));
}
}
private boolean maxRestartsReached() {
currentFFmpegRestartNumber++;
if(currentFFmpegRestartNumber == MAX_RESTARTINGS_NUMBER) {
long timeInterval = System.currentTimeMillis() - lastFFmpegRestartTime;
if(timeInterval <= MIN_RESTART_TIME) {
System.out.println(" > Max number of ffmpeg restartings reached in " + timeInterval + " miliseconds for " + transcoderId + "'s Video Transcoder." +
" Not restating it anymore.");
return true;
}
else
currentFFmpegRestartNumber = 0;
}
return false;
}
public synchronized void transcodingFinishedSuccessfully() {
sendMessage(new TranscodingFinishedSuccessfully(transcoderId)); //tell parent for clean up
stopActor();
}
public synchronized void probeVideoStream(){
if (ffmpegProcessMonitor != null) {
FFProbeCommand ffprobe = new FFProbeCommand(outputLive);
String command[];
ffprobe.setFFprobepath(FFPROBE_PATH);
ffprobe.setInput(outputLive);
ffprobe.setAnalyzeDuration("1");
ffprobe.setShowStreams();
ffprobe.setLoglevel("quiet");
ffprobe.getFFprobeCommand(true);
command = ffprobe.getFFprobeCommand(true);
if(command != null){
this.ffprobeProcessMonitor = new ProcessMonitor(command,FFPROBE_NAME);
ffprobeProcessMonitor.setProcessMonitorObserver(this);
ffprobeProcessMonitor.start();
}
} else {
}
}
private void updateGlobalStreamName(String streamName){
this.videoStreamName = streamName;
String outputLive;
String[] newCommand;
outputLive = "rtmp://" + destinationIp + "/video/" + meetingId + "/"
+ this.videoStreamName+" live=1";
ffmpeg.setOutput(outputLive); //update ffmpeg's output
newCommand = ffmpeg.getFFmpegCommand(true);
ffmpegProcessMonitor.setCommand(newCommand); //update ffmpeg command
}
public void setVideoTranscoderObserver(VideoTranscoderObserver observer){
this.observer = observer;
}
@Override
public void handleProcessFinishedUnsuccessfully(String processMonitorName,String processOutput) {
if ((processMonitorName == null)|| processMonitorName.isEmpty()){
return;
}
if (FFMPEG_NAME.equals(processMonitorName)){
sendMessage(new TranscodingFinishedUnsuccessfully(transcoderId));
}else if (FFPROBE_NAME.equals(processMonitorName)){
System.out.println(" > Failed to probe video stream " + outputLive);
}
}
@Override
public void handleProcessFinishedWithSuccess(String processMonitorName, String processOutput) {
if ((processMonitorName == null)|| processMonitorName.isEmpty()) {
System.out.println(" > Can't handle process process monitor finishing with success: UNKNOWN PROCESS");
return;
}
if (FFMPEG_NAME.equals(processMonitorName)){
switch (status) {
case RUNNING:
System.out.println(" > Transcoder finished with success but wasn't closed by the user " + transcoderId + ". Informing parentActor");
transcodingFinishedSuccessfully();
break;
case STOPPED:
System.out.println(" > Transcoder closed by the user. Finished with success");
break;
case UPDATING:
update(null);
break;
default:
break;
}
}
else if (FFPROBE_NAME.equals(processMonitorName)){
String ffprobeOutput = processOutput;
Map<String,String> ffprobeData = parseFFprobeOutput(ffprobeOutput);
sendMessage(new StartVideoProbingReply(meetingId, transcoderId, ffprobeData));
}else{
System.out.println("Can't handle process monitor finishing with success: UNKNOWN PROCESS");
}
}
public Map<String,String> parseFFprobeOutput(String ffprobeOutput){
Pattern pattern = Pattern.compile("(.*)=(.*)");
Map<String, String> ffprobeResult = new HashMap<String, String>();
BufferedReader buf = new BufferedReader(new StringReader(ffprobeOutput));
String line = null;
try {
while( (line=buf.readLine()) != null){
Matcher matcher = pattern.matcher(line);
if(matcher.matches()) {
ffprobeResult.put(matcher.group(1), matcher.group(2));
}
}
} catch (IOException e){
//log.debug("Error when parsing FFprobe's output");
}
return ffprobeResult;
}
public boolean canFFmpegRun() {
//log.debug("Checking if FFmpeg can run...");
return validateIps() && isFFmpegPathValid();
}
public boolean validateIps(){
if ((sourceIp == null || sourceIp.isEmpty())
&& (type == Type.TRANSCODE_RTMP_TO_RTP))
return false;
if ((destinationIp == null || destinationIp.isEmpty())
&& (type == Type.TRANSCODE_FILE_TO_RTMP
|| type == Type.TRANSCODE_FILE_TO_RTP
|| type == Type.TRANSCODE_RTMP_TO_RTP
|| type == Type.TRANSCODE_FILE_TO_RTP))
return false;
return true;
}
public boolean isFFmpegPathValid() {
/*if (!GlobalCall.ffmpegExists(FFMPEG_PATH)) {
//log.debug("***FFMPEG DOESN'T EXIST: check the FFMPEG path in bigbluebutton-sip.properties");
return false;
}*/
return true;
}
public boolean areRtmpToRtpParametersValid() {
//log.debug("Checking Rtmp to Rtp Transcoder Parameters...");
if(meetingId == null || meetingId.isEmpty()) {
//log.debug("meetingId is null or empty");
return false;
}
if(videoStreamName == null || videoStreamName.isEmpty()) {
//log.debug("videoStreamName is null or empty");
return false;
}
return areVideoPortsValid();
}
public boolean areRtpToRtmpParametersValid() {
//log.debug("Checking Rtp to Rtmp Transcoder Parameters...");
if(meetingId == null || meetingId.isEmpty()) {
//log.debug("meetingId is null or empty");
return false;
}
return isSdpPathValid();
}
public boolean areFileToRtpParametersValid() {
//log.debug("Checking File to Rtp Transcoder Parameters...");
return areVideoPortsValid() && isVideoConfLogoValid();
}
public boolean areFileToRtmpParametersValid() {
//log.debug("Checking File to Rtmp Transcoder Parameters...");
if(meetingId == null || meetingId.isEmpty()) {
//log.debug("meetingId is null or empty");
return false;
}
return isVideoConfLogoValid();
}
public boolean areVideoPortsValid() {
if(localVideoPort == null || localVideoPort.isEmpty()) {
//log.debug("localVideoPort is null or empty");
return false;
}
if(remoteVideoPort == null || remoteVideoPort.isEmpty()) {
//log.debug("remoteVideoPort is null or empty");
return false;
}
if(localVideoPort.equals("0")) {
//log.debug("localVideoPort is 0");
return false;
}
if(remoteVideoPort.equals("0")) {
//log.debug("remoteVideoPort is 0");
return false;
}
return true;
}
public boolean isVideoConfLogoValid() {
/*if(!GlobalCall.videoConfLogoExists(VIDEO_CONF_LOGO_PATH)) {
//log.debug("***IMAGE FOR VIDEOCONF-LOGO VIDEO DOESN'T EXIST: check the image path in bigbluebutton-sip.properties");
return false;
}*/
return true;
}
public boolean isSdpPathValid() {
/*if(!GlobalCall.sdpVideoExists(sdpPath)) {
//log.debug("***SDP FOR GLOBAL FFMPEG ({}) doesn't exist", sdpPath);
return false;
}*/
return true;
}
public String getVideoStreamName(){
return this.videoStreamName;
}
public String getTranscoderId(){
return this.transcoderId;
}
public String getMeetingId(){
return this.meetingId;
}
public String getOutput(){
return this.output;
}
public String generateVideoStreamName(Type type){
switch(type){
case TRANSCODE_RTP_TO_RTMP:
return FFmpegUtils.GLOBAL_VIDEO_STREAM_NAME_PREFIX + voiceBridge + "_" + System.currentTimeMillis();
case TRANSCODE_FILE_TO_RTMP:
return FFmpegUtils.VIDEOCONF_LOGO_STREAM_NAME_PREFIX + voiceBridge + "_" + System.currentTimeMillis();
default:
return "unknown_stream_name";
}
}
public void setVideoStreamName(String videoStreamName) {
if (videoStreamName != null) this.videoStreamName = videoStreamName;
}
public void setType(String type) {
if (type == null) return;
switch (type){
case Constants.TRANSCODE_RTP_TO_RTMP:
this.type = Type.TRANSCODE_RTP_TO_RTMP;
break;
case Constants.TRANSCODE_RTMP_TO_RTP:
this.type = Type.TRANSCODE_RTMP_TO_RTP;
break;
case Constants.TRANSCODE_FILE_TO_RTP:
this.type = Type.TRANSCODE_FILE_TO_RTP;
break;
case Constants.TRANSCODE_FILE_TO_RTMP:
this.type = Type.TRANSCODE_FILE_TO_RTMP;
break;
default:
return;
}
}
public String getType() {
switch (type){
case TRANSCODE_RTP_TO_RTMP:
return Constants.TRANSCODE_RTP_TO_RTMP;
case TRANSCODE_RTMP_TO_RTP:
return Constants.TRANSCODE_RTMP_TO_RTP;
case TRANSCODE_FILE_TO_RTP:
return Constants.TRANSCODE_FILE_TO_RTP;
case TRANSCODE_FILE_TO_RTMP:
return Constants.TRANSCODE_FILE_TO_RTMP;
default:
return "UNKNOWN";
}
}
public void setSourceIp(String sourceIp) {
if (sourceIp != null) this.sourceIp = sourceIp;
}
public String getSourceIp() {
return sourceIp;
}
public void setLocalVideoPort(String localVideoPort) {
if (localVideoPort != null) this.localVideoPort = localVideoPort;
}
public String getLocalVideoPort() {
return localVideoPort;
}
public void setRemoteVideoPort(String remoteVideoPort) {
if (remoteVideoPort != null) this.remoteVideoPort = remoteVideoPort;
}
public String getRemoteVideoPort() {
return remoteVideoPort;
}
public void setDestinationIp(String destinationIp) {
if (destinationIp != null) this.destinationIp = destinationIp;
}
public String getDestinationIp() {
return destinationIp;
}
}

View File

@ -0,0 +1,9 @@
package org.bigbluebutton.transcode.core;
import java.util.Map;
public interface VideoTranscoderObserver {
public void handleTranscodingFinishedUnsuccessfully();
public void handleTranscodingFinishedWithSuccess();
public void handleVideoProbingFinishedWithSuccess(Map<String,String> ffprobeResult);
}

View File

@ -0,0 +1,11 @@
package org.bigbluebutton.transcode.core.api;
import java.util.Map;
public interface ITranscodingInGW {
void startTranscoder(String meetingId, String transcoderId, Map<String, String> params);
void updateTranscoder(String meetingId, String transcoderId, Map<String, String> params);
void stopTranscoder(String meetingId, String transcoderId);
void startProbing(String meetingId, String transcoderId, Map<String, String> params);
}

View File

@ -0,0 +1,387 @@
package org.bigbluebutton.transcode.core.ffmpeg;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
public class FFmpegCommand {
private HashMap args;
private HashMap x264Params;
private List<String[]> rtmpInputConnParams;
private List<String[]> rtmpOutputConnParams;
private String[] command;
private String ffmpegPath;
private String input;
private String output;
private Boolean inputLive;
/* Analyze duration is a special parameter that MUST come before the input */
private String analyzeDuration;
private String probeSize;
/* Parameters when the input is a loop image/file */
private String loop;
private String ignoreLoop;
private int frameRate;
private String frameSize;
public FFmpegCommand() {
this.args = new HashMap();
this.x264Params = new HashMap();
this.rtmpInputConnParams = new ArrayList<String[]>();
this.rtmpOutputConnParams = new ArrayList<String[]>();
this.ffmpegPath = null;
this.inputLive = false;
this.loop = null;
this.frameRate = 0;
}
public String[] getFFmpegCommand(boolean shouldBuild) {
if(shouldBuild)
buildFFmpegCommand();
return this.command;
}
public void buildFFmpegCommand() {
List comm = new ArrayList<String>();
if(this.ffmpegPath == null)
this.ffmpegPath = "/usr/local/bin/ffmpeg";
comm.add(this.ffmpegPath);
if (this.inputLive){
comm.add("-re");
}
/* Analyze duration and probesize MUST come before the input */
if(analyzeDuration != null && !analyzeDuration.isEmpty()) {
comm.add("-analyzeduration");
comm.add(analyzeDuration);
}
if(loop != null && !loop.isEmpty()){
comm.add("-loop");
comm.add(loop);
}
if(ignoreLoop != null && !ignoreLoop.isEmpty()){
comm.add("-ignore_loop");
comm.add(ignoreLoop);
}
if(probeSize != null && !probeSize.isEmpty()) {
comm.add("-probesize");
comm.add(probeSize);
}
buildRtmpInput();
comm.add("-i");
comm.add(input);
Iterator argsIter = this.args.entrySet().iterator();
while (argsIter.hasNext()) {
Map.Entry pairs = (Map.Entry)argsIter.next();
comm.add(pairs.getKey());
comm.add(pairs.getValue());
}
if(!x264Params.isEmpty()) {
comm.add("-x264-params");
String params = "";
Iterator x264Iter = this.x264Params.entrySet().iterator();
while (x264Iter.hasNext()) {
Map.Entry pairs = (Map.Entry)x264Iter.next();
String argValue = pairs.getKey() + "=" + pairs.getValue();
params += argValue;
// x264-params are separated by ':'
params += ":";
}
// Remove trailing ':'
params.replaceAll(":+$", "");
comm.add(params);
}
buildRtmpOutput();
comm.add(this.output);
this.command = new String[comm.size()];
comm.toArray(this.command);
}
/**
* Add rtmp parameters (if there are any) to the current input,
* if the input is rtmp.
*/
private void buildRtmpInput() {
if(!rtmpInputConnParams.isEmpty() && isRtmpInput()) {
StringBuilder sb = new StringBuilder();
for (String s[] : rtmpInputConnParams){
sb.append("conn="+s[0]+":"+s[1]+" ");
}
input+=" "+sb.toString().trim();
}
}
/**
* Add rtmp parameters (if there are any) to the current output,
* if the output is rtmp.
*/
private void buildRtmpOutput() {
if(!rtmpOutputConnParams.isEmpty() && isRtmpOutput()) {
StringBuilder sb = new StringBuilder();
for (String s[] : rtmpOutputConnParams){
sb.append("conn="+s[0]+":"+s[1]+" ");
}
output+=" "+sb.toString().trim();
}
}
public void setFFmpegPath(String arg) {
this.ffmpegPath = arg;
}
public void setInput(String arg) {
this.input = arg;
}
public void setInputLive(Boolean live){
this.inputLive = live;
}
public void setOutput(String arg) {
this.output = arg;
}
public void setCodec(String arg) {
this.args.put("-vcodec", arg);
}
public void setLoop(String arg) {
this.loop = arg;
}
/**
* Set ignore loop (valid for GIFs input, only)
* 0: means that input GIF will loop indefinitely
* @param arg
*/
public void setIgnoreLoop(int arg) {
this.ignoreLoop = Integer.toString(arg);
}
public void setLevel(String arg) {
this.args.put("-level", arg);
}
public void setPreset(String arg) {
this.args.put("-preset", arg);
}
public void setProfile(String arg) {
this.args.put("-profile:v", arg);
}
public void setFormat(String arg) {
this.args.put("-f", arg);
}
public void setPayloadType(String arg) {
this.args.put("-payload_type", arg);
}
public void setLoglevel(String arg) {
this.args.put("-loglevel", arg);
}
public void setPixelFormat(String arg){
this.args.put("-pix_fmt", arg);
}
/**
* Set video bitrate, in Kbps.
* @param arg
*/
public void setVideoBitRate(int arg){
this.args.put("-b:v", Integer.toString(arg)+"k");
}
/**
* Set bufsize, in Kb.
* @param arg
*/
public void setBufSize(int arg){
this.args.put("-bufsize", Integer.toString(arg)+"k");
}
/**
* Set maximum bitrate, in Kbps.
* @param arg
*/
public void setMaxRate(int arg){
this.args.put("-maxrate", Integer.toString(arg)+"k");
}
/**
* Set Group of images (GOP)
* @param arg
*/
public void setGop(int arg){
this.args.put("-g", Integer.toString(arg));
}
/**
* Set maximum NAL size, in bytes.
* This option works with libopenh264 encoder,only
* @param arg
*/
public void setMaxNalSize(String arg){
this.args.put("-max_nal_size", arg);
}
/**
* Set slice_mode for libopenh264 encoder.
* @param arg
*/
public void setSliceMode(String arg){
this.args.put("-slice_mode", arg);
}
/**
* Set rtpflags for RTP encoder.
* @param arg
*/
public void setRtpFlags(String arg){
this.args.put("-rtpflags", arg);
}
public void setSliceMaxSize(String arg) {
this.x264Params.put("slice-max-size", arg);
}
public void setMaxKeyFrameInterval(String arg) {
this.x264Params.put("keyint", arg);
}
public void addCustomParameter(String name, String value) {
this.args.put(name, value);
}
/**
* Set how much time FFmpeg should analyze stream
* data to get stream information. Note that this
* affects directly the delay to start the stream.
*
* @param duration Analysis duration
*/
public void setAnalyzeDuration(String duration) {
this.analyzeDuration = duration;
}
/**
* Probe size, in bytes.
* Minimum value: 32
* Default: 5000000
**/
public void setProbeSize(String size) {
this.probeSize = size;
}
/**
* Set frame rate of the input data
* @param value
*/
public void setFrameRate(int value){
if (value>0)
this.args.put("-r",Integer.toString(value));
}
public void setFrameSize(String value){
this.frameSize = value;
}
/**
* Add parameters for rtmp connections.
* The order of parameters is the order they are added
* @param value
*/
public void addRtmpInputConnectionParameter(String value){
//S: String
this.rtmpInputConnParams.add(new String[]{"S", value});
}
/**
* Add parameters for rtmp connections.
* The order of parameters is the order they are added
* @param value
*/
public void addRtmpInputConnectionParameter(boolean value){
//B: Boolean
this.rtmpInputConnParams.add(new String[]{"B", value?"1":"0"});
}
/**
* Add parameters for rtmp connections.
* The order of parameters is the order they are added
* @param value
*/
public void addRtmpInputConnectionParameter(int value){
//N : Number
this.rtmpInputConnParams.add(new String[]{"N", Integer.toString(value)});
}
/**
* Add parameters for rtmp connections.
* The order of parameters is the order they are added
* @param value
*/
public void addRtmpOutputConnectionParameter(String value){
//S: String
this.rtmpOutputConnParams.add(new String[]{"S", value});
}
/**
* Add parameters for rtmp connections.
* The order of parameters is the order they are added
* @param value
*/
public void addRtmpOutputConnectionParameter(boolean value){
//B: Boolean
this.rtmpOutputConnParams.add(new String[]{"B", value?"1":"0"});
}
/**
* Add parameters for rtmp connections.
* The order of parameters is the order they are added
* @param value
*/
public void addRtmpOutputConnectionParameter(int value){
//N : Number
this.rtmpOutputConnParams.add(new String[]{"N", Integer.toString(value)});
}
/**
* Check if the current set intput is rtmp
* @return
*/
private boolean isRtmpInput(){
return input.contains("rtmp");
}
/**
* Check if the current set output is rtmp
* @return
*/
private boolean isRtmpOutput(){
return output.contains("rtmp");
}
}

View File

@ -0,0 +1,45 @@
package org.bigbluebutton.transcode.core.ffmpeg;
public class FFmpegConstants {
//exit codes (obtained from process exit code)
public static final int FATAL_ERROR_CODE = 128;
public static final int EXIT_WITH_SUCCESS_CODE = 0;
public static final int EXIT_WITH_NO_INPUT_CODE = 1;
public static final int EXIT_WITH_SIGKILL_CODE = FATAL_ERROR_CODE + 9;
public static final int ACCEPTABLE_EXIT_CODES[] = {EXIT_WITH_SUCCESS_CODE,EXIT_WITH_SIGKILL_CODE};
//status codes (obtained from sterr/out)
public static final int EXIT_WITH_SUCCESS_STATUS = 0;
public static final int EXIT_WITH_NO_INPUT_STATUS = 1;
public static final int RUNNING_STATUS = 2;
public static final int ACCEPTABLE_EXIT_STATUS[] = {EXIT_WITH_SUCCESS_STATUS,EXIT_WITH_NO_INPUT_STATUS};
//output constants (obtained from verbose stderr/out)
public static String FFMPEG_EXIT_WITH_NO_INPUT_OUTPUT = "Connection timed out";
public static String WIDTH = "width";
public static String HEIGHT = "height";
//Codecs
public static final String CODEC_ID_H264 = "96" ;
public static final String CODEC_NAME_H264 = "H264" ;
public static final String SAMPLE_RATE_H264 = "90000" ;
public static boolean acceptableExitCode(int code){
int i;
if ((ACCEPTABLE_EXIT_CODES == null) || (code < 0)) return false;
for(i=0;i<ACCEPTABLE_EXIT_CODES.length;i++)
if (ACCEPTABLE_EXIT_CODES[i] == code)
return true;
return false;
}
public static boolean acceptableExitStatus(int status){
int i;
if ((ACCEPTABLE_EXIT_STATUS == null) || (status < 0)) return false;
for(i=0;i<ACCEPTABLE_EXIT_STATUS.length;i++)
if (ACCEPTABLE_EXIT_STATUS[i] == status)
return true;
return false;
}
}

View File

@ -0,0 +1,137 @@
package org.bigbluebutton.transcode.core.ffmpeg;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.bigbluebutton.transcode.core.TranscodersService;
public class FFmpegUtils {
private static final String LOW_QUALITY = "160x120";
private static final String MEDIUM_QUALITY = "320x240";
private static final String HIGH_QUALITY = "640x480";
public static String defaultVideoWidth;
public static String defaultVideoHeight;
public static final String GLOBAL_VIDEO_STREAM_NAME_PREFIX = "sip_";
public static final String VIDEOCONF_LOGO_STREAM_NAME_PREFIX = "video_conf_";
private static final String sdpVideoFullPath = "/tmp/"+GLOBAL_VIDEO_STREAM_NAME_PREFIX; //when changed , must also change VideoApplication.java in bbb-video
private static OpenOption[] fileOptions = new OpenOption[] {StandardOpenOption.CREATE,StandardOpenOption.WRITE};
public static String ffmpegPath = TranscodersService.ffmpegPath();
public static String ffprobePath = TranscodersService.ffprobePath();
public static String videoconfLogoPath = TranscodersService.videoconfLogoImagePath();
private static boolean enableUserVideoSubtitle = TranscodersService.enableUserVideoSubtitle();
public static String createSDPVideoFile(String userId, String localIpAddress, String localVideoPort, String codecName, String codecId, String sampleRate, String voiceconf) {
Path sdpVideoPath = FileSystems.getDefault().getPath(sdpVideoFullPath + voiceconf+".sdp");
String sdp = "v=0\r\n"
+ "o=" + userId + " 0 0 IN IP4 " + localIpAddress + "\r\n"
+ "s=Session SIP/SDP\r\n"
+ "c=IN IP4 " + localIpAddress + "\r\n"
+ "t=0 0\r\n"
+ "m=video " + localVideoPort + " RTP/AVP " + codecId +"\r\n"
+ "a=rtpmap:" + codecId + " " + codecName + "/" + sampleRate + "/1\r\n";
Charset charset = Charset.forName("US-ASCII");
try {
BufferedWriter writer = Files.newBufferedWriter(sdpVideoPath,charset,fileOptions);
writer.write(sdp, 0, sdp.length());
writer.close();
System.out.println("SDP video file created at: "+sdpVideoPath.toString());
} catch (IOException x) {
System.out.println("Failed to create SDP video file: "+sdpVideoPath.toString());
}
return (sdpVideoPath==null)?null:sdpVideoPath.toString();
}
public static void removeSDPVideoFile(String voiceconf) {
Path sdpVideoPath = FileSystems.getDefault().getPath(sdpVideoFullPath +voiceconf+".sdp");
try {
Files.deleteIfExists(sdpVideoPath);
} catch (IOException e) {
System.out.println("Failed to remove SDP video file: "+sdpVideoPath.toString());
}
}
public String getSdpVideoPath(String voiceconf) {
return sdpVideoFullPath+voiceconf+".sdp";
}
public boolean sdpVideoExists(String sdpFilePath) {
return fileExists(sdpFilePath);
}
private boolean fileExists(String filePath) {
if(filePath == null || filePath.isEmpty())
return false;
return new File(filePath).isFile();
}
public boolean isVideoConfLogoStream(String videoStreamName) {
return ((videoStreamName != null) && (videoStreamName.startsWith(VIDEOCONF_LOGO_STREAM_NAME_PREFIX)));
}
public void setFfmpegPath(String ffPath) {
System.out.println("Trying to set the ffmpeg path to: " + ffPath);
if(ffmpegExists(ffPath)) {
ffmpegPath = ffPath;
System.out.println("ffmpeg path set to: " + ffmpegPath);
}
else
System.out.println("****Could NOT set " + ffPath + " as the ffmpeg path");
}
public boolean ffmpegExists(String ffPath) {
return fileExists(ffPath);
}
public static boolean isUserVideoSubtitleEnabled(){
return enableUserVideoSubtitle;
}
public boolean videoconfLogoExists(String filePath) {
return fileExists(filePath);
}
private static void validateResolution(String resolution) {
System.out.println("Validating sip video resolution: " + resolution);
switch(resolution) {
case LOW_QUALITY:
case MEDIUM_QUALITY:
case HIGH_QUALITY: parseResolution(resolution);
break;
//using the default resolution
default: System.out.println("****The resolution set in bigbluebutton-sip.properties is invalid. Using the default resolution.");
parseResolution(MEDIUM_QUALITY);
}
}
private static void parseResolution(String resolution) {
String[] dimensions = resolution.split("x");
defaultVideoWidth = dimensions[0];
defaultVideoHeight = dimensions[1];
System.out.println("Video Resolution is " + defaultVideoWidth + "x" + defaultVideoHeight);
}
public static String getVideoWidth() {
System.out.println("Getting video width: " + defaultVideoWidth + " (Resolution is " + defaultVideoWidth + "x" + defaultVideoHeight + ")");
return defaultVideoWidth;
}
public static String getVideoHeight() {
System.out.println("Getting video height: " + defaultVideoHeight + " (Resolution is " + defaultVideoWidth + "x" + defaultVideoHeight + ")");
return defaultVideoHeight;
}
}

View File

@ -0,0 +1,95 @@
package org.bigbluebutton.transcode.core.ffprobe;
import java.lang.Runtime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.IOException;
public class FFProbeCommand {
private String input;
private String[] command;
private String ffprobePath;
private HashMap args;
/* Analyze duration is a special parameter that MUST come before the input */
private String analyzeDuration;
public FFProbeCommand(String input) {
this.input = input;
this.command = null;
this.ffprobePath = null;
this.args = new HashMap();
}
public String[] getFFprobeCommand(boolean shouldBuild){
if(shouldBuild)
buildFFprobeCommand();
return this.command;
}
public void buildFFprobeCommand() {
List comm = new ArrayList<String>();
if(this.ffprobePath == null)
this.ffprobePath = "/usr/local/bin/ffprobe";
comm.add(this.ffprobePath);
/* Analyze duration MUST come before the input */
if(analyzeDuration != null && !analyzeDuration.isEmpty()) {
comm.add("-analyzeduration");
comm.add(analyzeDuration);
}
comm.add("-i");
comm.add(input);
Iterator argsIter = this.args.entrySet().iterator();
while (argsIter.hasNext()) {
Map.Entry pairs = (Map.Entry)argsIter.next();
comm.add(pairs.getKey());
if (pairs.getValue()!=null)
comm.add(pairs.getValue());
}
this.command = new String[comm.size()];
comm.toArray(this.command);
}
public void setFFprobepath(String arg) {
this.ffprobePath = arg;
}
public void setAnalyzeDuration(String duration) {
this.analyzeDuration = duration;
}
public void setInput(String arg){
this.input = arg;
}
public void setLoglevel(String arg){
this.args.put("-loglevel", arg);
}
public void setShowStreams(){
this.args.put("-show_streams",null);
}
public void selectStream(String arg){
this.args.put("-select_streams", arg);
}
}

View File

@ -0,0 +1,255 @@
package org.bigbluebutton.transcode.core.processmonitor;
import java.io.InputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants;
public class ProcessMonitor {
private String[] command;
private Process process;
private String name;
ProcessStream inputStreamMonitor;
ProcessStream errorStreamMonitor;
private String inputStreamMonitorOutput;
private String errorStreamMonitorOutput;
public static enum Status{RUNNING,CLOSED_BY_USER};
private Status status;
private Thread thread;
private ProcessMonitorObserver observer;
public ProcessMonitor(String[] command,String name) {
this.command = command;
this.process = null;
this.thread = null;
this.inputStreamMonitor = null;
this.errorStreamMonitor = null;
this.name = name;
this.inputStreamMonitorOutput = null;
this.errorStreamMonitor = null;
}
@Override
public String toString() {
if (this.command == null || this.command.length == 0) {
return "";
}
Pattern pattern = Pattern.compile("(.*) (.*)");
StringBuffer result = new StringBuffer();
String delim = "";
for (String i : this.command) {
Matcher matcher = pattern.matcher(i);
if(matcher.matches()) {
result.append(delim).append("\""+matcher.group(1)+" "+matcher.group(2)+"\"");
}else result.append(delim).append(i);
delim = " ";
}
return removeLogLevelFlag(result.toString());
}
private String getCommandString(){
//used by the process's thread instead of toString()
return this.toString();
}
public void setCommand(String[] command){
this.command = command;
}
private void notifyProcessMonitorObserverOnFinishedUnsuccessfully() {
if(observer != null){
//log.debug("Notifying ProcessMonitorObserver that process finished unsuccessfully");
observer.handleProcessFinishedUnsuccessfully(this.name,inputStreamMonitorOutput);
}else {
//log.debug("Cannot notify ProcessMonitorObserver that process finished unsuccessfully: ProcessMonitorObserver null");
}
}
private void notifyProcessMonitorObserverOnFinished() {
if(observer != null){
//log.debug("Notifying ProcessMonitorObserver that {} successfully finished",this.name);
observer.handleProcessFinishedWithSuccess(this.name,inputStreamMonitorOutput);
}else {
//log.debug("Cannot notify ProcessMonitorObserver that {} finished: ProcessMonitorObserver null",this.name);
}
}
public synchronized void start() {
if(this.thread == null){
this.thread = new Thread( new Runnable(){
public void run(){
try {
System.out.println(" > Creating thread to execute " + name);
process = Runtime.getRuntime().exec(command);
System.out.println(" > Executing " + name + "( pid=" + getPid() + " ):\n " + getCommandString());
if(status == Status.CLOSED_BY_USER) {
System.out.println(" > ProcessMonitor closed by user. Closing " + name + " it immediatelly");
forceDestroy();
return;
}
InputStream is = process.getInputStream();
InputStream es = process.getErrorStream();
inputStreamMonitor = new ProcessStream(is,"STDOUT");
errorStreamMonitor = new ProcessStream(es,"STDERR");
inputStreamMonitor.start();
errorStreamMonitor.start();
process.waitFor();
}
catch(SecurityException se) {
System.out.println("Security Exception");
}
catch(IOException ioe) {
System.out.println("IO Exception");
}
catch(NullPointerException npe) {
System.out.println("NullPointer Exception");
}
catch(IllegalArgumentException iae) {
System.out.println("IllegalArgument Exception");
}
catch(InterruptedException ie) {
System.out.println("Interrupted Exception");
}
int ret = process.exitValue();
if (FFmpegConstants.acceptableExitCode(ret) || errorStreamMonitor.acceptableExitStatus()){
//log.debug("Exiting thread that executes {}. Exit value: {}, Exit status (from stdout): {} ",name,ret, errorStreamMonitor.getExitStatus());
storeProcessOutputs(inputStreamMonitor.getOutput(), errorStreamMonitor.getOutput());
clearData();
notifyProcessMonitorObserverOnFinished();
}
else{
//log.debug("Exiting thread that executes {}. Exit value: {}, Exit status (from stdout): {}",name,ret,errorStreamMonitor.getExitStatus());
storeProcessOutputs(inputStreamMonitor.getOutput(), errorStreamMonitor.getOutput());
clearData();
notifyProcessMonitorObserverOnFinishedUnsuccessfully();
}
}
});
status = Status.RUNNING;
this.thread.start();
}else{
//log.debug("Can't start a new process monitor: It is already running.");
}
}
public synchronized void restart(){
clearData();
status = Status.CLOSED_BY_USER;
start();
}
private void clearData(){
closeProcessStream();
closeProcess();
clearMonitorThread();
}
private void clearMonitorThread(){
if (this.thread !=null)
this.thread=null;
}
private void closeProcessStream(){
if(this.inputStreamMonitor != null){
this.inputStreamMonitor.close();
this.inputStreamMonitor = null;
}
if (this.errorStreamMonitor != null) {
this.errorStreamMonitor.close();
this.errorStreamMonitor = null;
}
}
private void closeProcess(){
if(this.process != null) {
status = Status.CLOSED_BY_USER;
//log.debug("Closing {} process",this.name);
this.process.destroy();
this.process = null;
}
}
private void storeProcessOutputs(String inputStreamOutput,String errorStreamOutput){
this.inputStreamMonitorOutput = inputStreamOutput;
this.errorStreamMonitorOutput = errorStreamOutput;
}
public synchronized void destroy() {
if (this.thread != null){
status = Status.CLOSED_BY_USER;
clearData();
//log.debug("ProcessMonitor successfully finished");
}else{
//log.debug("Can't destroy this process monitor: There's no process running.");
}
}
public void setProcessMonitorObserver(ProcessMonitorObserver observer){
if (observer==null){
//log.debug("Cannot assign observer: ProcessMonitorObserver null");
}else this.observer = observer;
}
public int getPid(){
Field f;
int pid;
try {
if (this.process == null) return -1;
f = this.process.getClass().getDeclaredField("pid");
f.setAccessible(true);
pid = (int)f.get(this.process);
return pid;
} catch (IllegalArgumentException | IllegalAccessException
| NoSuchFieldException | SecurityException e) {
//log.debug("Error when obtaining {} PID",this.name);
return -1;
}
}
public synchronized void forceDestroy(){
if (this.thread != null) {
status = Status.CLOSED_BY_USER;
try {
int pid = getPid();
if (pid < 0){
//log.debug("Process doesn't exist. Not destroying it...");
return;
}else {
Runtime.getRuntime().exec("kill -9 "+ getPid());
}
} catch (IOException e) {
//log.debug("Failed to force-kill {} process",this.name);
e.printStackTrace();
}
}else{
//log.debug("Can't force-destroy this process monitor: There's no process running.");
}
}
public boolean isFFmpegProcess(){
return this.name.toLowerCase().contains("ffmpeg");
}
/**
* Removes loglevel flag of ffmpeg command.
* Usefull for faster debugging
*/
private String removeLogLevelFlag(String commandString){
if (isFFmpegProcess()){
return commandString.replaceAll("-loglevel \\w+", "");
}else return commandString;
}
}

View File

@ -0,0 +1,6 @@
package org.bigbluebutton.transcode.core.processmonitor;
public interface ProcessMonitorObserver {
public void handleProcessFinishedUnsuccessfully(String processName, String processOutput);
public void handleProcessFinishedWithSuccess(String processName, String processOutput);
}

View File

@ -0,0 +1,95 @@
package org.bigbluebutton.transcode.core.processmonitor;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants;
public class ProcessStream {
private InputStream stream;
private Thread thread;
private String type;
private String output;
private int exitStatus = FFmpegConstants.RUNNING_STATUS;
ProcessStream(InputStream stream, String type) {
if(stream != null)
this.stream = stream;
this.type = type;
this.output = "";
}
protected void start() {
exitStatus = FFmpegConstants.RUNNING_STATUS;
this.thread = new Thread( new Runnable(){
public void run(){
try {
String line;
InputStreamReader isr = new InputStreamReader(stream);
BufferedReader ibr = new BufferedReader(isr);
output = "";
while ((line = ibr.readLine()) != null) {
////log.debug("[{}]"+line,type);
updateCurrentStatusFromOutput(line);
output+=line+"\n";
}
close();
}
catch(IOException ioe) {
//log.debug("Finishing process stream [type={}] because there's no more data to be read",type);
close();
}
}
});
this.thread.start();
}
protected void close() {
try {
if(this.stream != null) {
////log.debug("Closing process stream");
this.stream.close();
this.stream = null;
}
}
catch(IOException ioe) {
//log.debug("IOException");
}
}
protected String getOutput(){
return this.output;
}
/**
* Update current process status based on the stdout.
* The exitStatus is mapped according to ffmpeg exit status
* @param outputLine
* Requires loglevel verbose of FFmpegCommand
*/
private void updateCurrentStatusFromOutput(String outputLine){
if (outputLine != null){
if (outputLine.contains(FFmpegConstants.FFMPEG_EXIT_WITH_NO_INPUT_OUTPUT)){
////log.debug("FFmpeg exited with no input status.");
exitStatus = FFmpegConstants.EXIT_WITH_NO_INPUT_STATUS;
}
/*else if outputLine.contains(FFmpegConstants....)
exitStatus = FFmpegConstants....
*/
}
}
public int getExitStatus(){
return exitStatus;
}
/**
* Validates exit status
*/
public boolean acceptableExitStatus(){
return FFmpegConstants.acceptableExitStatus(exitStatus);
}
}

View File

@ -0,0 +1,72 @@
package org.bigbluebutton.transcode.pubsub.receivers;
import org.bigbluebutton.transcode.core.api.ITranscodingInGW;
import org.bigbluebutton.common.messages.StartTranscoderRequestMessage;
import org.bigbluebutton.common.messages.UpdateTranscoderRequestMessage;
import org.bigbluebutton.common.messages.StopTranscoderRequestMessage;
import org.bigbluebutton.common.messages.StartProbingRequestMessage;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class RedisMessageReceiver {
public static final String TO_BBB_TRANSCODE_CHANNEL = "bigbluebutton:to-bbb-transcode";
public static final String TO_BBB_TRANSCODE_PATTERN = TO_BBB_TRANSCODE_CHANNEL + ":*";
public static final String TO_BBB_TRANSCODE_SYSTEM_CHAN = TO_BBB_TRANSCODE_CHANNEL + ":system";
private ITranscodingInGW transcodingInGW;
public RedisMessageReceiver(ITranscodingInGW transcodingInGW) {
this.transcodingInGW = transcodingInGW;
}
public void handleMessage(String pattern, String channel, String message) {
if (channel.equalsIgnoreCase(TO_BBB_TRANSCODE_SYSTEM_CHAN)) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
switch (messageName) {
case StartTranscoderRequestMessage.START_TRANSCODER_REQUEST:
processStartTranscoderRequestMessage(message);
break;
case UpdateTranscoderRequestMessage.UPDATE_TRANSCODER_REQUEST:
processUpdateTranscoderRequestMessage(message);
break;
case StopTranscoderRequestMessage.STOP_TRANSCODER_REQUEST:
processStopTranscoderRequestMessage(message);
break;
case StartProbingRequestMessage.START_PROBING_REQUEST:
processStartProbingRequestMessage(message);
}
}
}
}
}
private void processStartTranscoderRequestMessage(String json) {
StartTranscoderRequestMessage msg = StartTranscoderRequestMessage.fromJson(json);
transcodingInGW.startTranscoder(msg.meetingId, msg.transcoderId, msg.params);
}
private void processUpdateTranscoderRequestMessage(String json) {
UpdateTranscoderRequestMessage msg = UpdateTranscoderRequestMessage.fromJson(json);
transcodingInGW.updateTranscoder(msg.meetingId, msg.transcoderId, msg.params);
}
private void processStopTranscoderRequestMessage(String json) {
StopTranscoderRequestMessage msg = StopTranscoderRequestMessage.fromJson(json);
transcodingInGW.stopTranscoder(msg.meetingId, msg.transcoderId);
}
private void processStartProbingRequestMessage(String json) {
StartProbingRequestMessage msg = StartProbingRequestMessage.fromJson(json);
transcodingInGW.startProbing(msg.meetingId, msg.transcoderId, msg.params);
}
}

View File

@ -0,0 +1,45 @@
akka {
actor {
debug {
receive = on
}
}
loglevel = INFO
stdout-loglevel = "INFO"
rediscala-subscriber-worker-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 512
}
}
redis {
host="127.0.0.1"
port=6379
password=""
}
videoconference {
#The image to use in the videoconference window and/or when the webuser has no video
videoconf-logo-image-path = /usr/share/red5/webapps/sip/WEB-INF/mconf-videoconf-logo.gif
#Enable username subtitle on video-conf-logo (the one shown in sip-phone when
#webconference's talker has no video )
enable-user-video-subtitle = true
#To change the sip video resolution, edit below:
#IMPORTANT: For now, we only accept these 3 resolutions: 160x120, 320x240, 640x480
sip-video-resolution=640x480
}
transcoder {
#The path where ffmpeg is installed
ffmpeg-path = /usr/local/bin/ffmpeg
#The path where ffprobe is installed
ffprobe-path = /usr/local/bin/ffprobe
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>logs/bbb-transcode.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>logs/bbb-transcode.%d{yyyy-MM-dd}.log</FileNamePattern>
<!-- keep 30 days worth of history -->
<MaxHistory>5</MaxHistory>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{"yyyy-MM-dd HH:mm:ss,SSSXXX"} [%thread] %-5level %logger{35} - %msg%n</Pattern>
</layout>
</appender>
<logger name="akka" level="INFO" />
<logger name="org.bigbluebutton" level="DEBUG" />
<logger name="org.freeswitch.transcode" level="WARN" />
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE" />
</root>
</configuration>

View File

@ -0,0 +1,23 @@
package org.bigbluebutton
import akka.actor.{ ActorSystem, Props }
import scala.concurrent.duration._
import redis.RedisClient
import scala.concurrent.{ Future, Await }
import scala.concurrent.ExecutionContext.Implicits.global
import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor }
import org.bigbluebutton.transcode.pubsub.receivers.RedisMessageReceiver
import org.bigbluebutton.transcode.core.TranscodingInGW
object Boot extends App with SystemConfiguration {
implicit val system = ActorSystem("bigbluebutton-transcode-system")
val redisPublisher = new RedisPublisher(system)
var transcodingInGW = new TranscodingInGW(system, redisPublisher);
val redisMsgReceiver = new RedisMessageReceiver(transcodingInGW);
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver), "redis-subscriber")
}

View File

@ -0,0 +1,20 @@
package org.bigbluebutton
import com.typesafe.config.ConfigFactory
import scala.util.Try
trait SystemConfiguration {
val config = ConfigFactory.load()
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
lazy val _ffmpegPath = Try(config.getString("transcoder.ffmpeg-path")).getOrElse("/usr/local/bin/ffmpeg")
lazy val _ffprobePath = Try(config.getString("transcoder.ffprobe-path")).getOrElse("/usr/local/bin/ffprobe")
lazy val _videoconfLogoImagePath = Try(config.getString("videoconference.videoconf-logo-image-path")).getOrElse("")
lazy val _enableUserVideoSubtitle = Try(config.getString("videoconference.enable-user-video-subtitle").toBoolean).getOrElse(false)
lazy val _sipVideoResolution = Try(config.getString("videoconference.sip-video-resolution")).getOrElse("")
}

View File

@ -0,0 +1,79 @@
package org.bigbluebutton.endpoint.redis
import akka.actor.Props
import java.net.InetSocketAddress
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.{ PMessage, Message }
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.actorRef2Scala
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.transcode.pubsub.receivers.RedisMessageReceiver
import redis.api.servers.ClientSetname
import org.bigbluebutton.common.converters.FromJsonDecoder
import org.bigbluebutton.common.messages.PubSubPongMessage
import akka.actor.ActorSystem
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object AppsRedisSubscriberActor extends SystemConfiguration {
val channels = Seq("time")
val patterns = Seq("bigbluebutton:to-bbb-transcode:*")
def props(system: ActorSystem, msgReceiver: RedisMessageReceiver): Props =
Props(classOf[AppsRedisSubscriberActor], system, msgReceiver,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
}
class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, redisHost: String,
redisPort: Int, channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort),
channels, patterns) {
val decoder = new FromJsonDecoder()
var lastPongReceivedOn = 0L
system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage())
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
write(ClientSetname("BbbTranscodeAkkaSub").encodedRequest)
def checkPongMessage() {
val now = System.currentTimeMillis()
if (lastPongReceivedOn != 0 && (now - lastPongReceivedOn > 30000)) {
log.error("BBB-Transcode pubsub error!");
}
}
def onMessage(message: Message) {
log.debug(s"message received: $message")
}
def onPMessage(pmessage: PMessage) {
log.debug(s"pattern message received: $pmessage")
val msg = decoder.decodeMessage(pmessage.data)
if (msg != null) {
msg match {
case m: PubSubPongMessage => {
if (m.payload.system == "BbbTranscode") {
lastPongReceivedOn = System.currentTimeMillis()
}
}
case _ => // do nothing
}
} else {
msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data)
}
}
def handleMessage(msg: String) {
log.warning("**** TODO: Handle pubsub messages. ****")
}
}

View File

@ -0,0 +1,34 @@
package org.bigbluebutton.endpoint.redis
import akka.actor.Props
import redis.RedisClient
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import scala.concurrent.Await
import akka.actor.Actor
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common.converters.ToJsonEncoder
class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
val redis = RedisClient(redisHost, redisPort)(system)
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redis.clientSetname("BbbTranscodeAkkaPub")
val encoder = new ToJsonEncoder()
def sendPingMessage() {
val json = encoder.encodePubSubPingMessage("BbbTranscode", System.currentTimeMillis())
redis.publish("bigbluebutton:to-bbb-apps:system", json)
}
system.scheduler.schedule(10 seconds, 10 seconds)(sendPingMessage())
def publish(channel: String, data: String) {
//println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
redis.publish(channel, data)
}
}

View File

@ -0,0 +1,31 @@
package org.bigbluebutton.transcode.core
import org.bigbluebutton.transcode.core.api.ITranscodingInGW
import org.bigbluebutton.endpoint.redis.RedisPublisher
import scala.collection.JavaConversions._
import java.util.ArrayList
import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorSystem
import org.bigbluebutton.transcode.api._
class TranscodingInGW(val system: ActorSystem, messageSender: RedisPublisher) extends ITranscodingInGW {
val log = system.log
val transcodingActor = system.actorOf(TranscodingActor.props(system, messageSender), "bbb-transcoding-manager")
def startTranscoder(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
transcodingActor ! new StartTranscoderRequest(meetingId, transcoderId, params)
}
def updateTranscoder(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
transcodingActor ! new UpdateTranscoderRequest(meetingId, transcoderId, params)
}
def stopTranscoder(meetingId: String, transcoderId: String) {
transcodingActor ! new StopTranscoderRequest(meetingId, transcoderId)
}
def startProbing(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) {
transcodingActor ! new StartProbingRequest(meetingId, transcoderId, params)
}
}

View File

@ -0,0 +1,8 @@
package org.bigbluebutton.transcode.api
trait InMessage { val meetingId: String }
case class StartTranscoderRequest(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) extends InMessage
case class UpdateTranscoderRequest(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) extends InMessage
case class StopTranscoderRequest(meetingId: String, transcoderId: String) extends InMessage
case class StartProbingRequest(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) extends InMessage

View File

@ -0,0 +1,12 @@
package org.bigbluebutton.transcode.api
import scala.collection.mutable.HashMap
abstract class OutMessage
case class StartTranscoderReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage
case class StopTranscoderReply(meetingId: String, transcoderId: String) extends OutMessage
case class UpdateTranscoderReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage
case class StartProbingReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage
case class TranscoderStatusUpdate(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage

View File

@ -0,0 +1,84 @@
package org.bigbluebutton.transcode.core
import akka.actor.Actor
import akka.actor.ActorContext
import akka.actor.ActorLogging
import akka.actor.Props
import org.bigbluebutton.transcode.api._
import org.bigbluebutton.endpoint.redis.RedisPublisher
import collection.JavaConverters._
import scala.collection.JavaConversions._
import org.bigbluebutton.common.messages.StartTranscoderReplyMessage
import org.bigbluebutton.common.messages.StopTranscoderReplyMessage
import org.bigbluebutton.common.messages.TranscoderStatusUpdateMessage
import org.bigbluebutton.common.messages.UpdateTranscoderReplyMessage
import org.bigbluebutton.common.messages.StartProbingReplyMessage
import org.bigbluebutton.common.messages.MessagingConstants
object MessageSenderActor {
def props(msgSender: RedisPublisher): Props =
Props(classOf[MessageSenderActor], msgSender)
}
class MessageSenderActor(val msgSender: RedisPublisher)
extends Actor with ActorLogging {
def receive = {
case msg: StartTranscoderReply => handleStartTranscoderReply(msg)
case msg: StopTranscoderReply => handleStopTranscoderReply(msg)
case msg: UpdateTranscoderReply => handleUpdateTranscoderReply(msg)
case msg: TranscoderStatusUpdate => handleTranscoderStatusUpdate(msg)
case msg: StartProbingReply => handleStartProbingReply(msg)
case _ => // do nothing
}
private def handleStartTranscoderReply(msg: StartTranscoderReply) {
System.out.println("Sending StartTranscoderReplyMessage. Params: [\n"
+ "meetingId = " + msg.meetingId + "\n"
+ "transcoderId = " + msg.transcoderId + "\n"
+ "params = " + msg.params.mkString(", ") + "\n]\n")
val str = new StartTranscoderReplyMessage(msg.meetingId, msg.transcoderId, msg.params)
msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleStopTranscoderReply(msg: StopTranscoderReply) {
System.out.println("Sending StopTranscoderReplyMessage. Params: [\n"
+ "meetingId = " + msg.meetingId + "\n"
+ "transcoderId = " + msg.transcoderId + "\n]\n")
val str = new StopTranscoderReplyMessage(msg.meetingId, msg.transcoderId)
msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleUpdateTranscoderReply(msg: UpdateTranscoderReply) {
System.out.println("Sending UpdateTranscoderReplyMessage. Params: [\n"
+ "meetingId = " + msg.meetingId + "\n"
+ "transcoderId = " + msg.transcoderId + "\n"
+ "params = " + msg.params.mkString(", ") + "\n]\n")
val str = new UpdateTranscoderReplyMessage(msg.meetingId, msg.transcoderId, msg.params)
msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleTranscoderStatusUpdate(msg: TranscoderStatusUpdate) {
System.out.println("Sending TranscoderStatusUpdateMessage. Params: [\n"
+ "meetingId = " + msg.meetingId + "\n"
+ "transcoderId = " + msg.transcoderId + "\n"
+ "params = " + msg.params.mkString(", ") + "\n]\n")
val str = new TranscoderStatusUpdateMessage(msg.meetingId, msg.transcoderId, msg.params)
msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
private def handleStartProbingReply(msg: StartProbingReply) {
System.out.println("Sending StartProbingReplyMessage. Params: [\n"
+ "meetingId = " + msg.meetingId + "\n"
+ "transcoderId = " + msg.transcoderId + "\n"
+ "params = " + msg.params.mkString(", ") + "\n]\n")
val str = new TranscoderStatusUpdateMessage(msg.meetingId, msg.transcoderId, msg.params)
msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())
}
}

View File

@ -0,0 +1,26 @@
package org.bigbluebutton.transcode.core
import scala.collection.mutable.HashMap
import akka.actor.ActorRef
class TranscodersModel {
private var transcoders = new HashMap[String, ActorRef]
def addTranscoder(transcoderId: String, transcoderActor: ActorRef) {
transcoders += transcoderId -> transcoderActor
}
def removeTranscoder(transcoderId: String) {
transcoders -= transcoderId
}
def getTranscoder(transcoderId: String): Option[ActorRef] = {
transcoders.get(transcoderId)
}
def getTranscoders(): Array[ActorRef] = {
transcoders.values toArray
}
}

View File

@ -0,0 +1,29 @@
package org.bigbluebutton.transcode.core
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegUtils
class TranscodersService {}
object TranscodersService extends SystemConfiguration {
def ffmpegPath(): String = {
_ffmpegPath
}
def ffprobePath(): String = {
_ffprobePath
}
def videoconfLogoImagePath(): String = {
_videoconfLogoImagePath
}
def enableUserVideoSubtitle(): Boolean = {
_enableUserVideoSubtitle
}
def sipVideoResolution(): String = {
_sipVideoResolution
}
}

View File

@ -0,0 +1,106 @@
package org.bigbluebutton.transcode.core
import akka.actor._
import akka.actor.ActorLogging
import scala.collection.mutable.HashMap
import org.bigbluebutton.endpoint.redis.RedisPublisher
import org.bigbluebutton.transcode.api._
import org.bigbluebutton.SystemConfiguration
import scala.collection._
import scala.collection.JavaConversions._
import org.bigbluebutton.common.messages.Constants
import org.bigbluebutton.transcode.core.apps.{ TranscodingObserverApp }
object TranscodingActor extends SystemConfiguration {
def props(system: ActorSystem, messageSender: RedisPublisher): Props =
Props(classOf[TranscodingActor], system, messageSender)
}
class TranscodingActor(val system: ActorSystem, messageSender: RedisPublisher)
extends Actor with ActorLogging with TranscodingObserverApp {
val transcodersModel = new TranscodersModel()
val messageSenderActor = context.actorOf(MessageSenderActor.props(messageSender), "bbb-sender-actor")
def receive = {
case msg: StartTranscoderRequest => handleStartTranscoderRequest(msg)
case msg: UpdateTranscoderRequest => handleUpdateTranscoderRequest(msg)
case msg: StopTranscoderRequest => handleStopTranscoderRequest(msg)
case msg: StartProbingRequest => handleStartProbingRequest(msg)
//internal messages
case msg: StartVideoTranscoderReply => handleStartVideoTranscoderReply(msg)
case msg: UpdateVideoTranscoderReply => handleUpdateVideoTranscoderReply(msg)
case msg: DestroyVideoTranscoderReply => handleDestroyVideoTranscoderReply(msg)
case msg: TranscodingFinishedUnsuccessfully => handleTranscodingFinishedUnsuccessfully(msg)
case msg: TranscodingFinishedSuccessfully => handleTranscodingFinishedSuccessfully(msg)
case msg: RestartVideoTranscoderReply => handleRestartVideoTranscoderReply(msg)
case msg: StartVideoProbingReply => handleStartVideoProbingReply(msg)
case _ => // do nothing
}
private def handleStartTranscoderRequest(msg: StartTranscoderRequest) {
log.info("\n > Received StartTranscoderRequest. Params:\n"
+ " meetingId = " + msg.meetingId + "\n"
+ " transcoderId = " + msg.transcoderId + "\n"
+ " params = " + msg.params.toString() + "\n")
transcodersModel.getTranscoder(msg.transcoderId) match {
case Some(vt) => {
log.info("\n > Found a transcoder for this user {}", msg.transcoderId)
vt ! new StartVideoTranscoderRequest()
}
case None => {
val vt = context.actorOf(VideoTranscoder.props(self, msg.meetingId, msg.transcoderId, msg.params))
transcodersModel.addTranscoder(msg.transcoderId, vt)
vt ! new StartVideoTranscoderRequest()
}
}
}
private def handleUpdateTranscoderRequest(msg: UpdateTranscoderRequest) {
log.info("\n > Received UpdateTranscoderRequest. Params:\n"
+ " meetingId = " + msg.meetingId + "\n"
+ " transcoderId = " + msg.transcoderId + "\n"
+ " params = " + msg.params.toString() + "\n")
transcodersModel.getTranscoder(msg.transcoderId) match {
case Some(vt) => vt ! new UpdateVideoTranscoderRequest(msg.params)
case None =>
log.info("\n > Video transcoder with id = {} not found (might be finished already or it is restarting).", msg.transcoderId)
}
}
private def handleStopTranscoderRequest(msg: StopTranscoderRequest) {
log.info("\n > Received StopTranscoderRequest. Params:\n"
+ " meetingId = " + msg.meetingId + "\n"
+ " transcoderId = " + msg.transcoderId + "\n")
transcodersModel.getTranscoder(msg.transcoderId) match {
case Some(vt) => {
transcodersModel.removeTranscoder(msg.transcoderId)
vt ! new DestroyVideoTranscoderRequest() //stop transcoder and destroy it's actor
}
case None => {
log.info("\n > Transcoder with id = {} not found (might be finished already).", msg.transcoderId)
}
}
}
private def handleStartProbingRequest(msg: StartProbingRequest) {
log.info("\n > Received StartProbingRequest. Params:\n"
+ " meetingId = " + msg.meetingId + "\n"
+ " transcoderId = " + msg.transcoderId + "\n")
transcodersModel.getTranscoder(msg.transcoderId) match {
case Some(vt) => {
log.info("\n > Found a transcoder for this user {}", msg.transcoderId)
vt ! new StartVideoProbingRequest()
}
case None => {
val vt = context.actorOf(VideoTranscoder.props(self, msg.meetingId, msg.transcoderId, msg.params))
transcodersModel.addTranscoder(msg.transcoderId, vt)
vt ! new StartVideoProbingRequest()
}
}
}
}

View File

@ -0,0 +1,78 @@
package org.bigbluebutton.transcode.core.apps
import akka.actor.ActorRef
import org.bigbluebutton.transcode.core.TranscodingActor
import org.bigbluebutton.transcode.api._
import org.bigbluebutton.common.messages.Constants
import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants
import scala.collection.JavaConversions._
trait TranscodingObserverApp {
this: TranscodingActor =>
val messageSenderActor: ActorRef
def handleTranscodingFinishedUnsuccessfully(msg: TranscodingFinishedUnsuccessfully) = {
transcodersModel.getTranscoder(msg.getTranscoderId()) match {
case Some(vt) => {
log.info("\n > Transcoder for this user {} stopped unsuccessfully, restarting it...", msg.getTranscoderId())
vt ! new RestartVideoTranscoderRequest()
}
case None => {
log.info("\n > Video transcoder with id = {} not found (might be destroyed already).", msg.getTranscoderId())
}
}
}
def handleTranscodingFinishedSuccessfully(msg: TranscodingFinishedSuccessfully) = {
transcodersModel.getTranscoder(msg.getTranscoderId()) match {
case Some(vt) => {
log.info("\n > Transcoder for this user {} stopped with success, removing it from transcoder's list...", msg.getTranscoderId())
transcodersModel.removeTranscoder(msg.getTranscoderId())
}
case None => {
log.info("\n > Video transcoder with id = {} not found (might be destroyed already).", msg.getTranscoderId())
}
}
}
def handleStartVideoTranscoderReply(msg: StartVideoTranscoderReply) = {
log.info("\n > Transcoder with id = {} started", msg.getTranscoderId())
val params = new scala.collection.mutable.HashMap[String, String]
params += Constants.OUTPUT -> msg.getOutput()
messageSenderActor ! new StartTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params)
}
def handleUpdateVideoTranscoderReply(msg: UpdateVideoTranscoderReply) = {
log.info("\n > Transcoder with id = {} updated", msg.getTranscoderId())
val params = new scala.collection.mutable.HashMap[String, String]
params += Constants.OUTPUT -> msg.getOutput()
messageSenderActor ! new UpdateTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params)
}
def handleDestroyVideoTranscoderReply(msg: DestroyVideoTranscoderReply) = {
log.info("\n > Transcoder with id = {} stopped", msg.getTranscoderId())
messageSenderActor ! new StopTranscoderReply(msg.getMeetingId(), msg.getTranscoderId())
}
def handleRestartVideoTranscoderReply(msg: RestartVideoTranscoderReply) = {
log.info("\n > Transcoder with id = {} restarted", msg.getTranscoderId())
val params = new scala.collection.mutable.HashMap[String, String]
params += Constants.OUTPUT -> msg.getOutput()
messageSenderActor ! new TranscoderStatusUpdate(msg.getMeetingId(), msg.getTranscoderId(), params)
}
def handleStartVideoProbingReply(msg: StartVideoProbingReply) = {
val ffprobeResult = mapAsScalaMap(msg.getProbingData())
Option(ffprobeResult) match {
case Some(result) =>
val params = new scala.collection.mutable.HashMap[String, String]
params += Constants.WIDTH_RATIO -> result.getOrElse(FFmpegConstants.WIDTH, "")
params += Constants.HEIGHT_RATIO -> result.getOrElse(FFmpegConstants.HEIGHT, "")
messageSenderActor ! new StartProbingReply(msg.getMeetingId(), msg.getTranscoderId(), params)
case _ => log.debug("Could not send ffprobe reply : failed to get the new resolution");
}
}
}

View File

@ -141,4 +141,16 @@ public class Constants {
public static final String OPERATION = "operation";
public static final String NOTE = "note";
public static final String METADATA = "metadata";
public static final String LOCAL_IP_ADDRESS = "local_ip_address";
public static final String LOCAL_VIDEO_PORT = "local_video_port";
public static final String REMOTE_VIDEO_PORT = "remote_video_port";
public static final String SIP_HOST = "sip_host";
public static final String TRANSCODER_TYPE = "transcoder_type";
public static final String INPUT = "input";
public static final String OUTPUT = "output";
public static final String TRANSCODE_RTP_TO_RTMP = "transcode_rtp_to_rtmp";
public static final String TRANSCODE_RTMP_TO_RTP = "transcode_rtmp_to_rtp";
public static final String TRANSCODE_FILE_TO_RTP = "transcode_file_to_rtp";
public static final String TRANSCODE_FILE_TO_RTMP = "transcode_file_to_rtmp";
public static final String PROBE_RTMP = "probe_rtmp";
}

View File

@ -53,6 +53,14 @@ public class MessagingConstants {
public static final String FROM_VOICE_CONF_SYSTEM_CHAN = FROM_VOICE_CONF_CHANNEL + ":system";
public static final String FROM_BBB_RECORDING_CHANNEL = "bigbluebutton:from-rap";
public static final String TO_BBB_TRANSCODE_CHANNEL = "bigbluebutton:to-bbb-transcode";
public static final String TO_BBB_TRANSCODE_PATTERN = TO_BBB_TRANSCODE_CHANNEL + ":*";
public static final String TO_BBB_TRANSCODE_SYSTEM_CHAN = TO_BBB_TRANSCODE_CHANNEL + ":system";
public static final String FROM_BBB_TRANSCODE_CHANNEL = "bigbluebutton:from-bbb-transcode";
public static final String FROM_BBB_TRANSCODE_PATTERN = FROM_BBB_TRANSCODE_CHANNEL + ":*";
public static final String FROM_BBB_TRANSCODE_SYSTEM_CHAN = FROM_BBB_TRANSCODE_CHANNEL + ":system";
public static final String DESTROY_MEETING_REQUEST_EVENT = "DestroyMeetingRequestEvent";
public static final String CREATE_MEETING_REQUEST_EVENT = "CreateMeetingRequestEvent";

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class StartProbingReplyMessage implements IBigBlueButtonMessage {
public static final String START_PROBING_REPLY = "start_probing_reply_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public StartProbingReplyMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(START_PROBING_REPLY, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static StartProbingReplyMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (START_PROBING_REPLY.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new StartProbingReplyMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class StartProbingRequestMessage implements IBigBlueButtonMessage {
public static final String START_PROBING_REQUEST = "start_probing_request_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public StartProbingRequestMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(START_PROBING_REQUEST, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static StartProbingRequestMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (START_PROBING_REQUEST.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new StartProbingRequestMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class StartTranscoderReplyMessage implements IBigBlueButtonMessage {
public static final String START_TRANSCODER_REPLY = "start_transcoder_reply_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public StartTranscoderReplyMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(START_TRANSCODER_REPLY, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static StartTranscoderReplyMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (START_TRANSCODER_REPLY.equals(messageName)) {
if ( payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new StartTranscoderReplyMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class StartTranscoderRequestMessage implements IBigBlueButtonMessage {
public static final String START_TRANSCODER_REQUEST = "start_transcoder_request_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public StartTranscoderRequestMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(START_TRANSCODER_REQUEST, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static StartTranscoderRequestMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (START_TRANSCODER_REQUEST.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new StartTranscoderRequestMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,58 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class StopTranscoderReplyMessage implements IBigBlueButtonMessage {
public static final String STOP_TRANSCODER_REPLY = "stop_transcoder_reply_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public final String meetingId;
public final String transcoderId;
public StopTranscoderReplyMessage(String meetingId, String transcoderId) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(STOP_TRANSCODER_REPLY, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static StopTranscoderReplyMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (STOP_TRANSCODER_REPLY.equals(messageName)) {
if ( payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
return new StopTranscoderReplyMessage(meetingId, transcoderId);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,58 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class StopTranscoderRequestMessage implements IBigBlueButtonMessage {
public static final String STOP_TRANSCODER_REQUEST = "stop_transcoder_request_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public final String meetingId;
public final String transcoderId;
public StopTranscoderRequestMessage(String meetingId, String transcoderId) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(STOP_TRANSCODER_REQUEST, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static StopTranscoderRequestMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (STOP_TRANSCODER_REQUEST.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
return new StopTranscoderRequestMessage(meetingId, transcoderId);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class TranscoderStatusUpdateMessage implements IBigBlueButtonMessage {
public static final String TRANSCODER_STATUS_UPDATE = "transcoder_status_update";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public TranscoderStatusUpdateMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(TRANSCODER_STATUS_UPDATE, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static TranscoderStatusUpdateMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (TRANSCODER_STATUS_UPDATE.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new TranscoderStatusUpdateMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class UpdateTranscoderReplyMessage implements IBigBlueButtonMessage {
public static final String UPDATE_TRANSCODER_REPLY = "update_transcoder_reply_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public UpdateTranscoderReplyMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(UPDATE_TRANSCODER_REPLY, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static UpdateTranscoderReplyMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (UPDATE_TRANSCODER_REPLY.equals(messageName)) {
if ( payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new UpdateTranscoderReplyMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package org.bigbluebutton.common.messages;
import java.util.Map;
import java.util.HashMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
public class UpdateTranscoderRequestMessage implements IBigBlueButtonMessage {
public static final String UPDATE_TRANSCODER_REQUEST = "update_transcoder_request_message";
public static final String VERSION = "0.0.1";
public static final String MEETING_ID = "meeting_id";
public static final String TRANSCODER_ID = "transcoder_id";
public static final String PARAMS = "params";
public final String meetingId;
public final String transcoderId;
public final Map<String,String> params;
public UpdateTranscoderRequestMessage(String meetingId, String transcoderId, Map<String,String> params) {
this.meetingId = meetingId;
this.transcoderId = transcoderId;
this.params = params;
}
public String toJson() {
HashMap<String, Object> payload = new HashMap<String, Object>();
payload.put(MEETING_ID, meetingId);
payload.put(TRANSCODER_ID, transcoderId);
payload.put(PARAMS, params);
java.util.HashMap<String, Object> header = MessageBuilder.buildHeader(UPDATE_TRANSCODER_REQUEST, VERSION, null);
return MessageBuilder.buildJson(header, payload);
}
public static UpdateTranscoderRequestMessage fromJson(String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
JsonObject payload = (JsonObject) obj.get("payload");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (UPDATE_TRANSCODER_REQUEST.equals(messageName)) {
if (payload.has(MEETING_ID)
&& payload.has(TRANSCODER_ID)
&& payload.has(PARAMS)){
String meetingId = payload.get(MEETING_ID).getAsString();
String transcoderId = payload.get(TRANSCODER_ID).getAsString();
Map<String,String> params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken<Map<String, String>>() {}.getType());
return new UpdateTranscoderRequestMessage(meetingId, transcoderId, params);
}
}
}
}
return null;
}
}