Merge branch 'paultrudel-analytics-actor-for-meeting-info' into develop
This commit is contained in:
commit
8ff7d45ac9
@ -3,8 +3,11 @@ package org.bigbluebutton
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
|
||||
import akka.http.scaladsl.server.Directives._
|
||||
import org.bigbluebutton.service.{ HealthzService, PubSubReceiveStatus, PubSubSendStatus, RecordingDBSendStatus }
|
||||
import spray.json.DefaultJsonProtocol
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.service.{ HealthzService, MeetingInfoService, PubSubReceiveStatus, PubSubSendStatus, RecordingDBSendStatus }
|
||||
import spray.json._
|
||||
import scala.concurrent._
|
||||
import ExecutionContext.Implicits.global
|
||||
|
||||
case class HealthResponse(
|
||||
isHealthy: Boolean,
|
||||
@ -13,14 +16,56 @@ case class HealthResponse(
|
||||
recordingDbStatus: RecordingDBSendStatus
|
||||
)
|
||||
|
||||
trait JsonSupportProtocol extends SprayJsonSupport with DefaultJsonProtocol {
|
||||
case class MeetingInfoResponse(
|
||||
meetingInfoResponse: Option[MeetingInfoAnalytics]
|
||||
)
|
||||
|
||||
case class MeetingInfoAnalytics(
|
||||
name: String,
|
||||
externalId: String,
|
||||
internalId: String,
|
||||
hasUserJoined: Boolean,
|
||||
isMeetingRecorded: Boolean,
|
||||
webcams: Webcam,
|
||||
audio: Audio,
|
||||
screenshare: Screenshare,
|
||||
users: List[Participant],
|
||||
presentation: PresentationInfo,
|
||||
breakoutRoom: BreakoutRoom
|
||||
)
|
||||
|
||||
trait JsonSupportProtocolHealthResponse extends SprayJsonSupport with DefaultJsonProtocol {
|
||||
implicit val pubSubSendStatusJsonFormat = jsonFormat2(PubSubSendStatus)
|
||||
implicit val pubSubReceiveStatusJsonFormat = jsonFormat2(PubSubReceiveStatus)
|
||||
implicit val recordingDbStatusJsonFormat = jsonFormat2(RecordingDBSendStatus)
|
||||
implicit val healthServiceJsonFormat = jsonFormat4(HealthResponse)
|
||||
}
|
||||
|
||||
class ApiService(healthz: HealthzService) extends JsonSupportProtocol {
|
||||
trait JsonSupportProtocolMeetingInfoResponse extends SprayJsonSupport with DefaultJsonProtocol {
|
||||
implicit val meetingInfoUserJsonFormat = jsonFormat2(User)
|
||||
implicit val meetingInfoBroadcastJsonFormat = jsonFormat3(Broadcast)
|
||||
implicit val meetingInfoWebcamStreamJsonFormat = jsonFormat2(WebcamStream)
|
||||
implicit val meetingInfoWebcamJsonFormat = jsonFormat2(Webcam)
|
||||
|
||||
implicit val meetingInfoListenOnlyAudioJsonFormat = jsonFormat2(ListenOnlyAudio)
|
||||
implicit val meetingInfoTwoWayAudioJsonFormat = jsonFormat2(TwoWayAudio)
|
||||
implicit val meetingInfoPhoneAudioJsonFormat = jsonFormat2(PhoneAudio)
|
||||
implicit val meetingInfoAudioJsonFormat = jsonFormat4(Audio)
|
||||
|
||||
implicit val meetingInfoScreenshareStreamJsonFormat = jsonFormat2(ScreenshareStream)
|
||||
implicit val meetingInfoScreenshareJsonFormat = jsonFormat1(Screenshare)
|
||||
|
||||
implicit val meetingInfoPresentationInfoJsonFormat = jsonFormat2(PresentationInfo)
|
||||
implicit val meetingInfoBreakoutRoomJsonFormat = jsonFormat2(BreakoutRoom)
|
||||
|
||||
implicit val meetingInfoParticipantJsonFormat = jsonFormat3(Participant)
|
||||
implicit val meetingInfoAnalyticsJsonFormat = jsonFormat11(MeetingInfoAnalytics)
|
||||
implicit val meetingInfoResponseJsonFormat = jsonFormat1(MeetingInfoResponse)
|
||||
}
|
||||
|
||||
class ApiService(healthz: HealthzService, meetingInfoz: MeetingInfoService)
|
||||
extends JsonSupportProtocolHealthResponse
|
||||
with JsonSupportProtocolMeetingInfoResponse {
|
||||
|
||||
def routes =
|
||||
path("healthz") {
|
||||
@ -51,5 +96,33 @@ class ApiService(healthz: HealthzService) extends JsonSupportProtocol {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} ~
|
||||
path("analytics") {
|
||||
parameter('meetingId.as[String]) { meetingId =>
|
||||
get {
|
||||
val meetingAnalyticsFuture = meetingInfoz.getAnalytics(meetingId)
|
||||
val entityFuture = meetingAnalyticsFuture.map { resp =>
|
||||
resp.optionMeetingInfoAnalytics match {
|
||||
case Some(_) =>
|
||||
HttpEntity(ContentTypes.`application/json`, resp.optionMeetingInfoAnalytics.get.toJson.prettyPrint)
|
||||
case None =>
|
||||
HttpEntity(ContentTypes.`application/json`, s"""{ "message": "No active meeting with ID $meetingId"}""".parseJson.prettyPrint)
|
||||
}
|
||||
}
|
||||
complete(entityFuture)
|
||||
}
|
||||
} ~
|
||||
get {
|
||||
val future = meetingInfoz.getAnalytics()
|
||||
val entityFuture = future.map { res =>
|
||||
res.optionMeetingsInfoAnalytics match {
|
||||
case Some(_) =>
|
||||
HttpEntity(ContentTypes.`application/json`, res.optionMeetingsInfoAnalytics.get.toJson.prettyPrint)
|
||||
case None =>
|
||||
HttpEntity(ContentTypes.`application/json`, """{ "message": "No active meetings"}""".parseJson.prettyPrint)
|
||||
}
|
||||
}
|
||||
complete(entityFuture)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor
|
||||
import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor
|
||||
import org.bigbluebutton.endpoint.redis.RedisRecorderActor
|
||||
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
|
||||
import org.bigbluebutton.service.HealthzService
|
||||
import org.bigbluebutton.service.{ HealthzService, MeetingInfoActor, MeetingInfoService }
|
||||
|
||||
object Boot extends App with SystemConfiguration {
|
||||
|
||||
@ -40,10 +40,18 @@ object Boot extends App with SystemConfiguration {
|
||||
)
|
||||
|
||||
val msgSender = new MessageSender(redisPublisher)
|
||||
val bbbMsgBus = new BbbMsgRouterEventBus
|
||||
|
||||
val healthzService = HealthzService(system)
|
||||
|
||||
val apiService = new ApiService(healthzService)
|
||||
val meetingInfoActorRef = system.actorOf(MeetingInfoActor.props())
|
||||
|
||||
outBus2.subscribe(meetingInfoActorRef, outBbbMsgMsgChannel)
|
||||
bbbMsgBus.subscribe(meetingInfoActorRef, analyticsChannel)
|
||||
|
||||
val meetingInfoService = MeetingInfoService(system, meetingInfoActorRef)
|
||||
|
||||
val apiService = new ApiService(healthzService, meetingInfoService)
|
||||
|
||||
val redisRecorderActor = system.actorOf(
|
||||
RedisRecorderActor.props(system, redisConfig, healthzService),
|
||||
@ -53,8 +61,6 @@ object Boot extends App with SystemConfiguration {
|
||||
recordingEventBus.subscribe(redisRecorderActor, outMessageChannel)
|
||||
val incomingJsonMessageBus = new IncomingJsonMessageBus
|
||||
|
||||
val bbbMsgBus = new BbbMsgRouterEventBus
|
||||
|
||||
val fromAkkaAppsMsgSenderActorRef = system.actorOf(FromAkkaAppsMsgSenderActor.props(msgSender))
|
||||
|
||||
val analyticsActorRef = system.actorOf(AnalyticsActor.props(analyticsIncludeChat))
|
||||
|
@ -14,6 +14,7 @@ object VoiceUsers {
|
||||
def findAll(users: VoiceUsers): Vector[VoiceUserState] = users.toVector
|
||||
|
||||
def findAllNonListenOnlyVoiceUsers(users: VoiceUsers): Vector[VoiceUserState] = users.toVector.filter(u => u.listenOnly == false)
|
||||
def findAllListenOnlyVoiceUsers(users: VoiceUsers): Vector[VoiceUserState] = users.toVector.filter(u => u.listenOnly == true)
|
||||
def findAllFreeswitchCallers(users: VoiceUsers): Vector[VoiceUserState] = users.toVector.filter(u => u.calledInto == "freeswitch")
|
||||
def findAllKurentoCallers(users: VoiceUsers): Vector[VoiceUserState] = users.toVector.filter(u => u.calledInto == "kms")
|
||||
|
||||
|
@ -30,6 +30,21 @@ object Webcams {
|
||||
removedStream <- webcams.remove(streamId)
|
||||
} yield removedStream
|
||||
}
|
||||
|
||||
def updateWebcamStream(webcams: Webcams, streamId: String, userId: String): Option[WebcamStream] = {
|
||||
findWithStreamId(webcams, streamId) match {
|
||||
case Some(value) => {
|
||||
val mediaStream: MediaStream = MediaStream(value.stream.id, value.stream.url, userId, value.stream.attributes,
|
||||
value.stream.viewers)
|
||||
val webcamStream: WebcamStream = WebcamStream(streamId, mediaStream)
|
||||
webcams.update(streamId, webcamStream)
|
||||
Some(webcamStream)
|
||||
}
|
||||
case None => {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Webcams {
|
||||
@ -47,6 +62,12 @@ class Webcams {
|
||||
webcam foreach (u => webcams -= streamId)
|
||||
webcam
|
||||
}
|
||||
|
||||
private def update(streamId: String, webcamStream: WebcamStream): WebcamStream = {
|
||||
val webcam = remove(streamId)
|
||||
|
||||
save(webcamStream)
|
||||
}
|
||||
}
|
||||
|
||||
case class WebcamStream(streamId: String, stream: MediaStream)
|
||||
|
@ -21,7 +21,7 @@ import org.bigbluebutton.core.apps.presentation.PresentationApp2x
|
||||
import org.bigbluebutton.core.apps.users.UsersApp2x
|
||||
import org.bigbluebutton.core.apps.whiteboard.WhiteboardApp2x
|
||||
import org.bigbluebutton.core.bus._
|
||||
import org.bigbluebutton.core.models._
|
||||
import org.bigbluebutton.core.models.{ Users2x, VoiceUsers, _ }
|
||||
import org.bigbluebutton.core2.{ MeetingStatus2x, Permissions }
|
||||
import org.bigbluebutton.core2.message.handlers._
|
||||
import org.bigbluebutton.core2.message.handlers.meeting._
|
||||
@ -30,13 +30,18 @@ import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.core.apps.breakout._
|
||||
import org.bigbluebutton.core.apps.polls._
|
||||
import org.bigbluebutton.core.apps.voice._
|
||||
import akka.actor._
|
||||
import akka.actor.Props
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy.Resume
|
||||
import org.bigbluebutton.common2.msgs
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import org.bigbluebutton.core.apps.layout.LayoutApp2x
|
||||
import org.bigbluebutton.core.apps.meeting.{ SyncGetMeetingInfoRespMsgHdlr, ValidateConnAuthTokenSysMsgHdlr }
|
||||
import org.bigbluebutton.core.apps.users.ChangeLockSettingsInMeetingCmdMsgHdlr
|
||||
import org.bigbluebutton.core.models.VoiceUsers.{ findAllFreeswitchCallers, findAllListenOnlyVoiceUsers }
|
||||
import org.bigbluebutton.core.models.Webcams.{ findAll, updateWebcamStream }
|
||||
import org.bigbluebutton.core2.MeetingStatus2x.{ hasAuthedUserJoined, isVoiceRecording }
|
||||
import org.bigbluebutton.core2.message.senders.{ MsgBuilder, Sender }
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
@ -97,6 +102,8 @@ class MeetingActor(
|
||||
|
||||
object CheckVoiceRecordingInternalMsg
|
||||
object SyncVoiceUserStatusInternalMsg
|
||||
object MeetingInfoAnalyticsMsg
|
||||
object MeetingInfoAnalyticsLogMsg
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
case e: Exception => {
|
||||
@ -212,12 +219,32 @@ class MeetingActor(
|
||||
CheckVoiceRecordingInternalMsg
|
||||
)
|
||||
|
||||
context.system.scheduler.scheduleOnce(
|
||||
10 seconds,
|
||||
self,
|
||||
MeetingInfoAnalyticsLogMsg
|
||||
)
|
||||
|
||||
context.system.scheduler.schedule(
|
||||
10 seconds,
|
||||
30 seconds,
|
||||
self,
|
||||
MeetingInfoAnalyticsMsg
|
||||
)
|
||||
|
||||
def receive = {
|
||||
case SyncVoiceUserStatusInternalMsg =>
|
||||
checkVoiceConfUsersStatus()
|
||||
case CheckVoiceRecordingInternalMsg =>
|
||||
checkVoiceConfIsRunningAndRecording()
|
||||
|
||||
case MeetingInfoAnalyticsLogMsg =>
|
||||
handleMeetingInfoAnalyticsLogging()
|
||||
case MeetingInfoAnalyticsMsg =>
|
||||
handleMeetingInfoAnalyticsService()
|
||||
case msg: CamStreamSubscribeSysMsg =>
|
||||
handleCamStreamSubscribeSysMsg(msg)
|
||||
case msg: ScreenStreamSubscribeSysMsg =>
|
||||
handleScreenStreamSubscribeSysMsg(msg)
|
||||
//=============================
|
||||
|
||||
// 2x messages
|
||||
@ -339,27 +366,27 @@ class MeetingActor(
|
||||
private def handleMessageThatAffectsInactivity(msg: BbbCommonEnvCoreMsg): Unit = {
|
||||
|
||||
msg.core match {
|
||||
case m: EndMeetingSysCmdMsg => handleEndMeeting(m, state)
|
||||
case m: EndMeetingSysCmdMsg => handleEndMeeting(m, state)
|
||||
|
||||
// Users
|
||||
case m: ValidateAuthTokenReqMsg => state = usersApp.handleValidateAuthTokenReqMsg(m, state)
|
||||
case m: UserJoinMeetingReqMsg =>
|
||||
case m: ValidateAuthTokenReqMsg => state = usersApp.handleValidateAuthTokenReqMsg(m, state)
|
||||
case m: UserJoinMeetingReqMsg =>
|
||||
state = handleUserJoinMeetingReqMsg(m, state)
|
||||
updateModeratorsPresence()
|
||||
case m: UserJoinMeetingAfterReconnectReqMsg =>
|
||||
state = handleUserJoinMeetingAfterReconnectReqMsg(m, state)
|
||||
updateModeratorsPresence()
|
||||
case m: UserLeaveReqMsg =>
|
||||
case m: UserLeaveReqMsg =>
|
||||
state = handleUserLeaveReqMsg(m, state)
|
||||
updateModeratorsPresence()
|
||||
case m: UserBroadcastCamStartMsg => handleUserBroadcastCamStartMsg(m)
|
||||
case m: UserBroadcastCamStopMsg => handleUserBroadcastCamStopMsg(m)
|
||||
case m: GetCamBroadcastPermissionReqMsg => handleGetCamBroadcastPermissionReqMsg(m)
|
||||
case m: GetCamSubscribePermissionReqMsg => handleGetCamSubscribePermissionReqMsg(m)
|
||||
case m: UserBroadcastCamStartMsg => handleUserBroadcastCamStartMsg(m)
|
||||
case m: UserBroadcastCamStopMsg => handleUserBroadcastCamStopMsg(m)
|
||||
case m: GetCamBroadcastPermissionReqMsg => handleGetCamBroadcastPermissionReqMsg(m)
|
||||
case m: GetCamSubscribePermissionReqMsg => handleGetCamSubscribePermissionReqMsg(m)
|
||||
|
||||
case m: UserJoinedVoiceConfEvtMsg => handleUserJoinedVoiceConfEvtMsg(m)
|
||||
case m: LogoutAndEndMeetingCmdMsg => usersApp.handleLogoutAndEndMeetingCmdMsg(m, state)
|
||||
case m: SetRecordingStatusCmdMsg =>
|
||||
case m: UserJoinedVoiceConfEvtMsg => handleUserJoinedVoiceConfEvtMsg(m)
|
||||
case m: LogoutAndEndMeetingCmdMsg => usersApp.handleLogoutAndEndMeetingCmdMsg(m, state)
|
||||
case m: SetRecordingStatusCmdMsg =>
|
||||
state = usersApp.handleSetRecordingStatusCmdMsg(m, state)
|
||||
updateUserLastActivity(m.body.setBy)
|
||||
case m: RecordAndClearPreviousMarkersCmdMsg =>
|
||||
@ -546,6 +573,98 @@ class MeetingActor(
|
||||
}
|
||||
}
|
||||
|
||||
private def handleCamStreamSubscribeSysMsg(msg: CamStreamSubscribeSysMsg): Unit = {
|
||||
updateWebcamStream(liveMeeting.webcams, msg.body.streamId, msg.body.userId)
|
||||
}
|
||||
|
||||
private def handleScreenStreamSubscribeSysMsg(msg: ScreenStreamSubscribeSysMsg): Unit = ???
|
||||
|
||||
private def handleMeetingInfoAnalyticsLogging(): Unit = {
|
||||
val meetingInfoAnalyticsLogMsg: MeetingInfoAnalytics = prepareMeetingInfo()
|
||||
val event = MsgBuilder.buildMeetingInfoAnalyticsMsg(meetingInfoAnalyticsLogMsg)
|
||||
outGW.send(event)
|
||||
}
|
||||
|
||||
private def handleMeetingInfoAnalyticsService(): Unit = {
|
||||
val meetingInfoAnalyticsLogMsg: MeetingInfoAnalytics = prepareMeetingInfo()
|
||||
val event2 = MsgBuilder.buildMeetingInfoAnalyticsServiceMsg(meetingInfoAnalyticsLogMsg)
|
||||
outGW.send(event2)
|
||||
}
|
||||
|
||||
private def prepareMeetingInfo(): MeetingInfoAnalytics = {
|
||||
val meetingName: String = liveMeeting.props.meetingProp.name
|
||||
val externalId: String = liveMeeting.props.meetingProp.extId
|
||||
val internalId: String = liveMeeting.props.meetingProp.intId
|
||||
val hasUserJoined: Boolean = hasAuthedUserJoined(liveMeeting.status)
|
||||
val isMeetingRecorded = MeetingStatus2x.isRecording(liveMeeting.status)
|
||||
|
||||
// TODO: Placeholder values as required values not available
|
||||
val screenshareStream: ScreenshareStream = ScreenshareStream(new User("", ""), List())
|
||||
val screenshare: Screenshare = Screenshare(screenshareStream)
|
||||
|
||||
val listOfUsers: List[UserState] = Users2x.findAll(liveMeeting.users2x).toList
|
||||
val breakoutRoomNames: List[String] = {
|
||||
if (state.breakout.isDefined)
|
||||
state.breakout.get.getRooms.map(_.name).toList
|
||||
else
|
||||
List()
|
||||
}
|
||||
val breakoutRoom: BreakoutRoom = BreakoutRoom(liveMeeting.props.breakoutProps.parentId, breakoutRoomNames)
|
||||
MeetingInfoAnalytics(
|
||||
meetingName, externalId, internalId, hasUserJoined, isMeetingRecorded, getMeetingInfoWebcamDetails, getMeetingInfoAudioDetails,
|
||||
screenshare, listOfUsers.map(u => Participant(u.intId, u.name, u.role)), getMeetingInfoPresentationDetails, breakoutRoom
|
||||
)
|
||||
}
|
||||
|
||||
private def resolveUserName(userId: String): String = {
|
||||
val userName: String = Users2x.findWithIntId(liveMeeting.users2x, userId).map(_.name).getOrElse("")
|
||||
if (userName.isEmpty) log.error(s"Failed to map username for id $userId")
|
||||
userName
|
||||
}
|
||||
|
||||
private def getMeetingInfoWebcamDetails(): Webcam = {
|
||||
val liveWebcams: Vector[org.bigbluebutton.core.models.WebcamStream] = findAll(liveMeeting.webcams)
|
||||
val numOfLiveWebcams: Int = liveWebcams.length
|
||||
val broadcasts: List[Broadcast] = liveWebcams.map(webcam => Broadcast(
|
||||
webcam.stream.id,
|
||||
User(webcam.stream.userId, resolveUserName(webcam.stream.userId)), 0L
|
||||
)).toList
|
||||
val viewers: Set[String] = liveWebcams.flatMap(_.stream.viewers).toSet
|
||||
val webcamStream: msgs.WebcamStream = msgs.WebcamStream(broadcasts, viewers)
|
||||
Webcam(numOfLiveWebcams, webcamStream)
|
||||
}
|
||||
|
||||
private def getMeetingInfoAudioDetails(): Audio = {
|
||||
val voiceUsers: Vector[VoiceUserState] = VoiceUsers.findAll(liveMeeting.voiceUsers)
|
||||
val numOfVoiceUsers: Int = voiceUsers.length
|
||||
|
||||
val listenOnlyUsers: Vector[VoiceUserState] = findAllListenOnlyVoiceUsers(liveMeeting.voiceUsers)
|
||||
val numOfListenOnlyUsers: Int = listenOnlyUsers.length
|
||||
val listenOnlyAudio = ListenOnlyAudio(
|
||||
numOfListenOnlyUsers,
|
||||
listenOnlyUsers.map(voiceUserState => User(voiceUserState.voiceUserId, resolveUserName(voiceUserState.intId))).toList
|
||||
)
|
||||
|
||||
val freeswitchUsers: Vector[VoiceUserState] = findAllFreeswitchCallers(liveMeeting.voiceUsers)
|
||||
val numOfFreeswitchUsers: Int = freeswitchUsers.length
|
||||
val twoWayAudio = TwoWayAudio(
|
||||
numOfFreeswitchUsers,
|
||||
freeswitchUsers.map(voiceUserState => User(voiceUserState.voiceUserId, resolveUserName(voiceUserState.intId))).toList
|
||||
)
|
||||
|
||||
// TODO: Placeholder values
|
||||
val phoneAudio = PhoneAudio(0, List())
|
||||
|
||||
Audio(numOfVoiceUsers, listenOnlyAudio, twoWayAudio, phoneAudio)
|
||||
}
|
||||
|
||||
private def getMeetingInfoPresentationDetails(): PresentationInfo = {
|
||||
val presentationPods: Vector[PresentationPod] = state.presentationPodManager.getAllPresentationPodsInMeeting()
|
||||
val presentationId: String = presentationPods.flatMap(_.getCurrentPresentation.map(_.id)).mkString
|
||||
val presentationName: String = presentationPods.flatMap(_.getCurrentPresentation.map(_.name)).mkString
|
||||
PresentationInfo(presentationId, presentationName)
|
||||
}
|
||||
|
||||
def handleGetRunningMeetingStateReqMsg(msg: GetRunningMeetingStateReqMsg): Unit = {
|
||||
processGetRunningMeetingStateReqMsg()
|
||||
}
|
||||
@ -596,7 +715,6 @@ class MeetingActor(
|
||||
}
|
||||
|
||||
def handleDeskShareGetDeskShareInfoRequest(msg: DeskShareGetDeskShareInfoRequest): Unit = {
|
||||
|
||||
log.info("handleDeskShareGetDeskShareInfoRequest: " + msg.conferenceName + "isBroadcasting="
|
||||
+ ScreenshareModel.isBroadcastingRTMP(liveMeeting.screenshareModel) + " URL:" +
|
||||
ScreenshareModel.getRTMPBroadcastingUrl(liveMeeting.screenshareModel))
|
||||
@ -737,9 +855,7 @@ class MeetingActor(
|
||||
}
|
||||
}
|
||||
|
||||
def handleExtendMeetingDuration(msg: ExtendMeetingDuration) {
|
||||
|
||||
}
|
||||
def handleExtendMeetingDuration(msg: ExtendMeetingDuration) = ???
|
||||
|
||||
def removeUsersWithExpiredUserLeftFlag(liveMeeting: LiveMeeting, state: MeetingState2x): MeetingState2x = {
|
||||
val leftUsers = Users2x.findAllExpiredUserLeftFlags(liveMeeting.users2x, expiryTracker.meetingExpireWhenLastUserLeftInMs)
|
||||
|
@ -3,7 +3,6 @@ package org.bigbluebutton.core2
|
||||
import akka.actor.{ Actor, ActorLogging, Props }
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.common2.util.JsonUtil
|
||||
|
||||
object AnalyticsActor {
|
||||
def props(includeChat: Boolean): Props = Props(classOf[AnalyticsActor], includeChat)
|
||||
}
|
||||
@ -168,6 +167,7 @@ class AnalyticsActor(val includeChat: Boolean) extends Actor with ActorLogging {
|
||||
case m: ChangeLockSettingsInMeetingCmdMsg => logMessage(msg)
|
||||
case m: GetLockSettingsReqMsg => logMessage(msg)
|
||||
case m: LockSettingsNotInitializedRespMsg => logMessage(msg)
|
||||
case m: MeetingInfoAnalyticsMsg => logMessage(msg)
|
||||
|
||||
case _ => // ignore message
|
||||
}
|
||||
|
@ -180,6 +180,35 @@ object MsgBuilder {
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildMeetingInfoAnalyticsMsg(analytics: MeetingInfoAnalytics): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
|
||||
val envelope = BbbCoreEnvelope(MeetingInfoAnalyticsMsg.NAME, routing)
|
||||
val header = BbbCoreBaseHeader(MeetingInfoAnalyticsMsg.NAME)
|
||||
val body = MeetingInfoAnalyticsMsgBody(analytics)
|
||||
val event = MeetingInfoAnalyticsMsg(header, body)
|
||||
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildMeetingInfoAnalyticsServiceMsg(analytics: MeetingInfoAnalytics): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
|
||||
val envelope = BbbCoreEnvelope(MeetingInfoAnalyticsServiceMsg.NAME, routing)
|
||||
val header = BbbCoreBaseHeader(MeetingInfoAnalyticsServiceMsg.NAME)
|
||||
val body = MeetingInfoAnalyticsMsgBody(analytics)
|
||||
val event = MeetingInfoAnalyticsServiceMsg(header, body)
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildCamStreamSubscribeSysMsg(meetingId: String, userId: String, streamId: String, sfuSessionId: String): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
|
||||
val envelope = BbbCoreEnvelope(CamStreamSubscribeSysMsg.NAME, routing)
|
||||
val header = BbbCoreBaseHeader(CamStreamSubscribeSysMsg.NAME)
|
||||
val body = CamStreamSubscribeSysMsgBody(meetingId, userId, streamId, sfuSessionId)
|
||||
val event = CamStreamSubscribeSysMsg(header, body)
|
||||
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildMeetingDestroyedEvtMsg(meetingId: String): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
|
||||
val envelope = BbbCoreEnvelope(MeetingDestroyedEvtMsg.NAME, routing)
|
||||
|
@ -0,0 +1,95 @@
|
||||
package org.bigbluebutton.service
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }
|
||||
import akka.pattern.ask
|
||||
import akka.pattern.AskTimeoutException
|
||||
import akka.util.Timeout
|
||||
import org.bigbluebutton.MeetingInfoAnalytics
|
||||
import org.bigbluebutton.common2.msgs.{ BbbCommonEnvCoreMsg, MeetingEndingEvtMsg, MeetingInfoAnalyticsServiceMsg }
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.duration.DurationInt
|
||||
import scala.concurrent.{ ExecutionContextExecutor, Future }
|
||||
|
||||
sealed trait MeetingInfoMessage
|
||||
|
||||
case class GetMeetingInfoMessage(meetingId: String) extends MeetingInfoMessage
|
||||
case object GetMeetingsInfoMessage extends MeetingInfoMessage
|
||||
case class MeetingInfoResponseMsg(optionMeetingInfoAnalytics: Option[MeetingInfoAnalytics]) extends MeetingInfoMessage
|
||||
case class MeetingInfoListResponseMsg(optionMeetingsInfoAnalytics: Option[List[MeetingInfoAnalytics]]) extends MeetingInfoMessage
|
||||
|
||||
object MeetingInfoService {
|
||||
def apply(system: ActorSystem, meetingInfoActor: ActorRef) = new MeetingInfoService(system, meetingInfoActor)
|
||||
}
|
||||
|
||||
class MeetingInfoService(system: ActorSystem, meetingInfoActor: ActorRef) {
|
||||
implicit def executionContext: ExecutionContextExecutor = system.dispatcher
|
||||
implicit val timeout: Timeout = 2 seconds
|
||||
|
||||
def getAnalytics(): Future[MeetingInfoListResponseMsg] = {
|
||||
val future = meetingInfoActor.ask(GetMeetingsInfoMessage).mapTo[MeetingInfoListResponseMsg]
|
||||
|
||||
future.recover {
|
||||
case e: AskTimeoutException => {
|
||||
MeetingInfoListResponseMsg(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getAnalytics(meetingId: String): Future[MeetingInfoResponseMsg] = {
|
||||
val future = meetingInfoActor.ask(GetMeetingInfoMessage(meetingId)).mapTo[MeetingInfoResponseMsg]
|
||||
|
||||
future.recover {
|
||||
case e: AskTimeoutException => {
|
||||
MeetingInfoResponseMsg(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object MeetingInfoActor {
|
||||
def props(): Props = Props(classOf[MeetingInfoActor])
|
||||
}
|
||||
|
||||
class MeetingInfoActor extends Actor with ActorLogging {
|
||||
var optionMeetingInfo: Option[MeetingInfoAnalytics] = None
|
||||
var meetingInfoMap: mutable.HashMap[String, MeetingInfoAnalytics] = mutable.HashMap.empty[String, MeetingInfoAnalytics]
|
||||
|
||||
override def receive: Receive = {
|
||||
case msg: BbbCommonEnvCoreMsg => handle(msg)
|
||||
case GetMeetingsInfoMessage =>
|
||||
if (meetingInfoMap.size > 0) {
|
||||
sender ! MeetingInfoListResponseMsg(Option(meetingInfoMap.values.toList))
|
||||
} else {
|
||||
sender ! MeetingInfoListResponseMsg(None)
|
||||
}
|
||||
case GetMeetingInfoMessage(meetingId) =>
|
||||
meetingInfoMap.get(meetingId) match {
|
||||
case Some(meetingInfoAnalytics) =>
|
||||
sender ! MeetingInfoResponseMsg(Option(meetingInfoAnalytics))
|
||||
case None => sender ! MeetingInfoResponseMsg(None)
|
||||
}
|
||||
case _ => // ignore other messages
|
||||
}
|
||||
|
||||
def handle(msg: BbbCommonEnvCoreMsg): Unit = {
|
||||
msg.core match {
|
||||
case m: MeetingInfoAnalyticsServiceMsg =>
|
||||
val meetingInternalId = m.body.meetingInfo.internalId
|
||||
|
||||
optionMeetingInfo = Option.apply(MeetingInfoAnalytics(m.body.meetingInfo.name, m.body.meetingInfo.externalId,
|
||||
meetingInternalId, m.body.meetingInfo.hasUserJoined, m.body.meetingInfo.isMeetingRecorded, m.body.meetingInfo.webcams,
|
||||
m.body.meetingInfo.audio, m.body.meetingInfo.screenshare, m.body.meetingInfo.users, m.body.meetingInfo.presentation,
|
||||
m.body.meetingInfo.breakoutRoom))
|
||||
|
||||
meetingInfoMap.get(meetingInternalId) match {
|
||||
case Some(_) => {
|
||||
meetingInfoMap(meetingInternalId) = optionMeetingInfo.get
|
||||
}
|
||||
case None => meetingInfoMap += (meetingInternalId -> optionMeetingInfo.get)
|
||||
}
|
||||
case m: MeetingEndingEvtMsg => meetingInfoMap -= m.body.meetingId
|
||||
case _ => // ignore
|
||||
}
|
||||
}
|
||||
}
|
@ -6,7 +6,7 @@ case class DurationProps(duration: Int, createdTime: Long, createdDate: String,
|
||||
meetingExpireIfNoUserJoinedInMinutes: Int, meetingExpireWhenLastUserLeftInMinutes: Int,
|
||||
userInactivityInspectTimerInMinutes: Int, userInactivityThresholdInMinutes: Int,
|
||||
userActivitySignResponseDelayInMinutes: Int,
|
||||
endWhenNoModerator: Boolean, endWhenNoModeratorDelayInMinutes: Int)
|
||||
endWhenNoModerator: Boolean, endWhenNoModeratorDelayInMinutes: Int)
|
||||
|
||||
case class MeetingProp(name: String, extId: String, intId: String, isBreakout: Boolean)
|
||||
|
||||
|
@ -0,0 +1,14 @@
|
||||
package org.bigbluebutton.common2.msgs
|
||||
|
||||
object CamStreamSubscribeSysMsg { val NAME = "CamStreamSubscribeSysMsg" }
|
||||
case class CamStreamSubscribeSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: CamStreamSubscribeSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
|
||||
case class CamStreamSubscribeSysMsgBody(
|
||||
meetingId: String,
|
||||
userId: String,
|
||||
streamId: String,
|
||||
sfuSessionId: String
|
||||
)
|
@ -0,0 +1,47 @@
|
||||
package org.bigbluebutton.common2.msgs
|
||||
|
||||
object MeetingInfoAnalyticsMsg { val NAME = "MeetingInfoAnalyticsMsg" }
|
||||
case class MeetingInfoAnalyticsMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: MeetingInfoAnalyticsMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class MeetingInfoAnalyticsMsgBody(meetingInfo: MeetingInfoAnalytics)
|
||||
|
||||
object MeetingInfoAnalytics {
|
||||
def apply(name: String, externalId: String, internalId: String, hasUserJoined: Boolean, isMeetingRecorded: Boolean,
|
||||
webcam: Webcam, audio: Audio, screenshare: Screenshare, users: List[Participant], presentation: PresentationInfo,
|
||||
breakoutRooms: BreakoutRoom): MeetingInfoAnalytics =
|
||||
new MeetingInfoAnalytics(name, externalId, internalId, hasUserJoined, isMeetingRecorded, webcam, audio, screenshare, users,
|
||||
presentation, breakoutRooms)
|
||||
}
|
||||
|
||||
case class MeetingInfoAnalytics(
|
||||
name: String,
|
||||
externalId: String,
|
||||
internalId: String,
|
||||
hasUserJoined: Boolean,
|
||||
isMeetingRecorded: Boolean,
|
||||
webcams: Webcam,
|
||||
audio: Audio,
|
||||
screenshare: Screenshare,
|
||||
users: List[Participant],
|
||||
presentation: PresentationInfo,
|
||||
breakoutRoom: BreakoutRoom
|
||||
)
|
||||
|
||||
case class Webcam(total: Int, streams: WebcamStream)
|
||||
case class WebcamStream(broadcasts: List[Broadcast], viewers: Set[String])
|
||||
case class User(id: String, name: String)
|
||||
case class Broadcast(id: String, user: User, startedOn: Long)
|
||||
|
||||
case class Audio(total: Int, listenOnly: ListenOnlyAudio, twoWay: TwoWayAudio, phone: PhoneAudio)
|
||||
case class ListenOnlyAudio(total: Int, users: List[User])
|
||||
case class TwoWayAudio(total: Int, users: List[User])
|
||||
case class PhoneAudio(total: Int, users: List[User])
|
||||
|
||||
case class Screenshare(stream: ScreenshareStream)
|
||||
case class ScreenshareStream(user: User, viewers: List[User])
|
||||
|
||||
case class Participant(id: String, name: String, role: String)
|
||||
case class PresentationInfo(id: String, name: String)
|
||||
case class BreakoutRoom(id: String, names: List[String])
|
@ -0,0 +1,7 @@
|
||||
package org.bigbluebutton.common2.msgs
|
||||
|
||||
object MeetingInfoAnalyticsServiceMsg { val NAME = "MeetingInfoAnalyticsServiceMsg" }
|
||||
case class MeetingInfoAnalyticsServiceMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: MeetingInfoAnalyticsMsgBody
|
||||
) extends BbbCoreMsg
|
@ -0,0 +1,14 @@
|
||||
package org.bigbluebutton.common2.msgs
|
||||
|
||||
object ScreenStreamSubscribeSysMsg { val NAME = "ScreenStreamSubscribeSysMsg" }
|
||||
case class ScreenStreamSubscribeSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: ScreenStreamSubscribeSysMsg
|
||||
) extends BbbCoreMsg
|
||||
|
||||
case class ScreenStreamSubscribeSysMsgBody(
|
||||
meetingId: String,
|
||||
userId: String,
|
||||
streamId: String,
|
||||
sfuSessionId: String
|
||||
)
|
Loading…
Reference in New Issue
Block a user