Fix Graphql error
This commit is contained in:
parent
02f86c8e0a
commit
47fdc4a878
@ -5,9 +5,10 @@ import (
|
||||
)
|
||||
|
||||
type SafeChannel struct {
|
||||
ch chan interface{}
|
||||
closed bool
|
||||
mux sync.Mutex
|
||||
ch chan interface{}
|
||||
closed bool
|
||||
mux sync.Mutex
|
||||
freezeFlag bool
|
||||
}
|
||||
|
||||
func NewSafeChannel(size int) *SafeChannel {
|
||||
@ -45,3 +46,17 @@ func (s *SafeChannel) Close() {
|
||||
s.closed = true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SafeChannel) FreezeChannel() {
|
||||
if !s.freezeFlag {
|
||||
s.mux.Lock()
|
||||
s.freezeFlag = true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SafeChannel) UnfreezeChannel() {
|
||||
if s.freezeFlag {
|
||||
s.mux.Unlock()
|
||||
s.freezeFlag = false
|
||||
}
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ type BrowserConnection struct {
|
||||
ConnectionInitMessage map[string]interface{} // init message received in this connection (to be used on hasura reconnect)
|
||||
HasuraConnection *HasuraConnection // associated hasura connection
|
||||
Disconnected bool // indicate if the connection is gone
|
||||
ConnAckSentToBrowser bool // indicate if `connection_ack` msg was already sent to the browser
|
||||
}
|
||||
|
||||
type HasuraConnection struct {
|
||||
|
@ -67,7 +67,13 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
|
||||
}
|
||||
|
||||
browserConnection.HasuraConnection = &thisConnection
|
||||
defer func() { browserConnection.HasuraConnection = nil }()
|
||||
defer func() {
|
||||
browserConnection.HasuraConnection = nil
|
||||
|
||||
//It's necessary to freeze the channel to avoid client trying to start subscriptions before Hasura connection is initialised
|
||||
//It will unfreeze after `connection_ack` is sent by Hasura
|
||||
fromBrowserToHasuraChannel.FreezeChannel()
|
||||
}()
|
||||
|
||||
// Make the connection
|
||||
c, _, err := websocket.Dial(hasuraConnectionContext, hasuraEndpoint, &dialOptions)
|
||||
@ -90,16 +96,11 @@ func HasuraClient(browserConnection *common.BrowserConnection, cookies []*http.C
|
||||
// Start routines
|
||||
|
||||
// reads from browser, writes to hasura
|
||||
go writer.HasuraConnectionWriter(&thisConnection, fromBrowserToHasuraChannel, &wg)
|
||||
go writer.HasuraConnectionWriter(&thisConnection, fromBrowserToHasuraChannel, &wg, browserConnection.ConnectionInitMessage)
|
||||
|
||||
// reads from hasura, writes to browser
|
||||
go reader.HasuraConnectionReader(&thisConnection, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, &wg)
|
||||
|
||||
// if it's a reconnect, inject authentication
|
||||
if !browserConnection.Disconnected && browserConnection.ConnectionInitMessage != nil {
|
||||
fromBrowserToHasuraChannel.Send(browserConnection.ConnectionInitMessage)
|
||||
}
|
||||
|
||||
// Wait
|
||||
wg.Wait()
|
||||
|
||||
|
@ -80,13 +80,22 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan
|
||||
}
|
||||
}
|
||||
|
||||
// Write the message to browser
|
||||
fromHasuraToBrowserChannel.Send(messageAsMap)
|
||||
|
||||
// Retransmit the subscription start commands when hasura confirms the connection
|
||||
// this is useful in case of a connection invalidation
|
||||
if messageType == "connection_ack" {
|
||||
//Hasura connection was initialized, now it's able to send new messages to Hasura
|
||||
fromBrowserToHasuraChannel.UnfreezeChannel()
|
||||
|
||||
//Avoid to send `connection_ack` to the browser when it's a reconnection
|
||||
if hc.Browserconn.ConnAckSentToBrowser == false {
|
||||
fromHasuraToBrowserChannel.Send(messageAsMap)
|
||||
hc.Browserconn.ConnAckSentToBrowser = true
|
||||
}
|
||||
|
||||
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
|
||||
} else {
|
||||
// Forward the message to browser
|
||||
fromHasuraToBrowserChannel.Send(messageAsMap)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ import (
|
||||
|
||||
// HasuraConnectionWriter
|
||||
// process messages (middleware to hasura)
|
||||
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup) {
|
||||
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) {
|
||||
log := log.WithField("_routine", "HasuraConnectionWriter")
|
||||
|
||||
browserConnection := hc.Browserconn
|
||||
@ -23,6 +23,17 @@ func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChan
|
||||
defer hc.ContextCancelFunc()
|
||||
defer log.Debugf("finished")
|
||||
|
||||
//Send authentication (init) message at first
|
||||
//It will not use the channel (fromBrowserToHasuraChannel) because this msg must bypass ChannelFreeze
|
||||
if initMessage != nil {
|
||||
log.Infof("it's a reconnection, injecting authentication (init) message")
|
||||
err := wsjson.Write(hc.Context, hc.Websocket, initMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
RangeLoop:
|
||||
for {
|
||||
select {
|
||||
|
@ -54,9 +54,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
defer c.Close(websocket.StatusInternalError, "the sky is falling")
|
||||
|
||||
var thisConnection = common.BrowserConnection{
|
||||
Id: browserConnectionId,
|
||||
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
|
||||
Context: browserConnectionContext,
|
||||
Id: browserConnectionId,
|
||||
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
|
||||
Context: browserConnectionContext,
|
||||
ConnAckSentToBrowser: false,
|
||||
}
|
||||
|
||||
BrowserConnectionsMutex.Lock()
|
||||
@ -97,8 +98,8 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
thisBrowserConnection := BrowserConnections[browserConnectionId]
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
log.Debugf("created hasura client")
|
||||
if thisBrowserConnection != nil {
|
||||
log.Debugf("created hasura client")
|
||||
hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserToHasuraChannel, fromHasuraToBrowserChannel)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
@ -10,14 +10,14 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserToHasuraChannel1 *common.SafeChannel, fromBrowserToHasuraChannel2 *common.SafeChannel, waitGroups []*sync.WaitGroup) {
|
||||
func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserToHasuraChannel *common.SafeChannel, fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel, waitGroups []*sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
|
||||
defer func() {
|
||||
fromBrowserToHasuraChannel1.Close()
|
||||
fromBrowserToHasuraChannel2.Close()
|
||||
fromBrowserToHasuraChannel.Close()
|
||||
fromBrowserToHasuraConnectionEstablishingChannel.Close()
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
@ -41,8 +41,9 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c
|
||||
}
|
||||
|
||||
log.Tracef("received from browser: %v", v)
|
||||
//fmt.Println("received from browser: %v", v)
|
||||
|
||||
fromBrowserToHasuraChannel1.Send(v)
|
||||
fromBrowserToHasuraChannel2.Send(v)
|
||||
fromBrowserToHasuraChannel.Send(v)
|
||||
fromBrowserToHasuraConnectionEstablishingChannel.Send(v)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user