- change how we decode byte stream into frames

- add an end frame delimeter so we can handle frames easily
This commit is contained in:
Richard Alam 2012-04-12 21:49:37 +00:00
parent 17b3922295
commit 49bab2514b
5 changed files with 148 additions and 162 deletions

View File

@ -21,6 +21,7 @@
*/
package org.bigbluebutton.deskshare.server.socket;
import org.apache.mina.core.future.CloseFuture;
import org.bigbluebutton.deskshare.server.session.ISessionManagerGateway;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
@ -36,13 +37,24 @@ public class BlockStreamEventMessageHandler extends IoHandlerAdapter {
final private Logger log = Red5LoggerFactory.getLogger(BlockStreamEventMessageHandler.class, "deskshare");
private ISessionManagerGateway sessionManager;
private static final String ROOM = "ROOM";
// @Override
// public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
// {
// log.warn(cause.toString() + " \n " + cause.getMessage());
// cause.printStackTrace();
// }
@Override
public void exceptionCaught( IoSession session, Throwable cause ) throws Exception {
log.warn(cause.toString() + " \n " + cause.getMessage());
cause.printStackTrace();
closeSession(session);
}
private void closeSession(IoSession session) {
String room = (String)session.getAttribute(ROOM, null);
if (room != null) {
log.info("Closing session [" + room + "]. ");
} else {
log.info("Cannot determine session to close.");
}
CloseFuture future = session.close(true);
}
@Override
public void messageReceived( IoSession session, Object message ) throws Exception

View File

@ -25,8 +25,6 @@ import java.awt.Point;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
@ -43,83 +41,94 @@ public class BlockStreamProtocolDecoder extends CumulativeProtocolDecoder {
final private Logger log = Red5LoggerFactory.getLogger(BlockStreamProtocolDecoder.class, "deskshare");
private static final String ROOM = "ROOM";
private static final byte[] END_FRAME = new byte[] {'D', 'S', '-', 'E', 'N', 'D'};
private static final byte[] HEADER = new byte[] {'B', 'B', 'B', '-', 'D', 'S'};
private static final byte CAPTURE_START_EVENT = 0;
private static final byte CAPTURE_UPDATE_EVENT = 1;
private static final byte CAPTURE_END_EVENT = 2;
private static final byte MOUSE_LOCATION_EVENT = 3;
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
try {
// Let's work with a buffer that contains header and the message length,
// ten (10) should be enough since header (6-bytes) plus length (4-bytes)
if (in.remaining() < 10) return false;
// Remember the initial position.
int start = in.position();
byte[] endFrame = new byte[END_FRAME.length];
// Now find the END FRAME delimeter in the buffer.
int curpos = 0;
while (in.remaining() > END_FRAME.length) {
curpos = in.position();
in.get(endFrame);
if (Arrays.equals(endFrame, END_FRAME)) {
//log.debug("***** END FRAME {} = {}", endFrame, END_FRAME);
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();
try {
in.position(start);
in.limit(position);
// The bytes between in.position() and in.limit()
// now contain a full frame.
parseFrame(session, in.slice(), out);
} finally {
// Set the position to point right after the
// detected END FRAME and set the limit to the old
// one.
in.position(position);
in.limit(limit);
}
return true;
}
in.position(curpos+1);
}
// Could not find END FRAME in the buffer. Reset the initial
// position to the one we recorded above.
in.position(start);
return false;
}
private void parseFrame(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
//log.debug("Frame = {}", in.toString());
try {
byte[] header = new byte[HEADER.length];
int start = in.position();
in.get(header, 0, HEADER.length);
if (!Arrays.equals(header, HEADER)) {
log.info("Closing session. Invalid header.");
closeSession(session, out);
if (! Arrays.equals(header, HEADER)) {
log.info("Invalid header. Discarding. {}", header);
return;
}
int messageLength = in.getInt();
if (in.remaining() < messageLength) {
in.position(start);
return false;
log.info("Invalid length. Discarding. [{} < {}]", in.remaining(), messageLength);
return;
}
decodeMessage(session, in, out);
return true;
return;
} catch (Exception e) {
// throwAwayCorruptedPacket(in);
// Integer numErrors = (Integer)session.getAttribute("NUM_ERRORS", 0);
// session.setAttribute("NUM_ERRORS", numErrors++);
// if (numErrors > 50) {
log.info("Closing session. Too many corrupt packets.");
closeSession(session, out);
// }
return true;
}
log.warn("Failed to parse frame. Discarding.");
}
}
private void closeSession(IoSession session, ProtocolDecoderOutput out) {
log.info("Closing session");
int seqNum = 0;
String room = (String)session.getAttribute(ROOM, null);
if (room != null) {
log.info("Closing session [" + room + "]. ");
CaptureEndBlockEvent ceb = new CaptureEndBlockEvent(room, seqNum);
out.write(ceb);
} else {
log.info("Cannot determine session to close.");
}
CloseFuture future = session.close(true);
}
private void decodeMessage(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
byte event = in.get();
switch (event) {
case CAPTURE_START_EVENT:
System.out.println("Decoding CAPTURE_START_EVENT");
log.info("Decoding CAPTURE_START_EVENT");
decodeCaptureStartEvent(session, in, out);
break;
case CAPTURE_UPDATE_EVENT:
// System.out.println("Decoding CAPTURE_UPDATE_EVENT");
//log.info("Decoding CAPTURE_UPDATE_EVENT");
decodeCaptureUpdateEvent(session, in, out);
break;
case CAPTURE_END_EVENT:
log.warn("Got CAPTURE_END_EVENT event: " + event);
System.out.println("Got CAPTURE_END_EVENT event: " + event);
log.info("Got CAPTURE_END_EVENT event: " + event);
decodeCaptureEndEvent(session, in, out);
break;
case MOUSE_LOCATION_EVENT:
@ -130,96 +139,61 @@ public class BlockStreamProtocolDecoder extends CumulativeProtocolDecoder {
throw new Exception("Unknown event: " + event);
}
}
private static final byte CR = 13;
private static final byte LF = 10;
private void throwAwayCorruptedPacket(IoBuffer in) {
// Remember the initial position.
int start = in.position();
private void decodeMouseLocationEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
String room = decodeRoom(session, in);
if ("".equals(room)) {
log.warn("Empty meeting name in decoding mouse location.");
throw new Exception("Empty meeting name in decoding mouse location.");
}
int seqNum = in.getInt();
int mouseX = in.getInt();
int mouseY = in.getInt();
/** Swallow end frame **/
in.get(new byte[END_FRAME.length]);
// Now find the first CRLF in the buffer.
byte previous = 0;
while (in.hasRemaining()) {
byte current = in.get();
if (previous == CR && current == LF) {
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();
try {
in.position(start);
in.limit(position);
// The bytes between in.position() and in.limit()
// now contain a full CRLF terminated byte stream.
log.info("Throwing corrupted packet.");
in.slice();
} finally {
// Set the position to point right after the
// detected CRLF and set the limit to the old
// one.
in.position(position);
in.limit(limit);
}
return;
}
previous = current;
}
log.warn("Could not find end of corrupted packet.");
// Could not find CRLF in the buffer. Reset the initial
// position to the one we recorded above.
in.position(start);
MouseLocationEvent event = new MouseLocationEvent(room, new Point(mouseX, mouseY), seqNum);
out.write(event);
}
private void decodeMouseLocationEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
private void decodeCaptureEndEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
String room = decodeRoom(session, in);
if ("".equals(room)) {
log.warn("Empty meeting name in decoding capture end event.");
throw new Exception("Empty meeting name in decoding capture end event.");
}
log.info("CaptureEndEvent for " + room);
int seqNum = in.getInt();
int mouseX = in.getInt();
int mouseY = in.getInt();
/** Swallow CRLF **/
byte cr = in.get();
byte lf = in.get();
MouseLocationEvent event = new MouseLocationEvent(room, new Point(mouseX, mouseY), seqNum);
/** Swallow end frame **/
in.get(new byte[END_FRAME.length]);
CaptureEndBlockEvent event = new CaptureEndBlockEvent(room, seqNum);
out.write(event);
}
private void decodeCaptureEndEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
private void decodeCaptureStartEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
String room = decodeRoom(session, in);
if (! "".equals(room)) {
log.info("CaptureEndEvent for " + room);
int seqNum = in.getInt();
/** Swallow CRLF **/
byte cr = in.get();
byte lf = in.get();
CaptureEndBlockEvent event = new CaptureEndBlockEvent(room, seqNum);
out.write(event);
} else {
log.warn("Room is empty.");
if ("".equals(room)) {
log.warn("Empty meeting name in decoding capture start event.");
throw new Exception("Empty meeting name in decoding capture start event.");
}
}
private void decodeCaptureStartEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
String room = decodeRoom(session, in);
session.setAttribute(ROOM, room);
int seqNum = in.getInt();
Dimension blockDim = decodeDimension(in);
Dimension screenDim = decodeDimension(in);
/** Swallow CRLF **/
byte cr = in.get();
byte lf = in.get();
log.info("CaptureStartEvent for " + room);
CaptureStartBlockEvent event = new CaptureStartBlockEvent(room, screenDim, blockDim, seqNum);
out.write(event);
session.setAttribute(ROOM, room);
int seqNum = in.getInt();
Dimension blockDim = decodeDimension(in);
Dimension screenDim = decodeDimension(in);
/** Swallow end frame **/
in.get(new byte[END_FRAME.length]);
log.info("CaptureStartEvent for " + room);
CaptureStartBlockEvent event = new CaptureStartBlockEvent(room, screenDim, blockDim, seqNum);
out.write(event);
}
private Dimension decodeDimension(IoBuffer in) {
@ -247,28 +221,31 @@ public class BlockStreamProtocolDecoder extends CumulativeProtocolDecoder {
return room;
}
private void decodeCaptureUpdateEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
private void decodeCaptureUpdateEvent(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
String room = decodeRoom(session, in);
int seqNum = in.getInt();
int numBlocks = in.getShort();
String blocksStr = "Blocks changed ";
for (int i = 0; i < numBlocks; i++) {
int position = in.getShort();
blocksStr += " " + position;
boolean isKeyFrame = (in.get() == 1) ? true : false;
int length = in.getInt();
byte[] data = new byte[length];
in.get(data, 0, length);
CaptureUpdateBlockEvent event = new CaptureUpdateBlockEvent(room, position, data, isKeyFrame, seqNum);
out.write(event);
if ("".equals(room)) {
log.warn("Empty meeting name in decoding capture start event.");
throw new Exception("Empty meeting name in decoding capture start event.");
}
/** Swallow CRLF **/
byte cr = in.get();
byte lf = in.get();
int seqNum = in.getInt();
int numBlocks = in.getShort();
String blocksStr = "Blocks changed ";
for (int i = 0; i < numBlocks; i++) {
int position = in.getShort();
blocksStr += " " + position;
boolean isKeyFrame = (in.get() == 1) ? true : false;
int length = in.getInt();
byte[] data = new byte[length];
in.get(data, 0, length);
CaptureUpdateBlockEvent event = new CaptureUpdateBlockEvent(room, position, data, isKeyFrame, seqNum);
out.write(event);
}
/** Swallow end frame **/
in.get(new byte[END_FRAME.length]);
}
}

View File

@ -21,17 +21,14 @@
*/
package org.bigbluebutton.deskshare.server.red5
import org.red5.server.api.{IContext, IScope, ScopeUtils, IConnection}
import org.red5.server.so.SharedObjectService
import org.red5.server.api.so.{ISharedObject, ISharedObjectService}
import org.red5.server.stream.{BroadcastScope, IBroadcastScope, IProviderService}
import org.bigbluebutton.deskshare.server.ScreenVideoBroadcastStream
import org.bigbluebutton.deskshare.server.stream.StreamManager
import org.bigbluebutton.deskshare.server.socket.DeskShareServer
import org.bigbluebutton.deskshare.server.MultiThreadedAppAdapter
import org.red5.server.api.ScopeUtils
import org.red5.server.api.IConnection
import org.red5.server.api.{IContext, IScope}
import org.red5.server.so.SharedObjectService
import org.red5.server.api.so.{ISharedObject, ISharedObjectService}
import org.red5.server.stream.{BroadcastScope, IBroadcastScope, IProviderService}
import scala.actors.Actor
import scala.actors.Actor._

View File

@ -27,7 +27,7 @@ import java.util.zip.Adler32;
import org.bigbluebutton.deskshare.common.Dimension;
public class BlockStreamProtocolEncoder {
private static final byte[] CRLF_DELIMITER = new byte[] {13, 10};
private static final byte[] END_FRAME = new byte[] {'D', 'S', '-', 'E', 'N', 'D'};
private static final byte[] HEADER = new byte[] {'B', 'B', 'B', '-', 'D', 'S'};
private static final byte CAPTURE_START_EVENT = 0;
private static final byte CAPTURE_UPDATE_EVENT = 1;
@ -117,7 +117,7 @@ public class BlockStreamProtocolEncoder {
}
public static void encodeDelimiter(ByteArrayOutputStream data) throws IOException {
data.write(CRLF_DELIMITER);
data.write(END_FRAME);
}
public static byte[] encodeChecksum(byte[] data) {