From 47fdc4a87889f138112e4465f4efad5f3bc30627 Mon Sep 17 00:00:00 2001 From: Gustavo Trott Date: Tue, 23 Jan 2024 19:32:11 -0300 Subject: [PATCH] Fix Graphql error --- .../internal/common/SafeChannel.go | 21 ++++++++++++++++--- .../internal/common/types.go | 1 + .../internal/hascli/client.go | 15 ++++++------- .../internal/hascli/conn/reader/reader.go | 15 ++++++++++--- .../internal/hascli/conn/writer/writer.go | 13 +++++++++++- .../internal/websrv/connhandler.go | 9 ++++---- .../internal/websrv/reader/reader.go | 11 +++++----- 7 files changed, 62 insertions(+), 23 deletions(-) diff --git a/bbb-graphql-middleware/internal/common/SafeChannel.go b/bbb-graphql-middleware/internal/common/SafeChannel.go index 527576443e..fdc11c0784 100644 --- a/bbb-graphql-middleware/internal/common/SafeChannel.go +++ b/bbb-graphql-middleware/internal/common/SafeChannel.go @@ -5,9 +5,10 @@ import ( ) type SafeChannel struct { - ch chan interface{} - closed bool - mux sync.Mutex + ch chan interface{} + closed bool + mux sync.Mutex + freezeFlag bool } func NewSafeChannel(size int) *SafeChannel { @@ -45,3 +46,17 @@ func (s *SafeChannel) Close() { s.closed = true } } + +func (s *SafeChannel) FreezeChannel() { + if !s.freezeFlag { + s.mux.Lock() + s.freezeFlag = true + } +} + +func (s *SafeChannel) UnfreezeChannel() { + if s.freezeFlag { + s.mux.Unlock() + s.freezeFlag = false + } +} diff --git a/bbb-graphql-middleware/internal/common/types.go b/bbb-graphql-middleware/internal/common/types.go index 15bc7e5db5..f3ea4e1c83 100644 --- a/bbb-graphql-middleware/internal/common/types.go +++ b/bbb-graphql-middleware/internal/common/types.go @@ -38,6 +38,7 @@ type BrowserConnection struct { ConnectionInitMessage map[string]interface{} // init message received in this connection (to be used on hasura reconnect) HasuraConnection *HasuraConnection // associated hasura connection Disconnected bool // indicate if the connection is gone + ConnAckSentToBrowser bool // indicate if `connection_ack` msg was already sent to the browser } type HasuraConnection struct { diff --git a/bbb-graphql-middleware/internal/hascli/client.go b/bbb-graphql-middleware/internal/hascli/client.go index 224a066615..5bf445fb1f 100644 --- a/bbb-graphql-middleware/internal/hascli/client.go +++ b/bbb-graphql-middleware/internal/hascli/client.go @@ -67,7 +67,13 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C } browserConnection.HasuraConnection = &thisConnection - defer func() { browserConnection.HasuraConnection = nil }() + defer func() { + browserConnection.HasuraConnection = nil + + //It's necessary to freeze the channel to avoid client trying to start subscriptions before Hasura connection is initialised + //It will unfreeze after `connection_ack` is sent by Hasura + fromBrowserToHasuraChannel.FreezeChannel() + }() // Make the connection c, _, err := websocket.Dial(hasuraConnectionContext, hasuraEndpoint, &dialOptions) @@ -90,16 +96,11 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C // Start routines // reads from browser, writes to hasura - go writer.HasuraConnectionWriter(&thisConnection, fromBrowserToHasuraChannel, &wg) + go writer.HasuraConnectionWriter(&thisConnection, fromBrowserToHasuraChannel, &wg, browserConnection.ConnectionInitMessage) // reads from hasura, writes to browser go reader.HasuraConnectionReader(&thisConnection, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, &wg) - // if it's a reconnect, inject authentication - if !browserConnection.Disconnected && browserConnection.ConnectionInitMessage != nil { - fromBrowserToHasuraChannel.Send(browserConnection.ConnectionInitMessage) - } - // Wait wg.Wait() diff --git a/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go b/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go index c66a299c35..07b01089f7 100644 --- a/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go +++ b/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go @@ -80,13 +80,22 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan } } - // Write the message to browser - fromHasuraToBrowserChannel.Send(messageAsMap) - // Retransmit the subscription start commands when hasura confirms the connection // this is useful in case of a connection invalidation if messageType == "connection_ack" { + //Hasura connection was initialized, now it's able to send new messages to Hasura + fromBrowserToHasuraChannel.UnfreezeChannel() + + //Avoid to send `connection_ack` to the browser when it's a reconnection + if hc.Browserconn.ConnAckSentToBrowser == false { + fromHasuraToBrowserChannel.Send(messageAsMap) + hc.Browserconn.ConnAckSentToBrowser = true + } + go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel) + } else { + // Forward the message to browser + fromHasuraToBrowserChannel.Send(messageAsMap) } } } diff --git a/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go b/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go index 334bc3c01a..9d7ea23837 100644 --- a/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go +++ b/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go @@ -12,7 +12,7 @@ import ( // HasuraConnectionWriter // process messages (middleware to hasura) -func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) { +func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) { log := log.WithField("_routine", "HasuraConnectionWriter") browserConnection := hc.Browserconn @@ -23,6 +23,17 @@ func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChan defer hc.ContextCancelFunc() defer log.Debugf("finished") + //Send authentication (init) message at first + //It will not use the channel (fromBrowserToHasuraChannel) because this msg must bypass ChannelFreeze + if initMessage != nil { + log.Infof("it's a reconnection, injecting authentication (init) message") + err := wsjson.Write(hc.Context, hc.Websocket, initMessage) + if err != nil { + log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err) + return + } + } + RangeLoop: for { select { diff --git a/bbb-graphql-middleware/internal/websrv/connhandler.go b/bbb-graphql-middleware/internal/websrv/connhandler.go index 1e25a30cbc..038c755b6a 100644 --- a/bbb-graphql-middleware/internal/websrv/connhandler.go +++ b/bbb-graphql-middleware/internal/websrv/connhandler.go @@ -54,9 +54,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) { defer c.Close(websocket.StatusInternalError, "the sky is falling") var thisConnection = common.BrowserConnection{ - Id: browserConnectionId, - ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1), - Context: browserConnectionContext, + Id: browserConnectionId, + ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1), + Context: browserConnectionContext, + ConnAckSentToBrowser: false, } BrowserConnectionsMutex.Lock() @@ -97,8 +98,8 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) { BrowserConnectionsMutex.RLock() thisBrowserConnection := BrowserConnections[browserConnectionId] BrowserConnectionsMutex.RUnlock() - log.Debugf("created hasura client") if thisBrowserConnection != nil { + log.Debugf("created hasura client") hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserToHasuraChannel, fromHasuraToBrowserChannel) } time.Sleep(100 * time.Millisecond) diff --git a/bbb-graphql-middleware/internal/websrv/reader/reader.go b/bbb-graphql-middleware/internal/websrv/reader/reader.go index 9ba7a8aca4..2915ff5381 100644 --- a/bbb-graphql-middleware/internal/websrv/reader/reader.go +++ b/bbb-graphql-middleware/internal/websrv/reader/reader.go @@ -10,14 +10,14 @@ import ( "time" ) -func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserToHasuraChannel1 *common.SafeChannel, fromBrowserToHasuraChannel2 *common.SafeChannel, waitGroups []*sync.WaitGroup) { +func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserToHasuraChannel *common.SafeChannel, fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel, waitGroups []*sync.WaitGroup) { log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId) defer log.Debugf("finished") log.Debugf("starting") defer func() { - fromBrowserToHasuraChannel1.Close() - fromBrowserToHasuraChannel2.Close() + fromBrowserToHasuraChannel.Close() + fromBrowserToHasuraConnectionEstablishingChannel.Close() }() defer func() { @@ -41,8 +41,9 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c } log.Tracef("received from browser: %v", v) + //fmt.Println("received from browser: %v", v) - fromBrowserToHasuraChannel1.Send(v) - fromBrowserToHasuraChannel2.Send(v) + fromBrowserToHasuraChannel.Send(v) + fromBrowserToHasuraConnectionEstablishingChannel.Send(v) } }