Merge pull request #11557 from aron-2809/bbb-core-http-endpoint-wip

Bbb core http endpoint wip
This commit is contained in:
Richard Alam 2021-03-05 14:27:48 -05:00 committed by GitHub
commit 28245a1b58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 179 additions and 56 deletions

View File

@ -3,13 +3,21 @@ package org.bigbluebutton
import akka.http.scaladsl.model._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives._
import org.bigbluebutton.service.HealthzService
import org.bigbluebutton.service.{ HealthzService, PubSubReceiveStatus, PubSubSendStatus, RecordingDBSendStatus }
import spray.json.DefaultJsonProtocol
case class HealthResponse(isHealthy: Boolean)
case class HealthResponse(
isHealthy: Boolean,
pubsubSendStatus: PubSubSendStatus,
pubsubReceiveStatus: PubSubReceiveStatus,
recordingDbStatus: RecordingDBSendStatus
)
trait JsonSupportProtocol extends SprayJsonSupport with DefaultJsonProtocol {
implicit val healthServiceJsonFormat = jsonFormat1(HealthResponse)
implicit val pubSubSendStatusJsonFormat = jsonFormat2(PubSubSendStatus)
implicit val pubSubReceiveStatusJsonFormat = jsonFormat2(PubSubReceiveStatus)
implicit val recordingDbStatusJsonFormat = jsonFormat2(RecordingDBSendStatus)
implicit val healthServiceJsonFormat = jsonFormat4(HealthResponse)
}
class ApiService(healthz: HealthzService) extends JsonSupportProtocol {
@ -21,9 +29,25 @@ class ApiService(healthz: HealthzService) extends JsonSupportProtocol {
onSuccess(future) {
case response =>
if (response.isHealthy) {
complete(StatusCodes.OK, HealthResponse(response.isHealthy))
complete(
StatusCodes.OK,
HealthResponse(
response.isHealthy,
response.pubSubSendStatus,
response.pubSubReceiveStatus,
response.recordingDBSendStatus
)
)
} else {
complete(StatusCodes.ServiceUnavailable, HealthResponse(response.isHealthy))
complete(
StatusCodes.ServiceUnavailable,
HealthResponse(
response.isHealthy,
response.pubSubSendStatus,
response.pubSubReceiveStatus,
response.recordingDBSendStatus
)
)
}
}
}

View File

@ -41,8 +41,12 @@ object Boot extends App with SystemConfiguration {
val msgSender = new MessageSender(redisPublisher)
val healthzService = HealthzService(system)
val apiService = new ApiService(healthzService)
val redisRecorderActor = system.actorOf(
RedisRecorderActor.props(system, redisConfig),
RedisRecorderActor.props(system, redisConfig, healthzService),
"redisRecorderActor"
)
@ -60,7 +64,7 @@ object Boot extends App with SystemConfiguration {
outBus2.subscribe(analyticsActorRef, outBbbMsgMsgChannel)
bbbMsgBus.subscribe(analyticsActorRef, analyticsChannel)
val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW), "bigbluebutton-actor")
val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW, healthzService), "bigbluebutton-actor")
eventBus.subscribe(bbbActor, meetingManagerChannel)
val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus))
@ -80,9 +84,5 @@ object Boot extends App with SystemConfiguration {
"redis-subscriber"
)
val healthz = HealthzService(system)
val apiService = new ApiService(healthz)
val bindingFuture = Http().bindAndHandle(apiService.routes, httpHost, httpPort)
}

View File

@ -1,7 +1,6 @@
package org.bigbluebutton.core
import java.io.{ PrintWriter, StringWriter }
import akka.actor._
import akka.actor.ActorLogging
import akka.actor.SupervisorStrategy.Resume
@ -11,27 +10,30 @@ import scala.concurrent.duration._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core.api._
import org.bigbluebutton.SystemConfiguration
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.running.RunningMeeting
import org.bigbluebutton.core2.RunningMeetings
import org.bigbluebutton.core2.message.senders.MsgBuilder
import org.bigbluebutton.service.HealthzService
object BigBlueButtonActor extends SystemConfiguration {
def props(
system: ActorSystem,
eventBus: InternalEventBus,
bbbMsgBus: BbbMsgRouterEventBus,
outGW: OutMessageGateway
system: ActorSystem,
eventBus: InternalEventBus,
bbbMsgBus: BbbMsgRouterEventBus,
outGW: OutMessageGateway,
healthzService: HealthzService
): Props =
Props(classOf[BigBlueButtonActor], system, eventBus, bbbMsgBus, outGW)
Props(classOf[BigBlueButtonActor], system, eventBus, bbbMsgBus, outGW, healthzService)
}
class BigBlueButtonActor(
val system: ActorSystem,
val eventBus: InternalEventBus, val bbbMsgBus: BbbMsgRouterEventBus,
val outGW: OutMessageGateway
val outGW: OutMessageGateway,
val healthzService: HealthzService
) extends Actor
with ActorLogging with SystemConfiguration {
@ -165,7 +167,8 @@ class BigBlueButtonActor(
}
private def handleCheckAlivePingSysMsg(msg: CheckAlivePingSysMsg): Unit = {
val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.timestamp)
val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.bbbWebTimestamp, System.currentTimeMillis())
healthzService.sendPubSubStatusMessage(msg.body.akkaAppsTimestamp, System.currentTimeMillis())
outGW.send(event)
}

