- continue refactoring

This commit is contained in:
Richard Alam 2010-06-18 16:19:19 -04:00
parent 696d5ef802
commit e8795dfc63
17 changed files with 647 additions and 343 deletions

2
bbb-voice/src/main/java/local/net/RtpSocket.java Normal file → Executable file
View File

@ -21,12 +21,10 @@
package local.net;
import java.net.InetAddress;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.io.IOException;
import org.zoolu.tools.Random;
/** RtpSocket implements a RTP socket for receiving and sending RTP packets.

View File

@ -3,33 +3,24 @@ package org.bigbluebutton.voiceconf.red5.media;
import java.net.DatagramSocket;
import java.net.SocketException;
import org.bigbluebutton.voiceconf.red5.media.transcoder.NellyToPcmTranscoder;
import org.bigbluebutton.voiceconf.red5.media.transcoder.PcmToNellyTranscoder;
import org.bigbluebutton.voiceconf.red5.media.transcoder.SpeexToSpeexTranscoder;
import org.bigbluebutton.voiceconf.red5.media.transcoder.Transcoder;
import org.bigbluebutton.voiceconf.sip.SipConnectInfo;
import org.red5.app.sip.codecs.Codec;
import org.red5.app.sip.codecs.SpeexCodec;
import org.red5.app.sip.stream.ListenStream;
import org.red5.app.sip.stream.ReceivedRtpPacketProcessor;
import org.red5.app.sip.stream.RtpStreamReceiver;
import org.red5.app.sip.stream.RtpStreamReceiverListener;
import org.red5.app.sip.stream.RtpStreamSender;
import org.red5.app.sip.stream.TalkStream;
import org.red5.app.sip.trancoders.NellyToPcmTranscoder;
import org.red5.app.sip.trancoders.PcmToNellyTranscoder;
import org.red5.app.sip.trancoders.SpeexToSpeexTranscoder;
import org.red5.app.sip.trancoders.Transcoder;
import org.slf4j.Logger;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IScope;
import org.red5.server.api.stream.IBroadcastStream;
public class CallStream implements RtpStreamReceiverListener {
public class CallStream implements ListenStreamObserver {
private final static Logger log = Red5LoggerFactory.getLogger(CallStream.class, "sip");
private DatagramSocket socket = null;
private RtpStreamReceiver rtpReceiver;
private RtpStreamSender rtpSender;
private TalkStream talkStream;
private ListenStream listenStream;
private ReceivedRtpPacketProcessor packetProcessor;
private final Codec sipCodec;
private final SipConnectInfo connInfo;
private final IScope scope;
@ -47,9 +38,7 @@ public class CallStream implements RtpStreamReceiverListener {
log.error("SocketException while initializing DatagramSocket");
throw new Exception("Exception while initializing CallStream");
}
listenStream = new ListenStream(scope);
Transcoder rtmpToRtpTranscoder, rtpToRtmpTranscoder;
if (sipCodec.getCodecId() == SpeexCodec.codecId) {
rtmpToRtpTranscoder = new SpeexToSpeexTranscoder(sipCodec);
@ -58,11 +47,10 @@ public class CallStream implements RtpStreamReceiverListener {
rtmpToRtpTranscoder = new NellyToPcmTranscoder(sipCodec);
rtpToRtmpTranscoder = new PcmToNellyTranscoder(sipCodec, listenStream);
}
packetProcessor = new ReceivedRtpPacketProcessor(rtpToRtmpTranscoder);
rtpReceiver = new RtpStreamReceiver(packetProcessor, socket, rtpToRtmpTranscoder.getIncomingEncodedFrameSize());
rtpSender = new RtpStreamSender(rtmpToRtpTranscoder, socket, connInfo.getRemoteAddr(), connInfo.getRemotePort());
talkStream = new TalkStream(rtmpToRtpTranscoder, rtpSender);
listenStream = new ListenStream(scope, rtpToRtmpTranscoder, socket);
listenStream.addListenStreamObserver(this);
talkStream = new TalkStream(rtmpToRtpTranscoder, socket, connInfo);
}
public String getTalkStreamName() {
@ -73,42 +61,25 @@ public class CallStream implements RtpStreamReceiverListener {
return listenStream.getStreamName();
}
public void queueSipDtmfDigits(String argDigits) {
if (rtpSender != null)
rtpSender.queueSipDtmfDigits(argDigits);
public void sendSipDtmfDigits(String argDigits) throws StreamException {
if (talkStream != null)
talkStream.sendDtmfDigits(argDigits);
}
public void startTalkStream(IBroadcastStream broadcastStream, IScope scope) {
public void startTalkStream(IBroadcastStream broadcastStream, IScope scope) throws StreamException {
talkStream.start(broadcastStream, scope);
packetProcessor.start();
listenStream.start();
rtpSender.start();
rtpReceiver.setRtpStreamReceiverListener(this);
rtpReceiver.start();
}
public void stopTalkStream(IBroadcastStream broadcastStream, IScope scope) {
stopMedia();
talkStream.stop(broadcastStream, scope);
}
public boolean stopMedia() {
printLog( "stopMedia", "Halting sip audio..." );
talkStream.stop();
public void stop() {
listenStream.stop();
packetProcessor.stop();
rtpSender.stop();
rtpReceiver.stop();
return true;
}
private static void printLog( String method, String message ) {
log.debug( "SipAudioLauncher - " + method + " -> " + message );
System.out.println( "SipAudioLauncher - " + method + " -> " + message );
}
public void onStoppedReceiving() {
System.out.println("Closing socket");
@Override
public void listenStreamStopped() {
socket.close();
}
}

View File

@ -1,7 +1,9 @@
package org.bigbluebutton.voiceconf.red5.media;
import org.red5.app.sip.AudioStream;
import org.red5.app.sip.trancoders.TranscodedAudioDataListener;
import java.net.DatagramSocket;
import org.bigbluebutton.voiceconf.red5.media.transcoder.TranscodedAudioDataListener;
import org.bigbluebutton.voiceconf.red5.media.transcoder.Transcoder;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IContext;
import org.red5.server.api.IScope;
@ -11,15 +13,18 @@ import org.red5.server.stream.IBroadcastScope;
import org.red5.server.stream.IProviderService;
import org.slf4j.Logger;
public class ListenStream implements TranscodedAudioDataListener {
public class ListenStream implements TranscodedAudioDataListener, RtpStreamReceiverListener {
final private Logger log = Red5LoggerFactory.getLogger(ListenStream.class, "sip");
private AudioStream broadcastStream;
private AudioBroadcastStream audioBroadcastStream;
private IScope scope;
private final String listenStreamName;
public ListenStream(IScope scope) {
private RtpStreamReceiver rtpStreamReceiver;
private ListenStreamObserver observer;
public ListenStream(IScope scope, Transcoder transcoder, DatagramSocket socket) {
this.scope = scope;
rtpStreamReceiver = new RtpStreamReceiver(transcoder, socket);
listenStreamName = "speaker_" + System.currentTimeMillis();
scope.setName(listenStreamName);
}
@ -27,58 +32,51 @@ public class ListenStream implements TranscodedAudioDataListener {
public String getStreamName() {
return listenStreamName;
}
public void addListenStreamObserver(ListenStreamObserver o) {
observer = o;
}
public void stop() {
streamEnded();
public void stop() {
rtpStreamReceiver.stop();
audioBroadcastStream.stop();
audioBroadcastStream.close();
log.debug("stopping and closing stream {}", listenStreamName);
}
public void start() {
System.out.println("**** Starting listen stream ****");
startPublishing(scope);
}
public void handleTranscodedAudioData(AudioData audioData) {
streamAudioData(audioData);
}
private void streamAudioData(AudioData audioData) {
long startRx = System.currentTimeMillis();
log.debug("started publishing stream in " + scope.getName());
audioBroadcastStream = new AudioBroadcastStream(listenStreamName);
audioBroadcastStream.setPublishedName(listenStreamName);
audioBroadcastStream.setScope(scope);
/*
* Don't set the timestamp as it results in choppy audio. Let the client
* play the audio as soon as they receive the packets. (ralam dec 10, 2009)
*/
broadcastStream.dispatchEvent(audioData);
audioData.release();
long completeRx = System.currentTimeMillis();
// System.out.println("Send took " + (completeRx - startRx) + "ms.");
}
private void streamEnded() {
broadcastStream.stop();
broadcastStream.close();
log.debug("stopping and closing stream {}", listenStreamName);
}
private void startPublishing(IScope aScope){
System.out.println("started publishing stream in " + aScope.getName());
broadcastStream = new AudioStream(listenStreamName);
broadcastStream.setPublishedName(listenStreamName);
broadcastStream.setScope(aScope);
IContext context = aScope.getContext();
IContext context = scope.getContext();
IProviderService providerService = (IProviderService) context.getBean(IProviderService.BEAN_NAME);
if (providerService.registerBroadcastStream(aScope, listenStreamName, broadcastStream)){
IBroadcastScope bScope = (BroadcastScope) providerService.getLiveProviderInput(aScope, listenStreamName, true);
if (providerService.registerBroadcastStream(scope, listenStreamName, audioBroadcastStream)){
IBroadcastScope bScope = (BroadcastScope) providerService.getLiveProviderInput(scope, listenStreamName, true);
bScope.setAttribute(IBroadcastScope.STREAM_ATTRIBUTE, broadcastStream);
bScope.setAttribute(IBroadcastScope.STREAM_ATTRIBUTE, audioBroadcastStream);
} else{
log.error("could not register broadcast stream");
throw new RuntimeException("could not register broadcast stream");
}
broadcastStream.start();
audioBroadcastStream.start();
rtpStreamReceiver.start();
}
public void handleTranscodedAudioData(AudioData audioData) {
/* NOTE:
* Don't set the timestamp as it results in choppy audio. Let the client
* play the audio as soon as they receive the packets. (ralam dec 10, 2009)
*/
audioBroadcastStream.dispatchEvent(audioData);
audioData.release();
}
@Override
public void onStoppedReceiving() {
if (observer != null) observer.listenStreamStopped();
}
}

