Refactor the logger to add more context like meetingId, userId and sessionToken. (#21188)
For DEBUG logs it also includes the file/line and function of the caller.
This commit is contained in:
parent
a868eccd4f
commit
cccd2d78c7
@ -19,10 +19,12 @@ func main() {
|
||||
// Configure logger
|
||||
if logLevelFromConfig, err := log.ParseLevel(cfg.LogLevel); err == nil {
|
||||
log.SetLevel(logLevelFromConfig)
|
||||
if logLevelFromConfig > log.InfoLevel {
|
||||
log.SetReportCaller(true)
|
||||
}
|
||||
} else {
|
||||
log.SetLevel(log.InfoLevel)
|
||||
}
|
||||
|
||||
log.SetFormatter(&log.JSONFormatter{})
|
||||
log := log.WithField("_routine", "main")
|
||||
|
||||
|
@ -14,7 +14,10 @@ import (
|
||||
var sessionVarsHookUrl = config.GetConfig().SessionVarsHook.Url
|
||||
|
||||
func AkkaAppsGetSessionVariablesFrom(browserConnectionId string, sessionToken string) (map[string]string, error) {
|
||||
logger := log.WithField("_routine", "AkkaAppsClient").WithField("browserConnectionId", browserConnectionId)
|
||||
logger := log.WithField("_routine", "AkkaAppsClient").
|
||||
WithField("browserConnectionId", browserConnectionId).
|
||||
WithField("sessionToken", sessionToken)
|
||||
|
||||
logger.Debug("Starting AkkaAppsClient")
|
||||
defer logger.Debug("Finished AkkaAppsClient")
|
||||
|
||||
|
@ -15,7 +15,10 @@ import (
|
||||
var authHookUrl = config.GetConfig().AuthHook.Url
|
||||
|
||||
func BBBWebCheckAuthorization(browserConnectionId string, sessionToken string, cookies []*http.Cookie) (string, string, error) {
|
||||
logger := log.WithField("_routine", "BBBWebClient").WithField("browserConnectionId", browserConnectionId)
|
||||
logger := log.WithField("_routine", "BBBWebClient").
|
||||
WithField("browserConnectionId", browserConnectionId).
|
||||
WithField("sessionToken", sessionToken)
|
||||
|
||||
logger.Debug("Starting BBBWebClient")
|
||||
defer logger.Debug("Finished BBBWebClient")
|
||||
|
||||
|
@ -3,6 +3,7 @@ package common
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/sirupsen/logrus"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
@ -55,6 +56,7 @@ type BrowserConnection struct {
|
||||
FromBrowserToHasuraChannel *SafeChannelByte // channel to transmit messages from Browser to Hasura
|
||||
FromBrowserToGqlActionsChannel *SafeChannelByte // channel to transmit messages from Browser to Graphq-Actions
|
||||
FromHasuraToBrowserChannel *SafeChannelByte // channel to transmit messages from Hasura/GqlActions to Browser
|
||||
Logger *logrus.Entry // connection logger populated with connection info
|
||||
}
|
||||
|
||||
type HasuraConnection struct {
|
||||
|
@ -19,10 +19,8 @@ var graphqlActionsUrl = config.GetConfig().GraphqlActions.Url
|
||||
|
||||
func GraphqlActionsClient(
|
||||
browserConnection *common.BrowserConnection) error {
|
||||
|
||||
log := log.WithField("_routine", "GraphqlActionsClient").WithField("browserConnectionId", browserConnection.Id)
|
||||
log.Debug("Starting GraphqlActionsClient")
|
||||
defer log.Debug("Finished GraphqlActionsClient")
|
||||
browserConnection.Logger.Debug("Starting GraphqlActionsClient")
|
||||
defer browserConnection.Logger.Debug("Finished GraphqlActionsClient")
|
||||
|
||||
RangeLoop:
|
||||
for {
|
||||
@ -30,7 +28,7 @@ RangeLoop:
|
||||
case <-browserConnection.Context.Done():
|
||||
break RangeLoop
|
||||
case <-browserConnection.GraphqlActionsContext.Done():
|
||||
log.Debug("GraphqlActionsContext cancelled!")
|
||||
browserConnection.Logger.Debug("GraphqlActionsContext cancelled!")
|
||||
break RangeLoop
|
||||
case fromBrowserMessage := <-browserConnection.FromBrowserToGqlActionsChannel.ReceiveChannel():
|
||||
{
|
||||
@ -41,7 +39,7 @@ RangeLoop:
|
||||
var browserMessage common.BrowserSubscribeMessage
|
||||
err := json.Unmarshal(fromBrowserMessage, &browserMessage)
|
||||
if err != nil {
|
||||
log.Errorf("failed to unmarshal message: %v", err)
|
||||
browserConnection.Logger.Errorf("failed to unmarshal message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -52,16 +50,16 @@ RangeLoop:
|
||||
if strings.HasPrefix(browserMessage.Payload.Query, "mutation") {
|
||||
if funcName, inputs, err := parseGraphQLMutation(browserMessage.Payload.Query, browserMessage.Payload.Variables); err == nil {
|
||||
mutationFuncName = funcName
|
||||
if err = SendGqlActionsRequest(funcName, inputs, browserConnection.BBBWebSessionVariables, log); err == nil {
|
||||
if err = SendGqlActionsRequest(funcName, inputs, browserConnection.BBBWebSessionVariables, browserConnection.Logger); err == nil {
|
||||
//Add Prometheus Metrics
|
||||
common.GqlMutationsCounter.With(prometheus.Labels{"operationName": browserMessage.Payload.OperationName}).Inc()
|
||||
} else {
|
||||
errorMessage = err.Error()
|
||||
log.Error("It was not able to send the request to Graphql Actions", err)
|
||||
browserConnection.Logger.Error("It was not able to send the request to Graphql Actions", err)
|
||||
}
|
||||
} else {
|
||||
errorMessage = "It was not able to parse graphQL query"
|
||||
log.Error("It was not able to parse graphQL query", err)
|
||||
browserConnection.Logger.Error("It was not able to parse graphQL query", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,8 +109,8 @@ RangeLoop:
|
||||
return nil
|
||||
}
|
||||
|
||||
func SendGqlActionsRequest(funcName string, inputs map[string]interface{}, sessionVariables map[string]string, logger *log.Entry) error {
|
||||
logger = logger.WithField("funcName", funcName).WithField("inputs", inputs)
|
||||
func SendGqlActionsRequest(funcName string, inputs map[string]interface{}, sessionVariables map[string]string, bcLogger *log.Entry) error {
|
||||
logger := bcLogger.WithField("funcName", funcName).WithField("inputs", inputs)
|
||||
|
||||
data := GqlActionsRequestBody{
|
||||
Action: GqlActionsAction{
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"bbb-graphql-middleware/internal/hasura/conn/writer"
|
||||
"context"
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
@ -24,14 +23,14 @@ var hasuraEndpoint = config.GetConfig().Hasura.Url
|
||||
// Hasura client connection
|
||||
func HasuraClient(
|
||||
browserConnection *common.BrowserConnection) error {
|
||||
log := log.WithField("_routine", "HasuraClient").WithField("browserConnectionId", browserConnection.Id)
|
||||
|
||||
// Obtain id for this connection
|
||||
lastHasuraConnectionId++
|
||||
hasuraConnectionId := "HC" + fmt.Sprintf("%010d", lastHasuraConnectionId)
|
||||
log = log.WithField("hasuraConnectionId", hasuraConnectionId)
|
||||
|
||||
defer log.Debugf("finished")
|
||||
browserConnection.Logger = browserConnection.Logger.WithField("hasuraConnectionId", hasuraConnectionId)
|
||||
|
||||
defer browserConnection.Logger.Debugf("finished")
|
||||
|
||||
// Add sub-protocol
|
||||
var dialOptions websocket.DialOptions
|
||||
@ -94,7 +93,7 @@ func HasuraClient(
|
||||
thisConnection.Websocket = hasuraWsConn
|
||||
|
||||
// Log the connection success
|
||||
log.Debugf("connected with Hasura")
|
||||
browserConnection.Logger.Info("connected with Hasura")
|
||||
|
||||
// Configure the wait group
|
||||
var wg sync.WaitGroup
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/sirupsen/logrus"
|
||||
"hash/crc32"
|
||||
"nhooyr.io/websocket"
|
||||
"sync"
|
||||
@ -17,9 +17,8 @@ import (
|
||||
|
||||
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
|
||||
func HasuraConnectionReader(hc *common.HasuraConnection, wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
defer hc.BrowserConn.Logger.Debugf("finished")
|
||||
hc.BrowserConn.Logger.Debugf("starting")
|
||||
|
||||
defer wg.Done()
|
||||
defer hc.ContextCancelFunc()
|
||||
@ -30,10 +29,10 @@ func HasuraConnectionReader(hc *common.HasuraConnection, wg *sync.WaitGroup) {
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Closing Hasura ws connection as Context was cancelled!")
|
||||
hc.BrowserConn.Logger.Debugf("Closing Hasura ws connection as Context was cancelled!")
|
||||
} else if errors.As(err, &closeError) {
|
||||
hc.WebsocketCloseError = closeError
|
||||
log.Debug("WebSocket connection closed: status = %v, reason = %s", closeError.Code, closeError.Reason)
|
||||
hc.BrowserConn.Logger.Debug("WebSocket connection closed: status = %v, reason = %s", closeError.Code, closeError.Reason)
|
||||
//TODO check if it should send {"type":"connection_error","payload":"Authentication hook unauthorized this request"}
|
||||
} else {
|
||||
if websocket.CloseStatus(err) == -1 {
|
||||
@ -46,17 +45,17 @@ func HasuraConnectionReader(hc *common.HasuraConnection, wg *sync.WaitGroup) {
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("Error reading message from Hasura: %v", err)
|
||||
hc.BrowserConn.Logger.Debugf("Error reading message from Hasura: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if messageType != websocket.MessageText {
|
||||
log.Warnf("received non-text message: %v", messageType)
|
||||
hc.BrowserConn.Logger.Warnf("received non-text message: %v", messageType)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Tracef("received from hasura: %s", string(message))
|
||||
hc.BrowserConn.Logger.Tracef("received from hasura: %s", string(message))
|
||||
|
||||
handleMessageReceivedFromHasura(hc, message)
|
||||
}
|
||||
@ -72,7 +71,7 @@ func handleMessageReceivedFromHasura(hc *common.HasuraConnection, message []byte
|
||||
var hasuraMessageInfo HasuraMessageInfo
|
||||
err := json.Unmarshal(message, &hasuraMessageInfo)
|
||||
if err != nil {
|
||||
log.Errorf("failed to unmarshal message: %v", err)
|
||||
hc.BrowserConn.Logger.Errorf("failed to unmarshal message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -85,7 +84,7 @@ func handleMessageReceivedFromHasura(hc *common.HasuraConnection, message []byte
|
||||
subscription, ok := hc.BrowserConn.ActiveSubscriptions[hasuraMessageInfo.ID]
|
||||
hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock()
|
||||
if !ok {
|
||||
log.Debugf("Subscription with Id %s doesn't exist anymore, skipping response.", hasuraMessageInfo.ID)
|
||||
hc.BrowserConn.Logger.Debugf("Subscription with Id %s doesn't exist anymore, skipping response.", hasuraMessageInfo.ID)
|
||||
return
|
||||
}
|
||||
|
||||
@ -141,7 +140,7 @@ func handleMessageReceivedFromHasura(hc *common.HasuraConnection, message []byte
|
||||
}
|
||||
|
||||
func handleSubscriptionMessage(hc *common.HasuraConnection, message *[]byte, subscription common.GraphQlSubscription, queryId string) bool {
|
||||
dataChecksum, messageDataKey, messageData := getHasuraMessage(*message, subscription)
|
||||
dataChecksum, messageDataKey, messageData := getHasuraMessage(*message, subscription, hc.BrowserConn.Logger)
|
||||
|
||||
//Check whether ReceivedData is different from the LastReceivedData
|
||||
//Otherwise stop forwarding this message
|
||||
@ -189,11 +188,11 @@ func handleCompleteMessage(hc *common.HasuraConnection, queryId string) {
|
||||
operationName := hc.BrowserConn.ActiveSubscriptions[queryId].OperationName
|
||||
delete(hc.BrowserConn.ActiveSubscriptions, queryId)
|
||||
hc.BrowserConn.ActiveSubscriptionsMutex.Unlock()
|
||||
log.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId)
|
||||
hc.BrowserConn.Logger.Debugf("%s (%s) with Id %s finished by Hasura.", queryType, operationName, queryId)
|
||||
}
|
||||
|
||||
func handleConnectionAckMessage(hc *common.HasuraConnection, message []byte) {
|
||||
log.Debugf("Received connection_ack")
|
||||
hc.BrowserConn.Logger.Debugf("Received connection_ack")
|
||||
//Hasura connection was initialized, now it's able to send new messages to Hasura
|
||||
hc.BrowserConn.FromBrowserToHasuraChannel.UnfreezeChannel()
|
||||
|
||||
@ -206,7 +205,7 @@ func handleConnectionAckMessage(hc *common.HasuraConnection, message []byte) {
|
||||
go retransmiter.RetransmitSubscriptionStartMessages(hc)
|
||||
}
|
||||
|
||||
func getHasuraMessage(message []byte, subscription common.GraphQlSubscription) (uint32, string, common.HasuraMessage) {
|
||||
func getHasuraMessage(message []byte, subscription common.GraphQlSubscription, logger *logrus.Entry) (uint32, string, common.HasuraMessage) {
|
||||
dataChecksum := crc32.ChecksumIEEE(message)
|
||||
|
||||
common.GlobalCacheLocks.Lock(dataChecksum)
|
||||
@ -219,7 +218,7 @@ func getHasuraMessage(message []byte, subscription common.GraphQlSubscription) (
|
||||
|
||||
err := json.Unmarshal(message, &hasuraMessage)
|
||||
if err != nil {
|
||||
log.Fatalf("Error unmarshalling JSON: %v", err)
|
||||
logger.Fatalf("Error unmarshalling JSON: %v", err)
|
||||
}
|
||||
|
||||
for key := range hasuraMessage.Payload.Data {
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"nhooyr.io/websocket"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -30,27 +29,23 @@ func init() {
|
||||
// HasuraConnectionWriter
|
||||
// process messages (middleware to hasura)
|
||||
func HasuraConnectionWriter(hc *common.HasuraConnection, wg *sync.WaitGroup, initMessage []byte) {
|
||||
log := log.WithField("_routine", "HasuraConnectionWriter")
|
||||
|
||||
browserConnection := hc.BrowserConn
|
||||
|
||||
log = log.WithField("browserConnectionId", browserConnection.Id).WithField("hasuraConnectionId", hc.Id)
|
||||
|
||||
defer wg.Done()
|
||||
defer hc.ContextCancelFunc()
|
||||
defer log.Debugf("finished")
|
||||
defer hc.BrowserConn.Logger.Debugf("finished")
|
||||
|
||||
//Send authentication (init) message at first
|
||||
//It will not use the channel (fromBrowserToHasuraChannel) because this msg must bypass ChannelFreeze
|
||||
if initMessage == nil {
|
||||
log.Errorf("it can't start Hasura Connection because initMessage is null")
|
||||
hc.BrowserConn.Logger.Errorf("it can't start Hasura Connection because initMessage is null")
|
||||
return
|
||||
}
|
||||
|
||||
//Send init connection message to Hasura to start
|
||||
err := hc.Websocket.Write(hc.Context, websocket.MessageText, initMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
|
||||
hc.BrowserConn.Logger.Errorf("error on write authentication (init) message (we're disconnected from hasura): %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -70,7 +65,7 @@ RangeLoop:
|
||||
var browserMessage common.BrowserSubscribeMessage
|
||||
err := json.Unmarshal(fromBrowserMessage, &browserMessage)
|
||||
if err != nil {
|
||||
log.Errorf("failed to unmarshal message: %v", err)
|
||||
hc.BrowserConn.Logger.Errorf("failed to unmarshal message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -98,7 +93,7 @@ RangeLoop:
|
||||
}
|
||||
|
||||
if !subscriptionAllowed {
|
||||
log.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName)
|
||||
hc.BrowserConn.Logger.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -114,7 +109,7 @@ RangeLoop:
|
||||
}
|
||||
|
||||
if !subscriptionAllowed {
|
||||
log.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName)
|
||||
hc.BrowserConn.Logger.Infof("Subscription %s not allowed!", browserMessage.Payload.OperationName)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -175,7 +170,7 @@ RangeLoop:
|
||||
Type: messageType,
|
||||
LastReceivedDataChecksum: lastReceivedDataChecksum,
|
||||
}
|
||||
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
|
||||
// hc.BrowserConn.Logger.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
|
||||
browserConnection.ActiveSubscriptionsMutex.Unlock()
|
||||
|
||||
//Add Prometheus Metrics
|
||||
@ -198,7 +193,7 @@ RangeLoop:
|
||||
browserConnection.ActiveSubscriptionsMutex.RUnlock()
|
||||
browserConnection.ActiveSubscriptionsMutex.Lock()
|
||||
delete(browserConnection.ActiveSubscriptions, browserMessage.ID)
|
||||
// log.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
|
||||
// hc.BrowserConn.Logger.Tracef("Current queries: %v", browserConnection.ActiveSubscriptions)
|
||||
browserConnection.ActiveSubscriptionsMutex.Unlock()
|
||||
}
|
||||
|
||||
@ -208,11 +203,11 @@ RangeLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
log.Tracef("sending to hasura: %s", string(fromBrowserMessage))
|
||||
hc.BrowserConn.Logger.Tracef("sending to hasura: %s", string(fromBrowserMessage))
|
||||
errWrite := hc.Websocket.Write(hc.Context, websocket.MessageText, fromBrowserMessage)
|
||||
if errWrite != nil {
|
||||
if !errors.Is(errWrite, context.Canceled) {
|
||||
log.Errorf("error on write (we're disconnected from hasura): %v", errWrite)
|
||||
hc.BrowserConn.Logger.Errorf("error on write (we're disconnected from hasura): %v", errWrite)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -2,12 +2,9 @@ package retransmiter
|
||||
|
||||
import (
|
||||
"bbb-graphql-middleware/internal/common"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection) {
|
||||
log := log.WithField("_routine", "RetransmitSubscriptionStartMessages").WithField("browserConnectionId", hc.BrowserConn.Id).WithField("hasuraConnectionId", hc.Id)
|
||||
|
||||
hc.BrowserConn.ActiveSubscriptionsMutex.RLock()
|
||||
defer hc.BrowserConn.ActiveSubscriptionsMutex.RUnlock()
|
||||
|
||||
@ -26,12 +23,12 @@ func RetransmitSubscriptionStartMessages(hc *common.HasuraConnection) {
|
||||
if !userCurrentlyInMeeting &&
|
||||
subscription.OperationName != "getUserInfo" &&
|
||||
subscription.OperationName != "getUserCurrent" {
|
||||
log.Debugf("Skipping retransmit %s because the user is offline", subscription.OperationName)
|
||||
hc.BrowserConn.Logger.Debugf("Skipping retransmit %s because the user is offline", subscription.OperationName)
|
||||
continue
|
||||
}
|
||||
|
||||
if subscription.LastSeenOnHasuraConnection != hc.Id {
|
||||
log.Tracef("retransmiting subscription start: %v", string(subscription.Message))
|
||||
hc.BrowserConn.Logger.Tracef("retransmiting subscription start: %v", string(subscription.Message))
|
||||
|
||||
if subscription.Type == common.Streaming && subscription.StreamCursorCurrValue != nil {
|
||||
hc.BrowserConn.FromBrowserToHasuraChannel.Send(common.PatchQuerySettingLastCursorValue(subscription))
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/sirupsen/logrus"
|
||||
"net/http"
|
||||
"nhooyr.io/websocket"
|
||||
"strings"
|
||||
@ -34,12 +34,24 @@ var BrowserConnectionsMutex = &sync.RWMutex{}
|
||||
// Handle client connection
|
||||
// This is the connection that comes from browser
|
||||
func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
log := log.WithField("_routine", "ConnectionHandler")
|
||||
|
||||
// Configure logger
|
||||
newLogger := logrus.New()
|
||||
cfg := config.GetConfig()
|
||||
if logLevelFromConfig, err := logrus.ParseLevel(cfg.LogLevel); err == nil {
|
||||
newLogger.SetLevel(logLevelFromConfig)
|
||||
if logLevelFromConfig > logrus.InfoLevel {
|
||||
newLogger.SetReportCaller(true)
|
||||
}
|
||||
} else {
|
||||
newLogger.SetLevel(logrus.InfoLevel)
|
||||
}
|
||||
newLogger.SetFormatter(&logrus.JSONFormatter{})
|
||||
|
||||
// Obtain id for this connection
|
||||
lastBrowserConnectionId++
|
||||
browserConnectionId := "BC" + fmt.Sprintf("%010d", lastBrowserConnectionId)
|
||||
log = log.WithField("browserConnectionId", browserConnectionId)
|
||||
connectionLogger := newLogger.WithField("browserConnectionId", browserConnectionId)
|
||||
|
||||
// Starts a context that will be dependent on the connection, so we can cancel subroutines when the connection is dropped
|
||||
browserConnectionContext, browserConnectionContextCancel := context.WithCancel(r.Context())
|
||||
@ -57,7 +69,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
browserWsConn, err := websocket.Accept(w, r, &acceptOptions)
|
||||
browserWsConn.SetReadLimit(9999999) //10MB
|
||||
if err != nil {
|
||||
log.Errorf("error: %v", err)
|
||||
connectionLogger.Errorf("error: %v", err)
|
||||
}
|
||||
|
||||
if common.HasReachedMaxGlobalConnections() {
|
||||
@ -79,6 +91,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
FromBrowserToHasuraChannel: common.NewSafeChannelByte(bufferSize),
|
||||
FromBrowserToGqlActionsChannel: common.NewSafeChannelByte(bufferSize),
|
||||
FromHasuraToBrowserChannel: common.NewSafeChannelByte(bufferSize),
|
||||
Logger: connectionLogger,
|
||||
}
|
||||
|
||||
BrowserConnectionsMutex.Lock()
|
||||
@ -98,11 +111,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
log.Infof("connection removed")
|
||||
thisConnection.Logger.Infof("connection removed")
|
||||
}()
|
||||
|
||||
// Log it
|
||||
log.Infof("connection accepted")
|
||||
thisConnection.Logger.Infof("connection accepted")
|
||||
|
||||
// Configure the wait group (to hold this routine execution until both are completed)
|
||||
var wgAll sync.WaitGroup
|
||||
@ -117,7 +129,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
go func() {
|
||||
wgReader.Wait()
|
||||
log.Debug("BrowserConnectionReader finished, closing Write Channel")
|
||||
thisConnection.Logger.Debug("BrowserConnectionReader finished, closing Write Channel")
|
||||
thisConnection.FromHasuraToBrowserChannel.Close()
|
||||
thisConnection.Disconnected = true
|
||||
}()
|
||||
@ -143,7 +155,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Ensure a hasura client is running while the browser is connected
|
||||
go func() {
|
||||
log.Debugf("starting hasura client")
|
||||
thisConnection.Logger.Debugf("starting hasura client")
|
||||
|
||||
BrowserConnectedLoop:
|
||||
for {
|
||||
@ -152,12 +164,12 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
break BrowserConnectedLoop
|
||||
default:
|
||||
{
|
||||
log.Debugf("creating hasura client")
|
||||
thisConnection.Logger.Debugf("creating hasura client")
|
||||
BrowserConnectionsMutex.RLock()
|
||||
thisBrowserConnection := BrowserConnections[browserConnectionId]
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
if thisBrowserConnection != nil {
|
||||
log.Debugf("created hasura client")
|
||||
thisConnection.Logger.Debugf("created hasura client")
|
||||
hasura.HasuraClient(thisBrowserConnection)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
@ -168,7 +180,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Ensure a gql-actions client is running while the browser is connected
|
||||
go func() {
|
||||
log.Debugf("starting gql-actions client")
|
||||
thisConnection.Logger.Debugf("starting gql-actions client")
|
||||
|
||||
BrowserConnectedLoop:
|
||||
for {
|
||||
@ -177,12 +189,12 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
break BrowserConnectedLoop
|
||||
default:
|
||||
{
|
||||
log.Debugf("creating gql-actions client")
|
||||
thisConnection.Logger.Debugf("creating gql-actions client")
|
||||
BrowserConnectionsMutex.RLock()
|
||||
thisBrowserConnection := BrowserConnections[browserConnectionId]
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
if thisBrowserConnection != nil {
|
||||
log.Debugf("created gql-actions client")
|
||||
thisConnection.Logger.Debugf("created gql-actions client")
|
||||
|
||||
BrowserConnectionsMutex.Lock()
|
||||
thisBrowserConnection.GraphqlActionsContext, thisBrowserConnection.GraphqlActionsContextCancel = context.WithCancel(browserConnectionContext)
|
||||
@ -232,10 +244,10 @@ func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, ses
|
||||
return // If there's no Hasura connection, there's nothing to invalidate.
|
||||
}
|
||||
|
||||
log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, bc.HasuraConnection.Id)
|
||||
bc.Logger.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, bc.HasuraConnection.Id)
|
||||
|
||||
// Stop receiving new messages from the browser.
|
||||
log.Debug("freezing channel fromBrowserToHasuraChannel")
|
||||
bc.Logger.Debug("freezing channel fromBrowserToHasuraChannel")
|
||||
bc.FromBrowserToHasuraChannel.FreezeChannel()
|
||||
|
||||
//Update variables for Mutations (gql-actions requests)
|
||||
@ -259,10 +271,10 @@ func refreshUserSessionVariables(browserConnection *common.BrowserConnection) er
|
||||
// Check authorization
|
||||
sessionVariables, err := akka_apps.AkkaAppsGetSessionVariablesFrom(browserConnectionId, sessionToken)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
browserConnection.Logger.Error(err)
|
||||
return fmt.Errorf("error on checking sessionToken authorization")
|
||||
} else {
|
||||
log.Trace("Session variables obtained successfully")
|
||||
browserConnection.Logger.Trace("Session variables obtained successfully")
|
||||
}
|
||||
|
||||
if _, exists := sessionVariables["x-hasura-role"]; !exists {
|
||||
@ -300,7 +312,7 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
if bytes.Contains(fromBrowserMessage, []byte("\"connection_init\"")) {
|
||||
var fromBrowserMessageAsMap map[string]interface{}
|
||||
if err := json.Unmarshal(fromBrowserMessage, &fromBrowserMessageAsMap); err != nil {
|
||||
log.Errorf("failed to unmarshal message: %v", err)
|
||||
browserConnection.Logger.Errorf("failed to unmarshal message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -310,6 +322,7 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
if !existsSessionToken {
|
||||
return fmt.Errorf("X-Session-Token header missing on init connection")
|
||||
}
|
||||
browserConnection.Logger = browserConnection.Logger.WithField("sessionToken", sessionToken)
|
||||
|
||||
if common.HasReachedMaxUserConnections(sessionToken) {
|
||||
return fmt.Errorf("too many connections")
|
||||
@ -338,7 +351,7 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
for {
|
||||
meetingId, userId, errCheckAuthorization = bbb_web.BBBWebCheckAuthorization(browserConnectionId, sessionToken, browserConnectionCookies)
|
||||
if errCheckAuthorization != nil {
|
||||
log.Error(errCheckAuthorization)
|
||||
browserConnection.Logger.Error(errCheckAuthorization)
|
||||
}
|
||||
|
||||
if (errCheckAuthorization == nil && meetingId != "" && userId != "") || numOfAttempts > 5 {
|
||||
@ -355,14 +368,16 @@ func connectionInitHandler(browserConnection *common.BrowserConnection) error {
|
||||
if meetingId == "" {
|
||||
return fmt.Errorf("error to obtain user meetingId from BBBWebCheckAuthorization")
|
||||
}
|
||||
browserConnection.Logger = browserConnection.Logger.WithField("meetingId", meetingId)
|
||||
|
||||
if userId == "" {
|
||||
return fmt.Errorf("error to obtain user userId from BBBWebCheckAuthorization")
|
||||
}
|
||||
browserConnection.Logger = browserConnection.Logger.WithField("userId", userId)
|
||||
|
||||
log.Trace("Success on check authorization")
|
||||
browserConnection.Logger.Trace("Success on check authorization")
|
||||
|
||||
log.Debugf("[ConnectionInitHandler] intercepted Session Token %v and Client Session UUID %v", sessionToken, clientSessionUUID)
|
||||
browserConnection.Logger.Debugf("[ConnectionInitHandler] intercepted Session Token %v and Client Session UUID %v", sessionToken, clientSessionUUID)
|
||||
BrowserConnectionsMutex.Lock()
|
||||
browserConnection.SessionToken = sessionToken
|
||||
browserConnection.ClientSessionUUID = clientSessionUUID
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"nhooyr.io/websocket"
|
||||
"sync"
|
||||
"time"
|
||||
@ -15,9 +14,8 @@ import (
|
||||
func BrowserConnectionReader(
|
||||
browserConnection *common.BrowserConnection,
|
||||
waitGroups []*sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnection.Id)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
defer browserConnection.Logger.Debugf("finished")
|
||||
browserConnection.Logger.Debugf("starting")
|
||||
|
||||
defer func() {
|
||||
browserConnection.FromBrowserToHasuraChannel.Close()
|
||||
@ -39,17 +37,17 @@ func BrowserConnectionReader(
|
||||
messageType, message, err := browserConnection.Websocket.Read(browserConnection.Context)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Closing Browser ws connection as Context was cancelled!")
|
||||
browserConnection.Logger.Debugf("Closing Browser ws connection as Context was cancelled!")
|
||||
} else {
|
||||
log.Debugf("Browser is disconnected, skipping reading of ws message: %v", err)
|
||||
browserConnection.Logger.Debugf("Browser is disconnected, skipping reading of ws message: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
log.Tracef("received from browser: %s", string(message))
|
||||
browserConnection.Logger.Tracef("received from browser: %s", string(message))
|
||||
|
||||
if messageType != websocket.MessageText {
|
||||
log.Warnf("received non-text message: %v", messageType)
|
||||
browserConnection.Logger.Warnf("received non-text message: %v", messageType)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -58,7 +56,7 @@ func BrowserConnectionReader(
|
||||
}
|
||||
err = json.Unmarshal(message, &browserMessageType)
|
||||
if err != nil {
|
||||
log.Errorf("failed to unmarshal message: %v", err)
|
||||
browserConnection.Logger.Errorf("failed to unmarshal message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"bbb-graphql-middleware/internal/common"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"nhooyr.io/websocket"
|
||||
"sync"
|
||||
)
|
||||
@ -12,16 +11,15 @@ import (
|
||||
func BrowserConnectionWriter(
|
||||
browserConnection *common.BrowserConnection,
|
||||
wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnection.Id)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
defer browserConnection.Logger.Debugf("finished")
|
||||
browserConnection.Logger.Debugf("starting")
|
||||
defer wg.Done()
|
||||
|
||||
RangeLoop:
|
||||
for {
|
||||
select {
|
||||
case <-browserConnection.Context.Done():
|
||||
log.Debug("Browser context cancelled.")
|
||||
browserConnection.Logger.Debug("Browser context cancelled.")
|
||||
break RangeLoop
|
||||
case toBrowserMessage := <-browserConnection.FromHasuraToBrowserChannel.ReceiveChannel():
|
||||
{
|
||||
@ -32,10 +30,10 @@ RangeLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
log.Tracef("sending to browser: %s", string(toBrowserMessage))
|
||||
browserConnection.Logger.Tracef("sending to browser: %s", string(toBrowserMessage))
|
||||
err := browserConnection.Websocket.Write(browserConnection.Context, websocket.MessageText, toBrowserMessage)
|
||||
if err != nil {
|
||||
log.Debugf("Browser is disconnected, skipping writing of ws message: %v", err)
|
||||
browserConnection.Logger.Debugf("Browser is disconnected, skipping writing of ws message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,4 @@
|
||||
#!/bin/bash
|
||||
|
||||
sudo systemctl stop bbb-graphql-middleware
|
||||
set -a # Automatically export all variables
|
||||
source /etc/default/bbb-graphql-middleware
|
||||
set +a # Stop automatically exporting
|
||||
go run cmd/bbb-graphql-middleware/main.go --signal SIGTERM
|
||||
|
Loading…
Reference in New Issue
Block a user