refactor (gql-middlware): Refactored code to simplify channel handling and remove unnecessary routines (#20640)

* Refactor gql-middleware channels and remove dedicated channel to establish init connection

* remove unnecessary routin to wait mutations to complete before invalidate Hasura connection
This commit is contained in:
Gustavo Trott 2024-07-05 13:35:08 -03:00 committed by GitHub
parent fb38ae0baa
commit 3c71aab4bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 99 additions and 158 deletions

View File

@ -34,34 +34,36 @@ type GraphQlSubscription struct {
} }
type BrowserConnection struct { type BrowserConnection struct {
Id string // browser connection id Id string // browser connection id
Websocket *websocket.Conn // websocket of browser connection Websocket *websocket.Conn // websocket of browser connection
SessionToken string // session token of this connection SessionToken string // session token of this connection
MeetingId string // auth info provided by bbb-web MeetingId string // auth info provided by bbb-web
UserId string // auth info provided by bbb-web UserId string // auth info provided by bbb-web
BBBWebSessionVariables map[string]string // graphql session variables provided by akka-apps BBBWebSessionVariables map[string]string // graphql session variables provided by akka-apps
ClientSessionUUID string // self-generated unique id for this client ClientSessionUUID string // self-generated unique id for this client
Context context.Context // browser connection context Context context.Context // browser connection context
ContextCancelFunc context.CancelFunc // function to cancel the browser context (and so, the browser connection) ContextCancelFunc context.CancelFunc // function to cancel the browser context (and so, the browser connection)
BrowserRequestCookies []*http.Cookie BrowserRequestCookies []*http.Cookie
ActiveSubscriptions map[string]GraphQlSubscription // active subscriptions of this connection (start, but no stop) ActiveSubscriptions map[string]GraphQlSubscription // active subscriptions of this connection (start, but no stop)
ActiveSubscriptionsMutex sync.RWMutex // mutex to control the map usage ActiveSubscriptionsMutex sync.RWMutex // mutex to control the map usage
ConnectionInitMessage []byte // init message received in this connection (to be used on hasura reconnect) ConnectionInitMessage []byte // init message received in this connection (to be used on hasura reconnect)
HasuraConnection *HasuraConnection // associated hasura connection HasuraConnection *HasuraConnection // associated hasura connection
Disconnected bool // indicate if the connection is gone Disconnected bool // indicate if the connection is gone
ConnAckSentToBrowser bool // indicate if `connection_ack` msg was already sent to the browser ConnAckSentToBrowser bool // indicate if `connection_ack` msg was already sent to the browser
GraphqlActionsContext context.Context // graphql actions context GraphqlActionsContext context.Context // graphql actions context
GraphqlActionsContextCancel context.CancelFunc // function to cancel the graphql actions context GraphqlActionsContextCancel context.CancelFunc // function to cancel the graphql actions context
FromBrowserToHasuraChannel *SafeChannelByte // channel to transmit messages from Browser to Hasura
FromBrowserToGqlActionsChannel *SafeChannelByte // channel to transmit messages from Browser to Graphq-Actions
FromHasuraToBrowserChannel *SafeChannelByte // channel to transmit messages from Hasura/GqlActions to Browser
} }
type HasuraConnection struct { type HasuraConnection struct {
Id string // hasura connection id Id string // hasura connection id
BrowserConn *BrowserConnection // browser connection that originated this hasura connection BrowserConn *BrowserConnection // browser connection that originated this hasura connection
Websocket *websocket.Conn // websocket used to connect to Hasura Websocket *websocket.Conn // websocket used to connect to Hasura
WebsocketCloseError *websocket.CloseError // closeError received from Hasura WebsocketCloseError *websocket.CloseError // closeError received from Hasura
Context context.Context // hasura connection context (child of browser connection context) Context context.Context // hasura connection context (child of browser connection context)
ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection) ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection)
FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection
} }
type HasuraMessage struct { type HasuraMessage struct {

View File

@ -17,9 +17,7 @@ import (
var graphqlActionsUrl = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_GRAPHQL_ACTIONS_URL") var graphqlActionsUrl = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_GRAPHQL_ACTIONS_URL")
func GraphqlActionsClient( func GraphqlActionsClient(
browserConnection *common.BrowserConnection, browserConnection *common.BrowserConnection) error {
fromBrowserToGqlActionsChannel *common.SafeChannelByte,
fromHasuraToBrowserChannel *common.SafeChannelByte) error {
log := log.WithField("_routine", "GraphqlActionsClient").WithField("browserConnectionId", browserConnection.Id) log := log.WithField("_routine", "GraphqlActionsClient").WithField("browserConnectionId", browserConnection.Id)
log.Debug("Starting GraphqlActionsClient") log.Debug("Starting GraphqlActionsClient")
@ -33,7 +31,7 @@ RangeLoop:
case <-browserConnection.GraphqlActionsContext.Done(): case <-browserConnection.GraphqlActionsContext.Done():
log.Debug("GraphqlActionsContext cancelled!") log.Debug("GraphqlActionsContext cancelled!")
break RangeLoop break RangeLoop
case fromBrowserMessage := <-fromBrowserToGqlActionsChannel.ReceiveChannel(): case fromBrowserMessage := <-browserConnection.FromBrowserToGqlActionsChannel.ReceiveChannel():
{ {
if fromBrowserMessage == nil { if fromBrowserMessage == nil {
continue continue
@ -76,7 +74,7 @@ RangeLoop:
}, },
} }
jsonData, _ := json.Marshal(browserResponseData) jsonData, _ := json.Marshal(browserResponseData)
fromHasuraToBrowserChannel.Send(jsonData) browserConnection.FromHasuraToBrowserChannel.Send(jsonData)
} else { } else {
//Action sent successfully, return data msg to client //Action sent successfully, return data msg to client
browserResponseData := map[string]interface{}{ browserResponseData := map[string]interface{}{
@ -89,7 +87,7 @@ RangeLoop:
}, },
} }
jsonData, _ := json.Marshal(browserResponseData) jsonData, _ := json.Marshal(browserResponseData)
fromHasuraToBrowserChannel.Send(jsonData) browserConnection.FromHasuraToBrowserChannel.Send(jsonData)
} }
//Return complete msg to client //Return complete msg to client
@ -98,7 +96,7 @@ RangeLoop:
"type": "complete", "type": "complete",
} }
jsonData, _ := json.Marshal(browserResponseComplete) jsonData, _ := json.Marshal(browserResponseComplete)
fromHasuraToBrowserChannel.Send(jsonData) browserConnection.FromHasuraToBrowserChannel.Send(jsonData)
} }
//Fallback to Hasura was disabled (keeping the code temporarily) //Fallback to Hasura was disabled (keeping the code temporarily)

