Merge pull request #4008 from ritzalam/refactor-akka-fsesl-for-new-messages

Refactor akka fsesl for new messages
This commit is contained in:
Richard Alam 2017-06-12 17:28:49 -04:00 committed by GitHub
commit 080441e27b
47 changed files with 1330 additions and 401 deletions

View File

@ -43,4 +43,7 @@ trait SystemConfiguration {
lazy val toAkkaAppsJsonChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-json-channel")
lazy val fromAkkaAppsJsonChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-json-channel")
lazy val fromAkkaAppsOldJsonChannel = Try(config.getString("eventBus.fromAkkaAppsOldChannel")).getOrElse("from-akka-apps-old-json-channel")
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-voice-conf-redis-channel")
}

View File

@ -106,6 +106,8 @@ class BigBlueButtonActor(val system: ActorSystem,
eventBus.subscribe(m.actorRef, m.props.screenshareProps.screenshareConf)
bbbMsgBus.subscribe(m.actorRef, m.props.meetingProp.intId)
bbbMsgBus.subscribe(m.actorRef, m.props.voiceProp.voiceConf)
bbbMsgBus.subscribe(m.actorRef, m.props.screenshareProps.screenshareConf)
RunningMeetings.add(meetings, m)

View File

@ -420,7 +420,8 @@ trait UsersApp {
}
def handleUserJoinedVoiceFromPhone(msg: UserJoinedVoiceConfMessage) = {
log.info("User joining from phone. meetingId=" + props.meetingProp.intId + " userId=" + msg.userId + " extUserId=" + msg.externUserId)
log.info("User joining from phone. meetingId=" + props.meetingProp.intId + " userId=" + msg.userId
+ " extUserId=" + msg.externUserId)
Users.getUserWithVoiceUserId(msg.voiceUserId, liveMeeting.users) match {
case Some(user) => {

View File

@ -63,14 +63,7 @@ class RegisteredUsers {
}
}
case class RegisteredUser(
id: String,
externId: String,
name: String,
role: String,
authToken: String,
avatarURL: String,
guest: Boolean,
authed: Boolean,
waitingForAcceptance: Boolean)
case class RegisteredUser(id: String, externId: String, name: String, role: String,
authToken: String, avatarURL: String, guest: Boolean,
authed: Boolean, waitingForAcceptance: Boolean)

View File

@ -2,14 +2,14 @@ package org.bigbluebutton.core.models
import com.softwaremill.quicklens._
object Streams {
trait Streams {
def add(stream: Stream, user: String): Stream = {
def addViewer(stream: MediaStream, user: String): MediaStream = {
val newViewers = stream.viewers + user
modify(stream)(_.viewers).setTo(newViewers)
}
def remove(stream: Stream, user: String): Stream = {
def removeViewer(stream: MediaStream, user: String): MediaStream = {
val newViewers = stream.viewers - user
modify(stream)(_.viewers).setTo(newViewers)
}
@ -20,4 +20,4 @@ object Streams {
* https://en.wikipedia.org/wiki/Session_Description_Protocol
*/
case class MediaAttribute(key: String, value: String)
case class Stream(id: String, sessionId: String, attributes: Set[MediaAttribute], viewers: Set[String])
case class MediaStream(id: String, sessionId: String, userId: String, attributes: Set[MediaAttribute], viewers: Set[String])

View File

@ -0,0 +1,44 @@
package org.bigbluebutton.core.models
object Users2x {
def findUserWithIntId(users: Users2x, intId: String): Option[String] = {
users.toVector find (u => u == intId)
}
def add(users: Users2x, user: String): Option[String] = {
users.save(user)
Some(user)
}
def remove(users: Users2x, intId: String): Option[String] = {
users.remove(intId)
}
def removeUserWithId(users: Users2x, voiceUserId: String): Option[String] = {
users.remove(voiceUserId)
}
}
class Users2x {
private var users: Set[String] = Set.empty
private def toVector: Vector[String] = users.toVector
private def save(user: String): String = {
users = users + user
user
}
private def remove(id: String): Option[String] = {
val user = for {
user <- users.find(u => u == id)
} yield {
users = users.filterNot(u => u == id)
user
}
user
}
}

View File

@ -0,0 +1,75 @@
package org.bigbluebutton.core.models
import com.softwaremill.quicklens.modify
object UsersState {
def findWithIntId(users: UsersState, intId: String): Option[UserState] = users.toVector.find(u => u.intId == intId)
def findModerators(users: UsersState): Vector[UserState] = users.toVector.filter(u => u.role == Roles.MODERATOR_ROLE)
def findPresenters(users: UsersState): Vector[UserState] = users.toVector.filter(u => u.role == Roles.PRESENTER_ROLE)
def findViewers(users: UsersState): Vector[UserState] = users.toVector.filter(u => u.role == Roles.VIEWER_ROLE)
def hasModerator(users: UsersState): Boolean = users.toVector.filter(u => u.role == Roles.MODERATOR_ROLE).length > 0
def hasPresenter(users: UsersState): Boolean = users.toVector.filter(u => u.role == Roles.PRESENTER_ROLE).length > 0
def hasNoPresenter(users: UsersState): Boolean = users.toVector.filter(u => u.role == Roles.PRESENTER_ROLE).length == 0
def hasUserWithIntId(users: UsersState, intId: String): Boolean = {
findWithIntId(users, intId) match {
case Some(u) => true
case None => false
}
}
def numUsers(users: UsersState): Int = users.toVector.size
def usersWhoAreNotPresenter(users: UsersState): Vector[UserState] = users.toVector filter (u => u.presenter == false)
def unbecomePresenter(users: UsersState, intId: String) = {
for {
u <- findWithIntId(users, intId)
user = modify(u)(_.presenter).setTo(false)
} yield users.save(user)
}
def becomePresenter(users: UsersState, intId: String) = {
for {
u <- findWithIntId(users, intId)
user = modify(u)(_.presenter).setTo(true)
} yield users.save(user)
}
def isModerator(id: String, users: Users): Boolean = {
Users.findWithId(id, users) match {
case Some(user) => return user.role == Roles.MODERATOR_ROLE && !user.waitingForAcceptance
case None => return false
}
}
def add(users: UsersState, state: UserState): Option[UserState] = {
users.save(state)
Some(state)
}
def remove(users: UsersState, intId: String): Option[UserState] = {
users.remove(intId)
}
}
class UsersState {
private var usersStatus: collection.immutable.HashMap[String, UserState] = new collection.immutable.HashMap[String, UserState]
private def toVector: Vector[UserState] = usersStatus.values.toVector
private def save(user: UserState): UserState = {
usersStatus += user.intId -> user
user
}
private def remove(intId: String): Option[UserState] = {
val user = usersStatus.get(intId)
user foreach (u => usersStatus -= intId)
user
}
}
case class UserState(intId: String, extId: String, name: String, role: String,
guest: Boolean, authed: Boolean, waitingForAcceptance: Boolean, emoji: String, locked: Boolean,
presenter: Boolean, avatar: String)

View File

@ -0,0 +1,49 @@
package org.bigbluebutton.core.models
object VoiceUsers {
def findUserWithVoiceUserId(users: VoiceUsers, voiceUserId: String): Option[VoiceUser2x] = {
users.toVector find (u => u.intId == voiceUserId)
}
def add(users: VoiceUsers, user: VoiceUser2x): Option[VoiceUser2x] = {
users.save(user)
Some(user)
}
def remove(users: VoiceUsers, intId: String): Option[VoiceUser2x] = {
users.remove(intId)
}
def removeUserWithId(users: VoiceUsers, voiceUserId: String): Option[VoiceUser2x] = {
users.remove(voiceUserId)
}
}
class VoiceUsers {
private var users: Set[VoiceUser2x] = Set.empty
private var voiceUsersState = new VoiceUsersState
private def toVector: Vector[VoiceUser2x] = users.toVector
private def save(user: VoiceUser2x): VoiceUser2x = {
users = users + user
user
}
private def remove(id: String): Option[VoiceUser2x] = {
val user = for {
user <- users.find(u => u.intId == id)
} yield {
users = users.filterNot(u => u.intId == id)
user
}
user
}
}
case class VoiceUser2x(intId: String, voiceUserId: String)
case class VoiceUserVO2x(intId: String, voiceUserId: String, callerName: String,
callerNum: String, joined: Boolean, locked: Boolean, muted: Boolean,
talking: Boolean, callingWith: String, listenOnly: Boolean)

View File

@ -0,0 +1,70 @@
package org.bigbluebutton.core.models
import com.softwaremill.quicklens._
object VoiceUsersState {
def findWithId(users: VoiceUsersState, userId: String): Option[VoiceUserState] = users.toVector.find(u => u.intId == userId)
def add(users: VoiceUsersState, state: VoiceUserState): Option[VoiceUserState] = {
users.save(state)
Some(state)
}
def remove(users: VoiceUsersState, intId: String): Option[VoiceUserState] = {
users.remove(intId)
}
def joinedVoiceListenOnly(users: VoiceUsersState, userId: String): Option[VoiceUserState] = {
for {
u <- findWithId(users, userId)
vu = u.modify(_.talking).setTo(false)
.modify(_.listenOnly).setTo(true)
} yield {
users.save(vu)
vu
}
}
def leftVoiceListenOnly(users: VoiceUsersState, userId: String): Option[VoiceUserState] = {
for {
u <- findWithId(users, userId)
vu = u.modify(_.talking).setTo(false)
.modify(_.listenOnly).setTo(false)
} yield {
users.save(vu)
vu
}
}
def setUserTalking(users: VoiceUsersState, user: VoiceUserState, talking: Boolean): VoiceUserState = {
val nv = user.modify(_.talking).setTo(talking)
users.save(nv)
nv
}
def setUserMuted(users: VoiceUsersState, user: VoiceUserState, muted: Boolean): VoiceUserState = {
val talking: Boolean = if (muted) false else user.talking
val nv = user.modify(_.muted).setTo(muted).modify(_.talking).setTo(talking)
users.save(nv)
nv
}
}
class VoiceUsersState {
private var users: collection.immutable.HashMap[String, VoiceUserState] = new collection.immutable.HashMap[String, VoiceUserState]
private def toVector: Vector[VoiceUserState] = users.values.toVector
private def save(user: VoiceUserState): VoiceUserState = {
users += user.intId -> user
user
}
private def remove(intId: String): Option[VoiceUserState] = {
val user = users.get(intId)
user foreach (u => users -= intId)
user
}
}
case class VoiceUserState(intId: String, voiceUserId: String, callingWith: String, callerName: String,
callerNum: String, muted: Boolean, talking: Boolean, listenOnly: Boolean)

View File

@ -6,9 +6,20 @@ object Webcams {
}
def findWebcamsForUser(webcams: Webcams, userId: String): Vector[WebcamStream] = {
webcams.toVector.filter(w => w.userId == userId)
webcams.toVector.filter(w => w.stream.userId == userId)
}
def addWebcamBroadcastStream(webcams: Webcams, webcamStream: WebcamStream): Option[WebcamStream] = {
webcams.save(webcamStream)
Some(webcamStream)
}
def removeWebcamBroadcastStream(webcams: Webcams, streamId: String): Option[WebcamStream] = {
for {
stream <- findWithStreamId(webcams, streamId)
removedStream <- webcams.remove(streamId)
} yield removedStream
}
}
class Webcams {
@ -28,4 +39,4 @@ class Webcams {
}
}
case class WebcamStream(userId: String, stream: Stream)
case class WebcamStream(streamId: String, stream: MediaStream)

View File

@ -3,136 +3,22 @@ package org.bigbluebutton.core.pubsub.senders
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.core.bus.{ BbbMsgEvent, BbbMsgRouterEventBus, ReceivedJsonMessage }
import scala.util.{ Failure, Success }
trait ReceivedJsonMsgDeserializer extends SystemConfiguration {
this: ReceivedJsonMsgHandlerActor =>
object JsonDeserializer extends Deserializer
def routeCreateMeetingReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[CreateMeetingReqMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[CreateMeetingReqMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[CreateMeetingReqMsg])
case None =>
log.error("Failed to deserialize CreateMeetingReqMsg message " + error)
None
}
}
def deserialize[T](jsonNode: JsonNode)(implicit m: Manifest[T]): Option[T] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[T](jsonNode)
def send(envelope: BbbCoreEnvelope, msg: CreateMeetingReqMsg): Unit = {
val event = BbbMsgEvent(meetingManagerChannel, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
for {
m <- deserialize(jsonNode)
} yield {
send(envelope, m)
result match {
case Some(msg) =>
Some(msg.asInstanceOf[T])
case None =>
log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode)
None
}
}
def routeValidateAuthTokenReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[ValidateAuthTokenReqMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[ValidateAuthTokenReqMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[ValidateAuthTokenReqMsg])
case None =>
log.error("Failed to deserialize ValidateAuthTokenReqMsg message " + error)
None
}
}
def send(envelope: BbbCoreEnvelope, msg: ValidateAuthTokenReqMsg): Unit = {
val event = BbbMsgEvent(msg.header.meetingId, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
for {
m <- deserialize(jsonNode)
} yield {
send(envelope, m)
}
}
def routeRegisterUserReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[RegisterUserReqMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[RegisterUserReqMsg](jsonNode)
result match {
case Some(msg) =>
Some(msg.asInstanceOf[RegisterUserReqMsg])
case None =>
log.error("Failed to RegisterUserReqMsg message " + error)
None
}
}
def send(envelope: BbbCoreEnvelope, msg: RegisterUserReqMsg): Unit = {
// Route via meeting manager as there is a race condition if we send directly to meeting
// because the meeting actor might not have been created yet.
val event = BbbMsgEvent(meetingManagerChannel, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
for {
m <- deserialize(jsonNode)
} yield {
send(envelope, m)
}
}
def routeUserBroadcastCamStartMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[UserBroadcastCamStartMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[UserBroadcastCamStartMsg](jsonNode)
result match {
case Some(msg) =>
Some(msg.asInstanceOf[UserBroadcastCamStartMsg])
case None =>
log.error("Failed to UserShareWebcamMsg message " + error)
None
}
}
def send(envelope: BbbCoreEnvelope, msg: UserBroadcastCamStartMsg): Unit = {
val event = BbbMsgEvent(msg.header.meetingId, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
for {
m <- deserialize(jsonNode)
} yield {
send(envelope, m)
}
}
def routeUserBroadcastCamStopMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[UserBroadcastCamStopMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[UserBroadcastCamStopMsg](jsonNode)
result match {
case Some(msg) =>
Some(msg.asInstanceOf[UserBroadcastCamStopMsg])
case None =>
log.error("Failed to UserShareWebcamMsg message " + error)
None
}
}
def send(envelope: BbbCoreEnvelope, msg: UserBroadcastCamStopMsg): Unit = {
val event = BbbMsgEvent(msg.header.meetingId, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
for {
m <- deserialize(jsonNode)
} yield {
send(envelope, m)
}
}
}

View File

@ -4,6 +4,7 @@ import akka.actor.{ Actor, ActorLogging, Props }
import org.bigbluebutton.SystemConfiguration
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.common2.messages.voiceconf._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core2.ReceivedMessageRouter
@ -33,19 +34,83 @@ class ReceivedJsonMsgHandlerActor(
} yield handle(envJsonNode.envelope, envJsonNode.core)
}
def send(channel: String, envelope: BbbCoreEnvelope, msg: BbbCoreMsg): Unit = {
val event = BbbMsgEvent(channel, BbbCommonEnvCoreMsg(envelope, msg))
publish(event)
}
def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
log.debug("Route envelope name " + envelope.name)
envelope.name match {
case CreateMeetingReqMsg.NAME =>
routeCreateMeetingReqMsg(envelope, jsonNode)
for {
m <- deserialize[CreateMeetingReqMsg](jsonNode)
} yield {
send(meetingManagerChannel, envelope, m)
}
case ValidateAuthTokenReqMsg.NAME =>
routeValidateAuthTokenReqMsg(envelope, jsonNode)
for {
m <- deserialize[ValidateAuthTokenReqMsg](jsonNode)
} yield {
send(m.header.meetingId, envelope, m)
}
case RegisterUserReqMsg.NAME =>
routeRegisterUserReqMsg(envelope, jsonNode)
for {
m <- deserialize[RegisterUserReqMsg](jsonNode)
} yield {
// Route via meeting manager as there is a race condition if we send directly to meeting
// because the meeting actor might not have been created yet.
send(meetingManagerChannel, envelope, m)
}
case UserJoinMeetingReqMsg.NAME =>
for {
m <- deserialize[UserJoinMeetingReqMsg](jsonNode)
} yield {
send(m.header.userId, envelope, m)
}
case UserBroadcastCamStartMsg.NAME =>
routeUserBroadcastCamStartMsg(envelope, jsonNode)
for {
m <- deserialize[UserBroadcastCamStartMsg](jsonNode)
} yield {
val event = BbbMsgEvent(m.header.meetingId, BbbCommonEnvCoreMsg(envelope, m))
publish(event)
}
case UserBroadcastCamStopMsg.NAME =>
routeUserBroadcastCamStopMsg(envelope, jsonNode)
for {
m <- deserialize[UserBroadcastCamStopMsg](jsonNode)
} yield {
send(m.header.meetingId, envelope, m)
}
case RecordingStartedVoiceConfEvtMsg.NAME =>
for {
m <- deserialize[RecordingStartedVoiceConfEvtMsg](jsonNode)
} yield {
send(m.header.voiceConf, envelope, m)
}
case UserJoinedVoiceConfEvtMsg.NAME =>
for {
m <- deserialize[UserJoinedVoiceConfEvtMsg](jsonNode)
} yield {
send(m.header.voiceConf, envelope, m)
}
case UserLeftVoiceConfEvtMsg.NAME =>
for {
m <- deserialize[UserLeftVoiceConfEvtMsg](jsonNode)
} yield {
send(m.header.voiceConf, envelope, m)
}
case UserMutedInVoiceConfEvtMsg.NAME =>
for {
m <- deserialize[UserMutedInVoiceConfEvtMsg](jsonNode)
} yield {
send(m.header.voiceConf, envelope, m)
}
case UserTalkingInVoiceConfEvtMsg.NAME =>
for {
m <- deserialize[UserTalkingInVoiceConfEvtMsg](jsonNode)
} yield {
send(m.header.voiceConf, envelope, m)
}
case _ =>
log.error("Cannot route envelope name " + envelope.name)
// do nothing

View File

@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
import org.bigbluebutton.common2.domain.DefaultProps
import org.bigbluebutton.core.api._
import org.bigbluebutton.core.apps._
import org.bigbluebutton.core.models.{ RegisteredUsers, Users }
import org.bigbluebutton.core.models._
import org.bigbluebutton.core2.MeetingStatus2x
class LiveMeeting(val props: DefaultProps,
@ -19,7 +19,12 @@ class LiveMeeting(val props: DefaultProps,
val presModel: PresentationModel,
val breakoutModel: BreakoutRoomModel,
val captionModel: CaptionModel,
val notesModel: SharedNotesModel)
val notesModel: SharedNotesModel,
val webcams: Webcams,
val voiceUsers: VoiceUsers,
val voiceUsersState: VoiceUsersState,
val users2x: Users2x,
val usersState: UsersState)
extends ChatModelTrait {
def hasMeetingEnded(): Boolean = {

View File

@ -9,13 +9,14 @@ import akka.util.Timeout
import org.bigbluebutton.common2.domain.DefaultProps
import org.bigbluebutton.common2.messages.MessageBody.ValidateAuthTokenRespMsgBody
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.common2.messages.voiceconf.UserJoinedVoiceConfEvtMsg
import org.bigbluebutton.core._
import org.bigbluebutton.core.api._
import org.bigbluebutton.core.apps._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core.models.{ RegisteredUsers, Users }
import org.bigbluebutton.core2.MeetingStatus2x
import org.bigbluebutton.core2.message.handlers.{ UserBroadcastCamStartMsgHdlr, UserBroadcastCamStopMsgHdlr }
import org.bigbluebutton.core2.message.handlers.{ UserBroadcastCamStartMsgHdlr, UserBroadcastCamStopMsgHdlr, UserJoinMeetingReqMsgHdlr, UserJoinedVoiceConfEvtMsgHdlr }
import scala.concurrent.duration._
@ -35,7 +36,9 @@ class MeetingActor(val props: DefaultProps,
with BreakoutRoomApp with CaptionApp
with SharedNotesApp with PermisssionCheck
with UserBroadcastCamStartMsgHdlr
with UserBroadcastCamStopMsgHdlr {
with UserBroadcastCamStopMsgHdlr
with UserJoinedVoiceConfEvtMsgHdlr
with UserJoinMeetingReqMsgHdlr {
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {
@ -184,8 +187,11 @@ class MeetingActor(val props: DefaultProps,
msg.core match {
case m: ValidateAuthTokenReqMsg => handleValidateAuthTokenReqMsg(m)
case m: RegisterUserReqMsg => handleRegisterUserReqMsg(m)
case m: UserJoinMeetingReqMsg => handle(m)
case m: UserBroadcastCamStartMsg => handleUserBroadcastCamStartMsg(m)
case m: UserBroadcastCamStopMsg => handleUserBroadcastCamStopMsg(m)
case m: UserJoinedVoiceConfEvtMsg => handle(m)
case _ => println("***** Cannot handle " + msg.envelope.name)
}
}

View File

@ -105,9 +105,9 @@ class MeetingActorInternal(val props: DefaultProps,
}
def handleMonitor() {
handleMonitorActivity()
// handleMonitorActivity()
handleMonitorNumberOfWebUsers()
handleMonitorExpiration()
// handleMonitorExpiration()
}
def handleMessage(msg: Object) {

View File

@ -4,8 +4,8 @@ import akka.actor.ActorContext
import org.bigbluebutton.common2.domain.{ DefaultProps, Meeting2x }
import org.bigbluebutton.core.apps._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core.models.{ RegisteredUsers, Users }
import org.bigbluebutton.core.{ OutMessageGateway }
import org.bigbluebutton.core.models._
import org.bigbluebutton.core.OutMessageGateway
import org.bigbluebutton.core2.MeetingStatus2x
object RunningMeeting {
@ -28,14 +28,19 @@ class RunningMeeting(val props: DefaultProps, val outGW: OutMessageGateway,
val users = new Users
val registeredUsers = new RegisteredUsers
val meetingStatux2x = new MeetingStatus2x
val webcams = new Webcams
val voiceUsers = new VoiceUsers
val voiceUsersState = new VoiceUsersState
val users2x = new Users2x
val usersState = new UsersState
// meetingModel.setGuestPolicy(props.usersProp.guestPolicy)
// We extract the meeting handlers into this class so it is
// easy to test.
val liveMeeting = new LiveMeeting(props, meetingStatux2x, chatModel, layoutModel,
users, registeredUsers, pollModel,
wbModel, presModel, breakoutModel, captionModel, notesModel)
users, registeredUsers, pollModel, wbModel, presModel, breakoutModel, captionModel,
notesModel, webcams, voiceUsers, voiceUsersState, users2x, usersState)
val actorRef = context.actorOf(MeetingActor.props(props, eventBus, outGW, liveMeeting), props.meetingProp.intId)

View File

@ -11,16 +11,16 @@ object FromAkkaAppsMsgSenderActor {
def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender)
}
class FromAkkaAppsMsgSenderActor(msgSender: MessageSender) extends Actor with ActorLogging with SystemConfiguration {
class FromAkkaAppsMsgSenderActor(msgSender: MessageSender)
extends Actor with ActorLogging with SystemConfiguration {
def receive = {
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
case _ => println("************* FromAkkaAppsMsgSenderActor Cannot handle message ")
case _ => log.warning("Cannot handle message ")
}
def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
val json = JsonUtil.toJson(msg)
println("****** Publishing " + json)
msgSender.send(fromAkkaAppsRedisChannel, json)
}
}

View File

@ -0,0 +1,8 @@
package org.bigbluebutton.core2.message.handlers
/**
* Created by ritz on 2017-06-09.
*/
trait RecordingStartedVoiceConfEvtMsgHdlr {
}

View File

@ -1,9 +1,9 @@
package org.bigbluebutton.core2.message.handlers
import org.bigbluebutton.common2.messages.MessageBody.{ UserBroadcastCamStartedEvtMsgBody }
import org.bigbluebutton.common2.messages.MessageBody.UserBroadcastCamStartedEvtMsgBody
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.core.OutMessageGateway
import org.bigbluebutton.core.models.Users
import org.bigbluebutton.core.models.{ MediaStream, WebcamStream, Webcams }
import org.bigbluebutton.core.running.MeetingActor
trait UserBroadcastCamStartMsgHdlr {
@ -14,7 +14,7 @@ trait UserBroadcastCamStartMsgHdlr {
def handleUserBroadcastCamStartMsg(msg: UserBroadcastCamStartMsg): Unit = {
def broadcastEvent(msg: UserBroadcastCamStartMsg): Unit = {
val routing = Routing.addMsgToClientRouting(MessageTypes.BROADCAST, props.meetingProp.intId, msg.header.userId)
val routing = Routing.addMsgToClientRouting(MessageTypes.BROADCAST_TO_MEETING, props.meetingProp.intId, msg.header.userId)
val envelope = BbbCoreEnvelope(UserBroadcastCamStartedEvtMsg.NAME, routing)
val header = BbbClientMsgHeader(UserBroadcastCamStartedEvtMsg.NAME, props.meetingProp.intId, msg.header.userId)
@ -26,8 +26,11 @@ trait UserBroadcastCamStartMsgHdlr {
record(event)
}
val stream = new MediaStream(msg.body.stream, msg.body.stream, msg.header.userId, Set.empty, Set.empty)
val webcamStream = new WebcamStream(msg.body.stream, stream)
for {
uvo <- Users.userSharedWebcam(msg.header.userId, liveMeeting.users, msg.body.stream)
uvo <- Webcams.addWebcamBroadcastStream(liveMeeting.webcams, webcamStream)
} yield {
broadcastEvent(msg)
}

View File

@ -1,9 +1,9 @@
package org.bigbluebutton.core2.message.handlers
import org.bigbluebutton.common2.messages.MessageBody.{ UserBroadcastCamStoppedEvtMsgBody }
import org.bigbluebutton.common2.messages.MessageBody.UserBroadcastCamStoppedEvtMsgBody
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.core.OutMessageGateway
import org.bigbluebutton.core.models.Users
import org.bigbluebutton.core.models.{ Users, Webcams }
import org.bigbluebutton.core.running.MeetingActor
trait UserBroadcastCamStopMsgHdlr {
@ -14,7 +14,7 @@ trait UserBroadcastCamStopMsgHdlr {
def handleUserBroadcastCamStopMsg(msg: UserBroadcastCamStopMsg): Unit = {
def broadcastEvent(msg: UserBroadcastCamStopMsg): Unit = {
val routing = Routing.addMsgToClientRouting(MessageTypes.BROADCAST, props.meetingProp.intId, msg.header.userId)
val routing = Routing.addMsgToClientRouting(MessageTypes.BROADCAST_TO_MEETING, props.meetingProp.intId, msg.header.userId)
val envelope = BbbCoreEnvelope(UserBroadcastCamStoppedEvtMsg.NAME, routing)
val header = BbbClientMsgHeader(UserBroadcastCamStoppedEvtMsg.NAME, props.meetingProp.intId, msg.header.userId)
@ -27,7 +27,7 @@ trait UserBroadcastCamStopMsgHdlr {
}
for {
uvo <- Users.userUnsharedWebcam(msg.header.userId, liveMeeting.users, msg.body.stream)
uvo <- Webcams.removeWebcamBroadcastStream(liveMeeting.webcams, msg.body.stream)
} yield {
broadcastEvent(msg)

View File

@ -0,0 +1,45 @@
package org.bigbluebutton.core2.message.handlers
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.core.OutMessageGateway
import org.bigbluebutton.core.models.{ RegisteredUsers, UserState, Users2x, UsersState }
import org.bigbluebutton.core.running.MeetingActor
trait UserJoinMeetingReqMsgHdlr {
this: MeetingActor =>
val outGW: OutMessageGateway
def handle(msg: UserJoinMeetingReqMsg): Unit = {
log.warning("Received user joined voice conference " + msg)
def broadcastEvent(userState: UserState): Unit = {
val routing = Routing.addMsgToClientRouting(MessageTypes.BROADCAST_TO_MEETING, props.meetingProp.intId, userState.intId)
val envelope = BbbCoreEnvelope(UserJoinedMeetingEvtMsg.NAME, routing)
val header = BbbClientMsgHeader(UserJoinedMeetingEvtMsg.NAME, props.meetingProp.intId, userState.intId)
val body = UserJoinedMeetingEvtMsgBody(intId = userState.intId, extId = userState.extId, name = userState.name,
role = userState.role, guest = userState.guest, authed = userState.authed,
waitingForAcceptance = userState.waitingForAcceptance, emoji = userState.emoji,
presenter = userState.presenter, locked = userState.locked, avatar = userState.avatar)
val event = UserJoinedMeetingEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, event)
outGW.send(msgEvent)
record(event)
}
for {
regUser <- RegisteredUsers.findWithToken(msg.body.authToken, liveMeeting.registeredUsers)
} yield {
val userState = UserState(intId = regUser.id, extId = regUser.externId, name = regUser.name, role = regUser.role,
guest = regUser.guest, authed = regUser.authed, waitingForAcceptance = regUser.waitingForAcceptance,
emoji = "none", presenter = false, locked = false, avatar = regUser.avatarURL)
Users2x.add(liveMeeting.users2x, regUser.id)
UsersState.add(liveMeeting.usersState, userState)
broadcastEvent(userState)
}
}
}

View File

@ -0,0 +1,43 @@
package org.bigbluebutton.core2.message.handlers
import org.bigbluebutton.common2.messages._
import org.bigbluebutton.common2.messages.voiceconf.{ UserJoinedVoiceConfEvtMsg, UserJoinedVoiceConfToClientEvtMsg, UserJoinedVoiceConfToClientEvtMsgBody }
import org.bigbluebutton.core.OutMessageGateway
import org.bigbluebutton.core.models.{ VoiceUser2x, VoiceUserState, VoiceUsers, VoiceUsersState }
import org.bigbluebutton.core.running.MeetingActor
trait UserJoinedVoiceConfEvtMsgHdlr {
this: MeetingActor =>
val outGW: OutMessageGateway
def handle(msg: UserJoinedVoiceConfEvtMsg): Unit = {
log.warning("Received user joined voice conference " + msg)
def broadcastEvent(voiceUserState: VoiceUserState): Unit = {
val routing = Routing.addMsgToClientRouting(MessageTypes.BROADCAST_TO_MEETING, props.meetingProp.intId, voiceUserState.intId)
val envelope = BbbCoreEnvelope(UserJoinedVoiceConfToClientEvtMsg.NAME, routing)
val header = BbbClientMsgHeader(UserJoinedVoiceConfToClientEvtMsg.NAME, props.meetingProp.intId, voiceUserState.intId)
val body = UserJoinedVoiceConfToClientEvtMsgBody(intId = voiceUserState.intId, voiceUserId = voiceUserState.voiceUserId,
callerName = voiceUserState.callerName, callerNum = voiceUserState.callerNum, muted = voiceUserState.muted,
talking = voiceUserState.talking, callingWith = voiceUserState.callingWith, listenOnly = voiceUserState.listenOnly)
val event = UserJoinedVoiceConfToClientEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, event)
outGW.send(msgEvent)
record(event)
}
val voiceUser = VoiceUser2x(msg.body.intId, msg.body.voiceUserId)
val voiceUserState = VoiceUserState(intId = msg.body.intId, voiceUserId = msg.body.voiceUserId,
callingWith = msg.body.callingWith, callerName = msg.body.callerIdName, callerNum = msg.body.callerIdNum,
muted = msg.body.muted, talking = msg.body.talking, listenOnly = false)
VoiceUsers.add(liveMeeting.voiceUsers, voiceUser)
VoiceUsersState.add(liveMeeting.voiceUsersState, voiceUserState)
broadcastEvent(voiceUserState)
}
}

View File

@ -0,0 +1,8 @@
package org.bigbluebutton.core2.message.handlers
/**
* Created by ritz on 2017-06-09.
*/
trait UserLeftVoiceConfEvtMsgHdlr {
}

View File

@ -0,0 +1,8 @@
package org.bigbluebutton.core2.message.handlers
/**
* Created by ritz on 2017-06-09.
*/
trait UserMutedInVoiceConfEvtMsgHdlr {
}

View File

@ -0,0 +1,8 @@
package org.bigbluebutton.core2.message.handlers
/**
* Created by ritz on 2017-06-09.
*/
trait UserTalkingInVoiceConfEvtMsgHdlr {
}

View File

@ -18,7 +18,7 @@ import redis.api.servers.ClientSetname
object AppsRedisSubscriberActor extends SystemConfiguration {
val TO_AKKA_APPS = "bbb:to-akka-apps"
val channels = Seq("time", toAkkaAppsRedisChannel)
val channels = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel)
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*")
def props(msgReceiver: RedisMessageReceiver, jsonMsgBus: IncomingJsonMessageBus): Props =
@ -49,7 +49,7 @@ class AppsRedisSubscriberActor(msgReceiver: RedisMessageReceiver, jsonMsgBus: In
def onMessage(message: Message) {
//log.error(s"SHOULD NOT BE RECEIVING: $message")
if (message.channel == toAkkaAppsRedisChannel) {
if (message.channel == toAkkaAppsRedisChannel || message.channel == fromVoiceConfRedisChannel) {
val receivedJsonMessage = new ReceivedJsonMessage(message.channel, message.data.utf8String)
log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
jsonMsgBus.publish(IncomingJsonMessage(toAkkaAppsJsonChannel, receivedJsonMessage))

View File

@ -27,10 +27,11 @@ class ReceivedJsonMsgHandlerTraitTests extends UnitSpec
val header = BbbCoreBaseHeader(CreateMeetingReqMsg.NAME)
val body = CreateMeetingReqMsgBody(defaultProps)
val req = CreateMeetingReqMsg(header, body)
val eventMsg = BbbMsgEvent(meetingManagerChannel, BbbCommonEnvCoreMsg(envelope, req))
object JsonDeserializer extends Deserializer
classUnderTest.send(envelope, req)
classUnderTest.publish(eventMsg)
// Then verify the class under test used the mock object as expected
// The disconnect user shouldn't be called as user has ability to eject another user

View File

@ -37,25 +37,51 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console",
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports")
val akkaVersion = "2.5.1"
val scalaTestV = "2.2.6"
libraryDependencies ++= {
val akkaVersion = "2.5.1"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.0.3",
"org.pegdown" % "pegdown" % "1.4.0",
"junit" % "junit" % "4.11",
"com.github.etaty" % "rediscala_2.12" % "1.8.0",
"commons-codec" % "commons-codec" % "1.10",
"joda-time" % "joda-time" % "2.3",
"com.google.code.gson" % "gson" % "1.7.1",
"redis.clients" % "jedis" % "2.1.0",
"org.apache.commons" % "commons-lang3" % "3.2",
"org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT",
"org.bigbluebutton" % "bbb-fsesl-client" % "0.0.4"
"org.apache.commons" % "commons-lang3" % "3.2"
)}
libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT"
libraryDependencies += "org.bigbluebutton" % "bbb-fsesl-client" % "0.0.4"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % "2.12.2"
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.2"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12
libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion
// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12
libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0"
// For generating test reports
libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test"
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12
libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.1" % "test"
// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12
libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test"
// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test"
libraryDependencies += "org.mockito" % "mockito-core" % "2.7.22" % "test"
seq(Revolver.settings: _*)
scalariformSettings

View File

@ -1,21 +1,20 @@
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
*
* Copyright (c) 2015 BigBlueButton Inc. and by respective authors (see below).
*
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
*
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
* <p>
* Copyright (c) 2015 BigBlueButton Inc. and by respective authors (see below).
* <p>
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
* <p>
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* <p>
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*/
package org.bigbluebutton.freeswitch.voice.freeswitch;
import java.io.File;
@ -24,6 +23,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.bigbluebutton.freeswitch.voice.freeswitch.actions.BroadcastConferenceCommand;
import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectAllUsersCommand;
import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectUserCommand;
@ -36,149 +36,148 @@ import org.bigbluebutton.freeswitch.voice.freeswitch.actions.*;
public class FreeswitchApplication {
private static final int SENDERTHREADS = 1;
private static final Executor msgSenderExec = Executors
.newFixedThreadPool(SENDERTHREADS);
private static final Executor runExec = Executors
.newFixedThreadPool(SENDERTHREADS);
private BlockingQueue<FreeswitchCommand> messages = new LinkedBlockingQueue<FreeswitchCommand>();
private static final int SENDERTHREADS = 1;
private static final Executor msgSenderExec = Executors.newFixedThreadPool(SENDERTHREADS);
private static final Executor runExec = Executors.newFixedThreadPool(SENDERTHREADS);
private BlockingQueue<FreeswitchCommand> messages = new LinkedBlockingQueue<FreeswitchCommand>();
private final ConnectionManager manager;
private final ConnectionManager manager;
private final String USER = "0"; /* not used for now */
private final String USER = "0"; /* not used for now */
private volatile boolean sendMessages = false;
private final String audioProfile;
private volatile boolean sendMessages = false;
public FreeswitchApplication(ConnectionManager manager, String profile) {
this.manager = manager;
this.audioProfile = profile;
}
private final String audioProfile;
private void queueMessage(FreeswitchCommand command) {
try {
messages.offer(command, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public FreeswitchApplication(ConnectionManager manager, String profile) {
this.manager = manager;
this.audioProfile = profile;
}
public void transferUserToMeeting(String voiceConfId,
String targetVoiceConfId, String voiceUserId) {
TransferUserToMeetingCommand tutmc = new TransferUserToMeetingCommand(
voiceConfId, targetVoiceConfId, voiceUserId, this.audioProfile,
USER);
queueMessage(tutmc);
private void queueMessage(FreeswitchCommand command) {
try {
messages.offer(command, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void start() {
sendMessages = true;
Runnable sender = new Runnable() {
public void run() {
while (sendMessages) {
FreeswitchCommand message;
try {
message = messages.take();
sendMessageToFreeswitch(message);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
msgSenderExec.execute(sender);
}
public void transferUserToMeeting(String voiceConfId,
String targetVoiceConfId, String voiceUserId) {
TransferUserToMeetingCommand tutmc = new TransferUserToMeetingCommand(
voiceConfId, targetVoiceConfId, voiceUserId, this.audioProfile,
USER);
queueMessage(tutmc);
}
public void getAllUsers(String voiceConfId) {
GetAllUsersCommand prc = new GetAllUsersCommand(voiceConfId, USER);
queueMessage(prc);
}
public void muteUser(String voiceConfId, String voiceUserId, Boolean mute) {
MuteUserCommand mpc = new MuteUserCommand(voiceConfId, voiceUserId, mute, USER);
queueMessage(mpc);
}
public void start() {
sendMessages = true;
Runnable sender = new Runnable() {
public void run() {
while (sendMessages) {
FreeswitchCommand message;
try {
message = messages.take();
sendMessageToFreeswitch(message);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
msgSenderExec.execute(sender);
}
public void eject(String voiceConfId, String voiceUserId) {
EjectUserCommand mpc = new EjectUserCommand(voiceConfId, voiceUserId, USER);
queueMessage(mpc);
}
public void getAllUsers(String voiceConfId) {
GetAllUsersCommand prc = new GetAllUsersCommand(voiceConfId, USER);
queueMessage(prc);
}
public void ejectAll(String voiceConfId) {
EjectAllUsersCommand mpc = new EjectAllUsersCommand(voiceConfId, USER);
queueMessage(mpc);
}
public void muteUser(String voiceConfId, String voiceUserId, Boolean mute) {
MuteUserCommand mpc = new MuteUserCommand(voiceConfId, voiceUserId, mute, USER);
queueMessage(mpc);
}
private Long genTimestamp() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
public void eject(String voiceConfId, String voiceUserId) {
EjectUserCommand mpc = new EjectUserCommand(voiceConfId, voiceUserId, USER);
queueMessage(mpc);
}
public void startRecording(String voiceConfId, String meetingid){
String RECORD_DIR = "/var/freeswitch/meetings";
String voicePath = RECORD_DIR + File.separatorChar + meetingid + "-" + genTimestamp() + ".wav";
public void ejectAll(String voiceConfId) {
EjectAllUsersCommand mpc = new EjectAllUsersCommand(voiceConfId, USER);
queueMessage(mpc);
}
RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, true, voicePath);
queueMessage(rcc);
}
private Long genTimestamp() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
public void stopRecording(String voiceConfId, String meetingid, String voicePath){
RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, false, voicePath);
queueMessage(rcc);
}
public void startRecording(String voiceConfId, String meetingid) {
String RECORD_DIR = "/var/freeswitch/meetings";
String voicePath = RECORD_DIR + File.separatorChar + meetingid + "-" + genTimestamp() + ".wav";
public void deskShareBroadcastRTMP(String voiceConfId, String streamUrl, String timestamp, Boolean broadcast){
DeskShareBroadcastRTMPCommand rtmp = new DeskShareBroadcastRTMPCommand(voiceConfId, USER, streamUrl, timestamp, broadcast);
queueMessage(rtmp);
}
RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, true, voicePath);
queueMessage(rcc);
}
public void deskShareHangUp(String voiceConfId, String fsConferenceName, String timestamp){
DeskShareHangUpCommand huCmd = new DeskShareHangUpCommand(voiceConfId, fsConferenceName, USER, timestamp);
queueMessage(huCmd);
}
private void sendMessageToFreeswitch(final FreeswitchCommand command) {
Runnable task = new Runnable() {
public void run() {
if (command instanceof GetAllUsersCommand) {
GetAllUsersCommand cmd = (GetAllUsersCommand) command;
System.out.println("Sending PopulateRoomCommand for conference = [" + cmd.getRoom() + "]");
manager.getUsers(cmd);
} else if (command instanceof MuteUserCommand) {
MuteUserCommand cmd = (MuteUserCommand) command;
System.out.println("Sending MuteParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.mute(cmd);
} else if (command instanceof EjectUserCommand) {
EjectUserCommand cmd = (EjectUserCommand) command;
System.out.println("Sending EjectParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.eject(cmd);
} else if (command instanceof EjectAllUsersCommand) {
EjectAllUsersCommand cmd = (EjectAllUsersCommand) command;
System.out.println("Sending EjectAllUsersCommand for conference = [" + cmd.getRoom() + "]");
manager.ejectAll(cmd);
} else if (command instanceof TransferUserToMeetingCommand) {
TransferUserToMeetingCommand cmd = (TransferUserToMeetingCommand) command;
System.out.println("Sending TransferUsetToMeetingCommand for conference = ["
+ cmd.getRoom() + "]");
manager.tranfer(cmd);
} else if (command instanceof RecordConferenceCommand) {
manager.record((RecordConferenceCommand) command);
} else if (command instanceof DeskShareBroadcastRTMPCommand) {
manager.broadcastRTMP((DeskShareBroadcastRTMPCommand)command);
} else if (command instanceof DeskShareHangUpCommand) {
DeskShareHangUpCommand cmd = (DeskShareHangUpCommand) command;
manager.hangUp(cmd);
} else if (command instanceof BroadcastConferenceCommand) {
manager.broadcast((BroadcastConferenceCommand) command);
}
}
};
public void stopRecording(String voiceConfId, String meetingid, String voicePath) {
RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, false, voicePath);
queueMessage(rcc);
}
runExec.execute(task);
}
public void deskShareBroadcastRTMP(String voiceConfId, String streamUrl, String timestamp, Boolean broadcast) {
DeskShareBroadcastRTMPCommand rtmp = new DeskShareBroadcastRTMPCommand(voiceConfId, USER, streamUrl, timestamp, broadcast);
queueMessage(rtmp);
}
public void stop() {
sendMessages = false;
}
public void deskShareHangUp(String voiceConfId, String fsConferenceName, String timestamp) {
DeskShareHangUpCommand huCmd = new DeskShareHangUpCommand(voiceConfId, fsConferenceName, USER, timestamp);
queueMessage(huCmd);
}
private void sendMessageToFreeswitch(final FreeswitchCommand command) {
Runnable task = new Runnable() {
public void run() {
if (command instanceof GetAllUsersCommand) {
GetAllUsersCommand cmd = (GetAllUsersCommand) command;
System.out.println("Sending PopulateRoomCommand for conference = [" + cmd.getRoom() + "]");
manager.getUsers(cmd);
} else if (command instanceof MuteUserCommand) {
MuteUserCommand cmd = (MuteUserCommand) command;
System.out.println("Sending MuteParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.mute(cmd);
} else if (command instanceof EjectUserCommand) {
EjectUserCommand cmd = (EjectUserCommand) command;
System.out.println("Sending EjectParticipantCommand for conference = [" + cmd.getRoom() + "]");
manager.eject(cmd);
} else if (command instanceof EjectAllUsersCommand) {
EjectAllUsersCommand cmd = (EjectAllUsersCommand) command;
System.out.println("Sending EjectAllUsersCommand for conference = [" + cmd.getRoom() + "]");
manager.ejectAll(cmd);
} else if (command instanceof TransferUserToMeetingCommand) {
TransferUserToMeetingCommand cmd = (TransferUserToMeetingCommand) command;
System.out.println("Sending TransferUsetToMeetingCommand for conference = ["
+ cmd.getRoom() + "]");
manager.tranfer(cmd);
} else if (command instanceof RecordConferenceCommand) {
manager.record((RecordConferenceCommand) command);
} else if (command instanceof DeskShareBroadcastRTMPCommand) {
manager.broadcastRTMP((DeskShareBroadcastRTMPCommand) command);
} else if (command instanceof DeskShareHangUpCommand) {
DeskShareHangUpCommand cmd = (DeskShareHangUpCommand) command;
manager.hangUp(cmd);
} else if (command instanceof BroadcastConferenceCommand) {
manager.broadcast((BroadcastConferenceCommand) command);
}
}
};
runExec.execute(task);
}
public void stop() {
sendMessages = false;
}
}

View File

@ -1,16 +1,13 @@
package org.bigbluebutton
import akka.actor.{ ActorSystem, Props }
import scala.concurrent.duration._
import redis.RedisClient
import scala.concurrent.{ Future, Await }
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.{ ActorSystem }
import org.freeswitch.esl.client.manager.DefaultManagerConnection
import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor }
import org.bigbluebutton.freeswitch.VoiceConferenceService
import org.bigbluebutton.endpoint.redis.{ AppsRedisSubscriberActor, RedisPublisher }
import org.bigbluebutton.freeswitch.{ RxJsonMsgHdlrActor, VoiceConferenceService }
import org.bigbluebutton.freeswitch.bus.InsonMsgBus
import org.bigbluebutton.freeswitch.voice.FreeswitchConferenceEventListener
import org.bigbluebutton.freeswitch.voice.freeswitch.{ ESLEventListener, ConnectionManager, FreeswitchApplication }
import org.bigbluebutton.freeswitch.voice.IVoiceConferenceService
import org.bigbluebutton.freeswitch.voice.freeswitch.{ ConnectionManager, ESLEventListener, FreeswitchApplication }
import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver
object Boot extends App with SystemConfiguration {
@ -19,7 +16,7 @@ object Boot extends App with SystemConfiguration {
val redisPublisher = new RedisPublisher(system)
val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword);
val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword)
val voiceConfService = new VoiceConferenceService(redisPublisher)
@ -36,5 +33,10 @@ object Boot extends App with SystemConfiguration {
val redisMsgReceiver = new RedisMessageReceiver(fsApplication)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver), "redis-subscriber")
val inJsonMsgBus = new InsonMsgBus
val redisMessageHandlerActor = system.actorOf(RxJsonMsgHdlrActor.props(fsApplication))
inJsonMsgBus.subscribe(redisMessageHandlerActor, toFsAppsJsonChannel)
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver, inJsonMsgBus), "redis-subscriber")
}

View File

@ -16,4 +16,8 @@ trait SystemConfiguration {
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
}
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-voice-conf-redis-channel")
lazy val toFsAppsJsonChannel = Try(config.getString("eventBus.toFsAppsChannel")).getOrElse("to-fs-apps-json-channel")
lazy val fromFsAppsJsonChannel = Try(config.getString("eventBus.fromFsAppsChannel")).getOrElse("from-fs-apps-json-channel")
}

View File

@ -6,16 +6,13 @@ import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common.converters.FromJsonDecoder
import org.bigbluebutton.common.messages.PubSubPongMessage
import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Resume
import org.bigbluebutton.freeswitch.bus.{ InJsonMsg, InsonMsgBus, ReceivedJsonMsg }
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.Message
import redis.api.pubsub.PMessage
@ -26,13 +23,14 @@ object AppsRedisSubscriberActor extends SystemConfiguration {
val channels = Seq("time")
val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*")
def props(system: ActorSystem, msgReceiver: RedisMessageReceiver): Props =
Props(classOf[AppsRedisSubscriberActor], system, msgReceiver,
def props(system: ActorSystem, msgReceiver: RedisMessageReceiver, inJsonMgBus: InsonMsgBus): Props =
Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, inJsonMgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
}
class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, redisHost: String,
class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver,
inJsonMgBus: InsonMsgBus, redisHost: String,
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort),
@ -48,7 +46,7 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag
}
}
val decoder = new FromJsonDecoder()
// val decoder = new FromJsonDecoder()
var lastPongReceivedOn = 0L
system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage())
@ -67,26 +65,17 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag
def onMessage(message: Message) {
log.debug(s"message received: $message")
if (message.channel == toVoiceConfRedisChannel) {
val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String)
log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
inJsonMgBus.publish(InJsonMsg(toFsAppsJsonChannel, receivedJsonMessage))
}
}
def onPMessage(pmessage: PMessage) {
// log.debug(s"pattern message received: $pmessage")
val msg = decoder.decodeMessage(pmessage.data.utf8String)
if (msg != null) {
msg match {
case m: PubSubPongMessage => {
if (m.payload.system == "BbbFsESL") {
lastPongReceivedOn = System.currentTimeMillis()
}
}
case _ => // do nothing
}
} else {
msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String)
}
msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String)
}
def handleMessage(msg: String) {

View File

@ -1,14 +1,8 @@
package org.bigbluebutton.endpoint.redis
import akka.actor.Props
import redis.RedisClient
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import scala.concurrent.Await
import akka.actor.Actor
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common.converters.ToJsonEncoder
class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
@ -18,16 +12,8 @@ class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {
// CLIENT LIST on redis-cli
redis.clientSetname("BbbFsEslAkkaPub")
val encoder = new ToJsonEncoder()
def sendPingMessage() {
val json = encoder.encodePubSubPingMessage("BbbFsESL", System.currentTimeMillis())
redis.publish("bigbluebutton:to-bbb-apps:system", json)
}
// system.scheduler.schedule(10 seconds, 10 seconds)(sendPingMessage())
def publish(channel: String, data: String) {
//println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
redis.publish(channel, data)
}

View File

@ -0,0 +1,120 @@
package org.bigbluebutton.freeswitch
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.common2.messages.voiceconf._
import org.bigbluebutton.common2.messages.{ BbbCoreEnvelope, Deserializer }
trait RxJsonMsgDeserializer {
this: RxJsonMsgHdlrActor =>
object JsonDeserializer extends Deserializer
def routeEjectAllFromVoiceConfMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[EjectAllFromVoiceConfMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[EjectAllFromVoiceConfMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[EjectAllFromVoiceConfMsg])
case None =>
log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode)
None
}
}
for {
m <- deserialize(jsonNode)
} yield {
fsApp.ejectAll(m.body.voiceConf)
}
}
def routeEjectUserFromVoiceConfMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[EjectUserFromVoiceConfMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[EjectUserFromVoiceConfMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[EjectUserFromVoiceConfMsg])
case None =>
log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode)
None
}
}
for {
m <- deserialize(jsonNode)
} yield {
fsApp.eject(m.body.voiceConf, m.body.voiceUserId)
}
}
def routeMuteUserInVoiceConfMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[MuteUserInVoiceConfMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[MuteUserInVoiceConfMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[MuteUserInVoiceConfMsg])
case None =>
log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode)
None
}
}
for {
m <- deserialize(jsonNode)
} yield {
fsApp.eject(m.body.voiceConf, m.body.voiceUserId)
}
}
def routeTransferUserToVoiceConfMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[TransferUserToVoiceConfMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[TransferUserToVoiceConfMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[TransferUserToVoiceConfMsg])
case None =>
log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode)
None
}
}
for {
m <- deserialize(jsonNode)
} yield {
fsApp.transferUserToMeeting(m.body.fromVoiceConf, m.body.toVoiceConf, m.body.voiceUserId)
}
}
def routeStartRecordingVoiceConfMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[StartRecordingVoiceConfMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[StartRecordingVoiceConfMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[StartRecordingVoiceConfMsg])
case None =>
log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode)
None
}
}
for {
m <- deserialize(jsonNode)
} yield {
fsApp.startRecording(m.body.voiceConf, m.body.meetingId)
}
}
def routeStopRecordingVoiceConfMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
def deserialize(jsonNode: JsonNode): Option[StopRecordingVoiceConfMsg] = {
val (result, error) = JsonDeserializer.toBbbCommonMsg[StopRecordingVoiceConfMsg](jsonNode)
result match {
case Some(msg) => Some(msg.asInstanceOf[StopRecordingVoiceConfMsg])
case None =>
log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode)
None
}
}
for {
m <- deserialize(jsonNode)
} yield {
fsApp.stopRecording(m.body.voiceConf, m.body.meetingId, m.body.stream)
}
}
}

View File

@ -0,0 +1,49 @@
package org.bigbluebutton.freeswitch
import akka.actor.{ Actor, ActorLogging, Props }
import com.fasterxml.jackson.databind.JsonNode
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.messages.BbbCoreEnvelope
import org.bigbluebutton.common2.messages.voiceconf._
import org.bigbluebutton.freeswitch.bus.ReceivedJsonMsg
import org.bigbluebutton.freeswitch.voice.freeswitch.FreeswitchApplication
object RxJsonMsgHdlrActor {
def props(fsApp: FreeswitchApplication): Props =
Props(classOf[RxJsonMsgHdlrActor], fsApp)
}
class RxJsonMsgHdlrActor(val fsApp: FreeswitchApplication) extends Actor with ActorLogging
with SystemConfiguration with RxJsonMsgDeserializer {
def receive = {
case msg: ReceivedJsonMsg =>
log.debug("handling {} - {}", msg.channel, msg.data)
handleReceivedJsonMessage(msg)
case _ => // do nothing
}
def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = {
for {
envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data)
} yield handle(envJsonNode.envelope, envJsonNode.core)
}
def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = {
log.debug("Route envelope name " + envelope.name)
envelope.name match {
case EjectAllFromVoiceConfMsg.NAME =>
routeEjectAllFromVoiceConfMsg(envelope, jsonNode)
case EjectUserFromVoiceConfMsg.NAME =>
routeEjectUserFromVoiceConfMsg(envelope, jsonNode)
case MuteUserInVoiceConfMsg.NAME =>
routeMuteUserInVoiceConfMsg(envelope, jsonNode)
case TransferUserToVoiceConfMsg.NAME =>
routeTransferUserToVoiceConfMsg(envelope, jsonNode)
case StartRecordingVoiceConfMsg.NAME =>
routeStartRecordingVoiceConfMsg(envelope, jsonNode)
case StopRecordingVoiceConfMsg.NAME =>
routeStopRecordingVoiceConfMsg(envelope, jsonNode)
case _ => // do nothing
}
}
}

View File

@ -1,5 +1,6 @@
package org.bigbluebutton.freeswitch
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.freeswitch.voice.IVoiceConferenceService
import org.bigbluebutton.endpoint.redis.RedisPublisher
import org.bigbluebutton.common.messages.VoiceConfRecordingStartedMessage
@ -11,28 +12,64 @@ import org.bigbluebutton.common.messages.DeskShareStartedEventMessage
import org.bigbluebutton.common.messages.DeskShareStoppedEventMessage
import org.bigbluebutton.common.messages.DeskShareRTMPBroadcastStartedEventMessage
import org.bigbluebutton.common.messages.DeskShareRTMPBroadcastStoppedEventMessage
import org.bigbluebutton.common2.messages.{ BbbCommonEnvCoreMsg, BbbCoreEnvelope }
import org.bigbluebutton.common2.messages.voiceconf._
import org.bigbluebutton.common2.util.JsonUtil
class VoiceConferenceService(sender: RedisPublisher) extends IVoiceConferenceService {
class VoiceConferenceService(sender: RedisPublisher) extends IVoiceConferenceService with SystemConfiguration {
val FROM_VOICE_CONF_SYSTEM_CHAN = "bigbluebutton:from-voice-conf:system"
def voiceConfRecordingStarted(voiceConfId: String, recordStream: String, recording: java.lang.Boolean, timestamp: String) {
val msg = new VoiceConfRecordingStartedMessage(voiceConfId, recordStream, recording, timestamp)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
val header = BbbCoreVoiceConfHeader(RecordingStartedVoiceConfEvtMsg.NAME, voiceConfId)
val body = RecordingStartedVoiceConfEvtMsgBody(voiceConfId, recordStream, recording.booleanValue(), timestamp)
val envelope = BbbCoreEnvelope(RecordingStartedVoiceConfEvtMsg.NAME, Map("voiceConf" -> voiceConfId))
val msg = new RecordingStartedVoiceConfEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, msg)
val json = JsonUtil.toJson(msgEvent)
sender.publish(fromVoiceConfRedisChannel, json)
val oldmsg = new VoiceConfRecordingStartedMessage(voiceConfId, recordStream, recording, timestamp)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, oldmsg.toJson())
}
def userJoinedVoiceConf(voiceConfId: String, voiceUserId: String, userId: String, callerIdName: String,
callerIdNum: String, muted: java.lang.Boolean, talking: java.lang.Boolean, avatarURL: String) {
println("******** FreeswitchConferenceService received voiceUserJoined vui=[" +
userId + "] wui=[" + voiceUserId + "]")
val msg = new UserJoinedVoiceConfMessage(voiceConfId, voiceUserId, userId, callerIdName, callerIdNum, muted, talking, avatarURL)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
val header = BbbCoreVoiceConfHeader(UserJoinedVoiceConfEvtMsg.NAME, voiceConfId)
val body = UserJoinedVoiceConfEvtMsgBody(voiceConfId, voiceUserId, userId, callerIdName, callerIdNum,
muted.booleanValue(), talking.booleanValue(), avatarURL)
val envelope = BbbCoreEnvelope(UserJoinedVoiceConfEvtMsg.NAME, Map("voiceConf" -> voiceConfId))
val msg = new UserJoinedVoiceConfEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, msg)
val json = JsonUtil.toJson(msgEvent)
sender.publish(fromVoiceConfRedisChannel, json)
val oldmsg = new UserJoinedVoiceConfMessage(voiceConfId, voiceUserId, userId, callerIdName, callerIdNum, muted, talking, avatarURL)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, oldmsg.toJson())
}
def userLeftVoiceConf(voiceConfId: String, voiceUserId: String) {
println("******** FreeswitchConferenceService received voiceUserLeft vui=[" + voiceUserId + "] conference=[" + voiceConfId + "]")
val msg = new UserLeftVoiceConfMessage(voiceConfId, voiceUserId)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
val header = BbbCoreVoiceConfHeader(UserLeftVoiceConfEvtMsg.NAME, voiceConfId)
val body = UserLeftVoiceConfEvtMsgBody(voiceConfId, voiceUserId)
val envelope = BbbCoreEnvelope(UserLeftVoiceConfEvtMsg.NAME, Map("voiceConf" -> voiceConfId))
val msg = new UserLeftVoiceConfEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, msg)
val json = JsonUtil.toJson(msgEvent)
sender.publish(fromVoiceConfRedisChannel, json)
val oldmsg = new UserLeftVoiceConfMessage(voiceConfId, voiceUserId)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, oldmsg.toJson())
}
def userLockedInVoiceConf(voiceConfId: String, voiceUserId: String, locked: java.lang.Boolean) {
@ -41,38 +78,61 @@ class VoiceConferenceService(sender: RedisPublisher) extends IVoiceConferenceSer
def userMutedInVoiceConf(voiceConfId: String, voiceUserId: String, muted: java.lang.Boolean) {
println("******** FreeswitchConferenceService received voiceUserMuted vui=[" + voiceUserId + "] muted=[" + muted + "]")
val msg = new UserMutedInVoiceConfMessage(voiceConfId, voiceUserId, muted)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
val header = BbbCoreVoiceConfHeader(UserMutedInVoiceConfEvtMsg.NAME, voiceConfId)
val body = UserMutedInVoiceConfEvtMsgBody(voiceConfId, voiceUserId, muted.booleanValue())
val envelope = BbbCoreEnvelope(UserMutedInVoiceConfEvtMsg.NAME, Map("voiceConf" -> voiceConfId))
val msg = new UserMutedInVoiceConfEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, msg)
val json = JsonUtil.toJson(msgEvent)
sender.publish(fromVoiceConfRedisChannel, json)
val oldmsg = new UserMutedInVoiceConfMessage(voiceConfId, voiceUserId, muted)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, oldmsg.toJson())
}
def userTalkingInVoiceConf(voiceConfId: String, voiceUserId: String, talking: java.lang.Boolean) {
println("******** FreeswitchConferenceService received voiceUserTalking vui=[" + voiceUserId + "] talking=[" + talking + "]")
val msg = new UserTalkingInVoiceConfMessage(voiceConfId, voiceUserId, talking)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
val header = BbbCoreVoiceConfHeader(UserTalkingInVoiceConfEvtMsg.NAME, voiceConfId)
val body = UserTalkingInVoiceConfEvtMsgBody(voiceConfId, voiceUserId, talking.booleanValue())
val envelope = BbbCoreEnvelope(UserTalkingInVoiceConfEvtMsg.NAME, Map("voiceConf" -> voiceConfId))
val msg = new UserTalkingInVoiceConfEvtMsg(header, body)
val msgEvent = BbbCommonEnvCoreMsg(envelope, msg)
val json = JsonUtil.toJson(msgEvent)
sender.publish(fromVoiceConfRedisChannel, json)
val oldmsg = new UserTalkingInVoiceConfMessage(voiceConfId, voiceUserId, talking)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, oldmsg.toJson())
}
def deskShareStarted(voiceConfId: String, callerIdNum: String, callerIdName: String) {
println("******** FreeswitchConferenceService send deskShareStarted to BBB " + voiceConfId)
val msg = new DeskShareStartedEventMessage(voiceConfId, callerIdNum, callerIdName)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
val oldmsg = new DeskShareStartedEventMessage(voiceConfId, callerIdNum, callerIdName)
sender.publish(fromVoiceConfRedisChannel, oldmsg.toJson())
}
def deskShareEnded(voiceConfId: String, callerIdNum: String, callerIdName: String) {
println("******** FreeswitchConferenceService send deskShareStopped to BBB " + voiceConfId)
val msg = new DeskShareStoppedEventMessage(voiceConfId, callerIdNum, callerIdName)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
sender.publish(fromVoiceConfRedisChannel, msg.toJson())
}
def deskShareRTMPBroadcastStarted(voiceConfId: String, streamname: String, vw: java.lang.Integer, vh: java.lang.Integer, timestamp: String) {
println("******** FreeswitchConferenceService send deskShareRTMPBroadcastStarted to BBB " + voiceConfId)
val msg = new DeskShareRTMPBroadcastStartedEventMessage(voiceConfId, streamname, vw, vh, timestamp)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
sender.publish(fromVoiceConfRedisChannel, msg.toJson())
}
def deskShareRTMPBroadcastStopped(voiceConfId: String, streamname: String, vw: java.lang.Integer, vh: java.lang.Integer, timestamp: String) {
println("******** FreeswitchConferenceService send deskShareRTMPBroadcastStopped to BBB " + voiceConfId)
val msg = new DeskShareRTMPBroadcastStoppedEventMessage(voiceConfId, streamname, vw, vh, timestamp)
sender.publish(FROM_VOICE_CONF_SYSTEM_CHAN, msg.toJson())
sender.publish(fromVoiceConfRedisChannel, msg.toJson())
}
}

View File

@ -0,0 +1,31 @@
package org.bigbluebutton.freeswitch.bus
import akka.actor.ActorRef
import akka.event.{ EventBus, LookupClassification }
case class ReceivedJsonMsg(channel: String, data: String)
case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg)
class InsonMsgBus extends EventBus with LookupClassification {
type Event = InJsonMsg
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the events classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}

View File

@ -1,6 +1,7 @@
package org.bigbluebutton.client.meeting
import akka.actor.{Actor, ActorLogging, Props}
import org.bigbluebutton.client.SystemConfiguration
import org.bigbluebutton.client.bus._
import org.bigbluebutton.common2.messages.{BbbCommonEnvJsNodeMsg, MessageTypes}
@ -11,7 +12,9 @@ object MeetingActor {
}
class MeetingActor(val meetingId: String, msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus,
msgToClientEventBus: MsgToClientEventBus) extends Actor with ActorLogging {
msgToClientEventBus: MsgToClientEventBus)
extends Actor with ActorLogging
with SystemConfiguration{
private val userMgr = new UsersManager
@ -69,7 +72,7 @@ class MeetingActor(val meetingId: String, msgToAkkaAppsEventBus: MsgToAkkaAppsEv
log.debug("**** MeetingActor handleServerMsg " + msg.envelope.name)
msgType match {
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.BROADCAST => handleBroadcastMessage(msg)
case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
}
}
@ -93,7 +96,7 @@ class MeetingActor(val meetingId: String, msgToAkkaAppsEventBus: MsgToAkkaAppsEv
def handleBroadcastMessage(msg: BbbCommonEnvJsNodeMsg): Unit = {
// In case we want to handle specific messages. We can do it here.
forwardToUser(msg)
msgToClientEventBus.publish(MsgToClientBusMsg(toClientChannel, BroadcastMsgToMeeting(meetingId, msg)))
}
def handleSystemMessage(msg: BbbCommonEnvJsNodeMsg): Unit = {

View File

@ -71,7 +71,7 @@ class MeetingManagerActor(msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus,
log.debug("**** MeetingManagerActor handleServerMsg " + msg.envelope.name)
msgType match {
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.BROADCAST => handleBroadcastMessage(msg)
case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
}
}

View File

@ -108,7 +108,7 @@ class UserActor(val userId: String,
log.debug("**** UserActor handleServerMsg " + msg)
msgType match {
case MessageTypes.DIRECT => handleDirectMessage(msg)
case MessageTypes.BROADCAST => handleBroadcastMessage(msg)
case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg)
case MessageTypes.SYSTEM => handleSystemMessage(msg)
}
}

View File

@ -10,7 +10,7 @@ object MessageBody {
extUserId: String, authToken: String, avatarURL: String,
guest: Boolean, authed: Boolean)
case class ValidateAuthTokenRespMsgBody(userId: String, authToken: String, valid: Boolean)
case class UserJoinReqMsgBody(userId: String, authToken: String)
case class UserLeaveReqMsgBody(userId: String, sessionId: String)
case class GetUsersReqMsgBody(requesterId: String)
case class UserEmojiStatusChangeReqMsgBody(userId: String, emoji: String)

View File

@ -5,14 +5,15 @@ import org.bigbluebutton.common2.messages.MessageBody._
object MessageTypes {
val DIRECT = "DIRECT"
val BROADCAST = "BROADCAST"
val BROADCAST_TO_MEETING = "BROADCAST_TO_MEETING" // Send to all clients in the meeting
val BROADCAST_TO_ALL = "BROADCAST_TO_ALL" // Send to all clients
val SYSTEM = "SYSTEM"
}
// seal trait to force all classes that extends this trait to be defined in this file.
sealed trait BbbCoreMsg
trait BbbCoreMsg
sealed trait BbbCommonMsg
sealed trait BbbCoreHeader
trait BbbCoreHeader
case class RoutingEnvelope(msgType: String, meetingId: String, userId: String)
case class BbbMsgToClientEnvelope(name: String, routing: RoutingEnvelope)
@ -41,8 +42,9 @@ case class ValidateAuthTokenReqMsg(header: BbbClientMsgHeader,
body: ValidateAuthTokenReqMsgBody) extends BbbCoreMsg
object UserJoinReqMsg { val NAME = "UserJoinReqMsg" }
case class UserJoinReqMsg(header: BbbClientMsgHeader, body: UserJoinReqMsgBody) extends BbbCoreMsg
object UserJoinMeetingReqMsg { val NAME = "UserJoinMeetingReqMsg" }
case class UserJoinMeetingReqMsg(header: BbbClientMsgHeader, body: UserJoinMeetingReqMsgBody) extends BbbCoreMsg
case class UserJoinMeetingReqMsgBody(userId: String, authToken: String)
object UserLeaveReqMsg { val NAME = "UserLeaveReqMsg" }
case class UserLeaveReqMsg(header: BbbClientMsgHeader, body: UserLeaveReqMsgBody) extends BbbCoreMsg
@ -56,6 +58,8 @@ case class UserBroadcastCamStartMsg(header: BbbClientMsgHeader, body: UserBroadc
object UserBroadcastCamStopMsg { val NAME = "UserBroadcastCamStopMsg"}
case class UserBroadcastCamStopMsg(header: BbbClientMsgHeader, body: UserBroadcastCamStopMsgBody) extends BbbCoreMsg
/** Event messages sent by Akka apps as result of receiving incoming messages ***/
object MeetingCreatedEvtMsg { val NAME = "MeetingCreatedEvtMsg"}
case class MeetingCreatedEvtMsg(header: BbbCoreBaseHeader,
@ -65,6 +69,13 @@ object ValidateAuthTokenRespMsg { val NAME = "ValidateAuthTokenRespMsg" }
case class ValidateAuthTokenRespMsg(header: BbbClientMsgHeader,
body: ValidateAuthTokenRespMsgBody) extends BbbCoreMsg
object UserJoinedMeetingEvtMsg { val NAME = "UserJoinedMeetingEvtMsg" }
case class UserJoinedMeetingEvtMsg(header: BbbClientMsgHeader,
body: UserJoinedMeetingEvtMsgBody) extends BbbCoreMsg
case class UserJoinedMeetingEvtMsgBody(intId: String, extId: String, name: String, role: String,
guest: Boolean, authed: Boolean, waitingForAcceptance: Boolean, emoji: String,
presenter: Boolean, locked: Boolean, avatar: String)
object UserBroadcastCamStartedEvtMsg { val NAME = "UserBroadcastCamStartedEvtMsg" }
case class UserBroadcastCamStartedEvtMsg(header: BbbClientMsgHeader, body: UserBroadcastCamStartedEvtMsgBody) extends BbbCoreMsg

View File

@ -0,0 +1,113 @@
package org.bigbluebutton.common2.messages.voiceconf
import org.bigbluebutton.common2.messages.{BbbClientMsgHeader, BbbCoreHeader, BbbCoreHeaderWithMeetingId, BbbCoreMsg}
/*** Message from Akka Apps to FS Conference ***/
object EjectAllFromVoiceConfMsg { val NAME = "EjectAllFromVoiceConfMsg" }
case class EjectAllFromVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: EjectAllFromVoiceConfMsgBody) extends BbbCoreMsg
case class EjectAllFromVoiceConfMsgBody(voiceConf: String)
object EjectUserFromVoiceConfMsg { val NAME = "EjectUserFromVoiceConfMsg"}
case class EjectUserFromVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: EjectUserFromVoiceConfMsgBody) extends BbbCoreMsg
case class EjectUserFromVoiceConfMsgBody(voiceConf: String, voiceUserId: String)
object MuteUserInVoiceConfMsg { val NAME = "MuteUserInVoiceConfMsg" }
case class MuteUserInVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: MuteUserInVoiceConfMsgBody) extends BbbCoreMsg
case class MuteUserInVoiceConfMsgBody(voiceConf: String, voiceUserId: String, mute: Boolean)
object TransferUserToVoiceConfMsg { val NAME = "TransferUserToVoiceConfMsg" }
case class TransferUserToVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: TransferUserToVoiceConfMsgBody) extends BbbCoreMsg
case class TransferUserToVoiceConfMsgBody(fromVoiceConf: String, toVoiceConf: String, voiceUserId: String)
object StartRecordingVoiceConfMsg { val NAME = "StartRecordingVoiceConfMsg" }
case class StartRecordingVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: StartRecordingVoiceConfMsgBody) extends BbbCoreMsg
case class StartRecordingVoiceConfMsgBody(voiceConf: String, meetingId: String)
object StopRecordingVoiceConfMsg { val NAME = "StopRecordingVoiceConfMsg" }
case class StopRecordingVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: StopRecordingVoiceConfMsgBody) extends BbbCoreMsg
case class StopRecordingVoiceConfMsgBody(voiceConf: String, meetingId: String, stream: String)
object DeskshareStartRtmpBroadcastVoiceConfMsg { val NAME = "DeskshareStartRtmpBroadcastVoiceConfMsg" }
case class DeskshareStartRtmpBroadcastVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: DeskshareStartRtmpBroadcastVoiceConfMsgBody) extends BbbCoreMsg
case class DeskshareStartRtmpBroadcastVoiceConfMsgBody(voiceConf: String, deskshareConf: String, url: String, timestamp: String)
object DeskshareStopRtmpBroadcastVoiceConfMsg { val NAME = "DeskshareStopRtmpBroadcastVoiceConfMsg" }
case class DeskshareStopRtmpBroadcastVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: DeskshareStopRtmpBroadcastVoiceConfMsgBody) extends BbbCoreMsg
case class DeskshareStopRtmpBroadcastVoiceConfMsgBody(voiceConf: String, deskshareConf: String, url: String, timestamp: String)
object DeskshareHangUpVoiceConfMsg { val NAME = "DeskshareHangUpVoiceConfMsg" }
case class DeskshareHangUpVoiceConfMsg(header: BbbCoreHeaderWithMeetingId,
body: DeskshareHangUpVoiceConfMsgBody) extends BbbCoreMsg
case class DeskshareHangUpVoiceConfMsgBody(voiceConf: String, deskshareConf: String, timestamp: String)
/*** Message from FS Conference to Akka Apps ***/
case class BbbCoreVoiceConfHeader(name: String, voiceConf: String) extends BbbCoreHeader
object RecordingStartedVoiceConfEvtMsg { val NAME = "RecordingStartedVoiceConfEvtMsg" }
case class RecordingStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: RecordingStartedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class RecordingStartedVoiceConfEvtMsgBody(voiceConf: String, stream: String, recording: Boolean, timestamp: String)
object UserJoinedVoiceConfEvtMsg { val NAME = "UserJoinedVoiceConfEvtMsg" }
case class UserJoinedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserJoinedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserJoinedVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, intId: String,
callerIdName: String, callerIdNum: String, muted: Boolean,
talking: Boolean, callingWith: String)
object UserLeftVoiceConfEvtMsg { val NAME = "UserLeftVoiceConfEvtMsg" }
case class UserLeftVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserLeftVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserLeftVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String)
object UserMutedInVoiceConfEvtMsg { val NAME = "UserMutedInVoiceConfEvtMsg" }
case class UserMutedInVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserMutedInVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserMutedInVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, muted: Boolean)
object UserTalkingInVoiceConfEvtMsg { val NAME = "UserTalkingInVoiceConfEvtMsg" }
case class UserTalkingInVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: UserTalkingInVoiceConfEvtMsgBody) extends BbbCoreMsg
case class UserTalkingInVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, talking: Boolean)
object DeskshareStartedVoiceConfEvtMsg { val NAME = "DeskshareStartedVoiceConfEvtMsg" }
case class DeskshareStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareStartedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareStartedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
callerIdNum: String, callerIdName: String)
object DeskshareStoppedVoiceConfEvtMsg { val NAME = "DeskshareStoppedVoiceConfEvtMsg"}
case class DeskshareStoppedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareStoppedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareStoppedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
callerIdNum: String, callerIdName: String)
object DeskshareRtmpBroadcastStartedVoiceConfEvtMsg { val NAME = "DeskshareRtmpBroadcastStartedVoiceConfEvtMsg"}
case class DeskshareRtmpBroadcastStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareRtmpBroadcastStartedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareRtmpBroadcastStartedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
stream: String, vidWidth: String, vidHeight: String,
timestamp: String)
object DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg { val NAME = "DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg"}
case class DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader,
body: DeskshareRtmpBroadcastStoppedVoiceConfEvtMsgBody) extends BbbCoreMsg
case class DeskshareRtmpBroadcastStoppedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String,
stream: String, vidWidth: String, vidHeight: String,
timestamp: String)
/*** Message going to clients from Akka Apps ***/
object UserJoinedVoiceConfToClientEvtMsg { val NAME = "UserJoinedVoiceConfToClientEvtMsg" }
case class UserJoinedVoiceConfToClientEvtMsg(header: BbbClientMsgHeader, body: UserJoinedVoiceConfToClientEvtMsgBody) extends BbbCoreMsg
case class UserJoinedVoiceConfToClientEvtMsgBody(intId: String, voiceUserId: String, callerName: String,
callerNum: String, muted: Boolean,
talking: Boolean, callingWith: String, listenOnly: Boolean)

