From f9bd7a5e37d8cea5dfa8489ddc8c17ac2c80dd65 Mon Sep 17 00:00:00 2001 From: prlanzarin Date: Mon, 29 Jan 2018 15:10:05 +0000 Subject: [PATCH] Tracking media pipeline active subscribers in SFU to properly clean them Also did some more work on error handling at the kurento adapter --- .../lib/mcs-core/lib/constants/Constants.js | 1 + .../lib/mcs-core/lib/media/media-server.js | 82 ++++++++++++++----- .../lib/mcs-core/lib/model/SdpSession.js | 2 +- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js index 467f769f59..3dc2693519 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js @@ -52,6 +52,7 @@ exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate" // Error codes exports.ERROR = {}; exports.ERROR.MEDIA_SERVER_OFFLINE = "1000"; +exports.ERROR.MEDIA_SERVER_ERROR = "1001"; // RTP params exports.SDP = {}; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js index 0d12a21f9f..b18582d830 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js @@ -42,6 +42,7 @@ module.exports = class MediaServer extends EventEmitter { return new Promise((resolve, reject) => { mediaServerClient(serverUri, {failAfter: 3}, (error, client) => { if (error) { + error = this._handleError(error); reject(error); } @@ -50,30 +51,32 @@ module.exports = class MediaServer extends EventEmitter { }); } - _getMediaPipeline (conference) { + _getMediaPipeline (roomId) { return new Promise((resolve, reject) => { - if (this._mediaPipelines[conference]) { - Logger.warn('[mcs-media] Pipeline for', conference, ' already exists.'); - resolve(this._mediaPipelines[conference]); + if (this._mediaPipelines[roomId]) { + Logger.warn('[mcs-media] Pipeline for', roomId, ' already exists.'); + return this._mediaPipelines[roomId]; } else { this._mediaServer.create('MediaPipeline', (error, pipeline) => { if (error) { - Logger.error('[mcs-media] Create MediaPipeline returned error', error); + error = this._handleError(error); reject(error); } - this._mediaPipelines[conference] = pipeline; + this._mediaPipelines[roomId] = pipeline; + pipeline.activeElements = 0; resolve(pipeline); }); } }); } - _releasePipeline (pipelineId) { - let mediaPipeline = this._mediaPipelines[pipelineId]; - - if (typeof mediaPipeline !== 'undefined' && typeof mediaPipeline.release === 'function') { - mediaElement.release(); + _releasePipeline (room) { + Logger.debug("[mcs-media] Releasing room", room, "pipeline"); + let pipeline = this._mediaPipelines[room]; + if (pipeline && typeof pipeline.release === 'function') { + pipeline.release(); + delete this._mediaPipelines[room]; } } @@ -81,6 +84,7 @@ module.exports = class MediaServer extends EventEmitter { return new Promise((resolve, reject) => { pipeline.create(type, (error, mediaElement) => { if (error) { + error = this._handleError(error); return reject(error); } Logger.info("[mcs-media] Created [" + type + "] media element: " + mediaElement.id); @@ -91,14 +95,15 @@ module.exports = class MediaServer extends EventEmitter { } - async createMediaElement (conference, type) { + async createMediaElement (roomId, type) { try { - const pipeline = await this._getMediaPipeline(conference); + const pipeline = await this._getMediaPipeline(roomId); const mediaElement = await this._createElement(pipeline, type); + this._mediaPipelines[roomId].activeElements++; return Promise.resolve(mediaElement.id); } catch (err) { - return Promise.reject(new Error(err)); + return Promise.reject(err); } } @@ -112,6 +117,7 @@ module.exports = class MediaServer extends EventEmitter { case 'ALL': source.connect(sink, (error) => { if (error) { + error = this._handleError(error); return reject(error); } return resolve(); @@ -120,9 +126,18 @@ module.exports = class MediaServer extends EventEmitter { case 'AUDIO': + source.connect(sink, 'AUDIO', (error) => { + if (error) { + error = this._handleError(error); + return reject(error); + } + return resolve(); + }); + case 'VIDEO': source.connect(sink, (error) => { if (error) { + error = this._handleError(error); return reject(error); } return resolve(); @@ -138,13 +153,24 @@ module.exports = class MediaServer extends EventEmitter { } } - stop (elementId) { + stop (room, elementId) { + Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room); let mediaElement = this._mediaElements[elementId]; - if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') { - Logger.info("[mcs-media] Releasing endpoint => " + elementId); + let pipeline = this._mediaPipelines[room]; + if (mediaElement && typeof mediaElement.release === 'function') { + pipeline.activeElements--; + + Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements"); + if (pipeline.activeElements <= 0) { + this._releasePipeline(room); + } + mediaElement.release(); this._mediaElements[elementId] = null; } + else { + Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop"); + } } @@ -159,7 +185,7 @@ module.exports = class MediaServer extends EventEmitter { return Promise.resolve(); } else { - return Promise.reject(new Error("Candidate could not be parsed or media element does not exist")); + return Promise.reject("Candidate could not be parsed or media element does not exist"); } } @@ -171,7 +197,8 @@ module.exports = class MediaServer extends EventEmitter { if (typeof mediaElement !== 'undefined' && typeof mediaElement.gatherCandidates === 'function') { mediaElement.gatherCandidates((error) => { if (error) { - return reject(new Error(error)); + error = this._handleError(error); + return reject(error); } Logger.info('[mcs-media] Triggered ICE gathering for ' + elementId); return resolve(); @@ -223,6 +250,7 @@ module.exports = class MediaServer extends EventEmitter { if (typeof mediaElement !== 'undefined' && typeof mediaElement.processOffer === 'function') { mediaElement.processOffer(sdpOffer, (error, answer) => { if (error) { + error = this._handleError(error); return reject(error); } return resolve(answer); @@ -296,4 +324,20 @@ module.exports = class MediaServer extends EventEmitter { _destroyMediaServer() { delete this._mediaServer; } + + _handleError(error) { + // Checking if the error needs to be wrapped into a JS Error instance + if(!isError(error)) { + error = new Error(error); + } + + error.code = C.ERROR.MEDIA_SERVER_ERROR; + Logger.error('[mcs-media] Media Server returned error', error); + } + + // duck + isError(error) { + return error && error.stack && error.message && typeof error.stack === 'string' + && typeof error.message === 'string'; + } }; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js index 1da857d047..00b6d5ff37 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js @@ -70,7 +70,7 @@ module.exports = class SdpSession extends EventEmitter { async stop () { this._status = C.STATUS.STOPPING; try { - await this._MediaServer.stop(this._mediaElement); + await this._MediaServer.stop(this.room, this._mediaElement); this._status = C.STATUS.STOPPED; Logger.info("[mcs-sdp-session] Session ", this.id, " is going to stop..."); this.emit('SESSION_STOPPED', this.id);