refactor: Graphql middleware enhancements (#19336)

* Convert all go chan to SafeChannel

* Remove unnecessary comments

* Refactor graphql message names

* Fix renamed function
This commit is contained in:
Gustavo Trott 2023-12-14 14:01:47 -03:00 committed by GitHub
parent baffceabcb
commit 095085310d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 117 additions and 118 deletions

View File

@ -75,15 +75,15 @@ class BigBlueButtonActor(
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
msg.core match {
case m: CreateMeetingReqMsg => handleCreateMeetingReqMsg(m)
case m: RegisterUserReqMsg => handleRegisterUserReqMsg(m)
case m: GetAllMeetingsReqMsg => handleGetAllMeetingsReqMsg(m)
case m: GetRunningMeetingsReqMsg => handleGetRunningMeetingsReqMsg(m)
case m: CheckAlivePingSysMsg => handleCheckAlivePingSysMsg(m)
case m: ValidateConnAuthTokenSysMsg => handleValidateConnAuthTokenSysMsg(m)
case _: UserGraphqlConnectionStablishedSysMsg => //Ignore
case _: UserGraphqlConnectionClosedSysMsg => //Ignore
case _ => log.warning("Cannot handle " + msg.envelope.name)
case m: CreateMeetingReqMsg => handleCreateMeetingReqMsg(m)
case m: RegisterUserReqMsg => handleRegisterUserReqMsg(m)
case m: GetAllMeetingsReqMsg => handleGetAllMeetingsReqMsg(m)
case m: GetRunningMeetingsReqMsg => handleGetRunningMeetingsReqMsg(m)
case m: CheckAlivePingSysMsg => handleCheckAlivePingSysMsg(m)
case m: ValidateConnAuthTokenSysMsg => handleValidateConnAuthTokenSysMsg(m)
case _: UserGraphqlConnectionEstablishedSysMsg => //Ignore
case _: UserGraphqlConnectionClosedSysMsg => //Ignore
case _ => log.warning("Cannot handle " + msg.envelope.name)
}
}

View File

@ -95,7 +95,7 @@ object PermissionCheck extends SystemConfiguration {
for {
regUser <- RegisteredUsers.findWithUserId(userId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, reason, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, reason, outGW)
}
} else {
// TODO: get this object a context so it can use the akka logging system

View File

@ -36,7 +36,7 @@ trait EjectUserFromBreakoutInternalMsgHdlr {
Sender.sendDisconnectClientSysMsg(msg.breakoutId, registeredUser.id, msg.ejectedBy, msg.reasonCode, outGW)
// Force reconnection with graphql to refresh permissions
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, registeredUser.id, registeredUser.sessionToken, msg.reasonCode, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, registeredUser.id, registeredUser.sessionToken, msg.reasonCode, outGW)
//send users update to parent meeting
BreakoutHdlrHelpers.updateParentMeetingWithUsers(liveMeeting, eventBus)

View File

@ -85,7 +85,7 @@ object AssignPresenterActionHandler extends RightsManagementTrait {
for {
u <- RegisteredUsers.findWithUserId(oldPres.intId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, oldPres.intId, u.sessionToken, "role_changed", outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, oldPres.intId, u.sessionToken, "role_changed", outGW)
}
}
}
@ -100,7 +100,7 @@ object AssignPresenterActionHandler extends RightsManagementTrait {
for {
u <- RegisteredUsers.findWithUserId(newPres.intId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, newPres.intId, u.sessionToken, "role_changed", outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, newPres.intId, u.sessionToken, "role_changed", outGW)
}
}
}

View File

@ -73,7 +73,7 @@ trait ChangeUserRoleCmdMsgHdlr extends RightsManagementTrait {
for {
u <- RegisteredUsers.findWithUserId(uvo.intId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, uvo.intId, u.sessionToken, "role_changed", outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, uvo.intId, u.sessionToken, "role_changed", outGW)
}
}
}

View File

