Added edge case to error handling on base classes and SFU pipeline cleanup on room empty

This commit is contained in:
prlanzarin 2018-10-11 18:54:09 +00:00
parent 7f3d3c0d32
commit e83e881f77
7 changed files with 30 additions and 8 deletions

View File

@ -143,6 +143,11 @@ module.exports = class BaseManager {
} }
_handleError (logPrefix, connectionId, streamId, role, error) { _handleError (logPrefix, connectionId, streamId, role, error) {
// Setting a default error in case it was unhandled
if (error == null) {
error = { code: 2200, reason: errors[2200] }
}
if (error && this._validateErrorMessage(error)) { if (error && this._validateErrorMessage(error)) {
return error; return error;
} }

View File

@ -33,6 +33,11 @@ module.exports = class BaseProvider extends EventEmitter {
_handleError (logPrefix, error, role, streamId) { _handleError (logPrefix, error, role, streamId) {
// Setting a default error in case it was unhandled
if (error == null) {
error = { code: 2200, reason: errors[2200] }
}
if (this._validateErrorMessage(error)) { if (this._validateErrorMessage(error)) {
return error; return error;
} }

View File

@ -12,10 +12,11 @@ const KURENTO_WEBSOCKET_POOL_SIZE = config.get('kurento-websocket-pool-size');
let instance = null; let instance = null;
module.exports = class MediaServer extends EventEmitter { module.exports = class MediaServer extends EventEmitter {
constructor(serverUri) { constructor(serverUri, globalEmitter) {
if (!instance){ if (!instance){
super(); super();
this._serverUri = serverUri; this._serverUri = serverUri;
this._globalEmitter = globalEmitter;
this._mediaPipelines = {}; this._mediaPipelines = {};
this._mediaElements = {}; this._mediaElements = {};
this._mediaServers; this._mediaServers;
@ -36,6 +37,7 @@ module.exports = class MediaServer extends EventEmitter {
let client = await this._getMediaServerClient(this._serverUri); let client = await this._getMediaServerClient(this._serverUri);
this._mediaServers.push(client); this._mediaServers.push(client);
} }
this._globalEmitter.on(C.EVENT.ROOM_EMPTY, this._releasePipeline.bind(this));
Logger.info("[mcs-media] Retrieved", this._mediaServers.length, "media server clients"); Logger.info("[mcs-media] Retrieved", this._mediaServers.length, "media server clients");
this._status = C.STATUS.STARTING; this._status = C.STATUS.STARTING;
this._monitorConnectionState(); this._monitorConnectionState();
@ -170,6 +172,7 @@ module.exports = class MediaServer extends EventEmitter {
if (error) { if (error) {
return reject(this._handleError(error)); return reject(this._handleError(error));
} }
Logger.debug("[mcs-media] Pipeline", pipeline.id, "released");
delete this._mediaPipelines[room]; delete this._mediaPipelines[room];
return resolve() return resolve()
}); });

View File

@ -39,6 +39,7 @@ exports.EVENT.MEDIA_STATE.FLOW_IN = "MediaFlowInStateChange"
exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream" exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream"
exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate" exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate"
exports.EVENT.SERVER_STATE = "ServerState" exports.EVENT.SERVER_STATE = "ServerState"
exports.EVENT.ROOM_EMPTY = "RoomEmpty"
exports.EVENT.RECORDING = {}; exports.EVENT.RECORDING = {};
exports.EVENT.RECORDING.STOPPED = 'Stopped'; exports.EVENT.RECORDING.STOPPED = 'Stopped';

View File

@ -45,7 +45,7 @@ module.exports = class MediaController {
let session; let session;
const room = await this.createRoomMCS(roomId); const room = await this.createRoomMCS(roomId);
const user = await this.createUserMCS(roomId, type, params); const user = await this.createUserMCS(roomId, type, params);
room.setUser(user.id); room.setUser(user);
if (params.sdp) { if (params.sdp) {
session = user.addSdp(params.sdp); session = user.addSdp(params.sdp);
@ -306,7 +306,7 @@ module.exports = class MediaController {
Logger.info("[mcs-controller] Creating new room with ID", roomId); Logger.info("[mcs-controller] Creating new room with ID", roomId);
if (this._rooms[roomId] == null) { if (this._rooms[roomId] == null) {
this._rooms[roomId] = new Room(roomId); this._rooms[roomId] = new Room(roomId, this.emitter);
} }
return Promise.resolve(this._rooms[roomId]); return Promise.resolve(this._rooms[roomId]);

View File

@ -32,7 +32,7 @@ module.exports = class MediaSession {
this._options = options; this._options = options;
this._adapter = options.adapter? options.adapter : C.STRING.KURENTO; this._adapter = options.adapter? options.adapter : C.STRING.KURENTO;
this._name = options.name? options.name : C.STRING.DEFAULT_NAME; this._name = options.name? options.name : C.STRING.DEFAULT_NAME;
this._MediaServer = MediaSession.getAdapter(this._adapter); this._MediaServer = MediaSession.getAdapter(this._adapter, emitter);
this.eventQueue = []; this.eventQueue = [];
} }
@ -147,14 +147,14 @@ module.exports = class MediaSession {
} }
} }
static getAdapter (adapter) { static getAdapter (adapter, emitter) {
let obj = null; let obj = null;
Logger.info("[mcs-media-session] Session is using the", adapter, "adapter"); Logger.info("[mcs-media-session] Session is using the", adapter, "adapter");
switch (adapter) { switch (adapter) {
case C.STRING.KURENTO: case C.STRING.KURENTO:
obj = new Kurento(kurentoUrl); obj = new Kurento(kurentoUrl, emitter);
break; break;
case C.STRING.FREESWITCH: case C.STRING.FREESWITCH:
obj = new Freeswitch(); obj = new Freeswitch();

View File

@ -5,11 +5,14 @@
'use strict' 'use strict'
const C = require('../constants/Constants');
module.exports = class Room { module.exports = class Room {
constructor(id) { constructor(id, emitter) {
this._id = id; this._id = id;
this._users = {}; this._users = {};
this._mcuUsers = {}; this._mcuUsers = {};
this.emitter = emitter;
} }
getUser (id) { getUser (id) {
@ -21,6 +24,11 @@ module.exports = class Room {
} }
destroyUser(userId) { destroyUser(userId) {
this._users[userId] = null; if (this._users[userId]) {
delete this._users[userId];
if (Object.keys(this._users).length <= 0) {
this.emitter.emit(C.EVENT.ROOM_EMPTY, this._id);
}
}
} }
} }