diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java index a5ba5bafef..db5dbbfc51 100644 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/MeetingService.java @@ -61,7 +61,8 @@ public class MeetingService implements MessageListener { private volatile boolean processMessage = false; private final Executor msgProcessorExec = Executors.newSingleThreadExecutor(); - + private final Executor runExec = Executors.newSingleThreadExecutor(); + /** * http://ria101.wordpress.com/2011/12/12/concurrenthashmap-avoid-a-common-misuse/ */ @@ -425,32 +426,38 @@ public class MeetingService implements MessageListener { log.warn("The meeting " + message.meetingId + " doesn't exist"); } - private void processMessage(IMessage message) { - if (message instanceof MeetingDestroyed) { - - } else if (message instanceof MeetingStarted) { - meetingStarted((MeetingStarted)message); - } else if (message instanceof MeetingEnded) { - log.info("Processing meeting ended request."); - meetingEnded((MeetingEnded)message); - } else if (message instanceof UserJoined) { - log.info("Processing user joined message."); - userJoined((UserJoined)message); - } else if (message instanceof UserLeft) { - log.info("Processing user left message."); - userLeft((UserLeft)message); - } else if (message instanceof UserStatusChanged) { - updatedStatus((UserStatusChanged)message); - } else if (message instanceof RemoveExpiredMeetings) { - checkAndRemoveExpiredMeetings(); - } else if (message instanceof CreateMeeting) { - processCreateMeeting((CreateMeeting)message); - } else if (message instanceof EndMeeting) { - log.info("Processing end meeting request."); - processEndMeeting((EndMeeting)message); - } else if (message instanceof RegisterUser) { - processRegisterUser((RegisterUser) message); - } + private void processMessage(final IMessage message) { + Runnable task = new Runnable() { + public void run() { + if (message instanceof MeetingDestroyed) { + + } else if (message instanceof MeetingStarted) { + meetingStarted((MeetingStarted)message); + } else if (message instanceof MeetingEnded) { + log.info("Processing meeting ended request."); + meetingEnded((MeetingEnded)message); + } else if (message instanceof UserJoined) { + log.info("Processing user joined message."); + userJoined((UserJoined)message); + } else if (message instanceof UserLeft) { + log.info("Processing user left message."); + userLeft((UserLeft)message); + } else if (message instanceof UserStatusChanged) { + updatedStatus((UserStatusChanged)message); + } else if (message instanceof RemoveExpiredMeetings) { + checkAndRemoveExpiredMeetings(); + } else if (message instanceof CreateMeeting) { + processCreateMeeting((CreateMeeting)message); + } else if (message instanceof EndMeeting) { + log.info("Processing end meeting request."); + processEndMeeting((EndMeeting)message); + } else if (message instanceof RegisterUser) { + processRegisterUser((RegisterUser) message); + } + } + }; + + runExec.execute(task); } @Override diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java old mode 100755 new mode 100644 index f87bec6474..3a955de422 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageReceiver.java @@ -17,7 +17,8 @@ public class MessageReceiver { private volatile boolean receiveMessage = false; private final Executor msgReceiverExec = Executors.newSingleThreadExecutor(); - + private final Executor runExec = Executors.newSingleThreadExecutor(); + public void stop() { receiveMessage = false; } @@ -61,8 +62,14 @@ public class MessageReceiver { } @Override - public void onPMessage(String pattern, String channel, String message) { - handler.handleMessage(pattern, channel, message); + public void onPMessage(final String pattern, final String channel, final String message) { + Runnable task = new Runnable() { + public void run() { + handler.handleMessage(pattern, channel, message); + } + }; + + runExec.execute(task); } @Override diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java old mode 100755 new mode 100644 index d6bfd71d45..9369cab6a3 --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/MessageSender.java @@ -16,6 +16,7 @@ public class MessageSender { private volatile boolean sendMessage = false; private final Executor msgSenderExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); private BlockingQueue messages = new LinkedBlockingQueue(); public void stop() { @@ -50,15 +51,21 @@ public class MessageSender { messages.add(msg); } - private void publish(String channel, String message) { - Jedis jedis = redisPool.getResource(); - try { - jedis.publish(channel, message); - } catch(Exception e){ - log.warn("Cannot publish the message to redis", e); - } finally { - redisPool.returnResource(jedis); - } + private void publish(final String channel, final String message) { + Runnable task = new Runnable() { + public void run() { + Jedis jedis = redisPool.getResource(); + try { + jedis.publish(channel, message); + } catch(Exception e){ + log.warn("Cannot publish the message to redis", e); + } finally { + redisPool.returnResource(jedis); + } + } + }; + + runExec.execute(task); } public void setRedisPool(JedisPool redisPool){ diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java old mode 100755 new mode 100644 index b5620aa127..9b9ce89c0d --- a/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/api/messaging/ReceivedMessageHandler.java @@ -15,7 +15,7 @@ public class ReceivedMessageHandler { private volatile boolean processMessage = false; private final Executor msgProcessorExec = Executors.newSingleThreadExecutor(); - + private final Executor runExec = Executors.newSingleThreadExecutor(); private MessageDistributor handler; @@ -47,14 +47,20 @@ public class ReceivedMessageHandler { } } - private void processMessage(ReceivedMessage msg) { - if (handler != null) { - log.debug("Let's process this message: " + msg.getMessage()); + private void processMessage(final ReceivedMessage msg) { + Runnable task = new Runnable() { + public void run() { + if (handler != null) { + log.debug("Let's process this message: " + msg.getMessage()); - handler.notifyListeners(msg.getPattern(), msg.getChannel(), msg.getMessage()); - } else { - log.warn("No listeners interested in messages from Redis!"); - } + handler.notifyListeners(msg.getPattern(), msg.getChannel(), msg.getMessage()); + } else { + log.warn("No listeners interested in messages from Redis!"); + } + } + }; + + runExec.execute(task); } public void handleMessage(String pattern, String channel, String message) { diff --git a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java index c984faae30..6051bb0da1 100644 --- a/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java +++ b/bigbluebutton-web/src/java/org/bigbluebutton/web/services/KeepAliveService.java @@ -53,6 +53,7 @@ public class KeepAliveService implements MessageListener { private static final int SENDERTHREADS = 1; private static final Executor msgSenderExec = Executors.newFixedThreadPool(SENDERTHREADS); + private static final Executor runExec = Executors.newFixedThreadPool(SENDERTHREADS); private BlockingQueue messages = new LinkedBlockingQueue(); @@ -119,12 +120,18 @@ public class KeepAliveService implements MessageListener { msgSenderExec.execute(sender); } - private void processMessage(KeepAliveMessage msg) { - if (msg instanceof KeepAlivePing) { - processPing((KeepAlivePing) msg); - } else if (msg instanceof KeepAlivePong) { - processPong((KeepAlivePong) msg); - } + private void processMessage(final KeepAliveMessage msg) { + Runnable task = new Runnable() { + public void run() { + if (msg instanceof KeepAlivePing) { + processPing((KeepAlivePing) msg); + } else if (msg instanceof KeepAlivePong) { + processPong((KeepAlivePong) msg); + } + } + }; + + runExec.execute(task); } private void processPing(KeepAlivePing msg) {