SFU audio module now uses BaseProcess and BaseManager

This commit is contained in:
prlanzarin 2018-04-13 17:24:16 +00:00
parent 10fe1d6a00
commit 0610d82eb7
4 changed files with 50 additions and 112 deletions

View File

@ -9,67 +9,53 @@
const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw');
const Audio = require('./audio');
const BaseManager = require('../base/BaseManager');
const C = require('../bbb/messages/Constants');
const Logger = require('../utils/Logger');
module.exports = class AudioManager {
constructor () {
this._clientId = 0;
module.exports = class AudioManager extends BaseManager {
constructor (connectionChannel, additionalChannels, logPrefix) {
super(connectionChannel, additionalChannels, logPrefix);
this._meetings = {};
this._audioSessions = {};
this._bbbGW = new BigBlueButtonGW('MANAGER');
this._redisGateway;
this._trackMeetingTermination();
this.messageFactory(this._onMessage);
}
async start() {
try {
this._redisGateway = await this._bbbGW.addSubscribeChannel(C.TO_AUDIO);
const meeting = await this._bbbGW.addSubscribeChannel(C.FROM_BBB_MEETING_CHAN);
this._redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this));
// Interoperability between transcoder messages
// TODO: Do we need to remove this listeners at some point?
switch (C.COMMON_MESSAGE_VERSION) {
case "1.x":
this._bbbGW.on(C.DICONNECT_ALL_USERS, (payload) => {
let meetingId = payload[C.MEETING_ID];
this._disconnectAllUsers(meetingId);
});
break;
default:
this._bbbGW.on(C.DICONNECT_ALL_USERS_2x, (payload) => {
let meetingId = payload[C.MEETING_ID_2x];
this._disconnectAllUsers(meetingId);
});
}
}
catch (error) {
Logger.error('[AudioManager] Could not connect to audio redis channel, finishing audio app with error', error);
this.stopAll();
_trackMeetingTermination () {
switch (C.COMMON_MESSAGE_VERSION) {
case "1.x":
this._bbbGW.on(C.DICONNECT_ALL_USERS, (payload) => {
let meetingId = payload[C.MEETING_ID];
this._disconnectAllUsers(meetingId);
});
break;
default:
this._bbbGW.on(C.DICONNECT_ALL_USERS_2x, (payload) => {
let meetingId = payload[C.MEETING_ID_2x];
this._disconnectAllUsers(meetingId);
});
}
}
_disconnectAllUsers(meetingId) {
let sessionId = this._meetings[meetingId];
if (typeof sessionId !== 'undefined') {
Logger.debug('[AudioManager] Disconnecting all users from', sessionId);
Logger.debug(this._logPrefix, 'Disconnecting all users from', sessionId);
delete this._meetings[meetingId];
this._stopSession(sessionId);
}
}
_onMessage(message) {
Logger.debug('[AudioManager] Received message [' + message.id + '] from connection', message.connectionId);
Logger.debug(this._logPrefix, 'Received message [' + message.id + '] from connection', message.connectionId);
let session;
let sessionId = message.voiceBridge.split('-')[0];
let voiceBridge = sessionId;
let connectionId = message.connectionId;
if(this._audioSessions[sessionId]) {
session = this._audioSessions[sessionId];
if(this._sessions[sessionId]) {
session = this._sessions[sessionId];
}
switch (message.id) {
@ -81,13 +67,13 @@ module.exports = class AudioManager {
this._meetings[message.internalMeetingId] = {};
this._meetings[message.internalMeetingId] = sessionId;
this._audioSessions[sessionId] = {}
this._audioSessions[sessionId] = session;
this._sessions[sessionId] = {}
this._sessions[sessionId] = session;
// starts audio session by sending sessionID, websocket and sdpoffer
session.start(sessionId, connectionId, message.sdpOffer, message.callerName, message.userId, message.userName, (error, sdpAnswer) => {
Logger.info("[AudioManager] Started presenter ", sessionId, " for connection", connectionId);
Logger.debug("[AudioManager] SDP answer was", sdpAnswer);
Logger.info(this._logPrefix, "Started presenter ", sessionId, " for connection", connectionId);
Logger.debug(this._logPrefix, "SDP answer was", sdpAnswer);
if (error) {
this._bbbGW.publish(JSON.stringify({
connectionId: connectionId,
@ -107,17 +93,17 @@ module.exports = class AudioManager {
sdpAnswer : sdpAnswer
}), C.FROM_AUDIO);
Logger.info("[AudioManager] Sending startResponse to user", sessionId, "for connection", session._id);
Logger.info(this._logPrefix, "Sending startResponse to user", sessionId, "for connection", session._id);
});
break;
case 'stop':
Logger.info('[AudioManager] Received stop message for session', sessionId, "at connection", connectionId);
Logger.info(this._logPrefix, 'Received stop message for session', sessionId, "at connection", connectionId);
if (session) {
session.stopListener(connectionId);
} else {
Logger.warn("[AudioManager] There was no audio session on stop for", sessionId);
Logger.warn(this._logPrefix, "There was no audio session on stop for", sessionId);
}
break;
@ -125,15 +111,15 @@ module.exports = class AudioManager {
if (session) {
session.onIceCandidate(message.candidate, connectionId);
} else {
Logger.warn("[AudioManager] There was no audio session for onIceCandidate for", sessionId, ". There should be a queue here");
Logger.warn(this._logPrefix, "There was no audio session for onIceCandidate for", sessionId, ". There should be a queue here");
}
break;
case 'close':
Logger.info('[AudioManager] Connection ' + connectionId + ' closed');
Logger.info(this._logPrefix, 'Connection ' + connectionId + ' closed');
if (typeof session !== 'undefined') {
Logger.info("[AudioManager] Stopping viewer " + sessionId);
Logger.info(this._logPrefix, "Stopping viewer " + sessionId);
session.stopListener(message.connectionId);
}
break;
@ -148,33 +134,4 @@ module.exports = class AudioManager {
break;
}
}
_stopSession(sessionId) {
Logger.info('[AudioManager] Stopping session ' + sessionId);
if (typeof this._audioSessions === 'undefined' || typeof sessionId === 'undefined') {
return;
}
let session = this._audioSessions[sessionId];
if(typeof session !== 'undefined' && typeof session.stopAll === 'function') {
session.stopAll();
}
delete this._audioSessions[sessionId];
}
stopAll() {
Logger.info('[AudioManager] Stopping everything! ');
if (typeof this._audioSessions === 'undefined') {
return;
}
let sessionIds = Object.keys(this._audioSessions);
for (let i = 0; i < sessionIds.length; i++) {
this._stopSession(sessionIds[i]);
}
}
};

View File

@ -1,32 +1,10 @@
'use strict';
const AudioManager = require('./AudioManager');
const Logger = require('../utils/Logger');
const config = require('config');
const AudioManager= require('./AudioManager');
const BaseProcess = require('../base/BaseProcess');
const C = require('../bbb/messages/Constants');
if (config.get('acceptSelfSignedCertificate')) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED=0;
}
const manager = new AudioManager(C.TO_AUDIO, [C.FROM_BBB_MEETING_CHAN], C.AUDIO_MANAGER_PREFIX);
const newProcess = new BaseProcess(manager, C.AUDIO_PROCESS_PREFIX);
let am = new AudioManager();
process.on('uncaughtException', (error) => {
Logger.error('[AudioProcess] Uncaught exception ', error.stack);
});
process.on('disconnect', async () => {
Logger.warn('[AudioProcess] Parent process exited, cleaning things up and finishing child now...');
//TODO below
//async AudioManager.stopAll();
process.exit(0);
});
// Added this listener to identify unhandled promises, but we should start making
// sense of those as we find them
process.on('unhandledRejection', (reason, p) => {
Logger.error('[AudioProcess] Unhandled Rejection at: Promise', p, 'reason:', reason);
});
// This basically starts the audio Manager routines which listens the connection
// manager messages routed to "to-sfu-audio"
am.start();
newProcess.start();

View File

@ -130,7 +130,7 @@ module.exports = class Audio {
case "MediaFlowOutStateChange":
Logger.info('[audio]', msEvent.type, '[' + msEvent.state? msEvent.state : 'UNKNOWN_STATE' + ']', 'for media session', event.id);
// TODO treat this accordingly =( (prlanzarin 05/02/2018)
break;
case "MediaFlowInStateChange":
@ -159,9 +159,9 @@ module.exports = class Audio {
this.userId = await this.mcs.join(this.voiceBridge, 'SFU', {});
Logger.info("[audio] MCS join for", this.id, "returned", this.userId);
const ret = await this.mcs.publish(this.userId,
this.voiceBridge,
'RtpEndpoint',
const ret = await this.mcs.publish(this.userId,
this.voiceBridge,
'RtpEndpoint',
{descriptor: sdpOffer, adapter: 'Freeswitch', name: callerName});
this.sourceAudio = ret.sessionId;
@ -171,9 +171,9 @@ module.exports = class Audio {
Logger.info("[audio] MCS publish for user", this.userId, "returned", this.sourceAudio);
}
const retSubscribe = await this.mcs.subscribe(this.userId,
const retSubscribe = await this.mcs.subscribe(this.userId,
this.sourceAudio,
'WebRtcEndpoint',
'WebRtcEndpoint',
{descriptor: sdpOffer, adapter: 'Kurento'});
this.audioEndpoints[connectionId] = retSubscribe.sessionId;
@ -204,7 +204,7 @@ module.exports = class Audio {
if (listener) {
try {
if (this.audioEndpoints && Object.keys(this.audioEndpoints).length === 1) {
await this.mcs.leave(this.voiceBridge, this.userId);
await this.mcs.leave(this.voiceBridge, this.userId);
this.sourceAudioStarted = false;
}
else {
@ -223,7 +223,7 @@ module.exports = class Audio {
}
}
async stopAll () {
async stop () {
Logger.info('[audio] Releasing endpoints for user', this.userId, 'at room', this.voiceBridge);
try {

View File

@ -148,6 +148,9 @@ const config = require('config');
VIDEO_PROCESS_PREFIX: '[VideoProcess]',
VIDEO_MANAGER_PREFIX: '[VideoManager]',
VIDEO_PROVIDER_PREFIX: '[VideoProvider]',
AUDIO_PROCESS_PREFIX: '[AudioProcess]',
AUDIO_MANAGER_PREFIX: '[AudioManager]',
AUDIO_PROVIDER_PREFIX: '[AudioProvider]',
// MCS error codes
MEDIA_SERVER_OFFLINE: "1000",