View File

@ -0,0 +1,6 @@
package org.bigbluebutton.voiceconf.red5.media;
public interface ListenStreamObserver {
void listenStreamStopped();
}

View File

@ -1,63 +0,0 @@
package org.bigbluebutton.voiceconf.red5.media;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.red5.app.sip.trancoders.Transcoder;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
import local.net.RtpPacket;
public class ReceivedRtpPacketProcessor {
final private Logger log = Red5LoggerFactory.getLogger(ReceivedRtpPacketProcessor.class, "sip");
private BlockingQueue<RtpPacket> packets = new LinkedBlockingQueue<RtpPacket>();
private final Executor exec = Executors.newSingleThreadExecutor();
private Runnable packetProcessor;
private volatile boolean processPacket = false;
private final Transcoder transcoder;
public ReceivedRtpPacketProcessor(Transcoder transcoder) {
this.transcoder = transcoder;
}
public void start() {
processPacket = true;
packetProcessor = new Runnable() {
public void run() {
while (processPacket) {
try {
RtpPacket packet = packets.take();
processPacket(packet);
} catch (InterruptedException e) {
log.warn("InterruptedExeption while taking event.");
}
}
}
};
exec.execute(packetProcessor);
}
private void processPacket(RtpPacket packet) {
byte[] payload = packet.getPayload();
transcoder.transcode(payload);
}
public void process(RtpPacket packet) throws InterruptedException {
packets.put(packet);
}
public void stop() {
System.out.println("processPacket stopped");
processPacket = false;
clearQueue();
}
private void clearQueue() {
packets.clear();
}
}

