- add new voice conf messages

This commit is contained in:
Richard Alam 2017-06-08 17:55:27 -07:00
parent 9b9b93c779
commit 1ccf71cad7
12 changed files with 398 additions and 193 deletions

View File

@ -43,4 +43,7 @@ trait SystemConfiguration {
lazy val toAkkaAppsJsonChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-json-channel") lazy val toAkkaAppsJsonChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-json-channel")
lazy val fromAkkaAppsJsonChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-json-channel") lazy val fromAkkaAppsJsonChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-json-channel")
lazy val fromAkkaAppsOldJsonChannel = Try(config.getString("eventBus.fromAkkaAppsOldChannel")).getOrElse("from-akka-apps-old-json-channel") lazy val fromAkkaAppsOldJsonChannel = Try(config.getString("eventBus.fromAkkaAppsOldChannel")).getOrElse("from-akka-apps-old-json-channel")
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-void-conf-redis-channel")
} }

View File

@ -11,16 +11,16 @@ object FromAkkaAppsMsgSenderActor {
def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender) def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender)
} }
class FromAkkaAppsMsgSenderActor(msgSender: MessageSender) extends Actor with ActorLogging with SystemConfiguration { class FromAkkaAppsMsgSenderActor(msgSender: MessageSender)
extends Actor with ActorLogging with SystemConfiguration {
def receive = { def receive = {
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg) case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
case _ => println("************* FromAkkaAppsMsgSenderActor Cannot handle message ") case _ => log.warning("Cannot handle message ")
} }
def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = { def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
val json = JsonUtil.toJson(msg) val json = JsonUtil.toJson(msg)
println("****** Publishing " + json)
msgSender.send(fromAkkaAppsRedisChannel, json) msgSender.send(fromAkkaAppsRedisChannel, json)
} }
} }

View File

@ -37,25 +37,51 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console",
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports") testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
val akkaVersion = "2.5.1"
val scalaTestV = "2.2.6"
libraryDependencies ++= { libraryDependencies ++= {
val akkaVersion = "2.5.1"
Seq( Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.0.3", "ch.qos.logback" % "logback-classic" % "1.0.3",
"org.pegdown" % "pegdown" % "1.4.0",
"junit" % "junit" % "4.11", "junit" % "junit" % "4.11",
"com.github.etaty" % "rediscala_2.12" % "1.8.0",
"commons-codec" % "commons-codec" % "1.10", "commons-codec" % "commons-codec" % "1.10",
"joda-time" % "joda-time" % "2.3", "joda-time" % "joda-time" % "2.3",
"com.google.code.gson" % "gson" % "1.7.1", "org.apache.commons" % "commons-lang3" % "3.2"
"redis.clients" % "jedis" % "2.1.0",
"org.apache.commons" % "commons-lang3" % "3.2",
"org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT",
"org.bigbluebutton" % "bbb-fsesl-client" % "0.0.4"
)} )}
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
libraryDependencies += "org.bigbluebutton" % "bbb-fsesl-client" % "0.0.4"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % "2.12.2"
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.2"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
// For generating test reports
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.1" % "test"
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test"
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test"
libraryDependencies += "org.mockito" % "mockito-core" % "2.7.22" % "test"
seq(Revolver.settings: _*) seq(Revolver.settings: _*)
scalariformSettings scalariformSettings

View File