@ -74,7 +74,7 @@ trait EjectUserFromMeetingCmdMsgHdlr extends RightsManagementTrait {
Sender.sendDisconnectClientSysMsg(meetingId, ru.id, ejectedBy, EjectReasonCode.EJECT_USER, outGW)
// Force reconnection with graphql to refresh permissions
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, registeredUser.id, registeredUser.sessionToken, EjectReasonCode.EJECT_USER, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, registeredUser.id, registeredUser.sessionToken, EjectReasonCode.EJECT_USER, outGW)
}
} else {
// User is ejecting self, so just eject this userid not all sessions if joined using multiple
@ -93,7 +93,7 @@ trait EjectUserFromMeetingCmdMsgHdlr extends RightsManagementTrait {
Sender.sendDisconnectClientSysMsg(meetingId, userId, ejectedBy, EjectReasonCode.EJECT_USER, outGW)
// Force reconnection with graphql to refresh permissions
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, registeredUser.id, registeredUser.sessionToken, EjectReasonCode.EJECT_USER, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, registeredUser.id, registeredUser.sessionToken, EjectReasonCode.EJECT_USER, outGW)
}
}
@ -129,7 +129,7 @@ trait EjectUserFromMeetingSysMsgHdlr {
for {
regUser <- RegisteredUsers.findWithUserId(userId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, EjectReasonCode.SYSTEM_EJECT_USER, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, EjectReasonCode.SYSTEM_EJECT_USER, outGW)
}
}
}

View File

@ -44,7 +44,7 @@ trait LockUserInMeetingCmdMsgHdlr extends RightsManagementTrait {
for {
u <- RegisteredUsers.findWithUserId(uvo.intId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, uvo.intId, u.sessionToken, "lock_user_changed", outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, uvo.intId, u.sessionToken, "lock_user_changed", outGW)
}
log.info("Lock user. meetingId=" + props.meetingProp.intId + " userId=" + uvo.intId + " locked=" + uvo.locked)

View File

@ -49,7 +49,7 @@ trait RegisterUserReqMsgHdlr {
Sender.sendDisconnectClientSysMsg(meetingId, userToRemove.id, SystemUser.ID, EjectReasonCode.DUPLICATE_USER, outGW)
// Force reconnection with graphql to refresh permissions
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, userToRemove.id, userToRemove.sessionToken, EjectReasonCode.DUPLICATE_USER, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, userToRemove.id, userToRemove.sessionToken, EjectReasonCode.DUPLICATE_USER, outGW)
}
}
}

View File

@ -51,7 +51,7 @@ trait UserJoinMeetingReqMsgHdlr extends HandlerHelpers {
notifyPreviousUsersWithSameExtId(regUser)
clearCachedVoiceUser(regUser)
clearExpiredUserState(regUser)
invalidateUserGraphqlConnection(regUser)
ForceUserGraphqlReconnection(regUser)
newState
}
@ -146,7 +146,7 @@ trait UserJoinMeetingReqMsgHdlr extends HandlerHelpers {
private def clearExpiredUserState(regUser: RegisteredUser) =
UserStateDAO.updateExpired(regUser.id, false)
private def invalidateUserGraphqlConnection(regUser: RegisteredUser) =
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, "user_joined", outGW)
private def ForceUserGraphqlReconnection(regUser: RegisteredUser) =
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, "user_joined", outGW)
}

View File

@ -31,7 +31,7 @@ trait UserLeaveReqMsgHdlr extends HandlerHelpers {
ru <- RegisteredUsers.findWithUserId(msg.body.userId, liveMeeting.registeredUsers)
} yield {
RegisteredUsers.setUserLoggedOutFlag(liveMeeting.registeredUsers, ru)
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, ru.id, ru.sessionToken, "user_loggedout", outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, ru.id, ru.sessionToken, "user_loggedout", outGW)
}
}
state

View File

@ -450,8 +450,8 @@ class ReceivedJsonMsgHandlerActor(
routeGenericMsg[TimerEndedPubMsg](envelope, jsonNode)
// Messages from Graphql Middleware
case UserGraphqlConnectionStablishedSysMsg.NAME =>
route[UserGraphqlConnectionStablishedSysMsg](meetingManagerChannel, envelope, jsonNode)
case UserGraphqlConnectionEstablishedSysMsg.NAME =>
route[UserGraphqlConnectionEstablishedSysMsg](meetingManagerChannel, envelope, jsonNode)
case UserGraphqlConnectionClosedSysMsg.NAME =>
route[UserGraphqlConnectionClosedSysMsg](meetingManagerChannel, envelope, jsonNode)

View File

@ -1005,7 +1005,7 @@ class MeetingActor(
for {
regUser <- RegisteredUsers.findWithUserId(u.intId, liveMeeting.registeredUsers)
} yield {
Sender.sendInvalidateUserGraphqlConnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, EjectReasonCode.USER_INACTIVITY, outGW)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, regUser.id, regUser.sessionToken, EjectReasonCode.USER_INACTIVITY, outGW)
}
}
}