View File

@ -119,6 +119,8 @@ public class ConnectionInvokerService implements IConnectionInvokerService {
handleDisconnectAllMessage((DisconnectAllMessage) message);
} else if (message instanceof DirectToClientMsg) {
handleDirectToClientMsg((DirectToClientMsg) message);
} else if (message instanceof BroadcastToMeetingMsg) {
handleBroadcastToMeetingMsg((BroadcastToMeetingMsg) message);
}
}
@ -178,6 +180,51 @@ public class ConnectionInvokerService implements IConnectionInvokerService {
}
}
private void handleBroadcastToMeetingMsg(final BroadcastToMeetingMsg msg) {
if (log.isTraceEnabled()) {
log.trace("Handle broadcast message: " + msg.messageName + " msg=" + msg.json);
}
Runnable sender = new Runnable() {
public void run() {
IScope meetingScope = getScope(msg.meetingId);
if (meetingScope != null) {
List<Object> params = new ArrayList<Object>();
params.add(msg.messageName);
params.add(msg.json);
if (log.isTraceEnabled()) {
log.trace("Broadcast message: " + msg.messageName + " msg=" + msg.json);
}
ServiceUtils.invokeOnAllScopeConnections(meetingScope, "onMessageFromServer2x", params.toArray(), null);
}
}
};
/**
* We need to add a way to cancel sending when the thread is blocked.
* Red5 uses a semaphore to guard the rtmp connection and we've seen
* instances where our thread is blocked preventing us from sending messages
* to other connections. (ralam nov 19, 2015)
*/
long endNanos = System.nanoTime() + SEND_TIMEOUT;
Future<?> f = runExec.submit(sender);
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
log.warn("ExecutionException while sending broadcast message: " + msg.messageName + " msg=" + msg.json);
} catch (InterruptedException e) {
log.warn("Interrupted exception while sending broadcast message: " + msg.messageName + " msg=" + msg.json);
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("Timeout exception while sending broadcast message: " + msg.messageName + " msg=" + msg.json);
f.cancel(true);
}
}
private void handleDisconnectAllMessage(DisconnectAllMessage msg) {
IScope meetingScope = bbbAppScope.getContext().resolveScope("bigbluebutton");

View File

@ -0,0 +1,130 @@
package org.bigbluebutton.core.model.users
{
import mx.collections.ArrayCollection;
import org.bigbluebutton.core.model.Me;
import org.bigbluebutton.core.vo.UserVO;
import org.bigbluebutton.core.vo.VoiceUserVO;
public class Users
{
private var _users:ArrayCollection = new ArrayCollection();
private function add(user: UserVO):void {
_users.addItem(user);
}
private function remove(userId: String):UserVO {
var index:int = getIndex(userId);
if (index >= 0) {
return _users.removeItemAt(index) as UserVO;
}
return null;
}
private function getUserAndIndex(userId: String):Object {
var user:User;
for (var i:int = 0; i < _users.length; i++) {
user = _users.getItemAt(i) as User;
if (user.id == userId) {
return {index:i, user:user};;
}
}
return null;
}
private function getUser(userId:String):UserVO {
var user:UserVO;
for (var i:int = 0; i < _users.length; i++) {
user = _users.getItemAt(i) as UserVO;
if (user.id == userId) {
return user;
}
}
return null;
}
private function getIndex(userId: String):int {
var user:UserVO;
for (var i:int = 0; i < _users.length; i++) {
user = _users.getItemAt(i) as UserVO;
if (user.id == userId) {
return i;
}
}
return -1;
}
public function userJoinedVoice(vu: VoiceUserVO):UserVO {
var user: UserVO = getUser(vu.webId) as UserVO;
if (user != null) {
user.voiceUser = vu;
return user.copy();
}
return null;
}
public function userLeftVoice(vu: VoiceUserVO):UserVO {
var user:UserVO = getUser(vu.webId) as UserVO;
if (user != null) {
user.voiceUser = vu;
return user.copy();
}
return null;
}
public function userJoined(vu: UserVO):UserVO {
add(vu);
return vu.copy();
}
public function userLeft(vu: UserVO):UserVO {
var user: UserVO = remove(vu.id);
if (user != null) {
return user.copy();
}
return null;
}
public function userMuted(userId: String, voiceId: String, muted: Boolean):UserVO {
var user: UserVO = getUser(userId) as UserVO;
if (user != null) {
user.voiceUser.muted = muted;
return user.copy();
}
return null;
}
public function userTalking(userId: String, voiceId: String, talking: Boolean):UserVO {
var user: UserVO = getUser(userId) as UserVO;
if (user != null) {
user.voiceUser.talking = talking;
return user.copy();
}
return null;
}
// private function get users():ArrayCollection {
// var us:ArrayCollection = new ArrayCollection();
// for (var i:int = 0; i < _users.length; i++) {
//
// }
// }
}
}

View File

@ -0,0 +1,17 @@
package org.bigbluebutton.core.model.users
{
public class User2x {
var intId: String;
var extId: String;
var name: String;
var role: String;
var guest: Boolean;
var authed: Boolean;
var waitingForAcceptance: Boolean;
var emoji: String;
var locked: Boolean;
var presenter: Boolean;
var avatar: String;
}
}

View File

@ -376,9 +376,12 @@ class ApiController {
boolean redirectImm = parseBoolean(params.redirectImmediately)
String internalUserID = RandomStringUtils.randomAlphanumeric(12).toLowerCase()
// We preprend "w_" to our internal meeting Id to indicate that this is a web user.
// For users joining using the phone, we will prepend "v_" so it will be easier
// to distinguish users who doesn't have a web client. (ralam june 12, 2017)
String internalUserID = "w_" + RandomStringUtils.randomAlphanumeric(12).toLowerCase()
String authToken = RandomStringUtils.randomAlphanumeric(12).toLowerCase()
String authToken = RandomStringUtils.randomAlphanumeric(12).toLowerCase()
String sessionToken = RandomStringUtils.randomAlphanumeric(16).toLowerCase()