From ede7a2dd2570bfbdd2e7e437ad63a6bd9b8c3a70 Mon Sep 17 00:00:00 2001 From: Gustavo Trott Date: Tue, 25 Jun 2024 10:27:44 -0300 Subject: [PATCH 1/3] Bunch of improvements for messages parsing --- .../cmd/bbb-graphql-middleware/main.go | 13 +- .../internal/common/CustomCache.go | 8 +- .../internal/common/CustomJsonPatcher.go | 32 ++- .../internal/common/GlobalState.go | 92 +++++-- .../internal/common/SafeChannelByte.go | 73 ++++++ .../internal/common/StreamCursorUtils.go | 171 +++++++------ .../internal/common/types.go | 26 +- .../internal/gql_actions/client.go | 38 +-- .../internal/hasura/client.go | 4 +- .../internal/hasura/conn/reader/reader.go | 229 ++++++++++-------- .../internal/hasura/conn/writer/writer.go | 79 +++--- .../hasura/retransmiter/retransmiter.go | 2 +- .../internal/msgpatch/jsonpatch.go | 89 ++++--- .../internal/websrv/connhandler.go | 22 +- .../internal/websrv/reader/reader.go | 43 ++-- .../internal/websrv/writer/writer.go | 26 +- 16 files changed, 603 insertions(+), 344 deletions(-) create mode 100644 bbb-graphql-middleware/internal/common/SafeChannelByte.go diff --git a/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go b/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go index 2215380913..f375f34427 100644 --- a/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go +++ b/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go @@ -45,11 +45,14 @@ func main() { log.Infof("Json Patch Disabled!") } - if rawDataCacheStorageMode := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_RAW_DATA_CACHE_STORAGE_MODE"); rawDataCacheStorageMode == "memory" { - msgpatch.RawDataCacheStorageMode = "memory" - } else { - msgpatch.RawDataCacheStorageMode = "file" - } + //if rawDataCacheStorageMode := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_RAW_DATA_CACHE_STORAGE_MODE"); rawDataCacheStorageMode == "file" { + // msgpatch.RawDataCacheStorageMode = "file" + //} else { + // msgpatch.RawDataCacheStorageMode = "memory" + //} + //Force memory cache for now + msgpatch.RawDataCacheStorageMode = "memory" + log.Infof("Raw Data Cache Storage Mode: %s", msgpatch.RawDataCacheStorageMode) // Websocket listener diff --git a/bbb-graphql-middleware/internal/common/CustomCache.go b/bbb-graphql-middleware/internal/common/CustomCache.go index 49c3003dfe..ad5ff752ec 100644 --- a/bbb-graphql-middleware/internal/common/CustomCache.go +++ b/bbb-graphql-middleware/internal/common/CustomCache.go @@ -7,17 +7,17 @@ import ( var GlobalCacheLocks = NewCacheLocks() type CacheLocks struct { - locks map[string]*sync.Mutex + locks map[uint32]*sync.Mutex mutex sync.Mutex // Protects the 'locks' map } func NewCacheLocks() *CacheLocks { return &CacheLocks{ - locks: make(map[string]*sync.Mutex), + locks: make(map[uint32]*sync.Mutex), } } -func (c *CacheLocks) Lock(id string) { +func (c *CacheLocks) Lock(id uint32) { c.mutex.Lock() if _, exists := c.locks[id]; !exists { c.locks[id] = &sync.Mutex{} @@ -28,7 +28,7 @@ func (c *CacheLocks) Lock(id string) { mtx.Lock() // Lock the specific ID mutex } -func (c *CacheLocks) Unlock(id string) { +func (c *CacheLocks) Unlock(id uint32) { c.mutex.Lock() if mtx, exists := c.locks[id]; exists { mtx.Unlock() diff --git a/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go b/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go index 7eb858d504..efaa1acfa7 100644 --- a/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go +++ b/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go @@ -1,6 +1,7 @@ package common import ( + "bytes" "encoding/json" "fmt" evanphxjsonpatch "github.com/evanphx/json-patch" @@ -8,46 +9,51 @@ import ( log "github.com/sirupsen/logrus" ) -func ValidateIfShouldUseCustomJsonPatch(original []byte, modified []byte, idFieldName string) bool { +func ValidateIfShouldUseCustomJsonPatch(original []byte, modified []byte, idFieldName string) (bool, []byte) { + //Temporarily use CustomPatch only for UserList (testing feature) + if !bytes.Contains(modified, []byte("\"__typename\":\"user\"}]")) { + return false, nil + } + //Test Original Data originalMap := GetMapFromByte(original) if originalMap == nil { - return false + return false, nil } if len(originalMap) <= 1 { - return false + return false, nil } firstItem := originalMap[0] if _, existsIdField := firstItem[idFieldName].(string); !existsIdField { - return false + return false, nil } if hasDuplicatedId(originalMap, idFieldName) { - return false + return false, nil } //Test Modified Data modifiedMap := GetMapFromByte(modified) if modifiedMap == nil { - return false + return false, nil } if len(modifiedMap) <= 1 { - return false + return false, nil } firstItem = modifiedMap[0] if _, existsIdField := firstItem[idFieldName].(string); !existsIdField { - return false + return false, nil } if hasDuplicatedId(modifiedMap, idFieldName) { - return false + return false, nil } - return true + return true, CreateJsonPatchFromMaps(originalMap, modifiedMap, modified, "userId") } func hasDuplicatedId(items []map[string]interface{}, idFieldName string) bool { @@ -67,11 +73,11 @@ func CreateJsonPatch(original []byte, modified []byte, idFieldName string) []byt originalMap := GetMapFromByte(original) modifiedMap := GetMapFromByte(modified) - return CreateJsonPatchFromMaps(originalMap, modifiedMap, idFieldName) + return CreateJsonPatchFromMaps(originalMap, modifiedMap, modified, idFieldName) } -func CreateJsonPatchFromMaps(original []map[string]interface{}, modified []map[string]interface{}, idFieldName string) []byte { - modifiedJson, _ := json.Marshal(modified) +func CreateJsonPatchFromMaps(original []map[string]interface{}, modified []map[string]interface{}, modifiedJson []byte, idFieldName string) []byte { + //modifiedJson, _ := json.Marshal(modified) //CREATE PATCHES FOR OPERATION "REPLACE" replacesPatches, originalWithReplaces := CreateReplacePatches(original, modified, idFieldName) diff --git a/bbb-graphql-middleware/internal/common/GlobalState.go b/bbb-graphql-middleware/internal/common/GlobalState.go index ccbba9273d..14928a20c4 100644 --- a/bbb-graphql-middleware/internal/common/GlobalState.go +++ b/bbb-graphql-middleware/internal/common/GlobalState.go @@ -16,31 +16,93 @@ func GetUniqueID() string { return uniqueID } -var JsonPatchCache = make(map[string][]byte) -var JsonPatchCacheMutex sync.RWMutex +var PatchedMessageCache = make(map[uint32][]byte) +var PatchedMessageCacheMutex sync.RWMutex -func GetJsonPatchCache(cacheKey string) ([]byte, bool) { - JsonPatchCacheMutex.RLock() - defer JsonPatchCacheMutex.RUnlock() +func GetPatchedMessageCache(cacheKey uint32) ([]byte, bool) { + PatchedMessageCacheMutex.RLock() + defer PatchedMessageCacheMutex.RUnlock() - jsonDiffPatch, jsonDiffPatchExists := JsonPatchCache[cacheKey] + jsonDiffPatch, jsonDiffPatchExists := PatchedMessageCache[cacheKey] return jsonDiffPatch, jsonDiffPatchExists } -func StoreJsonPatchCache(cacheKey string, data []byte) { - JsonPatchCacheMutex.Lock() - defer JsonPatchCacheMutex.Unlock() +func StorePatchedMessageCache(cacheKey uint32, data []byte) { + PatchedMessageCacheMutex.Lock() + defer PatchedMessageCacheMutex.Unlock() - JsonPatchCache[cacheKey] = data + PatchedMessageCache[cacheKey] = data //Remove the cache after 30 seconds - go RemoveJsonPatchCache(cacheKey, 30) + go RemovePatchedMessageCache(cacheKey, 30) } -func RemoveJsonPatchCache(cacheKey string, delayInSecs time.Duration) { +func RemovePatchedMessageCache(cacheKey uint32, delayInSecs time.Duration) { time.Sleep(delayInSecs * time.Second) - JsonPatchCacheMutex.Lock() - defer JsonPatchCacheMutex.Unlock() - delete(JsonPatchCache, cacheKey) + PatchedMessageCacheMutex.Lock() + defer PatchedMessageCacheMutex.Unlock() + delete(PatchedMessageCache, cacheKey) +} + +var HasuraMessageCache = make(map[uint32]HasuraMessage) +var HasuraMessageKeyCache = make(map[uint32]string) +var HasuraMessageCacheMutex sync.RWMutex + +func GetHasuraMessageCache(cacheKey uint32) (string, HasuraMessage, bool) { + HasuraMessageCacheMutex.RLock() + defer HasuraMessageCacheMutex.RUnlock() + + hasuraMessageDataKey, _ := HasuraMessageKeyCache[cacheKey] + hasuraMessage, hasuraMessageExists := HasuraMessageCache[cacheKey] + return hasuraMessageDataKey, hasuraMessage, hasuraMessageExists +} + +func StoreHasuraMessageCache(cacheKey uint32, dataKey string, hasuraMessage HasuraMessage) { + HasuraMessageCacheMutex.Lock() + defer HasuraMessageCacheMutex.Unlock() + + HasuraMessageKeyCache[cacheKey] = dataKey + HasuraMessageCache[cacheKey] = hasuraMessage + + //Remove the cache after 30 seconds + go RemoveHasuraMessageCache(cacheKey, 30) +} + +func RemoveHasuraMessageCache(cacheKey uint32, delayInSecs time.Duration) { + time.Sleep(delayInSecs * time.Second) + + HasuraMessageCacheMutex.Lock() + defer HasuraMessageCacheMutex.Unlock() + delete(HasuraMessageKeyCache, cacheKey) + delete(HasuraMessageCache, cacheKey) +} + +var StreamCursorValueCache = make(map[uint32]interface{}) +var StreamCursorValueCacheMutex sync.RWMutex + +func GetStreamCursorValueCache(cacheKey uint32) (interface{}, bool) { + StreamCursorValueCacheMutex.RLock() + defer StreamCursorValueCacheMutex.RUnlock() + + streamCursorValue, streamCursorValueExists := StreamCursorValueCache[cacheKey] + return streamCursorValue, streamCursorValueExists +} + +func StoreStreamCursorValueCache(cacheKey uint32, streamCursorValue interface{}) { + StreamCursorValueCacheMutex.Lock() + defer StreamCursorValueCacheMutex.Unlock() + + StreamCursorValueCache[cacheKey] = streamCursorValue + + //Remove the cache after 30 seconds + go RemoveStreamCursorValueCache(cacheKey, 30) +} + +func RemoveStreamCursorValueCache(cacheKey uint32, delayInSecs time.Duration) { + time.Sleep(delayInSecs * time.Second) + + StreamCursorValueCacheMutex.Lock() + defer StreamCursorValueCacheMutex.Unlock() + delete(StreamCursorValueCache, cacheKey) } diff --git a/bbb-graphql-middleware/internal/common/SafeChannelByte.go b/bbb-graphql-middleware/internal/common/SafeChannelByte.go new file mode 100644 index 0000000000..41d8034dfe --- /dev/null +++ b/bbb-graphql-middleware/internal/common/SafeChannelByte.go @@ -0,0 +1,73 @@ +package common + +import ( + "sync" +) + +type SafeChannelByte struct { + ch chan []byte + closed bool + mux sync.Mutex + freezeFlag bool +} + +func NewSafeChannelByte(size int) *SafeChannelByte { + return &SafeChannelByte{ + ch: make(chan []byte, size), + } +} + +func (s *SafeChannelByte) Send(value []byte) bool { + s.mux.Lock() + defer s.mux.Unlock() + + if s.closed { + return false + } + s.ch <- value + return true +} + +func (s *SafeChannelByte) Receive() ([]byte, bool) { + val, ok := <-s.ch + return val, ok +} + +func (s *SafeChannelByte) ReceiveChannel() <-chan []byte { + return s.ch +} + +func (s *SafeChannelByte) Closed() bool { + s.mux.Lock() + defer s.mux.Unlock() + + return s.closed +} + +func (s *SafeChannelByte) Close() { + s.mux.Lock() + defer s.mux.Unlock() + + if !s.closed { + close(s.ch) + s.closed = true + } +} + +func (s *SafeChannelByte) Frozen() bool { + return s.freezeFlag +} + +func (s *SafeChannelByte) FreezeChannel() { + if !s.freezeFlag { + s.mux.Lock() + s.freezeFlag = true + } +} + +func (s *SafeChannelByte) UnfreezeChannel() { + if s.freezeFlag { + s.mux.Unlock() + s.freezeFlag = false + } +} diff --git a/bbb-graphql-middleware/internal/common/StreamCursorUtils.go b/bbb-graphql-middleware/internal/common/StreamCursorUtils.go index 872a13dc32..1df451356a 100644 --- a/bbb-graphql-middleware/internal/common/StreamCursorUtils.go +++ b/bbb-graphql-middleware/internal/common/StreamCursorUtils.go @@ -1,27 +1,28 @@ package common import ( + "encoding/json" "fmt" + log "github.com/sirupsen/logrus" + "hash/crc32" "regexp" "strconv" "strings" ) -func GetStreamCursorPropsFromQuery(payload map[string]interface{}, query string) (string, string, interface{}) { +func GetStreamCursorPropsFromBrowserMessage(browserMessage BrowserSubscribeMessage) (string, string, interface{}) { streamCursorField := "" streamCursorVariableName := "" var streamCursorInitialValue interface{} cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+):\s*([^}]+)\s*}\s*}`) - matches := cursorInitialValueRePattern.FindStringSubmatch(query) + matches := cursorInitialValueRePattern.FindStringSubmatch(browserMessage.Payload.Query) if matches != nil { streamCursorField = matches[1] if strings.HasPrefix(matches[2], "$") { streamCursorVariableName, _ = strings.CutPrefix(matches[2], "$") - if variables, okVariables := payload["variables"].(map[string]interface{}); okVariables { - if targetVariableValue, okTargetVariableValue := variables[streamCursorVariableName]; okTargetVariableValue { - streamCursorInitialValue = targetVariableValue - } + if targetVariableValue, okTargetVariableValue := browserMessage.Payload.Variables[streamCursorVariableName]; okTargetVariableValue { + streamCursorInitialValue = targetVariableValue } } else { streamCursorInitialValue = matches[2] @@ -31,9 +32,28 @@ func GetStreamCursorPropsFromQuery(payload map[string]interface{}, query string) return streamCursorField, streamCursorVariableName, streamCursorInitialValue } -func GetLastStreamCursorValueFromReceivedMessage(messageAsMap map[string]interface{}, streamCursorField string) interface{} { +func GetLastStreamCursorValueFromReceivedMessage(message []byte, streamCursorField string) interface{} { + dataChecksum := crc32.ChecksumIEEE(message) + GlobalCacheLocks.Lock(dataChecksum) + + if streamCursorValueCache, streamCursorValueCacheExists := GetStreamCursorValueCache(dataChecksum); streamCursorValueCacheExists { + //Unlock immediately once the cache was already created by other routine + GlobalCacheLocks.Unlock(dataChecksum) + return streamCursorValueCache + } else { + //It will create the cache and then Unlock (others will wait to benefit from this cache) + defer GlobalCacheLocks.Unlock(dataChecksum) + } + var lastStreamCursorValue interface{} + var messageAsMap map[string]interface{} + err := json.Unmarshal(message, &messageAsMap) + if err != nil { + log.Errorf("failed to unmarshal message: %v", err) + //return + } + if payload, okPayload := messageAsMap["payload"].(map[string]interface{}); okPayload { if data, okData := payload["data"].(map[string]interface{}); okData { //Data will have only one prop, `range` because its name is unknown @@ -52,6 +72,7 @@ func GetLastStreamCursorValueFromReceivedMessage(messageAsMap map[string]interfa } } + StoreStreamCursorValueCache(dataChecksum, lastStreamCursorValue) return lastStreamCursorValue } @@ -60,76 +81,84 @@ func PatchQueryIncludingCursorField(originalQuery string, cursorField string) st return originalQuery } - lastIndex := strings.LastIndex(originalQuery, "{") - if lastIndex == -1 { + lastIndexOfTypename := LastButOneIndex(originalQuery, "}") + if lastIndexOfTypename == -1 { return originalQuery } - // It will include the cursorField at the beginning of the list of fields + // It will include the cursorField at the end of the list of fields // It's not a problem if the field be duplicated in the list, Hasura just ignore the second occurrence - return originalQuery[:lastIndex+1] + "\n " + cursorField + originalQuery[lastIndex+1:] + return originalQuery[:lastIndexOfTypename] + " " + cursorField + "\n " + originalQuery[lastIndexOfTypename:] } -func PatchQuerySettingLastCursorValue(subscription GraphQlSubscription) interface{} { - message := subscription.Message - payload, okPayload := message["payload"].(map[string]interface{}) - - if okPayload { - if subscription.StreamCursorVariableName != "" { - /**** This stream has its cursor value set through variables ****/ - if variables, okVariables := payload["variables"].(map[string]interface{}); okVariables { - if variables[subscription.StreamCursorVariableName] != subscription.StreamCursorCurrValue { - variables[subscription.StreamCursorVariableName] = subscription.StreamCursorCurrValue - payload["variables"] = variables - message["payload"] = payload - } - } - } else { - /**** This stream has its cursor value set through inline value (not variables) ****/ - query, okQuery := payload["query"].(string) - if okQuery { - cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+:\s*[^}]+)\s*}\s*}`) - newValue := "" - - replaceInitialValueFunc := func(match string) string { - switch v := subscription.StreamCursorCurrValue.(type) { - case string: - newValue = v - - //Append quotes if it is missing, it will be necessary when appending to the query - if !strings.HasPrefix(v, "\"") { - newValue = "\"" + newValue - } - if !strings.HasSuffix(v, "\"") { - newValue = newValue + "\"" - } - case int: - newValue = strconv.Itoa(v) - case float32: - myFloat64 := float64(v) - newValue = strconv.FormatFloat(myFloat64, 'f', -1, 32) - case float64: - newValue = strconv.FormatFloat(v, 'f', -1, 64) - default: - newValue = "" - } - - if newValue != "" { - replacement := subscription.StreamCursorField + ": " + newValue - return fmt.Sprintf("cursor: {initial_value: {%s}}", replacement) - } else { - return match - } - } - - newQuery := cursorInitialValueRePattern.ReplaceAllStringFunc(query, replaceInitialValueFunc) - if query != newQuery { - payload["query"] = newQuery - message["payload"] = payload - } - } - } +func PatchQuerySettingLastCursorValue(subscription GraphQlSubscription) []byte { + var browserMessage BrowserSubscribeMessage + err := json.Unmarshal(subscription.Message, &browserMessage) + if err != nil { + log.Errorf("failed to unmarshal message: %v", err) + return subscription.Message } - return message + if subscription.StreamCursorVariableName != "" { + /**** This stream has its cursor value set through variables ****/ + if browserMessage.Payload.Variables[subscription.StreamCursorVariableName] == subscription.StreamCursorCurrValue { + return subscription.Message + } + browserMessage.Payload.Variables[subscription.StreamCursorVariableName] = subscription.StreamCursorCurrValue + } else { + /**** This stream has its cursor value set through inline value (not variables) ****/ + cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+:\s*[^}]+)\s*}\s*}`) + newValue := "" + + replaceInitialValueFunc := func(match string) string { + switch v := subscription.StreamCursorCurrValue.(type) { + case string: + newValue = v + + //Append quotes if it is missing, it will be necessary when appending to the query + if !strings.HasPrefix(v, "\"") { + newValue = "\"" + newValue + } + if !strings.HasSuffix(v, "\"") { + newValue = newValue + "\"" + } + case int: + newValue = strconv.Itoa(v) + case float32: + myFloat64 := float64(v) + newValue = strconv.FormatFloat(myFloat64, 'f', -1, 32) + case float64: + newValue = strconv.FormatFloat(v, 'f', -1, 64) + default: + newValue = "" + } + + if newValue != "" { + replacement := subscription.StreamCursorField + ": " + newValue + return fmt.Sprintf("cursor: {initial_value: {%s}}", replacement) + } else { + return match + } + } + + newQuery := cursorInitialValueRePattern.ReplaceAllStringFunc(browserMessage.Payload.Query, replaceInitialValueFunc) + if browserMessage.Payload.Query == newQuery { + return subscription.Message + } + + browserMessage.Payload.Query = newQuery + } + + newMessageJson, _ := json.Marshal(browserMessage) + + return newMessageJson +} + +func LastButOneIndex(s, substr string) int { + last := strings.LastIndex(s, substr) + if last == -1 { + return -1 + } + + return strings.LastIndex(s[:last], substr) } diff --git a/bbb-graphql-middleware/internal/common/types.go b/bbb-graphql-middleware/internal/common/types.go index f28453c220..6488b9cb3f 100644 --- a/bbb-graphql-middleware/internal/common/types.go +++ b/bbb-graphql-middleware/internal/common/types.go @@ -2,6 +2,7 @@ package common import ( "context" + "encoding/json" "net/http" "sync" @@ -20,13 +21,13 @@ const ( type GraphQlSubscription struct { Id string - Message map[string]interface{} + Message []byte Type QueryType OperationName string StreamCursorField string StreamCursorVariableName string StreamCursorCurrValue interface{} - LastReceivedData []byte + LastReceivedData HasuraMessage LastReceivedDataChecksum uint32 JsonPatchSupported bool // indicate if client support Json Patch for this subscription LastSeenOnHasuraConnection string // id of the hasura connection that this query was active @@ -45,7 +46,7 @@ type BrowserConnection struct { BrowserRequestCookies []*http.Cookie ActiveSubscriptions map[string]GraphQlSubscription // active subscriptions of this connection (start, but no stop) ActiveSubscriptionsMutex sync.RWMutex // mutex to control the map usage - ConnectionInitMessage map[string]interface{} // init message received in this connection (to be used on hasura reconnect) + ConnectionInitMessage []byte // init message received in this connection (to be used on hasura reconnect) HasuraConnection *HasuraConnection // associated hasura connection Disconnected bool // indicate if the connection is gone ConnAckSentToBrowser bool // indicate if `connection_ack` msg was already sent to the browser @@ -62,3 +63,22 @@ type HasuraConnection struct { ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection) FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection } + +type HasuraMessage struct { + Type string `json:"type"` + ID string `json:"id"` + Payload struct { + Data map[string]json.RawMessage `json:"data"` + } `json:"payload"` +} + +type BrowserSubscribeMessage struct { + Type string `json:"type"` + ID string `json:"id"` + Payload struct { + Extensions map[string]interface{} `json:"extensions"` + OperationName string `json:"operationName"` + Query string `json:"query"` + Variables map[string]interface{} `json:"variables"` + } `json:"payload"` +} diff --git a/bbb-graphql-middleware/internal/gql_actions/client.go b/bbb-graphql-middleware/internal/gql_actions/client.go index 6669bb3c33..3d79609bf0 100644 --- a/bbb-graphql-middleware/internal/gql_actions/client.go +++ b/bbb-graphql-middleware/internal/gql_actions/client.go @@ -17,8 +17,8 @@ var graphqlActionsUrl = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_GRAPHQL_ACTIONS_URL") func GraphqlActionsClient( browserConnection *common.BrowserConnection, - fromBrowserToGqlActionsChannel *common.SafeChannel, - fromHasuraToBrowserChannel *common.SafeChannel) error { + fromBrowserToGqlActionsChannel *common.SafeChannelByte, + fromHasuraToBrowserChannel *common.SafeChannelByte) error { log := log.WithField("_routine", "GraphqlActionsClient").WithField("browserConnectionId", browserConnection.Id) log.Debug("Starting GraphqlActionsClient") @@ -38,19 +38,19 @@ RangeLoop: continue } - var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{}) - - if fromBrowserMessageAsMap["type"] == "subscribe" { - queryId := fromBrowserMessageAsMap["id"].(string) - payload := fromBrowserMessageAsMap["payload"].(map[string]interface{}) + var browserMessage common.BrowserSubscribeMessage + err := json.Unmarshal(fromBrowserMessage, &browserMessage) + if err != nil { + log.Errorf("failed to unmarshal message: %v", err) + continue + } + if browserMessage.Type == "subscribe" { var errorMessage string var mutationFuncName string - query, okQuery := payload["query"].(string) - variables, okVariables := payload["variables"].(map[string]interface{}) - if okQuery && okVariables && strings.HasPrefix(query, "mutation") { - if funcName, inputs, err := parseGraphQLMutation(query, variables); err == nil { + if strings.HasPrefix(browserMessage.Payload.Query, "mutation") { + if funcName, inputs, err := parseGraphQLMutation(browserMessage.Payload.Query, browserMessage.Payload.Variables); err == nil { mutationFuncName = funcName if err = SendGqlActionsRequest(funcName, inputs, browserConnection.BBBWebSessionVariables); err == nil { } else { @@ -66,7 +66,7 @@ RangeLoop: if errorMessage != "" { //Error on sending action, return error msg to client browserResponseData := map[string]interface{}{ - "id": queryId, + "id": browserMessage.ID, "type": "error", "payload": []interface{}{ map[string]interface{}{ @@ -74,12 +74,12 @@ RangeLoop: }, }, } - - fromHasuraToBrowserChannel.Send(browserResponseData) + jsonData, _ := json.Marshal(browserResponseData) + fromHasuraToBrowserChannel.Send(jsonData) } else { //Action sent successfully, return data msg to client browserResponseData := map[string]interface{}{ - "id": queryId, + "id": browserMessage.ID, "type": "next", "payload": map[string]interface{}{ "data": map[string]interface{}{ @@ -87,15 +87,17 @@ RangeLoop: }, }, } - fromHasuraToBrowserChannel.Send(browserResponseData) + jsonData, _ := json.Marshal(browserResponseData) + fromHasuraToBrowserChannel.Send(jsonData) } //Return complete msg to client browserResponseComplete := map[string]interface{}{ - "id": queryId, + "id": browserMessage.ID, "type": "complete", } - fromHasuraToBrowserChannel.Send(browserResponseComplete) + jsonData, _ := json.Marshal(browserResponseComplete) + fromHasuraToBrowserChannel.Send(jsonData) } //Fallback to Hasura was disabled (keeping the code temporarily) diff --git a/bbb-graphql-middleware/internal/hasura/client.go b/bbb-graphql-middleware/internal/hasura/client.go index 2a0078896e..91cf6de71c 100644 --- a/bbb-graphql-middleware/internal/hasura/client.go +++ b/bbb-graphql-middleware/internal/hasura/client.go @@ -24,8 +24,8 @@ var hasuraEndpoint = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_HASURA_WS") // Hasura client connection func HasuraClient( browserConnection *common.BrowserConnection, - fromBrowserToHasuraChannel *common.SafeChannel, - fromHasuraToBrowserChannel *common.SafeChannel) error { + fromBrowserToHasuraChannel *common.SafeChannelByte, + fromHasuraToBrowserChannel *common.SafeChannelByte) error { log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id) common.ActivitiesOverviewStarted("__HasuraConnection") defer common.ActivitiesOverviewCompleted("__HasuraConnection") diff --git a/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go b/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go index b4b95866b8..de14fcad46 100644 --- a/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go +++ b/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go @@ -1,26 +1,21 @@ package reader import ( + "bytes" "context" "encoding/json" "errors" - "fmt" "github.com/iMDT/bbb-graphql-middleware/internal/common" "github.com/iMDT/bbb-graphql-middleware/internal/hasura/retransmiter" "github.com/iMDT/bbb-graphql-middleware/internal/msgpatch" log "github.com/sirupsen/logrus" "hash/crc32" "nhooyr.io/websocket" - "nhooyr.io/websocket/wsjson" "sync" ) // HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel -func HasuraConnectionReader( - hc *common.HasuraConnection, - fromHasuraToBrowserChannel *common.SafeChannel, - fromBrowserToHasuraChannel *common.SafeChannel, - wg *sync.WaitGroup) { +func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte, wg *sync.WaitGroup) { log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id) defer log.Debugf("finished") log.Debugf("starting") @@ -29,10 +24,7 @@ func HasuraConnectionReader( defer hc.ContextCancelFunc() for { - // Read a message from hasura - var message interface{} - err := wsjson.Read(hc.Context, hc.Websocket, &message) - + messageType, message, err := hc.Websocket.Read(hc.Context) var closeError *websocket.CloseError if err != nil { @@ -58,113 +50,131 @@ func HasuraConnectionReader( return } + if messageType != websocket.MessageText { + log.Warnf("received non-text message: %v", messageType) + continue + } + log.Tracef("received from hasura: %v", message) handleMessageReceivedFromHasura(hc, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, message) } } -func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, message interface{}) { - var messageMap = message.(map[string]interface{}) +var QueryIdPlaceholderInBytes = []byte("--------------QUERY-ID--------------") //36 chars - if messageMap != nil { - var messageType = messageMap["type"] - var queryId, _ = messageMap["id"].(string) +func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte, message []byte) { + type HasuraMessageInfo struct { + Type string `json:"type"` + ID string `json:"id"` + } + var hasuraMessageInfo HasuraMessageInfo + err := json.Unmarshal(message, &hasuraMessageInfo) + if err != nil { + log.Errorf("failed to unmarshal message: %v", err) + return + } - //Check if subscription is still active! - if queryId != "" { - hc.BrowserConn.ActiveSubscriptionsMutex.RLock() - subscription, ok := hc.BrowserConn.ActiveSubscriptions[queryId] - hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock() - if !ok { - log.Debugf("Subscription with Id %s doesn't exist anymore, skipping response.", queryId) + queryIdReplacementApplied := false + queryIdInBytes := []byte(hasuraMessageInfo.ID) + + //Check if subscription is still active! + if hasuraMessageInfo.ID != "" { + hc.BrowserConn.ActiveSubscriptionsMutex.RLock() + subscription, ok := hc.BrowserConn.ActiveSubscriptions[hasuraMessageInfo.ID] + hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock() + if !ok { + log.Debugf("Subscription with Id %s doesn't exist anymore, skipping response.", hasuraMessageInfo.ID) + return + } + + //When Hasura send msg type "complete", this query is finished + if hasuraMessageInfo.Type == "complete" { + handleCompleteMessage(hc, hasuraMessageInfo.ID) + common.ActivitiesOverviewCompleted(string(subscription.Type) + "-" + subscription.OperationName) + common.ActivitiesOverviewCompleted("_Sum-" + string(subscription.Type)) + } + + if hasuraMessageInfo.Type == "next" { + common.ActivitiesOverviewDataReceived(string(subscription.Type) + "-" + subscription.OperationName) + } + + if hasuraMessageInfo.Type == "next" && + subscription.Type == common.Subscription { + + //Remove queryId from message + message = bytes.Replace(message, queryIdInBytes, QueryIdPlaceholderInBytes, 1) + queryIdReplacementApplied = true + + isDifferentFromPreviousMessage := handleSubscriptionMessage(hc, &message, subscription, hasuraMessageInfo.ID) + + //Stop processing case it is the same message (probably is a reconnection with Hasura) + if !isDifferentFromPreviousMessage { return } - - //When Hasura send msg type "complete", this query is finished - if messageType == "complete" { - handleCompleteMessage(hc, queryId) - common.ActivitiesOverviewCompleted(string(subscription.Type) + "-" + subscription.OperationName) - common.ActivitiesOverviewCompleted("_Sum-" + string(subscription.Type)) - } - - if messageType == "next" { - common.ActivitiesOverviewDataReceived(string(subscription.Type) + "-" + subscription.OperationName) - } - - if messageType == "next" && - subscription.Type == common.Subscription { - hasNoPreviousOccurrence := handleSubscriptionMessage(hc, messageMap, subscription, queryId) - - if !hasNoPreviousOccurrence { - return - } - } - - //Set last cursor value for stream - if subscription.Type == common.Streaming { - handleStreamingMessage(hc, messageMap, subscription, queryId) - } } - // Retransmit the subscription start commands when hasura confirms the connection - // this is useful in case of a connection invalidation - if messageType == "connection_ack" { - handleConnectionAckMessage(hc, messageMap, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel) - } else { - // Forward the message to browser - fromHasuraToBrowserChannel.Send(messageMap) + //Set last cursor value for stream + if subscription.Type == common.Streaming { + //Remove queryId from message + messageWithoutId := bytes.Replace(message, queryIdInBytes, QueryIdPlaceholderInBytes, 1) + + handleStreamingMessage(hc, messageWithoutId, subscription, hasuraMessageInfo.ID) } } + + // Retransmit the subscription start commands when hasura confirms the connection + // this is useful in case of a connection invalidation + if hasuraMessageInfo.Type == "connection_ack" { + handleConnectionAckMessage(hc, message, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel) + } else { + if queryIdReplacementApplied { + message = bytes.Replace(message, QueryIdPlaceholderInBytes, queryIdInBytes, 1) + } + + // Forward the message to browser + fromHasuraToBrowserChannel.Send(message) + } } -func handleSubscriptionMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, subscription common.GraphQlSubscription, queryId string) bool { - if payload, okPayload := messageMap["payload"].(map[string]interface{}); okPayload { - if data, okData := payload["data"].(map[string]interface{}); okData { - for dataKey, dataItem := range data { - if currentDataProp, okCurrentDataProp := dataItem.([]interface{}); okCurrentDataProp { - if dataAsJson, err := json.Marshal(currentDataProp); err == nil { - if common.ActivitiesOverviewEnabled { - dataSize := len(string(dataAsJson)) - dataCount := len(currentDataProp) - common.ActivitiesOverviewDataSize(string(subscription.Type)+"-"+subscription.OperationName, int64(dataSize), int64(dataCount)) - } +func handleSubscriptionMessage(hc *common.HasuraConnection, message *[]byte, subscription common.GraphQlSubscription, queryId string) bool { + if common.ActivitiesOverviewEnabled { + dataSize := len(string(*message)) + common.ActivitiesOverviewDataSize(string(subscription.Type)+"-"+subscription.OperationName, int64(dataSize), 0) + } - //Check whether ReceivedData is different from the LastReceivedData - //Otherwise stop forwarding this message - dataChecksum := crc32.ChecksumIEEE(dataAsJson) - if subscription.LastReceivedDataChecksum == dataChecksum { - return false - } + dataChecksum, messageDataKey, messageData := getHasuraMessage(*message) - lastDataChecksumWas := subscription.LastReceivedDataChecksum - lastDataAsJsonWas := subscription.LastReceivedData - cacheKey := fmt.Sprintf("%s-%s-%v-%v", string(subscription.Type), subscription.OperationName, subscription.LastReceivedDataChecksum, dataChecksum) + //Check whether ReceivedData is different from the LastReceivedData + //Otherwise stop forwarding this message + if subscription.LastReceivedDataChecksum == dataChecksum { + return false + } - //Store LastReceivedData Checksum - if msgpatch.RawDataCacheStorageMode == "memory" { - subscription.LastReceivedData = dataAsJson - } - subscription.LastReceivedDataChecksum = dataChecksum - hc.BrowserConn.ActiveSubscriptionsMutex.Lock() - hc.BrowserConn.ActiveSubscriptions[queryId] = subscription - hc.BrowserConn.ActiveSubscriptionsMutex.Unlock() + lastDataChecksumWas := subscription.LastReceivedDataChecksum + lastReceivedDataWas := subscription.LastReceivedData + //cacheKey := fmt.Sprintf("%v-%v", subscription.LastReceivedDataChecksum, dataChecksum) + cacheKey := subscription.LastReceivedDataChecksum | dataChecksum - //Apply msg patch when it supports it - if subscription.JsonPatchSupported { - msgpatch.PatchMessage(&messageMap, queryId, dataKey, lastDataAsJsonWas, dataAsJson, hc.BrowserConn.Id, hc.BrowserConn.SessionToken, cacheKey, lastDataChecksumWas, dataChecksum) - } - } - } - } - } + //Store LastReceivedData Checksum + if msgpatch.RawDataCacheStorageMode == "memory" { + subscription.LastReceivedData = messageData + } + subscription.LastReceivedDataChecksum = dataChecksum + hc.BrowserConn.ActiveSubscriptionsMutex.Lock() + hc.BrowserConn.ActiveSubscriptions[queryId] = subscription + hc.BrowserConn.ActiveSubscriptionsMutex.Unlock() + + //Apply msg patch when it supports it + if subscription.JsonPatchSupported { + *message = msgpatch.GetPatchedMessage(*message, queryId, messageDataKey, lastReceivedDataWas, messageData, hc.BrowserConn.Id, hc.BrowserConn.SessionToken, cacheKey, lastDataChecksumWas, dataChecksum) } return true } -func handleStreamingMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, subscription common.GraphQlSubscription, queryId string) { - lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(messageMap, subscription.StreamCursorField) +func handleStreamingMessage(hc *common.HasuraConnection, message []byte, subscription common.GraphQlSubscription, queryId string) { + lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(message, subscription.StreamCursorField) if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor { subscription.StreamCursorCurrValue = lastCursor @@ -183,16 +193,45 @@ func handleCompleteMessage(hc *common.HasuraConnection, queryId string) { log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId) } -func handleConnectionAckMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel) { +func handleConnectionAckMessage(hc *common.HasuraConnection, message []byte, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte) { log.Debugf("Received connection_ack") //Hasura connection was initialized, now it's able to send new messages to Hasura fromBrowserToHasuraChannel.UnfreezeChannel() //Avoid to send `connection_ack` to the browser when it's a reconnection if hc.BrowserConn.ConnAckSentToBrowser == false { - fromHasuraToBrowserChannel.Send(messageMap) + fromHasuraToBrowserChannel.Send(message) hc.BrowserConn.ConnAckSentToBrowser = true } go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel) } + +func getHasuraMessage(message []byte) (uint32, string, common.HasuraMessage) { + dataChecksum := crc32.ChecksumIEEE(message) + + common.GlobalCacheLocks.Lock(dataChecksum) + dataKey, hasuraMessage, dataMapExists := common.GetHasuraMessageCache(dataChecksum) + if dataMapExists { + //Unlock immediately once the cache was already created by other routine + common.GlobalCacheLocks.Unlock(dataChecksum) + return dataChecksum, dataKey, hasuraMessage + } else { + //It will create the cache and then Unlock (others will wait to benefit from this cache) + defer common.GlobalCacheLocks.Unlock(dataChecksum) + } + + err := json.Unmarshal(message, &hasuraMessage) + if err != nil { + log.Fatalf("Error unmarshalling JSON: %v", err) + } + + for key := range hasuraMessage.Payload.Data { + dataKey = key + break + } + + common.StoreHasuraMessageCache(dataChecksum, dataKey, hasuraMessage) + + return dataChecksum, dataKey, hasuraMessage +} diff --git a/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go b/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go index 61135f57c7..daf3039844 100644 --- a/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go +++ b/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go @@ -2,11 +2,12 @@ package writer import ( "context" + "encoding/json" "errors" "github.com/iMDT/bbb-graphql-middleware/internal/common" "github.com/iMDT/bbb-graphql-middleware/internal/msgpatch" log "github.com/sirupsen/logrus" - "nhooyr.io/websocket/wsjson" + "nhooyr.io/websocket" "os" "strings" "sync" @@ -14,7 +15,7 @@ import ( // HasuraConnectionWriter // process messages (middleware to hasura) -func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) { +func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannelByte, wg *sync.WaitGroup, initMessage []byte) { log := log.WithField("_routine", "HasuraConnectionWriter") browserConnection := hc.BrowserConn @@ -33,7 +34,7 @@ func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChan } //Send init connection message to Hasura to start - err := wsjson.Write(hc.Context, hc.Websocket, initMessage) + err := hc.Websocket.Write(hc.Context, websocket.MessageText, initMessage) if err != nil { log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err) return @@ -56,10 +57,17 @@ RangeLoop: continue } - var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{}) + //var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{}) - if fromBrowserMessageAsMap["type"] == "subscribe" { - var queryId = fromBrowserMessageAsMap["id"].(string) + var browserMessage common.BrowserSubscribeMessage + err := json.Unmarshal(fromBrowserMessage, &browserMessage) + if err != nil { + log.Errorf("failed to unmarshal message: %v", err) + return + } + + if browserMessage.Type == "subscribe" { + var queryId = browserMessage.ID //Identify type based on query string messageType := common.Query @@ -67,26 +75,25 @@ RangeLoop: streamCursorField := "" streamCursorVariableName := "" var streamCursorInitialValue interface{} - payload := fromBrowserMessageAsMap["payload"].(map[string]interface{}) - operationName, ok := payload["operationName"].(string) + //payload := browserMessage.Payload + //operationName := browserMessage.Payload.OperationName - query, ok := payload["query"].(string) - if ok { + query := browserMessage.Payload.Query + if query != "" { if strings.HasPrefix(query, "subscription") { - //Validate if subscription is allowed if allowedSubscriptions := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_ALLOWED_SUBSCRIPTIONS"); allowedSubscriptions != "" { allowedSubscriptionsSlice := strings.Split(allowedSubscriptions, ",") subscriptionAllowed := false for _, s := range allowedSubscriptionsSlice { - if s == operationName { + if s == browserMessage.Payload.OperationName { subscriptionAllowed = true break } } if !subscriptionAllowed { - log.Infof("Subscription %s not allowed!", operationName) + log.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName) continue } } @@ -96,14 +103,14 @@ RangeLoop: deniedSubscriptionsSlice := strings.Split(deniedSubscriptions, ",") subscriptionAllowed := true for _, s := range deniedSubscriptionsSlice { - if s == operationName { + if s == browserMessage.Payload.OperationName { subscriptionAllowed = false break } } if !subscriptionAllowed { - log.Infof("Subscription %s not allowed!", operationName) + log.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName) continue } } @@ -119,14 +126,15 @@ RangeLoop: if strings.Contains(query, "_stream(") && strings.Contains(query, "cursor: {") { messageType = common.Streaming - if !queryIdExists { - streamCursorField, streamCursorVariableName, streamCursorInitialValue = common.GetStreamCursorPropsFromQuery(payload, query) + streamCursorField, streamCursorVariableName, streamCursorInitialValue = common.GetStreamCursorPropsFromBrowserMessage(browserMessage) //It's necessary to assure the cursor field will return in the result of the query //To be able to store the last received cursor value - payload["query"] = common.PatchQueryIncludingCursorField(query, streamCursorField) - fromBrowserMessageAsMap["payload"] = payload + browserMessage.Payload.Query = common.PatchQueryIncludingCursorField(query, streamCursorField) + + newMessageJson, _ := json.Marshal(browserMessage) + fromBrowserMessage = newMessageJson } } @@ -143,7 +151,7 @@ RangeLoop: //Identify if the client that requested this subscription expects to receive json-patch //Client append `Patched_` to the query operationName to indicate that it supports jsonPatchSupported := false - if ok && strings.HasPrefix(operationName, "Patched_") { + if strings.HasPrefix(browserMessage.Payload.OperationName, "Patched_") { jsonPatchSupported = true } if jsonPatchDisabled := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_JSON_PATCH_DISABLED"); jsonPatchDisabled != "" { @@ -153,8 +161,8 @@ RangeLoop: browserConnection.ActiveSubscriptionsMutex.Lock() browserConnection.ActiveSubscriptions[queryId] = common.GraphQlSubscription{ Id: queryId, - Message: fromBrowserMessageAsMap, - OperationName: operationName, + Message: fromBrowserMessage, + OperationName: browserMessage.Payload.OperationName, StreamCursorField: streamCursorField, StreamCursorVariableName: streamCursorVariableName, StreamCursorCurrValue: streamCursorInitialValue, @@ -166,7 +174,7 @@ RangeLoop: // log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions) browserConnection.ActiveSubscriptionsMutex.Unlock() - common.ActivitiesOverviewStarted(string(messageType) + "-" + operationName) + common.ActivitiesOverviewStarted(string(messageType) + "-" + browserMessage.Payload.OperationName) common.ActivitiesOverviewStarted("_Sum-" + string(messageType)) //Dump of all subscriptions for analysis purpose @@ -174,36 +182,35 @@ RangeLoop: //saveItToFile(fmt.Sprintf("%s-%s-%02s", string(messageType), operationName, queryId), fromBrowserMessageAsMap) } - if fromBrowserMessageAsMap["type"] == "complete" { - var queryId = fromBrowserMessageAsMap["id"].(string) + if browserMessage.Type == "complete" { browserConnection.ActiveSubscriptionsMutex.RLock() - jsonPatchSupported := browserConnection.ActiveSubscriptions[queryId].JsonPatchSupported + jsonPatchSupported := browserConnection.ActiveSubscriptions[browserMessage.ID].JsonPatchSupported //Remove subscriptions from ActivitiesOverview here once Hasura-Reader will ignore "complete" msg for them - common.ActivitiesOverviewCompleted(string(browserConnection.ActiveSubscriptions[queryId].Type) + "-" + browserConnection.ActiveSubscriptions[queryId].OperationName) - common.ActivitiesOverviewCompleted("_Sum-" + string(browserConnection.ActiveSubscriptions[queryId].Type)) + common.ActivitiesOverviewCompleted(string(browserConnection.ActiveSubscriptions[browserMessage.ID].Type) + "-" + browserConnection.ActiveSubscriptions[browserMessage.ID].OperationName) + common.ActivitiesOverviewCompleted("_Sum-" + string(browserConnection.ActiveSubscriptions[browserMessage.ID].Type)) browserConnection.ActiveSubscriptionsMutex.RUnlock() if jsonPatchSupported { - msgpatch.RemoveConnSubscriptionCacheFile(browserConnection.Id, browserConnection.SessionToken, queryId) + msgpatch.RemoveConnSubscriptionCacheFile(browserConnection.Id, browserConnection.SessionToken, browserMessage.ID) } browserConnection.ActiveSubscriptionsMutex.Lock() - delete(browserConnection.ActiveSubscriptions, queryId) + delete(browserConnection.ActiveSubscriptions, browserMessage.ID) // log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions) browserConnection.ActiveSubscriptionsMutex.Unlock() } - if fromBrowserMessageAsMap["type"] == "connection_init" { + if browserMessage.Type == "connection_init" { //browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap //Skip message once it is handled by ConnInitHandler already continue } - log.Tracef("sending to hasura: %v", fromBrowserMessageAsMap) - err := wsjson.Write(hc.Context, hc.Websocket, fromBrowserMessageAsMap) - if err != nil { - if !errors.Is(err, context.Canceled) { - log.Errorf("error on write (we're disconnected from hasura): %v", err) + log.Tracef("sending to hasura: %v", fromBrowserMessage) + errWrite := hc.Websocket.Write(hc.Context, websocket.MessageText, fromBrowserMessage) + if errWrite != nil { + if !errors.Is(errWrite, context.Canceled) { + log.Errorf("error on write (we're disconnected from hasura): %v", errWrite) } return } diff --git a/bbb-graphql-middleware/internal/hasura/retransmiter/retransmiter.go b/bbb-graphql-middleware/internal/hasura/retransmiter/retransmiter.go index 221ea5ad47..7dd9dd4735 100644 --- a/bbb-graphql-middleware/internal/hasura/retransmiter/retransmiter.go +++ b/bbb-graphql-middleware/internal/hasura/retransmiter/retransmiter.go @@ -5,7 +5,7 @@ import ( log "github.com/sirupsen/logrus" ) -func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel) { +func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannelByte) { log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id) hc.BrowserConn.ActiveSubscriptionsMutex.RLock() diff --git a/bbb-graphql-middleware/internal/msgpatch/jsonpatch.go b/bbb-graphql-middleware/internal/msgpatch/jsonpatch.go index 942f25fd52..1825666d26 100644 --- a/bbb-graphql-middleware/internal/msgpatch/jsonpatch.go +++ b/bbb-graphql-middleware/internal/msgpatch/jsonpatch.go @@ -8,7 +8,7 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" + "strconv" ) var cacheDir = os.TempDir() + "/graphql-middleware-cache/" @@ -137,79 +137,78 @@ func fileExists(filename string) bool { return !os.IsNotExist(err) } -func PatchMessage( - receivedMessage *map[string]interface{}, +func GetPatchedMessage( + receivedMessage []byte, subscriptionId string, dataKey string, - lastDataAsJson []byte, - dataAsJson []byte, + lastHasuraMessage common.HasuraMessage, + hasuraMessage common.HasuraMessage, browserConnectionId string, browserSessionToken string, - cacheKey string, + cacheKey uint32, lastDataChecksum uint32, - currDataChecksum uint32) { + currDataChecksum uint32) []byte { if lastDataChecksum != 0 { - common.JsonPatchBenchmarkingStarted(cacheKey) - defer common.JsonPatchBenchmarkingCompleted(cacheKey) + common.JsonPatchBenchmarkingStarted(strconv.Itoa(int(cacheKey))) + defer common.JsonPatchBenchmarkingCompleted(strconv.Itoa(int(cacheKey))) } - //Avoid other routines from processing the same JsonPatch + //Lock to avoid other routines from processing the same message common.GlobalCacheLocks.Lock(cacheKey) - jsonDiffPatch, jsonDiffPatchExists := common.GetJsonPatchCache(cacheKey) - if jsonDiffPatchExists { + if patchedMessageCache, patchedMessageCacheExists := common.GetPatchedMessageCache(cacheKey); patchedMessageCacheExists { //Unlock immediately once the cache was already created by other routine common.GlobalCacheLocks.Unlock(cacheKey) + return patchedMessageCache } else { //It will create the cache and then Unlock (others will wait to benefit from this cache) defer common.GlobalCacheLocks.Unlock(cacheKey) } - var receivedMessageMap = *receivedMessage + var jsonDiffPatch []byte - if !jsonDiffPatchExists { - if currDataChecksum == lastDataChecksum { - //Content didn't change, set message as null to avoid sending it to the browser - //This case is usual when the middleware reconnects with Hasura and receives the data again - *receivedMessage = nil - } else { - //Content was changed, creating json patch - //If data is small (< minLengthToPatch) it's not worth creating the patch - if len(string(dataAsJson)) > minLengthToPatch { - if lastContent, lastContentErr := GetRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, lastDataAsJson); lastContentErr == nil && string(lastContent) != "" { - //Temporarily use CustomPatch only for UserList (testing feature) - if strings.HasPrefix(cacheKey, "subscription-Patched_UserListSubscription") && - common.ValidateIfShouldUseCustomJsonPatch(lastContent, dataAsJson, "userId") { - jsonDiffPatch = common.CreateJsonPatch(lastContent, dataAsJson, "userId") - common.StoreJsonPatchCache(cacheKey, jsonDiffPatch) - } else if diffPatch, diffPatchErr := jsonpatch.CreatePatch(lastContent, dataAsJson); diffPatchErr == nil { - var err error - if jsonDiffPatch, err = json.Marshal(diffPatch); err == nil { - common.StoreJsonPatchCache(cacheKey, jsonDiffPatch) - } else { - log.Errorf("Error marshaling patch array: %v", err) - } - } else { - log.Errorf("Error creating JSON patch: %v\n%v", diffPatchErr, string(dataAsJson)) + if currDataChecksum == lastDataChecksum { + //Content didn't change, set message as null to avoid sending it to the browser + //This case is usual when the middleware reconnects with Hasura and receives the data again + jsonData, _ := json.Marshal(nil) + common.StorePatchedMessageCache(cacheKey, jsonData) + return jsonData + } else { + //Content was changed, creating json patch + //If data is small (< minLengthToPatch) it's not worth creating the patch + if len(hasuraMessage.Payload.Data[dataKey]) > minLengthToPatch { + if lastContent, lastContentErr := GetRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, lastHasuraMessage.Payload.Data[dataKey]); lastContentErr == nil && string(lastContent) != "" { + var shouldUseCustomJsonPatch bool + if shouldUseCustomJsonPatch, jsonDiffPatch = common.ValidateIfShouldUseCustomJsonPatch(lastContent, hasuraMessage.Payload.Data[dataKey], "userId"); shouldUseCustomJsonPatch { + common.StorePatchedMessageCache(cacheKey, jsonDiffPatch) + } else if diffPatch, diffPatchErr := jsonpatch.CreatePatch(lastContent, hasuraMessage.Payload.Data[dataKey]); diffPatchErr == nil { + var err error + if jsonDiffPatch, err = json.Marshal(diffPatch); err != nil { + log.Errorf("Error marshaling patch array: %v", err) } + } else { + log.Errorf("Error creating JSON patch: %v\n%v", diffPatchErr, string(hasuraMessage.Payload.Data[dataKey])) } } } } //Use patch if the length is {minShrinkToUsePatch}% smaller than the original msg - if jsonDiffPatch != nil && float64(len(string(jsonDiffPatch)))/float64(len(string(dataAsJson))) < minShrinkToUsePatch { + if jsonDiffPatch != nil && float64(len(string(jsonDiffPatch)))/float64(len(string(hasuraMessage.Payload.Data[dataKey]))) < minShrinkToUsePatch { //Modify receivedMessage to include the Patch and remove the previous data //The key of the original message is kept to avoid errors (Apollo-client expects to receive this prop) - receivedMessageMap["payload"] = map[string]interface{}{ - "data": map[string]interface{}{ - "patch": json.RawMessage(jsonDiffPatch), - dataKey: json.RawMessage("[]"), - }, + + hasuraMessage.Payload.Data = map[string]json.RawMessage{ + "patch": jsonDiffPatch, + dataKey: json.RawMessage("[]"), } - *receivedMessage = receivedMessageMap + hasuraMessageJson, _ := json.Marshal(hasuraMessage) + receivedMessage = hasuraMessageJson } //Store current result to be used to create json patch in the future - StoreRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, dataAsJson) + StoreRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, hasuraMessage.Payload.Data[dataKey]) + + common.StorePatchedMessageCache(cacheKey, receivedMessage) + return receivedMessage } diff --git a/bbb-graphql-middleware/internal/websrv/connhandler.go b/bbb-graphql-middleware/internal/websrv/connhandler.go index 3bae3515e0..8d856d8071 100644 --- a/bbb-graphql-middleware/internal/websrv/connhandler.go +++ b/bbb-graphql-middleware/internal/websrv/connhandler.go @@ -1,7 +1,9 @@ package websrv import ( + "bytes" "context" + "encoding/json" "fmt" "github.com/iMDT/bbb-graphql-middleware/internal/akka_apps" "github.com/iMDT/bbb-graphql-middleware/internal/bbb_web" @@ -95,10 +97,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) { log.Infof("connection accepted") // Create channels - fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannel(bufferSize) - fromBrowserToHasuraChannel := common.NewSafeChannel(bufferSize) - fromBrowserToGqlActionsChannel := common.NewSafeChannel(bufferSize) - fromHasuraToBrowserChannel := common.NewSafeChannel(bufferSize) + fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannelByte(bufferSize) + fromBrowserToHasuraChannel := common.NewSafeChannelByte(bufferSize) + fromBrowserToGqlActionsChannel := common.NewSafeChannelByte(bufferSize) + fromHasuraToBrowserChannel := common.NewSafeChannelByte(bufferSize) // Configure the wait group (to hold this routine execution until both are completed) var wgAll sync.WaitGroup @@ -295,7 +297,7 @@ func refreshUserSessionVariables(browserConnection *common.BrowserConnection) er func connectionInitHandler( browserConnection *common.BrowserConnection, - fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel) error { + fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte) error { BrowserConnectionsMutex.RLock() browserConnectionId := browserConnection.Id @@ -309,9 +311,13 @@ func connectionInitHandler( //Received all messages. Channel is closed return fmt.Errorf("error on receiving init connection") } - var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{}) + if bytes.Contains(fromBrowserMessage, []byte("\"connection_init\"")) { + var fromBrowserMessageAsMap map[string]interface{} + if err := json.Unmarshal(fromBrowserMessage, &fromBrowserMessageAsMap); err != nil { + log.Errorf("failed to unmarshal message: %v", err) + continue + } - if fromBrowserMessageAsMap["type"] == "connection_init" { var payloadAsMap = fromBrowserMessageAsMap["payload"].(map[string]interface{}) var headersAsMap = payloadAsMap["headers"].(map[string]interface{}) var sessionToken, existsSessionToken = headersAsMap["X-Session-Token"].(string) @@ -349,7 +355,7 @@ func connectionInitHandler( browserConnection.ClientSessionUUID = clientSessionUUID browserConnection.MeetingId = meetingId browserConnection.UserId = userId - browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap + browserConnection.ConnectionInitMessage = fromBrowserMessage BrowserConnectionsMutex.Unlock() refreshUserSessionVariables(browserConnection) diff --git a/bbb-graphql-middleware/internal/websrv/reader/reader.go b/bbb-graphql-middleware/internal/websrv/reader/reader.go index 85b4837279..60e61a3570 100644 --- a/bbb-graphql-middleware/internal/websrv/reader/reader.go +++ b/bbb-graphql-middleware/internal/websrv/reader/reader.go @@ -1,13 +1,13 @@ package reader import ( + "bytes" "context" + "encoding/json" "errors" "github.com/iMDT/bbb-graphql-middleware/internal/common" log "github.com/sirupsen/logrus" "nhooyr.io/websocket" - "nhooyr.io/websocket/wsjson" - "strings" "sync" "time" ) @@ -17,9 +17,9 @@ func BrowserConnectionReader( ctx context.Context, ctxCancel context.CancelFunc, browserWsConn *websocket.Conn, - fromBrowserToGqlActionsChannel *common.SafeChannel, - fromBrowserToHasuraChannel *common.SafeChannel, - fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel, + fromBrowserToGqlActionsChannel *common.SafeChannelByte, + fromBrowserToHasuraChannel *common.SafeChannelByte, + fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte, waitGroups []*sync.WaitGroup) { log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId) defer log.Debugf("finished") @@ -43,8 +43,7 @@ func BrowserConnectionReader( defer ctxCancel() for { - var v interface{} - err := wsjson.Read(ctx, browserWsConn, &v) + messageType, message, err := browserWsConn.Read(ctx) if err != nil { if errors.Is(err, context.Canceled) { log.Debugf("Closing Browser ws connection as Context was cancelled!") @@ -54,26 +53,30 @@ func BrowserConnectionReader( return } - log.Tracef("received from browser: %v", v) + log.Tracef("received from browser: %v", message) - if v == nil { + if messageType != websocket.MessageText { + log.Warnf("received non-text message: %v", messageType) continue } - var fromBrowserMessageAsMap = v.(map[string]interface{}) + var browserMessageType struct { + Type string `json:"type"` + } + err = json.Unmarshal(message, &browserMessageType) + if err != nil { + log.Errorf("failed to unmarshal message: %v", err) + return + } - if payload, ok := fromBrowserMessageAsMap["payload"].(map[string]interface{}); ok { - if query, okQuery := payload["query"].(string); okQuery { - //Forward Mutations directly to GraphqlActions - //Update mutations must be handled by Hasura - if strings.HasPrefix(query, "mutation") && !strings.Contains(query, "update_") { - fromBrowserToGqlActionsChannel.Send(v) - continue - } + if browserMessageType.Type == "subscribe" { + if bytes.Contains(message, []byte("\"query\":\"mutation")) && !bytes.Contains(message, []byte("update_")) { + fromBrowserToGqlActionsChannel.Send(message) + continue } } - fromBrowserToHasuraChannel.Send(v) - fromBrowserToHasuraConnectionEstablishingChannel.Send(v) + fromBrowserToHasuraChannel.Send(message) + fromBrowserToHasuraConnectionEstablishingChannel.Send(message) } } diff --git a/bbb-graphql-middleware/internal/websrv/writer/writer.go b/bbb-graphql-middleware/internal/websrv/writer/writer.go index b478ba61fa..4b56c90765 100644 --- a/bbb-graphql-middleware/internal/websrv/writer/writer.go +++ b/bbb-graphql-middleware/internal/websrv/writer/writer.go @@ -2,14 +2,20 @@ package writer import ( "context" + "encoding/json" "github.com/iMDT/bbb-graphql-middleware/internal/common" log "github.com/sirupsen/logrus" "nhooyr.io/websocket" - "nhooyr.io/websocket/wsjson" + "strings" "sync" ) -func BrowserConnectionWriter(browserConnectionId string, ctx context.Context, browserWsConn *websocket.Conn, fromHasuraToBrowserChannel *common.SafeChannel, wg *sync.WaitGroup) { +func BrowserConnectionWriter( + browserConnectionId string, + ctx context.Context, + browserWsConn *websocket.Conn, + fromHasuraToBrowserChannel *common.SafeChannelByte, + wg *sync.WaitGroup) { log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnectionId) defer log.Debugf("finished") log.Debugf("starting") @@ -30,10 +36,8 @@ RangeLoop: continue } - var toBrowserMessageAsMap = toBrowserMessage.(map[string]interface{}) - log.Tracef("sending to browser: %v", toBrowserMessage) - err := wsjson.Write(ctx, browserWsConn, toBrowserMessage) + err := browserWsConn.Write(ctx, websocket.MessageText, toBrowserMessage) if err != nil { log.Debugf("Browser is disconnected, skipping writing of ws message: %v", err) return @@ -41,9 +45,15 @@ RangeLoop: // After the error is sent to client, close its connection // Authentication hook unauthorized this request - if toBrowserMessageAsMap["type"] == "connection_error" { - var payloadAsString = toBrowserMessageAsMap["payload"].(string) - browserWsConn.Close(websocket.StatusInternalError, payloadAsString) + if strings.Contains(string(toBrowserMessage), "connection_error") { + type HasuraMessage struct { + Type string `json:"type"` + } + var hasuraMessage HasuraMessage + _ = json.Unmarshal(toBrowserMessage, &hasuraMessage) + if hasuraMessage.Type == "connection_error" { + _ = browserWsConn.Close(websocket.StatusInternalError, string(toBrowserMessage)) + } } } } From 630ed89b486c6e66754c9a6f1546e66a8faef6aa Mon Sep 17 00:00:00 2001 From: Gustavo Trott Date: Tue, 25 Jun 2024 10:31:10 -0300 Subject: [PATCH 2/3] remove comments --- bbb-graphql-middleware/internal/common/CustomJsonPatcher.go | 2 -- bbb-graphql-middleware/internal/hasura/conn/reader/reader.go | 1 - bbb-graphql-middleware/internal/hasura/conn/writer/writer.go | 2 -- 3 files changed, 5 deletions(-) diff --git a/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go b/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go index efaa1acfa7..75109e98ec 100644 --- a/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go +++ b/bbb-graphql-middleware/internal/common/CustomJsonPatcher.go @@ -77,8 +77,6 @@ func CreateJsonPatch(original []byte, modified []byte, idFieldName string) []byt } func CreateJsonPatchFromMaps(original []map[string]interface{}, modified []map[string]interface{}, modifiedJson []byte, idFieldName string) []byte { - //modifiedJson, _ := json.Marshal(modified) - //CREATE PATCHES FOR OPERATION "REPLACE" replacesPatches, originalWithReplaces := CreateReplacePatches(original, modified, idFieldName) diff --git a/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go b/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go index de14fcad46..79c065262a 100644 --- a/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go +++ b/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go @@ -153,7 +153,6 @@ func handleSubscriptionMessage(hc *common.HasuraConnection, message *[]byte, sub lastDataChecksumWas := subscription.LastReceivedDataChecksum lastReceivedDataWas := subscription.LastReceivedData - //cacheKey := fmt.Sprintf("%v-%v", subscription.LastReceivedDataChecksum, dataChecksum) cacheKey := subscription.LastReceivedDataChecksum | dataChecksum //Store LastReceivedData Checksum diff --git a/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go b/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go index daf3039844..b554147964 100644 --- a/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go +++ b/bbb-graphql-middleware/internal/hasura/conn/writer/writer.go @@ -75,8 +75,6 @@ RangeLoop: streamCursorField := "" streamCursorVariableName := "" var streamCursorInitialValue interface{} - //payload := browserMessage.Payload - //operationName := browserMessage.Payload.OperationName query := browserMessage.Payload.Query if query != "" { From 7350947839baf9fdbd2443671a73ddf38ae1e59d Mon Sep 17 00:00:00 2001 From: Gustavo Trott Date: Tue, 25 Jun 2024 12:55:41 -0300 Subject: [PATCH 3/3] fix wrong merging of uint32 --- .../internal/hasura/conn/reader/reader.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go b/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go index 79c065262a..744436133c 100644 --- a/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go +++ b/bbb-graphql-middleware/internal/hasura/conn/reader/reader.go @@ -153,7 +153,7 @@ func handleSubscriptionMessage(hc *common.HasuraConnection, message *[]byte, sub lastDataChecksumWas := subscription.LastReceivedDataChecksum lastReceivedDataWas := subscription.LastReceivedData - cacheKey := subscription.LastReceivedDataChecksum | dataChecksum + cacheKey := mergeUint32(subscription.LastReceivedDataChecksum, dataChecksum) //Store LastReceivedData Checksum if msgpatch.RawDataCacheStorageMode == "memory" { @@ -172,6 +172,10 @@ func handleSubscriptionMessage(hc *common.HasuraConnection, message *[]byte, sub return true } +func mergeUint32(a, b uint32) uint32 { + return (a << 16) | (b >> 16) +} + func handleStreamingMessage(hc *common.HasuraConnection, message []byte, subscription common.GraphQlSubscription, queryId string) { lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(message, subscription.StreamCursorField) if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor {