enhancement (graphql-middleware): Data Uniqueness Verification and others (#19559)

* Prevent middleware from sending the same message when reconnection

* Refactor hasura reader to simplify code

* Its not necessary to name the for

* Close hasura connnection on error
This commit is contained in:
Gustavo Trott 2024-02-02 12:36:27 -03:00 committed by GitHub
parent 16c8baf666
commit 5708c2506b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 168 additions and 134 deletions

View File

@ -0,0 +1,12 @@
package common
import (
"crypto/sha256"
"fmt"
)
func GenerateSha256(data []byte) string {
hasher := sha256.New()
hasher.Write(data)
return fmt.Sprintf("%x", hasher.Sum(nil))
}

View File

@ -25,6 +25,7 @@ type GraphQlSubscription struct {
StreamCursorField string
StreamCursorVariableName string
StreamCursorCurrValue interface{}
LastReceivedDataSha256 string
JsonPatchSupported bool // indicate if client support Json Patch for this subscription
LastSeenOnHasuraConnection string // id of the hasura connection that this query was active
}

View File

@ -2,6 +2,7 @@ package reader
import (
"context"
"encoding/json"
"errors"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/hascli/retransmiter"
@ -35,69 +36,118 @@ func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChan
log.Tracef("received from hasura: %v", message)
var messageAsMap = message.(map[string]interface{})
go handleMessageReceivedFromHasura(hc, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, message)
}
}
if messageAsMap != nil {
var messageType = messageAsMap["type"]
var queryId, _ = messageAsMap["id"].(string)
func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, message interface{}) {
var messageMap = message.(map[string]interface{})
//Check if subscription is still active!
if queryId != "" {
hc.Browserconn.ActiveSubscriptionsMutex.RLock()
subscription, ok := hc.Browserconn.ActiveSubscriptions[queryId]
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock()
if !ok {
log.Debugf("Subscription with Id %s doesn't exist anymore, skiping response.", queryId)
continue
}
if messageMap != nil {
var messageType = messageMap["type"]
var queryId, _ = messageMap["id"].(string)
//When Hasura send msg type "complete", this query is finished
if messageType == "complete" {
hc.Browserconn.ActiveSubscriptionsMutex.Lock()
delete(hc.Browserconn.ActiveSubscriptions, queryId)
hc.Browserconn.ActiveSubscriptionsMutex.Unlock()
log.Debugf("Subscription with Id %s finished by Hasura.", queryId)
}
//Check if subscription is still active!
if queryId != "" {
hc.Browserconn.ActiveSubscriptionsMutex.RLock()
subscription, ok := hc.Browserconn.ActiveSubscriptions[queryId]
hc.Browserconn.ActiveSubscriptionsMutex.RUnlock()
if !ok {
log.Debugf("Subscription with Id %s doesn't exist anymore, skiping response.", queryId)
return
}
//Apply msg patch when it supports it
if subscription.JsonPatchSupported &&
messageType == "data" &&
subscription.Type == common.Subscription {
msgpatch.PatchMessage(&messageAsMap, hc.Browserconn)
}
//When Hasura send msg type "complete", this query is finished
if messageType == "complete" {
handleCompleteMessage(hc, queryId)
}
//Set last cursor value for stream
if subscription.Type == common.Streaming {
lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(messageAsMap, subscription.StreamCursorField)
if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor {
subscription.StreamCursorCurrValue = lastCursor
hc.Browserconn.ActiveSubscriptionsMutex.Lock()
hc.Browserconn.ActiveSubscriptions[queryId] = subscription
hc.Browserconn.ActiveSubscriptionsMutex.Unlock()
}
if messageType == "data" &&
subscription.Type == common.Subscription {
hasNoPreviousOccurrence := handleSubscriptionMessage(hc, messageMap, subscription, queryId)
if !hasNoPreviousOccurrence {
return
}
}
// Retransmit the subscription start commands when hasura confirms the connection
// this is useful in case of a connection invalidation
if messageType == "connection_ack" {
log.Debugf("Received connection_ack")
//Hasura connection was initialized, now it's able to send new messages to Hasura
fromBrowserToHasuraChannel.UnfreezeChannel()
//Avoid to send `connection_ack` to the browser when it's a reconnection
if hc.Browserconn.ConnAckSentToBrowser == false {
fromHasuraToBrowserChannel.Send(messageAsMap)
hc.Browserconn.ConnAckSentToBrowser = true
}
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
} else {
// Forward the message to browser
fromHasuraToBrowserChannel.Send(messageAsMap)
//Set last cursor value for stream
if subscription.Type == common.Streaming {
handleStreamingMessage(hc, messageMap, subscription, queryId)
}
}
// Retransmit the subscription start commands when hasura confirms the connection
// this is useful in case of a connection invalidation
if messageType == "connection_ack" {
handleConnectionAckMessage(hc, messageMap, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel)
} else {
// Forward the message to browser
fromHasuraToBrowserChannel.Send(messageMap)
}
}
}
func handleSubscriptionMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, subscription common.GraphQlSubscription, queryId string) bool {
if payload, okPayload := messageMap["payload"].(map[string]interface{}); okPayload {
if data, okData := payload["data"].(map[string]interface{}); okData {
for dataKey, dataItem := range data {
if currentDataProp, okCurrentDataProp := dataItem.([]interface{}); okCurrentDataProp {
if dataAsJson, err := json.Marshal(currentDataProp); err == nil {
//Check whether ReceivedData is different from the LastReceivedData
//Otherwise stop forwarding this message
dataSha256 := common.GenerateSha256(dataAsJson)
if subscription.LastReceivedDataSha256 == dataSha256 {
return false
}
//Store LastReceivedData Sha256
subscription.LastReceivedDataSha256 = dataSha256
hc.Browserconn.ActiveSubscriptionsMutex.Lock()
hc.Browserconn.ActiveSubscriptions[queryId] = subscription
hc.Browserconn.ActiveSubscriptionsMutex.Unlock()
//Apply msg patch when it supports it
if subscription.JsonPatchSupported {
msgpatch.PatchMessage(&messageMap, queryId, dataKey, dataAsJson, hc.Browserconn)
}
}
}
}
}
}
return true
}
func handleStreamingMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, subscription common.GraphQlSubscription, queryId string) {
lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(messageMap, subscription.StreamCursorField)
if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor {
subscription.StreamCursorCurrValue = lastCursor
hc.Browserconn.ActiveSubscriptionsMutex.Lock()
hc.Browserconn.ActiveSubscriptions[queryId] = subscription
hc.Browserconn.ActiveSubscriptionsMutex.Unlock()
}
}
func handleCompleteMessage(hc *common.HasuraConnection, queryId string) {
hc.Browserconn.ActiveSubscriptionsMutex.Lock()
delete(hc.Browserconn.ActiveSubscriptions, queryId)
hc.Browserconn.ActiveSubscriptionsMutex.Unlock()
log.Debugf("Subscription with Id %s finished by Hasura.", queryId)
}
func handleConnectionAckMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel) {
log.Debugf("Received connection_ack")
//Hasura connection was initialized, now it's able to send new messages to Hasura
fromBrowserToHasuraChannel.UnfreezeChannel()
//Avoid to send `connection_ack` to the browser when it's a reconnection
if hc.Browserconn.ConnAckSentToBrowser == false {
fromHasuraToBrowserChannel.Send(messageMap)
hc.Browserconn.ConnAckSentToBrowser = true
}
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
}

