- modify to use NIO buffer and add more documentation to transcoding process

This commit is contained in:
Richard Alam 2011-01-04 16:20:28 -05:00
parent 5941e6371e
commit a246f5452c
5 changed files with 176 additions and 108 deletions

View File

@ -185,12 +185,14 @@ public class AudioBroadcastStream implements IBroadcastStream, IProvider, IPipeC
} }
} }
private final RTMPMessage msg = new RTMPMessage();
public void dispatchEvent(IEvent event) { public void dispatchEvent(IEvent event) {
// log.trace("dispatchEvent(event:{})", event); // log.trace("dispatchEvent(event:{})", event);
if (event instanceof IRTMPEvent) { if (event instanceof IRTMPEvent) {
IRTMPEvent rtmpEvent = (IRTMPEvent) event; IRTMPEvent rtmpEvent = (IRTMPEvent) event;
if (livePipe != null) { if (livePipe != null) {
RTMPMessage msg = new RTMPMessage();
msg.setBody(rtmpEvent); msg.setBody(rtmpEvent);
if (creationTime == null) if (creationTime == null)

View File

@ -156,9 +156,6 @@ public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpSt
while (processAudioData) { while (processAudioData) {
try { try {
if (streamToFlash.available() > 1000) {
long skipped = streamToFlash.skip(1000L);
}
int bytesRead = streamToFlash.read(pcmAudio, offset, remaining); int bytesRead = streamToFlash.read(pcmAudio, offset, remaining);
remaining -= bytesRead; remaining -= bytesRead;
if (remaining == 0) { if (remaining == 0) {

View File

@ -40,6 +40,8 @@ public class RtpSocket {
/** Remote port */ /** Remote port */
int r_port; int r_port;
private final byte[] payload = new byte[10];
/** Creates a new RTP socket (only receiver) */ /** Creates a new RTP socket (only receiver) */
public RtpSocket(DatagramSocket datagram_socket) { public RtpSocket(DatagramSocket datagram_socket) {
socket=datagram_socket; socket=datagram_socket;
@ -59,20 +61,24 @@ public class RtpSocket {
return socket; return socket;
} }
private final DatagramPacket rxDatagram = new DatagramPacket(payload, payload.length);
/** Receives a RTP packet from this socket */ /** Receives a RTP packet from this socket */
public void receive(RtpPacket rtpp) throws IOException { public void receive(RtpPacket rtpp) throws IOException {
DatagramPacket datagram = new DatagramPacket(rtpp.getPacket(), rtpp.getLength()); rxDatagram.setData(rtpp.getPacket());
socket.receive(datagram); socket.receive(rxDatagram);
rtpp.setPacketLength(datagram.getLength()); rtpp.setPacketLength(rxDatagram.getLength());
} }
private final DatagramPacket txDatagram = new DatagramPacket(payload, payload.length);
/** Sends a RTP packet from this socket */ /** Sends a RTP packet from this socket */
public void send(RtpPacket rtpp) throws IOException { public void send(RtpPacket rtpp) throws IOException {
DatagramPacket datagram = new DatagramPacket(rtpp.getPacket(), rtpp.getLength()); txDatagram.setData(rtpp.getPacket());
datagram.setAddress(r_addr); txDatagram.setAddress(r_addr);
datagram.setPort(r_port); txDatagram.setPort(r_port);
if (!socket.isClosed()) if (!socket.isClosed())
socket.send(datagram); socket.send(txDatagram);
} }
/** Closes this socket */ /** Closes this socket */

View File

@ -37,29 +37,48 @@ import org.red5.app.sip.codecs.asao.DecoderMap;
public class NellyFlashToSipTranscoderImp implements FlashToSipTranscoder { public class NellyFlashToSipTranscoderImp implements FlashToSipTranscoder {
protected static Logger log = Red5LoggerFactory.getLogger( NellyFlashToSipTranscoderImp.class, "sip" ); protected static Logger log = Red5LoggerFactory.getLogger( NellyFlashToSipTranscoderImp.class, "sip" );
private static final int NELLYMOSER_DECODED_PACKET_SIZE = 256; private static final int NELLY_TO_L16_AUDIO_SIZE = 256;
private static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64; private static final int NELLY_AUDIO_LENGTH = 64;
private static final int MAX_BYTE = 1280; private static final int ULAW_AUDIO_LENGTH = 160;
private final FloatBuffer nellyAudio = FloatBuffer.allocate(1280);
private FloatBuffer ulawAudio;
private Codec sipCodec = null; // Sip codec to be used on audio session /**
* Max buffer length when 5 Nelly/L16 packets equals 8 Ulaw packets.
*/
private static final int MAX_BUFFER_LENGTH = 1280;
/**
* Allocate a fixed buffer length so we don't have to copy elements around. We'll use the
* position, limit, mart attributes of the NIO Buffer.
*/
private final FloatBuffer l16Audio = FloatBuffer.allocate(MAX_BUFFER_LENGTH);
/**
* This is a view read-only copy of the buffer to track which byte are being transcoded from L16->Ulaw
*/
private FloatBuffer viewBuffer;
private Codec sipCodec = null;
private Decoder decoder; private Decoder decoder;
private DecoderMap decoderMap; private DecoderMap decoderMap;
private float[] tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE]; // Temporary buffer with received PCM audio from FlashPlayer. private float[] tempL16Buffer = new float[NELLY_TO_L16_AUDIO_SIZE];
private float[] encodingBuffer; // Encoding buffer used to encode to final codec format; private float[] tempUlawBuffer = new float[ULAW_AUDIO_LENGTH];
private byte[] ulawEncodedBuffer = new byte[ULAW_AUDIO_LENGTH];
private long timestamp = 0; private long timestamp = 0;
private final static int TS_INCREMENT = 180; // Determined from PCAP traces. private final static int TS_INCREMENT = 180; // Determined from PCAP traces.
/**
* The transcode process works by taking a 64-byte-array Nelly audio and converting it into a 256-float-array L16 audio. From the
* 256-float-array L16 audio, we take 160-float-array and convert it to a 160-byte-array Ulaw audio. The remaining 96-float-array
* will be used in the next iteration.
* Therefore, 5 Nelly/L16 packets (5x256 = 1280) will result into 8 Ulaw packets (8x160 = 1280).
*
*/
public NellyFlashToSipTranscoderImp(Codec sipCodec) { public NellyFlashToSipTranscoderImp(Codec sipCodec) {
this.sipCodec = sipCodec; this.sipCodec = sipCodec;
encodingBuffer = new float[sipCodec.getOutgoingDecodedFrameSize()];
decoder = new Decoder(); decoder = new Decoder();
decoderMap = null; decoderMap = null;
Random rgen = new Random(); Random rgen = new Random();
timestamp = rgen.nextInt(1000); timestamp = rgen.nextInt(1000);
ulawAudio = nellyAudio.asReadOnlyBuffer(); viewBuffer = l16Audio.asReadOnlyBuffer();
} }
@Override @Override
@ -74,39 +93,64 @@ public class NellyFlashToSipTranscoderImp implements FlashToSipTranscoder {
@Override @Override
public void transcode(byte[] audioData, int startOffset, int length, TranscodedAudioDataListener listener) { public void transcode(byte[] audioData, int startOffset, int length, TranscodedAudioDataListener listener) {
byte[] codedBuffer = new byte[160]; if (audioData.length != NELLY_AUDIO_LENGTH) {
decoderMap = decoder.decode(decoderMap, audioData, 0, tempBuffer, 0); log.warn("Receiving bad nelly audio. Expecting {}, got {}.", NELLY_AUDIO_LENGTH, audioData.length);
nellyAudio.put(tempBuffer); return;
ulawAudio.get(encodingBuffer); }
int encodedBytes = sipCodec.pcmToCodec(encodingBuffer, codedBuffer);
// Convert the Nelly audio to L16.
decoderMap = decoder.decode(decoderMap, audioData, 0, tempL16Buffer, 0);
// Store the L16 audio into the buffer
l16Audio.put(tempL16Buffer);
// Read 160-float worth of audio
viewBuffer.get(tempUlawBuffer);
// Convert the L16 audio to Ulaw
int encodedBytes = sipCodec.pcmToCodec(tempUlawBuffer, ulawEncodedBuffer);
listener.handleTranscodedAudioData(codedBuffer, timestamp += TS_INCREMENT); // Send it to the server
listener.handleTranscodedAudioData(ulawEncodedBuffer, timestamp += TS_INCREMENT);
if (nellyAudio.position() == nellyAudio.capacity()) { if (l16Audio.position() == l16Audio.capacity()) {
ulawAudio.get(encodingBuffer); /**
encodedBytes = sipCodec.pcmToCodec(encodingBuffer, codedBuffer); * This means we already processed 5 Nelly packets and sent 5 Ulaw packets.
* However, we have 3 extra Ulaw packets.
* Fire them off to the server. We don't want to discard them as it will
* result in choppy audio.
*/
// Get the 6th packet and send
viewBuffer.get(tempUlawBuffer);
encodedBytes = sipCodec.pcmToCodec(tempUlawBuffer, ulawEncodedBuffer);
if (encodedBytes == sipCodec.getOutgoingEncodedFrameSize()) { if (encodedBytes == sipCodec.getOutgoingEncodedFrameSize()) {
listener.handleTranscodedAudioData(codedBuffer, timestamp += TS_INCREMENT); listener.handleTranscodedAudioData(ulawEncodedBuffer, timestamp += TS_INCREMENT);
} else {
log.error("Failure encoding buffer." );
}
ulawAudio.get(encodingBuffer);
encodedBytes = sipCodec.pcmToCodec(encodingBuffer, codedBuffer);
if (encodedBytes == sipCodec.getOutgoingEncodedFrameSize()) {
listener.handleTranscodedAudioData(codedBuffer, timestamp += TS_INCREMENT);
} else { } else {
log.error("Failure encoding buffer." ); log.error("Failure encoding buffer." );
} }
// Get the 7th packet and send
viewBuffer.get(tempUlawBuffer);
encodedBytes = sipCodec.pcmToCodec(tempUlawBuffer, ulawEncodedBuffer);
if (encodedBytes == sipCodec.getOutgoingEncodedFrameSize()) { if (encodedBytes == sipCodec.getOutgoingEncodedFrameSize()) {
listener.handleTranscodedAudioData(codedBuffer, timestamp += TS_INCREMENT); listener.handleTranscodedAudioData(ulawEncodedBuffer, timestamp += TS_INCREMENT);
} else { } else {
log.error("Failure encoding buffer." ); log.error("Failure encoding buffer." );
} }
nellyAudio.clear();
ulawAudio.clear(); // Get the 8th packet and send
viewBuffer.get(tempUlawBuffer);
encodedBytes = sipCodec.pcmToCodec(tempUlawBuffer, ulawEncodedBuffer);
if (encodedBytes == sipCodec.getOutgoingEncodedFrameSize()) {
listener.handleTranscodedAudioData(ulawEncodedBuffer, timestamp += TS_INCREMENT);
} else {
log.error("Failure encoding buffer." );
}
// Reset the buffer's position back to zero and start over.
l16Audio.clear();
viewBuffer.clear();
} }
} }
} }

View File

@ -19,6 +19,7 @@
**/ **/
package org.bigbluebutton.voiceconf.red5.media.transcoder; package org.bigbluebutton.voiceconf.red5.media.transcoder;
import java.nio.FloatBuffer;
import java.util.Random; import java.util.Random;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.red5.logging.Red5LoggerFactory; import org.red5.logging.Red5LoggerFactory;
@ -26,89 +27,107 @@ import org.red5.server.api.IConnection;
import org.red5.server.api.Red5; import org.red5.server.api.Red5;
import org.red5.server.net.rtmp.RTMPMinaConnection; import org.red5.server.net.rtmp.RTMPMinaConnection;
import org.red5.app.sip.codecs.Codec; import org.red5.app.sip.codecs.Codec;
import org.red5.app.sip.codecs.asao.ByteStream;
import org.red5.app.sip.codecs.asao.CodecImpl; import org.red5.app.sip.codecs.asao.CodecImpl;
public class NellySipToFlashTranscoderImp implements SipToFlashTranscoder { public class NellySipToFlashTranscoderImp implements SipToFlashTranscoder {
protected static Logger log = Red5LoggerFactory.getLogger(NellySipToFlashTranscoderImp.class, "sip"); protected static Logger log = Red5LoggerFactory.getLogger(NellySipToFlashTranscoderImp.class, "sip");
private static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;
private static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;
private static final int NELLYMOSER_CODEC_ID = 82; private static final int NELLYMOSER_CODEC_ID = 82;
/**
* The length of resulting L16 audio converted from 160-byte Ulaw audio.
*/
private static final int L16_AUDIO_LENGTH = 256;
/**
* The length of Nelly audio that gets sent to Flash Player.
*/
private static final int NELLY_AUDIO_LENGTH = 64;
/**
* The length of received Ulaw audio.
*/
private static final int ULAW_AUDIO_LENGTH = 160;
/**
* The maximum size of our processing buffer. 8 Ulaw packets (8x160 = 1280) yields 5 L16/Nelly audio (5x256 = 1280).
*/
private static final int MAX_BUFFER_LENGTH = 1280;
/**
* Buffer that contain L16 transcoded audio from Ulaw.
*/
private final FloatBuffer l16Audio = FloatBuffer.allocate(MAX_BUFFER_LENGTH);
/*
* A view read-only buffer that keeps track of which part of the L16 buffer will be converted to Nelly.
*/
private FloatBuffer viewBuffer;
private final float[] tempL16Buffer = new float[ULAW_AUDIO_LENGTH];
private float[] tempNellyBuffer = new float[L16_AUDIO_LENGTH];
private final byte[] nellyBytes = new byte[NELLY_AUDIO_LENGTH];
private float[] encoderMap; private float[] encoderMap;
private Codec audioCodec = null; private Codec audioCodec = null;
private float[] tempBuffer; // Temporary buffer with PCM audio to be sent to FlashPlayer.
private int tempBufferOffset = 0;
private long timestamp = 0; private long timestamp = 0;
private final static int TS_INCREMENT = 32; // Determined from PCAP traces. private final static int TS_INCREMENT = 32; // Determined from PCAP traces.
/**
* The transcode takes a 160-byte Ulaw audio and converts it to a 160-float L16 audio. Whenever there is an
* available 256-float L16 audio, that gets converted into a 64-byte Nelly audio. Therefore, 8 Ulaw packets
* are needed to generate 5 Nelly packets.
* @param audioCodec
*/
public NellySipToFlashTranscoderImp(Codec audioCodec) { public NellySipToFlashTranscoderImp(Codec audioCodec) {
this.audioCodec = audioCodec; this.audioCodec = audioCodec;
encoderMap = new float[64]; encoderMap = new float[64];
tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
Random rgen = new Random(); Random rgen = new Random();
timestamp = rgen.nextInt(1000); timestamp = rgen.nextInt(1000);
} viewBuffer = l16Audio.asReadOnlyBuffer();
private void transcodePcmToNellymoser(byte[] codedBuffer, TranscodedAudioDataListener listener) {
float[] decodingBuffer = new float[codedBuffer.length];
int decodedBytes = audioCodec.codecToPcm(codedBuffer, decodingBuffer);
if (decodedBytes == audioCodec.getIncomingDecodedFrameSize()) {
int pcmBufferOffset = 0;
int copySize = 0;
boolean pcmBufferProcessed = false;
do {
if ((tempBuffer.length - tempBufferOffset) <= (decodingBuffer.length - pcmBufferOffset)) {
copySize = tempBuffer.length - tempBufferOffset;
} else {
copySize = decodingBuffer.length - pcmBufferOffset;
}
System.arraycopy(decodingBuffer, pcmBufferOffset, tempBuffer, tempBufferOffset, copySize);
tempBufferOffset += copySize;
pcmBufferOffset += copySize;
if (tempBufferOffset == NELLYMOSER_DECODED_PACKET_SIZE) {
ByteStream encodedStream = new ByteStream(NELLYMOSER_ENCODED_PACKET_SIZE);
encoderMap = CodecImpl.encode(encoderMap, tempBuffer, encodedStream.bytes);
tempBufferOffset = 0;
boolean sendPacket = true;
IConnection conn = Red5.getConnectionLocal();
if (conn instanceof RTMPMinaConnection) {
long pendingMessages = ((RTMPMinaConnection)conn).getPendingMessages();
if (pendingMessages > 25) {
sendPacket = false;
// Message backed up probably due to slow connection to client (10 message * 20ms ptime = 200 audio)
log.info("Dropping packet. Connection {} congested with {} pending messages (~500ms worth of audio) .", conn.getClient().getId(), pendingMessages);
}
}
if (sendPacket) listener.handleTranscodedAudioData(encodedStream.bytes, timestamp += TS_INCREMENT);
}
if (pcmBufferOffset == decodingBuffer.length) {
pcmBufferProcessed = true;
}
} while (!pcmBufferProcessed);
} else {
log.warn("[IncomingBytes=" + codedBuffer.length + ",DecodedBytes=" + decodedBytes +", ExpectedDecodedBytes=" + audioCodec.getIncomingDecodedFrameSize() +"]");
}
} }
@Override @Override
public void transcode(byte[] audioData, TranscodedAudioDataListener listener) { public void transcode(byte[] audioData, TranscodedAudioDataListener listener) {
transcodePcmToNellymoser(audioData, listener); if (audioData.length != ULAW_AUDIO_LENGTH) {
} log.warn("Received corrupt audio. Got {}, expected {}.", audioData.length, ULAW_AUDIO_LENGTH);
return;
}
// Convert Ulaw to L16
int decodedBytes = audioCodec.codecToPcm(audioData, tempL16Buffer);
// Store into the buffer
l16Audio.put(tempL16Buffer);
if ((l16Audio.position() - viewBuffer.position()) >= L16_AUDIO_LENGTH) {
// We have enough L16 audio to generate a Nelly audio.
// Get some L16 audio
viewBuffer.get(tempNellyBuffer);
// Convert it into Nelly
encoderMap = CodecImpl.encode(encoderMap, tempNellyBuffer, nellyBytes);
// Having done all of that, we now see if we need to send the audio or drop it.
// We have to encode to build the encoderMap so that data from previous audio packet
// will be used for the next packet.
boolean sendPacket = true;
IConnection conn = Red5.getConnectionLocal();
if (conn instanceof RTMPMinaConnection) {
long pendingMessages = ((RTMPMinaConnection)conn).getPendingMessages();
if (pendingMessages > 25) {
// Message backed up probably due to slow connection to client (25 messages * 20ms ptime = 500ms audio)
sendPacket = false;
log.info("Dropping packet. Connection {} congested with {} pending messages (~500ms worth of audio) .", conn.getClient().getId(), pendingMessages);
}
}
if (sendPacket) listener.handleTranscodedAudioData(nellyBytes, timestamp += TS_INCREMENT);
}
if (l16Audio.position() == l16Audio.capacity()) {
// We've processed 8 Ulaw packets (5 Nelly packets), reset the buffers.
l16Audio.clear();
viewBuffer.clear();
}
}
@Override @Override
public int getIncomingEncodedFrameSize() { public int getIncomingEncodedFrameSize() {
return audioCodec.getIncomingEncodedFrameSize(); return audioCodec.getIncomingEncodedFrameSize();