- make akka-apps and bbb-web work with redis password
This commit is contained in:
parent
57f47b7268
commit
a33271bc09
6
akka-bbb-apps/run-dev.sh
Executable file
6
akka-bbb-apps/run-dev.sh
Executable file
@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
rm -rf src/main/resources
|
||||
cp -R src/universal/conf src/main/resources
|
||||
sbt run
|
||||
|
@ -1,6 +1,6 @@
|
||||
package org.bigbluebutton
|
||||
|
||||
import org.bigbluebutton.common2.redis.RedisPublisher
|
||||
import org.bigbluebutton.common2.redis.{ MessageSender, RedisConfig, RedisPublisher }
|
||||
import org.bigbluebutton.core._
|
||||
import org.bigbluebutton.core.bus._
|
||||
import org.bigbluebutton.core.pubsub.senders.ReceivedJsonMsgHandlerActor
|
||||
@ -8,10 +8,8 @@ import org.bigbluebutton.core2.AnalyticsActor
|
||||
import org.bigbluebutton.core2.FromAkkaAppsMsgSenderActor
|
||||
import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor
|
||||
import org.bigbluebutton.endpoint.redis.RedisRecorderActor
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging
|
||||
import org.bigbluebutton.common2.redis.MessageSender
|
||||
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
|
||||
|
||||
object Boot extends App with SystemConfiguration {
|
||||
@ -28,10 +26,21 @@ object Boot extends App with SystemConfiguration {
|
||||
|
||||
val outGW = new OutMessageGatewayImp(outBus2)
|
||||
|
||||
val redisPublisher = new RedisPublisher(system, "BbbAppsAkkaPub")
|
||||
val redisPass = if (redisPassword != "") Some(redisPassword) else None
|
||||
val redisConfig = RedisConfig(redisHost, redisPort, redisPass, redisExpireKey)
|
||||
|
||||
val redisPublisher = new RedisPublisher(
|
||||
system,
|
||||
"BbbAppsAkkaPub",
|
||||
redisConfig
|
||||
)
|
||||
|
||||
val msgSender = new MessageSender(redisPublisher)
|
||||
|
||||
val redisRecorderActor = system.actorOf(RedisRecorderActor.props(system), "redisRecorderActor")
|
||||
val redisRecorderActor = system.actorOf(
|
||||
RedisRecorderActor.props(system, redisConfig),
|
||||
"redisRecorderActor"
|
||||
)
|
||||
|
||||
recordingEventBus.subscribe(redisRecorderActor, outMessageChannel)
|
||||
val incomingJsonMessageBus = new IncomingJsonMessageBus
|
||||
@ -39,6 +48,7 @@ object Boot extends App with SystemConfiguration {
|
||||
val bbbMsgBus = new BbbMsgRouterEventBus
|
||||
|
||||
val fromAkkaAppsMsgSenderActorRef = system.actorOf(FromAkkaAppsMsgSenderActor.props(msgSender))
|
||||
|
||||
val analyticsActorRef = system.actorOf(AnalyticsActor.props())
|
||||
outBus2.subscribe(fromAkkaAppsMsgSenderActorRef, outBbbMsgMsgChannel)
|
||||
outBus2.subscribe(redisRecorderActor, recordServiceMessageChannel)
|
||||
@ -52,5 +62,17 @@ object Boot extends App with SystemConfiguration {
|
||||
val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus))
|
||||
incomingJsonMessageBus.subscribe(redisMessageHandlerActor, toAkkaAppsJsonChannel)
|
||||
|
||||
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, incomingJsonMessageBus), "redis-subscriber")
|
||||
val channelsToSubscribe = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel)
|
||||
|
||||
val redisSubscriberActor = system.actorOf(
|
||||
AppsRedisSubscriberActor.props(
|
||||
system,
|
||||
incomingJsonMessageBus,
|
||||
redisConfig,
|
||||
channelsToSubscribe,
|
||||
Nil,
|
||||
toAkkaAppsJsonChannel
|
||||
),
|
||||
"redis-subscriber"
|
||||
)
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
package org.bigbluebutton
|
||||
|
||||
import scala.util.Try
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import org.bigbluebutton.common2.redis.RedisConfiguration
|
||||
|
||||
trait SystemConfiguration extends RedisConfiguration {
|
||||
trait SystemConfiguration {
|
||||
val config = ConfigFactory.load()
|
||||
|
||||
lazy val bbbWebHost = Try(config.getString("services.bbbWebHost")).getOrElse("localhost")
|
||||
lazy val bbbWebPort = Try(config.getInt("services.bbbWebPort")).getOrElse(8888)
|
||||
@ -47,4 +47,28 @@ trait SystemConfiguration extends RedisConfiguration {
|
||||
lazy val endMeetingWhenNoMoreAuthedUsers = Try(config.getBoolean("apps.endMeetingWhenNoMoreAuthedUsers")).getOrElse(false)
|
||||
lazy val endMeetingWhenNoMoreAuthedUsersAfterMinutes = Try(config.getInt("apps.endMeetingWhenNoMoreAuthedUsersAfterMinutes")).getOrElse(2)
|
||||
lazy val multiUserWhiteboardDefault = Try(config.getBoolean("whiteboard.multiUserDefault")).getOrElse(false)
|
||||
|
||||
// Redis server configuration
|
||||
lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1")
|
||||
lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379)
|
||||
lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("")
|
||||
lazy val redisExpireKey = Try(config.getInt("redis.keyExpiry")).getOrElse(1209600)
|
||||
|
||||
// Redis channels
|
||||
lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
|
||||
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")
|
||||
|
||||
lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel")
|
||||
lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-voice-conf-redis-channel")
|
||||
|
||||
lazy val fromAkkaAppsWbRedisChannel = Try(config.getString("redis.fromAkkaAppsWbRedisChannel")).getOrElse("from-akka-apps-wb-redis-channel")
|
||||
lazy val fromAkkaAppsChatRedisChannel = Try(config.getString("redis.fromAkkaAppsChatRedisChannel")).getOrElse("from-akka-apps-chat-redis-channel")
|
||||
lazy val fromAkkaAppsPresRedisChannel = Try(config.getString("redis.fromAkkaAppsPresRedisChannel")).getOrElse("from-akka-apps-pres-redis-channel")
|
||||
|
||||
lazy val fromBbbWebRedisChannel = Try(config.getString("redis.fromBbbWebRedisChannel")).getOrElse("from-bbb-web-redis-channel")
|
||||
|
||||
lazy val toAkkaTranscodeRedisChannel = Try(config.getString("redis.toAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:to-bbb-transcode:system")
|
||||
lazy val fromAkkaTranscodeRedisChannel = Try(config.getString("redis.fromAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:from-bbb-transcode:system")
|
||||
lazy val toAkkaTranscodeJsonChannel = Try(config.getString("eventBus.toAkkaTranscodeJsonChannel")).getOrElse("to-akka-transcode-json-channel")
|
||||
lazy val fromAkkaTranscodeJsonChannel = Try(config.getString("eventBus.fromAkkaTranscodeJsonChannel")).getOrElse("from-akka-transcode-json-channel")
|
||||
}
|
||||
|
@ -1,34 +1,47 @@
|
||||
package org.bigbluebutton.endpoint.redis
|
||||
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
|
||||
import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider }
|
||||
|
||||
import org.bigbluebutton.common2.redis.{ RedisConfig, RedisSubscriber, RedisSubscriberProvider }
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Props
|
||||
|
||||
object AppsRedisSubscriberActor extends RedisSubscriber {
|
||||
|
||||
val channels = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel)
|
||||
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*")
|
||||
|
||||
def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props =
|
||||
object AppsRedisSubscriberActor {
|
||||
def props(
|
||||
system: ActorSystem,
|
||||
jsonMsgBus: IncomingJsonMessageBus,
|
||||
redisConfig: RedisConfig,
|
||||
channelsToSubscribe: Seq[String],
|
||||
patternsToSubscribe: Seq[String],
|
||||
forwardMsgToChannel: String
|
||||
): Props =
|
||||
Props(
|
||||
classOf[AppsRedisSubscriberActor],
|
||||
system, jsonMsgBus,
|
||||
redisHost, redisPort,
|
||||
channels, patterns
|
||||
system,
|
||||
jsonMsgBus,
|
||||
redisConfig,
|
||||
channelsToSubscribe,
|
||||
patternsToSubscribe,
|
||||
forwardMsgToChannel
|
||||
).withDispatcher("akka.redis-subscriber-worker-dispatcher")
|
||||
}
|
||||
|
||||
class AppsRedisSubscriberActor(
|
||||
system: ActorSystem,
|
||||
jsonMsgBus: IncomingJsonMessageBus,
|
||||
redisHost: String, redisPort: Int,
|
||||
channels: Seq[String] = Nil, patterns: Seq[String] = Nil
|
||||
system: ActorSystem,
|
||||
jsonMsgBus: IncomingJsonMessageBus,
|
||||
redisConfig: RedisConfig,
|
||||
channelsToSubscribe: Seq[String],
|
||||
patternsToSubscribe: Seq[String],
|
||||
forwardMsgToChannel: String
|
||||
)
|
||||
extends RedisSubscriberProvider(system, "BbbAppsAkkaSub", channels, patterns, jsonMsgBus) with SystemConfiguration {
|
||||
extends RedisSubscriberProvider(
|
||||
system,
|
||||
"BbbAppsAkkaSub",
|
||||
channelsToSubscribe,
|
||||
patternsToSubscribe,
|
||||
jsonMsgBus,
|
||||
redisConfig
|
||||
) {
|
||||
|
||||
addListener(toAkkaAppsJsonChannel)
|
||||
addListener(forwardMsgToChannel)
|
||||
subscribe()
|
||||
}
|
||||
|
@ -2,26 +2,37 @@ package org.bigbluebutton.endpoint.redis
|
||||
|
||||
import scala.collection.immutable.StringOps
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.bigbluebutton.SystemConfiguration
|
||||
import org.bigbluebutton.common2.msgs._
|
||||
import org.bigbluebutton.common2.redis.RedisStorageProvider
|
||||
import org.bigbluebutton.common2.redis.{ RedisConfig, RedisStorageProvider }
|
||||
import org.bigbluebutton.core.apps.groupchats.GroupChatApp
|
||||
import org.bigbluebutton.core.record.events._
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Props
|
||||
|
||||
object RedisRecorderActor {
|
||||
def props(system: ActorSystem): Props = Props(classOf[RedisRecorderActor], system)
|
||||
def props(
|
||||
system: ActorSystem,
|
||||
redisConfig: RedisConfig
|
||||
): Props =
|
||||
Props(
|
||||
classOf[RedisRecorderActor],
|
||||
system,
|
||||
redisConfig
|
||||
)
|
||||
}
|
||||
|
||||
class RedisRecorderActor(system: ActorSystem)
|
||||
extends RedisStorageProvider(system, "BbbAppsAkkaRecorder")
|
||||
with SystemConfiguration
|
||||
with Actor with ActorLogging {
|
||||
class RedisRecorderActor(
|
||||
system: ActorSystem,
|
||||
redisConfig: RedisConfig
|
||||
)
|
||||
extends RedisStorageProvider(
|
||||
system,
|
||||
"BbbAppsAkkaRecorder",
|
||||
redisConfig
|
||||
) with Actor with ActorLogging {
|
||||
|
||||
private def record(session: String, message: java.util.Map[java.lang.String, java.lang.String]): Unit = {
|
||||
redis.recordAndExpire(session, message)
|
||||
|
@ -30,7 +30,7 @@ akka {
|
||||
redis {
|
||||
host="127.0.0.1"
|
||||
port=6379
|
||||
password=""
|
||||
password="foobared2"
|
||||
# recording keys should expire in 14 days
|
||||
keyExpiry=1209600
|
||||
}
|
||||
|
2
bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java
Normal file → Executable file
2
bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java
Normal file → Executable file
@ -54,6 +54,8 @@ public abstract class RedisAwareCommunicator {
|
||||
}
|
||||
|
||||
protected void connectionStatusHandler(Event event, Logger log) {
|
||||
System.out.println("******** RedisAwareCommunicator - " + event);
|
||||
|
||||
if (event instanceof ConnectedEvent) {
|
||||
log.info("Connected to redis");
|
||||
} else if (event instanceof ConnectionActivatedEvent) {
|
||||
|
2
bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java
Normal file → Executable file
2
bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java
Normal file → Executable file
@ -37,7 +37,7 @@ public class RedisStorageService extends RedisAwareCommunicator {
|
||||
StatefulRedisConnection<String, String> connection;
|
||||
|
||||
public void start() {
|
||||
log.info("Starting RedisStorageService with client name: {}", clientName);
|
||||
log.info("Starting RedisStorageService with client name: clientName={} password={}", clientName, this.password);
|
||||
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName)
|
||||
.withPassword(this.password).build();
|
||||
|
||||
|
14
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisClientProvider.scala
Normal file → Executable file
14
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisClientProvider.scala
Normal file → Executable file
@ -5,10 +5,20 @@ import io.lettuce.core.ClientOptions
|
||||
import io.lettuce.core.RedisClient
|
||||
import io.lettuce.core.RedisURI
|
||||
|
||||
abstract class RedisClientProvider(val system: ActorSystem, val clientName: String) extends RedisConfiguration {
|
||||
abstract class RedisClientProvider(
|
||||
val system: ActorSystem,
|
||||
val clientName: String,
|
||||
val redisConfig: RedisConfig
|
||||
) {
|
||||
// Set the name of this client to be able to distinguish when doing
|
||||
// CLIENT LIST on redis-cli
|
||||
val redisUri = RedisURI.Builder.redis(redisHost, redisPort).withClientName(clientName).withPassword(redisPassword).build()
|
||||
val redisPassword = redisConfig.password match {
|
||||
case Some(pass) => pass
|
||||
case None => ""
|
||||
}
|
||||
|
||||
println("******* RedisClientProvider clientName=" + clientName + " password=" + redisPassword)
|
||||
val redisUri = RedisURI.Builder.redis(redisConfig.host, redisConfig.port).withClientName(clientName).withPassword(redisPassword).build()
|
||||
|
||||
var redis = RedisClient.create(redisUri)
|
||||
redis.setOptions(ClientOptions.builder().autoReconnect(true).build())
|
||||
|
2
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConfiguration.scala
Normal file → Executable file
2
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConfiguration.scala
Normal file → Executable file
@ -3,7 +3,7 @@ package org.bigbluebutton.common2.redis
|
||||
import scala.util.Try
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
trait RedisConfiguration {
|
||||
trait RedisConfiguration2 {
|
||||
val config = ConfigFactory.load()
|
||||
|
||||
// Redis server configuration
|
||||
|
14
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala
Normal file → Executable file
14
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala
Normal file → Executable file
@ -7,23 +7,27 @@ import io.lettuce.core.event.connection.{ ConnectionDeactivatedEvent, Connection
|
||||
import reactor.core.Disposable
|
||||
import akka.event.LoggingAdapter
|
||||
|
||||
case class RedisConfig(host: String, port: Int, password: Option[String], expireKey: Int)
|
||||
|
||||
trait RedisConnectionHandler {
|
||||
|
||||
def subscribeToEventBus(redis: RedisClient, log: LoggingAdapter) {
|
||||
val eventBus: EventBus = redis.getResources().eventBus();
|
||||
val eventBus: EventBus = redis.getResources().eventBus()
|
||||
// @todo : unsubscribe when connection is closed
|
||||
val eventBusSubscription: Disposable = eventBus.get().subscribe(e => connectionStatusHandler(e, log))
|
||||
}
|
||||
|
||||
def connectionStatusHandler(event: Event, log: LoggingAdapter) {
|
||||
println("******* RedisConnectionHandler - " + event)
|
||||
|
||||
if (event.isInstanceOf[ConnectedEvent]) {
|
||||
log.info("Connected to redis");
|
||||
log.info("Connected to redis")
|
||||
} else if (event.isInstanceOf[ConnectionActivatedEvent]) {
|
||||
log.info("Connection to redis activated");
|
||||
log.info("Connection to redis activated")
|
||||
} else if (event.isInstanceOf[DisconnectedEvent]) {
|
||||
log.info("Disconnected from redis");
|
||||
log.info("Disconnected from redis")
|
||||
} else if (event.isInstanceOf[ConnectionDeactivatedEvent]) {
|
||||
log.info("Connection to redis deactivated");
|
||||
log.info("Connection to redis deactivated")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,16 @@ package org.bigbluebutton.common2.redis
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging
|
||||
|
||||
class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) with RedisConnectionHandler {
|
||||
class RedisPublisher(
|
||||
system: ActorSystem,
|
||||
clientName: String,
|
||||
redisConfig: RedisConfig
|
||||
)
|
||||
extends RedisClientProvider(
|
||||
system,
|
||||
clientName,
|
||||
redisConfig
|
||||
) with RedisConnectionHandler {
|
||||
|
||||
val log = Logging(system, getClass)
|
||||
|
||||
@ -14,8 +23,9 @@ class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClien
|
||||
redis.connect()
|
||||
|
||||
def publish(channel: String, data: String) {
|
||||
val async = connection.async();
|
||||
async.publish(channel, data);
|
||||
val async = connection.async()
|
||||
println("****** RedisPublisher publish to channel " + channel)
|
||||
async.publish(channel, data)
|
||||
}
|
||||
|
||||
}
|
||||
|
17
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisStorageProvider.scala
Normal file → Executable file
17
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisStorageProvider.scala
Normal file → Executable file
@ -2,12 +2,17 @@ package org.bigbluebutton.common2.redis
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
abstract class RedisStorageProvider(system: ActorSystem, clientName: String) extends RedisConfiguration {
|
||||
abstract class RedisStorageProvider(system: ActorSystem, clientName: String, config: RedisConfig) {
|
||||
val redisPass = config.password match {
|
||||
case Some(pass) => pass
|
||||
case None => ""
|
||||
}
|
||||
|
||||
var redis = new RedisStorageService()
|
||||
redis.setHost(redisHost)
|
||||
redis.setPort(redisPort)
|
||||
redis.setPassword(redisPassword)
|
||||
redis.setExpireKey(redisExpireKey)
|
||||
redis.setHost(config.host)
|
||||
redis.setPort(config.port)
|
||||
redis.setPassword(redisPass)
|
||||
redis.setExpireKey(config.expireKey)
|
||||
redis.setClientName(clientName)
|
||||
redis.start();
|
||||
redis.start()
|
||||
}
|
||||
|
6
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriber.scala
Normal file → Executable file
6
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriber.scala
Normal file → Executable file
@ -1,6 +1,6 @@
|
||||
package org.bigbluebutton.common2.redis
|
||||
|
||||
trait RedisSubscriber extends RedisConfiguration {
|
||||
val channels: Seq[String]
|
||||
val patterns: Seq[String]
|
||||
trait RedisSubscriber {
|
||||
val channelsToSubscribe: Seq[String]
|
||||
val patternsToSubscribe: Seq[String]
|
||||
}
|
||||
|
27
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala
Normal file → Executable file
27
bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala
Normal file → Executable file
@ -15,21 +15,30 @@ import java.io.StringWriter
|
||||
import scala.concurrent.duration._
|
||||
import java.io.PrintWriter
|
||||
|
||||
abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
|
||||
channels: Seq[String], patterns: Seq[String],
|
||||
jsonMsgBus: IncomingJsonMessageBus)
|
||||
extends RedisClientProvider(system, clientName) with RedisConnectionHandler with Actor with ActorLogging {
|
||||
abstract class RedisSubscriberProvider(
|
||||
system: ActorSystem,
|
||||
clientName: String,
|
||||
channelsToSubscribe: Seq[String],
|
||||
patternsToSubscribe: Seq[String],
|
||||
jsonMsgBus: IncomingJsonMessageBus,
|
||||
redisConfig: RedisConfig
|
||||
)
|
||||
extends RedisClientProvider(
|
||||
system,
|
||||
clientName,
|
||||
redisConfig
|
||||
) with RedisConnectionHandler with Actor with ActorLogging {
|
||||
|
||||
subscribeToEventBus(redis, log)
|
||||
|
||||
var connection = redis.connectPubSub()
|
||||
|
||||
def addListener(appChannel: String) {
|
||||
def addListener(forwardMsgToChannel: String) {
|
||||
connection.addListener(new RedisPubSubListener[String, String] {
|
||||
def message(channel: String, message: String): Unit = {
|
||||
if (channels.contains(channel)) {
|
||||
if (channelsToSubscribe.contains(channel)) {
|
||||
val receivedJsonMessage = new ReceivedJsonMessage(channel, message)
|
||||
jsonMsgBus.publish(IncomingJsonMessage(appChannel, receivedJsonMessage))
|
||||
jsonMsgBus.publish(IncomingJsonMessage(forwardMsgToChannel, receivedJsonMessage))
|
||||
}
|
||||
}
|
||||
def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
|
||||
@ -42,8 +51,8 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
|
||||
|
||||
def subscribe() {
|
||||
val async = connection.async()
|
||||
for (channel <- channels) async.subscribe(channel)
|
||||
for (pattern <- patterns) async.psubscribe(pattern)
|
||||
for (channel <- channelsToSubscribe) async.subscribe(channel)
|
||||
for (pattern <- patternsToSubscribe) async.psubscribe(pattern)
|
||||
}
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
|
@ -19,7 +19,11 @@ class BbbWebApiGWApp(
|
||||
val oldMessageReceivedGW: OldMessageReceivedGW,
|
||||
val screenshareRtmpServer: String,
|
||||
val screenshareRtmpBroadcastApp: String,
|
||||
val screenshareConfSuffix: String
|
||||
val screenshareConfSuffix: String,
|
||||
redisHost: String,
|
||||
redisPort: Int,
|
||||
redisPassword: String,
|
||||
redisExpireKey: Int
|
||||
) extends IBbbWebApiGWApp with SystemConfiguration {
|
||||
|
||||
implicit val system = ActorSystem("bbb-web-common")
|
||||
@ -29,7 +33,11 @@ class BbbWebApiGWApp(
|
||||
val log = Logging(system, getClass)
|
||||
|
||||
private val jsonMsgToAkkaAppsBus = new JsonMsgToAkkaAppsBus
|
||||
private val redisPublisher = new RedisPublisher(system, "BbbWebPub")
|
||||
|
||||
val redisPass = if (redisPassword != "") Some(redisPassword) else None
|
||||
val redisConfig = RedisConfig(redisHost, redisPort, redisPass, redisExpireKey)
|
||||
private val redisPublisher = new RedisPublisher(system, "BbbWebPub", redisConfig)
|
||||
|
||||
private val msgSender: MessageSender = new MessageSender(redisPublisher)
|
||||
private val messageSenderActorRef = system.actorOf(MessageSenderActor.props(msgSender), "messageSenderActor")
|
||||
|
||||
@ -58,7 +66,24 @@ class BbbWebApiGWApp(
|
||||
|
||||
msgToAkkaAppsEventBus.subscribe(msgToAkkaAppsToJsonActor, toAkkaAppsChannel)
|
||||
|
||||
private val appsRedisSubscriberActor = system.actorOf(WebRedisSubscriberActor.props(system, receivedJsonMsgBus, oldMessageEventBus), "appsRedisSubscriberActor")
|
||||
// Not used but needed by internal class (ralam april 4, 2019)
|
||||
val incomingJsonMessageBus = new IncomingJsonMessageBus
|
||||
|
||||
val channelsToSubscribe = Seq(fromAkkaAppsRedisChannel)
|
||||
private val appsRedisSubscriberActor = system.actorOf(
|
||||
WebRedisSubscriberActor.props(
|
||||
system,
|
||||
receivedJsonMsgBus,
|
||||
oldMessageEventBus,
|
||||
incomingJsonMessageBus,
|
||||
redisConfig,
|
||||
channelsToSubscribe,
|
||||
Nil,
|
||||
fromAkkaAppsJsonChannel,
|
||||
fromAkkaAppsOldJsonChannel
|
||||
),
|
||||
"appsRedisSubscriberActor"
|
||||
)
|
||||
|
||||
private val receivedJsonMsgHdlrActor = system.actorOf(
|
||||
ReceivedJsonMsgHdlrActor.props(msgFromAkkaAppsEventBus), "receivedJsonMsgHdlrActor"
|
||||
|
@ -3,10 +3,14 @@ package org.bigbluebutton.api2
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.util.Try
|
||||
import org.bigbluebutton.common2.redis.RedisConfiguration
|
||||
|
||||
trait SystemConfiguration extends RedisConfiguration {
|
||||
override val config = ConfigFactory.load("bbb-web")
|
||||
trait SystemConfiguration {
|
||||
val config = ConfigFactory.load("bbb-web")
|
||||
|
||||
lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
|
||||
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")
|
||||
|
||||
lazy val fromBbbWebRedisChannel = Try(config.getString("redis.fromBbbWebRedisChannel")).getOrElse("from-bbb-web-redis-channel")
|
||||
|
||||
lazy val fromAkkaAppsChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-channel")
|
||||
lazy val toAkkaAppsChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-channel")
|
||||
|
@ -2,46 +2,73 @@ package org.bigbluebutton.api2.endpoint.redis
|
||||
|
||||
import org.bigbluebutton.api2.SystemConfiguration
|
||||
import org.bigbluebutton.common2.bus._
|
||||
import org.bigbluebutton.common2.redis.{ RedisConfiguration, RedisSubscriber, RedisSubscriberProvider }
|
||||
|
||||
import org.bigbluebutton.common2.redis.{ RedisConfig, RedisSubscriberProvider }
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Props
|
||||
|
||||
import io.lettuce.core.pubsub.RedisPubSubListener
|
||||
|
||||
object WebRedisSubscriberActor extends RedisSubscriber with RedisConfiguration {
|
||||
object WebRedisSubscriberActor {
|
||||
|
||||
val channels = Seq(fromAkkaAppsRedisChannel)
|
||||
val patterns = Seq("bigbluebutton:from-bbb-apps:*")
|
||||
// val channels = Seq(fromAkkaAppsRedisChannel)
|
||||
// val patterns = Seq("bigbluebutton:from-bbb-apps:*")
|
||||
|
||||
def props(system: ActorSystem, jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEventBus: OldMessageEventBus): Props =
|
||||
def props(
|
||||
system: ActorSystem,
|
||||
jsonMsgBus: JsonMsgFromAkkaAppsBus,
|
||||
oldMessageEventBus: OldMessageEventBus,
|
||||
incomingJsonMsgBus: IncomingJsonMessageBus,
|
||||
redisConfig: RedisConfig,
|
||||
channelsToSubscribe: Seq[String],
|
||||
patternsToSubscribe: Seq[String],
|
||||
forwardMsgToChannel: String,
|
||||
oldJsonChannel: String
|
||||
): Props =
|
||||
Props(
|
||||
classOf[WebRedisSubscriberActor],
|
||||
system, jsonMsgBus, oldMessageEventBus,
|
||||
redisHost, redisPort,
|
||||
channels, patterns
|
||||
system,
|
||||
jsonMsgBus,
|
||||
oldMessageEventBus,
|
||||
incomingJsonMsgBus,
|
||||
redisConfig,
|
||||
channelsToSubscribe,
|
||||
patternsToSubscribe,
|
||||
forwardMsgToChannel,
|
||||
oldJsonChannel
|
||||
).withDispatcher("akka.redis-subscriber-worker-dispatcher")
|
||||
}
|
||||
|
||||
class WebRedisSubscriberActor(
|
||||
system: ActorSystem,
|
||||
jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEventBus: OldMessageEventBus, redisHost: String,
|
||||
redisPort: Int,
|
||||
channels: Seq[String] = Nil, patterns: Seq[String] = Nil
|
||||
)
|
||||
extends RedisSubscriberProvider(system, "BbbWebSub", channels, patterns, null) with SystemConfiguration {
|
||||
system: ActorSystem,
|
||||
jsonMsgBus: JsonMsgFromAkkaAppsBus,
|
||||
oldMessageEventBus: OldMessageEventBus,
|
||||
incomingJsonMsgBus: IncomingJsonMessageBus, // Not used. Just to satisfy RedisSubscriberProvider (ralam april 4, 2019)
|
||||
redisConfig: RedisConfig,
|
||||
channelsToSubscribe: Seq[String],
|
||||
patternsToSubscribe: Seq[String],
|
||||
forwardMsgToChannel: String,
|
||||
oldJsonChannel: String
|
||||
) extends RedisSubscriberProvider(
|
||||
system,
|
||||
"BbbWebSub",
|
||||
channelsToSubscribe,
|
||||
patternsToSubscribe,
|
||||
incomingJsonMsgBus, // Not used. Just to satisfy RedisSubscriberProvider (ralam april 4, 2019)
|
||||
redisConfig
|
||||
) {
|
||||
|
||||
override def addListener(appChannel: String) {
|
||||
override def addListener(forwardMsgToChannel: String) {
|
||||
connection.addListener(new RedisPubSubListener[String, String] {
|
||||
def message(channel: String, message: String): Unit = {
|
||||
if (channels.contains(channel)) {
|
||||
if (channelsToSubscribe.contains(channel)) {
|
||||
val receivedJsonMessage = new JsonMsgFromAkkaApps(channel, message)
|
||||
jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage))
|
||||
jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(forwardMsgToChannel, receivedJsonMessage))
|
||||
}
|
||||
}
|
||||
def message(pattern: String, channel: String, message: String): Unit = {
|
||||
log.debug(s"RECEIVED:\n ${message} \n")
|
||||
val receivedJsonMessage = new OldReceivedJsonMessage(pattern, channel, message)
|
||||
oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage))
|
||||
oldMessageEventBus.publish(OldIncomingJsonMessage(oldJsonChannel, receivedJsonMessage))
|
||||
}
|
||||
def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
|
||||
def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
|
||||
@ -50,6 +77,6 @@ class WebRedisSubscriberActor(
|
||||
})
|
||||
}
|
||||
|
||||
addListener(fromAkkaAppsJsonChannel)
|
||||
addListener(forwardMsgToChannel)
|
||||
subscribe()
|
||||
}
|
||||
|
2
bigbluebutton-web/grails-app/conf/application.conf
Normal file → Executable file
2
bigbluebutton-web/grails-app/conf/application.conf
Normal file → Executable file
@ -30,7 +30,7 @@ akka {
|
||||
redis {
|
||||
host="127.0.0.1"
|
||||
port=6379
|
||||
password=""
|
||||
password="foobared2"
|
||||
# recording keys should expire in 14 days
|
||||
keyExpiry=1209600
|
||||
}
|
||||
|
@ -197,7 +197,7 @@ keepEvents=false
|
||||
#----------------------------------------------------
|
||||
# This URL is where the BBB client is accessible. When a user sucessfully
|
||||
# enters a name and password, she is redirected here to load the client.
|
||||
bigbluebutton.web.serverURL=http://10.130.218.38
|
||||
bigbluebutton.web.serverURL=https://ritz-20.blindside-dev.com
|
||||
|
||||
|
||||
#----------------------------------------------------
|
||||
@ -244,7 +244,8 @@ recordStatusDir=/var/bigbluebutton/recording/status/recorded
|
||||
|
||||
redisHost=127.0.0.1
|
||||
redisPort=6379
|
||||
redisPassword=
|
||||
redisPassword=foobared2
|
||||
redisKeyExpiry=1209600
|
||||
|
||||
# The directory where the published/unpublised recordings are located. This is for
|
||||
# the get recording* api calls
|
||||
|
@ -32,7 +32,7 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
|
||||
init-method="start" destroy-method="stop">
|
||||
<property name="host" value="${redisHost}" />
|
||||
<property name="port" value="${redisPort}" />
|
||||
<property name="password" value="${redisPassword:}" />
|
||||
<property name="password" value="${redisPassword}" />
|
||||
<property name="clientName" value="BbbWeb" />
|
||||
</bean>
|
||||
|
||||
|
@ -60,6 +60,10 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
|
||||
<constructor-arg index="1" value="${screenshareRtmpServer}"/>
|
||||
<constructor-arg index="2" value="${screenshareRtmpBroadcastApp}"/>
|
||||
<constructor-arg index="3" value="${screenshareConfSuffix}"/>
|
||||
<constructor-arg index="4" value="${redisHost}"/>
|
||||
<constructor-arg index="5" value="${redisPort}"/>
|
||||
<constructor-arg index="6" value="${redisPassword}"/>
|
||||
<constructor-arg index="7" value="${redisKeyExpiry}"/>
|
||||
</bean>
|
||||
|
||||
<bean id="recordingServiceHelper" class="org.bigbluebutton.api.util.RecordingMetadataReaderHelper">
|
||||
|
Loading…
Reference in New Issue
Block a user