View File

@ -23,9 +23,7 @@ var hasuraEndpoint = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_HASURA_WS")
// Hasura client connection // Hasura client connection
func HasuraClient( func HasuraClient(
browserConnection *common.BrowserConnection, browserConnection *common.BrowserConnection) error {
fromBrowserToHasuraChannel *common.SafeChannelByte,
fromHasuraToBrowserChannel *common.SafeChannelByte) error {
log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id) log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id)
common.ActivitiesOverviewStarted("__HasuraConnection") common.ActivitiesOverviewStarted("__HasuraConnection")
defer common.ActivitiesOverviewCompleted("__HasuraConnection") defer common.ActivitiesOverviewCompleted("__HasuraConnection")
@ -75,11 +73,10 @@ func HasuraClient(
defer hasuraConnectionContextCancel() defer hasuraConnectionContextCancel()
var thisConnection = common.HasuraConnection{ var thisConnection = common.HasuraConnection{
Id: hasuraConnectionId, Id: hasuraConnectionId,
BrowserConn: browserConnection, BrowserConn: browserConnection,
Context: hasuraConnectionContext, Context: hasuraConnectionContext,
ContextCancelFunc: hasuraConnectionContextCancel, ContextCancelFunc: hasuraConnectionContextCancel,
FreezeMsgFromBrowserChan: common.NewSafeChannel(1),
} }
browserConnection.HasuraConnection = &thisConnection browserConnection.HasuraConnection = &thisConnection
@ -94,7 +91,7 @@ func HasuraClient(
//It's necessary to freeze the channel to avoid client trying to start subscriptions before Hasura connection is initialised //It's necessary to freeze the channel to avoid client trying to start subscriptions before Hasura connection is initialised
//It will unfreeze after `connection_ack` is sent by Hasura //It will unfreeze after `connection_ack` is sent by Hasura
fromBrowserToHasuraChannel.FreezeChannel() browserConnection.FromBrowserToHasuraChannel.FreezeChannel()
}() }()
// Make the connection // Make the connection
@ -118,10 +115,10 @@ func HasuraClient(
// Start routines // Start routines
// reads from browser, writes to hasura // reads from browser, writes to hasura
go writer.HasuraConnectionWriter(&thisConnection, fromBrowserToHasuraChannel, &wg, browserConnection.ConnectionInitMessage) go writer.HasuraConnectionWriter(&thisConnection, &wg, browserConnection.ConnectionInitMessage)
// reads from hasura, writes to browser // reads from hasura, writes to browser
go reader.HasuraConnectionReader(&thisConnection, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, &wg) go reader.HasuraConnectionReader(&thisConnection, &wg)
// Wait // Wait
wg.Wait() wg.Wait()

View File

@ -15,7 +15,7 @@ import (
) )
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel // HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte, wg *sync.WaitGroup) { func HasuraConnectionReader(hc *common.HasuraConnection, wg *sync.WaitGroup) {
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id) log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
defer log.Debugf("finished") defer log.Debugf("finished")
log.Debugf("starting") log.Debugf("starting")
@ -57,13 +57,13 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan
log.Tracef("received from hasura: %s", string(message)) log.Tracef("received from hasura: %s", string(message))
handleMessageReceivedFromHasura(hc, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, message) handleMessageReceivedFromHasura(hc, message)
} }
} }
var QueryIdPlaceholderInBytes = []byte("--------------QUERY-ID--------------") //36 chars var QueryIdPlaceholderInBytes = []byte("--------------QUERY-ID--------------") //36 chars
func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte, message []byte) { func handleMessageReceivedFromHasura(hc *common.HasuraConnection, message []byte) {
type HasuraMessageInfo struct { type HasuraMessageInfo struct {
Type string `json:"type"` Type string `json:"type"`
ID string `json:"id"` ID string `json:"id"`
@ -126,14 +126,14 @@ func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBr
// Retransmit the subscription start commands when hasura confirms the connection // Retransmit the subscription start commands when hasura confirms the connection
// this is useful in case of a connection invalidation // this is useful in case of a connection invalidation
if hasuraMessageInfo.Type == "connection_ack" { if hasuraMessageInfo.Type == "connection_ack" {
handleConnectionAckMessage(hc, message, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel) handleConnectionAckMessage(hc, message)
} else { } else {
if queryIdReplacementApplied { if queryIdReplacementApplied {
message = bytes.Replace(message, QueryIdPlaceholderInBytes, queryIdInBytes, 1) message = bytes.Replace(message, QueryIdPlaceholderInBytes, queryIdInBytes, 1)
} }
// Forward the message to browser // Forward the message to browser
fromHasuraToBrowserChannel.Send(message) hc.BrowserConn.FromHasuraToBrowserChannel.Send(message)
} }
} }
@ -196,18 +196,18 @@ func handleCompleteMessage(hc *common.HasuraConnection, queryId string) {
log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId) log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId)
} }
func handleConnectionAckMessage(hc *common.HasuraConnection, message []byte, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte) { func handleConnectionAckMessage(hc *common.HasuraConnection, message []byte) {
log.Debugf("Received connection_ack") log.Debugf("Received connection_ack")
//Hasura connection was initialized, now it's able to send new messages to Hasura //Hasura connection was initialized, now it's able to send new messages to Hasura
fromBrowserToHasuraChannel.UnfreezeChannel() hc.BrowserConn.FromBrowserToHasuraChannel.UnfreezeChannel()
//Avoid to send `connection_ack` to the browser when it's a reconnection //Avoid to send `connection_ack` to the browser when it's a reconnection
if hc.BrowserConn.ConnAckSentToBrowser == false { if hc.BrowserConn.ConnAckSentToBrowser == false {
fromHasuraToBrowserChannel.Send(message) hc.BrowserConn.FromHasuraToBrowserChannel.Send(message)
hc.BrowserConn.ConnAckSentToBrowser = true hc.BrowserConn.ConnAckSentToBrowser = true
} }
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel) go retransmiter.RetransmitSubscriptionStartMessages(hc)
} }
func getHasuraMessage(message []byte) (uint32, string, common.HasuraMessage) { func getHasuraMessage(message []byte) (uint32, string, common.HasuraMessage) {

View File

@ -15,7 +15,7 @@ import (
// HasuraConnectionWriter // HasuraConnectionWriter
// process messages (middleware to hasura) // process messages (middleware to hasura)
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannelByte, wg *sync.WaitGroup, initMessage []byte) { func HasuraConnectionWriter(hc *common.HasuraConnection, wg *sync.WaitGroup, initMessage []byte) {
log := log.WithField("_routine", "HasuraConnectionWriter") log := log.WithField("_routine", "HasuraConnectionWriter")
browserConnection := hc.BrowserConn browserConnection := hc.BrowserConn
@ -45,13 +45,7 @@ RangeLoop:
select { select {
case <-hc.Context.Done(): case <-hc.Context.Done():
break RangeLoop break RangeLoop
case <-hc.FreezeMsgFromBrowserChan.ReceiveChannel(): case fromBrowserMessage := <-hc.BrowserConn.FromBrowserToHasuraChannel.ReceiveChannel():
if !fromBrowserToHasuraChannel.Frozen() {
log.Debug("freezing channel fromBrowserToHasuraChannel")
//Freeze channel once it's about to close Hasura connection
fromBrowserToHasuraChannel.FreezeChannel()
}
case fromBrowserMessage := <-fromBrowserToHasuraChannel.ReceiveChannel():
{ {
if fromBrowserMessage == nil { if fromBrowserMessage == nil {
continue continue

View File

@ -5,7 +5,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannelByte) { func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection) {
log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id) log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
hc.BrowserConn.ActiveSubscriptionsMutex.RLock() hc.BrowserConn.ActiveSubscriptionsMutex.RLock()
@ -21,9 +21,9 @@ func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowse
log.Tracef("retransmiting subscription start: %v", subscription.Message) log.Tracef("retransmiting subscription start: %v", subscription.Message)
if subscription.Type == common.Streaming && subscription.StreamCursorCurrValue != nil { if subscription.Type == common.Streaming && subscription.StreamCursorCurrValue != nil {
fromBrowserToHasuraChannel.Send(common.PatchQuerySettingLastCursorValue(subscription)) hc.BrowserConn.FromBrowserToHasuraChannel.Send(common.PatchQuerySettingLastCursorValue(subscription))
} else { } else {
fromBrowserToHasuraChannel.Send(subscription.Message) hc.BrowserConn.FromBrowserToHasuraChannel.Send(subscription.Message)
} }
} }
} }

View File

@ -63,13 +63,16 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
defer browserWsConn.Close(websocket.StatusInternalError, "the sky is falling") defer browserWsConn.Close(websocket.StatusInternalError, "the sky is falling")
var thisConnection = common.BrowserConnection{ var thisConnection = common.BrowserConnection{
Id: browserConnectionId, Id: browserConnectionId,
Websocket: browserWsConn, Websocket: browserWsConn,
BrowserRequestCookies: r.Cookies(), BrowserRequestCookies: r.Cookies(),
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1), ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
Context: browserConnectionContext, Context: browserConnectionContext,
ContextCancelFunc: browserConnectionContextCancel, ContextCancelFunc: browserConnectionContextCancel,
ConnAckSentToBrowser: false, ConnAckSentToBrowser: false,
FromBrowserToHasuraChannel: common.NewSafeChannelByte(bufferSize),
FromBrowserToGqlActionsChannel: common.NewSafeChannelByte(bufferSize),
FromHasuraToBrowserChannel: common.NewSafeChannelByte(bufferSize),
} }
BrowserConnectionsMutex.Lock() BrowserConnectionsMutex.Lock()
@ -96,12 +99,6 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
// Log it // Log it
log.Infof("connection accepted") log.Infof("connection accepted")
// Create channels
fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannelByte(bufferSize)
fromBrowserToHasuraChannel := common.NewSafeChannelByte(bufferSize)
fromBrowserToGqlActionsChannel := common.NewSafeChannelByte(bufferSize)
fromHasuraToBrowserChannel := common.NewSafeChannelByte(bufferSize)
// Configure the wait group (to hold this routine execution until both are completed) // Configure the wait group (to hold this routine execution until both are completed)
var wgAll sync.WaitGroup var wgAll sync.WaitGroup
wgAll.Add(2) wgAll.Add(2)
@ -110,26 +107,18 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
var wgReader sync.WaitGroup var wgReader sync.WaitGroup
wgReader.Add(1) wgReader.Add(1)
// Reads from browser connection, writes into fromBrowserToHasuraChannel and fromBrowserToHasuraConnectionEstablishingChannel // Reads from browser connection, writes into fromBrowserToHasuraChannel
go reader.BrowserConnectionReader( go reader.BrowserConnectionReader(&thisConnection, []*sync.WaitGroup{&wgAll, &wgReader})
browserConnectionId,
browserConnectionContext,
browserConnectionContextCancel,
browserWsConn,
fromBrowserToGqlActionsChannel,
fromBrowserToHasuraChannel,
fromBrowserToHasuraConnectionEstablishingChannel,
[]*sync.WaitGroup{&wgAll, &wgReader})
go func() { go func() {
wgReader.Wait() wgReader.Wait()
log.Debug("BrowserConnectionReader finished, closing Write Channel") log.Debug("BrowserConnectionReader finished, closing Write Channel")
fromHasuraToBrowserChannel.Close() thisConnection.FromHasuraToBrowserChannel.Close()
thisConnection.Disconnected = true thisConnection.Disconnected = true
}() }()
//Obtain user session variables from bbb-web //Check authorization and obtain user session variables from bbb-web
if errorOnInitConnection := connectionInitHandler(&thisConnection, fromBrowserToHasuraConnectionEstablishingChannel); errorOnInitConnection != nil { if errorOnInitConnection := connectionInitHandler(&thisConnection); errorOnInitConnection != nil {
//If the server wishes to reject the connection it is recommended to close the socket with `4403: Forbidden`. //If the server wishes to reject the connection it is recommended to close the socket with `4403: Forbidden`.
//https://github.com/enisdenjo/graphql-ws/blob/63881c3372a3564bf42040e3f572dd74e41b2e49/PROTOCOL.md?plain=1#L36 //https://github.com/enisdenjo/graphql-ws/blob/63881c3372a3564bf42040e3f572dd74e41b2e49/PROTOCOL.md?plain=1#L36
wsError := &websocket.CloseError{ wsError := &websocket.CloseError{
@ -157,10 +146,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
BrowserConnectionsMutex.RUnlock() BrowserConnectionsMutex.RUnlock()
if thisBrowserConnection != nil { if thisBrowserConnection != nil {
log.Debugf("created hasura client") log.Debugf("created hasura client")
hasura.HasuraClient( hasura.HasuraClient(thisBrowserConnection)
thisBrowserConnection,
fromBrowserToHasuraChannel,
fromHasuraToBrowserChannel)
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
@ -190,10 +176,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
thisBrowserConnection.GraphqlActionsContext, thisBrowserConnection.GraphqlActionsContextCancel = context.WithCancel(browserConnectionContext) thisBrowserConnection.GraphqlActionsContext, thisBrowserConnection.GraphqlActionsContextCancel = context.WithCancel(browserConnectionContext)
BrowserConnectionsMutex.Unlock() BrowserConnectionsMutex.Unlock()
gql_actions.GraphqlActionsClient( gql_actions.GraphqlActionsClient(thisBrowserConnection)
thisBrowserConnection,
fromBrowserToGqlActionsChannel,
fromHasuraToBrowserChannel)
} }
time.Sleep(1000 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
} }
@ -202,7 +185,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
}() }()
// Reads from fromHasuraToBrowserChannel, writes to browser connection // Reads from fromHasuraToBrowserChannel, writes to browser connection
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, browserWsConn, fromHasuraToBrowserChannel, &wgAll) go writer.BrowserConnectionWriter(&thisConnection, &wgAll)
// Wait until all routines are finished // Wait until all routines are finished
wgAll.Wait() wgAll.Wait()
@ -223,7 +206,6 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
wg.Add(1) wg.Add(1)
go func(bc *common.BrowserConnection) { go func(bc *common.BrowserConnection) {
defer wg.Done() defer wg.Done()
go refreshUserSessionVariables(bc)
invalidateHasuraConnectionForSessionToken(bc, sessionTokenToInvalidate) invalidateHasuraConnectionForSessionToken(bc, sessionTokenToInvalidate)
}(browserConnection) }(browserConnection)
} }
@ -231,43 +213,27 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
} }
func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, sessionToken string) { func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, sessionToken string) {
BrowserConnectionsMutex.RLock()
defer BrowserConnectionsMutex.RUnlock()
if bc.HasuraConnection == nil { if bc.HasuraConnection == nil {
return // If there's no Hasura connection, there's nothing to invalidate. return // If there's no Hasura connection, there's nothing to invalidate.
} }
hasuraConnectionId := bc.HasuraConnection.Id log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, bc.HasuraConnection.Id)
// Send message to stop receiving new messages from the browser. // Stop receiving new messages from the browser.
bc.HasuraConnection.FreezeMsgFromBrowserChan.Send(true) log.Debug("freezing channel fromBrowserToHasuraChannel")
bc.GraphqlActionsContextCancel() bc.FromBrowserToHasuraChannel.FreezeChannel()
// Wait until there are no active mutations. //Update variables for Mutations (gql-actions requests)
for iterationCount := 0; iterationCount < 20; iterationCount++ { go refreshUserSessionVariables(bc)
activeMutationFound := false
bc.ActiveSubscriptionsMutex.RLock()
for _, subscription := range bc.ActiveSubscriptions {
if subscription.Type == common.Mutation {
activeMutationFound = true
break
}
}
bc.ActiveSubscriptionsMutex.RUnlock()
if !activeMutationFound {
break // Exit the loop if no active mutations are found.
}
time.Sleep(100 * time.Millisecond) // Wait a bit before checking again.
}
log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, hasuraConnectionId)
// Cancel the Hasura connection context to clean up resources. // Cancel the Hasura connection context to clean up resources.
if bc.HasuraConnection != nil && bc.HasuraConnection.ContextCancelFunc != nil { if bc.HasuraConnection != nil && bc.HasuraConnection.ContextCancelFunc != nil {
bc.HasuraConnection.ContextCancelFunc() bc.HasuraConnection.ContextCancelFunc()
} }
log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionToken, hasuraConnectionId)
// Send a reconnection confirmation message // Send a reconnection confirmation message
go SendUserGraphqlReconnectionForcedEvtMsg(sessionToken) go SendUserGraphqlReconnectionForcedEvtMsg(sessionToken)
} }
@ -288,7 +254,6 @@ func refreshUserSessionVariables(browserConnection *common.BrowserConnection) er
log.Trace("Session variables obtained successfully") log.Trace("Session variables obtained successfully")
} }
log.Info(sessionVariables)
if _, exists := sessionVariables["x-hasura-role"]; !exists { if _, exists := sessionVariables["x-hasura-role"]; !exists {
return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-Role is missing") return fmt.Errorf("error on checking sessionToken authorization, X-Hasura-Role is missing")
} }
@ -308,10 +273,7 @@ func refreshUserSessionVariables(browserConnection *common.BrowserConnection) er
return nil return nil
} }
func connectionInitHandler( func connectionInitHandler(browserConnection *common.BrowserConnection) error {
browserConnection *common.BrowserConnection,
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte) error {
BrowserConnectionsMutex.RLock() BrowserConnectionsMutex.RLock()
browserConnectionId := browserConnection.Id browserConnectionId := browserConnection.Id
browserConnectionCookies := browserConnection.BrowserRequestCookies browserConnectionCookies := browserConnection.BrowserRequestCookies
@ -319,7 +281,7 @@ func connectionInitHandler(
// Intercept the fromBrowserMessage channel to get the sessionToken // Intercept the fromBrowserMessage channel to get the sessionToken
for { for {
fromBrowserMessage, ok := fromBrowserToHasuraConnectionEstablishingChannel.Receive() fromBrowserMessage, ok := browserConnection.FromBrowserToHasuraChannel.Receive()
if !ok { if !ok {
//Received all messages. Channel is closed //Received all messages. Channel is closed
return fmt.Errorf("error on receiving init connection") return fmt.Errorf("error on receiving init connection")
@ -405,7 +367,7 @@ func connectionInitHandler(
strings.ToLower(clientIsMobile) == "true", strings.ToLower(clientIsMobile) == "true",
browserConnectionId, browserConnectionId,
) )
fromBrowserToHasuraConnectionEstablishingChannel.Close()
break break
} }
} }

View File

@ -13,22 +13,15 @@ import (
) )
func BrowserConnectionReader( func BrowserConnectionReader(
browserConnectionId string, browserConnection *common.BrowserConnection,
ctx context.Context,
ctxCancel context.CancelFunc,
browserWsConn *websocket.Conn,
fromBrowserToGqlActionsChannel *common.SafeChannelByte,
fromBrowserToHasuraChannel *common.SafeChannelByte,
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte,
waitGroups []*sync.WaitGroup) { waitGroups []*sync.WaitGroup) {
log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId) log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnection.Id)
defer log.Debugf("finished") defer log.Debugf("finished")
log.Debugf("starting") log.Debugf("starting")
defer func() { defer func() {
fromBrowserToHasuraChannel.Close() browserConnection.FromBrowserToHasuraChannel.Close()
fromBrowserToGqlActionsChannel.Close() browserConnection.FromBrowserToGqlActionsChannel.Close()
fromBrowserToHasuraConnectionEstablishingChannel.Close()
}() }()
defer func() { defer func() {
@ -40,10 +33,10 @@ func BrowserConnectionReader(
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
}() }()
defer ctxCancel() defer browserConnection.ContextCancelFunc()
for { for {
messageType, message, err := browserWsConn.Read(ctx) messageType, message, err := browserConnection.Websocket.Read(browserConnection.Context)
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
log.Debugf("Closing Browser ws connection as Context was cancelled!") log.Debugf("Closing Browser ws connection as Context was cancelled!")
@ -71,12 +64,11 @@ func BrowserConnectionReader(
if browserMessageType.Type == "subscribe" { if browserMessageType.Type == "subscribe" {
if bytes.Contains(message, []byte("\"query\":\"mutation")) { if bytes.Contains(message, []byte("\"query\":\"mutation")) {
fromBrowserToGqlActionsChannel.Send(message) browserConnection.FromBrowserToGqlActionsChannel.Send(message)
continue continue
} }
} }
fromBrowserToHasuraChannel.Send(message) browserConnection.FromBrowserToHasuraChannel.Send(message)
fromBrowserToHasuraConnectionEstablishingChannel.Send(message)
} }
} }

View File

@ -2,7 +2,6 @@ package writer
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"github.com/iMDT/bbb-graphql-middleware/internal/common" "github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -11,12 +10,9 @@ import (
) )
func BrowserConnectionWriter( func BrowserConnectionWriter(
browserConnectionId string, browserConnection *common.BrowserConnection,
ctx context.Context,
browserWsConn *websocket.Conn,
fromHasuraToBrowserChannel *common.SafeChannelByte,
wg *sync.WaitGroup) { wg *sync.WaitGroup) {
log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnectionId) log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnection.Id)
defer log.Debugf("finished") defer log.Debugf("finished")
log.Debugf("starting") log.Debugf("starting")
defer wg.Done() defer wg.Done()
@ -24,20 +20,20 @@ func BrowserConnectionWriter(
RangeLoop: RangeLoop:
for { for {
select { select {
case <-ctx.Done(): case <-browserConnection.Context.Done():
log.Debug("Browser context cancelled.") log.Debug("Browser context cancelled.")
break RangeLoop break RangeLoop
case toBrowserMessage := <-fromHasuraToBrowserChannel.ReceiveChannel(): case toBrowserMessage := <-browserConnection.FromHasuraToBrowserChannel.ReceiveChannel():
{ {
if toBrowserMessage == nil { if toBrowserMessage == nil {
if fromHasuraToBrowserChannel.Closed() { if browserConnection.FromHasuraToBrowserChannel.Closed() {
break RangeLoop break RangeLoop
} }
continue continue
} }
log.Tracef("sending to browser: %s", string(toBrowserMessage)) log.Tracef("sending to browser: %s", string(toBrowserMessage))
err := browserWsConn.Write(ctx, websocket.MessageText, toBrowserMessage) err := browserConnection.Websocket.Write(browserConnection.Context, websocket.MessageText, toBrowserMessage)
if err != nil { if err != nil {
log.Debugf("Browser is disconnected, skipping writing of ws message: %v", err) log.Debugf("Browser is disconnected, skipping writing of ws message: %v", err)
return return
@ -52,7 +48,7 @@ RangeLoop:
var hasuraMessage HasuraMessage var hasuraMessage HasuraMessage
_ = json.Unmarshal(toBrowserMessage, &hasuraMessage) _ = json.Unmarshal(toBrowserMessage, &hasuraMessage)
if hasuraMessage.Type == "connection_error" { if hasuraMessage.Type == "connection_error" {
_ = browserWsConn.Close(websocket.StatusInternalError, string(toBrowserMessage)) _ = browserConnection.Websocket.Close(websocket.StatusInternalError, string(toBrowserMessage))
} }
} }
} }