This update allows duplicating a user session via the getJoinUrl
endpoint. The generated link will create a new sessionToken
while retaining the same userId
. This setup enables two devices be represented as a single user on the user list, making it particularly useful for scenarios like transferring a session from a computer to a mobile device.
Additionally, the mobile app can use this feature to render the whiteboard inside an iframe with the same `userId`. By setting the parameter `revokePreviousSession=true`, a new `sessionToken` will be generated, and the previous session will be revoked when the new device connects. This is useful for transferring a session to another device and automatically closing the previous session.
This commit is contained in:
parent
41445f63e3
commit
4ea48b3333
@ -135,8 +135,15 @@ class ApiService(healthz: HealthzService, meetingInfoz: MeetingInfoService, user
|
||||
val responseMap = userInfoService.generateResponseMap(userInfos)
|
||||
userInfoService.createHttpResponse(StatusCodes.OK, responseMap)
|
||||
|
||||
case ApiResponseFailure(msg, arg) =>
|
||||
userInfoService.createHttpResponse(StatusCodes.OK, Map("response" -> "unauthorized", "message" -> msg))
|
||||
case ApiResponseFailure(msg, msgId, arg) =>
|
||||
userInfoService.createHttpResponse(
|
||||
StatusCodes.OK,
|
||||
Map(
|
||||
"response" -> "unauthorized",
|
||||
"message" -> msg,
|
||||
"message_id" -> msgId,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
complete(entityFuture)
|
||||
|
@ -46,7 +46,12 @@ class BigBlueButtonActor(
|
||||
|
||||
private val meetings = new RunningMeetings
|
||||
|
||||
private var sessionTokens = new collection.immutable.HashMap[String, (String, String)] //sessionToken -> (meetingId, userId)
|
||||
private case class SessionTokenInfo(
|
||||
meetingId: String,
|
||||
userId: String,
|
||||
revoked: Boolean = false
|
||||
)
|
||||
private var sessionTokens = new collection.immutable.HashMap[String, SessionTokenInfo] //sessionToken -> SessionTokenInfo
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
case e: Exception => {
|
||||
@ -96,8 +101,13 @@ class BigBlueButtonActor(
|
||||
|
||||
sessionTokens.get(msg.sessionToken) match {
|
||||
case Some(sessionTokenInfo) =>
|
||||
RunningMeetings.findWithId(meetings, sessionTokenInfo._1) match {
|
||||
if (sessionTokenInfo.revoked) {
|
||||
log.debug("handleGetUserApiMsg ({}): Session token revoked.", msg.sessionToken)
|
||||
actorRef ! ApiResponseFailure("Session token revoked.", "session_token_revoked")
|
||||
} else {
|
||||
RunningMeetings.findWithId(meetings, sessionTokenInfo.meetingId) match {
|
||||
case Some(m) =>
|
||||
log.debug("handleGetUserApiMsg ({}): {}.", msg.sessionToken, m)
|
||||
m.actorRef forward (msg)
|
||||
|
||||
case None =>
|
||||
@ -106,8 +116,8 @@ class BigBlueButtonActor(
|
||||
val userInfos = Map(
|
||||
"returncode" -> "SUCCESS",
|
||||
"sessionToken" -> msg.sessionToken,
|
||||
"meetingID" -> sessionTokenInfo._1,
|
||||
"internalUserID" -> sessionTokenInfo._2,
|
||||
"meetingID" -> sessionTokenInfo.meetingId,
|
||||
"internalUserID" -> sessionTokenInfo.userId,
|
||||
"externMeetingID" -> "",
|
||||
"externUserID" -> "",
|
||||
"currentlyInMeeting" -> false,
|
||||
@ -122,10 +132,15 @@ class BigBlueButtonActor(
|
||||
"hideUserList" -> false,
|
||||
"webcamsOnlyForModerator" -> false
|
||||
)
|
||||
actorRef ! ApiResponseSuccess("Meeting is ended!", UserInfosApiMsg(userInfos))
|
||||
|
||||
log.debug("handleGetUserApiMsg ({}): Meeting is ended.", msg.sessionToken)
|
||||
actorRef ! ApiResponseSuccess("Meeting is ended.", UserInfosApiMsg(userInfos))
|
||||
}
|
||||
}
|
||||
|
||||
case None =>
|
||||
actorRef ! ApiResponseFailure("Meeting not found!")
|
||||
log.debug("handleGetUserApiMsg ({}): Meeting not found.", msg.sessionToken)
|
||||
actorRef ! ApiResponseFailure("Meeting not found.", "meeting_not_found")
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,6 +149,7 @@ class BigBlueButtonActor(
|
||||
|
||||
case m: CreateMeetingReqMsg => handleCreateMeetingReqMsg(m)
|
||||
case m: RegisterUserReqMsg => handleRegisterUserReqMsg(m)
|
||||
case m: RegisterUserSessionTokenReqMsg => handleRegisterUserSessionTokenReqMsg(m)
|
||||
case m: CheckAlivePingSysMsg => handleCheckAlivePingSysMsg(m)
|
||||
case _: UserGraphqlConnectionEstablishedSysMsg => //Ignore
|
||||
case _: UserGraphqlConnectionClosedSysMsg => //Ignore
|
||||
@ -150,7 +166,29 @@ class BigBlueButtonActor(
|
||||
log.debug("FORWARDING Register user message")
|
||||
|
||||
//Store sessionTokens and associate them with their respective meetingId + userId owners
|
||||
sessionTokens += (msg.body.sessionToken -> (msg.body.meetingId, msg.body.intUserId))
|
||||
sessionTokens += (msg.body.sessionToken -> SessionTokenInfo(msg.body.meetingId, msg.body.intUserId))
|
||||
|
||||
m.actorRef forward (msg)
|
||||
}
|
||||
}
|
||||
|
||||
def handleRegisterUserSessionTokenReqMsg(msg: RegisterUserSessionTokenReqMsg): Unit = {
|
||||
log.debug("RECEIVED RegisterUserSessionTokenReqMsg msg {}", msg)
|
||||
for {
|
||||
m <- RunningMeetings.findWithId(meetings, msg.header.meetingId)
|
||||
} yield {
|
||||
log.debug("FORWARDING Register user session token message")
|
||||
|
||||
//Store sessionTokens and associate them with their respective meetingId + userId owners
|
||||
sessionTokens += (msg.body.sessionToken -> SessionTokenInfo(msg.body.meetingId, msg.body.userId))
|
||||
|
||||
if (msg.body.revokeSessionToken.nonEmpty) {
|
||||
for {
|
||||
sessionTokenInfo <- sessionTokens.get(msg.body.revokeSessionToken)
|
||||
} yield {
|
||||
sessionTokens += (msg.body.revokeSessionToken -> sessionTokenInfo.copy(revoked = true))
|
||||
}
|
||||
}
|
||||
|
||||
m.actorRef forward (msg)
|
||||
}
|
||||
@ -226,7 +264,7 @@ class BigBlueButtonActor(
|
||||
context.system.scheduler.scheduleOnce(Duration.create(60, TimeUnit.MINUTES)) {
|
||||
log.debug("Removing Graphql data and session tokens. meetingID={}", msg.meetingId)
|
||||
|
||||
sessionTokens = sessionTokens.filter(sessionTokenInfo => sessionTokenInfo._2._1 != msg.meetingId)
|
||||
sessionTokens = sessionTokens.filter(sessionTokenInfo => sessionTokenInfo._2.meetingId != msg.meetingId)
|
||||
|
||||
//In Db, Removing the meeting is enough, all other tables has "ON DELETE CASCADE"
|
||||
MeetingDAO.delete(msg.meetingId)
|
||||
|
@ -131,4 +131,4 @@ case class UserInfosApiMsg(infos: Map[String, Any])
|
||||
|
||||
trait ApiResponse
|
||||
case class ApiResponseSuccess(msg: String, any: Any = null) extends ApiResponse
|
||||
case class ApiResponseFailure(msg: String, any: Any = null) extends ApiResponse
|
||||
case class ApiResponseFailure(msg: String, msgId: String, any: Any = null) extends ApiResponse
|
@ -19,7 +19,7 @@ trait GetUserApiMsgHdlr extends HandlerHelpers {
|
||||
actorRef ! ApiResponseSuccess("User found!", UserInfosApiMsg(getUserInfoResponse(regUser)))
|
||||
case None =>
|
||||
log.debug("User not found, sending failure message")
|
||||
actorRef ! ApiResponseFailure("User not found", Map())
|
||||
actorRef ! ApiResponseFailure("User not found", "user_not_found", Map())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ trait RegisterUserReqMsgHdlr {
|
||||
val guestStatus = msg.body.guestStatus
|
||||
|
||||
val regUser = RegisteredUsers.create(liveMeeting.props.meetingProp.intId, msg.body.intUserId, msg.body.extUserId,
|
||||
msg.body.name, msg.body.role, msg.body.authToken, msg.body.sessionToken,
|
||||
msg.body.name, msg.body.role, msg.body.authToken, Vector(msg.body.sessionToken),
|
||||
msg.body.avatarURL, msg.body.webcamBackgroundURL, ColorPicker.nextColor(liveMeeting.props.meetingProp.intId), msg.body.guest, msg.body.authed,
|
||||
guestStatus, msg.body.excludeFromDashboard, msg.body.enforceLayout, msg.body.userMetadata, false)
|
||||
|
||||
|
@ -0,0 +1,32 @@
|
||||
package org.bigbluebutton.core.apps.users
|
||||
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.core.models._
|
||||
import org.bigbluebutton.core.running.{ LiveMeeting, OutMsgRouter }
|
||||
import org.bigbluebutton.core2.message.senders.Sender
|
||||
|
||||
trait RegisterUserSessionTokenReqMsgHdlr {
|
||||
this: UsersApp =>
|
||||
|
||||
val liveMeeting: LiveMeeting
|
||||
val outGW: OutMsgRouter
|
||||
|
||||
def handleRegisterUserSessionTokenReqMsg(msg: RegisterUserSessionTokenReqMsg): Unit = {
|
||||
for {
|
||||
ru <- RegisteredUsers.findWithUserId(msg.body.userId, liveMeeting.registeredUsers)
|
||||
} yield {
|
||||
val updatedRU = RegisteredUsers.addUserSessionToken(liveMeeting.registeredUsers, ru, msg.body.sessionToken)
|
||||
log.info("Register user session token success. meetingId=" + liveMeeting.props.meetingProp.intId + " userId=" + msg.body.userId + " sessionToken=" + msg.body.sessionToken)
|
||||
|
||||
if (msg.body.revokeSessionToken.nonEmpty) {
|
||||
RegisteredUsers.removeUserSessionToken(liveMeeting.registeredUsers, updatedRU, msg.body.revokeSessionToken)
|
||||
log.info("Revoked user session token success. meetingId=" + liveMeeting.props.meetingProp.intId + " userId=" + msg.body.userId + " sessionToken=" + msg.body.revokeSessionToken)
|
||||
|
||||
//Close user connection
|
||||
Sender.sendForceUserGraphqlDisconnectionSysMsg(liveMeeting.props.meetingProp.intId, updatedRU.id, Vector(msg.body.revokeSessionToken), "user connected with a new session token", "session_token_revoked", outGW)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -161,6 +161,7 @@ class UsersApp(
|
||||
)(implicit val context: ActorContext)
|
||||
|
||||
extends RegisterUserReqMsgHdlr
|
||||
with RegisterUserSessionTokenReqMsgHdlr
|
||||
with GetUserApiMsgHdlr
|
||||
with ChangeUserRoleCmdMsgHdlr
|
||||
with SetUserSpeechLocaleMsgHdlr
|
||||
|
@ -33,7 +33,7 @@ trait UserJoinedVoiceConfEvtMsgHdlr extends SystemConfiguration {
|
||||
|
||||
def registerUserInRegisteredUsers() = {
|
||||
val regUser = RegisteredUsers.create(liveMeeting.props.meetingProp.intId, msg.body.intId, msg.body.voiceUserId,
|
||||
msg.body.callerIdName, Roles.VIEWER_ROLE, msg.body.intId, "", "", "", userColor,
|
||||
msg.body.callerIdName, Roles.VIEWER_ROLE, msg.body.intId, Vector(""), "", "", userColor,
|
||||
true, true, GuestStatus.WAIT, true, "", Map(), false)
|
||||
RegisteredUsers.add(liveMeeting.registeredUsers, regUser, liveMeeting.props.meetingProp.intId)
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ case class UserDbModel(
|
||||
avatar: String = "",
|
||||
webcamBackground: String = "",
|
||||
color: String = "",
|
||||
sessionToken: String = "",
|
||||
authToken: String = "",
|
||||
authed: Boolean = false,
|
||||
joined: Boolean = false,
|
||||
@ -30,7 +29,7 @@ case class UserDbModel(
|
||||
|
||||
class UserDbTableDef(tag: Tag) extends Table[UserDbModel](tag, None, "user") {
|
||||
override def * = (
|
||||
meetingId,userId,extId,name,role,avatar,webcamBackground,color, sessionToken, authToken, authed,joined,joinErrorCode,
|
||||
meetingId,userId,extId,name,role,avatar,webcamBackground,color, authToken, authed,joined,joinErrorCode,
|
||||
joinErrorMessage, banned,loggedOut,guest,guestStatus,registeredOn,excludeFromDashboard, enforceLayout) <> (UserDbModel.tupled, UserDbModel.unapply)
|
||||
val meetingId = column[String]("meetingId", O.PrimaryKey)
|
||||
val userId = column[String]("userId", O.PrimaryKey)
|
||||
@ -40,7 +39,6 @@ class UserDbTableDef(tag: Tag) extends Table[UserDbModel](tag, None, "user") {
|
||||
val avatar = column[String]("avatar")
|
||||
val webcamBackground = column[String]("webcamBackground")
|
||||
val color = column[String]("color")
|
||||
val sessionToken = column[String]("sessionToken")
|
||||
val authToken = column[String]("authToken")
|
||||
val authed = column[Boolean]("authed")
|
||||
val joined = column[Boolean]("joined")
|
||||
@ -69,7 +67,6 @@ object UserDAO {
|
||||
avatar = regUser.avatarURL,
|
||||
webcamBackground = regUser.webcamBackgroundURL,
|
||||
color = regUser.color,
|
||||
sessionToken = regUser.sessionToken,
|
||||
authed = regUser.authed,
|
||||
joined = regUser.joined,
|
||||
joinErrorCode = None,
|
||||
@ -92,6 +89,7 @@ object UserDAO {
|
||||
UserMetadataDAO.insert(meetingId, regUser.id, regUser.userMetadata)
|
||||
UserClientSettingsDAO.insertOrUpdate(meetingId, regUser.id, JsonUtils.stringToJson("{}"))
|
||||
ChatUserDAO.insertUserPublicChat(meetingId, regUser.id)
|
||||
UserSessionTokenDAO.insert(regUser.meetingId, regUser.id, regUser.sessionToken.head)
|
||||
}
|
||||
|
||||
def update(regUser: RegisteredUser) = {
|
||||
|
@ -0,0 +1,51 @@
|
||||
package org.bigbluebutton.core.db
|
||||
import slick.jdbc.PostgresProfile.api._
|
||||
|
||||
case class UserSessionTokenDbModel (
|
||||
meetingId: String,
|
||||
userId: String,
|
||||
sessionToken: String,
|
||||
createdAt: java.sql.Timestamp,
|
||||
removedAt: Option[java.sql.Timestamp],
|
||||
)
|
||||
|
||||
class UserSessionTokenDbTableDef(tag: Tag) extends Table[UserSessionTokenDbModel](tag, None, "user_sessionToken") {
|
||||
override def * = (
|
||||
meetingId, userId, sessionToken, createdAt, removedAt
|
||||
) <> (UserSessionTokenDbModel.tupled, UserSessionTokenDbModel.unapply)
|
||||
val meetingId = column[String]("meetingId", O.PrimaryKey)
|
||||
val userId = column[String]("userId", O.PrimaryKey)
|
||||
val sessionToken = column[String]("sessionToken", O.PrimaryKey)
|
||||
val createdAt = column[java.sql.Timestamp]("createdAt")
|
||||
val removedAt = column[Option[java.sql.Timestamp]]("removedAt")
|
||||
}
|
||||
|
||||
object UserSessionTokenDAO {
|
||||
def insert(meetingId: String, userId: String, sessionToken: String) = {
|
||||
DatabaseConnection.enqueue(
|
||||
TableQuery[UserSessionTokenDbTableDef].insertOrUpdate(
|
||||
UserSessionTokenDbModel(
|
||||
meetingId = meetingId,
|
||||
userId = userId,
|
||||
sessionToken = sessionToken,
|
||||
createdAt = new java.sql.Timestamp(System.currentTimeMillis()),
|
||||
removedAt = None
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def softDelete(meetingId: String, userId: String, sessionTokenToRemove: String) = {
|
||||
DatabaseConnection.enqueue(
|
||||
TableQuery[UserSessionTokenDbTableDef]
|
||||
.filter(_.meetingId === meetingId)
|
||||
.filter(_.userId === userId)
|
||||
.filter(_.sessionToken === sessionTokenToRemove)
|
||||
.filter(_.removedAt.isEmpty)
|
||||
.map(u => u.removedAt)
|
||||
.update(Some(new java.sql.Timestamp(System.currentTimeMillis())))
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -1,12 +1,12 @@
|
||||
package org.bigbluebutton.core.models
|
||||
|
||||
import com.softwaremill.quicklens._
|
||||
import org.bigbluebutton.core.db.{UserBreakoutRoomDAO, UserDAO, UserDbModel}
|
||||
import org.bigbluebutton.core.db.{UserBreakoutRoomDAO, UserDAO, UserDbModel, UserSessionTokenDAO}
|
||||
import org.bigbluebutton.core.domain.BreakoutRoom2x
|
||||
|
||||
object RegisteredUsers {
|
||||
def create(meetingId: String, userId: String, extId: String, name: String, roles: String,
|
||||
authToken: String, sessionToken: String, avatar: String, webcamBackground: String, color: String, guest: Boolean, authenticated: Boolean,
|
||||
authToken: String, sessionToken: Vector[String], avatar: String, webcamBackground: String, color: String, guest: Boolean, authenticated: Boolean,
|
||||
guestStatus: String, excludeFromDashboard: Boolean, enforceLayout: String,
|
||||
userMetadata: Map[String, String], loggedOut: Boolean): RegisteredUser = {
|
||||
new RegisteredUser(
|
||||
@ -42,7 +42,7 @@ object RegisteredUsers {
|
||||
}
|
||||
|
||||
def findWithSessionToken(sessionToken: String, users: RegisteredUsers): Option[RegisteredUser] = {
|
||||
users.toVector.find(u => u.sessionToken == sessionToken)
|
||||
users.toVector.find(u => u.sessionToken.contains(sessionToken))
|
||||
}
|
||||
|
||||
def findAll(users: RegisteredUsers): Vector[RegisteredUser] = {
|
||||
@ -210,6 +210,20 @@ object RegisteredUsers {
|
||||
u
|
||||
}
|
||||
|
||||
def addUserSessionToken(users: RegisteredUsers, user: RegisteredUser, newSessionToken: String): RegisteredUser = {
|
||||
val u = user.copy(sessionToken = user.sessionToken :+ newSessionToken)
|
||||
users.save(u)
|
||||
UserSessionTokenDAO.insert(u.meetingId, u.id, newSessionToken)
|
||||
u
|
||||
}
|
||||
|
||||
def removeUserSessionToken(users: RegisteredUsers, user: RegisteredUser, revokeSessionToken: String): RegisteredUser = {
|
||||
val u = user.copy(sessionToken = user.sessionToken.filterNot(_ == revokeSessionToken))
|
||||
users.save(u)
|
||||
UserSessionTokenDAO.softDelete(u.meetingId, u.id, revokeSessionToken)
|
||||
u
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class RegisteredUsers {
|
||||
@ -238,7 +252,7 @@ case class RegisteredUser(
|
||||
name: String,
|
||||
role: String,
|
||||
authToken: String,
|
||||
sessionToken: String,
|
||||
sessionToken: Vector[String],
|
||||
avatarURL: String,
|
||||
webcamBackgroundURL: String,
|
||||
color: String,
|
||||
|
@ -61,6 +61,8 @@ class ReceivedJsonMsgHandlerActor(
|
||||
// Route via meeting manager as there is a race condition if we send directly to meeting
|
||||
// because the meeting actor might not have been created yet.
|
||||
route[RegisterUserReqMsg](meetingManagerChannel, envelope, jsonNode)
|
||||
case RegisterUserSessionTokenReqMsg.NAME =>
|
||||
route[RegisterUserSessionTokenReqMsg](meetingManagerChannel, envelope, jsonNode)
|
||||
case UserJoinMeetingReqMsg.NAME =>
|
||||
routeGenericMsg[UserJoinMeetingReqMsg](envelope, jsonNode)
|
||||
case DestroyMeetingSysCmdMsg.NAME =>
|
||||
|
@ -263,6 +263,7 @@ class MeetingActor(
|
||||
// Handling RegisterUserReqMsg as it is forwarded from BBBActor and
|
||||
// its type is not BbbCommonEnvCoreMsg
|
||||
case m: RegisterUserReqMsg => usersApp.handleRegisterUserReqMsg(m)
|
||||
case m: RegisterUserSessionTokenReqMsg => usersApp.handleRegisterUserSessionTokenReqMsg(m)
|
||||
|
||||
//API Msgs
|
||||
case m: GetUserApiMsg => usersApp.handleGetUserApiMsg(m, sender)
|
||||
|
@ -202,6 +202,16 @@ object MsgBuilder {
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildForceUserGraphqlDisconnectionSysMsg(meetingId: String, userId: String, sessionToken: String, reason: String, reasonMsgId: String): BbbCommonEnvCoreMsg = {
|
||||
val routing = Routing.addMsgToClientRouting(MessageTypes.SYSTEM, meetingId, userId)
|
||||
val envelope = BbbCoreEnvelope(ForceUserGraphqlDisconnectionSysMsg.NAME, routing)
|
||||
val header = BbbCoreHeaderWithMeetingId(ForceUserGraphqlDisconnectionSysMsg.NAME, meetingId)
|
||||
val body = ForceUserGraphqlDisconnectionSysMsgBody(meetingId, userId, sessionToken, reason, reasonMsgId)
|
||||
val event = ForceUserGraphqlDisconnectionSysMsg(header, body)
|
||||
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildCheckGraphqlMiddlewareAlivePingSysMsg(middlewareUid: String): BbbCommonEnvCoreMsg = {
|
||||
val routing = Routing.addMsgToClientRouting(MessageTypes.SYSTEM, "", "")
|
||||
val envelope = BbbCoreEnvelope(CheckGraphqlMiddlewareAlivePingSysMsg.NAME, routing)
|
||||
|
@ -4,9 +4,24 @@ import org.bigbluebutton.core.running.OutMsgRouter
|
||||
|
||||
object Sender {
|
||||
|
||||
def sendForceUserGraphqlReconnectionSysMsg(meetingId: String, userId: String, sessionToken: String, reason: String, outGW: OutMsgRouter): Unit = {
|
||||
val ForceUserGraphqlReconnectionSysMsg = MsgBuilder.buildForceUserGraphqlReconnectionSysMsg(meetingId, userId, sessionToken, reason)
|
||||
outGW.send(ForceUserGraphqlReconnectionSysMsg)
|
||||
def sendForceUserGraphqlReconnectionSysMsg(meetingId: String, userId: String, sessionTokens: Vector[String], reason: String, outGW: OutMsgRouter): Unit = {
|
||||
for {
|
||||
sessionToken <- sessionTokens
|
||||
} yield {
|
||||
outGW.send(
|
||||
MsgBuilder.buildForceUserGraphqlReconnectionSysMsg(meetingId, userId, sessionToken, reason)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def sendForceUserGraphqlDisconnectionSysMsg(meetingId: String, userId: String, sessionTokens: Vector[String], reason: String, reasonMsgId: String, outGW: OutMsgRouter): Unit = {
|
||||
for {
|
||||
sessionToken <- sessionTokens
|
||||
} yield {
|
||||
outGW.send(
|
||||
MsgBuilder.buildForceUserGraphqlDisconnectionSysMsg(meetingId, userId, sessionToken, reason, reasonMsgId)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def sendUserInactivityInspectMsg(meetingId: String, userId: String, responseDelay: Long, outGW: OutMsgRouter): Unit = {
|
||||
|
@ -58,7 +58,7 @@ object FakeUserGenerator {
|
||||
val color = "#ff6242"
|
||||
|
||||
val ru = RegisteredUsers.create(meetingId, userId = id, extId, name, role,
|
||||
authToken, sessionToken, avatarURL, webcamBackgroundURL, color, guest, authed, guestStatus = GuestStatus.ALLOW, false, "", Map(), false)
|
||||
authToken, Vector(sessionToken), avatarURL, webcamBackgroundURL, color, guest, authed, guestStatus = GuestStatus.ALLOW, false, "", Map(), false)
|
||||
RegisteredUsers.add(users, ru, meetingId)
|
||||
ru
|
||||
}
|
||||
|
@ -68,6 +68,7 @@ class GraphqlConnectionsActor(
|
||||
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
|
||||
msg.core match {
|
||||
case m: RegisterUserReqMsg => handleUserRegisteredRespMsg(m)
|
||||
case m: RegisterUserSessionTokenReqMsg => handleRegisterUserSessionTokenReqMsg(m)
|
||||
case m: DestroyMeetingSysCmdMsg => handleDestroyMeetingSysCmdMsg(m)
|
||||
// Messages from bbb-graphql-middleware
|
||||
case m: UserGraphqlConnectionEstablishedSysMsg => handleUserGraphqlConnectionEstablishedSysMsg(m)
|
||||
@ -85,6 +86,14 @@ class GraphqlConnectionsActor(
|
||||
))
|
||||
}
|
||||
|
||||
private def handleRegisterUserSessionTokenReqMsg(msg: RegisterUserSessionTokenReqMsg): Unit = {
|
||||
users += (msg.body.sessionToken -> GraphqlUser(
|
||||
msg.body.userId,
|
||||
msg.body.meetingId,
|
||||
msg.body.sessionToken
|
||||
))
|
||||
}
|
||||
|
||||
private def handleDestroyMeetingSysCmdMsg(msg: DestroyMeetingSysCmdMsg): Unit = {
|
||||
users = users.filter(u => u._2.meetingId != msg.body.meetingId)
|
||||
graphqlConnections = graphqlConnections.filter(c => c._2.user.meetingId != msg.body.meetingId)
|
||||
@ -136,7 +145,8 @@ class GraphqlConnectionsActor(
|
||||
graphqlConnections = graphqlConnections.-(browserConnectionId)
|
||||
|
||||
//Send internal message informing user disconnected
|
||||
if (!graphqlConnections.values.exists(c => c.sessionToken == sessionToken)) {
|
||||
if (!graphqlConnections.values.exists(c => c.sessionToken == sessionToken
|
||||
|| (c.user.meetingId == user.meetingId && c.user.intId == user.intId))) {
|
||||
eventBus.publish(BigBlueButtonEvent(user.meetingId, UserClosedAllGraphqlConnectionsInternalMsg(user.intId)))
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ class UserInfoService(system: ActorSystem, bbbActor: ActorRef) {
|
||||
val future = bbbActor.ask(GetUserApiMsg(sessionToken)).mapTo[ApiResponse]
|
||||
|
||||
future.recover {
|
||||
case e: AskTimeoutException => ApiResponseFailure("Request Timeout error")
|
||||
case e: AskTimeoutException => ApiResponseFailure("Request Timeout error", "request_timeout", Map())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@ object TestDataGen {
|
||||
val color = "#ff6242"
|
||||
|
||||
val ru = RegisteredUsers.create(meetingId, userId = id, extId, name, role,
|
||||
authToken, sessionToken, avatarURL, webcamBackgroundURL, color, guest, authed, GuestStatus.ALLOW, false, "", Map(), false)
|
||||
authToken, Vector(sessionToken), avatarURL, webcamBackgroundURL, color, guest, authed, GuestStatus.ALLOW, false, "", Map(), false)
|
||||
|
||||
RegisteredUsers.add(users, ru, meetingId = "test")
|
||||
ru
|
||||
|
@ -190,6 +190,13 @@ case class ForceUserGraphqlReconnectionSysMsg(
|
||||
) extends BbbCoreMsg
|
||||
case class ForceUserGraphqlReconnectionSysMsgBody(meetingId: String, userId: String, sessionToken: String, reason: String)
|
||||
|
||||
object ForceUserGraphqlDisconnectionSysMsg { val NAME = "ForceUserGraphqlDisconnectionSysMsg" }
|
||||
case class ForceUserGraphqlDisconnectionSysMsg(
|
||||
header: BbbCoreHeaderWithMeetingId,
|
||||
body: ForceUserGraphqlDisconnectionSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class ForceUserGraphqlDisconnectionSysMsgBody(meetingId: String, userId: String, sessionToken: String, reason: String, reasonMessageId: String)
|
||||
|
||||
/**
|
||||
* Sent from graphql-middleware to akka-apps
|
||||
*/
|
||||
@ -201,6 +208,13 @@ case class UserGraphqlReconnectionForcedEvtMsg(
|
||||
) extends BbbCoreMsg
|
||||
case class UserGraphqlReconnectionForcedEvtMsgBody(middlewareUID: String, sessionToken: String, browserConnectionId: String)
|
||||
|
||||
object UserGraphqlDisconnectionForcedEvtMsg { val NAME = "UserGraphqlDisconnectionForcedEvtMsg" }
|
||||
case class UserGraphqlDisconnectionForcedEvtMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: UserGraphqlDisconnectionForcedEvtMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class UserGraphqlDisconnectionForcedEvtMsgBody(middlewareUID: String, sessionToken: String, browserConnectionId: String)
|
||||
|
||||
object UserGraphqlConnectionEstablishedSysMsg { val NAME = "UserGraphqlConnectionEstablishedSysMsg" }
|
||||
case class UserGraphqlConnectionEstablishedSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
|
@ -18,6 +18,20 @@ case class UserRegisteredRespMsg(
|
||||
case class UserRegisteredRespMsgBody(meetingId: String, userId: String, name: String,
|
||||
role: String, excludeFromDashboard: Boolean, registeredOn: Long)
|
||||
|
||||
object RegisterUserSessionTokenReqMsg { val NAME = "RegisterUserSessionTokenReqMsg" }
|
||||
case class RegisterUserSessionTokenReqMsg(
|
||||
header: BbbCoreHeaderWithMeetingId,
|
||||
body: RegisterUserSessionTokenReqMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class RegisterUserSessionTokenReqMsgBody(meetingId: String, userId: String, sessionToken: String, revokeSessionToken: String)
|
||||
|
||||
object UserSessionTokenRegisteredRespMsg { val NAME = "UserSessionTokenRegisteredRespMsg" }
|
||||
case class UserSessionTokenRegisteredRespMsg(
|
||||
header: BbbCoreHeaderWithMeetingId,
|
||||
body: UserSessionTokenRegisteredRespMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class UserSessionTokenRegisteredRespMsgBody(meetingId: String, userId: String, sessionToken: String)
|
||||
|
||||
/**
|
||||
* Out Messages
|
||||
*/
|
||||
|
@ -140,6 +140,12 @@ public class MeetingService implements MessageListener {
|
||||
}
|
||||
}
|
||||
|
||||
public void registerUserSession(String meetingID, String internalUserId, String sessionToken, String revokeSessionToken) {
|
||||
handle(
|
||||
new RegisterUserSessionToken(meetingID, internalUserId, sessionToken, revokeSessionToken)
|
||||
);
|
||||
}
|
||||
|
||||
public UserSession getUserSessionWithUserId(String userId) {
|
||||
for (UserSession userSession : sessions.values()) {
|
||||
if (userSession.internalUserId.equals(userId)) {
|
||||
@ -450,6 +456,10 @@ public class MeetingService implements MessageListener {
|
||||
message.authed, message.guestStatus, message.excludeFromDashboard, message.enforceLayout, message.userMetadata);
|
||||
}
|
||||
|
||||
private void processRegisterUserSessionToken(RegisterUserSessionToken message) {
|
||||
gw.registerUserSessionToken(message.meetingID, message.internalUserId, message.sessionToken, message.revokeSessionToken);
|
||||
}
|
||||
|
||||
public Meeting getMeeting(String meetingId) {
|
||||
if (meetingId == null)
|
||||
return null;
|
||||
@ -1192,6 +1202,8 @@ public class MeetingService implements MessageListener {
|
||||
processEndMeeting((EndMeeting) message);
|
||||
} else if (message instanceof RegisterUser) {
|
||||
processRegisterUser((RegisterUser) message);
|
||||
} else if (message instanceof RegisterUserSessionToken) {
|
||||
processRegisterUserSessionToken((RegisterUserSessionToken) message);
|
||||
} else if (message instanceof CreateBreakoutRoom) {
|
||||
processCreateBreakoutRoom((CreateBreakoutRoom) message);
|
||||
} else if (message instanceof PresentationUploadToken) {
|
||||
|
@ -271,6 +271,10 @@ public class Meeting {
|
||||
|
||||
}
|
||||
|
||||
public RegisteredUser getRegisteredUserWithUserId(String userId) {
|
||||
return registeredUsers.get(userId);
|
||||
}
|
||||
|
||||
public RegisteredUser getRegisteredUserWithAuthToken(String authToken) {
|
||||
for (RegisteredUser ruser : registeredUsers.values()) {
|
||||
if (ruser.authToken.equals(authToken)) {
|
||||
|
@ -0,0 +1,16 @@
|
||||
package org.bigbluebutton.api.messaging.messages;
|
||||
|
||||
public class RegisterUserSessionToken implements IMessage {
|
||||
|
||||
public final String meetingID;
|
||||
public final String internalUserId;
|
||||
public final String sessionToken;
|
||||
public final String revokeSessionToken;
|
||||
|
||||
public RegisterUserSessionToken(String meetingID, String internalUserId, String sessionToken, String revokeSessionToken) {
|
||||
this.meetingID = meetingID;
|
||||
this.internalUserId = internalUserId;
|
||||
this.sessionToken = sessionToken;
|
||||
this.revokeSessionToken = revokeSessionToken;
|
||||
}
|
||||
}
|
@ -75,6 +75,7 @@ public interface IBbbWebApiGWApp {
|
||||
String externUserID, String authToken, String sessionToken, String avatarURL, String webcamBackgroundURL,
|
||||
Boolean guest, Boolean authed, String guestStatus, Boolean excludeFromDashboard,
|
||||
String enforceLayout, Map<String, String> userMetadata);
|
||||
void registerUserSessionToken(String meetingID, String internalUserId, String sessionToken, String revokeSessionToken);
|
||||
|
||||
void destroyMeeting(DestroyMeetingMessage msg);
|
||||
void endMeeting(EndMeetingMessage msg);
|
||||
|
@ -5,7 +5,7 @@ import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.event.Logging
|
||||
import org.bigbluebutton.api.domain.{BreakoutRoomsParams, Group, LockSettingsParams}
|
||||
import org.bigbluebutton.api.messaging.converters.messages._
|
||||
import org.bigbluebutton.api.messaging.messages.ChatMessageFromApi
|
||||
import org.bigbluebutton.api.messaging.messages.{ChatMessageFromApi, RegisterUserSessionToken}
|
||||
import org.bigbluebutton.api2.bus._
|
||||
import org.bigbluebutton.api2.endpoint.redis.WebRedisSubscriberActor
|
||||
import org.bigbluebutton.common2.redis.MessageSender
|
||||
@ -304,6 +304,13 @@ class BbbWebApiGWApp(
|
||||
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
|
||||
}
|
||||
|
||||
def registerUserSessionToken(meetingId: String, intUserId: String, sessionToken: String, revokeSessionToken: String): Unit = {
|
||||
val regUserSessionToken = new RegisterUserSessionToken(meetingId, intUserId, sessionToken, revokeSessionToken)
|
||||
|
||||
val event = MsgBuilder.buildRegisterUserSessionTokenRequestToAkkaApps(regUserSessionToken)
|
||||
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
|
||||
}
|
||||
|
||||
def destroyMeeting(msg: DestroyMeetingMessage): Unit = {
|
||||
val event = MsgBuilder.buildDestroyMeetingSysCmdMsg(msg)
|
||||
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
|
||||
|
@ -1,7 +1,7 @@
|
||||
package org.bigbluebutton.api2
|
||||
|
||||
import org.bigbluebutton.api.messaging.converters.messages._
|
||||
import org.bigbluebutton.api.messaging.messages.ChatMessageFromApi
|
||||
import org.bigbluebutton.api.messaging.messages.{ ChatMessageFromApi, RegisterUserSessionToken }
|
||||
import org.bigbluebutton.api2.meeting.RegisterUser
|
||||
import org.bigbluebutton.common2.domain.{ DefaultProps, PageVO, PresentationPageConvertedVO, PresentationVO }
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
@ -56,6 +56,20 @@ object MsgBuilder {
|
||||
BbbCommonEnvCoreMsg(envelope, req)
|
||||
}
|
||||
|
||||
def buildRegisterUserSessionTokenRequestToAkkaApps(msg: RegisterUserSessionToken): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
|
||||
val envelope = BbbCoreEnvelope(RegisterUserSessionTokenReqMsg.NAME, routing)
|
||||
val header = BbbCoreHeaderWithMeetingId(RegisterUserSessionTokenReqMsg.NAME, msg.meetingID)
|
||||
val body = RegisterUserSessionTokenReqMsgBody(
|
||||
meetingId = msg.meetingID,
|
||||
userId = msg.internalUserId,
|
||||
sessionToken = msg.sessionToken,
|
||||
revokeSessionToken = msg.revokeSessionToken
|
||||
)
|
||||
val req = RegisterUserSessionTokenReqMsg(header, body)
|
||||
BbbCommonEnvCoreMsg(envelope, req)
|
||||
}
|
||||
|
||||
def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
|
||||
val envelope = BbbCoreEnvelope(CheckAlivePingSysMsg.NAME, routing)
|
||||
|
@ -13,7 +13,10 @@ import (
|
||||
// sessionVarsHookUrl is the authentication hook URL obtained from an environment variable.
|
||||
var sessionVarsHookUrl = config.GetConfig().SessionVarsHook.Url
|
||||
|
||||
func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken string) (map[string]string, error) {
|
||||
var internalError = fmt.Errorf("server internal error")
|
||||
var internalErrorId = "internal_error"
|
||||
|
||||
func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken string) (map[string]string, error, string) {
|
||||
logger := log.WithField("_routine", "AkkaAppsClient").WithField("browserConnectionId", browserConnectionId)
|
||||
logger.Debug("Starting AkkaAppsClient")
|
||||
defer logger.Debug("Finished AkkaAppsClient")
|
||||
@ -23,7 +26,8 @@ func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken st
|
||||
|
||||
// Check if the authentication hook URL is set.
|
||||
if sessionVarsHookUrl == "" {
|
||||
return nil, fmt.Errorf("BBB_GRAPHQL_MIDDLEWARE_SESSION_VARS_HOOK_URL not set")
|
||||
log.Error("BBB_GRAPHQL_MIDDLEWARE_SESSION_VARS_HOOK_URL not set")
|
||||
return nil, internalError, internalErrorId
|
||||
}
|
||||
|
||||
log.Trace("Get user session vars from: " + sessionVarsHookUrl + "?sessionToken=" + sessionToken)
|
||||
@ -31,7 +35,8 @@ func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken st
|
||||
// Create a new HTTP request to the authentication hook URL.
|
||||
req, err := http.NewRequest("GET", sessionVarsHookUrl, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
log.Error(err)
|
||||
return nil, internalError, internalErrorId
|
||||
}
|
||||
|
||||
// Execute the HTTP request to obtain user session variables (like X-Hasura-Role)
|
||||
@ -39,28 +44,31 @@ func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken st
|
||||
req.Header.Set("User-Agent", "bbb-graphql-middleware")
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, internalError, internalErrorId
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, internalError, internalErrorId
|
||||
}
|
||||
|
||||
var respBodyAsMap map[string]string
|
||||
if err := json.Unmarshal(respBody, &respBodyAsMap); err != nil {
|
||||
return nil, err
|
||||
return nil, internalError, internalErrorId
|
||||
}
|
||||
|
||||
// Check the response status.
|
||||
response, ok := respBodyAsMap["response"]
|
||||
message, _ := respBodyAsMap["message"]
|
||||
messageId, _ := respBodyAsMap["message_id"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("response key not found in the parsed object")
|
||||
log.Error("response key not found in the parsed object")
|
||||
return nil, internalError, internalErrorId
|
||||
}
|
||||
if response != "authorized" {
|
||||
logger.Error(response)
|
||||
return nil, fmt.Errorf("user not authorized")
|
||||
logger.Error(response, message, messageId)
|
||||
return nil, fmt.Errorf(message), messageId
|
||||
}
|
||||
|
||||
// Normalize the response header keys.
|
||||
@ -71,5 +79,5 @@ func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken st
|
||||
}
|
||||
}
|
||||
|
||||
return normalizedResponse, nil
|
||||
return normalizedResponse, nil, ""
|
||||
}
|
||||
|
@ -62,7 +62,13 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
if common.HasReachedMaxGlobalConnections() {
|
||||
common.WsConnectionRejectedCounter.With(prometheus.Labels{"reason": "limit of server connections exceeded"}).Inc()
|
||||
browserWsConn.Close(websocket.StatusInternalError, "limit of server connections exceeded")
|
||||
disconnectWithError(
|
||||
browserWsConn,
|
||||
browserConnectionContext,
|
||||
browserConnectionContextCancel,
|
||||
websocket.StatusInternalError,
|
||||
"connections_limit_exceeded",
|
||||
"limit of server connections exceeded")
|
||||
return
|
||||
}
|
||||
|
||||
@ -123,17 +129,17 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}()
|
||||
|
||||
//Check authorization and obtain user session variables from bbb-web
|
||||
if errorOnInitConnection := connectionInitHandler(&thisConnection); errorOnInitConnection != nil {
|
||||
if errorOnInitConnection, errorMessageId := connectionInitHandler(&thisConnection); errorOnInitConnection != nil {
|
||||
common.WsConnectionRejectedCounter.With(prometheus.Labels{"reason": errorOnInitConnection.Error()}).Inc()
|
||||
|
||||
disconnectWithError(
|
||||
browserWsConn,
|
||||
browserConnectionContext,
|
||||
browserConnectionContextCancel,
|
||||
//If the server wishes to reject the connection it is recommended to close the socket with `4403: Forbidden`.
|
||||
//https://github.com/enisdenjo/graphql-ws/blob/63881c3372a3564bf42040e3f572dd74e41b2e49/PROTOCOL.md?plain=1#L36
|
||||
wsError := &websocket.CloseError{
|
||||
Code: websocket.StatusCode(4403),
|
||||
Reason: errorOnInitConnection.Error(),
|
||||
}
|
||||
browserWsConn.Close(wsError.Code, wsError.Reason)
|
||||
browserConnectionContextCancel()
|
||||
websocket.StatusCode(4403),
|
||||
errorMessageId,
|
||||
errorOnInitConnection.Error())
|
||||
}
|
||||
|
||||
common.WsConnectionAcceptedCounter.Inc()
|
||||
@ -203,7 +209,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
wgAll.Wait()
|
||||
}
|
||||
|
||||
func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
|
||||
func InvalidateSessionTokenHasuraConnections(sessionTokenToInvalidate string) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
connectionsToProcess := make([]*common.BrowserConnection, 0)
|
||||
for _, browserConnection := range BrowserConnections {
|
||||
@ -232,7 +238,7 @@ func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, ses
|
||||
return // If there's no Hasura connection, there's nothing to invalidate.
|
||||
}
|
||||
|
||||
log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, bc.HasuraConnection.Id)
|
||||
log.Debugf("Processing reconnection request for sessionToken %v (hasura connection %v)", sessionToken, bc.HasuraConnection.Id)
|
||||
|
||||
// Stop receiving new messages from the browser.
|
||||
log.Debug("freezing channel fromBrowserToHasuraChannel")
|
||||
@ -250,41 +256,84 @@ func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, ses
|
||||
go SendUserGraphqlReconnectionForcedEvtMsg(sessionToken)
|
||||
}
|
||||
|
||||
func refreshUserSessionVariables(browserConnection *common.BrowserConnection) error {
|
||||
func InvalidateSessionTokenBrowserConnections(sessionTokenToInvalidate string, reasonMsgId string, reason string) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
connectionsToProcess := make([]*common.BrowserConnection, 0)
|
||||
for _, browserConnection := range BrowserConnections {
|
||||
if browserConnection.SessionToken == sessionTokenToInvalidate {
|
||||
connectionsToProcess = append(connectionsToProcess, browserConnection)
|
||||
}
|
||||
}
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, browserConnection := range connectionsToProcess {
|
||||
wg.Add(1)
|
||||
go func(bc *common.BrowserConnection) {
|
||||
defer wg.Done()
|
||||
invalidateBrowserConnectionForSessionToken(bc, sessionTokenToInvalidate, reasonMsgId, reason)
|
||||
}(browserConnection)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func invalidateBrowserConnectionForSessionToken(bc *common.BrowserConnection, sessionToken string, reasonMsgId string, reason string) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
defer BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
log.Debugf("Processing disconnection request for sessionToken %v (browser connection %v)", sessionToken, bc.Id)
|
||||
|
||||
// Stop receiving new messages from the browser.
|
||||
log.Debug("freezing channel fromBrowserToHasuraChannel")
|
||||
bc.FromBrowserToHasuraChannel.FreezeChannel()
|
||||
|
||||
disconnectWithError(
|
||||
bc.Websocket,
|
||||
bc.Context,
|
||||
bc.ContextCancelFunc,
|
||||
websocket.StatusCode(4403),
|
||||
reasonMsgId,
|
||||
reason)
|
||||
|
||||
// Send a reconnection confirmation message
|
||||
go SendUserGraphqlDisconnectionForcedEvtMsg(sessionToken)
|
||||
}
|
||||
|
||||
func refreshUserSessionVariables(browserConnection *common.BrowserConnection) (error, string) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
sessionToken := browserConnection.SessionToken
|
||||
browserConnectionId := browserConnection.Id
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
// Check authorization
|
||||
sessionVariables, err := akka_apps.AkkaAppsGetSessionVariablesFrom(browserConnectionId, sessionToken)
|
||||
sessionVariables, err, errorId := akka_apps.AkkaAppsGetSessionVariablesFrom(browserConnectionId, sessionToken)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return fmt.Errorf("error on checking sessionToken authorization")
|
||||
return fmt.Errorf("error on checking sessionToken authorization: %s", err.Error()), errorId
|
||||
} else {
|
||||
log.Trace("Session variables obtained successfully")
|
||||
}
|
||||
|
||||
if _, exists := sessionVariables["x-hasura-role"]; !exists {
|
||||
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-Role is missing")
|
||||
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-Role is missing"), "param_missing"
|
||||
}
|
||||
|
||||
if _, exists := sessionVariables["x-hasura-userid"]; !exists {
|
||||
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-UserId is missing")
|
||||
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-UserId is missing"), "param_missing"
|
||||
}
|
||||
|
||||
if _, exists := sessionVariables["x-hasura-meetingid"]; !exists {
|
||||
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-MeetingId is missing")
|
||||
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-MeetingId is missing"), "param_missing"
|
||||
}
|
||||
|
||||
BrowserConnectionsMutex.Lock()
|
||||
browserConnection.BBBWebSessionVariables = sessionVariables
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
return nil
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
func connectionInitHandler(browserConnection *common.BrowserConnection) (error, string) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
browserConnectionId := browserConnection.Id
|
||||
browserConnectionCookies := browserConnection.BrowserRequestCookies
|
||||
@ -295,7 +344,7 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
fromBrowserMessage, ok := browserConnection.FromBrowserToHasuraChannel.Receive()
|
||||
if !ok {
|
||||
//Received all messages. Channel is closed
|
||||
return fmt.Errorf("error on receiving init connection")
|
||||
return fmt.Errorf("error on receiving init connection"), "param_missing"
|
||||
}
|
||||
if bytes.Contains(fromBrowserMessage, []byte("\"connection_init\"")) {
|
||||
var fromBrowserMessageAsMap map[string]interface{}
|
||||
@ -308,26 +357,26 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
var headersAsMap = payloadAsMap["headers"].(map[string]interface{})
|
||||
var sessionToken, existsSessionToken = headersAsMap["X-Session-Token"].(string)
|
||||
if !existsSessionToken {
|
||||
return fmt.Errorf("X-Session-Token header missing on init connection")
|
||||
return fmt.Errorf("X-Session-Token header missing on init connection"), "param_missing"
|
||||
}
|
||||
|
||||
if common.HasReachedMaxUserConnections(sessionToken) {
|
||||
return fmt.Errorf("too many connections")
|
||||
return fmt.Errorf("too many connections"), "too_many_connections"
|
||||
}
|
||||
|
||||
var clientSessionUUID, existsClientSessionUUID = headersAsMap["X-ClientSessionUUID"].(string)
|
||||
if !existsClientSessionUUID {
|
||||
return fmt.Errorf("X-ClientSessionUUID header missing on init connection")
|
||||
return fmt.Errorf("X-ClientSessionUUID header missing on init connection"), "param_missing"
|
||||
}
|
||||
|
||||
var clientType, existsClientType = headersAsMap["X-ClientType"].(string)
|
||||
if !existsClientType {
|
||||
return fmt.Errorf("X-ClientType header missing on init connection")
|
||||
return fmt.Errorf("X-ClientType header missing on init connection"), "param_missing"
|
||||
}
|
||||
|
||||
var clientIsMobile, existsMobile = headersAsMap["X-ClientIsMobile"].(string)
|
||||
if !existsMobile {
|
||||
return fmt.Errorf("X-ClientIsMobile header missing on init connection")
|
||||
return fmt.Errorf("X-ClientIsMobile header missing on init connection"), "param_missing"
|
||||
}
|
||||
|
||||
var meetingId, userId string
|
||||
@ -349,15 +398,15 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
}
|
||||
|
||||
if errCheckAuthorization != nil {
|
||||
return fmt.Errorf("error on trying to check authorization")
|
||||
return fmt.Errorf("error on trying to check authorization"), "user_not_found"
|
||||
}
|
||||
|
||||
if meetingId == "" {
|
||||
return fmt.Errorf("error to obtain user meetingId from BBBWebCheckAuthorization")
|
||||
return fmt.Errorf("error on trying to check authorization"), "user_not_found"
|
||||
}
|
||||
|
||||
if userId == "" {
|
||||
return fmt.Errorf("error to obtain user userId from BBBWebCheckAuthorization")
|
||||
return fmt.Errorf("error on trying to check authorization"), "user_not_found"
|
||||
}
|
||||
|
||||
log.Trace("Success on check authorization")
|
||||
@ -371,8 +420,8 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
browserConnection.ConnectionInitMessage = fromBrowserMessage
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
if err := refreshUserSessionVariables(browserConnection); err != nil {
|
||||
return fmt.Errorf("error on getting session variables")
|
||||
if err, errorId := refreshUserSessionVariables(browserConnection); err != nil {
|
||||
return err, errorId
|
||||
}
|
||||
|
||||
go SendUserGraphqlConnectionEstablishedSysMsg(
|
||||
@ -387,5 +436,42 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
func disconnectWithError(
|
||||
browserConnectionWs *websocket.Conn,
|
||||
browserConnectionContext context.Context,
|
||||
browserConnectionContextCancel context.CancelFunc,
|
||||
wsCloseStatusCode websocket.StatusCode,
|
||||
reasonMessageId string,
|
||||
reasonMessage string) {
|
||||
|
||||
//Chromium-based browsers can't read websocket close code/reason, so it will send this message before closing conn
|
||||
browserResponseData := map[string]interface{}{
|
||||
"id": "-1", //The client recognizes this message ID as a signal to terminate the session
|
||||
"type": "error",
|
||||
"payload": []interface{}{
|
||||
map[string]interface{}{
|
||||
"messageId": reasonMessageId,
|
||||
"message": reasonMessage,
|
||||
},
|
||||
},
|
||||
}
|
||||
jsonData, _ := json.Marshal(browserResponseData)
|
||||
|
||||
log.Tracef("sending to browser: %s", string(jsonData))
|
||||
err := browserConnectionWs.Write(browserConnectionContext, websocket.MessageText, jsonData)
|
||||
if err != nil {
|
||||
log.Debugf("Browser is disconnected, skipping writing of ws message: %v", err)
|
||||
}
|
||||
|
||||
errCloseWs := browserConnectionWs.Close(wsCloseStatusCode, reasonMessage)
|
||||
if errCloseWs != nil {
|
||||
log.Debugf("Error on close websocket: %v", errCloseWs)
|
||||
}
|
||||
|
||||
browserConnectionContextCancel()
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ func StartRedisListener() {
|
||||
|
||||
// Skip parsing unnecessary messages
|
||||
if !strings.Contains(msg.Payload, "ForceUserGraphqlReconnectionSysMsg") &&
|
||||
!strings.Contains(msg.Payload, "ForceUserGraphqlDisconnectionSysMsg") &&
|
||||
!strings.Contains(msg.Payload, "CheckGraphqlMiddlewareAlivePingSysMsg") {
|
||||
continue
|
||||
}
|
||||
@ -57,10 +58,21 @@ func StartRedisListener() {
|
||||
messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{})
|
||||
sessionTokenToInvalidate := messageBodyAsMap["sessionToken"]
|
||||
reason := messageBodyAsMap["reason"]
|
||||
log.Debugf("Received invalidate request for sessionToken %v (%v)", sessionTokenToInvalidate, reason)
|
||||
log.Debugf("Received reconnection request for sessionToken %v (%v)", sessionTokenToInvalidate, reason)
|
||||
|
||||
go InvalidateSessionTokenHasuraConnections(sessionTokenToInvalidate.(string))
|
||||
}
|
||||
|
||||
if messageType == "ForceUserGraphqlDisconnectionSysMsg" {
|
||||
messageCoreAsMap := messageAsMap["core"].(map[string]interface{})
|
||||
messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{})
|
||||
sessionTokenToInvalidate := messageBodyAsMap["sessionToken"]
|
||||
reason := messageBodyAsMap["reason"]
|
||||
reasonMsgId := messageBodyAsMap["reasonMessageId"]
|
||||
log.Debugf("Received disconnection request for sessionToken %v (%s - %s)", sessionTokenToInvalidate, reasonMsgId, reason)
|
||||
|
||||
//Not being used yet
|
||||
go InvalidateSessionTokenConnections(sessionTokenToInvalidate.(string))
|
||||
go InvalidateSessionTokenBrowserConnections(sessionTokenToInvalidate.(string), reasonMsgId.(string), reason.(string))
|
||||
}
|
||||
|
||||
//Ping message requires a response with a Pong message
|
||||
@ -126,6 +138,15 @@ func SendUserGraphqlReconnectionForcedEvtMsg(sessionToken string) {
|
||||
sendBbbCoreMsgToRedis("UserGraphqlReconnectionForcedEvtMsg", body)
|
||||
}
|
||||
|
||||
func SendUserGraphqlDisconnectionForcedEvtMsg(sessionToken string) {
|
||||
var body = map[string]interface{}{
|
||||
"middlewareUID": common.GetUniqueID(),
|
||||
"sessionToken": sessionToken,
|
||||
}
|
||||
|
||||
sendBbbCoreMsgToRedis("UserGraphqlDisconnectionForcedEvtMsg", body)
|
||||
}
|
||||
|
||||
func SendUserGraphqlConnectionEstablishedSysMsg(
|
||||
sessionToken string,
|
||||
clientSessionUUID string,
|
||||
|
@ -269,7 +269,6 @@ CREATE TABLE "user" (
|
||||
"avatar" varchar(500),
|
||||
"webcamBackground" varchar(500),
|
||||
"color" varchar(7),
|
||||
"sessionToken" varchar(16),
|
||||
"authToken" varchar(16),
|
||||
"authed" bool,
|
||||
"joined" bool,
|
||||
@ -761,6 +760,18 @@ GROUP BY u."meetingId", u."userId";
|
||||
|
||||
CREATE INDEX "idx_user_connectionStatusMetrics_UnstableReport" ON "user_connectionStatusMetrics" ("meetingId", "userId") WHERE "status" != 'normal';
|
||||
|
||||
CREATE TABLE "user_sessionToken" (
|
||||
"meetingId" varchar(100),
|
||||
"userId" varchar(50),
|
||||
"sessionToken" varchar(16),
|
||||
"createdAt" timestamp with time zone not null default current_timestamp,
|
||||
"removedAt" timestamp with time zone,
|
||||
CONSTRAINT "user_sessionToken_pk" PRIMARY KEY ("meetingId", "userId","sessionToken"),
|
||||
FOREIGN KEY ("meetingId", "userId") REFERENCES "user"("meetingId","userId") ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX "idx_user_sessionToken_stk" ON "user_sessionToken"("sessionToken");
|
||||
create view "v_user_sessionToken" as select * from "user_sessionToken";
|
||||
|
||||
CREATE TABLE "user_graphqlConnection" (
|
||||
"graphqlConnectionId" serial PRIMARY KEY,
|
||||
|
@ -158,7 +158,7 @@ const ConnectionManager: React.FC<ConnectionManagerProps> = ({ children }): Reac
|
||||
},
|
||||
on: {
|
||||
error: (error) => {
|
||||
logger.error('Error: on subscription to server', error);
|
||||
logger.error('Graphql Client Error:', error);
|
||||
loadingContextInfo.setLoading(false, '');
|
||||
connectionStatus.setConnectedStatus(false);
|
||||
setErrorCounts((prev: number) => prev + 1);
|
||||
@ -183,6 +183,13 @@ const ConnectionManager: React.FC<ConnectionManagerProps> = ({ children }): Reac
|
||||
if (message.type === 'ping') {
|
||||
tsLastPingMessageRef.current = Date.now();
|
||||
}
|
||||
if (message.type === 'error' && message.id === '-1') {
|
||||
// message ID -1 as a signal to terminate the session
|
||||
// it contains a prop message.messageId which can be used to show a proper error to the user
|
||||
logger.error({ logCode: 'graphql_server_closed_connection', extraInfo: message }, 'Graphql Server closed the connection');
|
||||
loadingContextInfo.setLoading(false, '');
|
||||
setTerminalError('Server closed the connection');
|
||||
}
|
||||
tsLastMessageRef.current = Date.now();
|
||||
},
|
||||
|
||||
|
@ -264,13 +264,24 @@ class ApiController {
|
||||
request
|
||||
)
|
||||
|
||||
boolean redirectClient = REDIRECT_RESPONSE
|
||||
if(!(validationResponse == null)) {
|
||||
invalid(validationResponse.getKey(), validationResponse.getValue(), redirectClient)
|
||||
return
|
||||
}
|
||||
|
||||
String existingUserID = params.existingUserID
|
||||
if (!StringUtils.isEmpty(existingUserID)) {
|
||||
handleJoinExistingUser(existingUserID)
|
||||
return
|
||||
}
|
||||
|
||||
HashMap<String, String> roles = new HashMap<String, String>();
|
||||
|
||||
roles.put("moderator", ROLE_MODERATOR)
|
||||
roles.put("viewer", ROLE_ATTENDEE)
|
||||
|
||||
//check if exists the param redirect
|
||||
boolean redirectClient = REDIRECT_RESPONSE
|
||||
String clientURL = paramsProcessorUtil.getDefaultHTML5ClientUrl();
|
||||
|
||||
if (!StringUtils.isEmpty(params.redirect)) {
|
||||
@ -279,11 +290,6 @@ class ApiController {
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
|
||||
if(!(validationResponse == null)) {
|
||||
invalid(validationResponse.getKey(), validationResponse.getValue(), redirectClient)
|
||||
return
|
||||
}
|
||||
|
||||
Boolean authenticated = false;
|
||||
|
||||
Boolean guest = false;
|
||||
@ -552,6 +558,121 @@ class ApiController {
|
||||
}
|
||||
}
|
||||
|
||||
def handleJoinExistingUser(String existingUserID) {
|
||||
Meeting meeting = ServiceUtils.findMeetingFromMeetingID(params.meetingID);
|
||||
UserSession existingUserSession = meetingService.getUserSessionWithUserId(existingUserID)
|
||||
|
||||
//check if exists the param redirect
|
||||
boolean redirectClient = REDIRECT_RESPONSE
|
||||
String clientURL = paramsProcessorUtil.getDefaultHTML5ClientUrl();
|
||||
|
||||
if (!StringUtils.isEmpty(params.redirect)) {
|
||||
try {
|
||||
redirectClient = Boolean.parseBoolean(params.redirect);
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
|
||||
String sessionToken = RandomStringUtils.randomAlphanumeric(16).toLowerCase()
|
||||
log.debug "Session token: " + sessionToken
|
||||
|
||||
UserSession us = new UserSession();
|
||||
us.authToken = existingUserSession.authToken;
|
||||
us.internalUserId = existingUserSession.internalUserId
|
||||
us.conferencename = meeting.getName()
|
||||
us.meetingID = meeting.getInternalId()
|
||||
us.externMeetingID = meeting.getExternalId()
|
||||
us.externUserID = existingUserSession.externUserID
|
||||
us.fullname = existingUserSession.fullname
|
||||
us.role = existingUserSession.role
|
||||
us.conference = meeting.getInternalId()
|
||||
us.room = meeting.getInternalId()
|
||||
us.voicebridge = meeting.getTelVoice()
|
||||
us.webvoiceconf = meeting.getWebVoice()
|
||||
us.mode = "LIVE"
|
||||
us.record = meeting.isRecord()
|
||||
us.welcome = meeting.getWelcomeMessage()
|
||||
us.guest = existingUserSession.guest
|
||||
us.authed = existingUserSession.authed
|
||||
us.guestStatus = existingUserSession.guestStatus
|
||||
us.logoutUrl = meeting.getLogoutUrl()
|
||||
us.defaultLayout = meeting.getMeetingLayout()
|
||||
us.leftGuestLobby = false
|
||||
us.avatarURL = existingUserSession.avatarURL
|
||||
us.excludeFromDashboard = existingUserSession.excludeFromDashboard
|
||||
|
||||
if (!StringUtils.isEmpty(params.defaultLayout)) {
|
||||
us.defaultLayout = params.defaultLayout;
|
||||
}
|
||||
|
||||
if (!StringUtils.isEmpty(params.enforceLayout)) {
|
||||
us.enforceLayout = params.enforceLayout;
|
||||
}
|
||||
|
||||
//used to drop the previous session of the user
|
||||
String revokeSessionToken = ""
|
||||
if (!StringUtils.isEmpty(params.revokeSessionToken)) {
|
||||
revokeSessionToken = params.revokeSessionToken;
|
||||
}
|
||||
|
||||
// Register a new session token to the user
|
||||
meetingService.registerUserSession(
|
||||
us.meetingID,
|
||||
us.internalUserId,
|
||||
sessionToken,
|
||||
revokeSessionToken
|
||||
)
|
||||
|
||||
session.setMaxInactiveInterval(paramsProcessorUtil.getDefaultHttpSessionTimeout())
|
||||
|
||||
String msgKey = "successfullyJoined"
|
||||
String msgValue = "You have joined successfully."
|
||||
|
||||
// Keep track of the client url in case this needs to wait for
|
||||
// approval as guest. We need to be able to send the user to the
|
||||
// client after being approved by moderator.
|
||||
us.clientUrl = clientURL + "?sessionToken=" + sessionToken
|
||||
|
||||
session[sessionToken] = sessionToken
|
||||
meetingService.addUserSession(sessionToken, us)
|
||||
|
||||
//Identify which of these to logs should be used. sessionToken or user-token
|
||||
log.info("Session sessionToken for " + us.fullname + " [" + session[sessionToken] + "]")
|
||||
log.info("Session user-token for " + us.fullname + " [" + session['user-token'] + "]")
|
||||
|
||||
log.info("Session token: ${sessionToken}")
|
||||
|
||||
// Process if we send the user directly to the client or
|
||||
// have it wait for approval.
|
||||
String destUrl = clientURL + "?sessionToken=" + sessionToken
|
||||
|
||||
Map<String, Object> logData = new HashMap<String, Object>();
|
||||
logData.put("meetingid", us.meetingID);
|
||||
logData.put("extMeetingid", us.externMeetingID);
|
||||
logData.put("name", us.fullname);
|
||||
logData.put("userid", us.internalUserId);
|
||||
logData.put("sessionToken", sessionToken);
|
||||
logData.put("logCode", "join_api");
|
||||
logData.put("description", "Handle JOIN API.");
|
||||
|
||||
Gson gson = new Gson();
|
||||
String logStr = gson.toJson(logData);
|
||||
|
||||
log.info(" --analytics-- data=" + logStr);
|
||||
|
||||
if (redirectClient) {
|
||||
log.info("Redirecting to ${destUrl}");
|
||||
redirect(url: destUrl);
|
||||
} else {
|
||||
log.info("Successfully joined. Sending XML response.");
|
||||
response.addHeader("Cache-Control", "no-cache")
|
||||
withFormat {
|
||||
xml {
|
||||
render(text: responseBuilder.buildJoinMeeting(us, session[sessionToken], guestStatusVal, destUrl, msgKey, msgValue, RESP_CODE_SUCCESS), contentType: "text/xml")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*******************************************
|
||||
* IS_MEETING_RUNNING API
|
||||
*******************************************/
|
||||
@ -1043,7 +1164,12 @@ class ApiController {
|
||||
queryParameters.put("meetingID", externalMeetingId);
|
||||
queryParameters.put("role", us.role.equals(ROLE_MODERATOR) ? ROLE_MODERATOR : ROLE_ATTENDEE);
|
||||
queryParameters.put("redirect", "true");
|
||||
queryParameters.put("userID", us.getExternUserID());
|
||||
queryParameters.put("existingUserID", us.getInternalUserId());
|
||||
|
||||
// revokePreviousSession: If this link is intended to replace the previous session of the user
|
||||
if (!StringUtils.isEmpty(params.revokePreviousSession) && Boolean.parseBoolean(params.revokePreviousSession)) {
|
||||
queryParameters.put("revokeSessionToken", sessionToken);
|
||||
}
|
||||
|
||||
// If the user calling getJoinUrl is a moderator (except in breakout rooms), allow to specify additional parameters
|
||||
if (us.role.equals(ROLE_MODERATOR) && !meeting.isBreakout()) {
|
||||
|
Loading…
Reference in New Issue
Block a user