@ -1,21 +1,20 @@
/** /**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ * BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
* * <p>
* Copyright (c) 2015 BigBlueButton Inc. and by respective authors (see below). * Copyright (c) 2015 BigBlueButton Inc. and by respective authors (see below).
* * <p>
* This program is free software; you can redistribute it and/or modify it under the * This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software * terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later * Foundation; either version 3.0 of the License, or (at your option) any later
* version. * version.
* * <p>
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY * BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* * <p>
* You should have received a copy of the GNU Lesser General Public License along * You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. * with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
* */
*/
package org.bigbluebutton.freeswitch.voice.freeswitch; package org.bigbluebutton.freeswitch.voice.freeswitch;
import java.io.File; import java.io.File;
@ -24,6 +23,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.bigbluebutton.freeswitch.voice.freeswitch.actions.BroadcastConferenceCommand; import org.bigbluebutton.freeswitch.voice.freeswitch.actions.BroadcastConferenceCommand;
import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectAllUsersCommand; import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectAllUsersCommand;
import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectUserCommand; import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectUserCommand;
@ -36,149 +36,148 @@ import org.bigbluebutton.freeswitch.voice.freeswitch.actions.*;
public class FreeswitchApplication { public class FreeswitchApplication {
private static final int SENDERTHREADS = 1; private static final int SENDERTHREADS = 1;
private static final Executor msgSenderExec = Executors private static final Executor msgSenderExec = Executors.newFixedThreadPool(SENDERTHREADS);
.newFixedThreadPool(SENDERTHREADS); private static final Executor runExec = Executors.newFixedThreadPool(SENDERTHREADS);
private static final Executor runExec = Executors private BlockingQueue<FreeswitchCommand> messages = new LinkedBlockingQueue<FreeswitchCommand>();
.newFixedThreadPool(SENDERTHREADS);
private BlockingQueue<FreeswitchCommand> messages = new LinkedBlockingQueue<FreeswitchCommand>();
private final ConnectionManager manager; private final ConnectionManager manager;
private final String USER = "0"; /* not used for now */ private final String USER = "0"; /* not used for now */
private volatile boolean sendMessages = false; private volatile boolean sendMessages = false;
private final String audioProfile;
public FreeswitchApplication(ConnectionManager manager, String profile) { private final String audioProfile;
this.manager = manager;
this.audioProfile = profile;
}
private void queueMessage(FreeswitchCommand command) { public FreeswitchApplication(ConnectionManager manager, String profile) {
try { this.manager = manager;
messages.offer(command, 5, TimeUnit.SECONDS); this.audioProfile = profile;
} catch (InterruptedException e) { }
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void transferUserToMeeting(String voiceConfId, private void queueMessage(FreeswitchCommand command) {
String targetVoiceConfId, String voiceUserId) { try {
TransferUserToMeetingCommand tutmc = new TransferUserToMeetingCommand( messages.offer(command, 5, TimeUnit.SECONDS);
voiceConfId, targetVoiceConfId, voiceUserId, this.audioProfile, } catch (InterruptedException e) {
USER); // TODO Auto-generated catch block
queueMessage(tutmc); e.printStackTrace();
} }
}
public void start() { public void transferUserToMeeting(String voiceConfId,
sendMessages = true; String targetVoiceConfId, String voiceUserId) {
Runnable sender = new Runnable() { TransferUserToMeetingCommand tutmc = new TransferUserToMeetingCommand(
public void run() { voiceConfId, targetVoiceConfId, voiceUserId, this.audioProfile,
while (sendMessages) { USER);
FreeswitchCommand message; queueMessage(tutmc);
try { }
message = messages.take();
sendMessageToFreeswitch(message);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
msgSenderExec.execute(sender);
}
public void getAllUsers(String voiceConfId) { public void start() {
GetAllUsersCommand prc = new GetAllUsersCommand(voiceConfId, USER); sendMessages = true;
queueMessage(prc); Runnable sender = new Runnable() {
} public void run() {
while (sendMessages) {
public void muteUser(String voiceConfId, String voiceUserId, Boolean mute) { FreeswitchCommand message;
MuteUserCommand mpc = new MuteUserCommand(voiceConfId, voiceUserId, mute, USER); try {
queueMessage(mpc); message = messages.take();
} sendMessageToFreeswitch(message);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
msgSenderExec.execute(sender);
}
public void eject(String voiceConfId, String voiceUserId) { public void getAllUsers(String voiceConfId) {
EjectUserCommand mpc = new EjectUserCommand(voiceConfId, voiceUserId, USER); GetAllUsersCommand prc = new GetAllUsersCommand(voiceConfId, USER);
queueMessage(mpc); queueMessage(prc);
} }
public void ejectAll(String voiceConfId) { public void muteUser(String voiceConfId, String voiceUserId, Boolean mute) {
EjectAllUsersCommand mpc = new EjectAllUsersCommand(voiceConfId, USER); MuteUserCommand mpc = new MuteUserCommand(voiceConfId, voiceUserId, mute, USER);
queueMessage(mpc); queueMessage(mpc);
} }
private Long genTimestamp() { public void eject(String voiceConfId, String voiceUserId) {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); EjectUserCommand mpc = new EjectUserCommand(voiceConfId, voiceUserId, USER);
} queueMessage(mpc);
}
public void startRecording(String voiceConfId, String meetingid){ public void ejectAll(String voiceConfId) {
String RECORD_DIR = "/var/freeswitch/meetings"; EjectAllUsersCommand mpc = new EjectAllUsersCommand(voiceConfId, USER);
String voicePath = RECORD_DIR + File.separatorChar + meetingid + "-" + genTimestamp() + ".wav"; queueMessage(mpc);
}
RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, true, voicePath); private Long genTimestamp() {
queueMessage(rcc); return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
} }
public void stopRecording(String voiceConfId, String meetingid, String voicePath){ public void startRecording(String voiceConfId, String meetingid) {
RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, false, voicePath); String RECORD_DIR = "/var/freeswitch/meetings";
queueMessage(rcc); String voicePath = RECORD_DIR + File.separatorChar + meetingid + "-" + genTimestamp() + ".wav";
}
public void deskShareBroadcastRTMP(String voiceConfId, String streamUrl, String timestamp, Boolean broadcast){ RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, true, voicePath);
DeskShareBroadcastRTMPCommand rtmp = new DeskShareBroadcastRTMPCommand(voiceConfId, USER, streamUrl, timestamp, broadcast); queueMessage(rcc);
queueMessage(rtmp); }
}
public void deskShareHangUp(String voiceConfId, String fsConferenceName, String timestamp){ public void stopRecording(String voiceConfId, String meetingid, String voicePath) {
DeskShareHangUpCommand huCmd = new DeskShareHangUpCommand(voiceConfId, fsConferenceName, USER, timestamp); RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, false, voicePath);
queueMessage(huCmd); queueMessage(rcc);
} }
private void sendMessageToFreeswitch(final FreeswitchCommand command) {
Runnable task = new Runnable() {
public void run() {
if (command instanceof GetAllUsersCommand) {
GetAllUsersCommand cmd = (GetAllUsersCommand) command;
System.out.println("Sending PopulateRoomCommand for conference = [" + cmd.getRoom() + "]");
manager.getUsers(cmd);
} else if (command instanceof MuteUserCommand) {
MuteUserCommand cmd = (MuteUserCommand) command;
System.out.println("Sending MuteParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.mute(cmd);
} else if (command instanceof EjectUserCommand) {
EjectUserCommand cmd = (EjectUserCommand) command;
System.out.println("Sending EjectParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.eject(cmd);
} else if (command instanceof EjectAllUsersCommand) {
EjectAllUsersCommand cmd = (EjectAllUsersCommand) command;
System.out.println("Sending EjectAllUsersCommand for conference = [" + cmd.getRoom() + "]");
manager.ejectAll(cmd);
} else if (command instanceof TransferUserToMeetingCommand) {
TransferUserToMeetingCommand cmd = (TransferUserToMeetingCommand) command;
System.out.println("Sending TransferUsetToMeetingCommand for conference = ["
+ cmd.getRoom() + "]");
manager.tranfer(cmd);
} else if (command instanceof RecordConferenceCommand) {
manager.record((RecordConferenceCommand) command);
} else if (command instanceof DeskShareBroadcastRTMPCommand) {
manager.broadcastRTMP((DeskShareBroadcastRTMPCommand)command);
} else if (command instanceof DeskShareHangUpCommand) {
DeskShareHangUpCommand cmd = (DeskShareHangUpCommand) command;
manager.hangUp(cmd);
} else if (command instanceof BroadcastConferenceCommand) {
manager.broadcast((BroadcastConferenceCommand) command);
}
}
};
runExec.execute(task); public void deskShareBroadcastRTMP(String voiceConfId, String streamUrl, String timestamp, Boolean broadcast) {
} DeskShareBroadcastRTMPCommand rtmp = new DeskShareBroadcastRTMPCommand(voiceConfId, USER, streamUrl, timestamp, broadcast);
queueMessage(rtmp);
}
public void stop() { public void deskShareHangUp(String voiceConfId, String fsConferenceName, String timestamp) {
sendMessages = false; DeskShareHangUpCommand huCmd = new DeskShareHangUpCommand(voiceConfId, fsConferenceName, USER, timestamp);
} queueMessage(huCmd);
}
private void sendMessageToFreeswitch(final FreeswitchCommand command) {
Runnable task = new Runnable() {
public void run() {
if (command instanceof GetAllUsersCommand) {
GetAllUsersCommand cmd = (GetAllUsersCommand) command;
System.out.println("Sending PopulateRoomCommand for conference = [" + cmd.getRoom() + "]");
manager.getUsers(cmd);
} else if (command instanceof MuteUserCommand) {
MuteUserCommand cmd = (MuteUserCommand) command;
System.out.println("Sending MuteParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.mute(cmd);
} else if (command instanceof EjectUserCommand) {
EjectUserCommand cmd = (EjectUserCommand) command;
System.out.println("Sending EjectParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.eject(cmd);
} else if (command instanceof EjectAllUsersCommand) {
EjectAllUsersCommand cmd = (EjectAllUsersCommand) command;
System.out.println("Sending EjectAllUsersCommand for conference = [" + cmd.getRoom() + "]");
manager.ejectAll(cmd);
} else if (command instanceof TransferUserToMeetingCommand) {
TransferUserToMeetingCommand cmd = (TransferUserToMeetingCommand) command;
System.out.println("Sending TransferUsetToMeetingCommand for conference = ["
+ cmd.getRoom() + "]");
manager.tranfer(cmd);
} else if (command instanceof RecordConferenceCommand) {
manager.record((RecordConferenceCommand) command);
} else if (command instanceof DeskShareBroadcastRTMPCommand) {
manager.broadcastRTMP((DeskShareBroadcastRTMPCommand) command);
} else if (command instanceof DeskShareHangUpCommand) {
DeskShareHangUpCommand cmd = (DeskShareHangUpCommand) command;
manager.hangUp(cmd);
} else if (command instanceof BroadcastConferenceCommand) {
manager.broadcast((BroadcastConferenceCommand) command);
}
}
};
runExec.execute(task);
}
public void stop() {
sendMessages = false;
}
} }

View File

@ -1,16 +1,13 @@
package org.bigbluebutton package org.bigbluebutton
import akka.actor.{ ActorSystem, Props } import akka.actor.{ ActorSystem, Props }
import scala.concurrent.duration._
import redis.RedisClient
import scala.concurrent.{ Future, Await }
import scala.concurrent.ExecutionContext.Implicits.global
import org.freeswitch.esl.client.manager.DefaultManagerConnection import org.freeswitch.esl.client.manager.DefaultManagerConnection
import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor } import org.bigbluebutton.endpoint.redis.{ AppsRedisSubscriberActor, RedisPublisher }
import org.bigbluebutton.freeswitch.VoiceConferenceService import org.bigbluebutton.freeswitch.{ RxJsonMsgHdlrActor, VoiceConferenceService }
import org.bigbluebutton.freeswitch.bus.InsonMsgBus
import org.bigbluebutton.freeswitch.voice.FreeswitchConferenceEventListener import org.bigbluebutton.freeswitch.voice.FreeswitchConferenceEventListener
import org.bigbluebutton.freeswitch.voice.freeswitch.{ ESLEventListener, ConnectionManager, FreeswitchApplication } import org.bigbluebutton.freeswitch.voice.freeswitch.{ ConnectionManager, ESLEventListener, FreeswitchApplication }
import org.bigbluebutton.freeswitch.voice.IVoiceConferenceService
import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver
object Boot extends App with SystemConfiguration { object Boot extends App with SystemConfiguration {
@ -19,7 +16,7 @@ object Boot extends App with SystemConfiguration {
val redisPublisher = new RedisPublisher(system) val redisPublisher = new RedisPublisher(system)
val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword); val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword)
val voiceConfService = new VoiceConferenceService(redisPublisher) val voiceConfService = new VoiceConferenceService(redisPublisher)
@ -36,5 +33,10 @@ object Boot extends App with SystemConfiguration {
val redisMsgReceiver = new RedisMessageReceiver(fsApplication) val redisMsgReceiver = new RedisMessageReceiver(fsApplication)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver), "redis-subscriber") val inJsonMsgBus = new InsonMsgBus
val redisMessageHandlerActor = system.actorOf(RxJsonMsgHdlrActor.props(fsApplication))
inJsonMsgBus.subscribe(redisMessageHandlerActor, toFsAppsJsonChannel)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver, inJsonMsgBus), "redis-subscriber")
} }

View File

@ -16,4 +16,8 @@ trait SystemConfiguration {
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379) lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("") lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
} lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-void-conf-redis-channel")
lazy val toFsAppsJsonChannel = Try(config.getString("eventBus.toFsAppsChannel")).getOrElse("to-fs-apps-json-channel")
lazy val fromFsAppsJsonChannel = Try(config.getString("eventBus.fromFsAppsChannel")).getOrElse("from-fs-apps-json-channel")
}

View File

@ -6,16 +6,13 @@ import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import org.bigbluebutton.SystemConfiguration import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common.converters.FromJsonDecoder
import org.bigbluebutton.common.messages.PubSubPongMessage
import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import akka.actor.Props import akka.actor.Props
import akka.actor.SupervisorStrategy.Resume import akka.actor.SupervisorStrategy.Resume
import org.bigbluebutton.freeswitch.bus.{ InJsonMsg, InsonMsgBus, ReceivedJsonMsg }
import redis.actors.RedisSubscriberActor import redis.actors.RedisSubscriberActor
import redis.api.pubsub.Message import redis.api.pubsub.Message
import redis.api.pubsub.PMessage import redis.api.pubsub.PMessage
@ -26,13 +23,14 @@ object AppsRedisSubscriberActor extends SystemConfiguration {
val channels = Seq("time") val channels = Seq("time")
val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*") val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*")
def props(system: ActorSystem, msgReceiver: RedisMessageReceiver): Props = def props(system: ActorSystem, msgReceiver: RedisMessageReceiver, inJsonMgBus: InsonMsgBus): Props =
Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, inJsonMgBus,
redisHost, redisPort, redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
} }
class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, redisHost: String, class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver,
inJsonMgBus: InsonMsgBus, redisHost: String,
redisPort: Int, redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil) channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort), extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort),
@ -48,7 +46,7 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag
} }
} }
val decoder = new FromJsonDecoder() // val decoder = new FromJsonDecoder()
var lastPongReceivedOn = 0L var lastPongReceivedOn = 0L
system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage())
@ -67,26 +65,17 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag
def onMessage(message: Message) { def onMessage(message: Message) {
log.debug(s"message received: $message") log.debug(s"message received: $message")
if (message.channel == toVoiceConfRedisChannel) {
val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String)
log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
inJsonMgBus.publish(InJsonMsg(toFsAppsJsonChannel, receivedJsonMessage))
}
} }
def onPMessage(pmessage: PMessage) { def onPMessage(pmessage: PMessage) {
// log.debug(s"pattern message received: $pmessage") // log.debug(s"pattern message received: $pmessage")
val msg = decoder.decodeMessage(pmessage.data.utf8String) msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String)
if (msg != null) {
msg match {
case m: PubSubPongMessage => {
if (m.payload.system == "BbbFsESL") {
lastPongReceivedOn = System.currentTimeMillis()
}
}
case _ => // do nothing
}
} else {
msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String)
}
} }
def handleMessage(msg: String) { def handleMessage(msg: String) {

View File

@ -0,0 +1,7 @@
package org.bigbluebutton.freeswitch
import org.bigbluebutton.common2.messages.Deserializer
trait RxJsonMsgDeserializer {
object JsonDeserializer extends Deserializer
}

View File

@ -0,0 +1,36 @@
package org.bigbluebutton.freeswitch
import akka.actor.{ Actor, ActorLogging, Props }
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.messages.BbbCoreEnvelope
import org.bigbluebutton.freeswitch.bus.{ ReceivedJsonMsg }
import org.bigbluebutton.freeswitch.voice.freeswitch.FreeswitchApplication
object RxJsonMsgHdlrActor {
def props(fsApp: FreeswitchApplication): Props =
Props(classOf[RxJsonMsgHdlrActor], fsApp)
}
class RxJsonMsgHdlrActor(fsApp: FreeswitchApplication) extends Actor with ActorLogging
with SystemConfiguration with RxJsonMsgDeserializer {
def receive = {
case msg: ReceivedJsonMsg =>
log.debug("handling {} - {}", msg.channel, msg.data)
handleReceivedJsonMessage(msg)
case _ => // do nothing
}
def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = {
for {
envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data)
} yield handle(envJsonNode.envelope, envJsonNode.core)
}
def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
log.debug("Route envelope name " + envelope.name)
envelope.name match {
case _ => // do nothing
}
}
}

