Re-added redis connection pooling and running subscribers and publishers inside executors.

This commit is contained in:
Ghazi Triki 2018-11-30 18:49:22 +01:00
parent 6b20807fc1
commit 7e736d41f6
8 changed files with 170 additions and 82 deletions

View File

@ -18,10 +18,11 @@ object Dependencies {
val jackson = "2.9.7"
val sl4j = "1.7.25"
val red5 = "1.0.10-M5"
val pool = "2.6.0"
// Redis
val redisScala = "1.8.0"
val lettuce = "5.1.2.RELEASE"
val lettuce = "5.1.3.RELEASE"
// Test
val scalaTest = "3.0.5"
@ -37,6 +38,7 @@ object Dependencies {
val jacksonModule = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson
val sl4jApi = "org.slf4j" % "slf4j-api" % Versions.sl4j % "runtime"
val red5 = "org.red5" % "red5-server-common" % Versions.red5
val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool
val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala
val lettuceCore = "io.lettuce" % "lettuce-core" % Versions.lettuce
@ -63,6 +65,7 @@ object Dependencies {
Compile.jacksonModule,
Compile.sl4jApi,
Compile.red5,
Compile.apachePool2,
Compile.lettuceCore,
Compile.redisScala) ++ testing
}

View File

@ -19,6 +19,8 @@
package org.bigbluebutton.common2.redis;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import io.lettuce.core.RedisClient;
public abstract class RedisAwareCommunicator {
@ -49,4 +51,19 @@ public abstract class RedisAwareCommunicator {
public void setPort(int port) {
this.port = port;
}
protected GenericObjectPoolConfig createPoolingConfig() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(32);
config.setMaxIdle(8);
config.setMinIdle(1);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setNumTestsPerEvictionRun(12);
config.setMaxWaitMillis(5000);
config.setTimeBetweenEvictionRunsMillis(60000);
config.setBlockWhenExhausted(true);
return config;
}
}

View File

@ -21,6 +21,7 @@ package org.bigbluebutton.common2.redis;
import java.util.Map;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,18 +30,18 @@ import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
public class RedisStorageService extends RedisAwareCommunicator {
private static Logger log = LoggerFactory.getLogger(RedisStorageService.class);
private long expireKey;
private int expireKey;
RedisCommands<String, String> commands;
private StatefulRedisConnection<String, String> connection;
GenericObjectPool<StatefulRedisConnection<String, String>> connectionPool;
public void start() {
log.info("Starting RedisStorageService");
log.info("Starting RedisStorageService with client name: {}", clientName);
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName).build();
if (!this.password.isEmpty()) {
redisUri.setPassword(this.password);
@ -49,12 +50,12 @@ public class RedisStorageService extends RedisAwareCommunicator {
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
connection = redisClient.connect();
commands = connection.sync();
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());
}
public void stop() {
connection.close();
connectionPool.close();
redisClient.shutdown();
log.info("RedisStorageService Stopped");
}
@ -71,51 +72,89 @@ public class RedisStorageService extends RedisAwareCommunicator {
public void addBreakoutRoom(String parentId, String breakoutId) {
log.debug("Saving breakout room for meeting {}", parentId);
commands.sadd(Keys.BREAKOUT_ROOMS + parentId, breakoutId);
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.sadd(Keys.BREAKOUT_ROOMS + parentId, breakoutId);
} catch (Exception e) {
log.error("Cannot add breakout room data: {}", parentId, e);
} finally {
connectionPool.close();
}
}
public void record(String meetingId, Map<String, String> event) {
log.debug("Recording meeting event {} inside a transaction", meetingId);
commands.multi();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":" + "recordings", Long.toString(msgid));
commands.exec();
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.multi();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":" + "recordings", Long.toString(msgid));
commands.exec();
} catch (Exception e) {
log.debug("Cannot record meeting data: {}", meetingId, e);
} finally {
connectionPool.close();
}
}
// @fixme: not used anywhere
public void removeMeeting(String meetingId) {
log.debug("Removing meeting meeting {} inside a transaction", meetingId);
commands.multi();
commands.del(Keys.MEETING + meetingId);
commands.srem(Keys.MEETINGS + meetingId);
commands.exec();
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.multi();
commands.del(Keys.MEETING + meetingId);
commands.srem(Keys.MEETINGS + meetingId);
commands.exec();
} catch (Exception e) {
log.debug("Cannot remove meeting data : {}", meetingId, e);
} finally {
connectionPool.close();
}
}
public void recordAndExpire(String meetingId, Map<String, String> event) {
log.debug("Recording meeting event {} inside a transaction", meetingId);
commands.multi();
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.multi();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
/**
* We set the key to expire after 14 days as we are still recording the
* event into redis even if the meeting is not recorded. (ralam sept 23,
* 2015)
*/
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.exec();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
/**
* We set the key to expire after 14 days as we are still recording
* the event into redis even if the meeting is not recorded. (ralam
* sept 23, 2015)
*/
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.exec();
} catch (Exception e) {
log.error("Cannot record data with expire: {}", meetingId, e);
} finally {
connectionPool.close();
}
}
public void setExpireKey(long expireKey) {
public void setExpireKey(int expireKey) {
this.expireKey = expireKey;
}
private String recordMeeting(String key, Map<String, String> info) {
return commands.hmset(key, info);
}
log.debug("Storing metadata {}", info);
String result = "";
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
result = commands.hmset(key, info);
} catch (Exception e) {
log.debug("Cannot record data with expire: {}", key, e);
} finally {
connectionPool.close();
}
return result;
}
}
}

