- modify how we send and receive messages to/from redis

This commit is contained in:
Richard Alam 2014-04-30 21:45:53 +00:00
parent 75d908625c
commit 51e489a143
29 changed files with 477 additions and 386 deletions

View File

@ -26,33 +26,41 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
http://www.springframework.org/schema/util http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.0.xsd http://www.springframework.org/schema/util/spring-util-2.0.xsd
"> ">
<bean id="redisStorageService" class="org.bigbluebutton.api.messaging.RedisStorageService">
<bean id="redisMessageSender" class="org.bigbluebutton.api.messaging.redis.MessageSender" <property name="redisPool"> <ref bean="redisPool"/></property>
</bean>
<bean id="messageSender" class="org.bigbluebutton.api.messaging.MessageSender"
init-method="start" destroy-method="stop"> init-method="start" destroy-method="stop">
<property name="redisPool"> <ref bean="redisPool"/></property> <property name="redisPool"> <ref bean="redisPool"/></property>
</bean> </bean>
<bean id="redisMessageReceiver" class="org.bigbluebutton.api.messaging.redis.MessageReceiver" <bean id="redisMessageReceiver" class="org.bigbluebutton.api.messaging.MessageReceiver"
init-method="start" destroy-method="stop"> init-method="start" destroy-method="stop">
<property name="redisPool"> <ref bean="redisPool"/></property> <property name="redisPool"> <ref bean="redisPool"/></property>
<property name="messageHandler"> <ref local="redisMessageHandler"/> </property> <property name="messageHandler"> <ref local="redisMessageHandler"/> </property>
</bean> </bean>
<bean id="redisMessageHandler" class="org.bigbluebutton.api.messaging.redis.ReceivedMessageHandler" <bean id="redisMessageHandler" class="org.bigbluebutton.api.messaging.ReceivedMessageHandler"
init-method="start" destroy-method="stop"> init-method="start" destroy-method="stop">
<property name="messageDistributor"><ref bean="redisMessageDistributor" /></property> <property name="messageDistributor"><ref bean="redisMessageDistributor" /></property>
</bean> </bean>
<bean id="redisMessageDistributor" class="org.bigbluebutton.api.messaging.redis.MessageDistributor"> <bean id="redisMessageDistributor" class="org.bigbluebutton.api.messaging.MessageDistributor">
<property name="messageHandler"> <ref local="redisMessageHandler"/> </property> <property name="messageHandler"> <ref local="redisMessageHandler"/> </property>
<property name="messageListeners">
<set>
<ref bean="meetingMessageHandler" />
</set>
</property>
</bean>
<bean id="meetingMessageHandler" class="org.bigbluebutton.api.messaging.MeetingMessageHandler">
<property name="messageListeners"> <property name="messageListeners">
<set> <set>
<ref bean="presentationMessageListener" /> <ref bean="meetingService" />
<ref bean="chatMessageListener" /> <ref bean="keepAliveService" />
<ref bean="meetingMessageHandler" />
<ref bean="pollMessageHandler" />
<ref bean="participantsListener" />
</set> </set>
</property> </property>
</bean> </bean>

View File

@ -31,11 +31,11 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
<constructor-arg index="0"> <constructor-arg index="0">
<bean factory-bean="config" factory-method="getConfig" /> <bean factory-bean="config" factory-method="getConfig" />
</constructor-arg> </constructor-arg>
<constructor-arg index="1" value="${redis.host}"/> <constructor-arg index="1" value="${redisHost}"/>
<constructor-arg index="2" value="${redis.port}"/> <constructor-arg index="2" value="${redisPort}"/>
</bean> </bean>
<bean id="config" class="org.bigbluebutton.conference.service.recorder.GenericObjectPoolConfigWrapper"> <bean id="config" class="org.bigbluebutton.api.messaging.GenericObjectPoolConfigWrapper">
<!-- Action to take when trying to acquire a connection and all connections are taken --> <!-- Action to take when trying to acquire a connection and all connections are taken -->
<property name="whenExhaustedAction"> <property name="whenExhaustedAction">
<!-- Fail-fast behaviour, we don't like to keep the kids waiting --> <!-- Fail-fast behaviour, we don't like to keep the kids waiting -->

46
bigbluebutton-web/grails-app/conf/spring/resources.xml Normal file → Executable file
View File

