- minimize the number of threads required for doc conversion

This commit is contained in:
Richard Alam 2014-07-22 14:42:30 -07:00
parent 89af96714f
commit fee5a208c8
5 changed files with 87 additions and 53 deletions

View File

@ -61,7 +61,8 @@ public class MeetingService implements MessageListener {
private volatile boolean processMessage = false;
private final Executor msgProcessorExec = Executors.newSingleThreadExecutor();
private final Executor runExec = Executors.newSingleThreadExecutor();
/**
* http://ria101.wordpress.com/2011/12/12/concurrenthashmap-avoid-a-common-misuse/
*/
@ -425,32 +426,38 @@ public class MeetingService implements MessageListener {
log.warn("The meeting " + message.meetingId + " doesn't exist");
}
private void processMessage(IMessage message) {
if (message instanceof MeetingDestroyed) {
} else if (message instanceof MeetingStarted) {
meetingStarted((MeetingStarted)message);
} else if (message instanceof MeetingEnded) {
log.info("Processing meeting ended request.");
meetingEnded((MeetingEnded)message);
} else if (message instanceof UserJoined) {
log.info("Processing user joined message.");
userJoined((UserJoined)message);
} else if (message instanceof UserLeft) {
log.info("Processing user left message.");
userLeft((UserLeft)message);
} else if (message instanceof UserStatusChanged) {
updatedStatus((UserStatusChanged)message);
} else if (message instanceof RemoveExpiredMeetings) {
checkAndRemoveExpiredMeetings();
} else if (message instanceof CreateMeeting) {
processCreateMeeting((CreateMeeting)message);
} else if (message instanceof EndMeeting) {
log.info("Processing end meeting request.");
processEndMeeting((EndMeeting)message);
} else if (message instanceof RegisterUser) {
processRegisterUser((RegisterUser) message);
}
private void processMessage(final IMessage message) {
Runnable task = new Runnable() {
public void run() {
if (message instanceof MeetingDestroyed) {
} else if (message instanceof MeetingStarted) {
meetingStarted((MeetingStarted)message);
} else if (message instanceof MeetingEnded) {
log.info("Processing meeting ended request.");
meetingEnded((MeetingEnded)message);
} else if (message instanceof UserJoined) {
log.info("Processing user joined message.");
userJoined((UserJoined)message);
} else if (message instanceof UserLeft) {
log.info("Processing user left message.");
userLeft((UserLeft)message);
} else if (message instanceof UserStatusChanged) {
updatedStatus((UserStatusChanged)message);
} else if (message instanceof RemoveExpiredMeetings) {
checkAndRemoveExpiredMeetings();
} else if (message instanceof CreateMeeting) {
processCreateMeeting((CreateMeeting)message);
} else if (message instanceof EndMeeting) {
log.info("Processing end meeting request.");
processEndMeeting((EndMeeting)message);
} else if (message instanceof RegisterUser) {
processRegisterUser((RegisterUser) message);
}
}
};
runExec.execute(task);
}
@Override

View File

@ -17,7 +17,8 @@ public class MessageReceiver {
private volatile boolean receiveMessage = false;
private final Executor msgReceiverExec = Executors.newSingleThreadExecutor();
private final Executor runExec = Executors.newSingleThreadExecutor();
public void stop() {
receiveMessage = false;
}
@ -61,8 +62,14 @@ public class MessageReceiver {
}
@Override
public void onPMessage(String pattern, String channel, String message) {
handler.handleMessage(pattern, channel, message);
public void onPMessage(final String pattern, final String channel, final String message) {
Runnable task = new Runnable() {
public void run() {
handler.handleMessage(pattern, channel, message);
}
};
runExec.execute(task);
}
@Override

View File

@ -16,6 +16,7 @@ public class MessageSender {
private volatile boolean sendMessage = false;
private final Executor msgSenderExec = Executors.newSingleThreadExecutor();
private final Executor runExec = Executors.newSingleThreadExecutor();
private BlockingQueue<MessageToSend> messages = new LinkedBlockingQueue<MessageToSend>();
public void stop() {
@ -50,15 +51,21 @@ public class MessageSender {
messages.add(msg);
}
private void publish(String channel, String message) {
Jedis jedis = redisPool.getResource();
try {
jedis.publish(channel, message);
} catch(Exception e){
log.warn("Cannot publish the message to redis", e);
} finally {
redisPool.returnResource(jedis);
}
private void publish(final String channel, final String message) {
Runnable task = new Runnable() {
public void run() {
Jedis jedis = redisPool.getResource();
try {
jedis.publish(channel, message);
} catch(Exception e){
log.warn("Cannot publish the message to redis", e);
} finally {
redisPool.returnResource(jedis);
}
}
};
runExec.execute(task);
}
public void setRedisPool(JedisPool redisPool){

View File

@ -15,7 +15,7 @@ public class ReceivedMessageHandler {
private volatile boolean processMessage = false;
private final Executor msgProcessorExec = Executors.newSingleThreadExecutor();
private final Executor runExec = Executors.newSingleThreadExecutor();
private MessageDistributor handler;
@ -47,14 +47,20 @@ public class ReceivedMessageHandler {
}
}
private void processMessage(ReceivedMessage msg) {
if (handler != null) {
log.debug("Let's process this message: " + msg.getMessage());
private void processMessage(final ReceivedMessage msg) {
Runnable task = new Runnable() {
public void run() {
if (handler != null) {
log.debug("Let's process this message: " + msg.getMessage());
handler.notifyListeners(msg.getPattern(), msg.getChannel(), msg.getMessage());
} else {
log.warn("No listeners interested in messages from Redis!");
}
handler.notifyListeners(msg.getPattern(), msg.getChannel(), msg.getMessage());
} else {
log.warn("No listeners interested in messages from Redis!");
}
}
};
runExec.execute(task);
}
public void handleMessage(String pattern, String channel, String message) {

View File

@ -53,6 +53,7 @@ public class KeepAliveService implements MessageListener {
private static final int SENDERTHREADS = 1;
private static final Executor msgSenderExec = Executors.newFixedThreadPool(SENDERTHREADS);
private static final Executor runExec = Executors.newFixedThreadPool(SENDERTHREADS);
private BlockingQueue<KeepAliveMessage> messages = new LinkedBlockingQueue<KeepAliveMessage>();
@ -119,12 +120,18 @@ public class KeepAliveService implements MessageListener {
msgSenderExec.execute(sender);
}
private void processMessage(KeepAliveMessage msg) {
if (msg instanceof KeepAlivePing) {
processPing((KeepAlivePing) msg);
} else if (msg instanceof KeepAlivePong) {
processPong((KeepAlivePong) msg);
}
private void processMessage(final KeepAliveMessage msg) {
Runnable task = new Runnable() {
public void run() {
if (msg instanceof KeepAlivePing) {
processPing((KeepAlivePing) msg);
} else if (msg instanceof KeepAlivePong) {
processPong((KeepAlivePong) msg);
}
}
};
runExec.execute(task);
}
private void processPing(KeepAlivePing msg) {