136 lines
4.6 KiB
Go
136 lines
4.6 KiB
Go
|
package common
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"regexp"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
)
|
||
|
|
||
|
func GetStreamCursorPropsFromQuery(payload map[string]interface{}, query string) (string, string, interface{}) {
|
||
|
streamCursorField := ""
|
||
|
streamCursorVariableName := ""
|
||
|
var streamCursorInitialValue interface{}
|
||
|
|
||
|
cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+):\s*([^}]+)\s*}\s*}`)
|
||
|
matches := cursorInitialValueRePattern.FindStringSubmatch(query)
|
||
|
if matches != nil {
|
||
|
streamCursorField = matches[1]
|
||
|
if strings.HasPrefix(matches[2], "$") {
|
||
|
streamCursorVariableName, _ = strings.CutPrefix(matches[2], "$")
|
||
|
if variables, okVariables := payload["variables"].(map[string]interface{}); okVariables {
|
||
|
if targetVariableValue, okTargetVariableValue := variables[streamCursorVariableName]; okTargetVariableValue {
|
||
|
streamCursorInitialValue = targetVariableValue
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
streamCursorInitialValue = matches[2]
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return streamCursorField, streamCursorVariableName, streamCursorInitialValue
|
||
|
}
|
||
|
|
||
|
func GetLastStreamCursorValueFromReceivedMessage(messageAsMap map[string]interface{}, streamCursorField string) interface{} {
|
||
|
var lastStreamCursorValue interface{}
|
||
|
|
||
|
if payload, okPayload := messageAsMap["payload"].(map[string]interface{}); okPayload {
|
||
|
if data, okData := payload["data"].(map[string]interface{}); okData {
|
||
|
//Data will have only one prop, `range` because its name is unknown
|
||
|
for _, dataItem := range data {
|
||
|
currentDataProp, okCurrentDataProp := dataItem.([]interface{})
|
||
|
if okCurrentDataProp && len(currentDataProp) > 0 {
|
||
|
// Get the last item directly (once it will contain the last cursor value)
|
||
|
lastItemOfMessage := currentDataProp[len(currentDataProp)-1]
|
||
|
if lastItemOfMessageAsMap, currDataOk := lastItemOfMessage.(map[string]interface{}); currDataOk {
|
||
|
if lastItemValue, okLastItemValue := lastItemOfMessageAsMap[streamCursorField]; okLastItemValue {
|
||
|
lastStreamCursorValue = lastItemValue
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return lastStreamCursorValue
|
||
|
}
|
||
|
|
||
|
func PatchQueryIncludingCursorField(originalQuery string, cursorField string) string {
|
||
|
if cursorField == "" {
|
||
|
return originalQuery
|
||
|
}
|
||
|
|
||
|
lastIndex := strings.LastIndex(originalQuery, "{")
|
||
|
if lastIndex == -1 {
|
||
|
return originalQuery
|
||
|
}
|
||
|
|
||
|
// It will include the cursorField at the beginning of the list of fields
|
||
|
// It's not a problem if the field be duplicated in the list, Hasura just ignore the second occurrence
|
||
|
return originalQuery[:lastIndex+1] + "\n " + cursorField + originalQuery[lastIndex+1:]
|
||
|
}
|
||
|
|
||
|
func PatchQuerySettingLastCursorValue(subscription GraphQlSubscription) interface{} {
|
||
|
message := subscription.Message
|
||
|
payload, okPayload := message["payload"].(map[string]interface{})
|
||
|
|
||
|
if okPayload {
|
||
|
if subscription.StreamCursorVariableName != "" {
|
||
|
/**** This stream has its cursor value set through variables ****/
|
||
|
if variables, okVariables := payload["variables"].(map[string]interface{}); okVariables {
|
||
|
if variables[subscription.StreamCursorVariableName] != subscription.StreamCursorCurrValue {
|
||
|
variables[subscription.StreamCursorVariableName] = subscription.StreamCursorCurrValue
|
||
|
payload["variables"] = variables
|
||
|
message["payload"] = payload
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
/**** This stream has its cursor value set through inline value (not variables) ****/
|
||
|
query, okQuery := payload["query"].(string)
|
||
|
if okQuery {
|
||
|
cursorInitialValueRePattern := regexp.MustCompile(`cursor:\s*\{\s*initial_value\s*:\s*\{\s*([^:]+:\s*[^}]+)\s*}\s*}`)
|
||
|
newValue := ""
|
||
|
|
||
|
replaceInitialValueFunc := func(match string) string {
|
||
|
switch v := subscription.StreamCursorCurrValue.(type) {
|
||
|
case string:
|
||
|
newValue = v
|
||
|
|
||
|
//Append quotes if it is missing, it will be necessary when appending to the query
|
||
|
if !strings.HasPrefix(v, "\"") {
|
||
|
newValue = "\"" + newValue
|
||
|
}
|
||
|
if !strings.HasSuffix(v, "\"") {
|
||
|
newValue = newValue + "\""
|
||
|
}
|
||
|
case int:
|
||
|
newValue = strconv.Itoa(v)
|
||
|
case float32:
|
||
|
myFloat64 := float64(v)
|
||
|
newValue = strconv.FormatFloat(myFloat64, 'f', -1, 32)
|
||
|
case float64:
|
||
|
newValue = strconv.FormatFloat(v, 'f', -1, 64)
|
||
|
default:
|
||
|
newValue = ""
|
||
|
}
|
||
|
|
||
|
if newValue != "" {
|
||
|
replacement := subscription.StreamCursorField + ": " + newValue
|
||
|
return fmt.Sprintf("cursor: {initial_value: {%s}}", replacement)
|
||
|
} else {
|
||
|
return match
|
||
|
}
|
||
|
}
|
||
|
|
||
|
newQuery := cursorInitialValueRePattern.ReplaceAllStringFunc(query, replaceInitialValueFunc)
|
||
|
if query != newQuery {
|
||
|
payload["query"] = newQuery
|
||
|
message["payload"] = payload
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return message
|
||
|
}
|