Improve recconection flow (#19788)

This commit is contained in:
Gustavo Trott 2024-03-13 10:35:51 -03:00 committed by GitHub
parent dfc2a570ba
commit d1e2df74f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 56 additions and 41 deletions

View File

@ -37,6 +37,13 @@ func (s *SafeChannel) ReceiveChannel() <-chan interface{} {
return s.ch return s.ch
} }
func (s *SafeChannel) Closed() bool {
s.mux.Lock()
defer s.mux.Unlock()
return s.closed
}
func (s *SafeChannel) Close() { func (s *SafeChannel) Close() {
s.mux.Lock() s.mux.Lock()
defer s.mux.Unlock() defer s.mux.Unlock()

View File

@ -43,10 +43,10 @@ type BrowserConnection struct {
} }
type HasuraConnection struct { type HasuraConnection struct {
Id string // hasura connection id Id string // hasura connection id
Browserconn *BrowserConnection // browser connection that originated this hasura connection BrowserConn *BrowserConnection // browser connection that originated this hasura connection
Websocket *websocket.Conn // websocket used to connect to hasura Websocket *websocket.Conn // websocket used to connect to hasura
Context context.Context // hasura connection context (child of browser connection context) Context context.Context // hasura connection context (child of browser connection context)
ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection) ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection)
MsgReceivingActiveChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection
} }

View File

@ -60,11 +60,11 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
defer hasuraConnectionContextCancel() defer hasuraConnectionContextCancel()
var thisConnection = common.HasuraConnection{ var thisConnection = common.HasuraConnection{
Id: hasuraConnectionId, Id: hasuraConnectionId,
Browserconn: browserConnection, BrowserConn: browserConnection,
Context: hasuraConnectionContext, Context: hasuraConnectionContext,
ContextCancelFunc: hasuraConnectionContextCancel, ContextCancelFunc: hasuraConnectionContextCancel,
MsgReceivingActiveChan: common.NewSafeChannel(1), FreezeMsgFromBrowserChan: common.NewSafeChannel(1),
} }
browserConnection.HasuraConnection = &thisConnection browserConnection.HasuraConnection = &thisConnection

View File

@ -15,7 +15,7 @@ import (
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel // HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) { func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) {
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id) log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
defer log.Debugf("finished") defer log.Debugf("finished")
log.Debugf("starting") log.Debugf("starting")
@ -28,9 +28,9 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan
err := wsjson.Read(hc.Context, hc.Websocket, &message) err := wsjson.Read(hc.Context, hc.Websocket, &message)
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
log.Debugf("Closing ws connection as Context was cancelled!") log.Debugf("Closing Hasura ws connection as Context was cancelled!")
} else { } else {
log.Errorf("Error reading message from Hasura: %v", err) log.Debugf("Error reading message from Hasura: %v", err)
} }
return return
} }
@ -50,9 +50,9 @@ func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBr
//Check if subscription is still active! //Check if subscription is still active!
if queryId != "" { if queryId != "" {
hc.Browserconn.ActiveSubscriptionsMutex.RLock() hc.BrowserConn.ActiveSubscriptionsMutex.RLock()
subscription, ok := hc.Browserconn.ActiveSubscriptions[queryId] subscription, ok := hc.BrowserConn.ActiveSubscriptions[queryId]
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock() hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock()
if !ok { if !ok {
log.Debugf("Subscription with Id %s doesn't exist anymore, skiping response.", queryId) log.Debugf("Subscription with Id %s doesn't exist anymore, skiping response.", queryId)
return return
@ -104,13 +104,13 @@ func handleSubscriptionMessage(hc *common.HasuraConnection, messageMap map[strin
//Store LastReceivedData Checksum //Store LastReceivedData Checksum
subscription.LastReceivedDataChecksum = dataChecksum subscription.LastReceivedDataChecksum = dataChecksum
hc.Browserconn.ActiveSubscriptionsMutex.Lock() hc.BrowserConn.ActiveSubscriptionsMutex.Lock()
hc.Browserconn.ActiveSubscriptions[queryId] = subscription hc.BrowserConn.ActiveSubscriptions[queryId] = subscription
hc.Browserconn.ActiveSubscriptionsMutex.Unlock() hc.BrowserConn.ActiveSubscriptionsMutex.Unlock()
//Apply msg patch when it supports it //Apply msg patch when it supports it
if subscription.JsonPatchSupported { if subscription.JsonPatchSupported {
msgpatch.PatchMessage(&messageMap, queryId, dataKey, dataAsJson, hc.Browserconn) msgpatch.PatchMessage(&messageMap, queryId, dataKey, dataAsJson, hc.BrowserConn)
} }
} }
} }
@ -126,18 +126,18 @@ func handleStreamingMessage(hc *common.HasuraConnection, messageMap map[string]i
if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor { if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor {
subscription.StreamCursorCurrValue = lastCursor subscription.StreamCursorCurrValue = lastCursor
hc.Browserconn.ActiveSubscriptionsMutex.Lock() hc.BrowserConn.ActiveSubscriptionsMutex.Lock()
hc.Browserconn.ActiveSubscriptions[queryId] = subscription hc.BrowserConn.ActiveSubscriptions[queryId] = subscription
hc.Browserconn.ActiveSubscriptionsMutex.Unlock() hc.BrowserConn.ActiveSubscriptionsMutex.Unlock()
} }
} }
func handleCompleteMessage(hc *common.HasuraConnection, queryId string) { func handleCompleteMessage(hc *common.HasuraConnection, queryId string) {
hc.Browserconn.ActiveSubscriptionsMutex.Lock() hc.BrowserConn.ActiveSubscriptionsMutex.Lock()
queryType := hc.Browserconn.ActiveSubscriptions[queryId].Type queryType := hc.BrowserConn.ActiveSubscriptions[queryId].Type
operationName := hc.Browserconn.ActiveSubscriptions[queryId].OperationName operationName := hc.BrowserConn.ActiveSubscriptions[queryId].OperationName
delete(hc.Browserconn.ActiveSubscriptions, queryId) delete(hc.BrowserConn.ActiveSubscriptions, queryId)
hc.Browserconn.ActiveSubscriptionsMutex.Unlock() hc.BrowserConn.ActiveSubscriptionsMutex.Unlock()
log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId) log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId)
} }
@ -147,9 +147,9 @@ func handleConnectionAckMessage(hc *common.HasuraConnection, messageMap map[stri
fromBrowserToHasuraChannel.UnfreezeChannel() fromBrowserToHasuraChannel.UnfreezeChannel()
//Avoid to send `connection_ack` to the browser when it's a reconnection //Avoid to send `connection_ack` to the browser when it's a reconnection
if hc.Browserconn.ConnAckSentToBrowser == false { if hc.BrowserConn.ConnAckSentToBrowser == false {
fromHasuraToBrowserChannel.Send(messageMap) fromHasuraToBrowserChannel.Send(messageMap)
hc.Browserconn.ConnAckSentToBrowser = true hc.BrowserConn.ConnAckSentToBrowser = true
} }
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel) go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel)