@ -26,47 +26,12 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
http://www.springframework.org/schema/util http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.0.xsd http://www.springframework.org/schema/util/spring-util-2.0.xsd
"> ">
<bean id="config" class="org.bigbluebutton.api.messaging.GenericObjectPoolConfigWrapper">
<!-- Action to take when trying to acquire a connection and all connections are taken -->
<property name="whenExhaustedAction">
<!-- Fail-fast behaviour, we don't like to keep the kids waiting -->
<util:constant static-field="org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_FAIL" />
<!-- Default behaviour, block the caller until a resource becomes available -->
<!--<util:constant static-field="org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_BLOCK" />-->
</property>
<!-- Maximum active connections to Redis instance -->
<property name="maxActive" value="12" />
<!-- Number of connections to Redis that just sit there and do nothing -->
<property name="maxIdle" value="6" />
<!-- Minimum number of idle connections to Redis - these can be seen as always open and ready to serve -->
<property name="minIdle" value="1" />
<!-- Tests whether connection is dead when connection retrieval method is called -->
<property name="testOnBorrow" value="true" />
<!-- Tests whether connection is dead when returning a connection to the pool -->
<property name="testOnReturn" value="true" />
<!-- Tests whether connections are dead during idle periods -->
<property name="testWhileIdle" value="true" />
<!-- Maximum number of connections to test in each idle check -->
<property name="numTestsPerEvictionRun" value="12" />
<!-- Idle connection checking period -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- Maximum time, in milliseconds, to wait for a resource when exausted action is set to WHEN_EXAUSTED_BLOCK -->
<property name="maxWait" value="5000" />
</bean>
<bean id="messagingService" class="org.bigbluebutton.api.messaging.RedisMessagingService"> <bean id="messagingService" class="org.bigbluebutton.api.messaging.RedisMessagingService">
<property name="redisPool" ref="redisPool"/> <property name="messageSender" ref="messageSender"/>
<property name="redisStorageService" ref="redisStorageService"/>
</bean> </bean>
<bean id="redisPool" class="redis.clients.jedis.JedisPool">
<constructor-arg index="0">
<bean factory-bean="config" factory-method="getConfig" />
</constructor-arg>
<constructor-arg index="1" value="${redisHost}"/>
<constructor-arg index="2" value="${redisPort}"/>
</bean>
<bean id="expiredMeetingCleanupTimerTask" class="org.bigbluebutton.web.services.ExpiredMeetingCleanupTimerTask"/> <bean id="expiredMeetingCleanupTimerTask" class="org.bigbluebutton.web.services.ExpiredMeetingCleanupTimerTask"/>
<bean id="keepAliveService" class="org.bigbluebutton.web.services.KeepAliveService" <bean id="keepAliveService" class="org.bigbluebutton.web.services.KeepAliveService"
@ -75,12 +40,11 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
<property name="messagingService" ref="messagingService" /> <property name="messagingService" ref="messagingService" />
</bean> </bean>
<bean id="meetingService" class="org.bigbluebutton.api.MeetingService"> <bean id="meetingService" class="org.bigbluebutton.api.MeetingService" init-method="start" destroy-method="stop">
<property name="defaultMeetingExpireDuration" value="${defaultMeetingExpireDuration}"/> <property name="defaultMeetingExpireDuration" value="${defaultMeetingExpireDuration}"/>
<property name="defaultMeetingCreateJoinDuration" value="${defaultMeetingCreateJoinDuration}"/> <property name="defaultMeetingCreateJoinDuration" value="${defaultMeetingCreateJoinDuration}"/>
<property name="removeMeetingWhenEnded" value="${removeMeetingWhenEnded}"/> <property name="removeMeetingWhenEnded" value="${removeMeetingWhenEnded}"/>
<property name="expiredMeetingCleanupTimerTask" ref="expiredMeetingCleanupTimerTask"/> <property name="expiredMeetingCleanupTimerTask" ref="expiredMeetingCleanupTimerTask"/>
<property name="keepAliveService" ref="keepAliveService"/>
<property name="messagingService" ref="messagingService"/> <property name="messagingService" ref="messagingService"/>
<property name="recordingService" ref="recordingService"/> <property name="recordingService" ref="recordingService"/>
</bean> </bean>
@ -115,4 +79,6 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
</bean> </bean>
<import resource="doc-conversion.xml" /> <import resource="doc-conversion.xml" />
<import resource="bbb-redis-pool.xml" />
<import resource="bbb-redis-messaging.xml" />
</beans> </beans>

View File

