From d1e2df74f040806298d5f6931348f2f7d5e0bd6d Mon Sep 17 00:00:00 2001 From: Gustavo Trott Date: Wed, 13 Mar 2024 10:35:51 -0300 Subject: [PATCH] Improve recconection flow (#19788) --- .../internal/common/SafeChannel.go | 7 ++++ .../internal/common/types.go | 12 +++--- .../internal/hascli/client.go | 10 ++--- .../internal/hascli/conn/reader/reader.go | 40 +++++++++---------- .../internal/hascli/conn/writer/writer.go | 6 +-- .../hascli/retransmiter/retransmiter.go | 8 ++-- .../internal/websrv/connhandler.go | 4 +- .../internal/websrv/reader/reader.go | 7 +++- .../internal/websrv/writer/writer.go | 3 ++ 9 files changed, 56 insertions(+), 41 deletions(-) diff --git a/bbb-graphql-middleware/internal/common/SafeChannel.go b/bbb-graphql-middleware/internal/common/SafeChannel.go index 09b31ab2d2..0a86608119 100644 --- a/bbb-graphql-middleware/internal/common/SafeChannel.go +++ b/bbb-graphql-middleware/internal/common/SafeChannel.go @@ -37,6 +37,13 @@ func (s *SafeChannel) ReceiveChannel() <-chan interface{} { return s.ch } +func (s *SafeChannel) Closed() bool { + s.mux.Lock() + defer s.mux.Unlock() + + return s.closed +} + func (s *SafeChannel) Close() { s.mux.Lock() defer s.mux.Unlock() diff --git a/bbb-graphql-middleware/internal/common/types.go b/bbb-graphql-middleware/internal/common/types.go index a7f850f293..2ebc3a2911 100644 --- a/bbb-graphql-middleware/internal/common/types.go +++ b/bbb-graphql-middleware/internal/common/types.go @@ -43,10 +43,10 @@ type BrowserConnection struct { } type HasuraConnection struct { - Id string // hasura connection id - Browserconn *BrowserConnection // browser connection that originated this hasura connection - Websocket *websocket.Conn // websocket used to connect to hasura - Context context.Context // hasura connection context (child of browser connection context) - ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection) - MsgReceivingActiveChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection + Id string // hasura connection id + BrowserConn *BrowserConnection // browser connection that originated this hasura connection + Websocket *websocket.Conn // websocket used to connect to hasura + Context context.Context // hasura connection context (child of browser connection context) + ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection) + FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection } diff --git a/bbb-graphql-middleware/internal/hascli/client.go b/bbb-graphql-middleware/internal/hascli/client.go index 869dc2072e..aad8a131f4 100644 --- a/bbb-graphql-middleware/internal/hascli/client.go +++ b/bbb-graphql-middleware/internal/hascli/client.go @@ -60,11 +60,11 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C defer hasuraConnectionContextCancel() var thisConnection = common.HasuraConnection{ - Id: hasuraConnectionId, - Browserconn: browserConnection, - Context: hasuraConnectionContext, - ContextCancelFunc: hasuraConnectionContextCancel, - MsgReceivingActiveChan: common.NewSafeChannel(1), + Id: hasuraConnectionId, + BrowserConn: browserConnection, + Context: hasuraConnectionContext, + ContextCancelFunc: hasuraConnectionContextCancel, + FreezeMsgFromBrowserChan: common.NewSafeChannel(1), } browserConnection.HasuraConnection = &thisConnection diff --git a/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go b/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go index 0c3a4e84f6..73d302cf29 100644 --- a/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go +++ b/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go @@ -15,7 +15,7 @@ import ( // HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) { - log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id) + log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id) defer log.Debugf("finished") log.Debugf("starting") @@ -28,9 +28,9 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan err := wsjson.Read(hc.Context, hc.Websocket, &message) if err != nil { if errors.Is(err, context.Canceled) { - log.Debugf("Closing ws connection as Context was cancelled!") + log.Debugf("Closing Hasura ws connection as Context was cancelled!") } else { - log.Errorf("Error reading message from Hasura: %v", err) + log.Debugf("Error reading message from Hasura: %v", err) } return } @@ -50,9 +50,9 @@ func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBr //Check if subscription is still active! if queryId != "" { - hc.Browserconn.ActiveSubscriptionsMutex.RLock() - subscription, ok := hc.Browserconn.ActiveSubscriptions[queryId] - hc.Browserconn.ActiveSubscriptionsMutex.RUnlock() + hc.BrowserConn.ActiveSubscriptionsMutex.RLock() + subscription, ok := hc.BrowserConn.ActiveSubscriptions[queryId] + hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock() if !ok { log.Debugf("Subscription with Id %s doesn't exist anymore, skiping response.", queryId) return @@ -104,13 +104,13 @@ func handleSubscriptionMessage(hc *common.HasuraConnection, messageMap map[strin //Store LastReceivedData Checksum subscription.LastReceivedDataChecksum = dataChecksum - hc.Browserconn.ActiveSubscriptionsMutex.Lock() - hc.Browserconn.ActiveSubscriptions[queryId] = subscription - hc.Browserconn.ActiveSubscriptionsMutex.Unlock() + hc.BrowserConn.ActiveSubscriptionsMutex.Lock() + hc.BrowserConn.ActiveSubscriptions[queryId] = subscription + hc.BrowserConn.ActiveSubscriptionsMutex.Unlock() //Apply msg patch when it supports it if subscription.JsonPatchSupported { - msgpatch.PatchMessage(&messageMap, queryId, dataKey, dataAsJson, hc.Browserconn) + msgpatch.PatchMessage(&messageMap, queryId, dataKey, dataAsJson, hc.BrowserConn) } } } @@ -126,18 +126,18 @@ func handleStreamingMessage(hc *common.HasuraConnection, messageMap map[string]i if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor { subscription.StreamCursorCurrValue = lastCursor - hc.Browserconn.ActiveSubscriptionsMutex.Lock() - hc.Browserconn.ActiveSubscriptions[queryId] = subscription - hc.Browserconn.ActiveSubscriptionsMutex.Unlock() + hc.BrowserConn.ActiveSubscriptionsMutex.Lock() + hc.BrowserConn.ActiveSubscriptions[queryId] = subscription + hc.BrowserConn.ActiveSubscriptionsMutex.Unlock() } } func handleCompleteMessage(hc *common.HasuraConnection, queryId string) { - hc.Browserconn.ActiveSubscriptionsMutex.Lock() - queryType := hc.Browserconn.ActiveSubscriptions[queryId].Type - operationName := hc.Browserconn.ActiveSubscriptions[queryId].OperationName - delete(hc.Browserconn.ActiveSubscriptions, queryId) - hc.Browserconn.ActiveSubscriptionsMutex.Unlock() + hc.BrowserConn.ActiveSubscriptionsMutex.Lock() + queryType := hc.BrowserConn.ActiveSubscriptions[queryId].Type + operationName := hc.BrowserConn.ActiveSubscriptions[queryId].OperationName + delete(hc.BrowserConn.ActiveSubscriptions, queryId) + hc.BrowserConn.ActiveSubscriptionsMutex.Unlock() log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId) } @@ -147,9 +147,9 @@ func handleConnectionAckMessage(hc *common.HasuraConnection, messageMap map[stri fromBrowserToHasuraChannel.UnfreezeChannel() //Avoid to send `connection_ack` to the browser when it's a reconnection - if hc.Browserconn.ConnAckSentToBrowser == false { + if hc.BrowserConn.ConnAckSentToBrowser == false { fromHasuraToBrowserChannel.Send(messageMap) - hc.Browserconn.ConnAckSentToBrowser = true + hc.BrowserConn.ConnAckSentToBrowser = true } go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel) diff --git a/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go b/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go index bfb5a1b81d..a6f5cd4145 100644 --- a/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go +++ b/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go @@ -14,7 +14,7 @@ import ( func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) { log := log.WithField("_routine", "HasuraConnectionWriter") - browserConnection := hc.Browserconn + browserConnection := hc.BrowserConn log = log.WithField("browserConnectionId", browserConnection.Id).WithField("hasuraConnectionId", hc.Id) @@ -38,9 +38,9 @@ RangeLoop: select { case <-hc.Context.Done(): break RangeLoop - case <-hc.MsgReceivingActiveChan.ReceiveChannel(): + case <-hc.FreezeMsgFromBrowserChan.ReceiveChannel(): if !fromBrowserToHasuraChannel.Frozen() { - log.Debugf("freezing channel fromBrowserToHasuraChannel") + log.Debug("freezing channel fromBrowserToHasuraChannel") //Freeze channel once it's about to close Hasura connection fromBrowserToHasuraChannel.FreezeChannel() } diff --git a/bbb-graphql-middleware/internal/hascli/retransmiter/retransmiter.go b/bbb-graphql-middleware/internal/hascli/retransmiter/retransmiter.go index 06c86bbc4c..221ea5ad47 100644 --- a/bbb-graphql-middleware/internal/hascli/retransmiter/retransmiter.go +++ b/bbb-graphql-middleware/internal/hascli/retransmiter/retransmiter.go @@ -6,10 +6,10 @@ import ( ) func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel) { - log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id) + log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id) - hc.Browserconn.ActiveSubscriptionsMutex.RLock() - for _, subscription := range hc.Browserconn.ActiveSubscriptions { + hc.BrowserConn.ActiveSubscriptionsMutex.RLock() + for _, subscription := range hc.BrowserConn.ActiveSubscriptions { //Not retransmitting Mutations if subscription.Type == common.Mutation { @@ -27,5 +27,5 @@ func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowse } } } - hc.Browserconn.ActiveSubscriptionsMutex.RUnlock() + hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock() } diff --git a/bbb-graphql-middleware/internal/websrv/connhandler.go b/bbb-graphql-middleware/internal/websrv/connhandler.go index 19db02a4ba..33e925aece 100644 --- a/bbb-graphql-middleware/internal/websrv/connhandler.go +++ b/bbb-graphql-middleware/internal/websrv/connhandler.go @@ -138,8 +138,8 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) { for _, browserConnection := range BrowserConnections { if browserConnection.SessionToken == sessionTokenToInvalidate { if browserConnection.HasuraConnection != nil { - //Close chan to force stop receiving new messages from the browser - browserConnection.HasuraConnection.MsgReceivingActiveChan.Close() + //Send message to force stop receiving new messages from the browser + browserConnection.HasuraConnection.FreezeMsgFromBrowserChan.Send(true) // Wait until there are no active mutations for iterationCount := 0; iterationCount < 20; iterationCount++ { diff --git a/bbb-graphql-middleware/internal/websrv/reader/reader.go b/bbb-graphql-middleware/internal/websrv/reader/reader.go index 0d3fb7cecd..20ddbf8f07 100644 --- a/bbb-graphql-middleware/internal/websrv/reader/reader.go +++ b/bbb-graphql-middleware/internal/websrv/reader/reader.go @@ -2,6 +2,7 @@ package reader import ( "context" + "errors" "github.com/iMDT/bbb-graphql-middleware/internal/common" log "github.com/sirupsen/logrus" "nhooyr.io/websocket" @@ -35,7 +36,11 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, ct var v interface{} err := wsjson.Read(ctx, browserWsConn, &v) if err != nil { - log.Debugf("Browser is disconnected, skiping reading of ws message: %v", err) + if errors.Is(err, context.Canceled) { + log.Debugf("Closing Browser ws connection as Context was cancelled!") + } else { + log.Debugf("Hasura is disconnected, skiping reading of ws message: %v", err) + } return } diff --git a/bbb-graphql-middleware/internal/websrv/writer/writer.go b/bbb-graphql-middleware/internal/websrv/writer/writer.go index c197eae582..b168f95492 100644 --- a/bbb-graphql-middleware/internal/websrv/writer/writer.go +++ b/bbb-graphql-middleware/internal/websrv/writer/writer.go @@ -24,6 +24,9 @@ RangeLoop: case toBrowserMessage := <-fromHasuraToBrowserChannel.ReceiveChannel(): { if toBrowserMessage == nil { + if fromHasuraToBrowserChannel.Closed() { + break RangeLoop + } continue }