2023-04-27 09:03:40 +08:00
|
|
|
package reader
|
|
|
|
|
|
|
|
import (
|
|
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
|
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/hascli/replayer"
|
2023-05-10 02:37:58 +08:00
|
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
|
2023-04-27 09:03:40 +08:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"nhooyr.io/websocket/wsjson"
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
|
|
|
|
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel chan interface{}, fromBrowserToHasuraChannel chan interface{}, wg *sync.WaitGroup) {
|
|
|
|
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id)
|
2023-09-30 07:05:23 +08:00
|
|
|
defer log.Debugf("finished")
|
|
|
|
log.Debugf("starting")
|
2023-04-27 09:03:40 +08:00
|
|
|
|
|
|
|
defer wg.Done()
|
|
|
|
defer hc.ContextCancelFunc()
|
|
|
|
|
|
|
|
for {
|
|
|
|
// Read a message from hasura
|
|
|
|
var message interface{}
|
|
|
|
err := wsjson.Read(hc.Context, hc.Websocket, &message)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("Error: %v", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Tracef("received from hasura: %v", message)
|
|
|
|
|
|
|
|
var messageAsMap = message.(map[string]interface{})
|
2023-07-06 20:34:48 +08:00
|
|
|
|
2023-05-10 02:37:58 +08:00
|
|
|
if messageAsMap != nil {
|
|
|
|
var messageType = messageAsMap["type"]
|
|
|
|
var queryId, _ = messageAsMap["id"].(string)
|
2023-04-27 09:03:40 +08:00
|
|
|
|
2023-05-10 02:37:58 +08:00
|
|
|
//Check if subscription is still active!
|
|
|
|
if queryId != "" {
|
2023-07-06 20:34:48 +08:00
|
|
|
hc.Browserconn.ActiveSubscriptionsMutex.RLock()
|
2023-05-10 02:37:58 +08:00
|
|
|
subscription, ok := hc.Browserconn.ActiveSubscriptions[queryId]
|
2023-07-06 20:34:48 +08:00
|
|
|
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock()
|
2023-05-10 02:37:58 +08:00
|
|
|
if !ok {
|
|
|
|
log.Debugf("Subscription with Id %s doesn't exist anymore, skiping response.", queryId)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
//When Hasura send msg type "complete", this query is finished
|
|
|
|
if messageType == "complete" {
|
|
|
|
hc.Browserconn.ActiveSubscriptionsMutex.Lock()
|
|
|
|
delete(hc.Browserconn.ActiveSubscriptions, queryId)
|
|
|
|
hc.Browserconn.ActiveSubscriptionsMutex.Unlock()
|
2023-09-07 22:54:27 +08:00
|
|
|
log.Debugf("Subscription with Id %s finished by Hasura.", queryId)
|
2023-05-10 02:37:58 +08:00
|
|
|
}
|
2023-04-27 09:03:40 +08:00
|
|
|
|
2023-05-10 02:37:58 +08:00
|
|
|
//Apply msg patch when it supports it
|
2023-05-25 06:31:31 +08:00
|
|
|
if subscription.JsonPatchSupported &&
|
2023-05-10 02:37:58 +08:00
|
|
|
messageType == "data" &&
|
|
|
|
subscription.Type == common.Subscription {
|
|
|
|
msgpatch.PatchMessage(&messageAsMap, hc.Browserconn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write the message to browser
|
|
|
|
fromHasuraToBrowserChannel <- messageAsMap
|
|
|
|
|
|
|
|
// Replay the subscription start commands when hasura confirms the connection
|
|
|
|
// this is useful in case of a connection invalidation
|
|
|
|
if messageType == "connection_ack" {
|
|
|
|
go replayer.ReplaySubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
|
|
|
|
}
|
|
|
|
}
|
2023-04-27 09:03:40 +08:00
|
|
|
}
|
|
|
|
}
|