View File

@ -0,0 +1,31 @@
package org.bigbluebutton.freeswitch.bus
import akka.actor.ActorRef
import akka.event.{ EventBus, LookupClassification }
case class ReceivedJsonMsg(channel: String, data: String)
case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg)
class InsonMsgBus extends EventBus with LookupClassification {
type Event = InJsonMsg
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the events classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}

View File

@ -10,9 +10,9 @@ object MessageTypes {
} }
// seal trait to force all classes that extends this trait to be defined in this file. // seal trait to force all classes that extends this trait to be defined in this file.
sealed trait BbbCoreMsg trait BbbCoreMsg
sealed trait BbbCommonMsg sealed trait BbbCommonMsg
sealed trait BbbCoreHeader trait BbbCoreHeader
case class RoutingEnvelope(msgType: String, meetingId: String, userId: String) case class RoutingEnvelope(msgType: String, meetingId: String, userId: String)
case class BbbMsgToClientEnvelope(name: String, routing: RoutingEnvelope) case class BbbMsgToClientEnvelope(name: String, routing: RoutingEnvelope)
@ -56,6 +56,8 @@ case class UserBroadcastCamStartMsg(header: BbbClientMsgHeader, body: UserBroadc
object UserBroadcastCamStopMsg { val NAME = "UserBroadcastCamStopMsg"} object UserBroadcastCamStopMsg { val NAME = "UserBroadcastCamStopMsg"}
case class UserBroadcastCamStopMsg(header: BbbClientMsgHeader, body: UserBroadcastCamStopMsgBody) extends BbbCoreMsg case class UserBroadcastCamStopMsg(header: BbbClientMsgHeader, body: UserBroadcastCamStopMsgBody) extends BbbCoreMsg
/** Event messages sent by Akka apps as result of receiving incoming messages ***/ /** Event messages sent by Akka apps as result of receiving incoming messages ***/
object MeetingCreatedEvtMsg { val NAME = "MeetingCreatedEvtMsg"} object MeetingCreatedEvtMsg { val NAME = "MeetingCreatedEvtMsg"}
case class MeetingCreatedEvtMsg(header: BbbCoreBaseHeader, case class MeetingCreatedEvtMsg(header: BbbCoreBaseHeader,

View File

@ -0,0 +1,106 @@
package org.bigbluebutton.common2.messages.voiceconf
import org.bigbluebutton.common2.messages.{BbbCoreHeader, BbbCoreHeaderWithMeetingId, BbbCoreMsg}
/*** Message from Akka Apps to FS Conference ***/
object EjectAllFromVoiceConfMsg { val NAME = "EjectAllFromVoiceConfMsg" }
case class EjectAllFromVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: EjectAllFromVoiceConfMsgBody) extends BbbCoreMsg
case class EjectAllFromVoiceConfMsgBody(voiceConf: String)
object EjectUserFromVoiceConfMsg { val NAME = "EjectUserFromVoiceConfMsg"}
case class EjectUserFromVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: EjectUserFromVoiceConfMsgBody) extends BbbCoreMsg
case class EjectUserFromVoiceConfMsgBody(voiceConf: String, voiceUserId: String)
object MuteUserInVoiceConfMsg { val NAME = "MuteUserInVoiceConfMsg" }
case class MuteUserInVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: MuteUserInVoiceConfMsgBody) extends BbbCoreMsg
case class MuteUserInVoiceConfMsgBody(voiceConf: String, voiceUserId: String, mute: Boolean)
object TransferUserToVoiceConfMsg { val NAME = "TransferUserToVoiceConfMsg" }
case class TransferUserToVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: TransferUserToVoiceConfMsgBody) extends BbbCoreMsg
case class TransferUserToVoiceConfMsgBody(fromVoiceConf: String, toVoiceConf: String, voiceUserId: String)
object StartRecordingVoiceConfMsg { val NAME = "StartRecordingVoiceConfMsg" }
case class StartRecordingVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: StartRecordingVoiceConfMsgBody) extends BbbCoreMsg
case class StartRecordingVoiceConfMsgBody(voiceConf: String)
object StopRecordingVoiceConfMsg { val NAME = "StopRecordingVoiceConfMsg" }
case class StopRecordingVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: StopRecordingVoiceConfMsgBody) extends BbbCoreMsg
case class StopRecordingVoiceConfMsgBody(voiceConf: String, stream: String)
object DeskshareStartRtmpBroadcastVoiceConfMsg { val NAME = "DeskshareStartRtmpBroadcastVoiceConfMsg" }
case class DeskshareStartRtmpBroadcastVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: DeskshareStartRtmpBroadcastVoiceConfMsgBody) extends BbbCoreMsg
case class DeskshareStartRtmpBroadcastVoiceConfMsgBody(voiceConf: String, deskshareConf: String, url: String, timestamp: String)
object DeskshareStopRtmpBroadcastVoiceConfMsg { val NAME = "DeskshareStopRtmpBroadcastVoiceConfMsg" }
case class DeskshareStopRtmpBroadcastVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: DeskshareStopRtmpBroadcastVoiceConfMsgBody) extends BbbCoreMsg
case class DeskshareStopRtmpBroadcastVoiceConfMsgBody(voiceConf: String, deskshareConf: String, url: String, timestamp: String)
object DeskshareHangUpVoiceConfMsg { val NAME = "DeskshareHangUpVoiceConfMsg" }
case class DeskshareHangUpVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: DeskshareHangUpVoiceConfMsgBody) extends BbbCoreMsg
case class DeskshareHangUpVoiceConfMsgBody(voiceConf: String, deskshareConf: String, timestamp: String)
/*** Message from FS Conference to Akka Apps ***/
case class BbbCoreVoiceConfHeader(name: String, voiceConf: String) extends BbbCoreHeader
object RecordingStartedVoiceConfEvtMsg { val NAME = "RecordingStartedVoiceConfEvtMsg" }
case class RecordingStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: RecordingStartedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class RecordingStartedVoiceConfEvtMsgBody(voiceConf: String, stream: String, recording: Boolean, timestamp: String)
object UserJoinedVoiceConfEvtMsg { val NAME = "UserJoinedVoiceConfEvtMsg" }
case class UserJoinedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserJoinedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserJoinedVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, userId: String,
callerIdName: String, callerIdNum: String, muted: Boolean,
talking: Boolean, avatarUrl: String)
object UserLeftVoiceConfEvtMsg { val NAME = "UserLeftVoiceConfEvtMsg" }
case class UserLeftVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserLeftVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserLeftVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String)
object UserMutedInVoiceConfEvtMsg { val NAME = "UserMutedInVoiceConfEvtMsg" }
case class UserMutedInVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserMutedInVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserMutedInVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, muted: Boolean)
object UserTalkingInVoiceConfEvtMsg { val NAME = "UserTalkingInVoiceConfEvtMsg" }
case class UserTalkingInVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserTalkingInVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserTalkingInVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, talking: Boolean)
object DeskshareStartedVoiceConfEvtMsg { val NAME = "DeskshareStartedVoiceConfEvtMsg" }
case class DeskshareStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareStartedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareStartedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
callerIdNum: String, callerIdName: String)
object DeskshareStoppedVoiceConfEvtMsg { val NAME = "DeskshareStoppedVoiceConfEvtMsg"}
case class DeskshareStoppedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareStoppedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareStoppedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
callerIdNum: String, callerIdName: String)
object DeskshareRtmpBroadcastStartedVoiceConfEvtMsg { val NAME = "DeskshareRtmpBroadcastStartedVoiceConfEvtMsg"}
case class DeskshareRtmpBroadcastStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareRtmpBroadcastStartedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareRtmpBroadcastStartedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
stream: String, vidWidth: String, vidHeight: String,
timestamp: String)
object DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg { val NAME = "DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg"}
case class DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareRtmpBroadcastStoppedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareRtmpBroadcastStoppedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
stream: String, vidWidth: String, vidHeight: String,
timestamp: String)