- cancel the message sending task when it takes a long time to complete. The thread

may have blocked when the rtmp connection was closed.
This commit is contained in:
Richard Alam 2015-11-17 20:44:15 -05:00
parent de71d17d73
commit 22b3a4e2a0

View File

@ -22,10 +22,15 @@ import java.util.Set;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IConnection;
import org.red5.server.api.scope.IScope;
@ -36,6 +41,7 @@ import org.red5.server.api.so.ISharedObjectService;
import org.red5.server.so.SharedObjectService;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import com.google.gson.Gson;
public class ConnectionInvokerService {
@ -44,12 +50,14 @@ public class ConnectionInvokerService {
private final String CONN = "RED5-";
private static final int NTHREADS = 1;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
private static final Executor runExec = Executors.newFixedThreadPool(NTHREADS);
private static final ExecutorService runExec = Executors.newFixedThreadPool(3);
private BlockingQueue<ClientMessage> messages;
private volatile boolean sendMessages = false;
private IScope bbbAppScope;
private final long SEND_TIMEOUT = 5000000000L; // 5s
public ConnectionInvokerService() {
messages = new LinkedBlockingQueue<ClientMessage>();
}
@ -79,6 +87,7 @@ public class ConnectionInvokerService {
public void stop() {
sendMessages = false;
runExec.shutdown();
}
public void sendMessage(final ClientMessage message) {
@ -157,7 +166,7 @@ public class ConnectionInvokerService {
}
}
}
private void sendDirectMessage(final DirectClientMessage msg) {
if (log.isTraceEnabled()) {
Gson gson = new Gson();
@ -177,11 +186,13 @@ public class ConnectionInvokerService {
List<Object> params = new ArrayList<Object>();
params.add(msg.getMessageName());
params.add(msg.getMessage());
if (log.isTraceEnabled()) {
Gson gson = new Gson();
String json = gson.toJson(msg.getMessage());
log.trace("Send direct message: " + msg.getMessageName() + " msg=" + json);
}
ServiceUtils.invokeOnConnection(conn, "onMessageFromServer", params.toArray());
}
} else {
@ -191,9 +202,30 @@ public class ConnectionInvokerService {
}
}
};
runExec.execute(sender);
}
/**
* We need to add a way to cancel sending when the thread is blocked.
* Red5 uses a semaphore to guard the rtmp connection and we've seen
* instances where our thread is blocked preventing us from sending messages
* to other connections. (ralam nov 19, 2015)
*/
long endNanos = System.nanoTime() + SEND_TIMEOUT;
Future<?> f = runExec.submit(sender);
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
log.warn("ExecutionException while sending direct message on connection[" + sessionId + "]");
} catch (InterruptedException e) {
log.warn("Interrupted exception while sending direct message on connection[" + sessionId + "]");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("Timeout exception while sending direct message on connection[" + sessionId + "]");
f.cancel(true);
}
}
private void sendBroadcastMessage(final BroadcastClientMessage msg) {
if (log.isTraceEnabled()) {
Gson gson = new Gson();
@ -217,7 +249,28 @@ public class ConnectionInvokerService {
}
}
};
runExec.execute(sender);
/**
* We need to add a way to cancel sending when the thread is blocked.
* Red5 uses a semaphore to guard the rtmp connection and we've seen
* instances where our thread is blocked preventing us from sending messages
* to other connections. (ralam nov 19, 2015)
*/
long endNanos = System.nanoTime() + SEND_TIMEOUT;
Future<?> f = runExec.submit(sender);
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
log.warn("ExecutionException while sending broadcast message[" + msg.getMessageName() + "]");
} catch (InterruptedException e) {
log.warn("Interrupted exception while sending direct message[" + msg.getMessageName() + "]");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("Timeout exception while sending direct message[" + msg.getMessageName() + "]");
f.cancel(true);
}
}
private IConnection getConnection(IScope scope, String userID) {