Tracking media pipeline active subscribers in SFU to properly clean them

Also did some more work on error handling at the kurento adapter
This commit is contained in:
prlanzarin 2018-01-29 15:10:05 +00:00
parent 438faef3d7
commit f9bd7a5e37
3 changed files with 65 additions and 20 deletions

View File

@ -52,6 +52,7 @@ exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate"
// Error codes
exports.ERROR = {};
exports.ERROR.MEDIA_SERVER_OFFLINE = "1000";
exports.ERROR.MEDIA_SERVER_ERROR = "1001";
// RTP params
exports.SDP = {};

View File

@ -42,6 +42,7 @@ module.exports = class MediaServer extends EventEmitter {
return new Promise((resolve, reject) => {
mediaServerClient(serverUri, {failAfter: 3}, (error, client) => {
if (error) {
error = this._handleError(error);
reject(error);
}
@ -50,30 +51,32 @@ module.exports = class MediaServer extends EventEmitter {
});
}
_getMediaPipeline (conference) {
_getMediaPipeline (roomId) {
return new Promise((resolve, reject) => {
if (this._mediaPipelines[conference]) {
Logger.warn('[mcs-media] Pipeline for', conference, ' already exists.');
resolve(this._mediaPipelines[conference]);
if (this._mediaPipelines[roomId]) {
Logger.warn('[mcs-media] Pipeline for', roomId, ' already exists.');
return this._mediaPipelines[roomId];
}
else {
this._mediaServer.create('MediaPipeline', (error, pipeline) => {
if (error) {
Logger.error('[mcs-media] Create MediaPipeline returned error', error);
error = this._handleError(error);
reject(error);
}
this._mediaPipelines[conference] = pipeline;
this._mediaPipelines[roomId] = pipeline;
pipeline.activeElements = 0;
resolve(pipeline);
});
}
});
}
_releasePipeline (pipelineId) {
let mediaPipeline = this._mediaPipelines[pipelineId];
if (typeof mediaPipeline !== 'undefined' && typeof mediaPipeline.release === 'function') {
mediaElement.release();
_releasePipeline (room) {
Logger.debug("[mcs-media] Releasing room", room, "pipeline");
let pipeline = this._mediaPipelines[room];
if (pipeline && typeof pipeline.release === 'function') {
pipeline.release();
delete this._mediaPipelines[room];
}
}
@ -81,6 +84,7 @@ module.exports = class MediaServer extends EventEmitter {
return new Promise((resolve, reject) => {
pipeline.create(type, (error, mediaElement) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
Logger.info("[mcs-media] Created [" + type + "] media element: " + mediaElement.id);
@ -91,14 +95,15 @@ module.exports = class MediaServer extends EventEmitter {
}
async createMediaElement (conference, type) {
async createMediaElement (roomId, type) {
try {
const pipeline = await this._getMediaPipeline(conference);
const pipeline = await this._getMediaPipeline(roomId);
const mediaElement = await this._createElement(pipeline, type);
this._mediaPipelines[roomId].activeElements++;
return Promise.resolve(mediaElement.id);
}
catch (err) {
return Promise.reject(new Error(err));
return Promise.reject(err);
}
}
@ -112,6 +117,7 @@ module.exports = class MediaServer extends EventEmitter {
case 'ALL':
source.connect(sink, (error) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
return resolve();
@ -120,9 +126,18 @@ module.exports = class MediaServer extends EventEmitter {
case 'AUDIO':
source.connect(sink, 'AUDIO', (error) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
return resolve();
});
case 'VIDEO':
source.connect(sink, (error) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
return resolve();
@ -138,13 +153,24 @@ module.exports = class MediaServer extends EventEmitter {
}
}
stop (elementId) {
stop (room, elementId) {
Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room);
let mediaElement = this._mediaElements[elementId];
if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') {
Logger.info("[mcs-media] Releasing endpoint => " + elementId);
let pipeline = this._mediaPipelines[room];
if (mediaElement && typeof mediaElement.release === 'function') {
pipeline.activeElements--;
Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements");
if (pipeline.activeElements <= 0) {
this._releasePipeline(room);
}
mediaElement.release();
this._mediaElements[elementId] = null;
}
else {
Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop");
}
}
@ -159,7 +185,7 @@ module.exports = class MediaServer extends EventEmitter {
return Promise.resolve();
}
else {
return Promise.reject(new Error("Candidate could not be parsed or media element does not exist"));
return Promise.reject("Candidate could not be parsed or media element does not exist");
}
}
@ -171,7 +197,8 @@ module.exports = class MediaServer extends EventEmitter {
if (typeof mediaElement !== 'undefined' && typeof mediaElement.gatherCandidates === 'function') {
mediaElement.gatherCandidates((error) => {
if (error) {
return reject(new Error(error));
error = this._handleError(error);
return reject(error);
}
Logger.info('[mcs-media] Triggered ICE gathering for ' + elementId);
return resolve();
@ -223,6 +250,7 @@ module.exports = class MediaServer extends EventEmitter {
if (typeof mediaElement !== 'undefined' && typeof mediaElement.processOffer === 'function') {
mediaElement.processOffer(sdpOffer, (error, answer) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
return resolve(answer);
@ -296,4 +324,20 @@ module.exports = class MediaServer extends EventEmitter {
_destroyMediaServer() {
delete this._mediaServer;
}
_handleError(error) {
// Checking if the error needs to be wrapped into a JS Error instance
if(!isError(error)) {
error = new Error(error);
}
error.code = C.ERROR.MEDIA_SERVER_ERROR;
Logger.error('[mcs-media] Media Server returned error', error);
}
// duck
isError(error) {
return error && error.stack && error.message && typeof error.stack === 'string'
&& typeof error.message === 'string';
}
};

View File

@ -70,7 +70,7 @@ module.exports = class SdpSession extends EventEmitter {
async stop () {
this._status = C.STATUS.STOPPING;
try {
await this._MediaServer.stop(this._mediaElement);
await this._MediaServer.stop(this.room, this._mediaElement);
this._status = C.STATUS.STOPPED;
Logger.info("[mcs-sdp-session] Session ", this.id, " is going to stop...");
this.emit('SESSION_STOPPED', this.id);