View File

@ -55,6 +55,7 @@ RangeLoop:
//Identify type based on query string
messageType := common.Query
lastReceivedDataSha256 := ""
streamCursorField := ""
streamCursorVariableName := ""
var streamCursorInitialValue interface{}
@ -65,12 +66,16 @@ RangeLoop:
if strings.HasPrefix(query, "subscription") {
messageType = common.Subscription
browserConnection.ActiveSubscriptionsMutex.RLock()
existingSubscriptionData, queryIdExists := browserConnection.ActiveSubscriptions[queryId]
browserConnection.ActiveSubscriptionsMutex.RUnlock()
if queryIdExists {
lastReceivedDataSha256 = existingSubscriptionData.LastReceivedDataSha256
}
if strings.Contains(query, "_stream(") && strings.Contains(query, "cursor: {") {
messageType = common.Streaming
browserConnection.ActiveSubscriptionsMutex.RLock()
_, queryIdExists := browserConnection.ActiveSubscriptions[queryId]
browserConnection.ActiveSubscriptionsMutex.RUnlock()
if !queryIdExists {
streamCursorField, streamCursorVariableName, streamCursorInitialValue = common.GetStreamCursorPropsFromQuery(payload, query)
@ -110,6 +115,7 @@ RangeLoop:
LastSeenOnHasuraConnection: hc.Id,
JsonPatchSupported: jsonPatchSupported,
Type: messageType,
LastReceivedDataSha256: lastReceivedDataSha256,
}
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
browserConnection.ActiveSubscriptionsMutex.Unlock()

View File

@ -82,95 +82,60 @@ func ClearAllCaches() {
}
}
func PatchMessage(receivedMessage *map[string]interface{}, bConn *common.BrowserConnection) {
func PatchMessage(receivedMessage *map[string]interface{}, queryId string, dataKey string, dataAsJson []byte, bConn *common.BrowserConnection) {
var receivedMessageMap = *receivedMessage
idValue, ok := receivedMessageMap["id"]
if !ok {
//Id does not exists in response Json
//It's not a subscription data
fileCacheDirPath, err := getSubscriptionCacheDirPath(bConn, queryId, true)
if err != nil {
log.Errorf("Error on get Client/Subscription cache path: %v", err)
return
}
filePath := fileCacheDirPath + dataKey + ".json"
payload, ok := receivedMessageMap["payload"].(map[string]interface{})
if !ok {
//payload does not exists in response Json
//It's not a subscription data
return
lastContent, err := ioutil.ReadFile(filePath)
if err != nil {
//Last content doesn't exist, probably it's the first response
}
data, ok := payload["data"].(map[string]interface{})
if !ok {
//payload.data does not exists in response Json
//It's not a subscription data
return
}
for key, value := range data {
currentData, ok := value.([]interface{})
if !ok {
log.Errorf("Payload/Data/%s does not exists in response Json.", key)
return
}
dataAsJsonString, err := json.Marshal(currentData)
if err != nil {
log.Errorf("Error on convert Payload/Data/%s.", key)
return
}
fileCacheDirPath, err := getSubscriptionCacheDirPath(bConn, idValue.(string), true)
if err != nil {
log.Errorf("Error on get Client/Subscription cache path: %v", err)
return
}
filePath := fileCacheDirPath + key + ".json"
lastContent, err := ioutil.ReadFile(filePath)
if err != nil {
//Last content doesn't exist, probably it's the first response
}
lastDataAsJsonString := string(lastContent)
if string(dataAsJsonString) == lastDataAsJsonString {
//Content didn't change, set message as null to avoid sending it to the browser
//This case is usual when the middleware reconnects with Hasura and receives the data again
*receivedMessage = nil
} else {
//Content was changed, creating json patch
//If data is small (< minLengthToPatch) it's not worth creating the patch
if lastDataAsJsonString != "" && len(string(dataAsJsonString)) > minLengthToPatch {
diffPatch, e := jsonpatch.CreatePatch([]byte(lastDataAsJsonString), []byte(dataAsJsonString))
if e != nil {
log.Errorf("Error creating JSON patch:%v", e)
return
}
jsonDiffPatch, err := json.Marshal(diffPatch)
if err != nil {
log.Errorf("Error marshaling patch array:", err)
return
}
//Use patch if the length is {minShrinkToUsePatch}% smaller than the original msg
if float64(len(string(jsonDiffPatch)))/float64(len(string(dataAsJsonString))) < minShrinkToUsePatch {
//Modify receivedMessage to include the Patch and remove the previous data
//The key of the original message is kept to avoid errors (Apollo-client expects to receive this prop)
receivedMessageMap["payload"] = map[string]interface{}{
"data": map[string]interface{}{
"patch": json.RawMessage(jsonDiffPatch),
key: json.RawMessage("[]"),
},
}
*receivedMessage = receivedMessageMap
}
lastDataAsJsonString := string(lastContent)
if string(dataAsJson) == lastDataAsJsonString {
//Content didn't change, set message as null to avoid sending it to the browser
//This case is usual when the middleware reconnects with Hasura and receives the data again
*receivedMessage = nil
} else {
//Content was changed, creating json patch
//If data is small (< minLengthToPatch) it's not worth creating the patch
if lastDataAsJsonString != "" && len(string(dataAsJson)) > minLengthToPatch {
diffPatch, e := jsonpatch.CreatePatch([]byte(lastDataAsJsonString), []byte(dataAsJson))
if e != nil {
log.Errorf("Error creating JSON patch:%v", e)
return
}
jsonDiffPatch, err := json.Marshal(diffPatch)
if err != nil {
log.Errorf("Error marshaling patch array:", err)
return
}
//Store current result to be used to create json patch in the future
if lastDataAsJsonString != "" || len(string(dataAsJsonString)) > minLengthToPatch {
errWritingOutput := ioutil.WriteFile(filePath, []byte(dataAsJsonString), 0644)
if errWritingOutput != nil {
log.Errorf("Error on trying to write cache of json diff:", errWritingOutput)
//Use patch if the length is {minShrinkToUsePatch}% smaller than the original msg
if float64(len(string(jsonDiffPatch)))/float64(len(string(dataAsJson))) < minShrinkToUsePatch {
//Modify receivedMessage to include the Patch and remove the previous data
//The key of the original message is kept to avoid errors (Apollo-client expects to receive this prop)
receivedMessageMap["payload"] = map[string]interface{}{
"data": map[string]interface{}{
"patch": json.RawMessage(jsonDiffPatch),
dataKey: json.RawMessage("[]"),
},
}
*receivedMessage = receivedMessageMap
}
}
//Store current result to be used to create json patch in the future
if lastDataAsJsonString != "" || len(string(dataAsJson)) > minLengthToPatch {
errWritingOutput := ioutil.WriteFile(filePath, []byte(dataAsJson), 0644)
if errWritingOutput != nil {
log.Errorf("Error on trying to write cache of json diff:", errWritingOutput)
}
}
}
}