bigbluebutton-Github/bbb-graphql-middleware/internal/websrv/connhandler.go

415 lines
14 KiB
Go
Raw Normal View History

2023-04-27 09:03:40 +08:00
package websrv
import (
"bytes"
2023-04-27 09:03:40 +08:00
"context"
"encoding/json"
2023-04-27 09:03:40 +08:00
"fmt"
"github.com/iMDT/bbb-graphql-middleware/internal/akka_apps"
"github.com/iMDT/bbb-graphql-middleware/internal/bbb_web"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/gql_actions"
"github.com/iMDT/bbb-graphql-middleware/internal/hasura"
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
2023-04-27 09:03:40 +08:00
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/reader"
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/writer"
log "github.com/sirupsen/logrus"
"net/http"
"nhooyr.io/websocket"
"os"
"strings"
2023-04-27 09:03:40 +08:00
"sync"
"time"
)
// Package-level state shared by the browser connection handlers.
var (
	// lastBrowserConnectionId is the counter from which the id of each new
	// browser connection is derived.
	lastBrowserConnectionId int

	// bufferSize is the buffer size of the channels created per connection.
	bufferSize = 100

	// BrowserConnections holds the active browser connections, keyed by
	// browser connection id.
	BrowserConnections = map[string]*common.BrowserConnection{}

	// BrowserConnectionsMutex is locked around accesses to BrowserConnections
	// and to the fields of the connections stored in it.
	BrowserConnectionsMutex = new(sync.RWMutex)
)
2023-04-27 09:03:40 +08:00
// Handle client connection
// This is the connection that comes from browser
func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
log := log.WithField("_routine", "ConnectionHandler")
common.ActivitiesOverviewStarted("__BrowserConnection")
defer common.ActivitiesOverviewCompleted("__BrowserConnection")
2023-04-27 09:03:40 +08:00
// Obtain id for this connection
lastBrowserConnectionId++
browserConnectionId := "BC" + fmt.Sprintf("%010d", lastBrowserConnectionId)
log = log.WithField("browserConnectionId", browserConnectionId)
// Starts a context that will be dependent on the connection, so we can cancel subroutines when the connection is dropped
browserConnectionContext, browserConnectionContextCancel := context.WithCancel(r.Context())
defer browserConnectionContextCancel()
// Add sub-protocol
var acceptOptions websocket.AcceptOptions
acceptOptions.Subprotocols = append(acceptOptions.Subprotocols, "graphql-transport-ws")
bbbOrigin := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_ORIGIN")
if bbbOrigin != "" {
acceptOptions.OriginPatterns = append(acceptOptions.OriginPatterns, bbbOrigin)
}
2023-04-27 09:03:40 +08:00
browserWsConn, err := websocket.Accept(w, r, &acceptOptions)
2024-03-28 02:29:38 +08:00
browserWsConn.SetReadLimit(9999999) //10MB
2023-04-27 09:03:40 +08:00
if err != nil {
log.Errorf("error: %v", err)
}
defer browserWsConn.Close(websocket.StatusInternalError, "the sky is falling")
2023-04-27 09:03:40 +08:00
var thisConnection = common.BrowserConnection{
Id: browserConnectionId,
Websocket: browserWsConn,
BrowserRequestCookies: r.Cookies(),
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
Context: browserConnectionContext,
ContextCancelFunc: browserConnectionContextCancel,
ConnAckSentToBrowser: false,
2023-04-27 09:03:40 +08:00
}
BrowserConnectionsMutex.Lock()
BrowserConnections[browserConnectionId] = &thisConnection
BrowserConnectionsMutex.Unlock()
defer func() {
2023-05-25 21:15:02 +08:00
msgpatch.RemoveConnCacheDir(browserConnectionId)
2023-04-27 09:03:40 +08:00
BrowserConnectionsMutex.Lock()
_, bcExists := BrowserConnections[browserConnectionId]
if bcExists {
sessionTokenRemoved := BrowserConnections[browserConnectionId].SessionToken
delete(BrowserConnections, browserConnectionId)
if sessionTokenRemoved != "" {
go SendUserGraphqlConnectionClosedSysMsg(sessionTokenRemoved, browserConnectionId)
}
}
2023-04-27 09:03:40 +08:00
BrowserConnectionsMutex.Unlock()
log.Infof("connection removed")
2023-04-27 09:03:40 +08:00
}()
// Log it
log.Infof("connection accepted")
2023-04-27 09:03:40 +08:00
// Create channels
fromBrowserToHasuraConnectionEstablishingChannel := common.NewSafeChannelByte(bufferSize)
fromBrowserToHasuraChannel := common.NewSafeChannelByte(bufferSize)
fromBrowserToGqlActionsChannel := common.NewSafeChannelByte(bufferSize)
fromHasuraToBrowserChannel := common.NewSafeChannelByte(bufferSize)
2023-04-27 09:03:40 +08:00
// Configure the wait group (to hold this routine execution until both are completed)
var wgAll sync.WaitGroup
wgAll.Add(2)
// Other wait group to close this connection once Browser Reader dies
var wgReader sync.WaitGroup
wgReader.Add(1)
// Reads from browser connection, writes into fromBrowserToHasuraChannel and fromBrowserToHasuraConnectionEstablishingChannel
go reader.BrowserConnectionReader(
browserConnectionId,
browserConnectionContext,
browserConnectionContextCancel,
browserWsConn,
fromBrowserToGqlActionsChannel,
fromBrowserToHasuraChannel,
fromBrowserToHasuraConnectionEstablishingChannel,
[]*sync.WaitGroup{&wgAll, &wgReader})
go func() {
wgReader.Wait()
log.Debug("BrowserConnectionReader finished, closing Write Channel")
fromHasuraToBrowserChannel.Close()
thisConnection.Disconnected = true
}()
//Obtain user session variables from bbb-web
if errorOnInitConnection := connectionInitHandler(&thisConnection, fromBrowserToHasuraConnectionEstablishingChannel); errorOnInitConnection != nil {
//If the server wishes to reject the connection it is recommended to close the socket with `4403: Forbidden`.
//https://github.com/enisdenjo/graphql-ws/blob/63881c3372a3564bf42040e3f572dd74e41b2e49/PROTOCOL.md?plain=1#L36
wsError := &websocket.CloseError{
Code: websocket.StatusCode(4403),
Reason: errorOnInitConnection.Error(),
}
browserWsConn.Close(wsError.Code, wsError.Reason)
browserConnectionContextCancel()
}
2023-04-27 09:03:40 +08:00
// Ensure a hasura client is running while the browser is connected
go func() {
log.Debugf("starting hasura client")
2023-04-27 09:03:40 +08:00
BrowserConnectedLoop:
for {
select {
case <-browserConnectionContext.Done():
break BrowserConnectedLoop
default:
{
log.Debugf("creating hasura client")
BrowserConnectionsMutex.RLock()
2023-04-27 09:03:40 +08:00
thisBrowserConnection := BrowserConnections[browserConnectionId]
BrowserConnectionsMutex.RUnlock()
if thisBrowserConnection != nil {
log.Debugf("created hasura client")
hasura.HasuraClient(
thisBrowserConnection,
fromBrowserToHasuraChannel,
fromHasuraToBrowserChannel)
}
time.Sleep(100 * time.Millisecond)
}
}
}
}()
// Ensure a gql-actions client is running while the browser is connected
go func() {
log.Debugf("starting gql-actions client")
BrowserConnectedLoop:
for {
select {
case <-browserConnectionContext.Done():
break BrowserConnectedLoop
default:
{
log.Debugf("creating gql-actions client")
BrowserConnectionsMutex.RLock()
thisBrowserConnection := BrowserConnections[browserConnectionId]
BrowserConnectionsMutex.RUnlock()
if thisBrowserConnection != nil {
log.Debugf("created gql-actions client")
BrowserConnectionsMutex.Lock()
thisBrowserConnection.GraphqlActionsContext, thisBrowserConnection.GraphqlActionsContextCancel = context.WithCancel(browserConnectionContext)
BrowserConnectionsMutex.Unlock()
gql_actions.GraphqlActionsClient(
thisBrowserConnection,
fromBrowserToGqlActionsChannel,
fromHasuraToBrowserChannel)
}
time.Sleep(1000 * time.Millisecond)
2023-04-27 09:03:40 +08:00
}
}
}
}()
// Reads from fromHasuraToBrowserChannel, writes to browser connection
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, browserWsConn, fromHasuraToBrowserChannel, &wgAll)
2023-04-27 09:03:40 +08:00
// Wait until all routines are finished
wgAll.Wait()
}
// InvalidateSessionTokenConnections invalidates the Hasura connection of every
// active browser connection whose session token equals
// sessionTokenToInvalidate, and triggers a refresh of the session variables of
// each of those connections.
//
// The matching connections are processed in parallel; the function returns
// once every invalidation has finished. The session variables refresh runs in
// the background and is not waited for.
func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
	// Collect the matching connections first so the lock is not held while
	// the (slow) invalidation runs.
	BrowserConnectionsMutex.RLock()
	var connectionsToProcess []*common.BrowserConnection
	for _, browserConnection := range BrowserConnections {
		if browserConnection.SessionToken == sessionTokenToInvalidate {
			connectionsToProcess = append(connectionsToProcess, browserConnection)
		}
	}
	BrowserConnectionsMutex.RUnlock()

	var wg sync.WaitGroup
	for _, browserConnection := range connectionsToProcess {
		wg.Add(1)
		go func(bc *common.BrowserConnection) {
			defer wg.Done()

			// Refresh in the background, but do not swallow a failure silently.
			go func() {
				if err := refreshUserSessionVariables(bc); err != nil {
					log.Errorf("error on refreshing session variables for connection %v: %v", bc.Id, err)
				}
			}()

			invalidateHasuraConnectionForSessionToken(bc, sessionTokenToInvalidate)
		}(browserConnection)
	}
	wg.Wait()
}
// invalidateHasuraConnectionForSessionToken tears down the current Hasura
// connection of bc so that it gets re-established.
//
// It freezes the flow of browser messages, cancels the gql-actions context,
// waits (up to 20 checks, 100ms apart) for active mutations to finish, cancels
// the Hasura connection context and finally emits the "reconnection forced"
// event for sessionToken. It is a no-op when bc has no Hasura connection.
func invalidateHasuraConnectionForSessionToken(bc *common.BrowserConnection, sessionToken string) {
	if bc.HasuraConnection == nil {
		return // If there's no Hasura connection, there's nothing to invalidate.
	}

	hasuraConnectionId := bc.HasuraConnection.Id

	// Send message to stop receiving new messages from the browser.
	bc.HasuraConnection.FreezeMsgFromBrowserChan.Send(true)

	// The cancel func is only assigned once the gql-actions client loop has
	// started, so it may still be nil here; calling a nil func would panic.
	if bc.GraphqlActionsContextCancel != nil {
		bc.GraphqlActionsContextCancel()
	}

	// Wait until there are no active mutations.
	for iterationCount := 0; iterationCount < 20; iterationCount++ {
		activeMutationFound := false
		bc.ActiveSubscriptionsMutex.RLock()
		for _, subscription := range bc.ActiveSubscriptions {
			if subscription.Type == common.Mutation {
				activeMutationFound = true
				break
			}
		}
		bc.ActiveSubscriptionsMutex.RUnlock()

		if !activeMutationFound {
			break // Exit the loop if no active mutations are found.
		}
		time.Sleep(100 * time.Millisecond) // Wait a bit before checking again.
	}

	log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, hasuraConnectionId)

	// Cancel the Hasura connection context to clean up resources.
	// The connection is re-checked because time has passed since the guard above.
	if bc.HasuraConnection != nil && bc.HasuraConnection.ContextCancelFunc != nil {
		bc.HasuraConnection.ContextCancelFunc()
	}

	log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionToken, hasuraConnectionId)

	// Send a reconnection confirmation message
	go SendUserGraphqlReconnectionForcedEvtMsg(sessionToken)
}
// refreshUserSessionVariables fetches the session variables of the given
// browser connection through akka_apps.AkkaAppsGetSessionVariablesFrom and
// stores them in browserConnection.BBBWebSessionVariables.
//
// It returns an error when the lookup fails or when any of the
// x-hasura-role, x-hasura-userid or x-hasura-meetingid entries is absent;
// in that case the stored variables are left untouched.
func refreshUserSessionVariables(browserConnection *common.BrowserConnection) error {
	// Snapshot the identifiers under the read lock
	BrowserConnectionsMutex.RLock()
	userID, meetingID, connID := browserConnection.UserId, browserConnection.MeetingId, browserConnection.Id
	BrowserConnectionsMutex.RUnlock()

	// Check authorization
	vars, lookupErr := akka_apps.AkkaAppsGetSessionVariablesFrom(connID, meetingID, userID)
	if lookupErr != nil {
		log.Error(lookupErr)
		return fmt.Errorf("error on checking sessionToken authorization")
	}
	log.Trace("Session variables obtained successfully")
	log.Info(vars)

	// Entries that must be present, checked in this order. key is the map
	// key looked up, label is the spelling used in the error message.
	required := []struct{ key, label string }{
		{"x-hasura-role", "X-Hasura-Role"},
		{"x-hasura-userid", "X-Hasura-UserId"},
		{"x-hasura-meetingid", "X-Hasura-MeetingId"},
	}
	for _, entry := range required {
		if _, found := vars[entry.key]; !found {
			return fmt.Errorf("error on checking sessionToken authorization, %s is missing", entry.label)
		}
	}

	// Publish the new variables under the write lock
	BrowserConnectionsMutex.Lock()
	browserConnection.BBBWebSessionVariables = vars
	BrowserConnectionsMutex.Unlock()

	return nil
}
// connectionInitHandler consumes messages from
// fromBrowserToHasuraConnectionEstablishingChannel until it finds the
// "connection_init" message, then authorizes the session it describes.
//
// The message payload must carry a "headers" object containing the string
// entries X-Session-Token, X-ClientSessionUUID, X-ClientType and
// X-ClientIsMobile. The session token is verified through
// bbb_web.BBBWebCheckAuthorization (retried on failure); on success the
// session data is stored in browserConnection, the session variables are
// loaded, a "connection established" message is sent in the background and
// the establishing channel is closed.
//
// It returns an error when the channel is closed before a connection_init
// arrives, when the message is malformed or lacks a required header, or when
// authorization fails. Messages that are not connection_init, or that are not
// valid JSON, are skipped.
func connectionInitHandler(
	browserConnection *common.BrowserConnection,
	fromBrowserToHasuraConnectionEstablishingChannel *common.SafeChannelByte) error {

	BrowserConnectionsMutex.RLock()
	browserConnectionId := browserConnection.Id
	browserConnectionCookies := browserConnection.BrowserRequestCookies
	BrowserConnectionsMutex.RUnlock()

	// Intercept the fromBrowserMessage channel to get the sessionToken
	for {
		fromBrowserMessage, ok := fromBrowserToHasuraConnectionEstablishingChannel.Receive()
		if !ok {
			//Received all messages. Channel is closed
			return fmt.Errorf("error on receiving init connection")
		}

		// Cheap pre-filter before paying for the JSON decode
		if !bytes.Contains(fromBrowserMessage, []byte("\"connection_init\"")) {
			continue
		}

		var fromBrowserMessageAsMap map[string]interface{}
		if err := json.Unmarshal(fromBrowserMessage, &fromBrowserMessageAsMap); err != nil {
			log.Errorf("failed to unmarshal message: %v", err)
			continue
		}

		// The message comes from the browser and cannot be trusted: use
		// checked type assertions so a missing or malformed payload rejects
		// the connection instead of panicking.
		payloadAsMap, existsPayload := fromBrowserMessageAsMap["payload"].(map[string]interface{})
		if !existsPayload {
			return fmt.Errorf("payload missing on init connection")
		}

		headersAsMap, existsHeaders := payloadAsMap["headers"].(map[string]interface{})
		if !existsHeaders {
			return fmt.Errorf("headers missing on init connection")
		}

		sessionToken, existsSessionToken := headersAsMap["X-Session-Token"].(string)
		if !existsSessionToken {
			return fmt.Errorf("X-Session-Token header missing on init connection")
		}

		clientSessionUUID, existsClientSessionUUID := headersAsMap["X-ClientSessionUUID"].(string)
		if !existsClientSessionUUID {
			return fmt.Errorf("X-ClientSessionUUID header missing on init connection")
		}

		clientType, existsClientType := headersAsMap["X-ClientType"].(string)
		if !existsClientType {
			return fmt.Errorf("X-ClientType header missing on init connection")
		}

		clientIsMobile, existsMobile := headersAsMap["X-ClientIsMobile"].(string)
		if !existsMobile {
			return fmt.Errorf("X-ClientIsMobile header missing on init connection")
		}

		var meetingId, userId string
		var errCheckAuthorization error

		// Check authorization, retrying every 100ms (7 attempts at most)
		// until it succeeds with both ids filled in.
		var numOfAttempts = 0
		for {
			meetingId, userId, errCheckAuthorization = bbb_web.BBBWebCheckAuthorization(browserConnectionId, sessionToken, browserConnectionCookies)
			if errCheckAuthorization != nil {
				log.Error(errCheckAuthorization)
			}

			if (errCheckAuthorization == nil && meetingId != "" && userId != "") || numOfAttempts > 5 {
				break
			}
			numOfAttempts++
			time.Sleep(100 * time.Millisecond)
		}

		if errCheckAuthorization != nil {
			return fmt.Errorf("error on trying to check authorization")
		}

		if meetingId == "" {
			return fmt.Errorf("error to obtain user meetingId from BBBWebCheckAuthorization")
		}

		if userId == "" {
			return fmt.Errorf("error to obtain user userId from BBBWebCheckAuthorization")
		}

		log.Trace("Success on check authorization")

		log.Debugf("[ConnectionInitHandler] intercepted Session Token %v and Client Session UUID %v", sessionToken, clientSessionUUID)

		BrowserConnectionsMutex.Lock()
		browserConnection.SessionToken = sessionToken
		browserConnection.ClientSessionUUID = clientSessionUUID
		browserConnection.MeetingId = meetingId
		browserConnection.UserId = userId
		browserConnection.ConnectionInitMessage = fromBrowserMessage
		BrowserConnectionsMutex.Unlock()

		if err := refreshUserSessionVariables(browserConnection); err != nil {
			return fmt.Errorf("error on getting session variables")
		}

		go SendUserGraphqlConnectionEstablishedSysMsg(
			sessionToken,
			clientSessionUUID,
			clientType,
			strings.ToLower(clientIsMobile) == "true",
			browserConnectionId,
		)

		// The connection is established: nothing else is read from this channel
		fromBrowserToHasuraConnectionEstablishingChannel.Close()
		return nil
	}
}