@ -21,24 +21,42 @@ package org.bigbluebutton.api;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.*; import java.util.*;
import org.apache.commons.lang.RandomStringUtils;
import org.bigbluebutton.api.domain.Meeting; import org.bigbluebutton.api.domain.Meeting;
import org.bigbluebutton.api.domain.Playback; import org.bigbluebutton.api.domain.Playback;
import org.bigbluebutton.api.domain.Recording; import org.bigbluebutton.api.domain.Recording;
import org.bigbluebutton.api.domain.User; import org.bigbluebutton.api.domain.User;
import org.bigbluebutton.api.domain.UserSession; import org.bigbluebutton.api.domain.UserSession;
import org.bigbluebutton.api.messaging.MessageListener; import org.bigbluebutton.api.messaging.MessageListener;
import org.bigbluebutton.api.messaging.MessagingConstants;
import org.bigbluebutton.api.messaging.MessagingService; import org.bigbluebutton.api.messaging.MessagingService;
import org.bigbluebutton.api.messaging.ReceivedMessage;
import org.bigbluebutton.api.messaging.messages.IMessage;
import org.bigbluebutton.api.messaging.messages.MeetingDestroyed;
import org.bigbluebutton.api.messaging.messages.MeetingEnded;
import org.bigbluebutton.api.messaging.messages.MeetingStarted;
import org.bigbluebutton.api.messaging.messages.UserJoined;
import org.bigbluebutton.api.messaging.messages.UserLeft;
import org.bigbluebutton.api.messaging.messages.UserStatusChanged;
import org.bigbluebutton.web.services.ExpiredMeetingCleanupTimerTask; import org.bigbluebutton.web.services.ExpiredMeetingCleanupTimerTask;
import org.bigbluebutton.web.services.KeepAliveService; import org.bigbluebutton.web.services.KeepAliveService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class MeetingService { public class MeetingService implements MessageListener {
private static Logger log = LoggerFactory.getLogger(MeetingService.class); private static Logger log = LoggerFactory.getLogger(MeetingService.class);
private BlockingQueue<IMessage> receivedMessages = new LinkedBlockingQueue<IMessage>();
private volatile boolean processMessage = false;
private final Executor msgProcessorExec = Executors.newSingleThreadExecutor();
private final ConcurrentMap<String, Meeting> meetings; private final ConcurrentMap<String, Meeting> meetings;
private final ConcurrentMap<String, UserSession> sessions; private final ConcurrentMap<String, UserSession> sessions;
@ -49,8 +67,7 @@ public class MeetingService {
private MessagingService messagingService; private MessagingService messagingService;
private ExpiredMeetingCleanupTimerTask cleaner; private ExpiredMeetingCleanupTimerTask cleaner;
private boolean removeMeetingWhenEnded = false; private boolean removeMeetingWhenEnded = false;
private KeepAliveService keepAliveService;
public MeetingService() { public MeetingService() {
meetings = new ConcurrentHashMap<String, Meeting>(); meetings = new ConcurrentHashMap<String, Meeting>();
sessions = new ConcurrentHashMap<String, UserSession>(); sessions = new ConcurrentHashMap<String, UserSession>();
@ -294,8 +311,6 @@ public class MeetingService {
public void setMessagingService(MessagingService mess) { public void setMessagingService(MessagingService mess) {
messagingService = mess; messagingService = mess;
messagingService.addListener(new MeetingMessageListener());
messagingService.start();
} }
public void setExpiredMeetingCleanupTimerTask(ExpiredMeetingCleanupTimerTask c) { public void setExpiredMeetingCleanupTimerTask(ExpiredMeetingCleanupTimerTask c) {
@ -304,92 +319,123 @@ public class MeetingService {
cleaner.start(); cleaner.start();
} }
public void setKeepAliveService(KeepAliveService keepAlive){ private void meetingStarted(MeetingStarted message) {
this.keepAliveService = keepAlive; Meeting m = getMeeting(message.meetingId);
} if (m != null) {
if (m.getStartTime() == 0) {
/**
* Class that listens for messages from bbb-apps.
* @author Richard Alam
*
*/
private class MeetingMessageListener implements MessageListener {
@Override
public void meetingStarted(String meetingId) {
Meeting m = getMeeting(meetingId);
if (m != null) {
if (m.getStartTime() == 0) {
long now = System.currentTimeMillis();
log.info("Meeting [{}] has started on [{}]", meetingId, now);
m.setStartTime(now);
} else {
log.debug("The meeting [{}] has been started again...", meetingId);
}
m.setEndTime(0);
return;
}
log.warn("The meeting [{}] doesn't exist", meetingId);
}
@Override
public void meetingEnded(String meetingId) {
Meeting m = getMeeting(meetingId);
if (m != null) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
log.debug("Meeting [{}] end time [{}].", meetingId, now); log.info("Meeting [{}] has started on [{}]", message.meetingId, now);
m.setEndTime(now); m.setStartTime(now);
return; } else {
log.debug("The meeting [{}] has been started again...", message.meetingId);
} }
log.warn("The meeting " + meetingId + " doesn't exist"); m.setEndTime(0);
return;
} }
log.warn("The meeting [{}] doesn't exist", message.meetingId);
}
@Override private void meetingEnded(MeetingEnded message) {
public void userJoined(String meetingId, String internalUserId, String externalUserId, String name, String role) { Meeting m = getMeeting(message.meetingId);
Meeting m = getMeeting(meetingId); if (m != null) {
if (m != null) { long now = System.currentTimeMillis();
User user = new User(internalUserId, externalUserId, name, role); log.debug("Meeting [{}] end time [{}].", message.meetingId, now);
m.userJoined(user); m.setEndTime(now);
log.debug("New user in meeting " + meetingId + ":" + user.getFullname()); return;
return;
}
log.warn("The meeting " + meetingId + " doesn't exist");
} }
log.warn("The meeting " + message.meetingId + " doesn't exist");
}
@Override public void userJoined(UserJoined message) {
public void userLeft(String meetingId, String internalUserId) { Meeting m = getMeeting(message.meetingId);
Meeting m = getMeeting(meetingId); if (m != null) {
if (m != null) { User user = new User(message.userId, message.externalUserId, message.name, message.role);
User user = m.userLeft(internalUserId); m.userJoined(user);
if(user != null){ log.debug("New user in meeting " + message.meetingId + ":" + user.getFullname());
log.debug("User removed from meeting " + meetingId + ":" + user.getFullname()); return;
return; }
} log.warn("The meeting " + message.meetingId + " doesn't exist");
log.warn("The participant " + internalUserId + " doesn't exist in the meeting " + meetingId); }
private void userLeft(UserLeft message) {
Meeting m = getMeeting(message.meetingId);
if (m != null) {
User user = m.userLeft(message.userId);
if(user != null){
log.debug("User removed from meeting " + message.meetingId + ":" + user.getFullname());
return; return;
} }
log.warn("The meeting " + meetingId + " doesn't exist"); log.warn("The participant " + message.userId + " doesn't exist in the meeting " + message.meetingId);
return;
} }
log.warn("The meeting " + message.meetingId + " doesn't exist");
}
@Override private void updatedStatus(UserStatusChanged message) {
public void updatedStatus(String meetingId, String internalUserId, String status, String value) { Meeting m = getMeeting(message.meetingId);
Meeting m = getMeeting(meetingId); if (m != null) {
if (m != null) { User user = m.getUserById(message.userId);
User user = m.getUserById(internalUserId); if(user != null){
if(user != null){ user.setStatus(message.status, message.value);
user.setStatus(status, value); log.debug("Setting new status value in meeting " + message.meetingId + " for participant:"+user.getFullname());
log.debug("Setting new status value in meeting " + meetingId + " for participant:"+user.getFullname());
return;
}
log.warn("The participant " + internalUserId + " doesn't exist in the meeting " + meetingId);
return; return;
} }
log.warn("The meeting " + meetingId + " doesn't exist"); log.warn("The participant " + message.userId + " doesn't exist in the meeting " + message.meetingId);
return;
} }
log.warn("The meeting " + message.meetingId + " doesn't exist");
}
@Override private void processMessage(IMessage message) {
public void keepAliveReply(String aliveId){ if (message instanceof MeetingDestroyed) {
keepAliveService.keepAliveReply(aliveId);
} else if (message instanceof MeetingStarted) {
meetingStarted((MeetingStarted)message);
} else if (message instanceof MeetingEnded) {
meetingEnded((MeetingEnded)message);
} else if (message instanceof UserJoined) {
userJoined((UserJoined)message);
} else if (message instanceof UserLeft) {
userLeft((UserLeft)message);
} else if (message instanceof UserStatusChanged) {
updatedStatus((UserStatusChanged)message);
}
}
@Override
public void handle(IMessage message) {
try {
receivedMessages.offer(message, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void start() {
try {
processMessage = true;
Runnable messageReceiver = new Runnable() {
public void run() {
if (processMessage) {
try {
IMessage msg = receivedMessages.take();
processMessage(msg);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
msgProcessorExec.execute(messageReceiver);
} catch (Exception e) {
log.error("Error PRocessing Message");
} }
} }
public void stop() {
processMessage = false;
}
} }

View File

@ -0,0 +1,82 @@
package org.bigbluebutton.api.messaging;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.bigbluebutton.api.messaging.messages.KeepAliveReply;
import org.bigbluebutton.api.messaging.messages.MeetingDestroyed;
import org.bigbluebutton.api.messaging.messages.MeetingEnded;
import org.bigbluebutton.api.messaging.messages.MeetingStarted;
import org.bigbluebutton.api.messaging.messages.UserJoined;
import org.bigbluebutton.api.messaging.messages.UserLeft;
import org.bigbluebutton.api.messaging.messages.UserStatusChanged;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
public class MeetingMessageHandler implements MessageHandler {
private static Logger log = LoggerFactory.getLogger(MeetingMessageHandler.class);
private Set<MessageListener> listeners;
public void setMessageListeners(Set<MessageListener> listeners) {
this.listeners = listeners;
}
public void handleMessage(String pattern, String channel, String message) {
Gson gson = new Gson();
if (channel.equalsIgnoreCase(MessagingConstants.SYSTEM_CHANNEL)) {
HashMap<String,String> map = gson.fromJson(message, new TypeToken<Map<String, String>>() {}.getType());
String messageId = map.get("messageID");
for (MessageListener listener : listeners) {
if(MessagingConstants.MEETING_STARTED_EVENT.equalsIgnoreCase(messageId)) {
String meetingId = map.get("meetingID");
listener.handle(new MeetingStarted(meetingId));
} else if(MessagingConstants.MEETING_ENDED_EVENT.equalsIgnoreCase(messageId)) {
String meetingId = map.get("meetingID");
listener.handle(new MeetingEnded(meetingId));
} else if(MessagingConstants.KEEP_ALIVE_REPLY_EVENT.equalsIgnoreCase(messageId)){
String pongId = map.get("aliveID");
listener.handle(new KeepAliveReply(pongId));
} else if (MessagingConstants.MEETING_DESTROYED_EVENT.equalsIgnoreCase(messageId)) {
String meetingId = map.get("meetingID");
log.info("Received a meeting destroyed message for meeting id=[{}]", meetingId);
listener.handle(new MeetingDestroyed(meetingId));
}
}
} else if (channel.equalsIgnoreCase(MessagingConstants.PARTICIPANTS_CHANNEL)) {
HashMap<String,String> map = gson.fromJson(message, new TypeToken<Map<String, String>>() {}.getType());
String meetingId = map.get("meetingID");
String messageId = map.get("messageID");
if (MessagingConstants.USER_JOINED_EVENT.equalsIgnoreCase(messageId)){
String userId = map.get("internalUserID");
String externalUserId = map.get("externalUserID");
String name = map.get("fullname");
String role = map.get("role");
for (MessageListener listener : listeners) {
listener.handle(new UserJoined(meetingId, userId, externalUserId, name, role));
}
} else if(MessagingConstants.USER_STATUS_CHANGE_EVENT.equalsIgnoreCase(messageId)){
String userId = map.get("internalUserID");
String status = map.get("status");
String value = map.get("value");
for (MessageListener listener : listeners) {
listener.handle(new UserStatusChanged(meetingId, userId, status, value));
}
} else if(MessagingConstants.USER_LEFT_EVENT.equalsIgnoreCase(messageId)){
String userId = map.get("internalUserID");
for (MessageListener listener : listeners) {
listener.handle(new UserLeft(meetingId, userId));
}
}
}
}
}

View File

@ -19,11 +19,8 @@
package org.bigbluebutton.api.messaging; package org.bigbluebutton.api.messaging;
import org.bigbluebutton.api.messaging.messages.IMessage;
public interface MessageListener { public interface MessageListener {
void meetingStarted(String meetingId); void handle(IMessage message);
void meetingEnded(String meetingId);
void userJoined(String meetingId, String internalUserId, String externalUserId, String name, String role);
void userLeft(String meetingId, String internalUserId);
void updatedStatus(String meetingId, String internalUserId, String status, String value);
void keepAliveReply(String aliveId);
} }

View File

@ -23,17 +23,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public interface MessagingService { public interface MessagingService {
public void start(); void recordMeetingInfo(String meetingId, Map<String, String> info);
public void stop(); void destroyMeeting(String meetingID);
public void recordMeetingInfo(String meetingId, Map<String, String> info); void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration);
public void destroyMeeting(String meetingID); void endMeeting(String meetingId);
public void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration); void send(String channel, String message);
public void endMeeting(String meetingId); void sendPolls(String meetingId, String title, String question, String questionType, List<String> answers);
public void send(String channel, String message); String storeSubscription(String meetingId, String externalMeetingID, String callbackURL);
public void addListener(MessageListener listener); boolean removeSubscription(String meetingId, String subscriptionId);
public void removeListener(MessageListener listener); List<Map<String,String>> listSubscriptions(String meetingId);
public void sendPolls(String meetingId, String title, String question, String questionType, List<String> answers);
public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL);
public boolean removeSubscription(String meetingId, String subscriptionId);
public List<Map<String,String>> listSubscriptions(String meetingId);
} }

View File

@ -8,7 +8,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ReceivedMessageHandler { public class ReceivedMessageHandler {
private static Logger log = LoggerFactory.getLogger(MessageReceiver.class); private static Logger log = LoggerFactory.getLogger(ReceivedMessageHandler.class);
private BlockingQueue<ReceivedMessage> receivedMessages = new LinkedBlockingQueue<ReceivedMessage>(); private BlockingQueue<ReceivedMessage> receivedMessages = new LinkedBlockingQueue<ReceivedMessage>();

View File

@ -39,35 +39,11 @@ import redis.clients.jedis.JedisPubSub;
public class RedisMessagingService implements MessagingService { public class RedisMessagingService implements MessagingService {
private static Logger log = LoggerFactory.getLogger(RedisMessagingService.class); private static Logger log = LoggerFactory.getLogger(RedisMessagingService.class);
private JedisPool redisPool; private RedisStorageService storeService;
private final Set<MessageListener> listeners = new HashSet<MessageListener>(); private MessageSender sender;
private final Executor exec = Executors.newSingleThreadExecutor();
private Runnable pubsubListener;
@Override
public void addListener(MessageListener listener) {
listeners.add(listener);
}
public void removeListener(MessageListener listener) {
listeners.remove(listener);
}
public void recordMeetingInfo(String meetingId, Map<String, String> info) { public void recordMeetingInfo(String meetingId, Map<String, String> info) {
Jedis jedis = redisPool.getResource(); storeService.recordMeetingInfo(meetingId, info);
try {
for (String key: info.keySet()) {
log.debug("Storing metadata {} = {}", key, info.get(key));
}
log.debug("Saving metadata in {}", meetingId);
jedis.hmset("meeting:info:" + meetingId, info);
} catch (Exception e){
log.warn("Cannot record the info meeting:"+meetingId,e);
} finally {
redisPool.returnResource(jedis);
}
} }
public void destroyMeeting(String meetingID) { public void destroyMeeting(String meetingID) {
@ -76,7 +52,7 @@ public class RedisMessagingService implements MessagingService {
map.put("meetingID", meetingID); map.put("meetingID", meetingID);
Gson gson = new Gson(); Gson gson = new Gson();
log.info("Sending destroy meeting [{}]", meetingID); log.info("Sending destroy meeting [{}]", meetingID);
send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); sender.send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map));
} }
public void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration) { public void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration) {
@ -90,7 +66,7 @@ public class RedisMessagingService implements MessagingService {
Gson gson = new Gson(); Gson gson = new Gson();
log.info("Sending create meeting [{}] - [{}]", meetingID, voiceBridge); log.info("Sending create meeting [{}] - [{}]", meetingID, voiceBridge);
send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); sender.send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map));
} }
public void endMeeting(String meetingId) { public void endMeeting(String meetingId) {
@ -99,21 +75,13 @@ public class RedisMessagingService implements MessagingService {
map.put("meetingId", meetingId); map.put("meetingId", meetingId);
Gson gson = new Gson(); Gson gson = new Gson();
log.info("Sending end meeting [{}]", meetingId); log.info("Sending end meeting [{}]", meetingId);
send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); sender.send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map));
}
public void send(String channel, String message) {
Jedis jedis = redisPool.getResource();
try {
log.debug("Sending to channel[" + channel + "] message[" + message + "]");
jedis.publish(channel, message);
} catch(Exception e){
log.warn("Cannot publish the message to redis",e);
}finally{
redisPool.returnResource(jedis);
}
} }
public void send(String channel, String message) {
sender.send(channel, message);
}
public void sendPolls(String meetingId, String title, String question, String questionType, List<String> answers){ public void sendPolls(String meetingId, String title, String question, String questionType, List<String> answers){
Gson gson = new Gson(); Gson gson = new Gson();
@ -127,199 +95,31 @@ public class RedisMessagingService implements MessagingService {
System.out.println(gson.toJson(map)); System.out.println(gson.toJson(map));
send(MessagingConstants.POLLING_CHANNEL, gson.toJson(map)); sender.send(MessagingConstants.POLLING_CHANNEL, gson.toJson(map));
} }
public void setMessageSender(MessageSender sender) {
this.sender = sender;
}
public void setRedisStorageService(RedisStorageService storeService) {
this.storeService = storeService;
}
public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL){ public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL){
String sid = ""; return storeService.storeSubscription(meetingId, externalMeetingID, callbackURL);
Jedis jedis = redisPool.getResource();
try {
sid = Long.toString(jedis.incr("meeting:" + meetingId + ":nextSubscription"));
HashMap<String,String> props = new HashMap<String,String>();
props.put("subscriptionID", sid);
props.put("meetingId", meetingId);
props.put("externalMeetingID", externalMeetingID);
props.put("callbackURL", callbackURL);
props.put("active", "true");
jedis.hmset("meeting:" + meetingId + ":subscription:" + sid, props);
jedis.rpush("meeting:" + meetingId + ":subscriptions", sid);
} catch (Exception e){
log.warn("Cannot store subscription:" + meetingId, e);
} finally {
redisPool.returnResource(jedis);
}
return sid;
} }
public boolean removeSubscription(String meetingId, String subscriptionId){ public boolean removeSubscription(String meetingId, String subscriptionId){
boolean unsubscribed = true; return storeService.removeSubscription(meetingId, subscriptionId);
Jedis jedis = redisPool.getResource();
try {
jedis.hset("meeting:" + meetingId + ":subscription:" + subscriptionId, "active", "false");
} catch (Exception e){
log.warn("Cannot rmove subscription:" + meetingId, e);
unsubscribed = false;
} finally {
redisPool.returnResource(jedis);
}
return unsubscribed;
} }
public List<Map<String,String>> listSubscriptions(String meetingId){ public List<Map<String,String>> listSubscriptions(String meetingId){
List<Map<String,String>> list = new ArrayList<Map<String,String>>(); return storeService.listSubscriptions(meetingId);
Jedis jedis = redisPool.getResource();
try {
List<String> sids = jedis.lrange("meeting:" + meetingId + ":subscriptions", 0 , -1);
for(int i=0; i<sids.size(); i++){
Map<String,String> props = jedis.hgetAll("meeting:" + meetingId + ":subscription:" + sids.get(i));
list.add(props);
}
} catch (Exception e){
log.warn("Cannot list subscriptions:" + meetingId, e);
} finally {
redisPool.returnResource(jedis);
}
return list;
} }
public void start() {
log.debug("Starting redis pubsub...");
final Jedis jedis = redisPool.getResource();
try {
pubsubListener = new Runnable() {
public void run() {
jedis.psubscribe(new PubSubListener(), MessagingConstants.BIGBLUEBUTTON_PATTERN);
}
};
exec.execute(pubsubListener);
} catch (Exception e) {
log.error("Error in subscribe: " + e.getMessage());
}
}
public void stop() {
try {
redisPool.destroy();
} catch (Exception e) {
e.printStackTrace();
}
}
public void setRedisPool(JedisPool redisPool){
this.redisPool=redisPool;
}
private class PubSubListener extends JedisPubSub {
public PubSubListener() {
super();
}
@Override
public void onMessage(String channel, String message) {
// Not used.
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// log.debug("Received in channel [{}] message [{}]", channel, message);
Gson gson = new Gson();
// for (String key: map.keySet()) {
// log.debug("rx: {} = {}", key, map.get(key));
// }
if (channel.equalsIgnoreCase(MessagingConstants.SYSTEM_CHANNEL)){
HashMap<String,String> map = gson.fromJson(message, new TypeToken<Map<String, String>>() {}.getType());
String messageId = map.get("messageID");
for (MessageListener listener : listeners) {
if(MessagingConstants.MEETING_STARTED_EVENT.equalsIgnoreCase(messageId)) {
String meetingId = map.get("meetingID");
listener.meetingStarted(meetingId);
} else if(MessagingConstants.MEETING_ENDED_EVENT.equalsIgnoreCase(messageId)) {
String meetingId = map.get("meetingID");
listener.meetingEnded(meetingId);
} else if(MessagingConstants.KEEP_ALIVE_REPLY_EVENT.equalsIgnoreCase(messageId)){
String aliveId = map.get("aliveID");
listener.keepAliveReply(aliveId);
} else if (MessagingConstants.MEETING_DESTROYED_EVENT.equalsIgnoreCase(messageId)) {
String meetingId = map.get("meetingID");
log.info("Received a meeting destroyed message for meeting id=[{}]", meetingId);
}
}
} else if(channel.equalsIgnoreCase(MessagingConstants.PARTICIPANTS_CHANNEL)){
HashMap<String,String> map = gson.fromJson(message, new TypeToken<Map<String, String>>() {}.getType());
String meetingId = map.get("meetingID");
String messageId = map.get("messageID");
if(MessagingConstants.USER_JOINED_EVENT.equalsIgnoreCase(messageId)){
String internalUserId = map.get("internalUserID");
String externalUserId = map.get("externalUserID");
String fullname = map.get("fullname");
String role = map.get("role");
for (MessageListener listener : listeners) {
listener.userJoined(meetingId, internalUserId, externalUserId, fullname, role);
}
} else if(MessagingConstants.USER_STATUS_CHANGE_EVENT.equalsIgnoreCase(messageId)){
String internalUserId = map.get("internalUserID");
String status = map.get("status");
String value = map.get("value");
for (MessageListener listener : listeners) {
listener.updatedStatus(meetingId, internalUserId, status, value);
}
} else if(MessagingConstants.USER_LEFT_EVENT.equalsIgnoreCase(messageId)){
String internalUserId = map.get("internalUserID");
for (MessageListener listener : listeners) {
listener.userLeft(meetingId, internalUserId);
}
}
}
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
log.debug("Subscribed to the pattern:"+pattern);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// Not used.
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// Not used.
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// Not used.
}
}
public void removeMeeting(String meetingId){ public void removeMeeting(String meetingId){
Jedis jedis = redisPool.getResource(); storeService.removeMeeting(meetingId);
try {
jedis.del("meeting-" + meetingId);
//jedis.hmset("meeting"+ COLON +"info" + COLON + meetingId, metadata);
jedis.srem("meetings", meetingId);
} finally {
redisPool.returnResource(jedis);
}
} }
} }

View File

@ -1,5 +1,105 @@
package org.bigbluebutton.api.messaging; package org.bigbluebutton.api.messaging;
public class RedisStorageService { import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class RedisStorageService {
private static Logger log = LoggerFactory.getLogger(RedisStorageService.class);
private JedisPool redisPool;
public void recordMeetingInfo(String meetingId, Map<String, String> info) {
Jedis jedis = redisPool.getResource();
try {
for (String key: info.keySet()) {
log.debug("Storing metadata {} = {}", key, info.get(key));
}
log.debug("Saving metadata in {}", meetingId);
jedis.hmset("meeting:info:" + meetingId, info);
} catch (Exception e){
log.warn("Cannot record the info meeting:"+meetingId,e);
} finally {
redisPool.returnResource(jedis);
}
}
public void removeMeeting(String meetingId){
Jedis jedis = redisPool.getResource();
try {
jedis.del("meeting-" + meetingId);
jedis.srem("meetings", meetingId);
} finally {
redisPool.returnResource(jedis);
}
}
public List<Map<String,String>> listSubscriptions(String meetingId){
List<Map<String,String>> list = new ArrayList<Map<String,String>>();
Jedis jedis = redisPool.getResource();
try {
List<String> sids = jedis.lrange("meeting:" + meetingId + ":subscriptions", 0 , -1);
for(int i=0; i<sids.size(); i++){
Map<String,String> props = jedis.hgetAll("meeting:" + meetingId + ":subscription:" + sids.get(i));
list.add(props);
}
} catch (Exception e){
log.warn("Cannot list subscriptions:" + meetingId, e);
} finally {
redisPool.returnResource(jedis);
}
return list;
}
public boolean removeSubscription(String meetingId, String subscriptionId){
boolean unsubscribed = true;
Jedis jedis = redisPool.getResource();
try {
jedis.hset("meeting:" + meetingId + ":subscription:" + subscriptionId, "active", "false");
} catch (Exception e){
log.warn("Cannot rmove subscription:" + meetingId, e);
unsubscribed = false;
} finally {
redisPool.returnResource(jedis);
}
return unsubscribed;
}
public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL){
String sid = "";
Jedis jedis = redisPool.getResource();
try {
sid = Long.toString(jedis.incr("meeting:" + meetingId + ":nextSubscription"));
HashMap<String,String> props = new HashMap<String,String>();
props.put("subscriptionID", sid);
props.put("meetingId", meetingId);
props.put("externalMeetingID", externalMeetingID);
props.put("callbackURL", callbackURL);
props.put("active", "true");
jedis.hmset("meeting:" + meetingId + ":subscription:" + sid, props);
jedis.rpush("meeting:" + meetingId + ":subscriptions", sid);
} catch (Exception e){
log.warn("Cannot store subscription:" + meetingId, e);
} finally {
redisPool.returnResource(jedis);
}
return sid;
}
public void setRedisPool(JedisPool redisPool){
this.redisPool=redisPool;
}
} }

