From 51e489a1430cba40221449334135f60b09a70fe2 Mon Sep 17 00:00:00 2001 From: Richard Alam Date: Wed, 30 Apr 2014 21:45:53 +0000 Subject: [PATCH] - modify how we send and receive messages to/from redis --- .../conf/spring/bbb-redis-messaging.xml | 34 ++- .../grails-app/conf/spring/bbb-redis-pool.xml | 6 +- .../grails-app/conf/spring/resources.xml | 46 +--- .../org/bigbluebutton/api/MeetingService.java | 200 ++++++++------ .../api/messaging/MeetingMessageHandler.java | 82 ++++++ .../api/messaging/MessageDistributor.java | 0 .../api/messaging/MessageHandler.java | 0 .../api/messaging/MessageListener.java | 9 +- .../api/messaging/MessageReceiver.java | 0 .../api/messaging/MessageSender.java | 0 .../api/messaging/MessageToSend.java | 0 .../api/messaging/MessagingService.java | 22 +- .../api/messaging/ReceivedMessage.java | 0 .../api/messaging/ReceivedMessageHandler.java | 2 +- .../api/messaging/RedisMessagingService.java | 250 ++---------------- .../api/messaging/RedisStorageService.java | 102 ++++++- .../api/messaging/messages/IMessage.java | 5 + .../messaging/messages/KeepAliveReply.java | 9 + .../messaging/messages/MeetingDestroyed.java | 9 + .../api/messaging/messages/MeetingEnded.java | 9 + .../messaging/messages/MeetingStarted.java | 9 + .../api/messaging/messages/UserJoined.java | 17 ++ .../api/messaging/messages/UserLeft.java | 11 + .../messaging/messages/UserStatusChanged.java | 15 ++ .../imp/PdfToSwfSlidesGenerationService.java | 0 .../web/services/KeepAliveMessage.java | 0 .../web/services/KeepAlivePing.java | 0 .../web/services/KeepAlivePong.java | 0 .../web/services/KeepAliveService.java | 26 +- 29 files changed, 477 insertions(+), 386 deletions(-) mode change 100644 => 100755 bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml mode change 100644 => 100755 bigbluebutton-web/grails-app/conf/spring/bbb-redis-pool.xml mode change 100644 => 100755 bigbluebutton-web/grails-app/conf/spring/resources.xml mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MeetingMessageHandler.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageDistributor.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageHandler.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageToSend.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessage.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisMessagingService.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/IMessage.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingDestroyed.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingEnded.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingStarted.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserJoined.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserLeft.java create mode 100755 bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserStatusChanged.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/presentation/imp/PdfToSwfSlidesGenerationService.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveMessage.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAlivePing.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAlivePong.java mode change 100644 => 100755 bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java diff --git a/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml b/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml old mode 100644 new mode 100755 index 4b57975027..260bd653cf --- a/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml +++ b/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml @@ -26,33 +26,41 @@ with BigBlueButton; if not, see . http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd "> - - - + + + + - - - - - + + + + + + + + + + + - - - - - + + diff --git a/bigbluebutton-web/grails-app/conf/spring/bbb-redis-pool.xml b/bigbluebutton-web/grails-app/conf/spring/bbb-redis-pool.xml old mode 100644 new mode 100755 index 4c20c42638..22063aba8b --- a/bigbluebutton-web/grails-app/conf/spring/bbb-redis-pool.xml +++ b/bigbluebutton-web/grails-app/conf/spring/bbb-redis-pool.xml @@ -31,11 +31,11 @@ with BigBlueButton; if not, see . - - + + - + diff --git a/bigbluebutton-web/grails-app/conf/spring/resources.xml b/bigbluebutton-web/grails-app/conf/spring/resources.xml old mode 100644 new mode 100755 index ee012ddadd..2fc4c7d454 --- a/bigbluebutton-web/grails-app/conf/spring/resources.xml +++ b/bigbluebutton-web/grails-app/conf/spring/resources.xml @@ -26,47 +26,12 @@ with BigBlueButton; if not, see . http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd "> - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + + - - - - - - - - . - + - @@ -115,4 +79,6 @@ with BigBlueButton; if not, see . + + \ No newline at end of file diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java old mode 100644 new mode 100755 index edac43f2f9..545ad78992 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java @@ -21,24 +21,42 @@ package org.bigbluebutton.api; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.*; -import org.apache.commons.lang.RandomStringUtils; import org.bigbluebutton.api.domain.Meeting; import org.bigbluebutton.api.domain.Playback; import org.bigbluebutton.api.domain.Recording; import org.bigbluebutton.api.domain.User; import org.bigbluebutton.api.domain.UserSession; import org.bigbluebutton.api.messaging.MessageListener; +import org.bigbluebutton.api.messaging.MessagingConstants; import org.bigbluebutton.api.messaging.MessagingService; +import org.bigbluebutton.api.messaging.ReceivedMessage; +import org.bigbluebutton.api.messaging.messages.IMessage; +import org.bigbluebutton.api.messaging.messages.MeetingDestroyed; +import org.bigbluebutton.api.messaging.messages.MeetingEnded; +import org.bigbluebutton.api.messaging.messages.MeetingStarted; +import org.bigbluebutton.api.messaging.messages.UserJoined; +import org.bigbluebutton.api.messaging.messages.UserLeft; +import org.bigbluebutton.api.messaging.messages.UserStatusChanged; import org.bigbluebutton.web.services.ExpiredMeetingCleanupTimerTask; import org.bigbluebutton.web.services.KeepAliveService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MeetingService { +public class MeetingService implements MessageListener { private static Logger log = LoggerFactory.getLogger(MeetingService.class); + + private BlockingQueue receivedMessages = new LinkedBlockingQueue(); + private volatile boolean processMessage = false; + + private final Executor msgProcessorExec = Executors.newSingleThreadExecutor(); private final ConcurrentMap meetings; private final ConcurrentMap sessions; @@ -49,8 +67,7 @@ public class MeetingService { private MessagingService messagingService; private ExpiredMeetingCleanupTimerTask cleaner; private boolean removeMeetingWhenEnded = false; - private KeepAliveService keepAliveService; - + public MeetingService() { meetings = new ConcurrentHashMap(); sessions = new ConcurrentHashMap(); @@ -294,8 +311,6 @@ public class MeetingService { public void setMessagingService(MessagingService mess) { messagingService = mess; - messagingService.addListener(new MeetingMessageListener()); - messagingService.start(); } public void setExpiredMeetingCleanupTimerTask(ExpiredMeetingCleanupTimerTask c) { @@ -304,92 +319,123 @@ public class MeetingService { cleaner.start(); } - public void setKeepAliveService(KeepAliveService keepAlive){ - this.keepAliveService = keepAlive; - } - - /** - * Class that listens for messages from bbb-apps. - * @author Richard Alam - * - */ - private class MeetingMessageListener implements MessageListener { - @Override - public void meetingStarted(String meetingId) { - Meeting m = getMeeting(meetingId); - if (m != null) { - if (m.getStartTime() == 0) { - long now = System.currentTimeMillis(); - log.info("Meeting [{}] has started on [{}]", meetingId, now); - m.setStartTime(now); - } else { - log.debug("The meeting [{}] has been started again...", meetingId); - } - m.setEndTime(0); - return; - } - log.warn("The meeting [{}] doesn't exist", meetingId); - } - - @Override - public void meetingEnded(String meetingId) { - Meeting m = getMeeting(meetingId); - if (m != null) { + private void meetingStarted(MeetingStarted message) { + Meeting m = getMeeting(message.meetingId); + if (m != null) { + if (m.getStartTime() == 0) { long now = System.currentTimeMillis(); - log.debug("Meeting [{}] end time [{}].", meetingId, now); - m.setEndTime(now); - return; + log.info("Meeting [{}] has started on [{}]", message.meetingId, now); + m.setStartTime(now); + } else { + log.debug("The meeting [{}] has been started again...", message.meetingId); } - log.warn("The meeting " + meetingId + " doesn't exist"); + m.setEndTime(0); + return; } + log.warn("The meeting [{}] doesn't exist", message.meetingId); + } - @Override - public void userJoined(String meetingId, String internalUserId, String externalUserId, String name, String role) { - Meeting m = getMeeting(meetingId); - if (m != null) { - User user = new User(internalUserId, externalUserId, name, role); - m.userJoined(user); - log.debug("New user in meeting " + meetingId + ":" + user.getFullname()); - return; - } - log.warn("The meeting " + meetingId + " doesn't exist"); + private void meetingEnded(MeetingEnded message) { + Meeting m = getMeeting(message.meetingId); + if (m != null) { + long now = System.currentTimeMillis(); + log.debug("Meeting [{}] end time [{}].", message.meetingId, now); + m.setEndTime(now); + return; } + log.warn("The meeting " + message.meetingId + " doesn't exist"); + } - @Override - public void userLeft(String meetingId, String internalUserId) { - Meeting m = getMeeting(meetingId); - if (m != null) { - User user = m.userLeft(internalUserId); - if(user != null){ - log.debug("User removed from meeting " + meetingId + ":" + user.getFullname()); - return; - } - log.warn("The participant " + internalUserId + " doesn't exist in the meeting " + meetingId); + public void userJoined(UserJoined message) { + Meeting m = getMeeting(message.meetingId); + if (m != null) { + User user = new User(message.userId, message.externalUserId, message.name, message.role); + m.userJoined(user); + log.debug("New user in meeting " + message.meetingId + ":" + user.getFullname()); + return; + } + log.warn("The meeting " + message.meetingId + " doesn't exist"); + } + + private void userLeft(UserLeft message) { + Meeting m = getMeeting(message.meetingId); + if (m != null) { + User user = m.userLeft(message.userId); + if(user != null){ + log.debug("User removed from meeting " + message.meetingId + ":" + user.getFullname()); return; } - log.warn("The meeting " + meetingId + " doesn't exist"); + log.warn("The participant " + message.userId + " doesn't exist in the meeting " + message.meetingId); + return; } + log.warn("The meeting " + message.meetingId + " doesn't exist"); + } - @Override - public void updatedStatus(String meetingId, String internalUserId, String status, String value) { - Meeting m = getMeeting(meetingId); - if (m != null) { - User user = m.getUserById(internalUserId); - if(user != null){ - user.setStatus(status, value); - log.debug("Setting new status value in meeting " + meetingId + " for participant:"+user.getFullname()); - return; - } - log.warn("The participant " + internalUserId + " doesn't exist in the meeting " + meetingId); + private void updatedStatus(UserStatusChanged message) { + Meeting m = getMeeting(message.meetingId); + if (m != null) { + User user = m.getUserById(message.userId); + if(user != null){ + user.setStatus(message.status, message.value); + log.debug("Setting new status value in meeting " + message.meetingId + " for participant:"+user.getFullname()); return; } - log.warn("The meeting " + meetingId + " doesn't exist"); + log.warn("The participant " + message.userId + " doesn't exist in the meeting " + message.meetingId); + return; } + log.warn("The meeting " + message.meetingId + " doesn't exist"); + } - @Override - public void keepAliveReply(String aliveId){ - keepAliveService.keepAliveReply(aliveId); + private void processMessage(IMessage message) { + if (message instanceof MeetingDestroyed) { + + } else if (message instanceof MeetingStarted) { + meetingStarted((MeetingStarted)message); + } else if (message instanceof MeetingEnded) { + meetingEnded((MeetingEnded)message); + } else if (message instanceof UserJoined) { + userJoined((UserJoined)message); + } else if (message instanceof UserLeft) { + userLeft((UserLeft)message); + } else if (message instanceof UserStatusChanged) { + updatedStatus((UserStatusChanged)message); + } + } + + @Override + public void handle(IMessage message) { + try { + receivedMessages.offer(message, 5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + public void start() { + try { + processMessage = true; + Runnable messageReceiver = new Runnable() { + public void run() { + if (processMessage) { + try { + IMessage msg = receivedMessages.take(); + processMessage(msg); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + msgProcessorExec.execute(messageReceiver); + } catch (Exception e) { + log.error("Error PRocessing Message"); } } + public void stop() { + processMessage = false; + } } diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MeetingMessageHandler.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MeetingMessageHandler.java new file mode 100755 index 0000000000..476127af92 --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MeetingMessageHandler.java @@ -0,0 +1,82 @@ +package org.bigbluebutton.api.messaging; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.bigbluebutton.api.messaging.messages.KeepAliveReply; +import org.bigbluebutton.api.messaging.messages.MeetingDestroyed; +import org.bigbluebutton.api.messaging.messages.MeetingEnded; +import org.bigbluebutton.api.messaging.messages.MeetingStarted; +import org.bigbluebutton.api.messaging.messages.UserJoined; +import org.bigbluebutton.api.messaging.messages.UserLeft; +import org.bigbluebutton.api.messaging.messages.UserStatusChanged; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +public class MeetingMessageHandler implements MessageHandler { + private static Logger log = LoggerFactory.getLogger(MeetingMessageHandler.class); + + private Set listeners; + + public void setMessageListeners(Set listeners) { + this.listeners = listeners; + } + + public void handleMessage(String pattern, String channel, String message) { + Gson gson = new Gson(); + + if (channel.equalsIgnoreCase(MessagingConstants.SYSTEM_CHANNEL)) { + HashMap map = gson.fromJson(message, new TypeToken>() {}.getType()); + String messageId = map.get("messageID"); + + for (MessageListener listener : listeners) { + if(MessagingConstants.MEETING_STARTED_EVENT.equalsIgnoreCase(messageId)) { + String meetingId = map.get("meetingID"); + listener.handle(new MeetingStarted(meetingId)); + } else if(MessagingConstants.MEETING_ENDED_EVENT.equalsIgnoreCase(messageId)) { + String meetingId = map.get("meetingID"); + listener.handle(new MeetingEnded(meetingId)); + } else if(MessagingConstants.KEEP_ALIVE_REPLY_EVENT.equalsIgnoreCase(messageId)){ + String pongId = map.get("aliveID"); + listener.handle(new KeepAliveReply(pongId)); + } else if (MessagingConstants.MEETING_DESTROYED_EVENT.equalsIgnoreCase(messageId)) { + String meetingId = map.get("meetingID"); + log.info("Received a meeting destroyed message for meeting id=[{}]", meetingId); + listener.handle(new MeetingDestroyed(meetingId)); + } + } + } else if (channel.equalsIgnoreCase(MessagingConstants.PARTICIPANTS_CHANNEL)) { + HashMap map = gson.fromJson(message, new TypeToken>() {}.getType()); + String meetingId = map.get("meetingID"); + String messageId = map.get("messageID"); + if (MessagingConstants.USER_JOINED_EVENT.equalsIgnoreCase(messageId)){ + String userId = map.get("internalUserID"); + String externalUserId = map.get("externalUserID"); + String name = map.get("fullname"); + String role = map.get("role"); + + for (MessageListener listener : listeners) { + listener.handle(new UserJoined(meetingId, userId, externalUserId, name, role)); + } + } else if(MessagingConstants.USER_STATUS_CHANGE_EVENT.equalsIgnoreCase(messageId)){ + String userId = map.get("internalUserID"); + String status = map.get("status"); + String value = map.get("value"); + + for (MessageListener listener : listeners) { + listener.handle(new UserStatusChanged(meetingId, userId, status, value)); + } + } else if(MessagingConstants.USER_LEFT_EVENT.equalsIgnoreCase(messageId)){ + String userId = map.get("internalUserID"); + + for (MessageListener listener : listeners) { + listener.handle(new UserLeft(meetingId, userId)); + } + } + } + } + +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageDistributor.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageDistributor.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageHandler.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageHandler.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageListener.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageListener.java index 7d5a7a5bf8..4779ea975b 100755 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageListener.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageListener.java @@ -19,11 +19,8 @@ package org.bigbluebutton.api.messaging; +import org.bigbluebutton.api.messaging.messages.IMessage; + public interface MessageListener { - void meetingStarted(String meetingId); - void meetingEnded(String meetingId); - void userJoined(String meetingId, String internalUserId, String externalUserId, String name, String role); - void userLeft(String meetingId, String internalUserId); - void updatedStatus(String meetingId, String internalUserId, String status, String value); - void keepAliveReply(String aliveId); + void handle(IMessage message); } diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageToSend.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageToSend.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessagingService.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessagingService.java index 5920f0f843..d13278d186 100755 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessagingService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessagingService.java @@ -23,17 +23,13 @@ import java.util.List; import java.util.Map; public interface MessagingService { - public void start(); - public void stop(); - public void recordMeetingInfo(String meetingId, Map info); - public void destroyMeeting(String meetingID); - public void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration); - public void endMeeting(String meetingId); - public void send(String channel, String message); - public void addListener(MessageListener listener); - public void removeListener(MessageListener listener); - public void sendPolls(String meetingId, String title, String question, String questionType, List answers); - public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL); - public boolean removeSubscription(String meetingId, String subscriptionId); - public List> listSubscriptions(String meetingId); + void recordMeetingInfo(String meetingId, Map info); + void destroyMeeting(String meetingID); + void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration); + void endMeeting(String meetingId); + void send(String channel, String message); + void sendPolls(String meetingId, String title, String question, String questionType, List answers); + String storeSubscription(String meetingId, String externalMeetingID, String callbackURL); + boolean removeSubscription(String meetingId, String subscriptionId); + List> listSubscriptions(String meetingId); } diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessage.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessage.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java old mode 100644 new mode 100755 index 268cd543bb..b5620aa127 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java @@ -8,7 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReceivedMessageHandler { - private static Logger log = LoggerFactory.getLogger(MessageReceiver.class); + private static Logger log = LoggerFactory.getLogger(ReceivedMessageHandler.class); private BlockingQueue receivedMessages = new LinkedBlockingQueue(); diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisMessagingService.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisMessagingService.java old mode 100644 new mode 100755 index 7e2e60c35c..d14b109e1d --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisMessagingService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisMessagingService.java @@ -39,35 +39,11 @@ import redis.clients.jedis.JedisPubSub; public class RedisMessagingService implements MessagingService { private static Logger log = LoggerFactory.getLogger(RedisMessagingService.class); - private JedisPool redisPool; - private final Set listeners = new HashSet(); - - private final Executor exec = Executors.newSingleThreadExecutor(); - private Runnable pubsubListener; - - @Override - public void addListener(MessageListener listener) { - listeners.add(listener); - } - - public void removeListener(MessageListener listener) { - listeners.remove(listener); - } - + private RedisStorageService storeService; + private MessageSender sender; + public void recordMeetingInfo(String meetingId, Map info) { - Jedis jedis = redisPool.getResource(); - try { - for (String key: info.keySet()) { - log.debug("Storing metadata {} = {}", key, info.get(key)); - } - - log.debug("Saving metadata in {}", meetingId); - jedis.hmset("meeting:info:" + meetingId, info); - } catch (Exception e){ - log.warn("Cannot record the info meeting:"+meetingId,e); - } finally { - redisPool.returnResource(jedis); - } + storeService.recordMeetingInfo(meetingId, info); } public void destroyMeeting(String meetingID) { @@ -76,7 +52,7 @@ public class RedisMessagingService implements MessagingService { map.put("meetingID", meetingID); Gson gson = new Gson(); log.info("Sending destroy meeting [{}]", meetingID); - send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); + sender.send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); } public void createMeeting(String meetingID, String meetingName, Boolean recorded, String voiceBridge, Long duration) { @@ -90,7 +66,7 @@ public class RedisMessagingService implements MessagingService { Gson gson = new Gson(); log.info("Sending create meeting [{}] - [{}]", meetingID, voiceBridge); - send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); + sender.send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); } public void endMeeting(String meetingId) { @@ -99,21 +75,13 @@ public class RedisMessagingService implements MessagingService { map.put("meetingId", meetingId); Gson gson = new Gson(); log.info("Sending end meeting [{}]", meetingId); - send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); - } - - public void send(String channel, String message) { - Jedis jedis = redisPool.getResource(); - try { - log.debug("Sending to channel[" + channel + "] message[" + message + "]"); - jedis.publish(channel, message); - } catch(Exception e){ - log.warn("Cannot publish the message to redis",e); - }finally{ - redisPool.returnResource(jedis); - } + sender.send(MessagingConstants.SYSTEM_CHANNEL, gson.toJson(map)); } + public void send(String channel, String message) { + sender.send(channel, message); + } + public void sendPolls(String meetingId, String title, String question, String questionType, List answers){ Gson gson = new Gson(); @@ -127,199 +95,31 @@ public class RedisMessagingService implements MessagingService { System.out.println(gson.toJson(map)); - send(MessagingConstants.POLLING_CHANNEL, gson.toJson(map)); + sender.send(MessagingConstants.POLLING_CHANNEL, gson.toJson(map)); } + public void setMessageSender(MessageSender sender) { + this.sender = sender; + } + + public void setRedisStorageService(RedisStorageService storeService) { + this.storeService = storeService; + } + public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL){ - String sid = ""; - Jedis jedis = redisPool.getResource(); - try { - sid = Long.toString(jedis.incr("meeting:" + meetingId + ":nextSubscription")); - - HashMap props = new HashMap(); - props.put("subscriptionID", sid); - props.put("meetingId", meetingId); - props.put("externalMeetingID", externalMeetingID); - props.put("callbackURL", callbackURL); - props.put("active", "true"); - - jedis.hmset("meeting:" + meetingId + ":subscription:" + sid, props); - jedis.rpush("meeting:" + meetingId + ":subscriptions", sid); - - } catch (Exception e){ - log.warn("Cannot store subscription:" + meetingId, e); - } finally { - redisPool.returnResource(jedis); - } - - return sid; + return storeService.storeSubscription(meetingId, externalMeetingID, callbackURL); } public boolean removeSubscription(String meetingId, String subscriptionId){ - boolean unsubscribed = true; - Jedis jedis = redisPool.getResource(); - try { - jedis.hset("meeting:" + meetingId + ":subscription:" + subscriptionId, "active", "false"); - } catch (Exception e){ - log.warn("Cannot rmove subscription:" + meetingId, e); - unsubscribed = false; - } finally { - redisPool.returnResource(jedis); - } - - return unsubscribed; + return storeService.removeSubscription(meetingId, subscriptionId); } public List> listSubscriptions(String meetingId){ - List> list = new ArrayList>(); - Jedis jedis = redisPool.getResource(); - try { - List sids = jedis.lrange("meeting:" + meetingId + ":subscriptions", 0 , -1); - for(int i=0; i props = jedis.hgetAll("meeting:" + meetingId + ":subscription:" + sids.get(i)); - list.add(props); - } - - } catch (Exception e){ - log.warn("Cannot list subscriptions:" + meetingId, e); - } finally { - redisPool.returnResource(jedis); - } - - return list; + return storeService.listSubscriptions(meetingId); } - - public void start() { - log.debug("Starting redis pubsub..."); - - final Jedis jedis = redisPool.getResource(); - try { - pubsubListener = new Runnable() { - public void run() { - jedis.psubscribe(new PubSubListener(), MessagingConstants.BIGBLUEBUTTON_PATTERN); - } - }; - exec.execute(pubsubListener); - } catch (Exception e) { - log.error("Error in subscribe: " + e.getMessage()); - } - } - - public void stop() { - try { - redisPool.destroy(); - } catch (Exception e) { - e.printStackTrace(); - } - } - public void setRedisPool(JedisPool redisPool){ - this.redisPool=redisPool; - } - - private class PubSubListener extends JedisPubSub { - - public PubSubListener() { - super(); - } - - @Override - public void onMessage(String channel, String message) { - // Not used. - } - - @Override - public void onPMessage(String pattern, String channel, String message) { -// log.debug("Received in channel [{}] message [{}]", channel, message); - - Gson gson = new Gson(); - -// for (String key: map.keySet()) { -// log.debug("rx: {} = {}", key, map.get(key)); -// } - - if (channel.equalsIgnoreCase(MessagingConstants.SYSTEM_CHANNEL)){ - HashMap map = gson.fromJson(message, new TypeToken>() {}.getType()); - String messageId = map.get("messageID"); - - for (MessageListener listener : listeners) { - if(MessagingConstants.MEETING_STARTED_EVENT.equalsIgnoreCase(messageId)) { - String meetingId = map.get("meetingID"); - listener.meetingStarted(meetingId); - } else if(MessagingConstants.MEETING_ENDED_EVENT.equalsIgnoreCase(messageId)) { - String meetingId = map.get("meetingID"); - listener.meetingEnded(meetingId); - } else if(MessagingConstants.KEEP_ALIVE_REPLY_EVENT.equalsIgnoreCase(messageId)){ - String aliveId = map.get("aliveID"); - listener.keepAliveReply(aliveId); - } else if (MessagingConstants.MEETING_DESTROYED_EVENT.equalsIgnoreCase(messageId)) { - String meetingId = map.get("meetingID"); - log.info("Received a meeting destroyed message for meeting id=[{}]", meetingId); - } - } - } else if(channel.equalsIgnoreCase(MessagingConstants.PARTICIPANTS_CHANNEL)){ - HashMap map = gson.fromJson(message, new TypeToken>() {}.getType()); - String meetingId = map.get("meetingID"); - String messageId = map.get("messageID"); - if(MessagingConstants.USER_JOINED_EVENT.equalsIgnoreCase(messageId)){ - String internalUserId = map.get("internalUserID"); - String externalUserId = map.get("externalUserID"); - String fullname = map.get("fullname"); - String role = map.get("role"); - - for (MessageListener listener : listeners) { - listener.userJoined(meetingId, internalUserId, externalUserId, fullname, role); - } - } else if(MessagingConstants.USER_STATUS_CHANGE_EVENT.equalsIgnoreCase(messageId)){ - String internalUserId = map.get("internalUserID"); - String status = map.get("status"); - String value = map.get("value"); - - for (MessageListener listener : listeners) { - listener.updatedStatus(meetingId, internalUserId, status, value); - } - } else if(MessagingConstants.USER_LEFT_EVENT.equalsIgnoreCase(messageId)){ - String internalUserId = map.get("internalUserID"); - - for (MessageListener listener : listeners) { - listener.userLeft(meetingId, internalUserId); - } - } - } - } - - @Override - public void onPSubscribe(String pattern, int subscribedChannels) { - log.debug("Subscribed to the pattern:"+pattern); - } - - @Override - public void onPUnsubscribe(String pattern, int subscribedChannels) { - // Not used. - } - - @Override - public void onSubscribe(String channel, int subscribedChannels) { - // Not used. - } - - @Override - public void onUnsubscribe(String channel, int subscribedChannels) { - // Not used. - } - } - public void removeMeeting(String meetingId){ - Jedis jedis = redisPool.getResource(); - try { - jedis.del("meeting-" + meetingId); - //jedis.hmset("meeting"+ COLON +"info" + COLON + meetingId, metadata); - jedis.srem("meetings", meetingId); - - } finally { - redisPool.returnResource(jedis); - } + storeService.removeMeeting(meetingId); } - - + } diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisStorageService.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisStorageService.java index d2a43fdff6..e90c16d680 100755 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisStorageService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/RedisStorageService.java @@ -1,5 +1,105 @@ package org.bigbluebutton.api.messaging; -public class RedisStorageService { +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +public class RedisStorageService { + private static Logger log = LoggerFactory.getLogger(RedisStorageService.class); + + private JedisPool redisPool; + + public void recordMeetingInfo(String meetingId, Map info) { + Jedis jedis = redisPool.getResource(); + try { + for (String key: info.keySet()) { + log.debug("Storing metadata {} = {}", key, info.get(key)); + } + + log.debug("Saving metadata in {}", meetingId); + jedis.hmset("meeting:info:" + meetingId, info); + } catch (Exception e){ + log.warn("Cannot record the info meeting:"+meetingId,e); + } finally { + redisPool.returnResource(jedis); + } + } + + public void removeMeeting(String meetingId){ + Jedis jedis = redisPool.getResource(); + try { + jedis.del("meeting-" + meetingId); + jedis.srem("meetings", meetingId); + } finally { + redisPool.returnResource(jedis); + } + } + + public List> listSubscriptions(String meetingId){ + List> list = new ArrayList>(); + Jedis jedis = redisPool.getResource(); + try { + List sids = jedis.lrange("meeting:" + meetingId + ":subscriptions", 0 , -1); + for(int i=0; i props = jedis.hgetAll("meeting:" + meetingId + ":subscription:" + sids.get(i)); + list.add(props); + } + + } catch (Exception e){ + log.warn("Cannot list subscriptions:" + meetingId, e); + } finally { + redisPool.returnResource(jedis); + } + + return list; + } + + public boolean removeSubscription(String meetingId, String subscriptionId){ + boolean unsubscribed = true; + Jedis jedis = redisPool.getResource(); + try { + jedis.hset("meeting:" + meetingId + ":subscription:" + subscriptionId, "active", "false"); + } catch (Exception e){ + log.warn("Cannot rmove subscription:" + meetingId, e); + unsubscribed = false; + } finally { + redisPool.returnResource(jedis); + } + + return unsubscribed; + } + + public String storeSubscription(String meetingId, String externalMeetingID, String callbackURL){ + String sid = ""; + Jedis jedis = redisPool.getResource(); + try { + sid = Long.toString(jedis.incr("meeting:" + meetingId + ":nextSubscription")); + + HashMap props = new HashMap(); + props.put("subscriptionID", sid); + props.put("meetingId", meetingId); + props.put("externalMeetingID", externalMeetingID); + props.put("callbackURL", callbackURL); + props.put("active", "true"); + + jedis.hmset("meeting:" + meetingId + ":subscription:" + sid, props); + jedis.rpush("meeting:" + meetingId + ":subscriptions", sid); + + } catch (Exception e){ + log.warn("Cannot store subscription:" + meetingId, e); + } finally { + redisPool.returnResource(jedis); + } + + return sid; + } + + public void setRedisPool(JedisPool redisPool){ + this.redisPool=redisPool; + } } diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/IMessage.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/IMessage.java new file mode 100755 index 0000000000..8d40b70fbc --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/IMessage.java @@ -0,0 +1,5 @@ +package org.bigbluebutton.api.messaging.messages; + +public interface IMessage { + +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java new file mode 100755 index 0000000000..8c28678602 --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java @@ -0,0 +1,9 @@ +package org.bigbluebutton.api.messaging.messages; + +public class KeepAliveReply implements IMessage { + public final String pongId; + + public KeepAliveReply(String pongId) { + this.pongId = pongId; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingDestroyed.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingDestroyed.java new file mode 100755 index 0000000000..f6d1ff49f3 --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingDestroyed.java @@ -0,0 +1,9 @@ +package org.bigbluebutton.api.messaging.messages; + +public class MeetingDestroyed implements IMessage { + public final String meetingId; + + public MeetingDestroyed(String meetingId) { + this.meetingId = meetingId; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingEnded.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingEnded.java new file mode 100755 index 0000000000..dcc4b9da2a --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingEnded.java @@ -0,0 +1,9 @@ +package org.bigbluebutton.api.messaging.messages; + +public class MeetingEnded implements IMessage { + public final String meetingId; + + public MeetingEnded(String meetingId) { + this.meetingId = meetingId; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingStarted.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingStarted.java new file mode 100755 index 0000000000..3f28136534 --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/MeetingStarted.java @@ -0,0 +1,9 @@ +package org.bigbluebutton.api.messaging.messages; + +public class MeetingStarted implements IMessage { + public final String meetingId; + + public MeetingStarted(String meetingId) { + this.meetingId = meetingId; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserJoined.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserJoined.java new file mode 100755 index 0000000000..1e415f1af2 --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserJoined.java @@ -0,0 +1,17 @@ +package org.bigbluebutton.api.messaging.messages; + +public class UserJoined implements IMessage { + public final String meetingId; + public final String userId; + public final String externalUserId; + public final String name; + public final String role; + + public UserJoined(String meetingId, String userId, String externalUserId, String name, String role) { + this.meetingId = meetingId; + this.userId = userId; + this.externalUserId = externalUserId; + this.name = name; + this.role = role; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserLeft.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserLeft.java new file mode 100755 index 0000000000..401f77b3f4 --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserLeft.java @@ -0,0 +1,11 @@ +package org.bigbluebutton.api.messaging.messages; + +public class UserLeft implements IMessage { + public final String userId; + public final String meetingId; + + public UserLeft(String meetingId, String userId) { + this.meetingId = meetingId; + this.userId = userId; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserStatusChanged.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserStatusChanged.java new file mode 100755 index 0000000000..2b1e17e35f --- /dev/null +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/messages/UserStatusChanged.java @@ -0,0 +1,15 @@ +package org.bigbluebutton.api.messaging.messages; + +public class UserStatusChanged implements IMessage { + public final String meetingId; + public final String userId; + public final String status; + public final String value; + + public UserStatusChanged(String meetingId, String userId, String status, String value) { + this.meetingId = meetingId; + this.userId = userId; + this.status = status; + this.value = value; + } +} diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/presentation/imp/PdfToSwfSlidesGenerationService.java b/bigbluebutton-web/src/java/org/bigbluebutton/presentation/imp/PdfToSwfSlidesGenerationService.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveMessage.java b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveMessage.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAlivePing.java b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAlivePing.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAlivePong.java b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAlivePong.java old mode 100644 new mode 100755 diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java old mode 100644 new mode 100755 index ef668f4de4..19262e51d1 --- a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java @@ -19,9 +19,13 @@ package org.bigbluebutton.web.services; +import org.bigbluebutton.api.messaging.MessageListener; import org.bigbluebutton.api.messaging.MessagingService; import org.bigbluebutton.api.messaging.MessagingConstants; import org.bigbluebutton.api.messaging.RedisMessagingService; +import org.bigbluebutton.api.messaging.messages.IMessage; +import org.bigbluebutton.api.messaging.messages.KeepAliveReply; +import org.bigbluebutton.api.messaging.messages.MeetingDestroyed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Timer; @@ -35,7 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import com.google.gson.Gson; -public class KeepAliveService { +public class KeepAliveService implements MessageListener { private static Logger log = LoggerFactory.getLogger(KeepAliveService.class); private final String KEEP_ALIVE_REQUEST = "KEEP_ALIVE_REQUEST"; private MessagingService service; @@ -81,12 +85,6 @@ public class KeepAliveService { } } - public void keepAliveReply(String aliveId) { - log.debug("Received keep alive msg reply from bbb-apps. id [{}]", aliveId); - KeepAlivePong pong = new KeepAlivePong(aliveId); - queueMessage(pong); - } - public boolean isDown(){ return !available; } @@ -167,4 +165,18 @@ public class KeepAliveService { log.info("Received invalid keep alive response from bbb-apps:" + msg.getId()); } } + + private void keepAliveReply(String aliveId) { + log.debug("Received keep alive msg reply from bbb-apps. id [{}]", aliveId); + KeepAlivePong pong = new KeepAlivePong(aliveId); + queueMessage(pong); + } + + @Override + public void handle(IMessage message) { + if (message instanceof KeepAliveReply) { + KeepAliveReply msg = (KeepAliveReply) message; + keepAliveReply(msg.pongId); + } + } } \ No newline at end of file