Simplify user connection status flow
This commit is contained in:
parent
1e6f7d9136
commit
97dc51e8c3
@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory
|
||||
|
||||
import java.io.{ ByteArrayInputStream, File }
|
||||
import scala.io.BufferedSource
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
object ClientSettings extends SystemConfiguration {
|
||||
var clientSettingsFromFile: Map[String, Object] = Map("" -> "")
|
||||
@ -82,6 +82,24 @@ object ClientSettings extends SystemConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
def getConfigPropertyValueByPathAsListOfStringOrElse(map: Map[String, Any], path: String, alternativeValue: List[String]): List[String] = {
|
||||
getConfigPropertyValueByPath(map, path) match {
|
||||
case Some(configValue: List[String]) => configValue
|
||||
case _ =>
|
||||
logger.debug(s"Config `$path` with type List[String] not found in clientSettings.")
|
||||
alternativeValue
|
||||
}
|
||||
}
|
||||
|
||||
def getConfigPropertyValueByPathAsListOfIntOrElse(map: Map[String, Any], path: String, alternativeValue: List[Int]): List[Int] = {
|
||||
getConfigPropertyValueByPath(map, path) match {
|
||||
case Some(configValue: List[Int]) => configValue
|
||||
case _ =>
|
||||
logger.debug(s"Config `$path` with type List[Int] not found in clientSettings.")
|
||||
alternativeValue
|
||||
}
|
||||
}
|
||||
|
||||
def getConfigPropertyValueByPath(map: Map[String, Any], path: String): Option[Any] = {
|
||||
val keys = path.split("\\.")
|
||||
|
||||
|
@ -1,9 +1,10 @@
|
||||
package org.bigbluebutton.core.apps.users
|
||||
|
||||
import org.bigbluebutton.ClientSettings.{ getConfigPropertyValueByPathAsListOfIntOrElse, getConfigPropertyValueByPathAsListOfStringOrElse }
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.core.apps.RightsManagementTrait
|
||||
import org.bigbluebutton.core.db.UserConnectionStatusDAO
|
||||
import org.bigbluebutton.core.models.{ UserState, Users2x }
|
||||
import org.bigbluebutton.core.models.Users2x
|
||||
import org.bigbluebutton.core.running.{ LiveMeeting, OutMsgRouter }
|
||||
|
||||
trait UserConnectionAliveReqMsgHdlr extends RightsManagementTrait {
|
||||
@ -13,13 +14,42 @@ trait UserConnectionAliveReqMsgHdlr extends RightsManagementTrait {
|
||||
val outGW: OutMsgRouter
|
||||
|
||||
def handleUserConnectionAliveReqMsg(msg: UserConnectionAliveReqMsg): Unit = {
|
||||
log.info("handleUserConnectionAliveReqMsg: userId={}", msg.body.userId)
|
||||
log.info("handleUserConnectionAliveReqMsg: networkRttInMs={} userId={}", msg.body.networkRttInMs, msg.body.userId)
|
||||
|
||||
for {
|
||||
user <- Users2x.findWithIntId(liveMeeting.users2x, msg.body.userId)
|
||||
} yield {
|
||||
UserConnectionStatusDAO.updateUserAlive(user.intId)
|
||||
val rtt: Option[Double] = msg.body.networkRttInMs match {
|
||||
case 0 => None
|
||||
case rtt: Double => Some(rtt)
|
||||
}
|
||||
|
||||
val status = getLevelFromRtt(msg.body.networkRttInMs)
|
||||
|
||||
UserConnectionStatusDAO.updateUserAlive(user.intId, rtt, status)
|
||||
}
|
||||
}
|
||||
|
||||
def getLevelFromRtt(networkRttInMs: Double): String = {
|
||||
val levelOptions = getConfigPropertyValueByPathAsListOfStringOrElse(
|
||||
liveMeeting.clientSettings,
|
||||
"public.stats.level",
|
||||
List("warning", "danger", "critical")
|
||||
)
|
||||
|
||||
val rttOptions = getConfigPropertyValueByPathAsListOfIntOrElse(
|
||||
liveMeeting.clientSettings,
|
||||
"public.stats.rtt",
|
||||
List(500, 1000, 2000)
|
||||
)
|
||||
|
||||
val statusRttXLevel = levelOptions.zip(rttOptions).reverse
|
||||
|
||||
val statusFound = statusRttXLevel.collectFirst {
|
||||
case (level, rtt) if networkRttInMs > rtt => level
|
||||
}
|
||||
|
||||
statusFound.getOrElse("normal")
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,25 +0,0 @@
|
||||
package org.bigbluebutton.core.apps.users
|
||||
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.core.apps.RightsManagementTrait
|
||||
import org.bigbluebutton.core.db.UserConnectionStatusDAO
|
||||
import org.bigbluebutton.core.models.{ UserState, Users2x }
|
||||
import org.bigbluebutton.core.running.{ LiveMeeting, OutMsgRouter }
|
||||
|
||||
trait UserConnectionUpdateRttReqMsgHdlr extends RightsManagementTrait {
|
||||
this: UsersApp =>
|
||||
|
||||
val liveMeeting: LiveMeeting
|
||||
val outGW: OutMsgRouter
|
||||
|
||||
def handleUserConnectionUpdateRttReqMsg(msg: UserConnectionUpdateRttReqMsg): Unit = {
|
||||
log.info("handleUserConnectionUpdateRttReqMsg: networkRttInMs={} userId={}", msg.body.networkRttInMs, msg.body.userId)
|
||||
|
||||
for {
|
||||
user <- Users2x.findWithIntId(liveMeeting.users2x, msg.body.userId)
|
||||
} yield {
|
||||
UserConnectionStatusDAO.updateUserRtt(user.intId, msg.body.networkRttInMs)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -169,7 +169,6 @@ class UsersApp(
|
||||
with ChangeUserPinStateReqMsgHdlr
|
||||
with ChangeUserMobileFlagReqMsgHdlr
|
||||
with UserConnectionAliveReqMsgHdlr
|
||||
with UserConnectionUpdateRttReqMsgHdlr
|
||||
with ChangeUserReactionEmojiReqMsgHdlr
|
||||
with ChangeUserRaiseHandReqMsgHdlr
|
||||
with ChangeUserAwayReqMsgHdlr
|
||||
|
@ -5,22 +5,24 @@ import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
case class UserConnectionStatusDbModel(
|
||||
userId: String,
|
||||
meetingId: String,
|
||||
connectionAliveAt: Option[java.sql.Timestamp],
|
||||
userClientResponseAt: Option[java.sql.Timestamp],
|
||||
networkRttInMs: Option[Double]
|
||||
userId: String,
|
||||
meetingId: String,
|
||||
connectionAliveAt: Option[java.sql.Timestamp],
|
||||
networkRttInMs: Option[Double],
|
||||
status: String,
|
||||
statusUpdatedAt: Option[java.sql.Timestamp]
|
||||
)
|
||||
|
||||
class UserConnectionStatusDbTableDef(tag: Tag) extends Table[UserConnectionStatusDbModel](tag, None, "user_connectionStatus") {
|
||||
override def * = (
|
||||
userId, meetingId, connectionAliveAt, userClientResponseAt, networkRttInMs
|
||||
userId, meetingId, connectionAliveAt, networkRttInMs, status, statusUpdatedAt
|
||||
) <> (UserConnectionStatusDbModel.tupled, UserConnectionStatusDbModel.unapply)
|
||||
val userId = column[String]("userId", O.PrimaryKey)
|
||||
val meetingId = column[String]("meetingId")
|
||||
val connectionAliveAt = column[Option[java.sql.Timestamp]]("connectionAliveAt")
|
||||
val userClientResponseAt = column[Option[java.sql.Timestamp]]("userClientResponseAt")
|
||||
val networkRttInMs = column[Option[Double]]("networkRttInMs")
|
||||
val status = column[String]("status")
|
||||
val statusUpdatedAt = column[Option[java.sql.Timestamp]]("statusUpdatedAt")
|
||||
}
|
||||
|
||||
object UserConnectionStatusDAO {
|
||||
@ -32,8 +34,9 @@ object UserConnectionStatusDAO {
|
||||
userId = userId,
|
||||
meetingId = meetingId,
|
||||
connectionAliveAt = None,
|
||||
userClientResponseAt = None,
|
||||
networkRttInMs = None
|
||||
networkRttInMs = None,
|
||||
status = "normal",
|
||||
statusUpdatedAt = None
|
||||
)
|
||||
)
|
||||
).onComplete {
|
||||
@ -42,28 +45,23 @@ object UserConnectionStatusDAO {
|
||||
}
|
||||
}
|
||||
|
||||
def updateUserAlive(userId: String) = {
|
||||
def updateUserAlive(userId: String, rtt: Option[Double], status: String) = {
|
||||
DatabaseConnection.db.run(
|
||||
TableQuery[UserConnectionStatusDbTableDef]
|
||||
.filter(_.userId === userId)
|
||||
.map(t => (t.connectionAliveAt))
|
||||
.update(Some(new java.sql.Timestamp(System.currentTimeMillis())))
|
||||
.map(t => (t.connectionAliveAt, t.networkRttInMs, t.status, t.statusUpdatedAt))
|
||||
.update(
|
||||
(
|
||||
Some(new java.sql.Timestamp(System.currentTimeMillis())),
|
||||
rtt,
|
||||
status,
|
||||
Some(new java.sql.Timestamp(System.currentTimeMillis())),
|
||||
)
|
||||
)
|
||||
).onComplete {
|
||||
case Success(rowsAffected) => DatabaseConnection.logger.debug(s"$rowsAffected row(s) updated connectionAliveAt on UserConnectionStatus table!")
|
||||
case Failure(e) => DatabaseConnection.logger.debug(s"Error updating connectionAliveAt on UserConnectionStatus: $e")
|
||||
}
|
||||
}
|
||||
|
||||
def updateUserRtt(userId: String, networkRttInMs: Double) = {
|
||||
DatabaseConnection.db.run(
|
||||
TableQuery[UserConnectionStatusDbTableDef]
|
||||
.filter(_.userId === userId)
|
||||
.map(t => (t.networkRttInMs, t.userClientResponseAt))
|
||||
.update((Some(networkRttInMs), Some(new java.sql.Timestamp(System.currentTimeMillis()))))
|
||||
).onComplete {
|
||||
case Success(rowsAffected) => DatabaseConnection.logger.debug(s"$rowsAffected row(s) updated networkRttInMs on UserConnectionStatus table!")
|
||||
case Failure(e) => DatabaseConnection.logger.debug(s"Error updating networkRttInMs on UserConnectionStatus: $e")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -113,8 +113,6 @@ class ReceivedJsonMsgHandlerActor(
|
||||
routeGenericMsg[ChangeUserMobileFlagReqMsg](envelope, jsonNode)
|
||||
case UserConnectionAliveReqMsg.NAME =>
|
||||
routeGenericMsg[UserConnectionAliveReqMsg](envelope, jsonNode)
|
||||
case UserConnectionUpdateRttReqMsg.NAME =>
|
||||
routeGenericMsg[UserConnectionUpdateRttReqMsg](envelope, jsonNode)
|
||||
case SetUserSpeechLocaleReqMsg.NAME =>
|
||||
routeGenericMsg[SetUserSpeechLocaleReqMsg](envelope, jsonNode)
|
||||
case SetUserSpeechOptionsReqMsg.NAME =>
|
||||
|
@ -405,7 +405,6 @@ class MeetingActor(
|
||||
case m: ChangeUserPinStateReqMsg => usersApp.handleChangeUserPinStateReqMsg(m)
|
||||
case m: ChangeUserMobileFlagReqMsg => usersApp.handleChangeUserMobileFlagReqMsg(m)
|
||||
case m: UserConnectionAliveReqMsg => usersApp.handleUserConnectionAliveReqMsg(m)
|
||||
case m: UserConnectionUpdateRttReqMsg => usersApp.handleUserConnectionUpdateRttReqMsg(m)
|
||||
case m: SetUserSpeechLocaleReqMsg => usersApp.handleSetUserSpeechLocaleReqMsg(m)
|
||||
case m: SetUserSpeechOptionsReqMsg => usersApp.handleSetUserSpeechOptionsReqMsg(m)
|
||||
|
||||
|
@ -79,7 +79,6 @@ class AnalyticsActor(val includeChat: Boolean) extends Actor with ActorLogging {
|
||||
case m: ChangeUserPinStateReqMsg => logMessage(msg)
|
||||
case m: ChangeUserMobileFlagReqMsg => logMessage(msg)
|
||||
case m: UserConnectionAliveReqMsg => logMessage(msg)
|
||||
case m: UserConnectionUpdateRttReqMsg => logMessage(msg)
|
||||
case m: ScreenshareRtmpBroadcastStartedVoiceConfEvtMsg => logMessage(msg)
|
||||
case m: ScreenshareRtmpBroadcastStoppedVoiceConfEvtMsg => logMessage(msg)
|
||||
case m: ScreenshareRtmpBroadcastStartedEvtMsg => logMessage(msg)
|
||||
|
@ -298,14 +298,7 @@ case class ChangeUserMobileFlagReqMsgBody(userId: String, mobile: Boolean)
|
||||
*/
|
||||
object UserConnectionAliveReqMsg { val NAME = "UserConnectionAliveReqMsg" }
|
||||
case class UserConnectionAliveReqMsg(header: BbbClientMsgHeader, body: UserConnectionAliveReqMsgBody) extends StandardMsg
|
||||
case class UserConnectionAliveReqMsgBody(userId: String)
|
||||
|
||||
/**
|
||||
* Sent from client to inform the RTT (time it took to send the Alive and receive confirmation).
|
||||
*/
|
||||
object UserConnectionUpdateRttReqMsg { val NAME = "UserConnectionUpdateRttReqMsg" }
|
||||
case class UserConnectionUpdateRttReqMsg(header: BbbClientMsgHeader, body: UserConnectionUpdateRttReqMsgBody) extends StandardMsg
|
||||
case class UserConnectionUpdateRttReqMsgBody(userId: String, networkRttInMs: Double)
|
||||
case class UserConnectionAliveReqMsgBody(userId: String, networkRttInMs: Double)
|
||||
|
||||
/**
|
||||
* Sent to all clients about a user mobile flag.
|
||||
|
@ -16,6 +16,7 @@ export default function buildRedisMessage(sessionVariables: Record<string, unkno
|
||||
|
||||
const body = {
|
||||
userId: routing.userId,
|
||||
networkRttInMs: input.networkRttInMs
|
||||
};
|
||||
|
||||
return { eventName, routing, header, body };
|
||||
|
@ -1,23 +0,0 @@
|
||||
import { RedisMessage } from '../types';
|
||||
|
||||
export default function buildRedisMessage(sessionVariables: Record<string, unknown>, input: Record<string, unknown>): RedisMessage {
|
||||
const eventName = `UserConnectionUpdateRttReqMsg`;
|
||||
|
||||
const routing = {
|
||||
meetingId: sessionVariables['x-hasura-meetingid'] as String,
|
||||
userId: sessionVariables['x-hasura-userid'] as String
|
||||
};
|
||||
|
||||
const header = {
|
||||
name: eventName,
|
||||
meetingId: routing.meetingId,
|
||||
userId: routing.userId
|
||||
};
|
||||
|
||||
const body = {
|
||||
userId: routing.userId,
|
||||
networkRttInMs: input.networkRttInMs
|
||||
};
|
||||
|
||||
return { eventName, routing, header, body };
|
||||
}
|
@ -670,10 +670,9 @@ JOIN "user" u ON u."userId" = "user_breakoutRoom"."userId";
|
||||
CREATE TABLE "user_connectionStatus" (
|
||||
"userId" varchar(50) PRIMARY KEY REFERENCES "user"("userId") ON DELETE CASCADE,
|
||||
"meetingId" varchar(100) REFERENCES "meeting"("meetingId") ON DELETE CASCADE,
|
||||
"connectionAliveAtMaxIntervalMs" numeric,
|
||||
"connectionAliveAt" timestamp with time zone,
|
||||
"userClientResponseAt" timestamp with time zone,
|
||||
"networkRttInMs" numeric,
|
||||
"applicationRttInMs" numeric,
|
||||
"status" varchar(25),
|
||||
"statusUpdatedAt" timestamp with time zone
|
||||
);
|
||||
@ -681,9 +680,30 @@ create index "idx_user_connectionStatus_meetingId" on "user_connectionStatus"("m
|
||||
|
||||
create view "v_user_connectionStatus" as select * from "user_connectionStatus";
|
||||
|
||||
|
||||
--Populate connectionAliveAtMaxIntervalMs to calc clientNotResponding
|
||||
--It will sum settings public.stats.interval + public.stats.rtt (critical)
|
||||
CREATE OR REPLACE FUNCTION "update_connectionAliveAtMaxIntervalMs"()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
SELECT ("clientSettingsJson"->'public'->'stats'->'rtt'->>(jsonb_array_length("clientSettingsJson"->'public'->'stats'->'rtt') - 1))::int
|
||||
+
|
||||
("clientSettingsJson"->'public'->'stats'->'interval')::int
|
||||
INTO NEW."connectionAliveAtMaxIntervalMs"
|
||||
from "meeting_clientSettings" mcs
|
||||
where mcs."meetingId" = NEW."meetingId";
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER "trigger_update_connectionAliveAtMaxIntervalMs"
|
||||
BEFORE INSERT ON "user_connectionStatus"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION "update_connectionAliveAtMaxIntervalMs"();
|
||||
|
||||
|
||||
--CREATE TABLE "user_connectionStatusHistory" (
|
||||
-- "userId" varchar(50) REFERENCES "user"("userId") ON DELETE CASCADE,
|
||||
-- "applicationRttInMs" numeric,
|
||||
-- "status" varchar(25),
|
||||
-- "statusUpdatedAt" timestamp with time zone
|
||||
--);
|
||||
@ -692,7 +712,6 @@ create view "v_user_connectionStatus" as select * from "user_connectionStatus";
|
||||
-- "status" varchar(25),
|
||||
-- "totalOfOccurrences" integer,
|
||||
-- "highestNetworkRttInMs" numeric,
|
||||
-- "highestApplicationRttInMs" numeric,
|
||||
-- "statusInsertedAt" timestamp with time zone,
|
||||
-- "statusUpdatedAt" timestamp with time zone,
|
||||
-- CONSTRAINT "user_connectionStatusHistory_pkey" PRIMARY KEY ("userId","status")
|
||||
@ -707,9 +726,6 @@ CREATE TABLE "user_connectionStatusMetrics" (
|
||||
"lowestNetworkRttInMs" numeric,
|
||||
"highestNetworkRttInMs" numeric,
|
||||
"lastNetworkRttInMs" numeric,
|
||||
"lowestApplicationRttInMs" numeric,
|
||||
"highestApplicationRttInMs" numeric,
|
||||
"lastApplicationRttInMs" numeric,
|
||||
CONSTRAINT "user_connectionStatusMetrics_pkey" PRIMARY KEY ("userId","status")
|
||||
);
|
||||
|
||||
@ -717,70 +733,38 @@ create index "idx_user_connectionStatusMetrics_userId" on "user_connectionStatus
|
||||
|
||||
--This function populate rtt, status and the table user_connectionStatusMetrics
|
||||
CREATE OR REPLACE FUNCTION "update_user_connectionStatus_trigger_func"() RETURNS TRIGGER AS $$
|
||||
DECLARE
|
||||
"newApplicationRttInMs" numeric;
|
||||
"newStatus" varchar(25);
|
||||
BEGIN
|
||||
IF NEW."connectionAliveAt" IS NULL OR NEW."userClientResponseAt" IS NULL THEN
|
||||
IF NEW."connectionAliveAt" IS NULL THEN
|
||||
RETURN NEW;
|
||||
END IF;
|
||||
"newApplicationRttInMs" := (EXTRACT(EPOCH FROM (NEW."userClientResponseAt" - NEW."connectionAliveAt")) * 1000);
|
||||
"newStatus" := CASE WHEN COALESCE(NEW."networkRttInMs",0) > 2000 THEN 'critical'
|
||||
WHEN COALESCE(NEW."networkRttInMs",0) > 1000 THEN 'danger'
|
||||
WHEN COALESCE(NEW."networkRttInMs",0) > 500 THEN 'warning'
|
||||
ELSE 'normal' END;
|
||||
|
||||
--Update table user_connectionStatusMetrics
|
||||
WITH upsert AS (UPDATE "user_connectionStatusMetrics" SET
|
||||
"occurrencesCount" = "user_connectionStatusMetrics"."occurrencesCount" + 1,
|
||||
"highestApplicationRttInMs" = GREATEST("user_connectionStatusMetrics"."highestApplicationRttInMs","newApplicationRttInMs"),
|
||||
"lowestApplicationRttInMs" = LEAST("user_connectionStatusMetrics"."lowestApplicationRttInMs","newApplicationRttInMs"),
|
||||
"lastApplicationRttInMs" = "newApplicationRttInMs",
|
||||
"highestNetworkRttInMs" = GREATEST("user_connectionStatusMetrics"."highestNetworkRttInMs",NEW."networkRttInMs"),
|
||||
"lowestNetworkRttInMs" = LEAST("user_connectionStatusMetrics"."lowestNetworkRttInMs",NEW."networkRttInMs"),
|
||||
"lastNetworkRttInMs" = NEW."networkRttInMs",
|
||||
"lastOccurrenceAt" = current_timestamp
|
||||
WHERE "userId"=NEW."userId" AND "status"= "newStatus" RETURNING *)
|
||||
WHERE "userId"=NEW."userId" AND "status"= NEW."status" RETURNING *)
|
||||
INSERT INTO "user_connectionStatusMetrics"("userId","status","occurrencesCount", "firstOccurrenceAt",
|
||||
"highestApplicationRttInMs", "lowestApplicationRttInMs", "lastApplicationRttInMs",
|
||||
"highestNetworkRttInMs", "lowestNetworkRttInMs", "lastNetworkRttInMs")
|
||||
SELECT NEW."userId", "newStatus", 1, current_timestamp,
|
||||
"newApplicationRttInMs", "newApplicationRttInMs", "newApplicationRttInMs",
|
||||
SELECT NEW."userId", NEW."status", 1, current_timestamp,
|
||||
NEW."networkRttInMs", NEW."networkRttInMs", NEW."networkRttInMs"
|
||||
WHERE NOT EXISTS (SELECT * FROM upsert);
|
||||
--Update networkRttInMs, applicationRttInMs, status, statusUpdatedAt in user_connectionStatus
|
||||
UPDATE "user_connectionStatus"
|
||||
SET "applicationRttInMs" = "newApplicationRttInMs",
|
||||
"status" = "newStatus",
|
||||
"statusUpdatedAt" = now()
|
||||
WHERE "userId" = NEW."userId";
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER "update_user_connectionStatus_trigger" AFTER UPDATE OF "userClientResponseAt" ON "user_connectionStatus"
|
||||
CREATE TRIGGER "update_user_connectionStatus_trigger" AFTER UPDATE OF "connectionAliveAt" ON "user_connectionStatus"
|
||||
FOR EACH ROW EXECUTE FUNCTION "update_user_connectionStatus_trigger_func"();
|
||||
|
||||
--This function clear userClientResponseAt and applicationRttInMs when connectionAliveAt is updated
|
||||
CREATE OR REPLACE FUNCTION "update_user_connectionStatus_connectionAliveAt_trigger_func"() RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF NEW."connectionAliveAt" <> OLD."connectionAliveAt" THEN
|
||||
NEW."userClientResponseAt" := NULL;
|
||||
NEW."applicationRttInMs" := NULL;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER "update_user_connectionStatus_connectionAliveAt_trigger" BEFORE UPDATE OF "connectionAliveAt" ON "user_connectionStatus"
|
||||
FOR EACH ROW EXECUTE FUNCTION "update_user_connectionStatus_connectionAliveAt_trigger_func"();
|
||||
|
||||
|
||||
CREATE OR REPLACE VIEW "v_user_connectionStatusReport" AS
|
||||
SELECT u."meetingId", u."userId",
|
||||
max(cs."connectionAliveAt") AS "connectionAliveAt",
|
||||
max(cs."status") AS "currentStatus",
|
||||
--COALESCE(max(cs."applicationRttInMs"),(EXTRACT(EPOCH FROM (current_timestamp - max(cs."connectionAliveAt"))) * 1000)) AS "applicationRttInMs",
|
||||
CASE WHEN max(cs."connectionAliveAt") < current_timestamp - INTERVAL '12 seconds' THEN TRUE ELSE FALSE END AS "clientNotResponding",
|
||||
CASE WHEN max(cs."connectionAliveAt") < current_timestamp - INTERVAL '1 millisecond' * max(cs."connectionAliveAtMaxIntervalMs") THEN TRUE ELSE FALSE END AS "clientNotResponding",
|
||||
(array_agg(csm."status" ORDER BY csm."lastOccurrenceAt" DESC))[1] as "lastUnstableStatus",
|
||||
max(csm."lastOccurrenceAt") AS "lastUnstableStatusAt"
|
||||
FROM "user" u
|
||||
|
@ -460,11 +460,7 @@ type Mutation {
|
||||
}
|
||||
|
||||
type Mutation {
|
||||
userSetConnectionAlive: Boolean
|
||||
}
|
||||
|
||||
type Mutation {
|
||||
userSetConnectionRtt(
|
||||
userSetConnectionAlive(
|
||||
networkRttInMs: Float!
|
||||
): Boolean
|
||||
}
|
||||
|
@ -421,12 +421,6 @@ actions:
|
||||
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
|
||||
permissions:
|
||||
- role: bbb_client
|
||||
- name: userSetConnectionRtt
|
||||
definition:
|
||||
kind: synchronous
|
||||
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
|
||||
permissions:
|
||||
- role: bbb_client
|
||||
- name: userSetEmojiStatus
|
||||
definition:
|
||||
kind: synchronous
|
||||
|
@ -22,11 +22,9 @@ select_permissions:
|
||||
columns:
|
||||
- connectionAliveAt
|
||||
- meetingId
|
||||
- applicationRttInMs
|
||||
- networkRttInMs
|
||||
- status
|
||||
- statusUpdatedAt
|
||||
- userClientResponseAt
|
||||
- userId
|
||||
filter:
|
||||
_and:
|
||||
|
@ -1,30 +1,22 @@
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { useMutation, useSubscription } from '@apollo/client';
|
||||
import { CONNECTION_STATUS_SUBSCRIPTION } from './queries';
|
||||
import { UPDATE_CONNECTION_ALIVE_AT, UPDATE_USER_CLIENT_RTT } from './mutations';
|
||||
import { useMutation } from '@apollo/client';
|
||||
import { UPDATE_CONNECTION_ALIVE_AT } from './mutations';
|
||||
|
||||
const STATS_INTERVAL = window.meetingClientSettings.public.stats.interval;
|
||||
|
||||
const ConnectionStatus = () => {
|
||||
const networkRttInMs = useRef(null); // Ref to store the current timeout
|
||||
const lastStatusUpdatedAtReceived = useRef(null); // Ref to store the current timeout
|
||||
const networkRttInMs = useRef(0); // Ref to store the last rtt
|
||||
const timeoutRef = useRef(null);
|
||||
|
||||
const [updateUserClientRtt] = useMutation(UPDATE_USER_CLIENT_RTT);
|
||||
|
||||
const handleUpdateUserClientResponseAt = () => {
|
||||
updateUserClientRtt({
|
||||
variables: {
|
||||
networkRttInMs: networkRttInMs.current,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const [updateConnectionAliveAtToMeAsNow] = useMutation(UPDATE_CONNECTION_ALIVE_AT);
|
||||
const [updateConnectionAliveAtM] = useMutation(UPDATE_CONNECTION_ALIVE_AT);
|
||||
|
||||
const handleUpdateConnectionAliveAt = () => {
|
||||
const startTime = performance.now();
|
||||
updateConnectionAliveAtToMeAsNow().then(() => {
|
||||
updateConnectionAliveAtM({
|
||||
variables: {
|
||||
networkRttInMs: networkRttInMs.current,
|
||||
},
|
||||
}).then(() => {
|
||||
const endTime = performance.now();
|
||||
networkRttInMs.current = endTime - startTime;
|
||||
}).finally(() => {
|
||||
@ -39,27 +31,13 @@ const ConnectionStatus = () => {
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
handleUpdateConnectionAliveAt();
|
||||
// Delay first connectionAlive to avoid high RTT misestimation
|
||||
// due to initial subscription and mutation traffic at client render
|
||||
timeoutRef.current = setTimeout(() => {
|
||||
handleUpdateConnectionAliveAt();
|
||||
}, STATS_INTERVAL / 2);
|
||||
}, []);
|
||||
|
||||
const { loading, error, data } = useSubscription(CONNECTION_STATUS_SUBSCRIPTION);
|
||||
|
||||
useEffect(() => {
|
||||
if (!loading && !error && data) {
|
||||
data.user_connectionStatus.forEach((curr) => {
|
||||
if (curr.connectionAliveAt != null
|
||||
&& curr.userClientResponseAt == null
|
||||
&& (curr.statusUpdatedAt == null
|
||||
|| curr.statusUpdatedAt !== lastStatusUpdatedAtReceived.current
|
||||
)
|
||||
) {
|
||||
lastStatusUpdatedAtReceived.current = curr.statusUpdatedAt;
|
||||
handleUpdateUserClientResponseAt();
|
||||
}
|
||||
});
|
||||
}
|
||||
}, [data]);
|
||||
|
||||
return null;
|
||||
};
|
||||
|
||||
|
@ -1,18 +1,12 @@
|
||||
import { gql } from '@apollo/client';
|
||||
|
||||
export const UPDATE_CONNECTION_ALIVE_AT = gql`
|
||||
mutation UpdateConnectionAliveAt {
|
||||
userSetConnectionAlive
|
||||
}`;
|
||||
|
||||
export const UPDATE_USER_CLIENT_RTT = gql`
|
||||
mutation UpdateConnectionRtt($networkRttInMs: Float!) {
|
||||
userSetConnectionRtt(
|
||||
mutation UpdateConnectionAliveAt($networkRttInMs: Float!) {
|
||||
userSetConnectionAlive(
|
||||
networkRttInMs: $networkRttInMs
|
||||
)
|
||||
}`;
|
||||
|
||||
export default {
|
||||
UPDATE_CONNECTION_ALIVE_AT,
|
||||
UPDATE_USER_CLIENT_RTT,
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user