diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala index af0b53a488..b2a696fec3 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala @@ -4,17 +4,15 @@ import org.apache.pekko.actor.ActorSystem import org.apache.pekko.event.Logging import org.apache.pekko.http.scaladsl.Http import org.apache.pekko.stream.ActorMaterializer -import org.bigbluebutton.common2.redis.{ MessageSender, RedisConfig, RedisPublisher } +import org.bigbluebutton.common2.redis.{MessageSender, RedisConfig, RedisPublisher} import org.bigbluebutton.core._ import org.bigbluebutton.core.bus._ import org.bigbluebutton.core.pubsub.senders.ReceivedJsonMsgHandlerActor import org.bigbluebutton.core2.AnalyticsActor import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor -import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor -import org.bigbluebutton.endpoint.redis.{ RedisRecorderActor, ExportAnnotationsActor } -import org.bigbluebutton.endpoint.redis.LearningDashboardActor +import org.bigbluebutton.endpoint.redis.{AppsRedisSubscriberActor, ExportAnnotationsActor, GraphqlActionsActor, LearningDashboardActor, RedisRecorderActor} import org.bigbluebutton.common2.bus.IncomingJsonMessageBus -import org.bigbluebutton.service.{ HealthzService, MeetingInfoActor, MeetingInfoService } +import org.bigbluebutton.service.{HealthzService, MeetingInfoActor, MeetingInfoService} object Boot extends App with SystemConfiguration { @@ -69,6 +67,11 @@ object Boot extends App with SystemConfiguration { "LearningDashboardActor" ) + val graphqlActionsActor = system.actorOf( + GraphqlActionsActor.props(system), + "GraphqlActionsActor" + ) + recordingEventBus.subscribe(redisRecorderActor, outMessageChannel) val incomingJsonMessageBus = new IncomingJsonMessageBus @@ -85,6 +88,9 @@ object Boot extends App with SystemConfiguration { outBus2.subscribe(learningDashboardActor, outBbbMsgMsgChannel) bbbMsgBus.subscribe(learningDashboardActor, analyticsChannel) + eventBus.subscribe(graphqlActionsActor, meetingManagerChannel) + bbbMsgBus.subscribe(graphqlActionsActor, analyticsChannel) + val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW, healthzService), "bigbluebutton-actor") eventBus.subscribe(bbbActor, meetingManagerChannel) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala index 53550db436..51fbe9a903 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala @@ -75,13 +75,15 @@ class BigBlueButtonActor( private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = { msg.core match { - case m: CreateMeetingReqMsg => handleCreateMeetingReqMsg(m) - case m: RegisterUserReqMsg => handleRegisterUserReqMsg(m) - case m: GetAllMeetingsReqMsg => handleGetAllMeetingsReqMsg(m) - case m: GetRunningMeetingsReqMsg => handleGetRunningMeetingsReqMsg(m) - case m: CheckAlivePingSysMsg => handleCheckAlivePingSysMsg(m) - case m: ValidateConnAuthTokenSysMsg => handleValidateConnAuthTokenSysMsg(m) - case _ => log.warning("Cannot handle " + msg.envelope.name) + case m: CreateMeetingReqMsg => handleCreateMeetingReqMsg(m) + case m: RegisterUserReqMsg => handleRegisterUserReqMsg(m) + case m: GetAllMeetingsReqMsg => handleGetAllMeetingsReqMsg(m) + case m: GetRunningMeetingsReqMsg => handleGetRunningMeetingsReqMsg(m) + case m: CheckAlivePingSysMsg => handleCheckAlivePingSysMsg(m) + case m: ValidateConnAuthTokenSysMsg => handleValidateConnAuthTokenSysMsg(m) + case _: UserGraphqlConnectionStablishedSysMsg => //Ignore + case _: UserGraphqlConnectionClosedSysMsg => //Ignore + case _ => log.warning("Cannot handle " + msg.envelope.name) } } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserDAO.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserDAO.scala index 58ff1439d2..de3c5fb459 100644 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserDAO.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserDAO.scala @@ -13,6 +13,7 @@ case class UserDbModel( role: String, avatar: String = "", color: String = "", + sessionToken: String = "", authed: Boolean = false, joined: Boolean = false, banned: Boolean = false, @@ -27,7 +28,7 @@ case class UserDbModel( class UserDbTableDef(tag: Tag) extends Table[UserDbModel](tag, None, "user") { override def * = ( - userId,extId,meetingId,name,role,avatar,color,authed,joined,banned,loggedOut,guest,guestStatus,registeredOn,excludeFromDashboard) <> (UserDbModel.tupled, UserDbModel.unapply) + userId,extId,meetingId,name,role,avatar,color, sessionToken, authed,joined,banned,loggedOut,guest,guestStatus,registeredOn,excludeFromDashboard) <> (UserDbModel.tupled, UserDbModel.unapply) val userId = column[String]("userId", O.PrimaryKey) val extId = column[String]("extId") val meetingId = column[String]("meetingId") @@ -35,6 +36,7 @@ class UserDbTableDef(tag: Tag) extends Table[UserDbModel](tag, None, "user") { val role = column[String]("role") val avatar = column[String]("avatar") val color = column[String]("color") + val sessionToken = column[String]("sessionToken") val authed = column[Boolean]("authed") val joined = column[Boolean]("joined") val banned = column[Boolean]("banned") @@ -57,6 +59,7 @@ object UserDAO { role = regUser.role, avatar = regUser.avatarURL, color = regUser.color, + sessionToken = regUser.sessionToken, authed = regUser.authed, joined = regUser.joined, banned = regUser.banned, diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserGraphqlConnectionDAO.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserGraphqlConnectionDAO.scala new file mode 100644 index 0000000000..f3b32610c0 --- /dev/null +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/db/UserGraphqlConnectionDAO.scala @@ -0,0 +1,64 @@ +package org.bigbluebutton.core.db + +import org.bigbluebutton.core.models.{ VoiceUserState } +import slick.jdbc.PostgresProfile.api._ + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success } + +case class UserGraphqlConnectionDbModel ( + graphqlConnectionId: Option[Int], + sessionToken: String, + middlewareConnectionId: String, + stablishedAt: java.sql.Timestamp, + closedAt: Option[java.sql.Timestamp], +) + +class UserGraphqlConnectionDbTableDef(tag: Tag) extends Table[UserGraphqlConnectionDbModel](tag, None, "user_graphqlConnection") { + override def * = ( + graphqlConnectionId, sessionToken, middlewareConnectionId, stablishedAt, closedAt + ) <> (UserGraphqlConnectionDbModel.tupled, UserGraphqlConnectionDbModel.unapply) + val graphqlConnectionId = column[Option[Int]]("graphqlConnectionId", O.PrimaryKey, O.AutoInc) + val sessionToken = column[String]("sessionToken") + val middlewareConnectionId = column[String]("middlewareConnectionId") + val stablishedAt = column[java.sql.Timestamp]("stablishedAt") + val closedAt = column[Option[java.sql.Timestamp]]("closedAt") +} + + +object UserGraphqlConnectionDAO { + def insert(sessionToken: String, middlewareConnectionId: String) = { + DatabaseConnection.db.run( + TableQuery[UserGraphqlConnectionDbTableDef].insertOrUpdate( + UserGraphqlConnectionDbModel( + graphqlConnectionId = None, + sessionToken = sessionToken, + middlewareConnectionId = middlewareConnectionId, + stablishedAt = new java.sql.Timestamp(System.currentTimeMillis()), + closedAt = None + ) + ) + ).onComplete { + case Success(rowsAffected) => { + DatabaseConnection.logger.debug(s"$rowsAffected row(s) inserted on user_graphqlConnection table!") + } + case Failure(e) => DatabaseConnection.logger.debug(s"Error inserting user_graphqlConnection: $e") + } + } + + def updateClosed(sessionToken: String, middlewareConnectionId: String) = { + DatabaseConnection.db.run( + TableQuery[UserGraphqlConnectionDbTableDef] + .filter(_.sessionToken === sessionToken) + .filter(_.middlewareConnectionId === middlewareConnectionId) + .filter(_.closedAt.isEmpty) + .map(u => u.closedAt) + .update(Some(new java.sql.Timestamp(System.currentTimeMillis()))) + ).onComplete { + case Success(rowsAffected) => DatabaseConnection.logger.debug(s"$rowsAffected row(s) updated on user_graphqlConnection table!") + case Failure(e) => DatabaseConnection.logger.error(s"Error updating user_graphqlConnection: $e") + } + } + + +} diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala index 3e74f0ab18..195ecd6078 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala @@ -447,6 +447,13 @@ class ReceivedJsonMsgHandlerActor( case TimerEndedPubMsg.NAME => routeGenericMsg[TimerEndedPubMsg](envelope, jsonNode) + // Messages from Graphql Middleware + case UserGraphqlConnectionStablishedSysMsg.NAME => + route[UserGraphqlConnectionStablishedSysMsg](meetingManagerChannel, envelope, jsonNode) + + case UserGraphqlConnectionClosedSysMsg.NAME => + route[UserGraphqlConnectionClosedSysMsg](meetingManagerChannel, envelope, jsonNode) + case _ => log.error("Cannot route envelope name " + envelope.name) // do nothing diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/GraphqlActionsSubscriberActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/GraphqlActionsSubscriberActor.scala new file mode 100644 index 0000000000..d0448ee898 --- /dev/null +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/GraphqlActionsSubscriberActor.scala @@ -0,0 +1,43 @@ +package org.bigbluebutton.endpoint.redis + +import org.apache.pekko.actor.{Actor, ActorLogging, ActorSystem, Props} +import org.bigbluebutton.common2.msgs._ +import org.bigbluebutton.core.db.UserGraphqlConnectionDAO + +object GraphqlActionsActor { + def props(system: ActorSystem): Props = + Props( + classOf[GraphqlActionsActor], + system, + ) +} + +class GraphqlActionsActor( + system: ActorSystem, +) extends Actor with ActorLogging { + + def receive = { + //============================= + // 2x messages + case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg) + case _ => // do nothing + } + + private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = { + msg.core match { + // Messages from bbb-graphql-middleware + case m: UserGraphqlConnectionStablishedSysMsg => handleUserGraphqlConnectionStablishedSysMsg(m) + case m: UserGraphqlConnectionClosedSysMsg => handleUserGraphqlConnectionClosedSysMsg(m) + case _ => // message not to be handled. + } + } + + private def handleUserGraphqlConnectionStablishedSysMsg(msg: UserGraphqlConnectionStablishedSysMsg) { + UserGraphqlConnectionDAO.insert(msg.body.sessionToken, msg.body.browserConnectionId) + } + + private def handleUserGraphqlConnectionClosedSysMsg(msg: UserGraphqlConnectionClosedSysMsg) { + UserGraphqlConnectionDAO.updateClosed(msg.body.sessionToken, msg.body.browserConnectionId) + } + +} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala index 41e85547cc..22887f2372 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala @@ -242,6 +242,31 @@ case class InvalidateUserGraphqlConnectionSysMsg( ) extends BbbCoreMsg case class InvalidateUserGraphqlConnectionSysMsgBody(meetingId: String, userId: String, sessionToken: String, reason: String) +/** + * Sent from graphql-middleware to akka-apps + */ + +object UserGraphqlConnectionInvalidatedEvtMsg { val NAME = "UserGraphqlConnectionInvalidatedEvtMsg" } +case class UserGraphqlConnectionInvalidatedEvtMsg( + header: BbbCoreBaseHeader, + body: UserGraphqlConnectionInvalidatedEvtMsgBody +) extends BbbCoreMsg +case class UserGraphqlConnectionInvalidatedEvtMsgBody(sessionToken: String, browserConnectionId: String) + +object UserGraphqlConnectionStablishedSysMsg { val NAME = "UserGraphqlConnectionStablishedSysMsg" } +case class UserGraphqlConnectionStablishedSysMsg( + header: BbbCoreBaseHeader, + body: UserGraphqlConnectionStablishedSysMsgBody +) extends BbbCoreMsg +case class UserGraphqlConnectionStablishedSysMsgBody(sessionToken: String, browserConnectionId: String) + +object UserGraphqlConnectionClosedSysMsg { val NAME = "UserGraphqlConnectionClosedSysMsg" } +case class UserGraphqlConnectionClosedSysMsg( + header: BbbCoreBaseHeader, + body: UserGraphqlConnectionClosedSysMsgBody +) extends BbbCoreMsg +case class UserGraphqlConnectionClosedSysMsgBody(sessionToken: String, browserConnectionId: String) + /** * Sent from akka-apps to bbb-web to inform a summary of the meeting activities */ diff --git a/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go b/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go index 6a03c2a374..5e1bfffb5f 100644 --- a/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go +++ b/bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go @@ -3,8 +3,8 @@ package main import ( "fmt" "github.com/iMDT/bbb-graphql-middleware/internal/msgpatch" + "github.com/iMDT/bbb-graphql-middleware/internal/rediscli" "github.com/iMDT/bbb-graphql-middleware/internal/websrv" - "github.com/iMDT/bbb-graphql-middleware/internal/websrv/invalidator" log "github.com/sirupsen/logrus" "net/http" "os" @@ -20,8 +20,8 @@ func main() { //Clear cache from last exec msgpatch.ClearAllCaches() - // Connection invalidator - go invalidator.BrowserConnectionInvalidator() + // Listen msgs from akka (for example to invalidate connection) + go rediscli.StartRedisListener() // Websocket listener // set default port diff --git a/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go b/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go index 515d131724..715d014a5c 100644 --- a/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go +++ b/bbb-graphql-middleware/internal/hascli/conn/reader/reader.go @@ -12,12 +12,12 @@ import ( // HasuraConnectionReader consumes messages from Hasura connection and add send to the browser channel func HasuraConnectionReader(hc *common.HasuraConnection, fromHasuraToBrowserChannel chan interface{}, fromBrowserToHasuraChannel chan interface{}, wg *sync.WaitGroup) { log := log.WithField("_routine", "HasuraConnectionReader").WithField("browserConnectionId", hc.Browserconn.Id).WithField("hasuraConnectionId", hc.Id) + defer log.Debugf("finished") + log.Debugf("starting") defer wg.Done() defer hc.ContextCancelFunc() - defer log.Debugf("finished") - for { // Read a message from hasura var message interface{} diff --git a/bbb-graphql-middleware/internal/rediscli/connection.go b/bbb-graphql-middleware/internal/rediscli/connection.go new file mode 100644 index 0000000000..21fc0585e0 --- /dev/null +++ b/bbb-graphql-middleware/internal/rediscli/connection.go @@ -0,0 +1,13 @@ +package rediscli + +import "github.com/redis/go-redis/v9" + +var redisClient = redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + Password: "", + DB: 0, +}) + +func GetRedisConn() *redis.Client { + return redisClient +} diff --git a/bbb-graphql-middleware/internal/rediscli/listener.go b/bbb-graphql-middleware/internal/rediscli/listener.go new file mode 100644 index 0000000000..09a96585d5 --- /dev/null +++ b/bbb-graphql-middleware/internal/rediscli/listener.go @@ -0,0 +1,49 @@ +package rediscli + +import ( + "context" + "encoding/json" + log "github.com/sirupsen/logrus" + "strings" +) + +func StartRedisListener() { + log := log.WithField("_routine", "StartRedisListener") + + var ctx = context.Background() + + subscriber := GetRedisConn().Subscribe(ctx, "from-akka-apps-redis-channel") + + for { + msg, err := subscriber.ReceiveMessage(ctx) + if err != nil { + log.Errorf("error: ", err) + } + + // Skip parsing unnecessary messages + if !strings.Contains(msg.Payload, "InvalidateUserGraphqlConnectionSysMsg") { + continue + } + + var message interface{} + if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil { + panic(err) + } + + messageAsMap := message.(map[string]interface{}) + + messageEnvelopeAsMap := messageAsMap["envelope"].(map[string]interface{}) + + messageType := messageEnvelopeAsMap["name"] + + if messageType == "InvalidateUserGraphqlConnectionSysMsg" { + messageCoreAsMap := messageAsMap["core"].(map[string]interface{}) + messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{}) + sessionTokenToInvalidate := messageBodyAsMap["sessionToken"] + log.Debugf("Received invalidate request for sessionToken %v", sessionTokenToInvalidate) + + //Not being used yet + //websrv.InvalidateSessionTokenConnections(sessionTokenToInvalidate.(string)) + } + } +} diff --git a/bbb-graphql-middleware/internal/rediscli/sender.go b/bbb-graphql-middleware/internal/rediscli/sender.go new file mode 100644 index 0000000000..69e10ac92c --- /dev/null +++ b/bbb-graphql-middleware/internal/rediscli/sender.go @@ -0,0 +1,74 @@ +package rediscli + +import ( + "context" + "encoding/json" + "fmt" + "time" +) + +func getCurrTimeInMs() int64 { + currentTime := time.Now() + milliseconds := currentTime.UnixNano() / int64(time.Millisecond) + return milliseconds +} + +func sendBbbCoreMsgToRedis(name string, body map[string]interface{}) { + channelName := "to-akka-apps-redis-channel" + + message := map[string]interface{}{ + "envelope": map[string]interface{}{ + "name": name, + "routing": map[string]interface{}{ + "sender": "bbb-graphql-middleware", + }, + "timestamp": getCurrTimeInMs(), + }, + "core": map[string]interface{}{ + "header": map[string]interface{}{ + "name": name, + }, + "body": body, + }, + } + + messageJSON, err := json.Marshal(message) + if err != nil { + fmt.Printf("Error while marshaling message to json: %v\n", err) + return + } + + err = GetRedisConn().Publish(context.Background(), channelName, messageJSON).Err() + if err != nil { + fmt.Printf("Error while sending msg to redis channel: %v\n", err) + return + } + + fmt.Printf("JSON message sent to channel %s:\n%s\n", channelName, messageJSON) +} + +func SendUserGraphqlConnectionInvalidatedEvtMsg(sessionToken string) { + var body = map[string]interface{}{ + "sessionToken": sessionToken, + } + + sendBbbCoreMsgToRedis("UserGraphqlConnectionInvalidatedEvtMsg", body) +} + +func SendUserGraphqlConnectionStablishedSysMsg(sessionToken string, browserConnectionId string) { + var body = map[string]interface{}{ + "sessionToken": sessionToken, + "browserConnectionId": browserConnectionId, + } + + sendBbbCoreMsgToRedis("UserGraphqlConnectionStablishedSysMsg", body) +} + +func SendUserGraphqlConnectionClosedSysMsg(sessionToken string, browserConnectionId string) { + var body = map[string]interface{}{ + "sessionToken": sessionToken, + "browserConnectionId": browserConnectionId, + } + + sendBbbCoreMsgToRedis("UserGraphqlConnectionClosedSysMsg", body) +} diff --git a/bbb-graphql-middleware/internal/websrv/connhandler.go b/bbb-graphql-middleware/internal/websrv/connhandler.go index d1a7254de3..7e43dad27d 100644 --- a/bbb-graphql-middleware/internal/websrv/connhandler.go +++ b/bbb-graphql-middleware/internal/websrv/connhandler.go @@ -6,6 +6,7 @@ import ( "github.com/iMDT/bbb-graphql-middleware/internal/common" "github.com/iMDT/bbb-graphql-middleware/internal/hascli" "github.com/iMDT/bbb-graphql-middleware/internal/msgpatch" + "github.com/iMDT/bbb-graphql-middleware/internal/rediscli" "github.com/iMDT/bbb-graphql-middleware/internal/websrv/reader" "github.com/iMDT/bbb-graphql-middleware/internal/websrv/writer" log "github.com/sirupsen/logrus" @@ -61,8 +62,10 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) { defer func() { msgpatch.RemoveConnCacheDir(browserConnectionId) BrowserConnectionsMutex.Lock() + sessionTokenRemoved := BrowserConnections[browserConnectionId].SessionToken delete(BrowserConnections, browserConnectionId) BrowserConnectionsMutex.Unlock() + go rediscli.SendUserGraphqlConnectionClosedSysMsg(sessionTokenRemoved, browserConnectionId) log.Infof("connection removed") }() @@ -90,6 +93,7 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) { BrowserConnectionsMutex.RLock() thisBrowserConnection := BrowserConnections[browserConnectionId] BrowserConnectionsMutex.RUnlock() + log.Debugf("created hasura client") if thisBrowserConnection != nil { hascli.HasuraClient(thisBrowserConnection, r.Cookies(), fromBrowserChannel1, toBrowserChannel) } @@ -116,9 +120,25 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) { // Reads from toBrowserChannel, writes to browser connection go writer.BrowserConnectionWriter(browserConnectionId, browserConnectionContext, c, toBrowserChannel, &wgAll) - go SessionTokenReader(browserConnectionId, browserConnectionContext, fromBrowserChannel2, &wgAll) + go ConnectionInitHandler(browserConnectionId, browserConnectionContext, fromBrowserChannel2, &wgAll) // Wait until all routines are finished wgAll.Wait() } + +func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) { + BrowserConnectionsMutex.RLock() + for _, browserConnection := range BrowserConnections { + if browserConnection.SessionToken == sessionTokenToInvalidate { + if browserConnection.HasuraConnection != nil { + log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id) + browserConnection.HasuraConnection.ContextCancelFunc() + log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id) + + //go SendInvalidatedUserGraphqlConnectionEvtMsg(browserConnection.SessionToken) + } + } + } + BrowserConnectionsMutex.RUnlock() +} diff --git a/bbb-graphql-middleware/internal/websrv/sessiontokenreader.go b/bbb-graphql-middleware/internal/websrv/conninithandler.go similarity index 62% rename from bbb-graphql-middleware/internal/websrv/sessiontokenreader.go rename to bbb-graphql-middleware/internal/websrv/conninithandler.go index 97e789c7bd..1f7d467412 100644 --- a/bbb-graphql-middleware/internal/websrv/sessiontokenreader.go +++ b/bbb-graphql-middleware/internal/websrv/conninithandler.go @@ -2,18 +2,21 @@ package websrv import ( "context" + "github.com/iMDT/bbb-graphql-middleware/internal/rediscli" log "github.com/sirupsen/logrus" "sync" ) -func SessionTokenReader(connectionId string, browserConnectionContext context.Context, fromBrowser chan interface{}, wg *sync.WaitGroup) { - log := log.WithField("_routine", "SessionTokenReader") +func ConnectionInitHandler(browserConnectionId string, browserConnectionContext context.Context, fromBrowser chan interface{}, wg *sync.WaitGroup) { + log := log.WithField("_routine", "ConnectionInitHandler").WithField("browserConnectionId", browserConnectionId) + + log.Debugf("starting") defer wg.Done() defer log.Debugf("finished") BrowserConnectionsMutex.RLock() - browserConnection := BrowserConnections[connectionId] + browserConnection := BrowserConnections[browserConnectionId] BrowserConnectionsMutex.RUnlock() // Intercept the fromBrowserMessage channel to get the sessionToken @@ -29,7 +32,13 @@ func SessionTokenReader(connectionId string, browserConnectionContext context.Co if sessionToken != nil { sessionToken := headersAsMap["X-Session-Token"].(string) log.Infof("[SessionTokenReader] intercepted session token %v", sessionToken) + BrowserConnectionsMutex.Lock() browserConnection.SessionToken = sessionToken + BrowserConnectionsMutex.Unlock() + + go rediscli.SendUserGraphqlConnectionStablishedSysMsg(sessionToken, browserConnectionId) + + break } } } diff --git a/bbb-graphql-middleware/internal/websrv/invalidator/invalidator.go b/bbb-graphql-middleware/internal/websrv/invalidator/invalidator.go deleted file mode 100644 index 5f23e976c0..0000000000 --- a/bbb-graphql-middleware/internal/websrv/invalidator/invalidator.go +++ /dev/null @@ -1,66 +0,0 @@ -package invalidator - -import ( - "context" - "encoding/json" - "github.com/iMDT/bbb-graphql-middleware/internal/websrv" - "github.com/redis/go-redis/v9" - log "github.com/sirupsen/logrus" - "strings" -) - -func BrowserConnectionInvalidator() { - log := log.WithField("_routine", "BrowserConnectionInvalidator") - - var ctx = context.Background() - - redisClient := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1:6379", - Password: "", - DB: 0, - }) - - subscriber := redisClient.Subscribe(ctx, "from-akka-apps-redis-channel") - - for { - msg, err := subscriber.ReceiveMessage(ctx) - if err != nil { - log.Errorf("error: ", err) - } - - // Skip parsing unnecessary messages - if !strings.Contains(msg.Payload, "InvalidateUserGraphqlConnectionSysMsg") { - continue - } - - var message interface{} - if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil { - panic(err) - } - - messageAsMap := message.(map[string]interface{}) - - messageEnvelopeAsMap := messageAsMap["envelope"].(map[string]interface{}) - - messageType := messageEnvelopeAsMap["name"] - - if messageType == "InvalidateUserGraphqlConnectionSysMsg" { - messageCoreAsMap := messageAsMap["core"].(map[string]interface{}) - messageBodyAsMap := messageCoreAsMap["body"].(map[string]interface{}) - sessionTokenToInvalidate := messageBodyAsMap["sessionToken"] - log.Debugf("Received invalidate request for sessionToken %v", sessionTokenToInvalidate) - - websrv.BrowserConnectionsMutex.RLock() - for _, browserConnection := range websrv.BrowserConnections { - if browserConnection.SessionToken == sessionTokenToInvalidate { - if browserConnection.HasuraConnection != nil { - log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id) - browserConnection.HasuraConnection.ContextCancelFunc() - log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, browserConnection.HasuraConnection.Id) - } - } - } - websrv.BrowserConnectionsMutex.RUnlock() - } - } -} diff --git a/bbb-graphql-middleware/internal/websrv/reader/reader.go b/bbb-graphql-middleware/internal/websrv/reader/reader.go index 551ddd0904..03cf231e54 100644 --- a/bbb-graphql-middleware/internal/websrv/reader/reader.go +++ b/bbb-graphql-middleware/internal/websrv/reader/reader.go @@ -11,6 +11,8 @@ import ( func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c *websocket.Conn, fromBrowserChannel1 chan interface{}, fromBrowserChannel2 chan interface{}, waitGroups []*sync.WaitGroup) { log := log.WithField("_routine", "BrowserConnectionReader").WithField("browserConnectionId", browserConnectionId) + defer log.Debugf("finished") + log.Debugf("starting") defer func() { close(fromBrowserChannel1) @@ -26,12 +28,10 @@ func BrowserConnectionReader(browserConnectionId string, ctx context.Context, c time.Sleep(100 * time.Millisecond) }() - defer log.Debugf("finished") + ctx, cancel := context.WithCancel(ctx) + defer cancel() for { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - var v interface{} err := wsjson.Read(ctx, c, &v) if err != nil { diff --git a/bbb-graphql-middleware/internal/websrv/writer/writer.go b/bbb-graphql-middleware/internal/websrv/writer/writer.go index 75fa65675e..86977bcfba 100644 --- a/bbb-graphql-middleware/internal/websrv/writer/writer.go +++ b/bbb-graphql-middleware/internal/websrv/writer/writer.go @@ -10,10 +10,10 @@ import ( ) func BrowserConnectionWriter(browserConnectionId string, ctx context.Context, c *websocket.Conn, toBrowserChannel chan interface{}, wg *sync.WaitGroup) { - log := log.WithField("_routine", "websocketConnectionWriter").WithField("browserConnectionId", browserConnectionId) - - defer wg.Done() + log := log.WithField("_routine", "BrowserConnectionWriter").WithField("browserConnectionId", browserConnectionId) defer log.Debugf("finished") + log.Debugf("starting") + defer wg.Done() RangeLoop: for { diff --git a/bbb-graphql-server/bbb_schema.sql b/bbb-graphql-server/bbb_schema.sql index 2b927cde8a..96ca8f407f 100644 --- a/bbb-graphql-server/bbb_schema.sql +++ b/bbb-graphql-server/bbb_schema.sql @@ -217,6 +217,7 @@ CREATE TABLE "user" ( "role" varchar(20), "avatar" varchar(500), "color" varchar(7), + "sessionToken" varchar(16), "authed" bool, "joined" bool, "banned" bool, @@ -646,6 +647,18 @@ GROUP BY u."meetingId", u."userId"; CREATE INDEX "idx_user_connectionStatusMetrics_UnstableReport" ON "user_connectionStatusMetrics" ("userId") WHERE "status" != 'normal'; +CREATE TABLE "user_graphqlConnection" ( + "graphqlConnectionId" serial PRIMARY KEY, + "sessionToken" varchar(16), + "middlewareConnectionId" varchar(12), + "stablishedAt" timestamp with time zone, + "closedAt" timestamp with time zone +); + +CREATE INDEX "idx_user_graphqlConnectionsessionToken" ON "user_graphqlConnection"("sessionToken"); + + + --ALTER TABLE "user_connectionStatus" ADD COLUMN "rttInMs" NUMERIC GENERATED ALWAYS AS --(CASE WHEN "connectionAliveAt" IS NULL OR "userClientResponseAt" IS NULL THEN NULL --ELSE EXTRACT(EPOCH FROM ("userClientResponseAt" - "connectionAliveAt")) * 1000