Merge pull request #20576 from gustavotrott/gql-middleware-improve-perform

refactor (gql-middleware): Optimizing GraphQL Message Handling and Caching
This commit is contained in:
Gustavo Trott 2024-06-25 13:35:30 -03:00 committed by GitHub
commit 9d2f090eea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 603 additions and 345 deletions

View File

@ -45,11 +45,14 @@ func main() {
log.Infof("Json Patch Disabled!")
}
if rawDataCacheStorageMode := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_RAW_DATA_CACHE_STORAGE_MODE"); rawDataCacheStorageMode == "memory" {
msgpatch.RawDataCacheStorageMode = "memory"
} else {
msgpatch.RawDataCacheStorageMode = "file"
}
//if rawDataCacheStorageMode := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_RAW_DATA_CACHE_STORAGE_MODE"); rawDataCacheStorageMode == "file" {
// msgpatch.RawDataCacheStorageMode = "file"
//} else {
// msgpatch.RawDataCacheStorageMode = "memory"
//}
//Force memory cache for now
msgpatch.RawDataCacheStorageMode = "memory"
log.Infof("Raw Data Cache Storage Mode: %s", msgpatch.RawDataCacheStorageMode)
// Websocket listener

View File

@ -7,17 +7,17 @@ import (
var GlobalCacheLocks = NewCacheLocks()
type CacheLocks struct {
locks map[string]*sync.Mutex
locks map[uint32]*sync.Mutex
mutex sync.Mutex // Protects the 'locks' map
}
func NewCacheLocks() *CacheLocks {
return &CacheLocks{
locks: make(map[string]*sync.Mutex),
locks: make(map[uint32]*sync.Mutex),
}
}
func (c *CacheLocks) Lock(id string) {
func (c *CacheLocks) Lock(id uint32) {
c.mutex.Lock()
if _, exists := c.locks[id]; !exists {
c.locks[id] = &sync.Mutex{}
@ -28,7 +28,7 @@ func (c *CacheLocks) Lock(id string) {
mtx.Lock() // Lock the specific ID mutex
}
func (c *CacheLocks) Unlock(id string) {
func (c *CacheLocks) Unlock(id uint32) {
c.mutex.Lock()
if mtx, exists := c.locks[id]; exists {
mtx.Unlock()

View File

@ -1,6 +1,7 @@
package common
import (
"bytes"
"encoding/json"
"fmt"
evanphxjsonpatch "github.com/evanphx/json-patch"
@ -8,46 +9,51 @@ import (
log "github.com/sirupsen/logrus"
)
func ValidateIfShouldUseCustomJsonPatch(original []byte, modified []byte, idFieldName string) bool {
func ValidateIfShouldUseCustomJsonPatch(original []byte, modified []byte, idFieldName string) (bool, []byte) {
//Temporarily use CustomPatch only for UserList (testing feature)
if !bytes.Contains(modified, []byte("\"__typename\":\"user\"}]")) {
return false, nil
}
//Test Original Data
originalMap := GetMapFromByte(original)
if originalMap == nil {
return false
return false, nil
}
if len(originalMap) <= 1 {
return false
return false, nil
}
firstItem := originalMap[0]
if _, existsIdField := firstItem[idFieldName].(string); !existsIdField {
return false
return false, nil
}
if hasDuplicatedId(originalMap, idFieldName) {
return false
return false, nil
}
//Test Modified Data
modifiedMap := GetMapFromByte(modified)
if modifiedMap == nil {
return false
return false, nil
}
if len(modifiedMap) <= 1 {
return false
return false, nil
}
firstItem = modifiedMap[0]
if _, existsIdField := firstItem[idFieldName].(string); !existsIdField {
return false
return false, nil
}
if hasDuplicatedId(modifiedMap, idFieldName) {
return false
return false, nil
}
return true
return true, CreateJsonPatchFromMaps(originalMap, modifiedMap, modified, "userId")
}
func hasDuplicatedId(items []map[string]interface{}, idFieldName string) bool {
@ -67,12 +73,10 @@ func CreateJsonPatch(original []byte, modified []byte, idFieldName string) []byt
originalMap := GetMapFromByte(original)
modifiedMap := GetMapFromByte(modified)
return CreateJsonPatchFromMaps(originalMap, modifiedMap, idFieldName)
return CreateJsonPatchFromMaps(originalMap, modifiedMap, modified, idFieldName)
}
func CreateJsonPatchFromMaps(original []map[string]interface{}, modified []map[string]interface{}, idFieldName string) []byte {
modifiedJson, _ := json.Marshal(modified)
func CreateJsonPatchFromMaps(original []map[string]interface{}, modified []map[string]interface{}, modifiedJson []byte, idFieldName string) []byte {
//CREATE PATCHES FOR OPERATION "REPLACE"
replacesPatches, originalWithReplaces := CreateReplacePatches(original, modified, idFieldName)

View File

@ -16,31 +16,93 @@ func GetUniqueID() string {
return uniqueID
}
var JsonPatchCache = make(map[string][]byte)
var JsonPatchCacheMutex sync.RWMutex
var PatchedMessageCache = make(map[uint32][]byte)
var PatchedMessageCacheMutex sync.RWMutex
func GetJsonPatchCache(cacheKey string) ([]byte, bool) {
JsonPatchCacheMutex.RLock()
defer JsonPatchCacheMutex.RUnlock()
func GetPatchedMessageCache(cacheKey uint32) ([]byte, bool) {
PatchedMessageCacheMutex.RLock()
defer PatchedMessageCacheMutex.RUnlock()
jsonDiffPatch, jsonDiffPatchExists := JsonPatchCache[cacheKey]
jsonDiffPatch, jsonDiffPatchExists := PatchedMessageCache[cacheKey]
return jsonDiffPatch, jsonDiffPatchExists
}
func StoreJsonPatchCache(cacheKey string, data []byte) {
JsonPatchCacheMutex.Lock()
defer JsonPatchCacheMutex.Unlock()
func StorePatchedMessageCache(cacheKey uint32, data []byte) {
PatchedMessageCacheMutex.Lock()
defer PatchedMessageCacheMutex.Unlock()
JsonPatchCache[cacheKey] = data
PatchedMessageCache[cacheKey] = data
//Remove the cache after 30 seconds
go RemoveJsonPatchCache(cacheKey, 30)
go RemovePatchedMessageCache(cacheKey, 30)
}
func RemoveJsonPatchCache(cacheKey string, delayInSecs time.Duration) {
func RemovePatchedMessageCache(cacheKey uint32, delayInSecs time.Duration) {
time.Sleep(delayInSecs * time.Second)
JsonPatchCacheMutex.Lock()
defer JsonPatchCacheMutex.Unlock()
delete(JsonPatchCache, cacheKey)
PatchedMessageCacheMutex.Lock()
defer PatchedMessageCacheMutex.Unlock()
delete(PatchedMessageCache, cacheKey)
}
var HasuraMessageCache = make(map[uint32]HasuraMessage)
var HasuraMessageKeyCache = make(map[uint32]string)
var HasuraMessageCacheMutex sync.RWMutex
func GetHasuraMessageCache(cacheKey uint32) (string, HasuraMessage, bool) {
HasuraMessageCacheMutex.RLock()
defer HasuraMessageCacheMutex.RUnlock()
hasuraMessageDataKey, _ := HasuraMessageKeyCache[cacheKey]
hasuraMessage, hasuraMessageExists := HasuraMessageCache[cacheKey]
return hasuraMessageDataKey, hasuraMessage, hasuraMessageExists
}
func StoreHasuraMessageCache(cacheKey uint32, dataKey string, hasuraMessage HasuraMessage) {
HasuraMessageCacheMutex.Lock()
defer HasuraMessageCacheMutex.Unlock()
HasuraMessageKeyCache[cacheKey] = dataKey
HasuraMessageCache[cacheKey] = hasuraMessage
//Remove the cache after 30 seconds
go RemoveHasuraMessageCache(cacheKey, 30)
}
func RemoveHasuraMessageCache(cacheKey uint32, delayInSecs time.Duration) {
time.Sleep(delayInSecs * time.Second)
HasuraMessageCacheMutex.Lock()
defer HasuraMessageCacheMutex.Unlock()
delete(HasuraMessageKeyCache, cacheKey)
delete(HasuraMessageCache, cacheKey)
}
var StreamCursorValueCache = make(map[uint32]interface{})
var StreamCursorValueCacheMutex sync.RWMutex
func GetStreamCursorValueCache(cacheKey uint32) (interface{}, bool) {
StreamCursorValueCacheMutex.RLock()
defer StreamCursorValueCacheMutex.RUnlock()
streamCursorValue, streamCursorValueExists := StreamCursorValueCache[cacheKey]
return streamCursorValue, streamCursorValueExists
}
func StoreStreamCursorValueCache(cacheKey uint32, streamCursorValue interface{}) {
StreamCursorValueCacheMutex.Lock()
defer StreamCursorValueCacheMutex.Unlock()
StreamCursorValueCache[cacheKey] = streamCursorValue
//Remove the cache after 30 seconds
go RemoveStreamCursorValueCache(cacheKey, 30)
}
func RemoveStreamCursorValueCache(cacheKey uint32, delayInSecs time.Duration) {
time.Sleep(delayInSecs * time.Second)
StreamCursorValueCacheMutex.Lock()
defer StreamCursorValueCacheMutex.Unlock()
delete(StreamCursorValueCache, cacheKey)
}

View File

@ -0,0 +1,73 @@
package common
import (
"sync"
)
type SafeChannelByte struct {
ch chan []byte
closed bool
mux sync.Mutex
freezeFlag bool
}
func NewSafeChannelByte(size int) *SafeChannelByte {
return &SafeChannelByte{
ch: make(chan []byte, size),
}
}
func (s *SafeChannelByte) Send(value []byte) bool {
s.mux.Lock()
defer s.mux.Unlock()
if s.closed {
return false
}
s.ch <- value
return true
}
func (s *SafeChannelByte) Receive() ([]byte, bool) {
val, ok := <-s.ch
return val, ok
}
func (s *SafeChannelByte) ReceiveChannel() <-chan []byte {
return s.ch
}
func (s *SafeChannelByte) Closed() bool {
s.mux.Lock()
defer s.mux.Unlock()
return s.closed
}
func (s *SafeChannelByte) Close() {
s.mux.Lock()
defer s.mux.Unlock()
if !s.closed {
close(s.ch)
s.closed = true
}
}
func (s *SafeChannelByte) Frozen() bool {
return s.freezeFlag
}
func (s *SafeChannelByte) FreezeChannel() {
if !s.freezeFlag {
s.mux.Lock()
s.freezeFlag = true
}
}
func (s *SafeChannelByte) UnfreezeChannel() {
if s.freezeFlag {
s.mux.Unlock()
s.freezeFlag = false
}
}

View File

@ -1,27 +1,28 @@
package common
import (
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"hash/crc32"
"regexp"
"strconv"
"strings"
)
func GetStreamCursorPropsFromQuery(payload map[string]interface{}, query string) (string, string, interface{}) {
func GetStreamCursorPropsFromBrowserMessage(browserMessage BrowserSubscribeMessage) (string, string, interface{}) {
streamCursorField := ""
streamCursorVariableName := ""
var streamCursorInitialValue interface{}
cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+):\s*([^}]+)\s*}\s*}`)
matches := cursorInitialValueRePattern.FindStringSubmatch(query)
matches := cursorInitialValueRePattern.FindStringSubmatch(browserMessage.Payload.Query)
if matches != nil {
streamCursorField = matches[1]
if strings.HasPrefix(matches[2], "$") {
streamCursorVariableName, _ = strings.CutPrefix(matches[2], "$")
if variables, okVariables := payload["variables"].(map[string]interface{}); okVariables {
if targetVariableValue, okTargetVariableValue := variables[streamCursorVariableName]; okTargetVariableValue {
streamCursorInitialValue = targetVariableValue
}
if targetVariableValue, okTargetVariableValue := browserMessage.Payload.Variables[streamCursorVariableName]; okTargetVariableValue {
streamCursorInitialValue = targetVariableValue
}
} else {
streamCursorInitialValue = matches[2]
@ -31,9 +32,28 @@ func GetStreamCursorPropsFromQuery(payload map[string]interface{}, query string)
return streamCursorField, streamCursorVariableName, streamCursorInitialValue
}
func GetLastStreamCursorValueFromReceivedMessage(messageAsMap map[string]interface{}, streamCursorField string) interface{} {
func GetLastStreamCursorValueFromReceivedMessage(message []byte, streamCursorField string) interface{} {
dataChecksum := crc32.ChecksumIEEE(message)
GlobalCacheLocks.Lock(dataChecksum)
if streamCursorValueCache, streamCursorValueCacheExists := GetStreamCursorValueCache(dataChecksum); streamCursorValueCacheExists {
//Unlock immediately once the cache was already created by other routine
GlobalCacheLocks.Unlock(dataChecksum)
return streamCursorValueCache
} else {
//It will create the cache and then Unlock (others will wait to benefit from this cache)
defer GlobalCacheLocks.Unlock(dataChecksum)
}
var lastStreamCursorValue interface{}
var messageAsMap map[string]interface{}
err := json.Unmarshal(message, &messageAsMap)
if err != nil {
log.Errorf("failed to unmarshal message: %v", err)
//return
}
if payload, okPayload := messageAsMap["payload"].(map[string]interface{}); okPayload {
if data, okData := payload["data"].(map[string]interface{}); okData {
//Data will have only one prop, `range` because its name is unknown
@ -52,6 +72,7 @@ func GetLastStreamCursorValueFromReceivedMessage(messageAsMap map[string]interfa
}
}
StoreStreamCursorValueCache(dataChecksum, lastStreamCursorValue)
return lastStreamCursorValue
}
@ -60,76 +81,84 @@ func PatchQueryIncludingCursorField(originalQuery string, cursorField string) st
return originalQuery
}
lastIndex := strings.LastIndex(originalQuery, "{")
if lastIndex == -1 {
lastIndexOfTypename := LastButOneIndex(originalQuery, "}")
if lastIndexOfTypename == -1 {
return originalQuery
}
// It will include the cursorField at the beginning of the list of fields
// It will include the cursorField at the end of the list of fields
// It's not a problem if the field be duplicated in the list, Hasura just ignore the second occurrence
return originalQuery[:lastIndex+1] + "\n " + cursorField + originalQuery[lastIndex+1:]
return originalQuery[:lastIndexOfTypename] + " " + cursorField + "\n " + originalQuery[lastIndexOfTypename:]
}
func PatchQuerySettingLastCursorValue(subscription GraphQlSubscription) interface{} {
message := subscription.Message
payload, okPayload := message["payload"].(map[string]interface{})
if okPayload {
if subscription.StreamCursorVariableName != "" {
/**** This stream has its cursor value set through variables ****/
if variables, okVariables := payload["variables"].(map[string]interface{}); okVariables {
if variables[subscription.StreamCursorVariableName] != subscription.StreamCursorCurrValue {
variables[subscription.StreamCursorVariableName] = subscription.StreamCursorCurrValue
payload["variables"] = variables
message["payload"] = payload
}
}
} else {
/**** This stream has its cursor value set through inline value (not variables) ****/
query, okQuery := payload["query"].(string)
if okQuery {
cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+:\s*[^}]+)\s*}\s*}`)
newValue := ""
replaceInitialValueFunc := func(match string) string {
switch v := subscription.StreamCursorCurrValue.(type) {
case string:
newValue = v
//Append quotes if it is missing, it will be necessary when appending to the query
if !strings.HasPrefix(v, "\"") {
newValue = "\"" + newValue
}
if !strings.HasSuffix(v, "\"") {
newValue = newValue + "\""
}
case int:
newValue = strconv.Itoa(v)
case float32:
myFloat64 := float64(v)
newValue = strconv.FormatFloat(myFloat64, 'f', -1, 32)
case float64:
newValue = strconv.FormatFloat(v, 'f', -1, 64)
default:
newValue = ""
}
if newValue != "" {
replacement := subscription.StreamCursorField + ": " + newValue
return fmt.Sprintf("cursor: {initial_value: {%s}}", replacement)
} else {
return match
}
}
newQuery := cursorInitialValueRePattern.ReplaceAllStringFunc(query, replaceInitialValueFunc)
if query != newQuery {
payload["query"] = newQuery
message["payload"] = payload
}
}
}
func PatchQuerySettingLastCursorValue(subscription GraphQlSubscription) []byte {
var browserMessage BrowserSubscribeMessage
err := json.Unmarshal(subscription.Message, &browserMessage)
if err != nil {
log.Errorf("failed to unmarshal message: %v", err)
return subscription.Message
}
return message
if subscription.StreamCursorVariableName != "" {
/**** This stream has its cursor value set through variables ****/
if browserMessage.Payload.Variables[subscription.StreamCursorVariableName] == subscription.StreamCursorCurrValue {
return subscription.Message
}
browserMessage.Payload.Variables[subscription.StreamCursorVariableName] = subscription.StreamCursorCurrValue
} else {
/**** This stream has its cursor value set through inline value (not variables) ****/
cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+:\s*[^}]+)\s*}\s*}`)
newValue := ""
replaceInitialValueFunc := func(match string) string {
switch v := subscription.StreamCursorCurrValue.(type) {
case string:
newValue = v
//Append quotes if it is missing, it will be necessary when appending to the query
if !strings.HasPrefix(v, "\"") {
newValue = "\"" + newValue
}
if !strings.HasSuffix(v, "\"") {
newValue = newValue + "\""
}
case int:
newValue = strconv.Itoa(v)
case float32:
myFloat64 := float64(v)
newValue = strconv.FormatFloat(myFloat64, 'f', -1, 32)
case float64:
newValue = strconv.FormatFloat(v, 'f', -1, 64)
default:
newValue = ""
}
if newValue != "" {
replacement := subscription.StreamCursorField + ": " + newValue
return fmt.Sprintf("cursor: {initial_value: {%s}}", replacement)
} else {
return match
}
}
newQuery := cursorInitialValueRePattern.ReplaceAllStringFunc(browserMessage.Payload.Query, replaceInitialValueFunc)
if browserMessage.Payload.Query == newQuery {
return subscription.Message
}
browserMessage.Payload.Query = newQuery
}
newMessageJson, _ := json.Marshal(browserMessage)
return newMessageJson
}
func LastButOneIndex(s, substr string) int {
last := strings.LastIndex(s, substr)
if last == -1 {
return -1
}
return strings.LastIndex(s[:last], substr)
}

View File

@ -2,6 +2,7 @@ package common
import (
"context"
"encoding/json"
"net/http"
"sync"
@ -20,13 +21,13 @@ const (
type GraphQlSubscription struct {
Id string
Message map[string]interface{}
Message []byte
Type QueryType
OperationName string
StreamCursorField string
StreamCursorVariableName string
StreamCursorCurrValue interface{}
LastReceivedData []byte
LastReceivedData HasuraMessage
LastReceivedDataChecksum uint32
JsonPatchSupported bool // indicate if client support Json Patch for this subscription
LastSeenOnHasuraConnection string // id of the hasura connection that this query was active
@ -45,7 +46,7 @@ type BrowserConnection struct {
BrowserRequestCookies []*http.Cookie
ActiveSubscriptions map[string]GraphQlSubscription // active subscriptions of this connection (start, but no stop)
ActiveSubscriptionsMutex sync.RWMutex // mutex to control the map usage
ConnectionInitMessage map[string]interface{} // init message received in this connection (to be used on hasura reconnect)
ConnectionInitMessage []byte // init message received in this connection (to be used on hasura reconnect)
HasuraConnection *HasuraConnection // associated hasura connection
Disconnected bool // indicate if the connection is gone
ConnAckSentToBrowser bool // indicate if `connection_ack` msg was already sent to the browser
@ -62,3 +63,22 @@ type HasuraConnection struct {
ContextCancelFunc context.CancelFunc // function to cancel the hasura context (and so, the hasura connection)
FreezeMsgFromBrowserChan *SafeChannel // indicate that it's waiting for the return of mutations before closing connection
}
type HasuraMessage struct {
Type string `json:"type"`
ID string `json:"id"`
Payload struct {
Data map[string]json.RawMessage `json:"data"`
} `json:"payload"`
}
type BrowserSubscribeMessage struct {
Type string `json:"type"`
ID string `json:"id"`
Payload struct {
Extensions map[string]interface{} `json:"extensions"`
OperationName string `json:"operationName"`
Query string `json:"query"`
Variables map[string]interface{} `json:"variables"`
} `json:"payload"`
}

View File

@ -17,8 +17,8 @@ var graphqlActionsUrl = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_GRAPHQL_ACTIONS_URL")
func GraphqlActionsClient(
browserConnection *common.BrowserConnection,
fromBrowserToGqlActionsChannel *common.SafeChannel,
fromHasuraToBrowserChannel *common.SafeChannel) error {
fromBrowserToGqlActionsChannel *common.SafeChannelByte,
fromHasuraToBrowserChannel *common.SafeChannelByte) error {
log := log.WithField("_routine", "GraphqlActionsClient").WithField("browserConnectionId", browserConnection.Id)
log.Debug("Starting GraphqlActionsClient")
@ -38,19 +38,19 @@ RangeLoop:
continue
}
var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
if fromBrowserMessageAsMap["type"] == "subscribe" {
queryId := fromBrowserMessageAsMap["id"].(string)
payload := fromBrowserMessageAsMap["payload"].(map[string]interface{})
var browserMessage common.BrowserSubscribeMessage
err := json.Unmarshal(fromBrowserMessage, &browserMessage)
if err != nil {
log.Errorf("failed to unmarshal message: %v", err)
continue
}
if browserMessage.Type == "subscribe" {
var errorMessage string
var mutationFuncName string
query, okQuery := payload["query"].(string)
variables, okVariables := payload["variables"].(map[string]interface{})
if okQuery && okVariables && strings.HasPrefix(query, "mutation") {
if funcName, inputs, err := parseGraphQLMutation(query, variables); err == nil {
if strings.HasPrefix(browserMessage.Payload.Query, "mutation") {
if funcName, inputs, err := parseGraphQLMutation(browserMessage.Payload.Query, browserMessage.Payload.Variables); err == nil {
mutationFuncName = funcName
if err = SendGqlActionsRequest(funcName, inputs, browserConnection.BBBWebSessionVariables); err == nil {
} else {
@ -66,7 +66,7 @@ RangeLoop:
if errorMessage != "" {
//Error on sending action, return error msg to client
browserResponseData := map[string]interface{}{
"id": queryId,
"id": browserMessage.ID,
"type": "error",
"payload": []interface{}{
map[string]interface{}{
@ -74,12 +74,12 @@ RangeLoop:
},
},
}
fromHasuraToBrowserChannel.Send(browserResponseData)
jsonData, _ := json.Marshal(browserResponseData)
fromHasuraToBrowserChannel.Send(jsonData)
} else {
//Action sent successfully, return data msg to client
browserResponseData := map[string]interface{}{
"id": queryId,
"id": browserMessage.ID,
"type": "next",
"payload": map[string]interface{}{
"data": map[string]interface{}{
@ -87,15 +87,17 @@ RangeLoop:
},
},
}
fromHasuraToBrowserChannel.Send(browserResponseData)
jsonData, _ := json.Marshal(browserResponseData)
fromHasuraToBrowserChannel.Send(jsonData)
}
//Return complete msg to client
browserResponseComplete := map[string]interface{}{
"id": queryId,
"id": browserMessage.ID,
"type": "complete",
}
fromHasuraToBrowserChannel.Send(browserResponseComplete)
jsonData, _ := json.Marshal(browserResponseComplete)
fromHasuraToBrowserChannel.Send(jsonData)
}
//Fallback to Hasura was disabled (keeping the code temporarily)

View File

@ -24,8 +24,8 @@ var hasuraEndpoint = os.Getenv("BBB_GRAPHQL_MIDDLEWARE_HASURA_WS")
// Hasura client connection
func HasuraClient(
browserConnection *common.BrowserConnection,
fromBrowserToHasuraChannel *common.SafeChannel,
fromHasuraToBrowserChannel *common.SafeChannel) error {
fromBrowserToHasuraChannel *common.SafeChannelByte,
fromHasuraToBrowserChannel *common.SafeChannelByte) error {
log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id)
common.ActivitiesOverviewStarted("__HasuraConnection")
defer common.ActivitiesOverviewCompleted("__HasuraConnection")

View File

@ -1,26 +1,21 @@
package reader
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/hasura/retransmiter"
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
log "github.com/sirupsen/logrus"
"hash/crc32"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"sync"
)
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
func HasuraConnectionReader(
hc *common.HasuraConnection,
fromHasuraToBrowserChannel *common.SafeChannel,
fromBrowserToHasuraChannel *common.SafeChannel,
wg *sync.WaitGroup) {
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte, wg *sync.WaitGroup) {
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
defer log.Debugf("finished")
log.Debugf("starting")
@ -29,10 +24,7 @@ func HasuraConnectionReader(
defer hc.ContextCancelFunc()
for {
// Read a message from hasura
var message interface{}
err := wsjson.Read(hc.Context, hc.Websocket, &message)
messageType, message, err := hc.Websocket.Read(hc.Context)
var closeError *websocket.CloseError
if err != nil {
@ -58,113 +50,134 @@ func HasuraConnectionReader(
return
}
if messageType != websocket.MessageText {
log.Warnf("received non-text message: %v", messageType)
continue
}
log.Tracef("received from hasura: %v", message)
handleMessageReceivedFromHasura(hc, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel, message)
}
}
func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel, message interface{}) {
var messageMap = message.(map[string]interface{})
var QueryIdPlaceholderInBytes = []byte("--------------QUERY-ID--------------") //36 chars
if messageMap != nil {
var messageType = messageMap["type"]
var queryId, _ = messageMap["id"].(string)
func handleMessageReceivedFromHasura(hc *common.HasuraConnection, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte, message []byte) {
type HasuraMessageInfo struct {
Type string `json:"type"`
ID string `json:"id"`
}
var hasuraMessageInfo HasuraMessageInfo
err := json.Unmarshal(message, &hasuraMessageInfo)
if err != nil {
log.Errorf("failed to unmarshal message: %v", err)
return
}
//Check if subscription is still active!
if queryId != "" {
hc.BrowserConn.ActiveSubscriptionsMutex.RLock()
subscription, ok := hc.BrowserConn.ActiveSubscriptions[queryId]
hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock()
if !ok {
log.Debugf("Subscription with Id %s doesn't exist anymore, skipping response.", queryId)
queryIdReplacementApplied := false
queryIdInBytes := []byte(hasuraMessageInfo.ID)
//Check if subscription is still active!
if hasuraMessageInfo.ID != "" {
hc.BrowserConn.ActiveSubscriptionsMutex.RLock()
subscription, ok := hc.BrowserConn.ActiveSubscriptions[hasuraMessageInfo.ID]
hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock()
if !ok {
log.Debugf("Subscription with Id %s doesn't exist anymore, skipping response.", hasuraMessageInfo.ID)
return
}
//When Hasura send msg type "complete", this query is finished
if hasuraMessageInfo.Type == "complete" {
handleCompleteMessage(hc, hasuraMessageInfo.ID)
common.ActivitiesOverviewCompleted(string(subscription.Type) + "-" + subscription.OperationName)
common.ActivitiesOverviewCompleted("_Sum-" + string(subscription.Type))
}
if hasuraMessageInfo.Type == "next" {
common.ActivitiesOverviewDataReceived(string(subscription.Type) + "-" + subscription.OperationName)
}
if hasuraMessageInfo.Type == "next" &&
subscription.Type == common.Subscription {
//Remove queryId from message
message = bytes.Replace(message, queryIdInBytes, QueryIdPlaceholderInBytes, 1)
queryIdReplacementApplied = true
isDifferentFromPreviousMessage := handleSubscriptionMessage(hc, &message, subscription, hasuraMessageInfo.ID)
//Stop processing case it is the same message (probably is a reconnection with Hasura)
if !isDifferentFromPreviousMessage {
return
}
//When Hasura send msg type "complete", this query is finished
if messageType == "complete" {
handleCompleteMessage(hc, queryId)
common.ActivitiesOverviewCompleted(string(subscription.Type) + "-" + subscription.OperationName)
common.ActivitiesOverviewCompleted("_Sum-" + string(subscription.Type))
}
if messageType == "next" {
common.ActivitiesOverviewDataReceived(string(subscription.Type) + "-" + subscription.OperationName)
}
if messageType == "next" &&
subscription.Type == common.Subscription {
hasNoPreviousOccurrence := handleSubscriptionMessage(hc, messageMap, subscription, queryId)
if !hasNoPreviousOccurrence {
return
}
}
//Set last cursor value for stream
if subscription.Type == common.Streaming {
handleStreamingMessage(hc, messageMap, subscription, queryId)
}
}
// Retransmit the subscription start commands when hasura confirms the connection
// this is useful in case of a connection invalidation
if messageType == "connection_ack" {
handleConnectionAckMessage(hc, messageMap, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel)
} else {
// Forward the message to browser
fromHasuraToBrowserChannel.Send(messageMap)
//Set last cursor value for stream
if subscription.Type == common.Streaming {
//Remove queryId from message
messageWithoutId := bytes.Replace(message, queryIdInBytes, QueryIdPlaceholderInBytes, 1)
handleStreamingMessage(hc, messageWithoutId, subscription, hasuraMessageInfo.ID)
}
}
// Retransmit the subscription start commands when hasura confirms the connection
// this is useful in case of a connection invalidation
if hasuraMessageInfo.Type == "connection_ack" {
handleConnectionAckMessage(hc, message, fromHasuraToBrowserChannel, fromBrowserToHasuraChannel)
} else {
if queryIdReplacementApplied {
message = bytes.Replace(message, QueryIdPlaceholderInBytes, queryIdInBytes, 1)
}
// Forward the message to browser
fromHasuraToBrowserChannel.Send(message)
}
}
func handleSubscriptionMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, subscription common.GraphQlSubscription, queryId string) bool {
if payload, okPayload := messageMap["payload"].(map[string]interface{}); okPayload {
if data, okData := payload["data"].(map[string]interface{}); okData {
for dataKey, dataItem := range data {
if currentDataProp, okCurrentDataProp := dataItem.([]interface{}); okCurrentDataProp {
if dataAsJson, err := json.Marshal(currentDataProp); err == nil {
if common.ActivitiesOverviewEnabled {
dataSize := len(string(dataAsJson))
dataCount := len(currentDataProp)
common.ActivitiesOverviewDataSize(string(subscription.Type)+"-"+subscription.OperationName, int64(dataSize), int64(dataCount))
}
func handleSubscriptionMessage(hc *common.HasuraConnection, message *[]byte, subscription common.GraphQlSubscription, queryId string) bool {
if common.ActivitiesOverviewEnabled {
dataSize := len(string(*message))
common.ActivitiesOverviewDataSize(string(subscription.Type)+"-"+subscription.OperationName, int64(dataSize), 0)
}
//Check whether ReceivedData is different from the LastReceivedData
//Otherwise stop forwarding this message
dataChecksum := crc32.ChecksumIEEE(dataAsJson)
if subscription.LastReceivedDataChecksum == dataChecksum {
return false
}
dataChecksum, messageDataKey, messageData := getHasuraMessage(*message)
lastDataChecksumWas := subscription.LastReceivedDataChecksum
lastDataAsJsonWas := subscription.LastReceivedData
cacheKey := fmt.Sprintf("%s-%s-%v-%v", string(subscription.Type), subscription.OperationName, subscription.LastReceivedDataChecksum, dataChecksum)
//Check whether ReceivedData is different from the LastReceivedData
//Otherwise stop forwarding this message
if subscription.LastReceivedDataChecksum == dataChecksum {
return false
}
//Store LastReceivedData Checksum
if msgpatch.RawDataCacheStorageMode == "memory" {
subscription.LastReceivedData = dataAsJson
}
subscription.LastReceivedDataChecksum = dataChecksum
hc.BrowserConn.ActiveSubscriptionsMutex.Lock()
hc.BrowserConn.ActiveSubscriptions[queryId] = subscription
hc.BrowserConn.ActiveSubscriptionsMutex.Unlock()
lastDataChecksumWas := subscription.LastReceivedDataChecksum
lastReceivedDataWas := subscription.LastReceivedData
cacheKey := mergeUint32(subscription.LastReceivedDataChecksum, dataChecksum)
//Apply msg patch when it supports it
if subscription.JsonPatchSupported {
msgpatch.PatchMessage(&messageMap, queryId, dataKey, lastDataAsJsonWas, dataAsJson, hc.BrowserConn.Id, hc.BrowserConn.SessionToken, cacheKey, lastDataChecksumWas, dataChecksum)
}
}
}
}
}
//Store LastReceivedData Checksum
if msgpatch.RawDataCacheStorageMode == "memory" {
subscription.LastReceivedData = messageData
}
subscription.LastReceivedDataChecksum = dataChecksum
hc.BrowserConn.ActiveSubscriptionsMutex.Lock()
hc.BrowserConn.ActiveSubscriptions[queryId] = subscription
hc.BrowserConn.ActiveSubscriptionsMutex.Unlock()
//Apply msg patch when it supports it
if subscription.JsonPatchSupported {
*message = msgpatch.GetPatchedMessage(*message, queryId, messageDataKey, lastReceivedDataWas, messageData, hc.BrowserConn.Id, hc.BrowserConn.SessionToken, cacheKey, lastDataChecksumWas, dataChecksum)
}
return true
}
func handleStreamingMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, subscription common.GraphQlSubscription, queryId string) {
lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(messageMap, subscription.StreamCursorField)
func mergeUint32(a, b uint32) uint32 {
return (a << 16) | (b >> 16)
}
func handleStreamingMessage(hc *common.HasuraConnection, message []byte, subscription common.GraphQlSubscription, queryId string) {
lastCursor := common.GetLastStreamCursorValueFromReceivedMessage(message, subscription.StreamCursorField)
if lastCursor != nil && subscription.StreamCursorCurrValue != lastCursor {
subscription.StreamCursorCurrValue = lastCursor
@ -183,16 +196,45 @@ func handleCompleteMessage(hc *common.HasuraConnection, queryId string) {
log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId)
}
func handleConnectionAckMessage(hc *common.HasuraConnection, messageMap map[string]interface{}, fromHasuraToBrowserChannel *common.SafeChannel, fromBrowserToHasuraChannel *common.SafeChannel) {
func handleConnectionAckMessage(hc *common.HasuraConnection, message []byte, fromHasuraToBrowserChannel *common.SafeChannelByte, fromBrowserToHasuraChannel *common.SafeChannelByte) {
log.Debugf("Received connection_ack")
//Hasura connection was initialized, now it's able to send new messages to Hasura
fromBrowserToHasuraChannel.UnfreezeChannel()
//Avoid to send `connection_ack` to the browser when it's a reconnection
if hc.BrowserConn.ConnAckSentToBrowser == false {
fromHasuraToBrowserChannel.Send(messageMap)
fromHasuraToBrowserChannel.Send(message)
hc.BrowserConn.ConnAckSentToBrowser = true
}
go retransmiter.RetransmitSubscriptionStartMessages(hc, fromBrowserToHasuraChannel)
}
func getHasuraMessage(message []byte) (uint32, string, common.HasuraMessage) {
dataChecksum := crc32.ChecksumIEEE(message)
common.GlobalCacheLocks.Lock(dataChecksum)
dataKey, hasuraMessage, dataMapExists := common.GetHasuraMessageCache(dataChecksum)
if dataMapExists {
//Unlock immediately once the cache was already created by other routine
common.GlobalCacheLocks.Unlock(dataChecksum)
return dataChecksum, dataKey, hasuraMessage
} else {
//It will create the cache and then Unlock (others will wait to benefit from this cache)
defer common.GlobalCacheLocks.Unlock(dataChecksum)
}
err := json.Unmarshal(message, &hasuraMessage)
if err != nil {
log.Fatalf("Error unmarshalling JSON: %v", err)
}
for key := range hasuraMessage.Payload.Data {
dataKey = key
break
}
common.StoreHasuraMessageCache(dataChecksum, dataKey, hasuraMessage)
return dataChecksum, dataKey, hasuraMessage
}

View File

@ -2,11 +2,12 @@ package writer
import (
"context"
"encoding/json"
"errors"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
log "github.com/sirupsen/logrus"
"nhooyr.io/websocket/wsjson"
"nhooyr.io/websocket"
"os"
"strings"
"sync"
@ -14,7 +15,7 @@ import (
// HasuraConnectionWriter
// process messages (middleware to hasura)
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel, wg *sync.WaitGroup, initMessage map[string]interface{}) {
func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannelByte, wg *sync.WaitGroup, initMessage []byte) {
log := log.WithField("_routine", "HasuraConnectionWriter")
browserConnection := hc.BrowserConn
@ -33,7 +34,7 @@ func HasuraConnectionWriter(hc *common.HasuraConnection, fromBrowserToHasuraChan
}
//Send init connection message to Hasura to start
err := wsjson.Write(hc.Context, hc.Websocket, initMessage)
err := hc.Websocket.Write(hc.Context, websocket.MessageText, initMessage)
if err != nil {
log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
return
@ -56,10 +57,17 @@ RangeLoop:
continue
}
var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
//var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
if fromBrowserMessageAsMap["type"] == "subscribe" {
var queryId = fromBrowserMessageAsMap["id"].(string)
var browserMessage common.BrowserSubscribeMessage
err := json.Unmarshal(fromBrowserMessage, &browserMessage)
if err != nil {
log.Errorf("failed to unmarshal message: %v", err)
return
}
if browserMessage.Type == "subscribe" {
var queryId = browserMessage.ID
//Identify type based on query string
messageType := common.Query
@ -67,26 +75,23 @@ RangeLoop:
streamCursorField := ""
streamCursorVariableName := ""
var streamCursorInitialValue interface{}
payload := fromBrowserMessageAsMap["payload"].(map[string]interface{})
operationName, ok := payload["operationName"].(string)
query, ok := payload["query"].(string)
if ok {
query := browserMessage.Payload.Query
if query != "" {
if strings.HasPrefix(query, "subscription") {
//Validate if subscription is allowed
if allowedSubscriptions := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_ALLOWED_SUBSCRIPTIONS"); allowedSubscriptions != "" {
allowedSubscriptionsSlice := strings.Split(allowedSubscriptions, ",")
subscriptionAllowed := false
for _, s := range allowedSubscriptionsSlice {
if s == operationName {
if s == browserMessage.Payload.OperationName {
subscriptionAllowed = true
break
}
}
if !subscriptionAllowed {
log.Infof("Subscription %s not allowed!", operationName)
log.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName)
continue
}
}
@ -96,14 +101,14 @@ RangeLoop:
deniedSubscriptionsSlice := strings.Split(deniedSubscriptions, ",")
subscriptionAllowed := true
for _, s := range deniedSubscriptionsSlice {
if s == operationName {
if s == browserMessage.Payload.OperationName {
subscriptionAllowed = false
break
}
}
if !subscriptionAllowed {
log.Infof("Subscription %s not allowed!", operationName)
log.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName)
continue
}
}
@ -119,14 +124,15 @@ RangeLoop:
if strings.Contains(query, "_stream(") && strings.Contains(query, "cursor: {") {
messageType = common.Streaming
if !queryIdExists {
streamCursorField, streamCursorVariableName, streamCursorInitialValue = common.GetStreamCursorPropsFromQuery(payload, query)
streamCursorField, streamCursorVariableName, streamCursorInitialValue = common.GetStreamCursorPropsFromBrowserMessage(browserMessage)
//It's necessary to assure the cursor field will return in the result of the query
//To be able to store the last received cursor value
payload["query"] = common.PatchQueryIncludingCursorField(query, streamCursorField)
fromBrowserMessageAsMap["payload"] = payload
browserMessage.Payload.Query = common.PatchQueryIncludingCursorField(query, streamCursorField)
newMessageJson, _ := json.Marshal(browserMessage)
fromBrowserMessage = newMessageJson
}
}
@ -143,7 +149,7 @@ RangeLoop:
//Identify if the client that requested this subscription expects to receive json-patch
//Client append `Patched_` to the query operationName to indicate that it supports
jsonPatchSupported := false
if ok && strings.HasPrefix(operationName, "Patched_") {
if strings.HasPrefix(browserMessage.Payload.OperationName, "Patched_") {
jsonPatchSupported = true
}
if jsonPatchDisabled := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_JSON_PATCH_DISABLED"); jsonPatchDisabled != "" {
@ -153,8 +159,8 @@ RangeLoop:
browserConnection.ActiveSubscriptionsMutex.Lock()
browserConnection.ActiveSubscriptions[queryId] = common.GraphQlSubscription{
Id: queryId,
Message: fromBrowserMessageAsMap,
OperationName: operationName,
Message: fromBrowserMessage,
OperationName: browserMessage.Payload.OperationName,
StreamCursorField: streamCursorField,
StreamCursorVariableName: streamCursorVariableName,
StreamCursorCurrValue: streamCursorInitialValue,
@ -166,7 +172,7 @@ RangeLoop:
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
browserConnection.ActiveSubscriptionsMutex.Unlock()
common.ActivitiesOverviewStarted(string(messageType) + "-" + operationName)
common.ActivitiesOverviewStarted(string(messageType) + "-" + browserMessage.Payload.OperationName)
common.ActivitiesOverviewStarted("_Sum-" + string(messageType))
//Dump of all subscriptions for analysis purpose
@ -174,36 +180,35 @@ RangeLoop:
//saveItToFile(fmt.Sprintf("%s-%s-%02s", string(messageType), operationName, queryId), fromBrowserMessageAsMap)
}
if fromBrowserMessageAsMap["type"] == "complete" {
var queryId = fromBrowserMessageAsMap["id"].(string)
if browserMessage.Type == "complete" {
browserConnection.ActiveSubscriptionsMutex.RLock()
jsonPatchSupported := browserConnection.ActiveSubscriptions[queryId].JsonPatchSupported
jsonPatchSupported := browserConnection.ActiveSubscriptions[browserMessage.ID].JsonPatchSupported
//Remove subscriptions from ActivitiesOverview here once Hasura-Reader will ignore "complete" msg for them
common.ActivitiesOverviewCompleted(string(browserConnection.ActiveSubscriptions[queryId].Type) + "-" + browserConnection.ActiveSubscriptions[queryId].OperationName)
common.ActivitiesOverviewCompleted("_Sum-" + string(browserConnection.ActiveSubscriptions[queryId].Type))
common.ActivitiesOverviewCompleted(string(browserConnection.ActiveSubscriptions[browserMessage.ID].Type) + "-" + browserConnection.ActiveSubscriptions[browserMessage.ID].OperationName)
common.ActivitiesOverviewCompleted("_Sum-" + string(browserConnection.ActiveSubscriptions[browserMessage.ID].Type))
browserConnection.ActiveSubscriptionsMutex.RUnlock()
if jsonPatchSupported {
msgpatch.RemoveConnSubscriptionCacheFile(browserConnection.Id, browserConnection.SessionToken, queryId)
msgpatch.RemoveConnSubscriptionCacheFile(browserConnection.Id, browserConnection.SessionToken, browserMessage.ID)
}
browserConnection.ActiveSubscriptionsMutex.Lock()
delete(browserConnection.ActiveSubscriptions, queryId)
delete(browserConnection.ActiveSubscriptions, browserMessage.ID)
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
browserConnection.ActiveSubscriptionsMutex.Unlock()
}
if fromBrowserMessageAsMap["type"] == "connection_init" {
if browserMessage.Type == "connection_init" {
//browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap
//Skip message once it is handled by ConnInitHandler already
continue
}
log.Tracef("sending to hasura: %v", fromBrowserMessageAsMap)
err := wsjson.Write(hc.Context, hc.Websocket, fromBrowserMessageAsMap)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorf("error on write (we're disconnected from hasura): %v", err)
log.Tracef("sending to hasura: %v", fromBrowserMessage)
errWrite := hc.Websocket.Write(hc.Context, websocket.MessageText, fromBrowserMessage)
if errWrite != nil {
if !errors.Is(errWrite, context.Canceled) {
log.Errorf("error on write (we're disconnected from hasura): %v", errWrite)
}
return
}

View File

@ -5,7 +5,7 @@ import (
log "github.com/sirupsen/logrus"
)
func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannel) {
func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection, fromBrowserToHasuraChannel *common.SafeChannelByte) {
log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
hc.BrowserConn.ActiveSubscriptionsMutex.RLock()

View File

@ -8,7 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"strconv"
)
var cacheDir = os.TempDir() + "/graphql-middleware-cache/"
@ -137,79 +137,78 @@ func fileExists(filename string) bool {
return !os.IsNotExist(err)
}
func PatchMessage(
receivedMessage *map[string]interface{},
func GetPatchedMessage(
receivedMessage []byte,
subscriptionId string,
dataKey string,
lastDataAsJson []byte,
dataAsJson []byte,
lastHasuraMessage common.HasuraMessage,
hasuraMessage common.HasuraMessage,
browserConnectionId string,
browserSessionToken string,
cacheKey string,
cacheKey uint32,
lastDataChecksum uint32,
currDataChecksum uint32) {
currDataChecksum uint32) []byte {
if lastDataChecksum != 0 {
common.JsonPatchBenchmarkingStarted(cacheKey)
defer common.JsonPatchBenchmarkingCompleted(cacheKey)
common.JsonPatchBenchmarkingStarted(strconv.Itoa(int(cacheKey)))
defer common.JsonPatchBenchmarkingCompleted(strconv.Itoa(int(cacheKey)))
}
//Avoid other routines from processing the same JsonPatch
//Lock to avoid other routines from processing the same message
common.GlobalCacheLocks.Lock(cacheKey)
jsonDiffPatch, jsonDiffPatchExists := common.GetJsonPatchCache(cacheKey)
if jsonDiffPatchExists {
if patchedMessageCache, patchedMessageCacheExists := common.GetPatchedMessageCache(cacheKey); patchedMessageCacheExists {
//Unlock immediately once the cache was already created by other routine
common.GlobalCacheLocks.Unlock(cacheKey)
return patchedMessageCache
} else {
//It will create the cache and then Unlock (others will wait to benefit from this cache)
defer common.GlobalCacheLocks.Unlock(cacheKey)
}
var receivedMessageMap = *receivedMessage
var jsonDiffPatch []byte
if !jsonDiffPatchExists {
if currDataChecksum == lastDataChecksum {
//Content didn't change, set message as null to avoid sending it to the browser
//This case is usual when the middleware reconnects with Hasura and receives the data again
*receivedMessage = nil
} else {
//Content was changed, creating json patch
//If data is small (< minLengthToPatch) it's not worth creating the patch
if len(string(dataAsJson)) > minLengthToPatch {
if lastContent, lastContentErr := GetRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, lastDataAsJson); lastContentErr == nil && string(lastContent) != "" {
//Temporarily use CustomPatch only for UserList (testing feature)
if strings.HasPrefix(cacheKey, "subscription-Patched_UserListSubscription") &&
common.ValidateIfShouldUseCustomJsonPatch(lastContent, dataAsJson, "userId") {
jsonDiffPatch = common.CreateJsonPatch(lastContent, dataAsJson, "userId")
common.StoreJsonPatchCache(cacheKey, jsonDiffPatch)
} else if diffPatch, diffPatchErr := jsonpatch.CreatePatch(lastContent, dataAsJson); diffPatchErr == nil {
var err error
if jsonDiffPatch, err = json.Marshal(diffPatch); err == nil {
common.StoreJsonPatchCache(cacheKey, jsonDiffPatch)
} else {
log.Errorf("Error marshaling patch array: %v", err)
}
} else {
log.Errorf("Error creating JSON patch: %v\n%v", diffPatchErr, string(dataAsJson))
if currDataChecksum == lastDataChecksum {
//Content didn't change, set message as null to avoid sending it to the browser
//This case is usual when the middleware reconnects with Hasura and receives the data again
jsonData, _ := json.Marshal(nil)
common.StorePatchedMessageCache(cacheKey, jsonData)
return jsonData
} else {
//Content was changed, creating json patch
//If data is small (< minLengthToPatch) it's not worth creating the patch
if len(hasuraMessage.Payload.Data[dataKey]) > minLengthToPatch {
if lastContent, lastContentErr := GetRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, lastHasuraMessage.Payload.Data[dataKey]); lastContentErr == nil && string(lastContent) != "" {
var shouldUseCustomJsonPatch bool
if shouldUseCustomJsonPatch, jsonDiffPatch = common.ValidateIfShouldUseCustomJsonPatch(lastContent, hasuraMessage.Payload.Data[dataKey], "userId"); shouldUseCustomJsonPatch {
common.StorePatchedMessageCache(cacheKey, jsonDiffPatch)
} else if diffPatch, diffPatchErr := jsonpatch.CreatePatch(lastContent, hasuraMessage.Payload.Data[dataKey]); diffPatchErr == nil {
var err error
if jsonDiffPatch, err = json.Marshal(diffPatch); err != nil {
log.Errorf("Error marshaling patch array: %v", err)
}
} else {
log.Errorf("Error creating JSON patch: %v\n%v", diffPatchErr, string(hasuraMessage.Payload.Data[dataKey]))
}
}
}
}
//Use patch if the length is {minShrinkToUsePatch}% smaller than the original msg
if jsonDiffPatch != nil && float64(len(string(jsonDiffPatch)))/float64(len(string(dataAsJson))) < minShrinkToUsePatch {
if jsonDiffPatch != nil && float64(len(string(jsonDiffPatch)))/float64(len(string(hasuraMessage.Payload.Data[dataKey]))) < minShrinkToUsePatch {
//Modify receivedMessage to include the Patch and remove the previous data
//The key of the original message is kept to avoid errors (Apollo-client expects to receive this prop)
receivedMessageMap["payload"] = map[string]interface{}{
"data": map[string]interface{}{
"patch": json.RawMessage(jsonDiffPatch),
dataKey: json.RawMessage("[]"),
},
hasuraMessage.Payload.Data = map[string]json.RawMessage{
"patch": jsonDiffPatch,
dataKey: json.RawMessage("[]"),
}
*receivedMessage = receivedMessageMap
hasuraMessageJson, _ := json.Marshal(hasuraMessage)
receivedMessage = hasuraMessageJson
}
//Store current result to be used to create json patch in the future
StoreRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, dataAsJson)
StoreRawDataCache(browserConnectionId, browserSessionToken, subscriptionId, dataKey, hasuraMessage.Payload.Data[dataKey])
common.StorePatchedMessageCache(cacheKey, receivedMessage)
return receivedMessage
}

View File

@ -1,7 +1,9 @@
package websrv
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/iMDT/bbb-graphql-middleware/internal/akka_apps"
"github.com/iMDT/bbb-graphql-middleware/internal/bbb_web"
@ -95,10 +97,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
log.Infof("connection accepted")
// Create channels
fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannel(bufferSize)
fromBrowserToHasuraChannel := common.NewSafeChannel(bufferSize)
fromBrowserToGqlActionsChannel := common.NewSafeChannel(bufferSize)
fromHasuraToBrowserChannel := common.NewSafeChannel(bufferSize)
fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannelByte(bufferSize)
fromBrowserToHasuraChannel := common.NewSafeChannelByte(bufferSize)
fromBrowserToGqlActionsChannel := common.NewSafeChannelByte(bufferSize)
fromHasuraToBrowserChannel := common.NewSafeChannelByte(bufferSize)
// Configure the wait group (to hold this routine execution until both are completed)
var wgAll sync.WaitGroup
@ -295,7 +297,7 @@ func refreshUserSessionVariables(browserConnection *common.BrowserConnection) er
func connectionInitHandler(
browserConnection *common.BrowserConnection,
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel) error {
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte) error {
BrowserConnectionsMutex.RLock()
browserConnectionId := browserConnection.Id
@ -309,9 +311,13 @@ func connectionInitHandler(
//Received all messages. Channel is closed
return fmt.Errorf("error on receiving init connection")
}
var fromBrowserMessageAsMap = fromBrowserMessage.(map[string]interface{})
if bytes.Contains(fromBrowserMessage, []byte("\"connection_init\"")) {
var fromBrowserMessageAsMap map[string]interface{}
if err := json.Unmarshal(fromBrowserMessage, &fromBrowserMessageAsMap); err != nil {
log.Errorf("failed to unmarshal message: %v", err)
continue
}
if fromBrowserMessageAsMap["type"] == "connection_init" {
var payloadAsMap = fromBrowserMessageAsMap["payload"].(map[string]interface{})
var headersAsMap = payloadAsMap["headers"].(map[string]interface{})
var sessionToken, existsSessionToken = headersAsMap["X-Session-Token"].(string)
@ -349,7 +355,7 @@ func connectionInitHandler(
browserConnection.ClientSessionUUID = clientSessionUUID
browserConnection.MeetingId = meetingId
browserConnection.UserId = userId
browserConnection.ConnectionInitMessage = fromBrowserMessageAsMap
browserConnection.ConnectionInitMessage = fromBrowserMessage
BrowserConnectionsMutex.Unlock()
refreshUserSessionVariables(browserConnection)

View File

@ -1,13 +1,13 @@
package reader
import (
"bytes"
"context"
"encoding/json"
"errors"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"strings"
"sync"
"time"
)
@ -17,9 +17,9 @@ func BrowserConnectionReader(
ctx context.Context,
ctxCancel context.CancelFunc,
browserWsConn *websocket.Conn,
fromBrowserToGqlActionsChannel *common.SafeChannel,
fromBrowserToHasuraChannel *common.SafeChannel,
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannel,
fromBrowserToGqlActionsChannel *common.SafeChannelByte,
fromBrowserToHasuraChannel *common.SafeChannelByte,
fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte,
waitGroups []*sync.WaitGroup) {
log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId)
defer log.Debugf("finished")
@ -43,8 +43,7 @@ func BrowserConnectionReader(
defer ctxCancel()
for {
var v interface{}
err := wsjson.Read(ctx, browserWsConn, &v)
messageType, message, err := browserWsConn.Read(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
log.Debugf("Closing Browser ws connection as Context was cancelled!")
@ -54,26 +53,30 @@ func BrowserConnectionReader(
return
}
log.Tracef("received from browser: %v", v)
log.Tracef("received from browser: %v", message)
if v == nil {
if messageType != websocket.MessageText {
log.Warnf("received non-text message: %v", messageType)
continue
}
var fromBrowserMessageAsMap = v.(map[string]interface{})
var browserMessageType struct {
Type string `json:"type"`
}
err = json.Unmarshal(message, &browserMessageType)
if err != nil {
log.Errorf("failed to unmarshal message: %v", err)
return
}
if payload, ok := fromBrowserMessageAsMap["payload"].(map[string]interface{}); ok {
if query, okQuery := payload["query"].(string); okQuery {
//Forward Mutations directly to GraphqlActions
//Update mutations must be handled by Hasura
if strings.HasPrefix(query, "mutation") && !strings.Contains(query, "update_") {
fromBrowserToGqlActionsChannel.Send(v)
continue
}
if browserMessageType.Type == "subscribe" {
if bytes.Contains(message, []byte("\"query\":\"mutation")) && !bytes.Contains(message, []byte("update_")) {
fromBrowserToGqlActionsChannel.Send(message)
continue
}
}
fromBrowserToHasuraChannel.Send(v)
fromBrowserToHasuraConnectionEstablishingChannel.Send(v)
fromBrowserToHasuraChannel.Send(message)
fromBrowserToHasuraConnectionEstablishingChannel.Send(message)
}
}

View File

@ -2,14 +2,20 @@ package writer
import (
"context"
"encoding/json"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
log "github.com/sirupsen/logrus"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"strings"
"sync"
)
func BrowserConnectionWriter(browserConnectionId string, ctx context.Context, browserWsConn *websocket.Conn, fromHasuraToBrowserChannel *common.SafeChannel, wg *sync.WaitGroup) {
func BrowserConnectionWriter(
browserConnectionId string,
ctx context.Context,
browserWsConn *websocket.Conn,
fromHasuraToBrowserChannel *common.SafeChannelByte,
wg *sync.WaitGroup) {
log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnectionId)
defer log.Debugf("finished")
log.Debugf("starting")
@ -30,10 +36,8 @@ RangeLoop:
continue
}
var toBrowserMessageAsMap = toBrowserMessage.(map[string]interface{})
log.Tracef("sending to browser: %v", toBrowserMessage)
err := wsjson.Write(ctx, browserWsConn, toBrowserMessage)
err := browserWsConn.Write(ctx, websocket.MessageText, toBrowserMessage)
if err != nil {
log.Debugf("Browser is disconnected, skipping writing of ws message: %v", err)
return
@ -41,9 +45,15 @@ RangeLoop:
// After the error is sent to client, close its connection
// Authentication hook unauthorized this request
if toBrowserMessageAsMap["type"] == "connection_error" {
var payloadAsString = toBrowserMessageAsMap["payload"].(string)
browserWsConn.Close(websocket.StatusInternalError, payloadAsString)
if strings.Contains(string(toBrowserMessage), "connection_error") {
type HasuraMessage struct {
Type string `json:"type"`
}
var hasuraMessage HasuraMessage
_ = json.Unmarshal(toBrowserMessage, &hasuraMessage)
if hasuraMessage.Type == "connection_error" {
_ = browserWsConn.Close(websocket.StatusInternalError, string(toBrowserMessage))
}
}
}
}