bigbluebutton-Github/bbb-graphql-middleware/internal/hascli/conn/writer/writer.go

143 lines
5.2 KiB
Go
Raw Normal View History

2023-04-27 09:03:40 +08:00
package writer
import (
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
"strings"
2023-04-27 09:03:40 +08:00
"sync"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus"
"nhooyr.io/websocket/wsjson"
)
// HasuraConnectionWriter
// process messages (middleware to hasura)
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) {
2023-04-27 09:03:40 +08:00
log := log.WithField("_routine", "HasuraConnectionWriter")
browserConnection := hc.Browserconn
log = log.WithField("browserConnectionId", browserConnection.Id).WithField("hasuraConnectionId", hc.Id)
defer wg.Done()
defer hc.ContextCancelFunc()
defer log.Debugf("finished")
2023-04-27 09:03:40 +08:00
//Send authentication (init) message at first
//It will not use the channel (fromBrowserToHasuraChannel) because this msg must bypass ChannelFreeze
if initMessage != nil {
log.Infof("it's a reconnection, injecting authentication (init) message")
err := wsjson.Write(hc.Context, hc.Websocket, initMessage)
if err != nil {
log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
return
}
}
2023-04-27 09:03:40 +08:00
RangeLoop:
for {
select {
case <-hc.Context.Done():
break RangeLoop
case fromBrowserMessage := <-fromBrowserToHasuraChannel.ReceiveChannel():
2023-04-27 09:03:40 +08:00
{
if fromBrowserMessage == nil {
continue
}
var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
if fromBrowserMessageAsMap["type"] == "start" {
var queryId = fromBrowserMessageAsMap["id"].(string)
//Identify type based on query string
messageType := common.Query
streamCursorField := ""
streamCursorVariableName := ""
var streamCursorInitialValue interface{}
payload := fromBrowserMessageAsMap["payload"].(map[string]interface{})
query, ok := payload["query"].(string)
if ok {
if strings.HasPrefix(query, "subscription") {
messageType = common.Subscription
if strings.Contains(query, "_stream(") && strings.Contains(query, "cursor: {") {
messageType = common.Streaming
browserConnection.ActiveSubscriptionsMutex.RLock()
_, queryIdExists := browserConnection.ActiveSubscriptions[queryId]
browserConnection.ActiveSubscriptionsMutex.RUnlock()
if !queryIdExists {
streamCursorField, streamCursorVariableName, streamCursorInitialValue = common.GetStreamCursorPropsFromQuery(payload, query)
//It's necessary to assure the cursor field will return in the result of the query
//To be able to store the last received cursor value
payload["query"] = common.PatchQueryIncludingCursorField(query, streamCursorField)
fromBrowserMessageAsMap["payload"] = payload
}
}
if strings.Contains(query, "_aggregate") && strings.Contains(query, "aggregate {") {
messageType = common.SubscriptionAggregate
}
}
if strings.HasPrefix(query, "mutation") {
messageType = common.Mutation
}
}
//Identify if the client that requested this subscription expects to receive json-patch
//Client append `Patched_` to the query operationName to indicate that it supports
jsonPatchSupported := false
operationName, ok := payload["operationName"].(string)
if ok && strings.HasPrefix(operationName, "Patched_") {
jsonPatchSupported = true
}
browserConnection.ActiveSubscriptionsMutex.Lock()
browserConnection.ActiveSubscriptions[queryId] = common.GraphQlSubscription{
Id: queryId,
Message: fromBrowserMessageAsMap,
OperationName: operationName,
StreamCursorField: streamCursorField,
StreamCursorVariableName: streamCursorVariableName,
StreamCursorCurrValue: streamCursorInitialValue,
LastSeenOnHasuraConnection: hc.Id,
JsonPatchSupported: jsonPatchSupported,
Type: messageType,
2023-04-27 09:03:40 +08:00
}
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
browserConnection.ActiveSubscriptionsMutex.Unlock()
2023-04-27 09:03:40 +08:00
}
if fromBrowserMessageAsMap["type"] == "stop" {
var queryId = fromBrowserMessageAsMap["id"].(string)
browserConnection.ActiveSubscriptionsMutex.RLock()
jsonPatchSupported := browserConnection.ActiveSubscriptions[queryId].JsonPatchSupported
browserConnection.ActiveSubscriptionsMutex.RUnlock()
if jsonPatchSupported {
msgpatch.RemoveConnSubscriptionCacheFile(browserConnection, queryId)
}
2023-04-27 09:03:40 +08:00
browserConnection.ActiveSubscriptionsMutex.Lock()
delete(browserConnection.ActiveSubscriptions, queryId)
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
browserConnection.ActiveSubscriptionsMutex.Unlock()
}
if fromBrowserMessageAsMap["type"] == "connection_init" {
browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap
2023-04-27 09:03:40 +08:00
}
log.Tracef("sending to hasura: %v", fromBrowserMessageAsMap)
err := wsjson.Write(hc.Context, hc.Websocket, fromBrowserMessageAsMap)
2023-04-27 09:03:40 +08:00
if err != nil {
log.Errorf("error on write (we're disconnected from hasura): %v", err)
return
}
}
}
}
}