View File

@ -14,7 +14,7 @@ import (
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) { func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) {
log := log.WithField("_routine", "HasuraConnectionWriter") log := log.WithField("_routine", "HasuraConnectionWriter")
browserConnection := hc.Browserconn browserConnection := hc.BrowserConn
log = log.WithField("browserConnectionId", browserConnection.Id).WithField("hasuraConnectionId", hc.Id) log = log.WithField("browserConnectionId", browserConnection.Id).WithField("hasuraConnectionId", hc.Id)
@ -38,9 +38,9 @@ RangeLoop:
select { select {
case <-hc.Context.Done(): case <-hc.Context.Done():
break RangeLoop break RangeLoop
case <-hc.MsgReceivingActiveChan.ReceiveChannel(): case <-hc.FreezeMsgFromBrowserChan.ReceiveChannel():
if !fromBrowserToHasuraChannel.Frozen() { if !fromBrowserToHasuraChannel.Frozen() {
log.Debugf("freezing channel fromBrowserToHasuraChannel") log.Debug("freezing channel fromBrowserToHasuraChannel")
//Freeze channel once it's about to close Hasura connection //Freeze channel once it's about to close Hasura connection
fromBrowserToHasuraChannel.FreezeChannel() fromBrowserToHasuraChannel.FreezeChannel()
} }

View File

@ -6,10 +6,10 @@ import (
) )
func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel) { func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel) {
log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id) log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
hc.Browserconn.ActiveSubscriptionsMutex.RLock() hc.BrowserConn.ActiveSubscriptionsMutex.RLock()
for _, subscription := range hc.Browserconn.ActiveSubscriptions { for _, subscription := range hc.BrowserConn.ActiveSubscriptions {
//Not retransmitting Mutations //Not retransmitting Mutations
if subscription.Type == common.Mutation { if subscription.Type == common.Mutation {
@ -27,5 +27,5 @@ func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowse
} }
} }
} }
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock() hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock()
} }

View File

@ -138,8 +138,8 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
for _, browserConnection := range BrowserConnections { for _, browserConnection := range BrowserConnections {
if browserConnection.SessionToken == sessionTokenToInvalidate { if browserConnection.SessionToken == sessionTokenToInvalidate {
if browserConnection.HasuraConnection != nil { if browserConnection.HasuraConnection != nil {
//Close chan to force stop receiving new messages from the browser //Send message to force stop receiving new messages from the browser
browserConnection.HasuraConnection.MsgReceivingActiveChan.Close() browserConnection.HasuraConnection.FreezeMsgFromBrowserChan.Send(true)
// Wait until there are no active mutations // Wait until there are no active mutations
for iterationCount := 0; iterationCount < 20; iterationCount++ { for iterationCount := 0; iterationCount < 20; iterationCount++ {

View File

@ -2,6 +2,7 @@ package reader
import ( import (
"context" "context"
"errors"
"github.com/iMDT/bbb-graphql-middleware/internal/common" "github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"nhooyr.io/websocket" "nhooyr.io/websocket"
@ -35,7 +36,11 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, ct
var v interface{} var v interface{}
err := wsjson.Read(ctx, browserWsConn, &v) err := wsjson.Read(ctx, browserWsConn, &v)
if err != nil { if err != nil {
log.Debugf("Browser is disconnected, skiping reading of ws message: %v", err) if errors.Is(err, context.Canceled) {
log.Debugf("Closing Browser ws connection as Context was cancelled!")
} else {
log.Debugf("Hasura is disconnected, skiping reading of ws message: %v", err)
}
return return
} }

View File

@ -24,6 +24,9 @@ RangeLoop:
case toBrowserMessage := <-fromHasuraToBrowserChannel.ReceiveChannel(): case toBrowserMessage := <-fromHasuraToBrowserChannel.ReceiveChannel():
{ {
if toBrowserMessage == nil { if toBrowserMessage == nil {
if fromHasuraToBrowserChannel.Closed() {
break RangeLoop
}
continue continue
} }