feat: pubsub send and receive statuses added to health endpoint
This commit is contained in:
parent
391ce4d83d
commit
cae124aed7
@ -3,13 +3,21 @@ package org.bigbluebutton
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
|
||||
import akka.http.scaladsl.server.Directives._
|
||||
import org.bigbluebutton.service.HealthzService
|
||||
import org.bigbluebutton.service.{ HealthzService, PubSubReceiveStatus, PubSubSendStatus, RecordingDBSendStatus }
|
||||
import spray.json.DefaultJsonProtocol
|
||||
|
||||
case class HealthResponse(isHealthy: Boolean)
|
||||
case class HealthResponse(
|
||||
isHealthy: Boolean,
|
||||
pubsubSendStatus: PubSubSendStatus,
|
||||
pubsubReceiveStatus: PubSubReceiveStatus,
|
||||
recordingDbStatus: RecordingDBSendStatus
|
||||
)
|
||||
|
||||
trait JsonSupportProtocol extends SprayJsonSupport with DefaultJsonProtocol {
|
||||
implicit val healthServiceJsonFormat = jsonFormat1(HealthResponse)
|
||||
implicit val pubSubSendStatusJsonFormat = jsonFormat2(PubSubSendStatus)
|
||||
implicit val pubSubReceiveStatusJsonFormat = jsonFormat2(PubSubReceiveStatus)
|
||||
implicit val recordingDbStatusJsonFormat = jsonFormat2(RecordingDBSendStatus)
|
||||
implicit val healthServiceJsonFormat = jsonFormat4(HealthResponse)
|
||||
}
|
||||
|
||||
class ApiService(healthz: HealthzService) extends JsonSupportProtocol {
|
||||
@ -21,9 +29,19 @@ class ApiService(healthz: HealthzService) extends JsonSupportProtocol {
|
||||
onSuccess(future) {
|
||||
case response =>
|
||||
if (response.isHealthy) {
|
||||
complete(StatusCodes.OK, HealthResponse(response.isHealthy))
|
||||
complete(StatusCodes.OK, HealthResponse(
|
||||
response.isHealthy,
|
||||
response.pubSubSendStatus,
|
||||
response.pubSubReceiveStatus,
|
||||
response.recordingDBSendStatus
|
||||
))
|
||||
} else {
|
||||
complete(StatusCodes.ServiceUnavailable, HealthResponse(response.isHealthy))
|
||||
complete(StatusCodes.ServiceUnavailable, HealthResponse(
|
||||
response.isHealthy,
|
||||
response.pubSubSendStatus,
|
||||
response.pubSubReceiveStatus,
|
||||
response.recordingDBSendStatus
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -167,7 +167,8 @@ class BigBlueButtonActor(
|
||||
}
|
||||
|
||||
private def handleCheckAlivePingSysMsg(msg: CheckAlivePingSysMsg): Unit = {
|
||||
val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.timestamp)
|
||||
val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.bbbWebTimestamp, System.currentTimeMillis())
|
||||
healthzService.sendSentStatusMessage(msg.body.akkaAppsTimestamp)
|
||||
healthzService.sendReceiveStatusMessage(System.currentTimeMillis())
|
||||
outGW.send(event)
|
||||
}
|
||||
|
@ -238,10 +238,10 @@ object MsgBuilder {
|
||||
BbbCommonEnvCoreMsg(envelope, event)
|
||||
}
|
||||
|
||||
def buildCheckAlivePingSysMsg(system: String, timestamp: Long): BbbCommonEnvCoreMsg = {
|
||||
def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka")
|
||||
val envelope = BbbCoreEnvelope(CheckAlivePongSysMsg.NAME, routing)
|
||||
val body = CheckAlivePongSysMsgBody(system, timestamp)
|
||||
val body = CheckAlivePongSysMsgBody(system, bbbWebTimestamp, akkaAppsTimestamp)
|
||||
val header = BbbCoreBaseHeader(CheckAlivePongSysMsg.NAME)
|
||||
val event = CheckAlivePongSysMsg(header, body)
|
||||
|
||||
|
@ -8,14 +8,27 @@ import scala.concurrent.duration._
|
||||
import akka.pattern.{ AskTimeoutException, ask }
|
||||
import org.bigbluebutton.core.BigBlueButtonActor
|
||||
|
||||
import java.time.{ Instant, LocalDateTime }
|
||||
import java.util.TimeZone
|
||||
|
||||
sealed trait HealthMessage
|
||||
|
||||
case object GetHealthMessage extends HealthMessage
|
||||
case class GetBigBlueButtonActorStatus(bbbActor: BigBlueButtonActor) extends HealthMessage
|
||||
case class SetPubSubReceiveStatus(timestamp: Long) extends HealthMessage
|
||||
case class SetPubSubSentStatus(timestamp: Long) extends HealthMessage
|
||||
case class SetRecordingDatabaseStatus(timestamp: Long) extends HealthMessage
|
||||
case object GetHealthMessage extends HealthMessage
|
||||
|
||||
case class GetHealthResponseMessage(isHealthy: Boolean) extends HealthMessage
|
||||
case class GetHealthResponseMessage(
|
||||
isHealthy: Boolean,
|
||||
pubSubSendStatus: PubSubSendStatus,
|
||||
pubSubReceiveStatus: PubSubReceiveStatus,
|
||||
recordingDBSendStatus: RecordingDBSendStatus
|
||||
) extends HealthMessage
|
||||
|
||||
case class PubSubSendStatus(status: Boolean, timestamp: String)
|
||||
case class PubSubReceiveStatus(status: Boolean, timestamp: String)
|
||||
case class RecordingDBSendStatus(status: Boolean, timestamp: String)
|
||||
|
||||
object HealthzService {
|
||||
def apply(system: ActorSystem) = new HealthzService(system)
|
||||
@ -31,11 +44,15 @@ class HealthzService(system: ActorSystem) {
|
||||
val future = healthActor.ask(GetHealthMessage).mapTo[GetHealthResponseMessage]
|
||||
future.recover {
|
||||
case e: AskTimeoutException => {
|
||||
GetHealthResponseMessage(isHealthy = false)
|
||||
GetHealthResponseMessage(false, null, null, null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def sendSentStatusMessage(timestamp: Long): Unit = {
|
||||
healthActor ! SetPubSubSentStatus(timestamp)
|
||||
}
|
||||
|
||||
def sendReceiveStatusMessage(timestamp: Long): Unit = {
|
||||
healthActor ! SetPubSubReceiveStatus(timestamp)
|
||||
}
|
||||
@ -43,6 +60,7 @@ class HealthzService(system: ActorSystem) {
|
||||
def sendRecordingDBStatusMessage(timestamp: Long): Unit = {
|
||||
healthActor ! SetRecordingDatabaseStatus(timestamp)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object HealthzActor {
|
||||
@ -50,19 +68,26 @@ object HealthzActor {
|
||||
}
|
||||
|
||||
class HealthzActor extends Actor with ActorLogging {
|
||||
val twoMins = 2 * 60 * 1000
|
||||
var lastReceivedTimestamp = 0L
|
||||
var recordingDBResponseTimestamp = 0L
|
||||
val twoMins: Int = 2 * 60 * 1000
|
||||
var lastSentTimestamp: Long = 0L
|
||||
var lastReceivedTimestamp: Long = 0L
|
||||
var recordingDBResponseTimestamp: Long = 0L
|
||||
|
||||
override def receive: Receive = {
|
||||
case GetHealthMessage =>
|
||||
val status: Boolean = (computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins) &&
|
||||
(computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins)
|
||||
println(s"lastReceivedTimestamp: $lastReceivedTimestamp")
|
||||
println(s"recordingDBResponseTimestamp: $recordingDBResponseTimestamp")
|
||||
|
||||
sender ! GetHealthResponseMessage(status)
|
||||
val c1: Boolean = computeElapsedTimeFromNow(lastSentTimestamp) < 30000
|
||||
val c2: Boolean = computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins
|
||||
val c3: Boolean = computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins
|
||||
val status: Boolean = c1 && c2 && c3
|
||||
|
||||
sender ! GetHealthResponseMessage(
|
||||
status,
|
||||
PubSubSendStatus(c1, convertLongTimestampToDateTimeString(lastSentTimestamp)),
|
||||
PubSubReceiveStatus(c2, convertLongTimestampToDateTimeString(lastReceivedTimestamp)),
|
||||
RecordingDBSendStatus(c3, convertLongTimestampToDateTimeString(recordingDBResponseTimestamp))
|
||||
)
|
||||
case SetPubSubSentStatus(timestamp) =>
|
||||
lastSentTimestamp = timestamp
|
||||
case SetPubSubReceiveStatus(timestamp) =>
|
||||
lastReceivedTimestamp = timestamp
|
||||
case SetRecordingDatabaseStatus(timestamp) =>
|
||||
@ -73,4 +98,9 @@ class HealthzActor extends Actor with ActorLogging {
|
||||
def computeElapsedTimeFromNow(inputTimeStamp: Long): Long = {
|
||||
System.currentTimeMillis() - inputTimeStamp
|
||||
}
|
||||
|
||||
def convertLongTimestampToDateTimeString(inputTimestamp: Long): String = {
|
||||
LocalDateTime.ofInstant(Instant.ofEpochMilli(inputTimestamp), TimeZone.getDefault.toZoneId).toString
|
||||
}
|
||||
|
||||
}
|
@ -164,14 +164,14 @@ case class CheckAlivePingSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: CheckAlivePingSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class CheckAlivePingSysMsgBody(system: String, timestamp: Long)
|
||||
case class CheckAlivePingSysMsgBody(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long)
|
||||
|
||||
object CheckAlivePongSysMsg { val NAME = "CheckAlivePongSysMsg" }
|
||||
case class CheckAlivePongSysMsg(
|
||||
header: BbbCoreBaseHeader,
|
||||
body: CheckAlivePongSysMsgBody
|
||||
) extends BbbCoreMsg
|
||||
case class CheckAlivePongSysMsgBody(system: String, timestamp: Long)
|
||||
case class CheckAlivePongSysMsgBody(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long)
|
||||
|
||||
object RecordingChapterBreakSysMsg { val NAME = "RecordingChapterBreakSysMsg" }
|
||||
case class RecordingChapterBreakSysMsg(
|
||||
|
@ -3,10 +3,12 @@ package org.bigbluebutton.api.messaging.messages;
|
||||
public class KeepAliveReply implements IMessage {
|
||||
|
||||
public final String system;
|
||||
public final Long timestamp;
|
||||
|
||||
public KeepAliveReply(String system, Long timestamp) {
|
||||
public final Long bbbWebTimestamp;
|
||||
public final Long akkaAppsTimestamp;
|
||||
|
||||
public KeepAliveReply(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) {
|
||||
this.system = system;
|
||||
this.timestamp = timestamp;
|
||||
this.bbbWebTimestamp = bbbWebTimestamp;
|
||||
this.akkaAppsTimestamp = akkaAppsTimestamp;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,6 @@ public interface IPublisherService {
|
||||
void send(String channel, String message);
|
||||
void registerUser(String meetingID, String internalUserId, String fullname, String role, String externUserID,
|
||||
String authToken, String avatarURL, Boolean guest, Boolean authed);
|
||||
void sendKeepAlive(String system, Long timestamp);
|
||||
void sendKeepAlive(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp);
|
||||
void sendStunTurnInfo(String meetingId, String internalUserId, Set<StunServer> stuns, Set<TurnEntry> turns);
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ public interface IBbbWebApiGWApp {
|
||||
|
||||
void destroyMeeting(DestroyMeetingMessage msg);
|
||||
void endMeeting(EndMeetingMessage msg);
|
||||
void sendKeepAlive(String system, Long timestamp);
|
||||
void sendKeepAlive(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp);
|
||||
void publishedRecording(PublishedRecordingMessage msg);
|
||||
void unpublishedRecording(UnpublishedRecordingMessage msg);
|
||||
void deletedRecording(DeletedRecordingMessage msg);
|
||||
|
@ -3,10 +3,12 @@ package org.bigbluebutton.web.services;
|
||||
public class KeepAlivePong implements KeepAliveMessage {
|
||||
|
||||
public final String system;
|
||||
public final Long timestamp;
|
||||
|
||||
public KeepAlivePong(String system, Long timestamp) {
|
||||
public final Long bbbWebTimestamp;
|
||||
public final Long akkaAppsTimestamp;
|
||||
|
||||
public KeepAlivePong(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) {
|
||||
this.system = system;
|
||||
this.timestamp = timestamp;
|
||||
this.bbbWebTimestamp = bbbWebTimestamp;
|
||||
this.akkaAppsTimestamp = akkaAppsTimestamp;
|
||||
}
|
||||
}
|
||||
|
@ -51,6 +51,7 @@ public class KeepAliveService implements MessageListener {
|
||||
private BlockingQueue<KeepAliveMessage> messages = new LinkedBlockingQueue<KeepAliveMessage>();
|
||||
|
||||
private Long lastKeepAliveMessage = 0L;
|
||||
private Long lastAkkaAppsTimestamp = 0L;
|
||||
|
||||
private static final String SYSTEM = "BbbWeb";
|
||||
|
||||
@ -123,7 +124,7 @@ public class KeepAliveService implements MessageListener {
|
||||
}
|
||||
|
||||
private void processPing(KeepAlivePing msg) {
|
||||
gw.sendKeepAlive(SYSTEM, System.currentTimeMillis());
|
||||
gw.sendKeepAlive(SYSTEM, System.currentTimeMillis(), lastAkkaAppsTimestamp);
|
||||
Boolean akkaAppsIsAvailable = available;
|
||||
|
||||
if (lastKeepAliveMessage != 0 && (System.currentTimeMillis() - lastKeepAliveMessage > 30000)) {
|
||||
@ -141,12 +142,13 @@ public class KeepAliveService implements MessageListener {
|
||||
}
|
||||
|
||||
lastKeepAliveMessage = System.currentTimeMillis();
|
||||
lastAkkaAppsTimestamp = msg.akkaAppsTimestamp;
|
||||
available = true;
|
||||
}
|
||||
|
||||
private void handleKeepAliveReply(String system, Long timestamp) {
|
||||
private void handleKeepAliveReply(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) {
|
||||
if (SYSTEM.equals(system)) {
|
||||
KeepAlivePong pong = new KeepAlivePong(system, timestamp);
|
||||
KeepAlivePong pong = new KeepAlivePong(system, bbbWebTimestamp, akkaAppsTimestamp);
|
||||
queueMessage(pong);
|
||||
}
|
||||
}
|
||||
@ -155,7 +157,7 @@ public class KeepAliveService implements MessageListener {
|
||||
public void handle(IMessage message) {
|
||||
if (message instanceof KeepAliveReply) {
|
||||
KeepAliveReply msg = (KeepAliveReply) message;
|
||||
handleKeepAliveReply(msg.system, msg.timestamp);
|
||||
handleKeepAliveReply(msg.system, msg.bbbWebTimestamp, msg.akkaAppsTimestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -255,8 +255,8 @@ class BbbWebApiGWApp(
|
||||
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
|
||||
}
|
||||
|
||||
def sendKeepAlive(system: String, timestamp: java.lang.Long): Unit = {
|
||||
val event = MsgBuilder.buildCheckAlivePingSysMsg(system, timestamp.longValue())
|
||||
def sendKeepAlive(system: String, bbbWebTimestamp: java.lang.Long, akkaAppsTimestamp: java.lang.Long): Unit = {
|
||||
val event = MsgBuilder.buildCheckAlivePingSysMsg(system, bbbWebTimestamp.longValue(), akkaAppsTimestamp.longValue())
|
||||
msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event))
|
||||
}
|
||||
|
||||
|
@ -64,11 +64,11 @@ object MsgBuilder {
|
||||
BbbCommonEnvCoreMsg(envelope, req)
|
||||
}
|
||||
|
||||
def buildCheckAlivePingSysMsg(system: String, timestamp: Long): BbbCommonEnvCoreMsg = {
|
||||
def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = {
|
||||
val routing = collection.immutable.HashMap("sender" -> "bbb-web")
|
||||
val envelope = BbbCoreEnvelope(CheckAlivePingSysMsg.NAME, routing)
|
||||
val header = BbbCoreBaseHeader(CheckAlivePingSysMsg.NAME)
|
||||
val body = CheckAlivePingSysMsgBody(system, timestamp)
|
||||
val body = CheckAlivePingSysMsgBody(system, bbbWebTimestamp, akkaAppsTimestamp)
|
||||
val req = CheckAlivePingSysMsg(header, body)
|
||||
BbbCommonEnvCoreMsg(envelope, req)
|
||||
}
|
||||
|
@ -91,7 +91,8 @@ class OldMeetingMsgHdlrActor(val olgMsgGW: OldMessageReceivedGW)
|
||||
}
|
||||
|
||||
def handleCheckAlivePongSysMsg(msg: CheckAlivePongSysMsg): Unit = {
|
||||
olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.timestamp))
|
||||
olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.bbbWebTimestamp,
|
||||
msg.body.akkaAppsTimestamp))
|
||||
}
|
||||
|
||||
def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = {
|
||||
|
Loading…
Reference in New Issue
Block a user