View File

@ -0,0 +1,5 @@
package org.bigbluebutton.api.messaging.messages;
public interface IMessage {
}

View File

@ -0,0 +1,9 @@
package org.bigbluebutton.api.messaging.messages;
public class KeepAliveReply implements IMessage {
public final String pongId;
public KeepAliveReply(String pongId) {
this.pongId = pongId;
}
}

View File

@ -0,0 +1,9 @@
package org.bigbluebutton.api.messaging.messages;
public class MeetingDestroyed implements IMessage {
public final String meetingId;
public MeetingDestroyed(String meetingId) {
this.meetingId = meetingId;
}
}

View File

@ -0,0 +1,9 @@
package org.bigbluebutton.api.messaging.messages;
public class MeetingEnded implements IMessage {
public final String meetingId;
public MeetingEnded(String meetingId) {
this.meetingId = meetingId;
}
}

View File

@ -0,0 +1,9 @@
package org.bigbluebutton.api.messaging.messages;
public class MeetingStarted implements IMessage {
public final String meetingId;
public MeetingStarted(String meetingId) {
this.meetingId = meetingId;
}
}

View File

@ -0,0 +1,17 @@
package org.bigbluebutton.api.messaging.messages;
public class UserJoined implements IMessage {
public final String meetingId;
public final String userId;
public final String externalUserId;
public final String name;
public final String role;
public UserJoined(String meetingId, String userId, String externalUserId, String name, String role) {
this.meetingId = meetingId;
this.userId = userId;
this.externalUserId = externalUserId;
this.name = name;
this.role = role;
}
}