View File

@ -1,5 +1,9 @@
package org.bigbluebutton.common2.redis.pubsub;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.bigbluebutton.common2.redis.RedisAwareCommunicator;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
@ -11,13 +15,16 @@ import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
public class MessageReceiver extends RedisAwareCommunicator {
private static Logger log = Red5LoggerFactory.getLogger(MessageReceiver.class, "video");
private ReceivedMessageHandler handler;
private StatefulRedisPubSubConnection<String, String> connection;
GenericObjectPool<StatefulRedisPubSubConnection<String, String>> connectionPool;
private final Executor runExec = Executors.newSingleThreadExecutor();
private volatile boolean receiveMessage = false;
@ -34,23 +41,33 @@ public class MessageReceiver extends RedisAwareCommunicator {
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
connection = redisClient.connectPubSub();
try {
if (receiveMessage) {
connection.addListener(new MessageListener());
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe(FROM_BBB_APPS_PATTERN);
Runnable messageReceiver = new Runnable() {
public void run() {
if (receiveMessage) {
try (StatefulRedisPubSubConnection<String, String> connection = connectionPool.borrowObject()) {
if (receiveMessage) {
connection.addListener(new MessageListener());
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe(FROM_BBB_APPS_PATTERN);
}
} catch (Exception e) {
log.error("Error resubscribing to channels: ", e);
}
}
}
} catch (Exception e) {
log.error("Error resubscribing to channels: ", e);
}
};
runExec.execute(messageReceiver);
}
public void stop() {
receiveMessage = false;
connection.close();
connectionPool.close();
redisClient.shutdown();
log.info("MessageReceiver Stopped");
}
@ -69,7 +86,13 @@ public class MessageReceiver extends RedisAwareCommunicator {
@Override
public void message(String pattern, String channel, String message) {
log.debug("RECEIVED onPMessage" + channel + " message=\n" + message);
handler.handleMessage(pattern, channel, message);
Runnable task = new Runnable() {
public void run() {
handler.handleMessage(pattern, channel, message);
}
};
runExec.execute(task);
}
@Override

View File

@ -5,6 +5,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.bigbluebutton.common2.redis.RedisAwareCommunicator;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
@ -13,39 +14,28 @@ import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.support.ConnectionPoolSupport;
public class MessageSender extends RedisAwareCommunicator {
private static Logger log = Red5LoggerFactory.getLogger(MessageSender.class, "bigbluebutton");
private StatefulRedisConnection<String, String> connection;
GenericObjectPool<StatefulRedisPubSubConnection<String, String>> connectionPool;
private volatile boolean sendMessage = false;
private final Executor msgSenderExec = Executors.newSingleThreadExecutor();
private final Executor runExec = Executors.newSingleThreadExecutor();
private BlockingQueue<MessageToSend> messages = new LinkedBlockingQueue<MessageToSend>();
public void stop() {
sendMessage = false;
connectionPool.close();
redisClient.shutdown();
}
public void start() {
// GenericObjectPoolConfig config = new GenericObjectPoolConfig();
// config.setMaxTotal(32);
// config.setMaxIdle(8);
// config.setMinIdle(1);
// config.setTestOnBorrow(true);
// config.setTestOnReturn(true);
// config.setTestWhileIdle(true);
// config.setNumTestsPerEvictionRun(12);
// config.setMaxWaitMillis(5000);
// config.setTimeBetweenEvictionRunsMillis(60000);
// config.setBlockWhenExhausted(true);
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli // "BbbRed5VoicePub"
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName).build();
if (!this.password.isEmpty()) {
redisUri.setPassword(this.password);
@ -53,22 +43,31 @@ public class MessageSender extends RedisAwareCommunicator {
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
connection = redisClient.connectPubSub();
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());
log.info("Redis org.bigbluebutton.red5.pubsub.message publisher starting!");
try {
sendMessage = true;
while (sendMessage) {
try {
MessageToSend msg = messages.take();
publish(msg.getChannel(), msg.getMessage());
} catch (InterruptedException e) {
log.warn("Failed to get org.bigbluebutton.common2.redis.pubsub from queue.");
Runnable messageSender = new Runnable() {
public void run() {
while (sendMessage) {
try {
MessageToSend msg = messages.take();
publish(msg.getChannel(), msg.getMessage());
} catch (InterruptedException e) {
log.warn("Failed to get org.bigbluebutton.common2.redis.pubsub from queue.");
}
}
}
}
};
msgSenderExec.execute(messageSender);
} catch (Exception e) {
log.error("Error subscribing to channels: " + e.getMessage());
}
}
public void send(String channel, String message) {
@ -77,11 +76,17 @@ public class MessageSender extends RedisAwareCommunicator {
}
private void publish(final String channel, final String message) {
try {
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<Long> future = async.publish(channel, message);
} catch (Exception e) {
log.warn("Cannot publish the org.bigbluebutton.red5.pubsub.message to redis", e);
}
Runnable task = new Runnable() {
public void run() {
try (StatefulRedisPubSubConnection<String, String> connection = connectionPool.borrowObject()) {
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<Long> future = async.publish(channel, message);
} catch (Exception e) {
log.warn("Cannot publish the org.bigbluebutton.red5.pubsub.message to redis", e);
}
}
};
runExec.execute(task);
}
}

View File

@ -9,7 +9,8 @@ import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
public class ReceivedMessageHandler {
private static Logger log = Red5LoggerFactory.getLogger(ReceivedMessageHandler.class/*, "video"*/);
private static Logger log = Red5LoggerFactory
.getLogger(ReceivedMessageHandler.class/* , "video" */);
private BlockingQueue<ReceivedMessage> receivedMessages = new LinkedBlockingQueue<ReceivedMessage>();

View File

@ -21,9 +21,9 @@ sudo cp target/webapp/WEB-INF/lib/bbb-screenshare-akka_2.12-0.0.3.jar \
target/webapp/WEB-INF/lib/spring-webmvc-4.3.12.RELEASE.jar \
target/webapp/WEB-INF/lib/rediscala_2.12-1.8.0.jar \
target/webapp/WEB-INF/lib/bbb-common-message_2.12-0.0.20-SNAPSHOT.jar \
target/webapp/WEB-INF/lib/lettuce-core-5.1.2.RELEASE.jar \
target/webapp/WEB-INF/lib/lettuce-core-5.1.3.RELEASE.jar \
target/webapp/WEB-INF/lib/netty-* \
target/webapp/WEB-INF/lib/reactor-core-3.2.2.RELEASE.jar \
target/webapp/WEB-INF/lib/reactor-core-3.2.3.RELEASE.jar \
target/webapp/WEB-INF/lib/reactive-streams-1.0.2.jar \
/usr/share/red5/webapps/screenshare/WEB-INF/lib/

View File

@ -29,7 +29,7 @@ object Dependencies {
// Redis
val redisScala = "1.8.0"
val lettuce = "5.1.2.RELEASE"
val lettuce = "5.1.3.RELEASE"
// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"