graphql-server: Insert client graphql connections into the database (#18875)
This commit is contained in:
parent
45224c0758
commit
09f3e6fae4
@ -4,17 +4,15 @@ import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.event.Logging
|
||||
import org.apache.pekko.http.scaladsl.Http
|
||||
import org.apache.pekko.stream.ActorMaterializer
|
||||
import org.bigbluebutton.common2.redis.{ MessageSender, RedisConfig, RedisPublisher }
|
||||
import org.bigbluebutton.common2.redis.{MessageSender, RedisConfig, RedisPublisher}
|
||||
import org.bigbluebutton.core._
|
||||
import org.bigbluebutton.core.bus._
|
||||
import org.bigbluebutton.core.pubsub.senders.ReceivedJsonMsgHandlerActor
|
||||
import org.bigbluebutton.core2.AnalyticsActor
|
||||
import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor
|
||||
import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor
|
||||
import org.bigbluebutton.endpoint.redis.{ RedisRecorderActor, ExportAnnotationsActor }
|
||||
import org.bigbluebutton.endpoint.redis.LearningDashboardActor
|
||||
import org.bigbluebutton.endpoint.redis.{AppsRedisSubscriberActor, ExportAnnotationsActor, GraphqlActionsActor, LearningDashboardActor, RedisRecorderActor}
|
||||
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
|
||||
import org.bigbluebutton.service.{ HealthzService, MeetingInfoActor, MeetingInfoService }
|
||||
import org.bigbluebutton.service.{HealthzService, MeetingInfoActor, MeetingInfoService}
|
||||
|
||||
object Boot extends App with SystemConfiguration {
|
||||
|
||||
@ -69,6 +67,11 @@ object Boot extends App with SystemConfiguration {
|
||||
"LearningDashboardActor"
|
||||
)
|
||||
|
||||
val graphqlActionsActor = system.actorOf(
|
||||
GraphqlActionsActor.props(system),
|
||||
"GraphqlActionsActor"
|
||||
)
|
||||
|
||||
recordingEventBus.subscribe(redisRecorderActor, outMessageChannel)
|
||||
val incomingJsonMessageBus = new IncomingJsonMessageBus
|
||||
|
||||
@ -85,6 +88,9 @@ object Boot extends App with SystemConfiguration {
|
||||
outBus2.subscribe(learningDashboardActor, outBbbMsgMsgChannel)
|
||||
bbbMsgBus.subscribe(learningDashboardActor, analyticsChannel)
|
||||
|
||||
eventBus.subscribe(graphqlActionsActor, meetingManagerChannel)
|
||||
bbbMsgBus.subscribe(graphqlActionsActor, analyticsChannel)
|
||||
|
||||
val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW, healthzService), "bigbluebutton-actor")
|
||||
eventBus.subscribe(bbbActor, meetingManagerChannel)
|
||||
|
||||
|
@ -81,6 +81,8 @@ class BigBlueButtonActor(
|
||||
case m: GetRunningMeetingsReqMsg => handleGetRunningMeetingsReqMsg(m)
|
||||
case m: CheckAlivePingSysMsg => handleCheckAlivePingSysMsg(m)
|
||||
case m: ValidateConnAuthTokenSysMsg => handleValidateConnAuthTokenSysMsg(m)
|
||||
case _: UserGraphqlConnectionStablishedSysMsg => //Ignore
|
||||
case _: UserGraphqlConnectionClosedSysMsg => //Ignore
|
||||
case _ => log.warning("Cannot handle " + msg.envelope.name)
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ case class UserDbModel(
|
||||
role: String,
|
||||
avatar: String = "",
|
||||
color: String = "",
|
||||
sessionToken: String = "",
|
||||
authed: Boolean = false,
|
||||
joined: Boolean = false,
|
||||
banned: Boolean = false,
|
||||
@ -27,7 +28,7 @@ case class UserDbModel(
|
||||
|
||||
class UserDbTableDef(tag: Tag) extends Table[UserDbModel](tag, None, "user") {
|
||||
override def * = (
|
||||
userId,extId,meetingId,name,role,avatar,color,authed,joined,banned,loggedOut,guest,guestStatus,registeredOn,excludeFromDashboard) <> (UserDbModel.tupled, UserDbModel.unapply)
|
||||
userId,extId,meetingId,name,role,avatar,color, sessionToken, authed,joined,banned,loggedOut,guest,guestStatus,registeredOn,excludeFromDashboard) <> (UserDbModel.tupled, UserDbModel.unapply)
|
||||
val userId = column[String]("userId", O.PrimaryKey)
|
||||
val extId = column[String]("extId")
|
||||
val meetingId = column[String]("meetingId")
|
||||
@ -35,6 +36,7 @@ class UserDbTableDef(tag: Tag) extends Table[UserDbModel](tag, None, "user") {
|
||||
val role = column[String]("role")
|
||||
val avatar = column[String]("avatar")
|
||||
val color = column[String]("color")
|
||||
val sessionToken = column[String]("sessionToken")
|
||||
val authed = column[Boolean]("authed")
|
||||
val joined = column[Boolean]("joined")
|
||||
val banned = column[Boolean]("banned")
|
||||
@ -57,6 +59,7 @@ object UserDAO {
|
||||
role = regUser.role,
|
||||
avatar = regUser.avatarURL,
|
||||
color = regUser.color,
|
||||
sessionToken = regUser.sessionToken,
|
||||
authed = regUser.authed,
|
||||
joined = regUser.joined,
|
||||
banned = regUser.banned,
|
||||
|
@ -0,0 +1,64 @@
|
||||
package org.bigbluebutton.core.db
|
||||
|
||||
import org.bigbluebutton.core.models.{ VoiceUserState }
|
||||
import slick.jdbc.PostgresProfile.api._
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.util.{Failure, Success }
|
||||
|
||||
case class UserGraphqlConnectionDbModel (
|
||||
graphqlConnectionId: Option[Int],
|
||||
sessionToken: String,
|
||||
middlewareConnectionId: String,
|
||||
stablishedAt: java.sql.Timestamp,
|
||||
closedAt: Option[java.sql.Timestamp],
|
||||
)
|
||||
|
||||
class UserGraphqlConnectionDbTableDef(tag: Tag) extends Table[UserGraphqlConnectionDbModel](tag, None, "user_graphqlConnection") {
|
||||
override def * = (
|
||||
graphqlConnectionId, sessionToken, middlewareConnectionId, stablishedAt, closedAt
|
||||
) <> (UserGraphqlConnectionDbModel.tupled, UserGraphqlConnectionDbModel.unapply)
|
||||
val graphqlConnectionId = column[Option[Int]]("graphqlConnectionId", O.PrimaryKey, O.AutoInc)
|
||||
val sessionToken = column[String]("sessionToken")
|
||||
val middlewareConnectionId = column[String]("middlewareConnectionId")
|
||||
val stablishedAt = column[java.sql.Timestamp]("stablishedAt")
|
||||
val closedAt = column[Option[java.sql.Timestamp]]("closedAt")
|
||||
}
|
||||
|
||||
|
||||
object UserGraphqlConnectionDAO {
|
||||
def insert(sessionToken: String, middlewareConnectionId: String) = {
|
||||
DatabaseConnection.db.run(
|
||||
TableQuery[UserGraphqlConnectionDbTableDef].insertOrUpdate(
|
||||
UserGraphqlConnectionDbModel(
|
||||
graphqlConnectionId = None,
|
||||
sessionToken = sessionToken,
|
||||
middlewareConnectionId = middlewareConnectionId,
|
||||
stablishedAt = new java.sql.Timestamp(System.currentTimeMillis()),
|
||||
closedAt = None
|
||||
)
|
||||
)
|
||||
).onComplete {
|
||||
case Success(rowsAffected) => {
|
||||
DatabaseConnection.logger.debug(s"$rowsAffected row(s) inserted on user_graphqlConnection table!")
|
||||
}
|
||||
case Failure(e) => DatabaseConnection.logger.debug(s"Error inserting user_graphqlConnection: $e")
|
||||
}
|
||||
}
|
||||
|
||||
def updateClosed(sessionToken: String, middlewareConnectionId: String) = {
|
||||
DatabaseConnection.db.run(
|
||||
TableQuery[UserGraphqlConnectionDbTableDef]
|
||||
.filter(_.sessionToken === sessionToken)
|
||||
.filter(_.middlewareConnectionId === middlewareConnectionId)
|
||||
.filter(_.closedAt.isEmpty)
|
||||
.map(u => u.closedAt)
|
||||
.update(Some(new java.sql.Timestamp(System.currentTimeMillis())))
|
||||
).onComplete {
|
||||
case Success(rowsAffected) => DatabaseConnection.logger.debug(s"$rowsAffected row(s) updated on user_graphqlConnection table!")
|
||||
case Failure(e) => DatabaseConnection.logger.error(s"Error updating user_graphqlConnection: $e")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -447,6 +447,13 @@ class ReceivedJsonMsgHandlerActor(
|
||||
case TimerEndedPubMsg.NAME =>
|
||||
routeGenericMsg[TimerEndedPubMsg](envelope, jsonNode)
|
||||
|
||||
// Messages from Graphql Middleware
|
||||
case UserGraphqlConnectionStablishedSysMsg.NAME =>
|
||||
route[UserGraphqlConnectionStablishedSysMsg](meetingManagerChannel, envelope, jsonNode)
|
||||
|
||||
case UserGraphqlConnectionClosedSysMsg.NAME =>
|
||||
route[UserGraphqlConnectionClosedSysMsg](meetingManagerChannel, envelope, jsonNode)
|
||||
|
||||
case _ =>
|
||||
log.error("Cannot route envelope name " + envelope.name)
|
||||
// do nothing
|
||||
|
@ -0,0 +1,43 @@
|
||||
package org.bigbluebutton.endpoint.redis
|
||||
|
||||
import org.apache.pekko.actor.{Actor, ActorLogging, ActorSystem, Props}
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.core.db.UserGraphqlConnectionDAO
|
||||
|
||||
object GraphqlActionsActor {
|
||||
def props(system: ActorSystem): Props =
|
||||
Props(
|
||||
classOf[GraphqlActionsActor],
|
||||
system,
|
||||
)
|
||||
}
|
||||
|
||||
class GraphqlActionsActor(
|
||||
system: ActorSystem,
|
||||
) extends Actor with ActorLogging {
|
||||
|
||||
def receive = {
|
||||
//=============================
|
||||
// 2x messages
|
||||
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
|
||||
case _ => // do nothing
|
||||
}
|
||||
|
||||
private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
|
||||
msg.core match {
|
||||
// Messages from bbb-graphql-middleware
|
||||
case m: UserGraphqlConnectionStablishedSysMsg => handleUserGraphqlConnectionStablishedSysMsg(m)
|
||||
case m: UserGraphqlConnectionClosedSysMsg => handleUserGraphqlConnectionClosedSysMsg(m)
|
||||
case _ => // message not to be handled.
|
||||
}
|
||||
}
|
||||
|
||||
private def handleUserGraphqlConnectionStablishedSysMsg(msg: UserGraphqlConnectionStablishedSysMsg) {
|
||||
UserGraphqlConnectionDAO.insert(msg.body.sessionToken, msg.body.browserConnectionId)
|
||||
}
|
||||
|
||||
private def handleUserGraphqlConnectionClosedSysMsg(msg: UserGraphqlConnectionClosedSysMsg) {
|
||||
UserGraphqlConnectionDAO.updateClosed(msg.body.sessionToken, msg.body.browserConnectionId)
|
||||
}
|
||||
|
||||
}
|
@ -242,6 +242,31 @@ case class InvalidateUserGraphqlConnectionSysMsg(
|
||||
) extends BbbCoreMsg
|
||||
case class InvalidateUserGraphqlConnectionSysMsgBody(meetingId: String, userId: String, sessionToken: String, reason: String)
|
||||
|
||||
/**
|
||||
* Sent from graphql-middleware to akka-apps
|
||||
*/
|
||||
|
||||
object UserGraphqlConnectionInvalidatedEvtMsg { val NAME = "UserGraphqlConnectionInvalidatedEvtMsg" }
|
||||
case class UserGraphqlConnectionInvalidatedEvtMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: UserGraphqlConnectionInvalidatedEvtMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class UserGraphqlConnectionInvalidatedEvtMsgBody(sessionToken: String, browserConnectionId: String)
|
||||
|
||||
object UserGraphqlConnectionStablishedSysMsg { val NAME = "UserGraphqlConnectionStablishedSysMsg" }
|
||||
case class UserGraphqlConnectionStablishedSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: UserGraphqlConnectionStablishedSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class UserGraphqlConnectionStablishedSysMsgBody(sessionToken: String, browserConnectionId: String)
|
||||
|
||||
object UserGraphqlConnectionClosedSysMsg { val NAME = "UserGraphqlConnectionClosedSysMsg" }
|
||||
case class UserGraphqlConnectionClosedSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: UserGraphqlConnectionClosedSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class UserGraphqlConnectionClosedSysMsgBody(sessionToken: String, browserConnectionId: String)
|
||||
|
||||
/**
|
||||
* Sent from akka-apps to bbb-web to inform a summary of the meeting activities
|
||||
*/
|
||||
|
@ -3,8 +3,8 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/rediscli"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/websrv"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/invalidator"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -20,8 +20,8 @@ func main() {
|
||||
//Clear cache from last exec
|
||||
msgpatch.ClearAllCaches()
|
||||
|
||||
// Connection invalidator
|
||||
go invalidator.BrowserConnectionInvalidator()
|
||||
// Listen msgs from akka (for example to invalidate connection)
|
||||
go rediscli.StartRedisListener()
|
||||
|
||||
// Websocket listener
|
||||
// set default port
|
||||
|
@ -12,12 +12,12 @@ import (
|
||||
// HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel
|
||||
func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel chan interface{}, fromBrowserToHasuraChannel chan interface{}, wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
|
||||
defer wg.Done()
|
||||
defer hc.ContextCancelFunc()
|
||||
|
||||
defer log.Debugf("finished")
|
||||
|
||||
for {
|
||||
// Read a message from hasura
|
||||
var message interface{}
|
||||
|
13
bbb-graphql-middleware/internal/rediscli/connection.go
Normal file
13
bbb-graphql-middleware/internal/rediscli/connection.go
Normal file
@ -0,0 +1,13 @@
|
||||
package rediscli
|
||||
|
||||
import "github.com/redis/go-redis/v9"
|
||||
|
||||
var redisClient = redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
Password: "",
|
||||
DB: 0,
|
||||
})
|
||||
|
||||
func GetRedisConn() *redis.Client {
|
||||
return redisClient
|
||||
}
|
49
bbb-graphql-middleware/internal/rediscli/listener.go
Normal file
49
bbb-graphql-middleware/internal/rediscli/listener.go
Normal file
@ -0,0 +1,49 @@
|
||||
package rediscli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func StartRedisListener() {
|
||||
log := log.WithField("_routine", "StartRedisListener")
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
subscriber := GetRedisConn().Subscribe(ctx, "from-akka-apps-redis-channel")
|
||||
|
||||
for {
|
||||
msg, err := subscriber.ReceiveMessage(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("error: ", err)
|
||||
}
|
||||
|
||||
// Skip parsing unnecessary messages
|
||||
if !strings.Contains(msg.Payload, "InvalidateUserGraphqlConnectionSysMsg") {
|
||||
continue
|
||||
}
|
||||
|
||||
var message interface{}
|
||||
if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
messageAsMap := message.(map[string]interface{})
|
||||
|
||||
messageEnvelopeAsMap := messageAsMap["envelope"].(map[string]interface{})
|
||||
|
||||
messageType := messageEnvelopeAsMap["name"]
|
||||
|
||||
if messageType == "InvalidateUserGraphqlConnectionSysMsg" {
|
||||
messageCoreAsMap := messageAsMap["core"].(map[string]interface{})
|
||||
messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{})
|
||||
sessionTokenToInvalidate := messageBodyAsMap["sessionToken"]
|
||||
log.Debugf("Received invalidate request for sessionToken %v", sessionTokenToInvalidate)
|
||||
|
||||
//Not being used yet
|
||||
//websrv.InvalidateSessionTokenConnections(sessionTokenToInvalidate.(string))
|
||||
}
|
||||
}
|
||||
}
|
74
bbb-graphql-middleware/internal/rediscli/sender.go
Normal file
74
bbb-graphql-middleware/internal/rediscli/sender.go
Normal file
@ -0,0 +1,74 @@
|
||||
package rediscli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func getCurrTimeInMs() int64 {
|
||||
currentTime := time.Now()
|
||||
milliseconds := currentTime.UnixNano() / int64(time.Millisecond)
|
||||
return milliseconds
|
||||
}
|
||||
|
||||
func sendBbbCoreMsgToRedis(name string, body map[string]interface{}) {
|
||||
channelName := "to-akka-apps-redis-channel"
|
||||
|
||||
message := map[string]interface{}{
|
||||
"envelope": map[string]interface{}{
|
||||
"name": name,
|
||||
"routing": map[string]interface{}{
|
||||
"sender": "bbb-graphql-middleware",
|
||||
},
|
||||
"timestamp": getCurrTimeInMs(),
|
||||
},
|
||||
"core": map[string]interface{}{
|
||||
"header": map[string]interface{}{
|
||||
"name": name,
|
||||
},
|
||||
"body": body,
|
||||
},
|
||||
}
|
||||
|
||||
messageJSON, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
fmt.Printf("Error while marshaling message to json: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = GetRedisConn().Publish(context.Background(), channelName, messageJSON).Err()
|
||||
if err != nil {
|
||||
fmt.Printf("Error while sending msg to redis channel: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("JSON message sent to channel %s:\n%s\n", channelName, messageJSON)
|
||||
}
|
||||
|
||||
func SendUserGraphqlConnectionInvalidatedEvtMsg(sessionToken string) {
|
||||
var body = map[string]interface{}{
|
||||
"sessionToken": sessionToken,
|
||||
}
|
||||
|
||||
sendBbbCoreMsgToRedis("UserGraphqlConnectionInvalidatedEvtMsg", body)
|
||||
}
|
||||
|
||||
func SendUserGraphqlConnectionStablishedSysMsg(sessionToken string, browserConnectionId string) {
|
||||
var body = map[string]interface{}{
|
||||
"sessionToken": sessionToken,
|
||||
"browserConnectionId": browserConnectionId,
|
||||
}
|
||||
|
||||
sendBbbCoreMsgToRedis("UserGraphqlConnectionStablishedSysMsg", body)
|
||||
}
|
||||
|
||||
func SendUserGraphqlConnectionClosedSysMsg(sessionToken string, browserConnectionId string) {
|
||||
var body = map[string]interface{}{
|
||||
"sessionToken": sessionToken,
|
||||
"browserConnectionId": browserConnectionId,
|
||||
}
|
||||
|
||||
sendBbbCoreMsgToRedis("UserGraphqlConnectionClosedSysMsg", body)
|
||||
}
|
@ -6,6 +6,7 @@ import (
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/common"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/hascli"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/rediscli"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/reader"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/websrv/writer"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -61,8 +62,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() {
|
||||
msgpatch.RemoveConnCacheDir(browserConnectionId)
|
||||
BrowserConnectionsMutex.Lock()
|
||||
sessionTokenRemoved := BrowserConnections[browserConnectionId].SessionToken
|
||||
delete(BrowserConnections, browserConnectionId)
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
go rediscli.SendUserGraphqlConnectionClosedSysMsg(sessionTokenRemoved, browserConnectionId)
|
||||
|
||||
log.Infof("connection removed")
|
||||
}()
|
||||
@ -90,6 +93,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
thisBrowserConnection := BrowserConnections[browserConnectionId]
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
log.Debugf("created hasura client")
|
||||
if thisBrowserConnection != nil {
|
||||
hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserChannel1, toBrowserChannel)
|
||||
}
|
||||
@ -116,9 +120,25 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// Reads from toBrowserChannel, writes to browser connection
|
||||
go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, c, toBrowserChannel, &wgAll)
|
||||
|
||||
go SessionTokenReader(browserConnectionId, browserConnectionContext, fromBrowserChannel2, &wgAll)
|
||||
go ConnectionInitHandler(browserConnectionId, browserConnectionContext, fromBrowserChannel2, &wgAll)
|
||||
|
||||
// Wait until all routines are finished
|
||||
wgAll.Wait()
|
||||
|
||||
}
|
||||
|
||||
func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
|
||||
BrowserConnectionsMutex.RLock()
|
||||
for _, browserConnection := range BrowserConnections {
|
||||
if browserConnection.SessionToken == sessionTokenToInvalidate {
|
||||
if browserConnection.HasuraConnection != nil {
|
||||
log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id)
|
||||
browserConnection.HasuraConnection.ContextCancelFunc()
|
||||
log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id)
|
||||
|
||||
//go SendInvalidatedUserGraphqlConnectionEvtMsg(browserConnection.SessionToken)
|
||||
}
|
||||
}
|
||||
}
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
}
|
||||
|
@ -2,18 +2,21 @@ package websrv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/rediscli"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func SessionTokenReader(connectionId string, browserConnectionContext context.Context, fromBrowser chan interface{}, wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "SessionTokenReader")
|
||||
func ConnectionInitHandler(browserConnectionId string, browserConnectionContext context.Context, fromBrowser chan interface{}, wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "ConnectionInitHandler").WithField("browserConnectionId", browserConnectionId)
|
||||
|
||||
log.Debugf("starting")
|
||||
|
||||
defer wg.Done()
|
||||
defer log.Debugf("finished")
|
||||
|
||||
BrowserConnectionsMutex.RLock()
|
||||
browserConnection := BrowserConnections[connectionId]
|
||||
browserConnection := BrowserConnections[browserConnectionId]
|
||||
BrowserConnectionsMutex.RUnlock()
|
||||
|
||||
// Intercept the fromBrowserMessage channel to get the sessionToken
|
||||
@ -29,7 +32,13 @@ func SessionTokenReader(connectionId string, browserConnectionContext context.Co
|
||||
if sessionToken != nil {
|
||||
sessionToken := headersAsMap["X-Session-Token"].(string)
|
||||
log.Infof("[SessionTokenReader] intercepted session token %v", sessionToken)
|
||||
BrowserConnectionsMutex.Lock()
|
||||
browserConnection.SessionToken = sessionToken
|
||||
BrowserConnectionsMutex.Unlock()
|
||||
|
||||
go rediscli.SendUserGraphqlConnectionStablishedSysMsg(sessionToken, browserConnectionId)
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
@ -1,66 +0,0 @@
|
||||
package invalidator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/iMDT/bbb-graphql-middleware/internal/websrv"
|
||||
"github.com/redis/go-redis/v9"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func BrowserConnectionInvalidator() {
|
||||
log := log.WithField("_routine", "BrowserConnectionInvalidator")
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
redisClient := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
Password: "",
|
||||
DB: 0,
|
||||
})
|
||||
|
||||
subscriber := redisClient.Subscribe(ctx, "from-akka-apps-redis-channel")
|
||||
|
||||
for {
|
||||
msg, err := subscriber.ReceiveMessage(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("error: ", err)
|
||||
}
|
||||
|
||||
// Skip parsing unnecessary messages
|
||||
if !strings.Contains(msg.Payload, "InvalidateUserGraphqlConnectionSysMsg") {
|
||||
continue
|
||||
}
|
||||
|
||||
var message interface{}
|
||||
if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
messageAsMap := message.(map[string]interface{})
|
||||
|
||||
messageEnvelopeAsMap := messageAsMap["envelope"].(map[string]interface{})
|
||||
|
||||
messageType := messageEnvelopeAsMap["name"]
|
||||
|
||||
if messageType == "InvalidateUserGraphqlConnectionSysMsg" {
|
||||
messageCoreAsMap := messageAsMap["core"].(map[string]interface{})
|
||||
messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{})
|
||||
sessionTokenToInvalidate := messageBodyAsMap["sessionToken"]
|
||||
log.Debugf("Received invalidate request for sessionToken %v", sessionTokenToInvalidate)
|
||||
|
||||
websrv.BrowserConnectionsMutex.RLock()
|
||||
for _, browserConnection := range websrv.BrowserConnections {
|
||||
if browserConnection.SessionToken == sessionTokenToInvalidate {
|
||||
if browserConnection.HasuraConnection != nil {
|
||||
log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id)
|
||||
browserConnection.HasuraConnection.ContextCancelFunc()
|
||||
log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
websrv.BrowserConnectionsMutex.RUnlock()
|
||||
}
|
||||
}
|
||||
}
|
@ -11,6 +11,8 @@ import (
|
||||
|
||||
func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserChannel1 chan interface{}, fromBrowserChannel2 chan interface{}, waitGroups []*sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
|
||||
defer func() {
|
||||
close(fromBrowserChannel1)
|
||||
@ -26,12 +28,10 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}()
|
||||
|
||||
defer log.Debugf("finished")
|
||||
|
||||
for {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
var v interface{}
|
||||
err := wsjson.Read(ctx, c, &v)
|
||||
if err != nil {
|
||||
|
@ -10,10 +10,10 @@ import (
|
||||
)
|
||||
|
||||
func BrowserConnectionWriter(browserConnectionId string, ctx context.Context, c *websocket.Conn, toBrowserChannel chan interface{}, wg *sync.WaitGroup) {
|
||||
log := log.WithField("_routine", "websocketConnectionWriter").WithField("browserConnectionId", browserConnectionId)
|
||||
|
||||
defer wg.Done()
|
||||
log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnectionId)
|
||||
defer log.Debugf("finished")
|
||||
log.Debugf("starting")
|
||||
defer wg.Done()
|
||||
|
||||
RangeLoop:
|
||||
for {
|
||||
|
@ -217,6 +217,7 @@ CREATE TABLE "user" (
|
||||
"role" varchar(20),
|
||||
"avatar" varchar(500),
|
||||
"color" varchar(7),
|
||||
"sessionToken" varchar(16),
|
||||
"authed" bool,
|
||||
"joined" bool,
|
||||
"banned" bool,
|
||||
@ -646,6 +647,18 @@ GROUP BY u."meetingId", u."userId";
|
||||
CREATE INDEX "idx_user_connectionStatusMetrics_UnstableReport" ON "user_connectionStatusMetrics" ("userId") WHERE "status" != 'normal';
|
||||
|
||||
|
||||
CREATE TABLE "user_graphqlConnection" (
|
||||
"graphqlConnectionId" serial PRIMARY KEY,
|
||||
"sessionToken" varchar(16),
|
||||
"middlewareConnectionId" varchar(12),
|
||||
"stablishedAt" timestamp with time zone,
|
||||
"closedAt" timestamp with time zone
|
||||
);
|
||||
|
||||
CREATE INDEX "idx_user_graphqlConnectionsessionToken" ON "user_graphqlConnection"("sessionToken");
|
||||
|
||||
|
||||
|
||||
--ALTER TABLE "user_connectionStatus" ADD COLUMN "rttInMs" NUMERIC GENERATED ALWAYS AS
|
||||
--(CASE WHEN "connectionAliveAt" IS NULL OR "userClientResponseAt" IS NULL THEN NULL
|
||||
--ELSE EXTRACT(EPOCH FROM ("userClientResponseAt" - "connectionAliveAt")) * 1000
|
||||
|
Loading…
Reference in New Issue
Block a user