View File

@ -8,29 +8,29 @@ import java.net.SocketException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.bigbluebutton.voiceconf.red5.media.transcoder.Transcoder;
import org.red5.logging.Red5LoggerFactory;
public class RtpStreamReceiver {
protected static Logger log = Red5LoggerFactory.getLogger(RtpStreamReceiver.class, "sip");
// Maximum blocking time, spent waiting for reading new bytes [milliseconds]
private static final int SO_TIMEOUT = 200;
// private static final int SO_TIMEOUT = 200;
private static int RTP_HEADER_SIZE = 12;
private RtpSocket rtpSocket = null;
private final Executor exec = Executors.newSingleThreadExecutor();
private Runnable rtpPacketReceiver;
private volatile boolean receivePackets = false;
private ReceivedRtpPacketProcessor packetProcessor;
private RtpStreamReceiverListener listener;
private RtpStreamReceiverListener listener;
private Transcoder transcoder;
private final int payloadLength;
public RtpStreamReceiver(ReceivedRtpPacketProcessor packetProcessor, DatagramSocket socket, int payloadLength) {
this.packetProcessor = packetProcessor;
this.payloadLength = payloadLength;
public RtpStreamReceiver(Transcoder transcoder, DatagramSocket socket) {
this.transcoder = transcoder;
this.payloadLength = transcoder.getIncomingEncodedFrameSize();
rtpSocket = new RtpSocket(socket);
// initializeSocket();
initializeSocket();
}
public void setRtpStreamReceiverListener(RtpStreamReceiverListener listener) {
@ -38,12 +38,12 @@ public class RtpStreamReceiver {
}
private void initializeSocket() {
try {
/* try {
rtpSocket.getDatagramSocket().setSoTimeout(SO_TIMEOUT);
} catch (SocketException e1) {
log.warn("SocketException while setting socket block time.");
}
}
*/ }
public void start() {
receivePackets = true;
@ -60,7 +60,6 @@ public class RtpStreamReceiver {
}
public void receiveRtpPackets() {
int packetReceivedCounter = 0;
int internalBufferLength = payloadLength + RTP_HEADER_SIZE;
@ -70,49 +69,13 @@ public class RtpStreamReceiver {
RtpPacket rtpPacket = new RtpPacket(internalBuffer, 0);
rtpSocket.receive(rtpPacket);
packetReceivedCounter++;
processReceivedPacket(rtpPacket);
transcoder.transcode(rtpPacket.getPayload());
} catch (IOException e) {
log.error("IOException while receiving rtp packets.");
receivePackets = false;
}
}
closeSocket();
log.debug("Rtp Receiver stopped." );
log.debug("Packet Received = " + packetReceivedCounter + "." );
}
private void processReceivedPacket(RtpPacket rtpPacket) {
/*
int headerOffset = 0;
int payloadLength = 0;
byte[] packetBuffer = rtpPacket.getPacket();
headerOffset = rtpPacket.getHeaderLength();
payloadLength = rtpPacket.getPayloadLength();
byte[] codedBuffer = new byte[payloadLength];
*/
// System.out.println("pkt.length = " + rtpPacket.getPayloadLength()
// + ", offset = " + rtpPacket.getHeaderLength()
// + ", length = " + rtpPacket.getPayloadLength() + "." );
// System.arraycopy(packetBuffer, headerOffset, codedBuffer, 0, payloadLength);
// transcoder.transcode(codedBuffer);
// byte[] payload = rtpPacket.getPayload();
try {
packetProcessor.process(rtpPacket);
} catch (InterruptedException e) {
log.error("InterruptedException while attempting to process received Rtp packet");
}
}
private void closeSocket() {
rtpSocket.close();
if (listener != null) {
listener.onStoppedReceiving();
}
}
}

View File

@ -2,13 +2,16 @@ package org.bigbluebutton.voiceconf.red5.media;
import local.net.RtpPacket;
import local.net.RtpSocket;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import org.slf4j.Logger;
import org.red5.app.sip.trancoders.Transcoder;
import org.bigbluebutton.voiceconf.red5.media.transcoder.Transcoder;
import org.bigbluebutton.voiceconf.sip.SipConnectInfo;
import org.bigbluebutton.voiceconf.util.StackTraceUtil;
import org.red5.logging.Red5LoggerFactory;
public class RtpStreamSender {
@ -16,33 +19,46 @@ public class RtpStreamSender {
private static final int RTP_HEADER_SIZE = 12;
private RtpSocket rtpSocket = null;
private byte[] packetBuffer;
private byte[] transcodedAudioDataBuffer;
private RtpPacket rtpPacket;
private int startPayloadPos;
private int dtmf2833Type = 101;
private static final int DTMF2833 = 101;
private int sequenceNum = 0;
private long timestamp = 0;
private Transcoder transcoder;
private final Transcoder transcoder;
private final DatagramSocket srcSocket;
private final SipConnectInfo connInfo;
public RtpStreamSender(Transcoder transcoder, DatagramSocket srcSocket, String destAddr, int destPort) throws UnknownHostException {
public RtpStreamSender(Transcoder transcoder, DatagramSocket srcSocket, SipConnectInfo connInfo) {
this.transcoder = transcoder;
rtpSocket = new RtpSocket(srcSocket, InetAddress.getByName(destAddr), destPort);
this.srcSocket = srcSocket;
this.connInfo = connInfo;
}
public void start() {
packetBuffer = new byte[transcoder.getOutgoingEncodedFrameSize() + RTP_HEADER_SIZE];
rtpPacket = new RtpPacket(packetBuffer, 0);
public void connect() throws StreamException {
try {
rtpSocket = new RtpSocket(srcSocket, InetAddress.getByName(connInfo.getRemoteAddr()), connInfo.getRemotePort());
init();
} catch (UnknownHostException e) {
log.error("Failed to connect to {}", connInfo.getRemoteAddr());
log.error(StackTraceUtil.getStackTrace(e));
throw new StreamException("Rtp sender failed to connect to " + connInfo.getRemoteAddr() + ".");
}
}
private void init() {
transcodedAudioDataBuffer = new byte[transcoder.getOutgoingEncodedFrameSize() + RTP_HEADER_SIZE];
rtpPacket = new RtpPacket(transcodedAudioDataBuffer, 0);
rtpPacket.setPayloadType(transcoder.getCodecId());
startPayloadPos = rtpPacket.getHeaderLength();
sequenceNum = 0;
timestamp = 0;
timestamp = 0;
}
public void queueSipDtmfDigits(String dtmfDigits) {
public void sendDtmfDigits(String dtmfDigits) throws StreamException {
byte[] dtmfbuf = new byte[transcoder.getOutgoingEncodedFrameSize() + RTP_HEADER_SIZE];
RtpPacket dtmfpacket = new RtpPacket(dtmfbuf, 0);
dtmfpacket.setPayloadType(dtmf2833Type);
dtmfpacket.setPayloadType(DTMF2833);
dtmfpacket.setPayloadLength(transcoder.getOutgoingEncodedFrameSize());
byte[] blankbuf = new byte[transcoder.getOutgoingEncodedFrameSize() + RTP_HEADER_SIZE];
@ -66,80 +82,66 @@ public class RtpStreamSender {
}
// notice we are bumping times/seqn just like audio packets
try {
// send start event packet 3 times
dtmfbuf[startPayloadPos + 1] = 0; // start event flag
// and volume
dtmfbuf[startPayloadPos + 2] = 1; // duration 8 bits
dtmfbuf[startPayloadPos + 3] = -32; // duration 8 bits
// send start event packet 3 times
dtmfbuf[startPayloadPos + 1] = 0; // start event flag and volume
dtmfbuf[startPayloadPos + 2] = 1; // duration 8 bits
dtmfbuf[startPayloadPos + 3] = -32; // duration 8 bits
for (int r = 0; r < 3; r++) {
dtmfpacket.setSequenceNumber(sequenceNum++);
dtmfpacket.setTimestamp(transcoder.getOutgoingEncodedFrameSize());
doRtpDelay();
rtpSocketSend(dtmfpacket);
}
// send end event packet 3 times
dtmfbuf[startPayloadPos + 1] = -128; // end event flag
dtmfbuf[startPayloadPos + 2] = 3; // duration 8 bits
dtmfbuf[startPayloadPos + 3] = 116; // duration 8 bits
for (int r = 0; r < 3; r++) {
dtmfpacket.setSequenceNumber(sequenceNum++);
dtmfpacket.setTimestamp(transcoder.getOutgoingEncodedFrameSize() );
doRtpDelay();
rtpSocketSend(dtmfpacket);
}
// send 200 ms of blank packets
for (int r = 0; r < 200 / transcoder.getOutgoingPacketization(); r++) {
blankpacket.setSequenceNumber(sequenceNum++);
blankpacket.setTimestamp(transcoder.getOutgoingEncodedFrameSize());
doRtpDelay();
rtpSocketSend(blankpacket);
}
for (int r = 0; r < 3; r++) {
dtmfpacket.setSequenceNumber(sequenceNum++);
dtmfpacket.setTimestamp(transcoder.getOutgoingEncodedFrameSize());
doRtpDelay();
rtpSocketSend(dtmfpacket);
}
catch (Exception e) {
log.warn("queueSipDtmfDigits", e.getLocalizedMessage());
// send end event packet 3 times
dtmfbuf[startPayloadPos + 1] = -128; // end event flag
dtmfbuf[startPayloadPos + 2] = 3; // duration 8 bits
dtmfbuf[startPayloadPos + 3] = 116; // duration 8 bits
for (int r = 0; r < 3; r++) {
dtmfpacket.setSequenceNumber(sequenceNum++);
dtmfpacket.setTimestamp(transcoder.getOutgoingEncodedFrameSize() );
doRtpDelay();
rtpSocketSend(dtmfpacket);
}
// send 200 ms of blank packets
for (int r = 0; r < 200 / transcoder.getOutgoingPacketization(); r++) {
blankpacket.setSequenceNumber(sequenceNum++);
blankpacket.setTimestamp(transcoder.getOutgoingEncodedFrameSize());
doRtpDelay();
rtpSocketSend(blankpacket);
}
}
}
public void send(byte[] asaoBuffer, int offset, int num) {
// System.out.println("Transcoding from Nelly to PCM");
// transcoder.transcode(asaoBuffer, offset, num, packetBuffer, RTP_HEADER_SIZE, this);
public void send(byte[] audioData, int offset, int num) {
transcoder.transcode(audioData, offset, num, transcodedAudioDataBuffer, RTP_HEADER_SIZE, this);
}
public void sendTranscodedData() {
rtpPacket.setSequenceNumber( sequenceNum++ );
timestamp += transcoder.getOutgoingEncodedFrameSize();
rtpPacket.setTimestamp( timestamp );
rtpPacket.setPayloadLength( transcoder.getOutgoingEncodedFrameSize() );
// System.out.println("Sending rtpPacket " + timestamp);
public void sendTranscodedData() throws StreamException {
rtpPacket.setSequenceNumber(sequenceNum++);
timestamp += transcoder.getOutgoingEncodedFrameSize();
rtpPacket.setTimestamp(timestamp);
rtpPacket.setPayloadLength(transcoder.getOutgoingEncodedFrameSize());
rtpSocketSend(rtpPacket);
}
public void stop() {
rtpSocket.close();
log.debug(" Stopping Rtp sender." );
}
private void doRtpDelay() {
try {
Thread.sleep(transcoder.getOutgoingPacketization() - 2);
}
catch ( Exception e ) {
} catch (Exception e) {
}
}
private synchronized void rtpSocketSend(RtpPacket rtpPacket) {
try {
rtpSocket.send( rtpPacket );
timestamp += rtpPacket.getPayloadLength();
}
catch (Exception e) {
log.error("Exception while sending packet");
}
private synchronized void rtpSocketSend(RtpPacket rtpPacket) throws StreamException {
try {
rtpSocket.send(rtpPacket);
timestamp += rtpPacket.getPayloadLength();
} catch (IOException e) {
log.error("Exception while trying to send rtp packet");
log.error(StackTraceUtil.getStackTrace(e));
throw new StreamException("Failed to send data to server.");
}
}
}

View File

@ -0,0 +1,10 @@
package org.bigbluebutton.voiceconf.red5.media;
public class StreamException extends Exception {
private static final long serialVersionUID = 1L;
public StreamException(String message) {
super(message);
}
}

View File

@ -1,13 +1,10 @@
package org.bigbluebutton.voiceconf.red5.media;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.net.DatagramSocket;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.app.sip.RtmpAudioData;
import org.red5.app.sip.trancoders.Transcoder;
import org.bigbluebutton.voiceconf.red5.media.transcoder.Transcoder;
import org.bigbluebutton.voiceconf.sip.SipConnectInfo;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IScope;
import org.red5.server.api.stream.IBroadcastStream;
@ -20,76 +17,52 @@ import org.slf4j.Logger;
public class TalkStream {
private final static Logger log = Red5LoggerFactory.getLogger(TalkStream.class, "sip");
private final Transcoder transcoder;
private final RtpStreamSender rtpSender;
private final IStreamListener mInputListener;
private final Transcoder transcoder;
private IStreamListener mInputListener;
private final DatagramSocket srcSocket;
private final SipConnectInfo connInfo;
private String talkStreamName;
private BlockingQueue<RtmpAudioData> packets = new LinkedBlockingQueue<RtmpAudioData>();
private final Executor exec = Executors.newSingleThreadExecutor();
private Runnable audioProcessor;
private volatile boolean processAudio = false;
private RtpStreamSender rtpSender;
private final String talkStreamName;
public TalkStream(final Transcoder transcoder, final RtpStreamSender rtpSender) {
public TalkStream(final Transcoder transcoder, DatagramSocket srcSocket, SipConnectInfo connInfo) {
this.transcoder = transcoder;
this.rtpSender = rtpSender;
talkStreamName = "microphone_" + System.currentTimeMillis();
this.srcSocket = srcSocket;
this.connInfo = connInfo;
}
public void start(IBroadcastStream broadcastStream, IScope scope) throws StreamException {
log.debug("startTranscodingStream({},{})", broadcastStream.getPublishedName(), scope.getName());
talkStreamName = broadcastStream.getPublishedName();
mInputListener = new IStreamListener() {
public void packetReceived(IBroadcastStream broadcastStream, IStreamPacket packet) {
IoBuffer buf = packet.getData();
if (buf != null)
buf.rewind();
buf.rewind();
if (buf == null || buf.remaining() == 0){
log.debug("skipping empty packet with no data");
System.out.println("skipping empty packet with no data");
return;
log.debug("skipping empty packet with no data");
return;
}
if (packet instanceof AudioData) {
try {
byte[] data = SerializeUtils.ByteBufferToByteArray(buf);
RtmpAudioData audioData = new RtmpAudioData(data);
// System.out.println("Adding data " + data.length);
packets.put(audioData);
} catch (InterruptedException e) {
log.info("Interrupted exception while queieing audio packet");
}
byte[] data = SerializeUtils.ByteBufferToByteArray(buf);
rtpSender.send(data, 1, data.length-1);
}
}
};
}
public void start(IBroadcastStream broadcastStream, IScope scope) {
log.debug("startTranscodingStream({},{})", broadcastStream.getPublishedName(), scope.getName());
broadcastStream.addStreamListener(mInputListener);
processAudio = true;
audioProcessor = new Runnable() {
public void run() {
while (processAudio) {
try {
RtmpAudioData packet = packets.take();
processAudioPacket(packet);
} catch (InterruptedException e) {
log.info("InterruptedExeption while taking audio packet.");
}
}
}
};
exec.execute(audioProcessor);
broadcastStream.addStreamListener(mInputListener);
rtpSender = new RtpStreamSender(transcoder, srcSocket, connInfo);
rtpSender.connect();
}
public void stop(IBroadcastStream broadcastStream, IScope scope) {
broadcastStream.removeStreamListener(mInputListener);
}
private void processAudioPacket(RtmpAudioData packet) {
byte[] data = packet.getData();
// System.out.println("Proccessing voice data");
rtpSender.send(data, 1, data.length-1);
}
public void stop() {
processAudio = false;
public void sendDtmfDigits(String dtmfDigits) throws StreamException {
rtpSender.sendDtmfDigits(dtmfDigits);
}
public String getStreamName() {

View File

@ -0,0 +1,178 @@
package org.bigbluebutton.voiceconf.red5.media.transcoder;
import org.slf4j.Logger;
import org.bigbluebutton.voiceconf.red5.media.RtpStreamSender;
import org.bigbluebutton.voiceconf.red5.media.StreamException;
import org.red5.logging.Red5LoggerFactory;
import org.red5.app.sip.codecs.Codec;
import org.red5.app.sip.codecs.asao.ByteStream;
import org.red5.app.sip.codecs.asao.Decoder;
import org.red5.app.sip.codecs.asao.DecoderMap;
/**
* Transcodes audio from voice conferencing server to Flash.
* Specifically U-law to Nelly.
* @author Richard Alam
*
*/
public class NellyToPcmTranscoder implements Transcoder {
protected static Logger log = Red5LoggerFactory.getLogger( NellyToPcmTranscoder.class, "sip" );
private static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;
private static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;
private Codec sipCodec = null; // Sip codec to be used on audio session
private Decoder decoder;
private DecoderMap decoderMap;
float[] tempBuffer; // Temporary buffer with received PCM audio from FlashPlayer.
int tempBufferRemaining = 0; // Floats remaining on temporary buffer.
float[] encodingBuffer; // Encoding buffer used to encode to final codec format;
int encodingOffset = 0; // Offset of encoding buffer.
boolean asao_buffer_processed = false; // Indicates whether the current asao buffer was processed.
boolean hasInitilializedBuffers = false; // Indicates whether the handling buffers have already been initialized.
public NellyToPcmTranscoder(Codec sipCodec) {
this.sipCodec = sipCodec;
decoder = new Decoder();
decoderMap = null;
}
public int getOutgoingEncodedFrameSize() {
return sipCodec.getOutgoingEncodedFrameSize();
}
public int getCodecId() {
return sipCodec.getCodecId();
}
public int getOutgoingPacketization() {
return sipCodec.getOutgoingPacketization();
}
private int fillRtpPacketBuffer(byte[] audioData, byte[] transcodedData, int dataOffset) {
int copyingSize = 0;
int finalCopySize = 0;
byte[] codedBuffer = new byte[sipCodec.getOutgoingEncodedFrameSize()];
try {
if ((tempBufferRemaining + encodingOffset) >= sipCodec.getOutgoingDecodedFrameSize()) {
copyingSize = encodingBuffer.length - encodingOffset;
System.arraycopy(tempBuffer, tempBuffer.length-tempBufferRemaining,
encodingBuffer, encodingOffset, copyingSize);
encodingOffset = sipCodec.getOutgoingDecodedFrameSize();
tempBufferRemaining -= copyingSize;
finalCopySize = sipCodec.getOutgoingDecodedFrameSize();
}
else {
if (tempBufferRemaining > 0) {
System.arraycopy(tempBuffer, tempBuffer.length - tempBufferRemaining,
encodingBuffer, encodingOffset, tempBufferRemaining);
encodingOffset += tempBufferRemaining;
finalCopySize += tempBufferRemaining;
tempBufferRemaining = 0;
}
// Decode new asao packet.
asao_buffer_processed = true;
ByteStream audioStream = new ByteStream(audioData, 1, NELLYMOSER_ENCODED_PACKET_SIZE);
decoderMap = decoder.decode( decoderMap, audioStream.bytes, 1, tempBuffer, 0 );
//tempBuffer = ResampleUtils.normalize(tempBuffer, 256); // normalise volume
tempBufferRemaining = tempBuffer.length;
if ( tempBuffer.length <= 0 ) {
log.error("Asao decoder Error." );
}
// Try to complete the encodingBuffer with necessary data.
if ( ( encodingOffset + tempBufferRemaining ) > sipCodec.getOutgoingDecodedFrameSize() ) {
copyingSize = encodingBuffer.length - encodingOffset;
}
else {
copyingSize = tempBufferRemaining;
}
System.arraycopy(tempBuffer, 0, encodingBuffer, encodingOffset, copyingSize);
encodingOffset += copyingSize;
tempBufferRemaining -= copyingSize;
finalCopySize += copyingSize;
}
if (encodingOffset == encodingBuffer.length)
{
int encodedBytes = sipCodec.pcmToCodec( encodingBuffer, codedBuffer );
if ( encodedBytes == sipCodec.getOutgoingEncodedFrameSize() ) {
System.arraycopy(codedBuffer, 0, transcodedData, dataOffset, codedBuffer.length);
}
else {
log.error("Failure encoding buffer." );
}
}
}
catch ( Exception e ) {
log.error("Exception - " + e.toString());
e.printStackTrace();
}
return finalCopySize;
}
public void transcode(byte[] asaoBuffer, int offset, int num, byte[] transcodedData, int dataOffset, RtpStreamSender rtpSender) {
asao_buffer_processed = false;
if (!hasInitilializedBuffers) {
tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
encodingBuffer = new float[sipCodec.getOutgoingDecodedFrameSize()];
hasInitilializedBuffers = true;
}
if (num > 0) {
do {
int encodedBytes = fillRtpPacketBuffer( asaoBuffer, transcodedData, dataOffset );
log.debug( "send " + sipCodec.getCodecName() + " encoded " + encodedBytes + " bytes." );
if ( encodedBytes == 0 ) {
break;
}
if (encodingOffset == sipCodec.getOutgoingDecodedFrameSize()) {
log.debug("Send this audio to asterisk.");
try {
rtpSender.sendTranscodedData();
} catch (StreamException e) {
// Swallow this error for now. We don't really want to end the call if sending hiccups.
// Just log it for now. (ralam june 18, 2010)
log.warn("Error while sending transcoded audio packet.");
}
encodingOffset = 0;
}
log.debug("asao_buffer_processed = [" + asao_buffer_processed + "] ." );
}
while (!asao_buffer_processed);
}
else if (num < 0) {
log.debug("Closing" );
}
}
/**
* Not implemented. Implemented by transcoders for voice conf server to Flash.
*/
public void transcode(byte[] codedBuffer) {
log.error("Not implemented.");
}
/**
* Not implemented. Implemented by transcoders for voice conf server to Flash.
*/
public int getIncomingEncodedFrameSize() {
log.error("Not implemented.");
return 0;
}
}

View File

@ -0,0 +1,133 @@
package org.bigbluebutton.voiceconf.red5.media.transcoder;
import org.slf4j.Logger;
import org.apache.mina.core.buffer.IoBuffer;
import org.bigbluebutton.voiceconf.red5.media.RtpStreamSender;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.net.rtmp.event.AudioData;
import org.red5.app.sip.codecs.Codec;
import org.red5.app.sip.codecs.asao.ByteStream;
import org.red5.app.sip.codecs.asao.CodecImpl;
public class PcmToNellyTranscoder implements Transcoder {
protected static Logger log = Red5LoggerFactory.getLogger(PcmToNellyTranscoder.class, "sip");
private static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;
private static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;
private static final int NELLYMOSER_CODEC_ID = 82;
private float[] encoderMap;
private Codec audioCodec = null;
private float[] tempBuffer; // Temporary buffer with PCM audio to be sent to FlashPlayer.
private int tempBufferOffset = 0;
private final TranscodedAudioDataListener listener;
private long start = 0;
public PcmToNellyTranscoder(Codec audioCodec, TranscodedAudioDataListener listener) {
this.audioCodec = audioCodec;
this.listener = listener;
encoderMap = new float[64];
tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
start = System.currentTimeMillis();
}
/**
* Fills the tempBuffer with necessary PCM's floats and encodes
* the audio to be sent to FlashPlayer.
*/
private void forwardAudioToFlashPlayer(float[] pcmBuffer) {
int pcmBufferOffset = 0;
int copySize = 0;
boolean pcmBufferProcessed = false;
do {
if ((tempBuffer.length - tempBufferOffset) <= (pcmBuffer.length - pcmBufferOffset)) {
copySize = tempBuffer.length - tempBufferOffset;
}
else {
copySize = pcmBuffer.length - pcmBufferOffset;
}
System.arraycopy( pcmBuffer, pcmBufferOffset, tempBuffer, tempBufferOffset, copySize);
tempBufferOffset += copySize;
pcmBufferOffset += copySize;
if (tempBufferOffset == NELLYMOSER_DECODED_PACKET_SIZE) {
ByteStream encodedStream = new ByteStream(NELLYMOSER_ENCODED_PACKET_SIZE);
encoderMap = CodecImpl.encode(encoderMap, tempBuffer, encodedStream.bytes);
pushAudio(encodedStream.bytes);
tempBufferOffset = 0;
}
if ( pcmBufferOffset == pcmBuffer.length ) {
pcmBufferProcessed = true;
}
}
while (!pcmBufferProcessed);
}
public void transcode(byte[] codedBuffer) {
float[] decodingBuffer = new float[codedBuffer.length];
int decodedBytes = audioCodec.codecToPcm(codedBuffer, decodingBuffer);
// log.debug("encodedBytes = " + decodedBytes + ", incomingDecodedFrameSize = " +
// audioCodec.getIncomingDecodedFrameSize() + "." );
if (decodedBytes == audioCodec.getIncomingDecodedFrameSize()) {
forwardAudioToFlashPlayer(decodingBuffer);
}
else {
log.warn("Failure decoding buffer." );
}
}
private void pushAudio(byte[] audio) {
IoBuffer buffer = IoBuffer.allocate(1024);
buffer.setAutoExpand(true);
buffer.clear();
buffer.put((byte) NELLYMOSER_CODEC_ID);
byte[] copy = new byte[audio.length];
System.arraycopy(audio, 0, copy, 0, audio.length );
buffer.put(copy);
buffer.flip();
AudioData audioData = new AudioData(buffer);
audioData.setTimestamp((int)(System.currentTimeMillis() - start) );
listener.handleTranscodedAudioData(audioData);
}
public int getIncomingEncodedFrameSize() {
return audioCodec.getIncomingEncodedFrameSize();
}
/**
* Not implemented. Implemented by transcoders for FP to voice conference server.
*/
public void transcode(byte[] asaoBuffer, int offset, int num,
byte[] transcodedData, int dataOffset, RtpStreamSender rtpSender) {
// TODO Auto-generated method stub
}
public int getCodecId() {
// TODO Auto-generated method stub
return 0;
}
public int getOutgoingEncodedFrameSize() {
// TODO Auto-generated method stub
return 0;
}
public int getOutgoingPacketization() {
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -0,0 +1,71 @@
package org.bigbluebutton.voiceconf.red5.media.transcoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.bigbluebutton.voiceconf.red5.media.RtpStreamSender;
import org.red5.app.sip.codecs.Codec;
import org.red5.server.net.rtmp.event.AudioData;
public class SpeexToSpeexTranscoder implements Transcoder {
private Codec audioCodec;
private TranscodedAudioDataListener listener;
private int timestamp = 0;
private static final int SPEEX_CODEC = 178; /* 1011 1111 (see flv spec) */
public SpeexToSpeexTranscoder(Codec audioCodec, TranscodedAudioDataListener listener) {
this.audioCodec = audioCodec;
this.listener = listener;
}
public SpeexToSpeexTranscoder(Codec audioCodec) {
this.audioCodec = audioCodec;
}
public void transcode(byte[] asaoBuffer, int offset, int num,
byte[] transcodedData, int dataOffset, RtpStreamSender rtpSender) {
System.arraycopy(asaoBuffer, offset, transcodedData, dataOffset, num);
rtpSender.sendTranscodedData();
}
public void transcode(byte[] codedBuffer) {
pushAudio(codedBuffer);
}
private void pushAudio(byte[] audio) {
timestamp = timestamp + audio.length;
IoBuffer buffer = IoBuffer.allocate(1024);
buffer.setAutoExpand(true);
buffer.clear();
buffer.put((byte) SPEEX_CODEC);
byte[] copy = new byte[audio.length];
System.arraycopy(audio, 0, copy, 0, audio.length );
buffer.put(copy);
buffer.flip();
AudioData audioData = new AudioData( buffer );
audioData.setTimestamp((int)timestamp );
listener.handleTranscodedAudioData(audioData);
}
public int getCodecId() {
return audioCodec.getCodecId();
}
public int getOutgoingEncodedFrameSize() {
return audioCodec.getOutgoingEncodedFrameSize();
}
public int getOutgoingPacketization() {
return audioCodec.getOutgoingPacketization();
}
public int getIncomingEncodedFrameSize() {
return audioCodec.getIncomingEncodedFrameSize();
}
}

View File

@ -0,0 +1,8 @@
package org.bigbluebutton.voiceconf.red5.media.transcoder;
import org.red5.server.net.rtmp.event.AudioData;
public interface TranscodedAudioDataListener {
public void handleTranscodedAudioData(AudioData audioData);
}

View File

@ -0,0 +1,28 @@
package org.bigbluebutton.voiceconf.red5.media.transcoder;
import org.red5.app.sip.stream.RtpStreamSender;
public class TranscodedPcmAudioBuffer {
private byte[] buffer;
private int offset;
private RtpStreamSender sender;
TranscodedPcmAudioBuffer(byte[] data, int offset, RtpStreamSender sender) {
buffer = data;
this.offset = offset;
}
boolean copyData(byte[] data) {
if (data.length > buffer.length - offset)
return false;
System.arraycopy(data, 0, buffer, offset, data.length);
return true;
}
void sendData() {
sender.sendTranscodedData();
}
}

View File

@ -0,0 +1,15 @@
package org.bigbluebutton.voiceconf.red5.media.transcoder;
import org.bigbluebutton.voiceconf.red5.media.RtpStreamSender;
public interface Transcoder {
void transcode(byte[] asaoBuffer, int offset, int num, byte[] transcodedData, int dataOffset, RtpStreamSender rtpSender);
void transcode(byte[] codedBuffer);
int getOutgoingEncodedFrameSize();
int getCodecId();
int getOutgoingPacketization();
int getIncomingEncodedFrameSize();
}

View File

@ -9,6 +9,7 @@ import org.zoolu.sdp.*;
import org.bigbluebutton.voiceconf.red5.CallStreamFactory;
import org.bigbluebutton.voiceconf.red5.ClientConnectionManager;
import org.bigbluebutton.voiceconf.red5.media.CallStream;
import org.bigbluebutton.voiceconf.red5.media.ListenStreamObserver;
import org.red5.app.sip.codecs.Codec;
import org.red5.app.sip.codecs.CodecUtils;
import org.slf4j.Logger;
@ -21,7 +22,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.Vector;
public class CallAgent extends CallListenerAdapter {
public class CallAgent extends CallListenerAdapter {
private static Logger log = Red5LoggerFactory.getLogger(CallAgent.class, "sip");
private final SipPeerProfile userProfile;
@ -51,9 +52,7 @@ public class CallAgent extends CallListenerAdapter {
this.userProfile = userProfile;
this.clientId = clientId;
// If no contact_url and/or from_url has been set, create it now.
userProfile.initContactAddress(sipProvider);
// Set local sdp.
initSessionDescriptor();
}
@ -77,8 +76,8 @@ public class CallAgent extends CallListenerAdapter {
return callState == CallState.UA_IDLE;
}
public void queueSipDtmfDigits(String digits) {
callStream.queueSipDtmfDigits(digits);
public void sendDtmfDigits(String digits) {
callStream.sendDtmfDigits(digits);
}
private void initSessionDescriptor() {
@ -198,7 +197,7 @@ public class CallAgent extends CallListenerAdapter {
log.debug("closeMediaApplication" );
if (callStream != null) {
callStream.stopMedia();
callStream.stop();
callStream = null;
}
}
@ -386,4 +385,6 @@ public class CallAgent extends CallListenerAdapter {
public void setClientConnectionManager(ClientConnectionManager ccm) {
clientConnManager = ccm;
}
}

View File

@ -0,0 +1,12 @@
package org.bigbluebutton.voiceconf.util;
import java.io.*;
public final class StackTraceUtil {
public static String getStackTrace(Throwable aThrowable) {
final Writer result = new StringWriter();
final PrintWriter printWriter = new PrintWriter(result);
aThrowable.printStackTrace(printWriter);
return result.toString();
}
}