View File

@ -246,12 +246,12 @@ object MsgBuilder {
BbbCommonEnvCoreMsg(envelope, event)
}
def buildInvalidateUserGraphqlConnectionSysMsg(meetingId: String, userId: String, sessionToken: String, reason: String): BbbCommonEnvCoreMsg = {
def buildForceUserGraphqlReconnectionSysMsg(meetingId: String, userId: String, sessionToken: String, reason: String): BbbCommonEnvCoreMsg = {
val routing = Routing.addMsgToClientRouting(MessageTypes.SYSTEM, meetingId, userId)
val envelope = BbbCoreEnvelope(InvalidateUserGraphqlConnectionSysMsg.NAME, routing)
val header = BbbCoreHeaderWithMeetingId(InvalidateUserGraphqlConnectionSysMsg.NAME, meetingId)
val body = InvalidateUserGraphqlConnectionSysMsgBody(meetingId, userId, sessionToken, reason)
val event = InvalidateUserGraphqlConnectionSysMsg(header, body)
val envelope = BbbCoreEnvelope(ForceUserGraphqlReconnectionSysMsg.NAME, routing)
val header = BbbCoreHeaderWithMeetingId(ForceUserGraphqlReconnectionSysMsg.NAME, meetingId)
val body = ForceUserGraphqlReconnectionSysMsgBody(meetingId, userId, sessionToken, reason)
val event = ForceUserGraphqlReconnectionSysMsg(header, body)
BbbCommonEnvCoreMsg(envelope, event)
}

View File

@ -10,9 +10,9 @@ object Sender {
outGW.send(ejectFromMeetingSystemEvent)
}
def sendInvalidateUserGraphqlConnectionSysMsg(meetingId: String, userId: String, sessionToken: String, reason: String, outGW: OutMsgRouter): Unit = {
val invalidateUserGraphqlConnectionSysMsg = MsgBuilder.buildInvalidateUserGraphqlConnectionSysMsg(meetingId, userId, sessionToken, reason)
outGW.send(invalidateUserGraphqlConnectionSysMsg)
def sendForceUserGraphqlReconnectionSysMsg(meetingId: String, userId: String, sessionToken: String, reason: String, outGW: OutMsgRouter): Unit = {
val ForceUserGraphqlReconnectionSysMsg = MsgBuilder.buildForceUserGraphqlReconnectionSysMsg(meetingId, userId, sessionToken, reason)
outGW.send(ForceUserGraphqlReconnectionSysMsg)
}
def sendUserInactivityInspectMsg(meetingId: String, userId: String, responseDelay: Long, outGW: OutMsgRouter): Unit = {

View File

@ -26,13 +26,13 @@ class GraphqlActionsActor(
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
msg.core match {
// Messages from bbb-graphql-middleware
case m: UserGraphqlConnectionStablishedSysMsg => handleUserGraphqlConnectionStablishedSysMsg(m)
case m: UserGraphqlConnectionEstablishedSysMsg => handleUserGraphqlConnectionEstablishedSysMsg(m)
case m: UserGraphqlConnectionClosedSysMsg => handleUserGraphqlConnectionClosedSysMsg(m)
case _ => // message not to be handled.
}
}
private def handleUserGraphqlConnectionStablishedSysMsg(msg: UserGraphqlConnectionStablishedSysMsg) {
private def handleUserGraphqlConnectionEstablishedSysMsg(msg: UserGraphqlConnectionEstablishedSysMsg) {
UserGraphqlConnectionDAO.insert(msg.body.sessionToken, msg.body.browserConnectionId)
}

View File

@ -235,30 +235,30 @@ case class DeletedRecordingSysMsgBody(recordId: String)
/**
* Sent from akka-apps to graphql-middleware
*/
object InvalidateUserGraphqlConnectionSysMsg { val NAME = "InvalidateUserGraphqlConnectionSysMsg" }
case class InvalidateUserGraphqlConnectionSysMsg(
object ForceUserGraphqlReconnectionSysMsg { val NAME = "ForceUserGraphqlReconnectionSysMsg" }
case class ForceUserGraphqlReconnectionSysMsg(
header: BbbCoreHeaderWithMeetingId,
body: InvalidateUserGraphqlConnectionSysMsgBody
body: ForceUserGraphqlReconnectionSysMsgBody
) extends BbbCoreMsg
case class InvalidateUserGraphqlConnectionSysMsgBody(meetingId: String, userId: String, sessionToken: String, reason: String)
case class ForceUserGraphqlReconnectionSysMsgBody(meetingId: String, userId: String, sessionToken: String, reason: String)
/**
* Sent from graphql-middleware to akka-apps
*/
object UserGraphqlConnectionInvalidatedEvtMsg { val NAME = "UserGraphqlConnectionInvalidatedEvtMsg" }
case class UserGraphqlConnectionInvalidatedEvtMsg(
object UserGraphqlReconnectionForcedEvtMsg { val NAME = "UserGraphqlReconnectionForcedEvtMsg" }
case class UserGraphqlReconnectionForcedEvtMsg(
header: BbbCoreBaseHeader,
body: UserGraphqlConnectionInvalidatedEvtMsgBody
body: UserGraphqlReconnectionForcedEvtMsgBody
) extends BbbCoreMsg
case class UserGraphqlConnectionInvalidatedEvtMsgBody(sessionToken: String, browserConnectionId: String)
case class UserGraphqlReconnectionForcedEvtMsgBody(sessionToken: String, browserConnectionId: String)
object UserGraphqlConnectionStablishedSysMsg { val NAME = "UserGraphqlConnectionStablishedSysMsg" }
case class UserGraphqlConnectionStablishedSysMsg(
object UserGraphqlConnectionEstablishedSysMsg { val NAME = "UserGraphqlConnectionEstablishedSysMsg" }
case class UserGraphqlConnectionEstablishedSysMsg(
header: BbbCoreBaseHeader,
body: UserGraphqlConnectionStablishedSysMsgBody
body: UserGraphqlConnectionEstablishedSysMsgBody
) extends BbbCoreMsg
case class UserGraphqlConnectionStablishedSysMsgBody(sessionToken: String, browserConnectionId: String)
case class UserGraphqlConnectionEstablishedSysMsgBody(sessionToken: String, browserConnectionId: String)
object UserGraphqlConnectionClosedSysMsg { val NAME = "UserGraphqlConnectionClosedSysMsg" }
case class UserGraphqlConnectionClosedSysMsg(

View File

@ -32,6 +32,10 @@ func (s *SafeChannel) Receive() (interface{}, bool) {
return val, ok
}
func (s *SafeChannel) ReceiveChannel() <-chan interface{} {
return s.ch
}
func (s *SafeChannel) Close() {
s.mux.Lock()
defer s.mux.Unlock()

View File

@ -22,7 +22,7 @@ var lastHasuraConnectionId int
var hasuraEndpoint = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_HASURA_WS")
// Hasura client connection
func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.Cookie, fromBrowserChannel chan interface{}, toBrowserChannel chan interface{}) error {
func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.Cookie, fromBrowserToHasuraChannel *common.SafeChannel, fromHasuraToBrowserChannel *common.SafeChannel) error {
log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id)
// Obtain id for this connection
@ -90,14 +90,14 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
// Start routines
// reads from browser, writes to hasura
go writer.HasuraConnectionWriter(&thisConnection, fromBrowserChannel, &wg)
go writer.HasuraConnectionWriter(&thisConnection, fromBrowserToHasuraChannel, &wg)
// reads from hasura, writes to browser
go reader.HasuraConnectionReader(&thisConnection, toBrowserChannel, fromBrowserChannel, &wg)
go reader.HasuraConnectionReader(&thisConnection, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, &wg)
// if it's a reconnect, inject authentication
if !browserConnection.Disconnected && browserConnection.ConnectionInitMessage != nil {
fromBrowserChannel <- browserConnection.ConnectionInitMessage
fromBrowserToHasuraChannel.Send(browserConnection.ConnectionInitMessage)
}
// Wait

View File

@ -2,7 +2,7 @@ package reader
import (
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/hascli/replayer"
"github.com/iMDT/bbb-graphql-middleware/internal/hascli/retransmiter"
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
log "github.com/sirupsen/logrus"
"nhooyr.io/websocket/wsjson"
@ -10,7 +10,7 @@ import (
)
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel chan interface{}, fromBrowserToHasuraChannel chan interface{}, wg *sync.WaitGroup) {
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) {
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id)
defer log.Debugf("finished")
log.Debugf("starting")
@ -62,12 +62,12 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan
}
// Write the message to browser
fromHasuraToBrowserChannel <- messageAsMap
fromHasuraToBrowserChannel.Send(messageAsMap)
// Replay the subscription start commands when hasura confirms the connection
// Retransmit the subscription start commands when hasura confirms the connection
// this is useful in case of a connection invalidation
if messageType == "connection_ack" {
go replayer.ReplaySubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
}
}
}

View File

@ -12,7 +12,7 @@ import (
// HasuraConnectionWriter
// process messages (middleware to hasura)
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserChannel chan interface{}, wg *sync.WaitGroup) {
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) {
log := log.WithField("_routine", "HasuraConnectionWriter")
browserConnection := hc.Browserconn
@ -28,7 +28,7 @@ RangeLoop:
select {
case <-hc.Context.Done():
break RangeLoop
case fromBrowserMessage := <-fromBrowserChannel:
case fromBrowserMessage := <-fromBrowserToHasuraChannel.ReceiveChannel():
{
if fromBrowserMessage == nil {
continue

View File

@ -1,19 +0,0 @@
package replayer
import (
"github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus"
)
func ReplaySubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserChannel chan interface{}) {
log := log.WithField("_routine", "ReplaySubscriptionStartMessages").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id)
hc.Browserconn.ActiveSubscriptionsMutex.RLock()
for _, subscription := range hc.Browserconn.ActiveSubscriptions {
if subscription.LastSeenOnHasuraConnetion != hc.Id {
log.Tracef("replaying subscription start: %v", subscription.Message)
fromBrowserChannel <- subscription.Message
}
}
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock()
}

View File

@ -0,0 +1,19 @@
package retransmiter
import (
"github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus"
)
func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel) {
log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id)
hc.Browserconn.ActiveSubscriptionsMutex.RLock()
for _, subscription := range hc.Browserconn.ActiveSubscriptions {
if subscription.LastSeenOnHasuraConnetion != hc.Id {
log.Tracef("retransmiting subscription start: %v", subscription.Message)
fromBrowserToHasuraChannel.Send(subscription.Message)
}
}
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock()
}

View File

@ -70,7 +70,7 @@ func ClearAllCaches() {
if err == nil && info.IsDir() {
filepath.Walk(cacheDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Errorf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
log.Debugf("Cache dir was removed previously (probably user disconnected): %q: %v\n", path, err)
return err
}

View File

@ -78,9 +78,9 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
log.Infof("connection accepted")
// Create channels
fromBrowserChannel1 := make(chan interface{}, bufferSize)
fromBrowserChannel2 := common.NewSafeChannel(bufferSize)
toBrowserChannel := make(chan interface{}, bufferSize)
fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannel(bufferSize)
fromBrowserToHasuraChannel := common.NewSafeChannel(bufferSize)
fromHasuraToBrowserChannel := common.NewSafeChannel(bufferSize)
// Ensure a hasura client is running while the browser is connected
go func() {
@ -99,7 +99,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
BrowserConnectionsMutex.RUnlock()
log.Debugf("created hasura client")
if thisBrowserConnection != nil {
hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserChannel1, toBrowserChannel)
hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserToHasuraChannel, fromHasuraToBrowserChannel)
}
time.Sleep(100 * time.Millisecond)
}
@ -114,21 +114,20 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
var wgReader sync.WaitGroup
wgReader.Add(1)
// Reads from browser connection, writes into fromBrowserChannel1 and fromBrowserChannel2
go reader.BrowserConnectionReader(browserConnectionId, browserConnectionContext, c, fromBrowserChannel1, fromBrowserChannel2, []*sync.WaitGroup{&wgAll, &wgReader})
// Reads from browser connection, writes into fromBrowserToHasuraChannel and fromBrowserToHasuraConnectionEstablishingChannel
go reader.BrowserConnectionReader(browserConnectionId, browserConnectionContext, c, fromBrowserToHasuraChannel, fromBrowserToHasuraConnectionEstablishingChannel, []*sync.WaitGroup{&wgAll, &wgReader})
go func() {
wgReader.Wait()
thisConnection.Disconnected = true
}()
// Reads from toBrowserChannel, writes to browser connection
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, c, toBrowserChannel, &wgAll)
// Reads from fromHasuraToBrowserChannel, writes to browser connection
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, c, fromHasuraToBrowserChannel, &wgAll)
go ConnectionInitHandler(browserConnectionId, browserConnectionContext, fromBrowserChannel2, &wgAll)
go ConnectionInitHandler(browserConnectionId, browserConnectionContext, fromBrowserToHasuraConnectionEstablishingChannel, &wgAll)
// Wait until all routines are finished
wgAll.Wait()
}
func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
@ -140,7 +139,7 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
browserConnection.HasuraConnection.ContextCancelFunc()
log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id)
go SendUserGraphqlConnectionInvalidatedEvtMsg(browserConnection.SessionToken)
go SendUserGraphqlReconnectionForcedEvtMsg(browserConnection.SessionToken)
}
}
}