View File

@ -0,0 +1,11 @@
package org.bigbluebutton.api.messaging.messages;
public class UserLeft implements IMessage {
public final String userId;
public final String meetingId;
public UserLeft(String meetingId, String userId) {
this.meetingId = meetingId;
this.userId = userId;
}
}

View File

@ -0,0 +1,15 @@
package org.bigbluebutton.api.messaging.messages;
public class UserStatusChanged implements IMessage {
public final String meetingId;
public final String userId;
public final String status;
public final String value;
public UserStatusChanged(String meetingId, String userId, String status, String value) {
this.meetingId = meetingId;
this.userId = userId;
this.status = status;
this.value = value;
}
}

View File

@ -19,9 +19,13 @@
package org.bigbluebutton.web.services; package org.bigbluebutton.web.services;
import org.bigbluebutton.api.messaging.MessageListener;
import org.bigbluebutton.api.messaging.MessagingService; import org.bigbluebutton.api.messaging.MessagingService;
import org.bigbluebutton.api.messaging.MessagingConstants; import org.bigbluebutton.api.messaging.MessagingConstants;
import org.bigbluebutton.api.messaging.RedisMessagingService; import org.bigbluebutton.api.messaging.RedisMessagingService;
import org.bigbluebutton.api.messaging.messages.IMessage;
import org.bigbluebutton.api.messaging.messages.KeepAliveReply;
import org.bigbluebutton.api.messaging.messages.MeetingDestroyed;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Timer; import java.util.Timer;
@ -35,7 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.google.gson.Gson; import com.google.gson.Gson;
public class KeepAliveService { public class KeepAliveService implements MessageListener {
private static Logger log = LoggerFactory.getLogger(KeepAliveService.class); private static Logger log = LoggerFactory.getLogger(KeepAliveService.class);
private final String KEEP_ALIVE_REQUEST = "KEEP_ALIVE_REQUEST"; private final String KEEP_ALIVE_REQUEST = "KEEP_ALIVE_REQUEST";
private MessagingService service; private MessagingService service;
@ -81,12 +85,6 @@ public class KeepAliveService {
} }
} }
public void keepAliveReply(String aliveId) {
log.debug("Received keep alive msg reply from bbb-apps. id [{}]", aliveId);
KeepAlivePong pong = new KeepAlivePong(aliveId);
queueMessage(pong);
}
public boolean isDown(){ public boolean isDown(){
return !available; return !available;
} }
@ -167,4 +165,18 @@ public class KeepAliveService {
log.info("Received invalid keep alive response from bbb-apps:" + msg.getId()); log.info("Received invalid keep alive response from bbb-apps:" + msg.getId());
} }
} }
private void keepAliveReply(String aliveId) {
log.debug("Received keep alive msg reply from bbb-apps. id [{}]", aliveId);
KeepAlivePong pong = new KeepAlivePong(aliveId);
queueMessage(pong);
}
@Override
public void handle(IMessage message) {
if (message instanceof KeepAliveReply) {
KeepAliveReply msg = (KeepAliveReply) message;
keepAliveReply(msg.pongId);
}
}
} }