- minor cleanup and add start/stop into transcoder

TODO:
  - cleanup logging
  - create abstract class to implement common methods that individual transcoders override
This commit is contained in:
Richard Alam 2011-03-01 21:38:40 +00:00
parent d550962c0d
commit 15459681f6
8 changed files with 75 additions and 121 deletions

View File

@ -35,8 +35,6 @@ import org.slf4j.Logger;
public class FlashToSipAudioStream {
private final static Logger log = Red5LoggerFactory.getLogger(FlashToSipAudioStream.class, "sip");
private final FlashToSipTranscoder transcoder;
private IStreamListener mInputListener;
@ -45,22 +43,20 @@ public class FlashToSipAudioStream {
private String talkStreamName;
private RtpStreamSender rtpSender;
private TranscodedAudioListener transcodedAudioListener;
private volatile boolean processAudioData = false;
public FlashToSipAudioStream(final FlashToSipTranscoder transcoder, DatagramSocket srcSocket, SipConnectInfo connInfo) {
public FlashToSipAudioStream(final FlashToSipTranscoder transcoder, DatagramSocket srcSocket,
SipConnectInfo connInfo) {
this.transcoder = transcoder;
this.srcSocket = srcSocket;
this.connInfo = connInfo;
talkStreamName = "microphone_" + System.currentTimeMillis();
transcodedAudioListener = new TranscodedAudioListener();
transcoder.setTranscodedAudioListener(transcodedAudioListener);
transcoder.setProcessAudioData(processAudioData);
}
public void start(IBroadcastStream broadcastStream, IScope scope) throws StreamException {
log.debug("startTranscodingStream({},{})", broadcastStream.getPublishedName(), scope.getName());
processAudioData = true;
transcoder.setProcessAudioData(processAudioData);
if (log.isDebugEnabled())
log.debug("startTranscodingStream({},{})", broadcastStream.getPublishedName(), scope.getName());
mInputListener = new IStreamListener() {
public void packetReceived(IBroadcastStream broadcastStream, IStreamPacket packet) {
IoBuffer buf = packet.getData();
@ -74,8 +70,8 @@ public class FlashToSipAudioStream {
if (packet instanceof AudioData) {
byte[] data = SerializeUtils.ByteBufferToByteArray(buf);
if (data.length > 20) //==Don't send silence data whose data length is 11
transcoder.handlePacket(data, 1, data.length-1);
// Remove the first byte as it is the codec id.
transcoder.handlePacket(data, 1, data.length-1);
}
}
};
@ -83,8 +79,7 @@ public class FlashToSipAudioStream {
broadcastStream.addStreamListener(mInputListener);
rtpSender = new RtpStreamSender(srcSocket, connInfo);
rtpSender.connect();
transcoder.start();
}
public void stop(IBroadcastStream broadcastStream, IScope scope) {
@ -93,8 +88,7 @@ public class FlashToSipAudioStream {
broadcastStream.stop();
broadcastStream.close();
}
processAudioData = false;
transcoder.setProcessAudioData(processAudioData);
transcoder.stop();
srcSocket.close();
}
@ -105,7 +99,7 @@ public class FlashToSipAudioStream {
public class TranscodedAudioListener implements TranscodedAudioDataListener {
@Override
public void handleTranscodedAudioData(byte[] audioData, long timestamp) {
if (audioData != null && processAudioData) {
if (audioData != null) {
rtpSender.sendAudio(audioData, transcoder.getCodecId(), timestamp);
} else {
log.warn("Transcodec audio is null. Discarding.");

View File

@ -34,11 +34,7 @@ import org.red5.server.stream.IBroadcastScope;
import org.red5.server.stream.IProviderService;
import org.slf4j.Logger;
public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpStreamReceiverListener {
final private Logger log = Red5LoggerFactory.getLogger(SipToFlashAudioStream.class, "sip");
// private Runnable audioDataProcessor;
private volatile boolean processAudioData = false;
private static final Logger log = Red5LoggerFactory.getLogger(SipToFlashAudioStream.class, "sip");
private AudioBroadcastStream audioBroadcastStream;
private IScope scope;
@ -63,7 +59,6 @@ public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpSt
};
public SipToFlashAudioStream(IScope scope, SipToFlashTranscoder transcoder, DatagramSocket socket) {
processAudioData = true;
transcoder.setProcessAudioData(processAudioData);
this.scope = scope;
this.transcoder = transcoder;
@ -71,18 +66,12 @@ public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpSt
rtpStreamReceiver.setRtpStreamReceiverListener(this);
listenStreamName = "speaker_" + System.currentTimeMillis();
scope.setName(listenStreamName);
// streamFromSip = new PipedOutputStream();
// try {
// streamToFlash = new PipedInputStream(streamFromSip);
startNow();
mBuffer = IoBuffer.allocate(1024);
mBuffer = mBuffer.setAutoExpand(true);
audioData = new AudioData();
// } catch (IOException e) {
// TODO Auto-generated catch block
// e.printStackTrace();
// }
transcoder.setTranscodedAudioListener(this); //
startNow();
mBuffer = IoBuffer.allocate(1024);
mBuffer = mBuffer.setAutoExpand(true);
audioData = new AudioData();
transcoder.setTranscodedAudioListener(this);
start();
}
public String getStreamName() {
@ -94,9 +83,8 @@ public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpSt
}
public void stop() {
log.debug("Stopping stream for {}", listenStreamName);
processAudioData = false;
transcoder.setProcessAudioData(processAudioData);
log.debug("Stopping stream for {}", listenStreamName);
transcoder.stop();
rtpStreamReceiver.stop();
log.debug("Stopped RTP Stream Receiver for {}", listenStreamName);
if (audioBroadcastStream != null) {
@ -108,12 +96,8 @@ public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpSt
log.debug("audioBroadcastStream is null, couldn't stop");
log.debug("Stream(s) stopped");
}
public void start() {
}
private void startNow() {
private void start() {
log.debug("started publishing stream in " + scope.getName());
audioBroadcastStream = new AudioBroadcastStream(listenStreamName);
audioBroadcastStream.setPublishedName(listenStreamName);
@ -131,43 +115,10 @@ public class SipToFlashAudioStream implements TranscodedAudioDataListener, RtpSt
}
audioBroadcastStream.start();
processAudioData = true;
transcoder.setProcessAudioData(processAudioData);
/* audioDataProcessor = new Runnable() {
public void run() {
processAudioData();
}
};
exec.execute(audioDataProcessor);
*/
transcoder.start();
rtpStreamReceiver.start();
}
/* private void processAudioData() {
int len = 160;
byte[] pcmAudio = new byte[len];
int remaining = len;
int offset = 0;
while (processAudioData) {
try {
int bytesRead = streamToFlash.read(pcmAudio, offset, remaining);
remaining -= bytesRead;
if (remaining == 0) {
remaining = len;
offset = 0;
transcoder.transcode(pcmAudio, this);
} else {
offset += bytesRead;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
*/
@Override
public void onStoppedReceiving() {
if (observer != null) observer.onStreamStopped();

View File

@ -27,5 +27,6 @@ public interface FlashToSipTranscoder {
int getOutgoingEncodedFrameSize();
int getCodecId();
void setTranscodedAudioListener(TranscodedAudioListener transcodedAudioListener);
void setProcessAudioData(boolean isProcessing);
void start();
void stop();
}

View File

@ -76,7 +76,7 @@ public class NellyFlashToSipTranscoderImp implements FlashToSipTranscoder {
private final Executor exec = Executors.newSingleThreadExecutor();
private Runnable audioDataProcessor;
private boolean processAudioData = false;
private volatile boolean processAudioData = false;
private TranscodedAudioListener transcodedAudioListener;
/**
@ -101,8 +101,11 @@ public class NellyFlashToSipTranscoderImp implements FlashToSipTranscoder {
// TODO Auto-generated catch block
e.printStackTrace();
}
processAudioData = true;
}
@Override
public void start() {
processAudioData = true;
audioDataProcessor = new Runnable() {
public void run() {
processAudioData();
@ -221,9 +224,10 @@ public class NellyFlashToSipTranscoderImp implements FlashToSipTranscoder {
public void setTranscodedAudioListener(TranscodedAudioListener transcodedAudioListener) {
this.transcodedAudioListener = transcodedAudioListener;
}
@Override
public void setProcessAudioData(boolean isProcessing){
processAudioData = isProcessing;
public void stop() {
processAudioData = false;
}
}

View File

@ -80,8 +80,6 @@ public class NellySipToFlashTranscoderImp implements SipToFlashTranscoder {
private final PipedOutputStream streamFromSip;
private PipedInputStream streamToFlash;
// private NellySipToFlashTranscoderImp transcoder;
private TranscodedAudioDataListener transcodedAudioListener;
private boolean processAudioData;
@ -105,7 +103,6 @@ public class NellySipToFlashTranscoderImp implements SipToFlashTranscoder {
} catch (IOException e) {
e.printStackTrace();
}
startNow();
}
@Override
@ -172,17 +169,6 @@ public class NellySipToFlashTranscoderImp implements SipToFlashTranscoder {
}
private void startNow() {
audioDataProcessor = new Runnable() {
public void run() {
processAudioData();
}
};
exec.execute(audioDataProcessor);
}
private void processAudioData() {
int len = 160;
byte[] pcmAudio = new byte[len];
@ -207,15 +193,25 @@ public class NellySipToFlashTranscoderImp implements SipToFlashTranscoder {
}
@Override
public void setProcessAudioData(boolean isProcessing){
processAudioData = isProcessing;
}
public void start(){
processAudioData = true;
audioDataProcessor = new Runnable() {
public void run() {
processAudioData();
}
};
exec.execute(audioDataProcessor);
}
@Override
public void stop() {
processAudioData = false;
}
@Override
public void setTranscodedAudioListener(
SipToFlashAudioStream sipToFlashAudioStream) {
this.transcodedAudioListener = sipToFlashAudioStream;
public void setTranscodedAudioListener(SipToFlashAudioStream sipToFlashAudioStream) {
this.transcodedAudioListener = sipToFlashAudioStream;
}

View File

@ -27,6 +27,6 @@ public interface SipToFlashTranscoder {
int getIncomingEncodedFrameSize();
void handleData(byte[] audioData, int offset, int len);
void setTranscodedAudioListener(SipToFlashAudioStream sipToFlashAudioStream);
void setProcessAudioData(boolean isProcessing);
void start();
void stop();
}

View File

@ -26,6 +26,11 @@ import org.red5.app.sip.codecs.Codec;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
/**
* Speex wideband to Speex wideband Flash to SIP transcoder.
* This class is just a passthrough transcoder.
*
*/
public class SpeexFlashToSipTranscoderImp implements FlashToSipTranscoder {
protected static Logger log = Red5LoggerFactory.getLogger(SpeexFlashToSipTranscoderImp.class, "sip");
@ -38,11 +43,12 @@ public class SpeexFlashToSipTranscoderImp implements FlashToSipTranscoder {
this.audioCodec = audioCodec;
Random rgen = new Random();
timestamp = rgen.nextInt(1000);
System.out.println("Speex start!");
}
public void transcode(byte[] audioData, int startOffset, int length) {
byte[] transcodedAudio = new byte[length];
// Just copy the audio data removing the codec id which is the first-byte
// represented by the startOffset var.
System.arraycopy(audioData, startOffset, transcodedAudio, 0, length);
transcodedAudioListener.handleTranscodedAudioData(transcodedAudio, timestamp += TS_INCREMENT);
}
@ -61,20 +67,21 @@ public class SpeexFlashToSipTranscoderImp implements FlashToSipTranscoder {
@Override
public void handlePacket(byte[] data, int begin, int end) {
transcode(data, begin, end);
transcode(data, begin, end);
}
@Override
public void setTranscodedAudioListener(
TranscodedAudioListener transcodedAudioListener) {
this.transcodedAudioListener = transcodedAudioListener;
public void setTranscodedAudioListener(TranscodedAudioListener transcodedAudioListener) {
this.transcodedAudioListener = transcodedAudioListener;
}
@Override
public void setProcessAudioData(boolean isProcessing) {
// TODO do nothing here;
public void start() {
// do nothing. just implement the interface.
}
@Override
public void stop() {
// do nothing. just implement the interface.
}
}

View File

@ -26,6 +26,11 @@ import org.red5.app.sip.codecs.Codec;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
/**
* Speex wideband to speex wideband Sip to Flash Transcoder.
* This is just a passthrough transcoder.
*
*/
public class SpeexSipToFlashTranscoderImp implements SipToFlashTranscoder {
protected static Logger log = Red5LoggerFactory.getLogger(SpeexSipToFlashTranscoderImp.class, "sip");
@ -43,9 +48,7 @@ public class SpeexSipToFlashTranscoderImp implements SipToFlashTranscoder {
@Override
public void transcode(byte[] audioData ) {
byte[] codedBuffer = audioData;
// System.out.println("Speex transcode:"+audioData.length);
transcodedAudioListener.handleTranscodedAudioData(codedBuffer, timestamp += TS_INCREMENT);
transcodedAudioListener.handleTranscodedAudioData(audioData, timestamp += TS_INCREMENT);
}
@Override
@ -62,13 +65,11 @@ public class SpeexSipToFlashTranscoderImp implements SipToFlashTranscoder {
public void handleData(byte[] audioData, int offset, int len) {
byte[] data = new byte[len];
System.arraycopy(audioData, offset, data, 0, len);
transcode(data);
transcode(data);
}
@Override
public void setTranscodedAudioListener(
SipToFlashAudioStream sipToFlashAudioStream) {
public void setTranscodedAudioListener(SipToFlashAudioStream sipToFlashAudioStream) {
this.transcodedAudioListener = sipToFlashAudioStream;
}