View File

@ -7,7 +7,7 @@ import (
"sync"
)
func ConnectionInitHandler(browserConnectionId string, browserConnectionContext context.Context, fromBrowser *common.SafeChannel, wg *sync.WaitGroup) {
func ConnectionInitHandler(browserConnectionId string, browserConnectionContext context.Context, fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel, wg *sync.WaitGroup) {
log := log.WithField("_routine", "ConnectionInitHandler").WithField("browserConnectionId", browserConnectionId)
log.Debugf("starting")
@ -21,7 +21,7 @@ func ConnectionInitHandler(browserConnectionId string, browserConnectionContext
// Intercept the fromBrowserMessage channel to get the sessionToken
for {
fromBrowserMessage, ok := fromBrowser.Receive()
fromBrowserMessage, ok := fromBrowserToHasuraConnectionEstablishingChannel.Receive()
if !ok {
//Received all messages. Channel is closed
return
@ -35,13 +35,13 @@ func ConnectionInitHandler(browserConnectionId string, browserConnectionContext
var sessionToken = headersAsMap["X-Session-Token"]
if sessionToken != nil {
sessionToken := headersAsMap["X-Session-Token"].(string)
log.Infof("[SessionTokenReader] intercepted session token %v", sessionToken)
log.Debugf("[SessionTokenReader] intercepted session token %v", sessionToken)
BrowserConnectionsMutex.Lock()
browserConnection.SessionToken = sessionToken
BrowserConnectionsMutex.Unlock()
go SendUserGraphqlConnectionStablishedSysMsg(sessionToken, browserConnectionId)
fromBrowser.Close()
go SendUserGraphqlConnectionEstablishedSysMsg(sessionToken, browserConnectionId)
fromBrowserToHasuraConnectionEstablishingChannel.Close()
break
}
}

View File

@ -10,14 +10,14 @@ import (
"time"
)
func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserChannel1 chan interface{}, fromBrowserChannel2 *common.SafeChannel, waitGroups []*sync.WaitGroup) {
func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserToHasuraChannel1 *common.SafeChannel, fromBrowserToHasuraChannel2 *common.SafeChannel, waitGroups []*sync.WaitGroup) {
log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId)
defer log.Debugf("finished")
log.Debugf("starting")
defer func() {
close(fromBrowserChannel1)
fromBrowserChannel2.Close()
fromBrowserToHasuraChannel1.Close()
fromBrowserToHasuraChannel2.Close()
}()
defer func() {
@ -32,21 +32,17 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fromBrowserChannel2Alive := true
for {
var v interface{}
err := wsjson.Read(ctx, c, &v)
if err != nil {
log.Errorf("error on read (browser is disconnected): %v", err)
log.Debugf("Browser is disconnected, skiping reading of ws message: %v", err)
return
}
log.Tracef("received from browser: %v", v)
fromBrowserChannel1 <- v
if fromBrowserChannel2Alive {
fromBrowserChannel2Alive = fromBrowserChannel2.Send(v)
}
fromBrowserToHasuraChannel1.Send(v)
fromBrowserToHasuraChannel2.Send(v)
}
}

View File

@ -3,7 +3,6 @@ package websrv
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"os"
@ -35,7 +34,7 @@ func StartRedisListener() {
}
// Skip parsing unnecessary messages
if !strings.Contains(msg.Payload, "InvalidateUserGraphqlConnectionSysMsg") {
if !strings.Contains(msg.Payload, "ForceUserGraphqlReconnectionSysMsg") {
continue
}
@ -50,7 +49,7 @@ func StartRedisListener() {
messageType := messageEnvelopeAsMap["name"]
if messageType == "InvalidateUserGraphqlConnectionSysMsg" {
if messageType == "ForceUserGraphqlReconnectionSysMsg" {
messageCoreAsMap := messageAsMap["core"].(map[string]interface{})
messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{})
sessionTokenToInvalidate := messageBodyAsMap["sessionToken"]
@ -89,34 +88,34 @@ func sendBbbCoreMsgToRedis(name string, body map[string]interface{}) {
messageJSON, err := json.Marshal(message)
if err != nil {
fmt.Printf("Error while marshaling message to json: %v\n", err)
log.Tracef("Error while marshaling message to json: %v\n", err)
return
}
err = GetRedisConn().Publish(context.Background(), channelName, messageJSON).Err()
if err != nil {
fmt.Printf("Error while sending msg to redis channel: %v\n", err)
log.Tracef("Error while sending msg to redis channel: %v\n", err)
return
}
fmt.Printf("JSON message sent to channel %s:\n%s\n", channelName, messageJSON)
log.Tracef("JSON message sent to channel %s:\n%s\n", channelName, messageJSON)
}
func SendUserGraphqlConnectionInvalidatedEvtMsg(sessionToken string) {
func SendUserGraphqlReconnectionForcedEvtMsg(sessionToken string) {
var body = map[string]interface{}{
"sessionToken": sessionToken,
}
sendBbbCoreMsgToRedis("UserGraphqlConnectionInvalidatedEvtMsg", body)
sendBbbCoreMsgToRedis("UserGraphqlReconnectionForcedEvtMsg", body)
}
func SendUserGraphqlConnectionStablishedSysMsg(sessionToken string, browserConnectionId string) {
func SendUserGraphqlConnectionEstablishedSysMsg(sessionToken string, browserConnectionId string) {
var body = map[string]interface{}{
"sessionToken": sessionToken,
"browserConnectionId": browserConnectionId,
}
sendBbbCoreMsgToRedis("UserGraphqlConnectionStablishedSysMsg", body)
sendBbbCoreMsgToRedis("UserGraphqlConnectionEstablishedSysMsg", body)
}
func SendUserGraphqlConnectionClosedSysMsg(sessionToken string, browserConnectionId string) {

View File

@ -2,6 +2,7 @@ package writer
import (
"context"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus"
"sync"
@ -9,7 +10,7 @@ import (
"nhooyr.io/websocket/wsjson"
)
func BrowserConnectionWriter(browserConnectionId string, ctx context.Context, c *websocket.Conn, toBrowserChannel chan interface{}, wg *sync.WaitGroup) {
func BrowserConnectionWriter(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromHasuratoBrowserChannel *common.SafeChannel, wg *sync.WaitGroup) {
log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnectionId)
defer log.Debugf("finished")
log.Debugf("starting")
@ -20,14 +21,14 @@ RangeLoop:
select {
case <-ctx.Done():
break RangeLoop
case toBrowserMessage := <-toBrowserChannel:
case toBrowserMessage := <-fromHasuratoBrowserChannel.ReceiveChannel():
{
var toBrowserMessageAsMap = toBrowserMessage.(map[string]interface{})
log.Tracef("sending to browser: %v", toBrowserMessage)
err := wsjson.Write(ctx, c, toBrowserMessage)
if err != nil {
log.Errorf("error on write (browser is disconnected): %v", err)
log.Debugf("Browser is disconnected, skiping writing of ws message: %v", err)
return
}