ActivityTracker handle/stores more important events

This commit is contained in:
Gustavo Trott 2021-06-23 11:51:10 -03:00
parent c33f2f48ef
commit 54dfe4bc3e

View File

@ -1,34 +1,57 @@
package org.bigbluebutton.endpoint.redis
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.common2.redis.{RedisConfig, RedisStorageProvider}
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import org.bigbluebutton.common2.util.JsonUtil
import org.bigbluebutton.core.apps.groupchats.GroupChatApp
import scala.concurrent.duration._
import scala.concurrent._
import ExecutionContext.Implicits.global
case object SendPeriodicReport
case class MeetingActivityTracker(
intId: String,
extId: String,
name: String,
users: Map[String, UserActivityTracker],
users: Map[String, UserActivityTracker] = Map(),
polls: Map[String, Poll] = Map(),
)
case class UserActivityTracker(
intId: String,
extId: String,
name: String,
answers: Map[String,String] = Map(),
talks: Vector[Talk] = Vector(),
webcams: Vector[Webcam] = Vector(),
totalOfMessages: Long = 0,
totalOfRaiseHands: Long = 0,
totalOfEmojis: Long = 0,
registeredOn: Long = System.currentTimeMillis(),
leftOn: Long = 0,
)
case class Poll(
pollId: String,
pollType: String,
question: String,
options: Vector[String] = Vector(),
createdOn: Long = System.currentTimeMillis(),
)
case class Talk(
startedOn: Long = System.currentTimeMillis(),
stoppedOn: Long = 0,
)
case class Webcam(
startedOn: Long = System.currentTimeMillis(),
stoppedOn: Long = 0,
)
object ActivityTrackerActor {
def props(
@ -74,23 +97,23 @@ class ActivityTrackerActor(
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
msg.core match {
// Chat
// case m: GroupChatMessageBroadcastEvtMsg => handleGroupChatMessageBroadcastEvtMsg(m)
case m: GroupChatMessageBroadcastEvtMsg => handleGroupChatMessageBroadcastEvtMsg(m)
// case m: ClearPublicChatHistoryEvtMsg => handleClearPublicChatHistoryEvtMsg(m)
// User
case m: UserJoinedMeetingEvtMsg => handleUserJoinedMeetingEvtMsg(m)
case m: UserLeftMeetingEvtMsg => handleUserLeftMeetingEvtMsg(m)
// case m: PresenterAssignedEvtMsg => handlePresenterAssignedEvtMsg(m)
// case m: UserEmojiChangedEvtMsg => handleUserEmojiChangedEvtMsg(m)
case m: UserEmojiChangedEvtMsg => handleUserEmojiChangedEvtMsg(m)
// case m: UserRoleChangedEvtMsg => handleUserRoleChangedEvtMsg(m)
// case m: UserBroadcastCamStartedEvtMsg => handleUserBroadcastCamStartedEvtMsg(m)
// case m: UserBroadcastCamStoppedEvtMsg => handleUserBroadcastCamStoppedEvtMsg(m)
case m: UserBroadcastCamStartedEvtMsg => handleUserBroadcastCamStartedEvtMsg(m)
case m: UserBroadcastCamStoppedEvtMsg => handleUserBroadcastCamStoppedEvtMsg(m)
// Voice
// case m: UserJoinedVoiceConfToClientEvtMsg => handleUserJoinedVoiceConfToClientEvtMsg(m)
// case m: UserLeftVoiceConfToClientEvtMsg => handleUserLeftVoiceConfToClientEvtMsg(m)
// case m: UserMutedVoiceEvtMsg => handleUserMutedVoiceEvtMsg(m)
// case m: UserTalkingVoiceEvtMsg => handleUserTalkingVoiceEvtMsg(m)
case m: UserTalkingVoiceEvtMsg => handleUserTalkingVoiceEvtMsg(m)
//
// case m: VoiceRecordingStartedEvtMsg => handleVoiceRecordingStartedEvtMsg(m)
// case m: VoiceRecordingStoppedEvtMsg => handleVoiceRecordingStoppedEvtMsg(m)
@ -102,16 +125,31 @@ class ActivityTrackerActor(
case m: CreateMeetingReqMsg => handleCreateMeetingReqMsg(m)
case m: MeetingEndingEvtMsg => handleMeetingEndingEvtMsg(m)
// Poll
case m: PollStartedEvtMsg => handlePollStartedEvtMsg(m)
case m: UserRespondedToPollRecordMsg => handleUserRespondedToPollRecordMsg(m)
// case m: PollStoppedEvtMsg => handlePollStoppedEvtMsg(m)
// case m: PollShowResultEvtMsg => handlePollShowResultEvtMsg(m)
case _ => // message not to be recorded.
}
}
private def handleGroupChatMessageBroadcastEvtMsg(msg: GroupChatMessageBroadcastEvtMsg) {
if (msg.body.chatId == GroupChatApp.MAIN_PUBLIC_CHAT) {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.header.userId)
} yield {
val updatedUser = user.copy(totalOfMessages = user.totalOfMessages + 1)
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
}
private def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = {
log.info("------------------")
log.info("handleUserJoinedMeetingEvtMsg.")
log.info("user entrou....")
val newUser = UserActivityTracker(
msg.body.intId, msg.body.extId, msg.body.name
)
@ -127,11 +165,6 @@ class ActivityTrackerActor(
)
})
log.info("----meeting")
log.info(registeredMeeting.intId)
log.info(registeredMeeting.name)
val refreshedMeeting = registeredMeeting.copy(users = registeredMeeting.users + (newUser.intId -> newUser))
meetings += (refreshedMeeting.intId -> refreshedMeeting)
@ -140,9 +173,6 @@ class ActivityTrackerActor(
}
private def handleUserLeftMeetingEvtMsg(msg: UserLeftMeetingEvtMsg): Unit = {
log.info("------------------")
log.info("handleUserLeftMeetingEvtMsg.")
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.body.intId)
@ -156,6 +186,97 @@ class ActivityTrackerActor(
}
private def handleUserEmojiChangedEvtMsg(msg: UserEmojiChangedEvtMsg) {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.body.userId)
} yield {
if (msg.body.emoji == "raiseHand") {
val updatedUser = user.copy(totalOfRaiseHands = user.totalOfRaiseHands + 1)
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
} else if (msg.body.emoji != "none") {
val updatedUser = user.copy(totalOfEmojis = user.totalOfEmojis + 1)
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
}
private def handleUserBroadcastCamStartedEvtMsg(msg: UserBroadcastCamStartedEvtMsg) {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.body.userId)
} yield {
val updatedUser = user.copy(webcams = user.webcams :+ Webcam())
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
private def handleUserBroadcastCamStoppedEvtMsg(msg: UserBroadcastCamStoppedEvtMsg) {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.body.userId)
} yield {
val lastWebcam: Webcam = user.webcams.last.copy(stoppedOn = System.currentTimeMillis())
val updatedUser = user.copy(webcams = user.webcams.dropRight(1) :+ lastWebcam)
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
private def handleUserTalkingVoiceEvtMsg(msg: UserTalkingVoiceEvtMsg) {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.body.intId)
} yield {
if(msg.body.talking) {
val updatedUser = user.copy(talks = user.talks :+ Talk())
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
} else {
val lastTalk: Talk = user.talks.last.copy(stoppedOn = System.currentTimeMillis())
val updatedUser = user.copy(talks = user.talks.dropRight(1) :+ lastTalk)
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
}
private def handlePollStartedEvtMsg(msg: PollStartedEvtMsg): Unit = {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
} yield {
val options = msg.body.poll.answers.map(answer => answer.key)
val newPoll = Poll(msg.body.pollId, msg.body.pollType, msg.body.question, options.toVector)
val updatedMeeting = meeting.copy(polls = meeting.polls + (newPoll.pollId -> newPoll))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
private def handleUserRespondedToPollRecordMsg(msg: UserRespondedToPollRecordMsg): Unit = {
for {
meeting <- meetings.values.find(m => m.intId == msg.header.meetingId)
user <- meeting.users.values.find(u => u.intId == msg.header.userId)
} yield {
//val lastTalk: Talk = user.talks.last.copy(stoppedOn = System.currentTimeMillis())
val updatedUser = user.copy(answers = user.answers + (msg.body.pollId -> msg.body.answer))
val updatedMeeting = meeting.copy(users = meeting.users + (updatedUser.intId -> updatedUser))
meetings += (updatedMeeting.intId -> updatedMeeting)
}
}
private def handleCreateMeetingReqMsg(msg: CreateMeetingReqMsg): Unit = {
log.info("------------------")
log.info("handleCreateMeetingReqMsg.")
@ -164,7 +285,6 @@ class ActivityTrackerActor(
msg.body.props.meetingProp.intId,
msg.body.props.meetingProp.extId,
msg.body.props.meetingProp.name,
Map()
)
meetings += (newMeeting.intId -> newMeeting)
@ -185,38 +305,24 @@ class ActivityTrackerActor(
meeting.users.map(user => {
log.info(user._2.toString)
})
meetings = meetings.-(meeting.intId)
}
}
private def sendPeriodicReport(): Unit = {
// if (redis.checkConnectionStatusBasic)
// healthzService.sendRecordingDBStatusMessage(System.currentTimeMillis())
// else
log.info("------------------")
log.info("SendPeriodicReport.")
log.info("meetings: " + meetings.toVector.length)
log.info(JsonUtil.toJson(meetings))
meetings.map(meeting => {
log.info("------------------")
log.info(meeting._2.name)
log.info("users: " + meeting._2.users.toVector.length)
meeting._2.users.map(user => {
log.info("-----")
log.info(user._2.name)
val registeredOn : java.util.Date = new java.util.Date(user._2.registeredOn);
log.info("registeredOn: " + registeredOn)
if(user._2.leftOn > 0) {
val leftOn : java.util.Date = new java.util.Date(user._2.leftOn);
log.info("leftOn: " + leftOn)
} else {
log.info("leftOn: " + user._2.leftOn)
}
})
log.info(JsonUtil.toJson(meeting._2))
})