View File

@ -258,10 +258,10 @@ object MsgBuilder {
BbbCommonEnvCoreMsg(envelope, event)
}
def buildCheckAlivePingSysMsg(system: String, timestamp: Long): BbbCommonEnvCoreMsg = {
def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = {
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
val envelope = BbbCoreEnvelope(CheckAlivePongSysMsg.NAME, routing)
val body = CheckAlivePongSysMsgBody(system, timestamp)
val body = CheckAlivePongSysMsgBody(system, bbbWebTimestamp, akkaAppsTimestamp)
val header = BbbCoreBaseHeader(CheckAlivePongSysMsg.NAME)
val event = CheckAlivePongSysMsg(header, body)

View File

@ -10,22 +10,32 @@ import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import org.bigbluebutton.service.HealthzService
import scala.concurrent.duration._
import scala.concurrent._
import ExecutionContext.Implicits.global
case object CheckRecordingDBStatus
object RedisRecorderActor {
def props(
system: ActorSystem,
redisConfig: RedisConfig
system: ActorSystem,
redisConfig: RedisConfig,
healthzService: HealthzService
): Props =
Props(
classOf[RedisRecorderActor],
system,
redisConfig
redisConfig,
healthzService
)
}
class RedisRecorderActor(
system: ActorSystem,
redisConfig: RedisConfig
system: ActorSystem,
redisConfig: RedisConfig,
healthzService: HealthzService
)
extends RedisStorageProvider(
system,
@ -33,6 +43,8 @@ class RedisRecorderActor(
redisConfig
) with Actor with ActorLogging {
system.scheduler.schedule(1.minutes, 1.minutes, self, CheckRecordingDBStatus)
private def record(session: String, message: java.util.Map[java.lang.String, java.lang.String]): Unit = {
redis.recordAndExpire(session, message)
}
@ -41,7 +53,7 @@ class RedisRecorderActor(
//=============================
// 2x messages
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
case CheckRecordingDBStatus => checkRecordingDBStatus()
case _ => // do nothing
}
@ -560,4 +572,12 @@ class RedisRecorderActor(
record(meetingId, ev.toMap.asJava)
}
private def checkRecordingDBStatus(): Unit = {
if (redis.checkConnectionStatusBasic)
healthzService.sendRecordingDBStatusMessage(System.currentTimeMillis())
else
log.error("recording database is not available.")
}
}

View File

@ -4,13 +4,31 @@ import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration._
import akka.pattern.{ AskTimeoutException, ask }
import org.bigbluebutton.core.BigBlueButtonActor
import java.time.{ Instant, LocalDateTime }
import java.util.TimeZone
sealed trait HealthMessage
case object GetHealthMessage extends HealthMessage
case class GetHealthResponseMessage(isHealthy: Boolean) extends HealthMessage
case class GetBigBlueButtonActorStatus(bbbActor: BigBlueButtonActor) extends HealthMessage
case class SetPubSubReceiveStatus(timestamp: Long) extends HealthMessage
case class SetPubSubStatus(sendTimestamp: Long, receiveTimestamp: Long) extends HealthMessage
case class SetRecordingDatabaseStatus(timestamp: Long) extends HealthMessage
case class GetHealthResponseMessage(
isHealthy: Boolean,
pubSubSendStatus: PubSubSendStatus,
pubSubReceiveStatus: PubSubReceiveStatus,
recordingDBSendStatus: RecordingDBSendStatus
) extends HealthMessage
case class PubSubSendStatus(status: Boolean, timestamp: String)
case class PubSubReceiveStatus(status: Boolean, timestamp: String)
case class RecordingDBSendStatus(status: Boolean, timestamp: String)
object HealthzService {
def apply(system: ActorSystem) = new HealthzService(system)
@ -20,17 +38,30 @@ class HealthzService(system: ActorSystem) {
implicit def executionContext = system.dispatcher
implicit val timeout: Timeout = 2 seconds
val actorRef = system.actorOf(HealthzActor.props())
val healthActor = system.actorOf(HealthzActor.props())
def getHealthz(): Future[GetHealthResponseMessage] = {
val future = actorRef.ask(GetHealthMessage).mapTo[GetHealthResponseMessage]
val future = healthActor.ask(GetHealthMessage).mapTo[GetHealthResponseMessage]
future.recover {
case e: AskTimeoutException => {
GetHealthResponseMessage(isHealthy = false)
GetHealthResponseMessage(
false,
PubSubSendStatus(false, String.valueOf(0L)),
PubSubReceiveStatus(false, String.valueOf(0L)),
RecordingDBSendStatus(false, String.valueOf(0L))
)
}
}
}
def sendPubSubStatusMessage(sendTimestamp: Long, receiveTimestamp: Long): Unit = {
healthActor ! SetPubSubStatus(sendTimestamp, receiveTimestamp)
}
def sendRecordingDBStatusMessage(timestamp: Long): Unit = {
healthActor ! SetRecordingDatabaseStatus(timestamp)
}
}
object HealthzActor {
@ -38,8 +69,38 @@ object HealthzActor {
}
class HealthzActor extends Actor with ActorLogging {
val twoMins: Int = 2 * 60 * 1000
var lastSentTimestamp: Long = 0L
var lastReceivedTimestamp: Long = 0L
var recordingDBResponseTimestamp: Long = 0L
override def receive: Receive = {
case GetHealthMessage => sender ! GetHealthResponseMessage(isHealthy = true)
case _ => println("unexpected message, exception could be raised")
case GetHealthMessage =>
val c1: Boolean = computeElapsedTimeFromNow(lastSentTimestamp) < 30000
val c2: Boolean = computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins
val c3: Boolean = computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins
val status: Boolean = c1 && c2 && c3
sender ! GetHealthResponseMessage(
status,
PubSubSendStatus(c1, convertLongTimestampToDateTimeString(lastSentTimestamp)),
PubSubReceiveStatus(c2, convertLongTimestampToDateTimeString(lastReceivedTimestamp)),
RecordingDBSendStatus(c3, convertLongTimestampToDateTimeString(recordingDBResponseTimestamp))
)
case SetPubSubStatus(sendTimestamp: Long, receiveTimestamp: Long) =>
lastSentTimestamp = sendTimestamp
lastReceivedTimestamp = receiveTimestamp
case SetRecordingDatabaseStatus(timestamp) =>
recordingDBResponseTimestamp = timestamp
case _ => println("unexpected message, exception could be raised")
}
def computeElapsedTimeFromNow(inputTimeStamp: Long): Long = {
System.currentTimeMillis() - inputTimeStamp
}
def convertLongTimestampToDateTimeString(inputTimestamp: Long): String = {
LocalDateTime.ofInstant(Instant.ofEpochMilli(inputTimestamp), TimeZone.getDefault.toZoneId).toString
}
}

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import com.sun.org.apache.xpath.internal.operations.Bool;
import io.lettuce.core.api.sync.BaseRedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -146,4 +147,11 @@ public class RedisStorageService extends RedisAwareCommunicator {
result = commands.hmset(key, info);
return result;
}
public Boolean checkConnectionStatusBasic() {
BaseRedisCommands command = connection.sync();
String response = command.ping();
return response.equals("PONG");
}
}

View File

@ -164,14 +164,14 @@ case class CheckAlivePingSysMsg(
header: BbbCoreBaseHeader,
body: CheckAlivePingSysMsgBody
) extends BbbCoreMsg
case class CheckAlivePingSysMsgBody(system: String, timestamp: Long)
case class CheckAlivePingSysMsgBody(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long)
object CheckAlivePongSysMsg { val NAME = "CheckAlivePongSysMsg" }
case class CheckAlivePongSysMsg(
header: BbbCoreBaseHeader,
body: CheckAlivePongSysMsgBody
) extends BbbCoreMsg
case class CheckAlivePongSysMsgBody(system: String, timestamp: Long)
case class CheckAlivePongSysMsgBody(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long)
object RecordingChapterBreakSysMsg { val NAME = "RecordingChapterBreakSysMsg" }
case class RecordingChapterBreakSysMsg(

View File

@ -3,10 +3,12 @@ package org.bigbluebutton.api.messaging.messages;
public class KeepAliveReply implements IMessage {
public final String system;
public final Long timestamp;
public KeepAliveReply(String system, Long timestamp) {
public final Long bbbWebTimestamp;
public final Long akkaAppsTimestamp;
public KeepAliveReply(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) {
this.system = system;
this.timestamp = timestamp;
this.bbbWebTimestamp = bbbWebTimestamp;
this.akkaAppsTimestamp = akkaAppsTimestamp;
}
}

View File

@ -19,6 +19,6 @@ public interface IPublisherService {
void send(String channel, String message);
void registerUser(String meetingID, String internalUserId, String fullname, String role, String externUserID,
String authToken, String avatarURL, Boolean guest, Boolean authed);
void sendKeepAlive(String system, Long timestamp);
void sendKeepAlive(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp);
void sendStunTurnInfo(String meetingId, String internalUserId, Set<StunServer> stuns, Set<TurnEntry> turns);
}

View File

@ -42,7 +42,7 @@ public interface IBbbWebApiGWApp {
void destroyMeeting(DestroyMeetingMessage msg);
void endMeeting(EndMeetingMessage msg);
void sendKeepAlive(String system, Long timestamp);
void sendKeepAlive(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp);
void publishedRecording(PublishedRecordingMessage msg);
void unpublishedRecording(UnpublishedRecordingMessage msg);
void deletedRecording(DeletedRecordingMessage msg);

View File

@ -3,10 +3,12 @@ package org.bigbluebutton.web.services;
public class KeepAlivePong implements KeepAliveMessage {
public final String system;
public final Long timestamp;
public KeepAlivePong(String system, Long timestamp) {
public final Long bbbWebTimestamp;
public final Long akkaAppsTimestamp;
public KeepAlivePong(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) {
this.system = system;
this.timestamp = timestamp;
this.bbbWebTimestamp = bbbWebTimestamp;
this.akkaAppsTimestamp = akkaAppsTimestamp;
}
}

View File

@ -51,6 +51,7 @@ public class KeepAliveService implements MessageListener {
private BlockingQueue<KeepAliveMessage> messages = new LinkedBlockingQueue<KeepAliveMessage>();
private Long lastKeepAliveMessage = 0L;
private Long lastAkkaAppsTimestamp = 0L;
private static final String SYSTEM = "BbbWeb";
@ -123,7 +124,7 @@ public class KeepAliveService implements MessageListener {
}
private void processPing(KeepAlivePing msg) {
gw.sendKeepAlive(SYSTEM, System.currentTimeMillis());
gw.sendKeepAlive(SYSTEM, System.currentTimeMillis(), lastAkkaAppsTimestamp);
Boolean akkaAppsIsAvailable = available;
if (lastKeepAliveMessage != 0 && (System.currentTimeMillis() - lastKeepAliveMessage > 30000)) {
@ -141,12 +142,13 @@ public class KeepAliveService implements MessageListener {
}
lastKeepAliveMessage = System.currentTimeMillis();
lastAkkaAppsTimestamp = msg.akkaAppsTimestamp;
available = true;
}
private void handleKeepAliveReply(String system, Long timestamp) {
private void handleKeepAliveReply(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) {
if (SYSTEM.equals(system)) {
KeepAlivePong pong = new KeepAlivePong(system, timestamp);
KeepAlivePong pong = new KeepAlivePong(system, bbbWebTimestamp, akkaAppsTimestamp);
queueMessage(pong);
}
}
@ -155,7 +157,7 @@ public class KeepAliveService implements MessageListener {
public void handle(IMessage message) {
if (message instanceof KeepAliveReply) {
KeepAliveReply msg = (KeepAliveReply) message;
handleKeepAliveReply(msg.system, msg.timestamp);
handleKeepAliveReply(msg.system, msg.bbbWebTimestamp, msg.akkaAppsTimestamp);
}
}
}

View File

@ -255,8 +255,8 @@ class BbbWebApiGWApp(
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
}
def sendKeepAlive(system: String, timestamp: java.lang.Long): Unit = {
val event = MsgBuilder.buildCheckAlivePingSysMsg(system, timestamp.longValue())
def sendKeepAlive(system: String, bbbWebTimestamp: java.lang.Long, akkaAppsTimestamp: java.lang.Long): Unit = {
val event = MsgBuilder.buildCheckAlivePingSysMsg(system, bbbWebTimestamp.longValue(), akkaAppsTimestamp.longValue())
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
}

View File

@ -64,11 +64,11 @@ object MsgBuilder {
BbbCommonEnvCoreMsg(envelope, req)
}
def buildCheckAlivePingSysMsg(system: String, timestamp: Long): BbbCommonEnvCoreMsg = {
def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = {
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
val envelope = BbbCoreEnvelope(CheckAlivePingSysMsg.NAME, routing)
val header = BbbCoreBaseHeader(CheckAlivePingSysMsg.NAME)
val body = CheckAlivePingSysMsgBody(system, timestamp)
val body = CheckAlivePingSysMsgBody(system, bbbWebTimestamp, akkaAppsTimestamp)
val req = CheckAlivePingSysMsg(header, body)
BbbCommonEnvCoreMsg(envelope, req)
}

View File

@ -101,7 +101,8 @@ class OldMeetingMsgHdlrActor(val olgMsgGW: OldMessageReceivedGW)
}
def handleCheckAlivePongSysMsg(msg: CheckAlivePongSysMsg): Unit = {
olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.timestamp))
olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.bbbWebTimestamp,
msg.body.akkaAppsTimestamp))
}
def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = {