125 lines
3.9 KiB
Go
125 lines
3.9 KiB
Go
package websrv
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/hascli"
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/reader"
|
|
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/writer"
|
|
log "github.com/sirupsen/logrus"
|
|
"net/http"
|
|
"nhooyr.io/websocket"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var lastBrowserConnectionId int
|
|
|
|
// Buffer size of the channels
|
|
var bufferSize = 100
|
|
|
|
// active browser connections
|
|
var BrowserConnections = make(map[string]*common.BrowserConnection)
|
|
var BrowserConnectionsMutex = &sync.Mutex{}
|
|
|
|
// Handle client connection
|
|
// This is the connection that comes from browser
|
|
func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
|
log := log.WithField("_routine", "ConnectionHandler")
|
|
|
|
// Obtain id for this connection
|
|
lastBrowserConnectionId++
|
|
browserConnectionId := "BC" + fmt.Sprintf("%010d", lastBrowserConnectionId)
|
|
log = log.WithField("browserConnectionId", browserConnectionId)
|
|
|
|
// Starts a context that will be dependent on the connection, so we can cancel subroutines when the connection is dropped
|
|
browserConnectionContext, browserConnectionContextCancel := context.WithCancel(r.Context())
|
|
defer browserConnectionContextCancel()
|
|
|
|
// Add sub-protocol
|
|
var acceptOptions websocket.AcceptOptions
|
|
acceptOptions.Subprotocols = append(acceptOptions.Subprotocols, "graphql-ws")
|
|
|
|
c, err := websocket.Accept(w, r, &acceptOptions)
|
|
if err != nil {
|
|
log.Errorf("error: %v", err)
|
|
}
|
|
defer c.Close(websocket.StatusInternalError, "the sky is falling")
|
|
|
|
var thisConnection = common.BrowserConnection{
|
|
Id: browserConnectionId,
|
|
ActiveSubscriptions: make(map[string]common.GraphQlSubscription, 1),
|
|
Context: browserConnectionContext,
|
|
}
|
|
|
|
BrowserConnectionsMutex.Lock()
|
|
BrowserConnections[browserConnectionId] = &thisConnection
|
|
BrowserConnectionsMutex.Unlock()
|
|
|
|
defer func() {
|
|
if thisConnection.JsonPatchSupported {
|
|
msgpatch.RemoveConnCacheDir(BrowserConnections[browserConnectionId])
|
|
}
|
|
BrowserConnectionsMutex.Lock()
|
|
delete(BrowserConnections, browserConnectionId)
|
|
BrowserConnectionsMutex.Unlock()
|
|
}()
|
|
|
|
// Log it
|
|
log.Printf("connection accepted")
|
|
|
|
// Create channels
|
|
fromBrowserChannel1 := make(chan interface{}, bufferSize)
|
|
fromBrowserChannel2 := make(chan interface{}, bufferSize)
|
|
toBrowserChannel := make(chan interface{}, bufferSize)
|
|
|
|
// Ensure a hasura client is running while the browser is connected
|
|
go func() {
|
|
log.Printf("starting hasura client")
|
|
|
|
BrowserConnectedLoop:
|
|
for {
|
|
select {
|
|
case <-browserConnectionContext.Done():
|
|
break BrowserConnectedLoop
|
|
default:
|
|
{
|
|
log.Printf("creating hasura client")
|
|
BrowserConnectionsMutex.Lock()
|
|
thisBrowserConnection := BrowserConnections[browserConnectionId]
|
|
BrowserConnectionsMutex.Unlock()
|
|
if thisBrowserConnection != nil {
|
|
hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserChannel1, toBrowserChannel)
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Configure the wait group (to hold this routine execution until both are completed)
|
|
var wgAll sync.WaitGroup
|
|
wgAll.Add(3)
|
|
|
|
var wgReader sync.WaitGroup
|
|
wgReader.Add(1)
|
|
|
|
// Reads from browser connection, writes into fromBrowserChannel1 and fromBrowserChannel2
|
|
go reader.BrowserConnectionReader(browserConnectionId, browserConnectionContext, c, fromBrowserChannel1, fromBrowserChannel2, []*sync.WaitGroup{&wgAll, &wgReader})
|
|
go func() {
|
|
wgReader.Wait()
|
|
thisConnection.Disconnected = true
|
|
}()
|
|
|
|
// Reads from toBrowserChannel, writes to browser connection
|
|
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, c, toBrowserChannel, &wgAll)
|
|
|
|
go SessionTokenReader(browserConnectionId, browserConnectionContext, fromBrowserChannel2, &wgAll)
|
|
|
|
// Wait until all routines are finished
|
|
wgAll.Wait()
|
|
|
|
}
|