Refactor (gql-middleware): Introduces clientSessionUUID
and validations (error handling) (#20353)
This commit is contained in:
parent
7514066fc3
commit
fd3071c28a
@ -130,4 +130,4 @@ case class UserClosedAllGraphqlConnectionsInternalMsg(userId: String) extends In
|
||||
* Sent by GraphqlActionsActor to inform MeetingActor that user came back from disconnection
|
||||
* @param userId
|
||||
*/
|
||||
case class UserEstablishedGraphqlConnectionInternalMsg(userId: String) extends InMessage
|
||||
case class UserEstablishedGraphqlConnectionInternalMsg(userId: String, clientType: String, isMobile: Boolean) extends InMessage
|
||||
|
@ -1,42 +0,0 @@
|
||||
package org.bigbluebutton.core.apps.users
|
||||
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.core.apps.{ RightsManagementTrait }
|
||||
import org.bigbluebutton.core.models.{ UserState, Users2x }
|
||||
import org.bigbluebutton.core.running.{ LiveMeeting, OutMsgRouter }
|
||||
|
||||
trait ChangeUserMobileFlagReqMsgHdlr extends RightsManagementTrait {
|
||||
this: UsersApp =>
|
||||
|
||||
val liveMeeting: LiveMeeting
|
||||
val outGW: OutMsgRouter
|
||||
|
||||
def handleChangeUserMobileFlagReqMsg(msg: ChangeUserMobileFlagReqMsg): Unit = {
|
||||
log.info("handleChangeUserMobileFlagReqMsg: mobile={} userId={}", msg.body.mobile, msg.body.userId)
|
||||
|
||||
def broadcastUserMobileChanged(user: UserState, mobile: Boolean): Unit = {
|
||||
val routingChange = Routing.addMsgToClientRouting(
|
||||
MessageTypes.BROADCAST_TO_MEETING,
|
||||
liveMeeting.props.meetingProp.intId, user.intId
|
||||
)
|
||||
val envelopeChange = BbbCoreEnvelope(UserMobileFlagChangedEvtMsg.NAME, routingChange)
|
||||
val headerChange = BbbClientMsgHeader(UserMobileFlagChangedEvtMsg.NAME, liveMeeting.props.meetingProp.intId,
|
||||
user.intId)
|
||||
|
||||
val bodyChange = UserMobileFlagChangedEvtMsgBody(user.intId, mobile)
|
||||
val eventChange = UserMobileFlagChangedEvtMsg(headerChange, bodyChange)
|
||||
val msgEventChange = BbbCommonEnvCoreMsg(envelopeChange, eventChange)
|
||||
outGW.send(msgEventChange)
|
||||
}
|
||||
|
||||
for {
|
||||
user <- Users2x.findWithIntId(liveMeeting.users2x, msg.body.userId)
|
||||
} yield {
|
||||
if (user.mobile != msg.body.mobile) {
|
||||
val userMobile = Users2x.setMobile(liveMeeting.users2x, user)
|
||||
broadcastUserMobileChanged(userMobile, msg.body.mobile)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -1,8 +1,9 @@
|
||||
package org.bigbluebutton.core.apps.users
|
||||
|
||||
import org.bigbluebutton.common2.msgs.{ BbbClientMsgHeader, BbbCommonEnvCoreMsg, BbbCoreEnvelope, MessageTypes, Routing, UserMobileFlagChangedEvtMsg, UserMobileFlagChangedEvtMsgBody }
|
||||
import org.bigbluebutton.core.api.UserEstablishedGraphqlConnectionInternalMsg
|
||||
import org.bigbluebutton.core.domain.MeetingState2x
|
||||
import org.bigbluebutton.core.models.Users2x
|
||||
import org.bigbluebutton.core.models.{ UserState, Users2x }
|
||||
import org.bigbluebutton.core.running.{ HandlerHelpers, LiveMeeting, MeetingActor, OutMsgRouter }
|
||||
|
||||
trait UserEstablishedGraphqlConnectionInternalMsgHdlr extends HandlerHelpers {
|
||||
@ -12,18 +13,50 @@ trait UserEstablishedGraphqlConnectionInternalMsgHdlr extends HandlerHelpers {
|
||||
val outGW: OutMsgRouter
|
||||
|
||||
def handleUserEstablishedGraphqlConnectionInternalMsg(msg: UserEstablishedGraphqlConnectionInternalMsg, state: MeetingState2x): MeetingState2x = {
|
||||
log.info("Received user established a graphql connection. user {} meetingId={}", msg.userId, liveMeeting.props.meetingProp.intId)
|
||||
log.info("Received user established a graphql connection. msg={} meetingId={}", msg, liveMeeting.props.meetingProp.intId)
|
||||
Users2x.findWithIntId(liveMeeting.users2x, msg.userId) match {
|
||||
case Some(reconnectingUser) =>
|
||||
if (reconnectingUser.userLeftFlag.left) {
|
||||
case Some(connectingUser) =>
|
||||
var userNewState = connectingUser
|
||||
|
||||
if (connectingUser.userLeftFlag.left) {
|
||||
log.info("Resetting flag that user left meeting. user {}", msg.userId)
|
||||
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.userId, leftFlag = false)
|
||||
Users2x.resetUserLeftFlag(liveMeeting.users2x, msg.userId)
|
||||
for {
|
||||
userUpdated <- Users2x.resetUserLeftFlag(liveMeeting.users2x, msg.userId)
|
||||
} yield {
|
||||
userNewState = userUpdated
|
||||
}
|
||||
}
|
||||
|
||||
//isMobile and clientType are set on join, but if it is a reconnection join is not necessary
|
||||
//so it need to be set here
|
||||
if (connectingUser.mobile != msg.isMobile) {
|
||||
userNewState = Users2x.setMobile(liveMeeting.users2x, userNewState)
|
||||
broadcastUserMobileChanged(userNewState, msg.isMobile)
|
||||
}
|
||||
|
||||
if (connectingUser.clientType != msg.clientType) {
|
||||
userNewState = Users2x.setClientType(liveMeeting.users2x, userNewState, msg.clientType)
|
||||
}
|
||||
|
||||
state
|
||||
case None =>
|
||||
state
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def broadcastUserMobileChanged(user: UserState, mobile: Boolean): Unit = {
|
||||
val routingChange = Routing.addMsgToClientRouting(
|
||||
MessageTypes.BROADCAST_TO_MEETING,
|
||||
liveMeeting.props.meetingProp.intId, user.intId
|
||||
)
|
||||
val envelopeChange = BbbCoreEnvelope(UserMobileFlagChangedEvtMsg.NAME, routingChange)
|
||||
val headerChange = BbbClientMsgHeader(UserMobileFlagChangedEvtMsg.NAME, liveMeeting.props.meetingProp.intId,
|
||||
user.intId)
|
||||
|
||||
val bodyChange = UserMobileFlagChangedEvtMsgBody(user.intId, mobile)
|
||||
val eventChange = UserMobileFlagChangedEvtMsg(headerChange, bodyChange)
|
||||
val msgEventChange = BbbCommonEnvCoreMsg(envelopeChange, eventChange)
|
||||
outGW.send(msgEventChange)
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ trait UserJoinMeetingAfterReconnectReqMsgHdlr extends HandlerHelpers with UserJo
|
||||
}
|
||||
state
|
||||
case None =>
|
||||
val newState = userJoinMeeting(outGW, msg.body.authToken, msg.body.clientType, liveMeeting, state)
|
||||
val newState = userJoinMeeting(outGW, msg.body.authToken, msg.body.clientType, mobile = false, liveMeeting, state)
|
||||
if (liveMeeting.props.meetingProp.isBreakout) {
|
||||
BreakoutHdlrHelpers.updateParentMeetingWithUsers(liveMeeting, eventBus)
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ trait UserJoinMeetingReqMsgHdlr extends HandlerHelpers {
|
||||
}
|
||||
|
||||
private def handleSuccessfulUserJoin(msg: UserJoinMeetingReqMsg, regUser: RegisteredUser) = {
|
||||
val newState = userJoinMeeting(outGW, msg.body.authToken, msg.body.clientType, liveMeeting, state)
|
||||
val newState = userJoinMeeting(outGW, msg.body.authToken, msg.body.clientType, msg.body.clientIsMobile, liveMeeting, state)
|
||||
updateParentMeetingWithNewListOfUsers()
|
||||
notifyPreviousUsersWithSameExtId(regUser)
|
||||
clearCachedVoiceUser(regUser)
|
||||
|
@ -167,7 +167,6 @@ class UsersApp(
|
||||
with GetRecordingStatusReqMsgHdlr
|
||||
with AssignPresenterReqMsgHdlr
|
||||
with ChangeUserPinStateReqMsgHdlr
|
||||
with ChangeUserMobileFlagReqMsgHdlr
|
||||
with UserConnectionAliveReqMsgHdlr
|
||||
with ChangeUserReactionEmojiReqMsgHdlr
|
||||
with ChangeUserRaiseHandReqMsgHdlr
|
||||
|
@ -9,6 +9,9 @@ import scala.util.{Failure, Success }
|
||||
case class UserGraphqlConnectionDbModel (
|
||||
graphqlConnectionId: Option[Int],
|
||||
sessionToken: String,
|
||||
clientSessionUUID: String,
|
||||
clientType: String,
|
||||
clientIsMobile: Boolean,
|
||||
middlewareUID: String,
|
||||
middlewareConnectionId: String,
|
||||
establishedAt: java.sql.Timestamp,
|
||||
@ -17,10 +20,13 @@ case class UserGraphqlConnectionDbModel (
|
||||
|
||||
class UserGraphqlConnectionDbTableDef(tag: Tag) extends Table[UserGraphqlConnectionDbModel](tag, None, "user_graphqlConnection") {
|
||||
override def * = (
|
||||
graphqlConnectionId, sessionToken, middlewareUID, middlewareConnectionId, establishedAt, closedAt
|
||||
graphqlConnectionId, sessionToken, clientSessionUUID, clientType, clientIsMobile, middlewareUID, middlewareConnectionId, establishedAt, closedAt
|
||||
) <> (UserGraphqlConnectionDbModel.tupled, UserGraphqlConnectionDbModel.unapply)
|
||||
val graphqlConnectionId = column[Option[Int]]("graphqlConnectionId", O.PrimaryKey, O.AutoInc)
|
||||
val sessionToken = column[String]("sessionToken")
|
||||
val clientSessionUUID = column[String]("clientSessionUUID")
|
||||
val clientType = column[String]("clientType")
|
||||
val clientIsMobile = column[Boolean]("clientIsMobile")
|
||||
val middlewareUID = column[String]("middlewareUID")
|
||||
val middlewareConnectionId = column[String]("middlewareConnectionId")
|
||||
val establishedAt = column[java.sql.Timestamp]("establishedAt")
|
||||
@ -29,12 +35,20 @@ class UserGraphqlConnectionDbTableDef(tag: Tag) extends Table[UserGraphqlConnect
|
||||
|
||||
|
||||
object UserGraphqlConnectionDAO {
|
||||
def insert(sessionToken: String, middlewareUID:String, middlewareConnectionId: String) = {
|
||||
def insert(sessionToken: String,
|
||||
clientSessionUUID: String,
|
||||
clientType: String,
|
||||
clientIsMobile: Boolean,
|
||||
middlewareUID:String,
|
||||
middlewareConnectionId: String) = {
|
||||
DatabaseConnection.db.run(
|
||||
TableQuery[UserGraphqlConnectionDbTableDef].insertOrUpdate(
|
||||
UserGraphqlConnectionDbModel(
|
||||
graphqlConnectionId = None,
|
||||
sessionToken = sessionToken,
|
||||
clientSessionUUID = clientSessionUUID,
|
||||
clientType = clientType,
|
||||
clientIsMobile = clientIsMobile,
|
||||
middlewareUID = middlewareUID,
|
||||
middlewareConnectionId = middlewareConnectionId,
|
||||
establishedAt = new java.sql.Timestamp(System.currentTimeMillis()),
|
||||
@ -42,10 +56,10 @@ object UserGraphqlConnectionDAO {
|
||||
)
|
||||
)
|
||||
).onComplete {
|
||||
case Success(rowsAffected) => {
|
||||
case Success(rowsAffected) =>
|
||||
DatabaseConnection.logger.debug(s"$rowsAffected row(s) inserted on user_graphqlConnection table!")
|
||||
}
|
||||
case Failure(e) => DatabaseConnection.logger.debug(s"Error inserting user_graphqlConnection: $e")
|
||||
case Failure(e) =>
|
||||
DatabaseConnection.logger.debug(s"Error inserting user_graphqlConnection: $e")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -46,10 +46,10 @@ object Users2x {
|
||||
for {
|
||||
u <- findWithIntId(users, intId)
|
||||
} yield {
|
||||
val newUser = u.copy(userLeftFlag = UserLeftFlag(false, 0))
|
||||
val newUser = u.copy(userLeftFlag = UserLeftFlag(left = false, 0))
|
||||
users.save(newUser)
|
||||
UserStateDAO.update(newUser)
|
||||
UserStateDAO.updateExpired(u.meetingId, u.intId, false)
|
||||
UserStateDAO.updateExpired(u.meetingId, u.intId, expired = false)
|
||||
newUser
|
||||
}
|
||||
}
|
||||
@ -124,6 +124,13 @@ object Users2x {
|
||||
newUserState
|
||||
}
|
||||
|
||||
def setClientType(users: Users2x, u: UserState, clientType: String): UserState = {
|
||||
val newUserState = modify(u)(_.clientType).setTo(clientType)
|
||||
users.save(newUserState)
|
||||
UserStateDAO.update(newUserState)
|
||||
newUserState
|
||||
}
|
||||
|
||||
def ejectFromMeeting(users: Users2x, intId: String): Option[UserState] = {
|
||||
for {
|
||||
_ <- users.remove(intId)
|
||||
|
@ -110,8 +110,6 @@ class ReceivedJsonMsgHandlerActor(
|
||||
routeGenericMsg[UserActivitySignCmdMsg](envelope, jsonNode)
|
||||
case ChangeUserPinStateReqMsg.NAME =>
|
||||
routeGenericMsg[ChangeUserPinStateReqMsg](envelope, jsonNode)
|
||||
case ChangeUserMobileFlagReqMsg.NAME =>
|
||||
routeGenericMsg[ChangeUserMobileFlagReqMsg](envelope, jsonNode)
|
||||
case UserConnectionAliveReqMsg.NAME =>
|
||||
routeGenericMsg[UserConnectionAliveReqMsg](envelope, jsonNode)
|
||||
case SetUserSpeechLocaleReqMsg.NAME =>
|
||||
|
@ -40,7 +40,7 @@ trait HandlerHelpers extends SystemConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
def userJoinMeeting(outGW: OutMsgRouter, authToken: String, clientType: String,
|
||||
def userJoinMeeting(outGW: OutMsgRouter, authToken: String, clientType: String, mobile: Boolean,
|
||||
liveMeeting: LiveMeeting, state: MeetingState2x): MeetingState2x = {
|
||||
|
||||
val nu = for {
|
||||
@ -68,7 +68,7 @@ trait HandlerHelpers extends SystemConfiguration {
|
||||
raiseHand = false,
|
||||
away = false,
|
||||
pin = false,
|
||||
mobile = false,
|
||||
mobile = mobile,
|
||||
presenter = false,
|
||||
locked = MeetingStatus2x.getPermissions(liveMeeting.status).lockOnJoin,
|
||||
avatar = regUser.avatarURL,
|
||||
|
@ -460,7 +460,6 @@ class MeetingActor(
|
||||
case m: ChangeUserPinStateReqMsg =>
|
||||
usersApp.handleChangeUserPinStateReqMsg(m)
|
||||
updateUserLastActivity(m.body.changedBy)
|
||||
case m: ChangeUserMobileFlagReqMsg => usersApp.handleChangeUserMobileFlagReqMsg(m)
|
||||
case m: UserConnectionAliveReqMsg => usersApp.handleUserConnectionAliveReqMsg(m)
|
||||
case m: SetUserSpeechLocaleReqMsg => usersApp.handleSetUserSpeechLocaleReqMsg(m)
|
||||
case m: SetUserSpeechOptionsReqMsg => usersApp.handleSetUserSpeechOptionsReqMsg(m)
|
||||
|
@ -77,7 +77,6 @@ class AnalyticsActor(val includeChat: Boolean) extends Actor with ActorLogging {
|
||||
case m: UserDisconnectedFromGlobalAudioMsg => logMessage(msg)
|
||||
case m: AssignPresenterReqMsg => logMessage(msg)
|
||||
case m: ChangeUserPinStateReqMsg => logMessage(msg)
|
||||
case m: ChangeUserMobileFlagReqMsg => logMessage(msg)
|
||||
case m: UserConnectionAliveReqMsg => logMessage(msg)
|
||||
case m: ScreenshareRtmpBroadcastStartedVoiceConfEvtMsg => logMessage(msg)
|
||||
case m: ScreenshareRtmpBroadcastStoppedVoiceConfEvtMsg => logMessage(msg)
|
||||
|
@ -37,6 +37,9 @@ case class GraphqlUserConnection(
|
||||
middlewareUID: String,
|
||||
browserConnectionId: String,
|
||||
sessionToken: String,
|
||||
clientSessionUUID: String,
|
||||
clientType: String,
|
||||
clientIsMobile: Boolean,
|
||||
user: GraphqlUser,
|
||||
)
|
||||
|
||||
@ -88,21 +91,33 @@ class GraphqlConnectionsActor(
|
||||
}
|
||||
|
||||
private def handleUserGraphqlConnectionEstablishedSysMsg(msg: UserGraphqlConnectionEstablishedSysMsg): Unit = {
|
||||
UserGraphqlConnectionDAO.insert(msg.body.sessionToken, msg.body.middlewareUID, msg.body.browserConnectionId)
|
||||
UserGraphqlConnectionDAO.insert(
|
||||
msg.body.sessionToken,
|
||||
msg.body.clientSessionUUID,
|
||||
msg.body.clientType,
|
||||
msg.body.clientIsMobile,
|
||||
msg.body.middlewareUID,
|
||||
msg.body.browserConnectionId)
|
||||
|
||||
for {
|
||||
user <- users.get(msg.body.sessionToken)
|
||||
} yield {
|
||||
|
||||
//Send internal message informing user has connected
|
||||
if (!graphqlConnections.values.exists(c => c.sessionToken == msg.body.sessionToken)) {
|
||||
eventBus.publish(BigBlueButtonEvent(user.meetingId, UserEstablishedGraphqlConnectionInternalMsg(user.intId)))
|
||||
}
|
||||
eventBus.publish(BigBlueButtonEvent(user.meetingId,
|
||||
UserEstablishedGraphqlConnectionInternalMsg(
|
||||
user.intId,
|
||||
msg.body.clientType,
|
||||
msg.body.clientIsMobile)
|
||||
))
|
||||
|
||||
graphqlConnections += (msg.body.browserConnectionId -> GraphqlUserConnection(
|
||||
msg.body.middlewareUID,
|
||||
msg.body.browserConnectionId,
|
||||
msg.body.sessionToken,
|
||||
msg.body.clientSessionUUID,
|
||||
msg.body.clientType,
|
||||
msg.body.clientIsMobile,
|
||||
user
|
||||
))
|
||||
}
|
||||
|
@ -278,7 +278,14 @@ case class UserGraphqlConnectionEstablishedSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: UserGraphqlConnectionEstablishedSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class UserGraphqlConnectionEstablishedSysMsgBody(middlewareUID: String, sessionToken: String, browserConnectionId: String)
|
||||
case class UserGraphqlConnectionEstablishedSysMsgBody(
|
||||
middlewareUID: String,
|
||||
sessionToken: String,
|
||||
clientSessionUUID: String,
|
||||
clientType: String,
|
||||
clientIsMobile: Boolean,
|
||||
browserConnectionId: String
|
||||
)
|
||||
|
||||
object UserGraphqlConnectionClosedSysMsg { val NAME = "UserGraphqlConnectionClosedSysMsg" }
|
||||
case class UserGraphqlConnectionClosedSysMsg(
|
||||
|
@ -279,13 +279,6 @@ object ClearedAllUsersReactionEvtMsg { val NAME = "ClearedAllUsersReactionEvtMsg
|
||||
case class ClearedAllUsersReactionEvtMsg(header: BbbClientMsgHeader, body: ClearedAllUsersReactionEvtMsgBody) extends StandardMsg
|
||||
case class ClearedAllUsersReactionEvtMsgBody()
|
||||
|
||||
/**
|
||||
* Sent from client about a user mobile flag.
|
||||
*/
|
||||
object ChangeUserMobileFlagReqMsg { val NAME = "ChangeUserMobileFlagReqMsg" }
|
||||
case class ChangeUserMobileFlagReqMsg(header: BbbClientMsgHeader, body: ChangeUserMobileFlagReqMsgBody) extends StandardMsg
|
||||
case class ChangeUserMobileFlagReqMsgBody(userId: String, mobile: Boolean)
|
||||
|
||||
/**
|
||||
* Sent from client to inform the connection is alive.
|
||||
*/
|
||||
@ -403,7 +396,7 @@ case class LogoutAndEndMeetingCmdMsgBody(userId: String)
|
||||
|
||||
object UserJoinMeetingReqMsg { val NAME = "UserJoinMeetingReqMsg" }
|
||||
case class UserJoinMeetingReqMsg(header: BbbClientMsgHeader, body: UserJoinMeetingReqMsgBody) extends StandardMsg
|
||||
case class UserJoinMeetingReqMsgBody(userId: String, authToken: String, clientType: String)
|
||||
case class UserJoinMeetingReqMsgBody(userId: String, authToken: String, clientType: String, clientIsMobile: Boolean)
|
||||
|
||||
/**
|
||||
* Sent from Flash client to rejoin meeting after reconnection
|
||||
|
@ -6,6 +6,7 @@ export default function buildRedisMessage(sessionVariables: Record<string, unkno
|
||||
[
|
||||
{name: 'authToken', type: 'string', required: true},
|
||||
{name: 'clientType', type: 'string', required: true},
|
||||
{name: 'clientIsMobile', type: 'boolean', required: true},
|
||||
]
|
||||
)
|
||||
|
||||
@ -26,6 +27,7 @@ export default function buildRedisMessage(sessionVariables: Record<string, unkno
|
||||
userId: routing.userId,
|
||||
authToken: input.authToken,
|
||||
clientType: input.clientType,
|
||||
clientIsMobile: input.clientIsMobile,
|
||||
};
|
||||
|
||||
return { eventName, routing, header, body };
|
||||
|
@ -1,30 +0,0 @@
|
||||
import { RedisMessage } from '../types';
|
||||
import {throwErrorIfInvalidInput} from "../imports/validation";
|
||||
|
||||
export default function buildRedisMessage(sessionVariables: Record<string, unknown>, input: Record<string, unknown>): RedisMessage {
|
||||
throwErrorIfInvalidInput(input,
|
||||
[
|
||||
{name: 'mobile', type: 'boolean', required: true},
|
||||
]
|
||||
)
|
||||
|
||||
const eventName = `ChangeUserMobileFlagReqMsg`;
|
||||
|
||||
const routing = {
|
||||
meetingId: sessionVariables['x-hasura-meetingid'] as String,
|
||||
userId: sessionVariables['x-hasura-userid'] as String
|
||||
};
|
||||
|
||||
const header = {
|
||||
name: eventName,
|
||||
meetingId: routing.meetingId,
|
||||
userId: routing.userId
|
||||
};
|
||||
|
||||
const body = {
|
||||
userId: routing.userId,
|
||||
mobile: input.mobile
|
||||
};
|
||||
|
||||
return { eventName, routing, header, body };
|
||||
}
|
@ -10,7 +10,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -18,8 +17,8 @@ import (
|
||||
var authHookUrl = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_AUTH_HOOK_URL")
|
||||
|
||||
// BBBWebClient handles the web requests for authentication and returns a map of response headers.
|
||||
func BBBWebClient(browserConnection *common.BrowserConnection, cookies []*http.Cookie) (map[string]string, error) {
|
||||
logger := log.WithField("_routine", "BBBWebClient").WithField("browserConnectionId", browserConnection.Id)
|
||||
func BBBWebClient(browserConnectionId string, sessionToken string, cookies []*http.Cookie) (map[string]string, error) {
|
||||
logger := log.WithField("_routine", "BBBWebClient").WithField("browserConnectionId", browserConnectionId)
|
||||
logger.Debug("Starting BBBWebClient")
|
||||
defer logger.Debug("Finished BBBWebClient")
|
||||
|
||||
@ -47,12 +46,12 @@ func BBBWebClient(browserConnection *common.BrowserConnection, cookies []*http.C
|
||||
}
|
||||
|
||||
// Wait for SessionToken to be provided.
|
||||
for browserConnection.SessionToken == "" {
|
||||
for sessionToken == "" {
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Execute the HTTP request to obtain user session variables (like X-Hasura-Role)
|
||||
req.Header.Set("x-session-token", browserConnection.SessionToken)
|
||||
req.Header.Set("x-session-token", sessionToken)
|
||||
req.Header.Set("User-Agent", "hasura-graphql-engine")
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
|
@ -2,6 +2,7 @@ package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
@ -31,9 +32,14 @@ type GraphQlSubscription struct {
|
||||
}
|
||||
|
||||
type BrowserConnection struct {
|
||||
Id string // browser connection id
|
||||
SessionToken string // session token of this connection
|
||||
Context context.Context // browser connection context
|
||||
Id string // browser connection id
|
||||
Websocket *websocket.Conn // websocket of browser connection
|
||||
SessionToken string // session token of this connection
|
||||
BBBWebSessionVariables map[string]string // graphql session variables provided by bbb-web
|
||||
ClientSessionUUID string // self-generated unique id for this client
|
||||
Context context.Context // browser connection context
|
||||
ContextCancelFunc context.CancelFunc // function to cancel the browser context (and so, the browser connection)
|
||||
BrowserRequestCookies []*http.Cookie
|
||||
ActiveSubscriptions map[string]GraphQlSubscription // active subscriptions of this connection (start, but no stop)
|
||||
ActiveSubscriptionsMutex sync.RWMutex // mutex to control the map usage
|
||||
ConnectionInitMessage map[string]interface{} // init message received in this connection (to be used on hasura reconnect)
|
||||
@ -45,10 +51,11 @@ type BrowserConnection struct {
|
||||
}
|
||||
|
||||
type HasuraConnection struct {
|
||||
Id string // hasura connection id
|
||||
BrowserConn *BrowserConnection // browser connection that originated this hasura connection
|
||||
Websocket *websocket.Conn // websocket used to connect to hasura
|
||||
Context context.Context // hasura connection context (child of browser connection context)
|
||||
ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection)
|
||||
FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection
|
||||
Id string // hasura connection id
|
||||
BrowserConn *BrowserConnection // browser connection that originated this hasura connection
|
||||
Websocket *websocket.Conn // websocket used to connect to Hasura
|
||||
WebsocketCloseError *websocket.CloseError // closeError received from Hasura
|
||||
Context context.Context // hasura connection context (child of browser connection context)
|
||||
ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection)
|
||||
FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection
|
||||
}
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/bbb_web"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"io/ioutil"
|
||||
@ -18,21 +17,13 @@ var graphqlActionsUrl = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_GRAPHQL_ACTIONS_URL")
|
||||
|
||||
func GraphqlActionsClient(
|
||||
browserConnection *common.BrowserConnection,
|
||||
cookies []*http.Cookie,
|
||||
fromBrowserToGqlActionsChannel *common.SafeChannel,
|
||||
fromBrowserToHasuraChannel *common.SafeChannel,
|
||||
fromHasuraToBrowserChannel *common.SafeChannel) error {
|
||||
|
||||
log := log.WithField("_routine", "GraphqlActionsClient").WithField("browserConnectionId", browserConnection.Id)
|
||||
log.Debug("Starting GraphqlActionsClient")
|
||||
defer log.Debug("Finished GraphqlActionsClient")
|
||||
|
||||
sessionVariables, err := bbb_web.BBBWebClient(browserConnection, cookies)
|
||||
if err != nil {
|
||||
log.Error("It was not able to load session variables from AuthHook", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
RangeLoop:
|
||||
for {
|
||||
select {
|
||||
@ -61,7 +52,7 @@ RangeLoop:
|
||||
if okQuery && okVariables && strings.HasPrefix(query, "mutation") {
|
||||
if funcName, inputs, err := parseGraphQLMutation(query, variables); err == nil {
|
||||
mutationFuncName = funcName
|
||||
if err = SendGqlActionsRequest(funcName, inputs, sessionVariables); err == nil {
|
||||
if err = SendGqlActionsRequest(funcName, inputs, browserConnection.BBBWebSessionVariables); err == nil {
|
||||
} else {
|
||||
errorMessage = err.Error()
|
||||
log.Error("It was not able to send the request to Graphql Actions", err)
|
||||
@ -77,15 +68,13 @@ RangeLoop:
|
||||
browserResponseData := map[string]interface{}{
|
||||
"id": queryId,
|
||||
"type": "error",
|
||||
"payload": map[string]interface{}{
|
||||
"data": nil,
|
||||
"errors": []interface{}{
|
||||
map[string]interface{}{
|
||||
"message": errorMessage,
|
||||
},
|
||||
"payload": []interface{}{
|
||||
map[string]interface{}{
|
||||
"message": errorMessage,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fromHasuraToBrowserChannel.Send(browserResponseData)
|
||||
} else {
|
||||
//Action sent successfully, return data msg to client
|
||||
|
@ -22,7 +22,10 @@ var lastHasuraConnectionId int
|
||||
var hasuraEndpoint = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_HASURA_WS")
|
||||
|
||||
// Hasura client connection
|
||||
func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.Cookie, fromBrowserToHasuraChannel *common.SafeChannel, fromHasuraToBrowserChannel *common.SafeChannel) error {
|
||||
func HasuraClient(
|
||||
browserConnection *common.BrowserConnection,
|
||||
fromBrowserToHasuraChannel *common.SafeChannel,
|
||||
fromHasuraToBrowserChannel *common.SafeChannel) error {
|
||||
log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id)
|
||||
common.ActivitiesOverviewStarted("__HasuraConnection")
|
||||
defer common.ActivitiesOverviewCompleted("__HasuraConnection")
|
||||
@ -58,7 +61,7 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
|
||||
return xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
parsedURL.Scheme = "http"
|
||||
jar.SetCookies(parsedURL, cookies)
|
||||
jar.SetCookies(parsedURL, browserConnection.BrowserRequestCookies)
|
||||
hc := &http.Client{
|
||||
Jar: jar,
|
||||
}
|
||||
@ -81,6 +84,12 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
|
||||
|
||||
browserConnection.HasuraConnection = &thisConnection
|
||||
defer func() {
|
||||
//When Hasura sends an CloseError, it will forward the error to the browser and close the connection
|
||||
if thisConnection.WebsocketCloseError != nil {
|
||||
browserConnection.Websocket.Close(thisConnection.WebsocketCloseError.Code, thisConnection.WebsocketCloseError.Reason)
|
||||
browserConnection.ContextCancelFunc()
|
||||
}
|
||||
|
||||
browserConnection.HasuraConnection = nil
|
||||
|
||||
//It's necessary to freeze the channel to avoid client trying to start subscriptions before Hasura connection is initialised
|
||||
@ -89,15 +98,15 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
|
||||
}()
|
||||
|
||||
// Make the connection
|
||||
c, _, err := websocket.Dial(hasuraConnectionContext, hasuraEndpoint, &dialOptions)
|
||||
hasuraWsConn, _, err := websocket.Dial(hasuraConnectionContext, hasuraEndpoint, &dialOptions)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error connecting to hasura: %v", err)
|
||||
}
|
||||
defer c.Close(websocket.StatusInternalError, "the sky is falling")
|
||||
defer hasuraWsConn.Close(websocket.StatusInternalError, "the sky is falling")
|
||||
|
||||
c.SetReadLimit(math.MaxInt64 - 1)
|
||||
hasuraWsConn.SetReadLimit(math.MaxInt64 - 1)
|
||||
|
||||
thisConnection.Websocket = c
|
||||
thisConnection.Websocket = hasuraWsConn
|
||||
|
||||
// Log the connection success
|
||||
log.Debugf("connected with Hasura")
|
||||
|
@ -10,12 +10,17 @@ import (
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"hash/crc32"
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
|
||||
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) {
|
||||
func HasuraConnectionReader(
|
||||
hc *common.HasuraConnection,
|
||||
fromHasuraToBrowserChannel *common.SafeChannel,
|
||||
fromBrowserToHasuraChannel *common.SafeChannel,
|
||||
wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
@ -27,10 +32,27 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan
|
||||
// Read a message from hasura
|
||||
var message interface{}
|
||||
err := wsjson.Read(hc.Context, hc.Websocket, &message)
|
||||
|
||||
var closeError *websocket.CloseError
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Closing Hasura ws connection as Context was cancelled!")
|
||||
} else if errors.As(err, &closeError) {
|
||||
hc.WebsocketCloseError = closeError
|
||||
log.Debug("WebSocket connection closed: status = %v, reason = %s", closeError.Code, closeError.Reason)
|
||||
//TODO check if it should send {"type":"connection_error","payload":"Authentication hook unauthorized this request"}
|
||||
} else {
|
||||
if websocket.CloseStatus(err) == -1 {
|
||||
//It doesn't have a CloseError, it will reconnect do Hasura
|
||||
} else {
|
||||
//In case Hasura sent an CloseError, it will forward it to browser and disconnect
|
||||
hc.WebsocketCloseError = &websocket.CloseError{
|
||||
Code: websocket.CloseStatus(err),
|
||||
Reason: "Graphql connection closed with error" + err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("Error reading message from Hasura: %v", err)
|
||||
}
|
||||
return
|
||||
|
@ -27,13 +27,16 @@ func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChan
|
||||
|
||||
//Send authentication (init) message at first
|
||||
//It will not use the channel (fromBrowserToHasuraChannel) because this msg must bypass ChannelFreeze
|
||||
if initMessage != nil {
|
||||
log.Infof("it's a reconnection, injecting authentication (init) message")
|
||||
err := wsjson.Write(hc.Context, hc.Websocket, initMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
|
||||
return
|
||||
}
|
||||
if initMessage == nil {
|
||||
log.Errorf("it can't start Hasura Connection because initMessage is null")
|
||||
return
|
||||
}
|
||||
|
||||
//Send init connection message to Hasura to start
|
||||
err := wsjson.Write(hc.Context, hc.Websocket, initMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
RangeLoop:
|
||||
@ -174,7 +177,9 @@ RangeLoop:
|
||||
}
|
||||
|
||||
if fromBrowserMessageAsMap["type"] == "connection_init" {
|
||||
browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap
|
||||
//browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap
|
||||
//Skip message once it is handled by ConnInitHandler already
|
||||
continue
|
||||
}
|
||||
|
||||
log.Tracef("sending to hasura: %v", fromBrowserMessageAsMap)
|
||||
|
@ -3,6 +3,7 @@ package websrv
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/bbb_web"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/gql_actions"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/hasura"
|
||||
@ -13,6 +14,7 @@ import (
|
||||
"net/http"
|
||||
"nhooyr.io/websocket"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@ -58,10 +60,13 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
defer browserWsConn.Close(websocket.StatusInternalError, "the sky is falling")
|
||||
|
||||
var thisConnection = common.BrowserConnection{
|
||||
Id: browserConnectionId,
|
||||
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
|
||||
Context: browserConnectionContext,
|
||||
ConnAckSentToBrowser: false,
|
||||
Id: browserConnectionId,
|
||||
Websocket: browserWsConn,
|
||||
BrowserRequestCookies: r.Cookies(),
|
||||
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
|
||||
Context: browserConnectionContext,
|
||||
ContextCancelFunc: browserConnectionContextCancel,
|
||||
ConnAckSentToBrowser: false,
|
||||
}
|
||||
|
||||
BrowserConnectionsMutex.Lock()
|
||||
@ -91,6 +96,44 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
fromBrowserToGqlActionsChannel := common.NewSafeChannel(bufferSize)
|
||||
fromHasuraToBrowserChannel := common.NewSafeChannel(bufferSize)
|
||||
|
||||
// Configure the wait group (to hold this routine execution until both are completed)
|
||||
var wgAll sync.WaitGroup
|
||||
wgAll.Add(3)
|
||||
|
||||
// Other wait group to close this connection once Browser Reader dies
|
||||
var wgReader sync.WaitGroup
|
||||
wgReader.Add(1)
|
||||
|
||||
// Reads from browser connection, writes into fromBrowserToHasuraChannel and fromBrowserToHasuraConnectionEstablishingChannel
|
||||
go reader.BrowserConnectionReader(
|
||||
browserConnectionId,
|
||||
browserConnectionContext,
|
||||
browserConnectionContextCancel,
|
||||
browserWsConn,
|
||||
fromBrowserToGqlActionsChannel,
|
||||
fromBrowserToHasuraChannel,
|
||||
fromBrowserToHasuraConnectionEstablishingChannel,
|
||||
[]*sync.WaitGroup{&wgAll, &wgReader})
|
||||
|
||||
go func() {
|
||||
wgReader.Wait()
|
||||
log.Debug("BrowserConnectionReader finished, closing Write Channel")
|
||||
fromHasuraToBrowserChannel.Close()
|
||||
thisConnection.Disconnected = true
|
||||
}()
|
||||
|
||||
//Obtain user session variables from bbb-web
|
||||
if errorOnInitConnection := connectionInitHandler(&thisConnection, fromBrowserToHasuraConnectionEstablishingChannel); errorOnInitConnection != nil {
|
||||
//If the server wishes to reject the connection it is recommended to close the socket with `4403: Forbidden`.
|
||||
//https://github.com/enisdenjo/graphql-ws/blob/63881c3372a3564bf42040e3f572dd74e41b2e49/PROTOCOL.md?plain=1#L36
|
||||
wsError := &websocket.CloseError{
|
||||
Code: websocket.StatusCode(4403),
|
||||
Reason: errorOnInitConnection.Error(),
|
||||
}
|
||||
browserWsConn.Close(wsError.Code, wsError.Reason)
|
||||
browserConnectionContextCancel()
|
||||
}
|
||||
|
||||
// Ensure a hasura client is running while the browser is connected
|
||||
go func() {
|
||||
log.Debugf("starting hasura client")
|
||||
@ -108,7 +151,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
if thisBrowserConnection != nil {
|
||||
log.Debugf("created hasura client")
|
||||
hasura.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserToHasuraChannel, fromHasuraToBrowserChannel)
|
||||
hasura.HasuraClient(
|
||||
thisBrowserConnection,
|
||||
fromBrowserToHasuraChannel,
|
||||
fromHasuraToBrowserChannel)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
@ -138,43 +184,20 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
thisBrowserConnection.GraphqlActionsContext, thisBrowserConnection.GraphqlActionsContextCancel = context.WithCancel(browserConnectionContext)
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
gql_actions.GraphqlActionsClient(thisBrowserConnection, r.Cookies(), fromBrowserToGqlActionsChannel, fromBrowserToHasuraChannel, fromHasuraToBrowserChannel)
|
||||
gql_actions.GraphqlActionsClient(
|
||||
thisBrowserConnection,
|
||||
fromBrowserToGqlActionsChannel,
|
||||
fromHasuraToBrowserChannel)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Configure the wait group (to hold this routine execution until both are completed)
|
||||
var wgAll sync.WaitGroup
|
||||
wgAll.Add(3)
|
||||
|
||||
var wgReader sync.WaitGroup
|
||||
wgReader.Add(1)
|
||||
|
||||
// Reads from browser connection, writes into fromBrowserToHasuraChannel and fromBrowserToHasuraConnectionEstablishingChannel
|
||||
go reader.BrowserConnectionReader(
|
||||
browserConnectionId,
|
||||
browserConnectionContext,
|
||||
browserConnectionContextCancel,
|
||||
browserWsConn,
|
||||
fromBrowserToGqlActionsChannel,
|
||||
fromBrowserToHasuraChannel,
|
||||
fromBrowserToHasuraConnectionEstablishingChannel,
|
||||
[]*sync.WaitGroup{&wgAll, &wgReader})
|
||||
go func() {
|
||||
wgReader.Wait()
|
||||
log.Debug("BrowserConnectionReader finished, closing Write Channel")
|
||||
fromHasuraToBrowserChannel.Close()
|
||||
thisConnection.Disconnected = true
|
||||
}()
|
||||
|
||||
// Reads from fromHasuraToBrowserChannel, writes to browser connection
|
||||
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, browserWsConn, fromHasuraToBrowserChannel, &wgAll)
|
||||
|
||||
go ConnectionInitHandler(browserConnectionId, browserConnectionContext, fromBrowserToHasuraConnectionEstablishingChannel, &wgAll)
|
||||
|
||||
// Wait until all routines are finished
|
||||
wgAll.Wait()
|
||||
}
|
||||
@ -194,6 +217,7 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
|
||||
wg.Add(1)
|
||||
go func(bc *common.BrowserConnection) {
|
||||
defer wg.Done()
|
||||
go refreshUserSessionVariables(bc)
|
||||
invalidateHasuraConnectionForSessionToken(bc, sessionTokenToInvalidate)
|
||||
}(browserConnection)
|
||||
}
|
||||
@ -241,3 +265,99 @@ func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, ses
|
||||
// Send a reconnection confirmation message
|
||||
go SendUserGraphqlReconnectionForcedEvtMsg(sessionToken)
|
||||
}
|
||||
|
||||
func refreshUserSessionVariables(browserConnection *common.BrowserConnection) error {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
browserSessionToken := browserConnection.SessionToken
|
||||
browserConnectionId := browserConnection.Id
|
||||
browserConnectionCookies := browserConnection.BrowserRequestCookies
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
// Check authorization
|
||||
sessionVariables, err := bbb_web.BBBWebClient(browserConnectionId, browserSessionToken, browserConnectionCookies)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return fmt.Errorf("error on checking sessionToken authorization")
|
||||
} else {
|
||||
log.Trace("Session variables obtained successfully")
|
||||
}
|
||||
|
||||
BrowserConnectionsMutex.Lock()
|
||||
browserConnection.BBBWebSessionVariables = sessionVariables
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func connectionInitHandler(
|
||||
browserConnection *common.BrowserConnection,
|
||||
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel) error {
|
||||
|
||||
BrowserConnectionsMutex.RLock()
|
||||
browserConnectionId := browserConnection.Id
|
||||
browserConnectionCookies := browserConnection.BrowserRequestCookies
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
// Intercept the fromBrowserMessage channel to get the sessionToken
|
||||
for {
|
||||
fromBrowserMessage, ok := fromBrowserToHasuraConnectionEstablishingChannel.Receive()
|
||||
if !ok {
|
||||
//Received all messages. Channel is closed
|
||||
return fmt.Errorf("error on receiving init connection")
|
||||
}
|
||||
var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
|
||||
|
||||
if fromBrowserMessageAsMap["type"] == "connection_init" {
|
||||
var payloadAsMap = fromBrowserMessageAsMap["payload"].(map[string]interface{})
|
||||
var headersAsMap = payloadAsMap["headers"].(map[string]interface{})
|
||||
var sessionToken, existsSessionToken = headersAsMap["X-Session-Token"].(string)
|
||||
if !existsSessionToken {
|
||||
return fmt.Errorf("X-Session-Token header missing on init connection")
|
||||
}
|
||||
|
||||
var clientSessionUUID, existsClientSessionUUID = headersAsMap["X-ClientSessionUUID"].(string)
|
||||
if !existsClientSessionUUID {
|
||||
return fmt.Errorf("X-ClientSessionUUID header missing on init connection")
|
||||
}
|
||||
|
||||
var clientType, existsClientType = headersAsMap["X-ClientType"].(string)
|
||||
if !existsClientType {
|
||||
return fmt.Errorf("X-ClientType header missing on init connection")
|
||||
}
|
||||
|
||||
var clientIsMobile, existsMobile = headersAsMap["X-ClientIsMobile"].(string)
|
||||
if !existsMobile {
|
||||
return fmt.Errorf("X-ClientIsMobile header missing on init connection")
|
||||
}
|
||||
|
||||
// Check authorization
|
||||
sessionVariables, err := bbb_web.BBBWebClient(browserConnectionId, sessionToken, browserConnectionCookies)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return fmt.Errorf("error on checking sessionToken authorization")
|
||||
} else {
|
||||
log.Trace("Session variables obtained successfully")
|
||||
}
|
||||
|
||||
log.Debugf("[ConnectionInitHandler] intercepted Session Token %v and Client Session UUID %v", sessionToken, clientSessionUUID)
|
||||
BrowserConnectionsMutex.Lock()
|
||||
browserConnection.SessionToken = sessionToken
|
||||
browserConnection.ClientSessionUUID = clientSessionUUID
|
||||
browserConnection.BBBWebSessionVariables = sessionVariables
|
||||
browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
go SendUserGraphqlConnectionEstablishedSysMsg(
|
||||
sessionToken,
|
||||
clientSessionUUID,
|
||||
clientType,
|
||||
strings.ToLower(clientIsMobile) == "true",
|
||||
browserConnectionId,
|
||||
)
|
||||
fromBrowserToHasuraConnectionEstablishingChannel.Close()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1,50 +0,0 @@
|
||||
package websrv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func ConnectionInitHandler(browserConnectionId string, browserConnectionContext context.Context, fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel, wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "ConnectionInitHandler").WithField("browserConnectionId", browserConnectionId)
|
||||
|
||||
log.Debugf("starting")
|
||||
|
||||
defer wg.Done()
|
||||
defer log.Debugf("finished")
|
||||
|
||||
BrowserConnectionsMutex.RLock()
|
||||
browserConnection := BrowserConnections[browserConnectionId]
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
// Intercept the fromBrowserMessage channel to get the sessionToken
|
||||
for {
|
||||
fromBrowserMessage, ok := fromBrowserToHasuraConnectionEstablishingChannel.Receive()
|
||||
if !ok {
|
||||
//Received all messages. Channel is closed
|
||||
return
|
||||
}
|
||||
if browserConnection.SessionToken == "" {
|
||||
var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
|
||||
|
||||
if fromBrowserMessageAsMap["type"] == "connection_init" {
|
||||
var payloadAsMap = fromBrowserMessageAsMap["payload"].(map[string]interface{})
|
||||
var headersAsMap = payloadAsMap["headers"].(map[string]interface{})
|
||||
var sessionToken = headersAsMap["X-Session-Token"]
|
||||
if sessionToken != nil {
|
||||
sessionToken := headersAsMap["X-Session-Token"].(string)
|
||||
log.Debugf("[SessionTokenReader] intercepted session token %v", sessionToken)
|
||||
BrowserConnectionsMutex.Lock()
|
||||
browserConnection.SessionToken = sessionToken
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
go SendUserGraphqlConnectionEstablishedSysMsg(sessionToken, browserConnectionId)
|
||||
fromBrowserToHasuraConnectionEstablishingChannel.Close()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -125,10 +125,18 @@ func SendUserGraphqlReconnectionForcedEvtMsg(sessionToken string) {
|
||||
sendBbbCoreMsgToRedis("UserGraphqlReconnectionForcedEvtMsg", body)
|
||||
}
|
||||
|
||||
func SendUserGraphqlConnectionEstablishedSysMsg(sessionToken string, browserConnectionId string) {
|
||||
func SendUserGraphqlConnectionEstablishedSysMsg(
|
||||
sessionToken string,
|
||||
clientSessionUUID string,
|
||||
clientType string,
|
||||
clientIsMobile bool,
|
||||
browserConnectionId string) {
|
||||
var body = map[string]interface{}{
|
||||
"middlewareUID": common.GetUniqueID(),
|
||||
"sessionToken": sessionToken,
|
||||
"clientSessionUUID": clientSessionUUID,
|
||||
"clientType": clientType,
|
||||
"clientIsMobile": clientIsMobile,
|
||||
"browserConnectionId": browserConnectionId,
|
||||
}
|
||||
|
||||
|
@ -807,6 +807,9 @@ CREATE INDEX "idx_user_connectionStatusMetrics_UnstableReport" ON "user_connecti
|
||||
CREATE TABLE "user_graphqlConnection" (
|
||||
"graphqlConnectionId" serial PRIMARY KEY,
|
||||
"sessionToken" varchar(16),
|
||||
"clientSessionUUID" varchar(36),
|
||||
"clientType" varchar(50),
|
||||
"clientIsMobile" bool,
|
||||
"middlewareUID" varchar(36),
|
||||
"middlewareConnectionId" varchar(12),
|
||||
"establishedAt" timestamp with time zone,
|
||||
|
@ -494,12 +494,6 @@ type Mutation {
|
||||
): Boolean
|
||||
}
|
||||
|
||||
type Mutation {
|
||||
userSetMobileFlag(
|
||||
mobile: Boolean!
|
||||
): Boolean
|
||||
}
|
||||
|
||||
type Mutation {
|
||||
userSetMuted(
|
||||
userId: String
|
||||
|
@ -445,12 +445,6 @@ actions:
|
||||
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
|
||||
permissions:
|
||||
- role: bbb_client
|
||||
- name: userSetMobileFlag
|
||||
definition:
|
||||
kind: synchronous
|
||||
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
|
||||
permissions:
|
||||
- role: bbb_client
|
||||
- name: userSetMuted
|
||||
definition:
|
||||
kind: synchronous
|
||||
|
@ -160,7 +160,6 @@ class App extends Component {
|
||||
intl,
|
||||
layoutContextDispatch,
|
||||
isRTL,
|
||||
setMobileUser,
|
||||
toggleVoice,
|
||||
} = this.props;
|
||||
const { browserName } = browserInfo;
|
||||
@ -219,8 +218,6 @@ class App extends Component {
|
||||
};
|
||||
}
|
||||
|
||||
if (deviceInfo.isMobile) setMobileUser(true);
|
||||
|
||||
logger.info({ logCode: 'app_component_componentdidmount' }, 'Client loaded successfully');
|
||||
}
|
||||
|
||||
|
@ -80,7 +80,6 @@ const AppContainer = (props) => {
|
||||
const deviceType = layoutSelect((i) => i.deviceType);
|
||||
const layoutContextDispatch = layoutDispatch();
|
||||
|
||||
const [setMobileFlag] = useMutation(SET_MOBILE_FLAG);
|
||||
const [setSyncWithPresenterLayout] = useMutation(SET_SYNC_WITH_PRESENTER_LAYOUT);
|
||||
const [setMeetingLayoutProps] = useMutation(SET_LAYOUT_PROPS);
|
||||
const toggleVoice = useToggleVoice();
|
||||
@ -91,14 +90,6 @@ const AppContainer = (props) => {
|
||||
const isSharedNotesPinned = sharedNotesInput?.isPinned && isSharedNotesPinnedFromGraphql;
|
||||
const isThereWebcam = VideoStreamsState.getStreams().length > 0;
|
||||
|
||||
const setMobileUser = (mobile) => {
|
||||
setMobileFlag({
|
||||
variables: {
|
||||
mobile,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const { data: currentUserData } = useCurrentUser((user) => ({
|
||||
enforceLayout: user.enforceLayout,
|
||||
isModerator: user.isModerator,
|
||||
@ -241,7 +232,6 @@ const AppContainer = (props) => {
|
||||
shouldShowScreenshare,
|
||||
isSharedNotesPinned,
|
||||
shouldShowPresentation,
|
||||
setMobileUser,
|
||||
toggleVoice,
|
||||
setLocalSettings,
|
||||
genericComponentId: genericComponent.genericComponentId,
|
||||
|
@ -2,6 +2,7 @@ import { useCallback } from 'react';
|
||||
import { useMutation } from '@apollo/client';
|
||||
import { USER_SET_MUTED } from '../mutations';
|
||||
import useCurrentUser from '/imports/ui/core/hooks/useCurrentUser';
|
||||
import logger from '/imports/startup/client/logger';
|
||||
|
||||
const useToggleVoice = () => {
|
||||
const [userSetMuted] = useMutation(USER_SET_MUTED);
|
||||
@ -12,7 +13,11 @@ const useToggleVoice = () => {
|
||||
}));
|
||||
|
||||
const toggleVoice = async (userId: string, muted: boolean) => {
|
||||
userSetMuted({ variables: { muted, userId } });
|
||||
try {
|
||||
await userSetMuted({ variables: { muted, userId } });
|
||||
} catch (e) {
|
||||
logger.error('Error on trying to toggle muted');
|
||||
}
|
||||
};
|
||||
|
||||
return useCallback(toggleVoice, [currentUserData?.voice?.muted]);
|
||||
|
@ -73,7 +73,7 @@ class ErrorBoundary extends Component {
|
||||
// delay to termintate the connection, for user receive the end eject message
|
||||
setTimeout(() => {
|
||||
apolloClient.setLink(ApolloLink.empty());
|
||||
ws.close();
|
||||
ws.terminate();
|
||||
}, 5000);
|
||||
}
|
||||
this.setState({
|
||||
|
@ -6,7 +6,9 @@ import { createClient } from 'graphql-ws';
|
||||
import React, { useContext, useEffect } from 'react';
|
||||
import { LoadingContext } from '/imports/ui/components/common/loading-screen/loading-screen-HOC/component';
|
||||
import logger from '/imports/startup/client/logger';
|
||||
import { onError } from '@apollo/client/link/error';
|
||||
import apolloContextHolder from '../../core/graphql/apolloContextHolder/apolloContextHolder';
|
||||
import deviceInfo from '/imports/utils/deviceInfo';
|
||||
|
||||
interface ConnectionManagerProps {
|
||||
children: React.ReactNode;
|
||||
@ -48,6 +50,18 @@ const payloadSizeCheckLink = new ApolloLink((operation, forward) => {
|
||||
return forward(operation);
|
||||
});
|
||||
|
||||
const errorLink = onError(({ graphQLErrors, networkError }) => {
|
||||
if (graphQLErrors) {
|
||||
graphQLErrors.forEach(({ message }) => {
|
||||
logger.error(`[GraphQL error]: Message: ${message}`);
|
||||
});
|
||||
}
|
||||
|
||||
if (networkError) {
|
||||
logger.error(`[Network error]: ${networkError}`);
|
||||
}
|
||||
});
|
||||
|
||||
const ConnectionManager: React.FC<ConnectionManagerProps> = ({ children }): React.ReactNode => {
|
||||
const [graphqlUrlApolloClient, setApolloClient] = React.useState<ApolloClient<NormalizedCacheObject> | null>(null);
|
||||
const [graphqlUrl, setGraphqlUrl] = React.useState<string>('');
|
||||
@ -80,13 +94,20 @@ const ConnectionManager: React.FC<ConnectionManagerProps> = ({ children }): Reac
|
||||
}
|
||||
sessionStorage.setItem('sessionToken', sessionToken);
|
||||
|
||||
const clientSessionUUID = sessionStorage.getItem('clientSessionUUID');
|
||||
const { isMobile } = deviceInfo;
|
||||
|
||||
let wsLink;
|
||||
try {
|
||||
const subscription = createClient({
|
||||
url: graphqlUrl,
|
||||
retryAttempts: 2,
|
||||
connectionParams: {
|
||||
headers: {
|
||||
'X-Session-Token': sessionToken,
|
||||
'X-ClientSessionUUID': clientSessionUUID,
|
||||
'X-ClientType': 'HTML5',
|
||||
'X-ClientIsMobile': isMobile ? 'true' : 'false',
|
||||
},
|
||||
},
|
||||
on: {
|
||||
@ -100,7 +121,7 @@ const ConnectionManager: React.FC<ConnectionManagerProps> = ({ children }): Reac
|
||||
const graphWsLink = new GraphQLWsLink(
|
||||
subscription,
|
||||
);
|
||||
wsLink = ApolloLink.from([payloadSizeCheckLink, graphWsLink]);
|
||||
wsLink = ApolloLink.from([payloadSizeCheckLink, errorLink, graphWsLink]);
|
||||
wsLink.setOnError((error) => {
|
||||
loadingContextInfo.setLoading(false, '');
|
||||
throw new Error('Error: on apollo connection'.concat(JSON.stringify(error) || ''));
|
||||
|
@ -1,5 +1,6 @@
|
||||
import React, { useEffect } from 'react';
|
||||
import { Session } from 'meteor/session';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { ErrorScreen } from '../../error-screen/component';
|
||||
import LoadingScreen from '../../common/loading-screen/component';
|
||||
|
||||
@ -52,6 +53,10 @@ const StartupDataFetch: React.FC<StartupDataFetchProps> = ({
|
||||
setError('Timeout on fetching startup data');
|
||||
setLoading(false);
|
||||
}, connectionTimeout);
|
||||
|
||||
const clientSessionUUID = uuid();
|
||||
sessionStorage.setItem('clientSessionUUID', clientSessionUUID);
|
||||
|
||||
const urlParams = new URLSearchParams(window.location.search);
|
||||
const sessionToken = urlParams.get('sessionToken');
|
||||
|
||||
|
@ -12,6 +12,8 @@ import { setAuthData } from '/imports/ui/core/local-states/useAuthData';
|
||||
import MeetingEndedContainer from '../../meeting-ended/component';
|
||||
import { setUserDataToSessionStorage } from './service';
|
||||
import { LoadingContext } from '../../common/loading-screen/loading-screen-HOC/component';
|
||||
import logger from '/imports/startup/client/logger';
|
||||
import deviceInfo from '/imports/utils/deviceInfo';
|
||||
|
||||
const connectionTimeout = 60000;
|
||||
|
||||
@ -110,6 +112,7 @@ const PresenceManager: React.FC<PresenceManagerProps> = ({
|
||||
variables: {
|
||||
authToken,
|
||||
clientType: 'HTML5',
|
||||
clientIsMobile: deviceInfo.isMobile,
|
||||
},
|
||||
});
|
||||
}
|
||||
@ -162,7 +165,8 @@ const PresenceManagerContainer: React.FC<PresenceManagerContainerProps> = ({ chi
|
||||
if (loading || userInfoLoading) return null;
|
||||
if (error || userInfoError) {
|
||||
loadingContextInfo.setLoading(false, '');
|
||||
throw new Error('Error on user authentication: ', error);
|
||||
logger.debug(`Error on user authentication: ${error}`);
|
||||
throw new Error('Error on user authentication');
|
||||
}
|
||||
|
||||
if (!data || data.user_current.length === 0) return null;
|
||||
|
@ -70,10 +70,11 @@ subscription getUserCurrent {
|
||||
}
|
||||
`;
|
||||
export const userJoinMutation = gql`
|
||||
mutation UserJoin($authToken: String!, $clientType: String!) {
|
||||
mutation UserJoin($authToken: String!, $clientType: String!, $clientIsMobile: Boolean!) {
|
||||
userJoinMeeting(
|
||||
authToken: $authToken,
|
||||
clientType: $clientType,
|
||||
clientIsMobile: $clientIsMobile,
|
||||
)
|
||||
}
|
||||
`;
|
||||
|
@ -1,5 +1,4 @@
|
||||
import { Session } from 'meteor/session';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { getSettingsSingletonInstance } from '/imports/ui/services/settings';
|
||||
import Auth from '/imports/ui/services/auth';
|
||||
import Meetings from '/imports/api/meetings';
|
||||
@ -43,7 +42,7 @@ class VideoService {
|
||||
|
||||
private deviceId: string | null = null;
|
||||
|
||||
private readonly tabId: string;
|
||||
private readonly clientSessionUUID: string;
|
||||
|
||||
constructor() {
|
||||
this.userParameterProfile = null;
|
||||
@ -52,7 +51,7 @@ class VideoService {
|
||||
this.numberOfDevices = 0;
|
||||
this.record = null;
|
||||
this.hackRecordViewer = null;
|
||||
this.tabId = uuid();
|
||||
this.clientSessionUUID = sessionStorage.getItem('clientSessionUUID') || '0';
|
||||
|
||||
if (navigator.mediaDevices) {
|
||||
this.updateNumberOfDevices = this.updateNumberOfDevices.bind(this);
|
||||
@ -523,12 +522,8 @@ class VideoService {
|
||||
});
|
||||
}
|
||||
|
||||
getTabId() {
|
||||
return this.tabId;
|
||||
}
|
||||
|
||||
getPrefix() {
|
||||
return `${Auth.userID}${TOKEN}${this.tabId}`;
|
||||
return `${Auth.userID}${TOKEN}${this.clientSessionUUID}`;
|
||||
}
|
||||
}
|
||||
|
||||
@ -572,6 +567,5 @@ export default {
|
||||
setTrackEnabled: (value: boolean) => videoService.setTrackEnabled(value),
|
||||
getRoleModerator: VideoService.getRoleModerator,
|
||||
getRoleViewer: VideoService.getRoleViewer,
|
||||
getTabId: videoService.getTabId.bind(videoService),
|
||||
getPrefix: videoService.getPrefix.bind(videoService),
|
||||
};
|
||||
|
@ -344,6 +344,7 @@ const GuestUsersManagementPanel: React.FC<GuestUsersManagementPanelProps> = ({
|
||||
"
|
||||
{
|
||||
guestLobbyMessage && guestLobbyMessage !== ''
|
||||
// eslint-disable-next-line react/no-danger
|
||||
? <span dangerouslySetInnerHTML={{ __html: guestLobbyMessage }} />
|
||||
: intl.formatMessage(intlMessages.emptyMessage)
|
||||
}
|
||||
|
@ -114,6 +114,7 @@ const renderGuestUserItem = (
|
||||
<i>
|
||||
"
|
||||
{privateGuestLobbyMessage && privateGuestLobbyMessage !== ''
|
||||
// eslint-disable-next-line react/no-danger
|
||||
? <span dangerouslySetInnerHTML={{ __html: privateGuestLobbyMessage }} />
|
||||
: intl.formatMessage(intlMessages.emptyMessage)}
|
||||
"
|
||||
|
@ -18,14 +18,6 @@ export const SET_RAISE_HAND = gql`
|
||||
}
|
||||
`;
|
||||
|
||||
export const SET_MOBILE_FLAG = gql`
|
||||
mutation SetMobileFlag($mobile: Boolean!) {
|
||||
userSetMobileFlag(
|
||||
mobile: $mobile,
|
||||
)
|
||||
}
|
||||
`;
|
||||
|
||||
export const EJECT_FROM_MEETING = gql`
|
||||
mutation EjectFromMeeting($userId: String!, $banUser: Boolean!) {
|
||||
userEjectFromMeeting(
|
||||
@ -115,7 +107,6 @@ export const USER_LEAVE_MEETING = gql`
|
||||
export default {
|
||||
SET_CAMERA_PINNED,
|
||||
SET_RAISE_HAND,
|
||||
SET_MOBILE_FLAG,
|
||||
EJECT_FROM_MEETING,
|
||||
EJECT_FROM_VOICE,
|
||||
SET_PRESENTER,
|
||||
|
Loading…
Reference in New Issue
Block a user