diff --git a/bigbluebutton-html5/imports/ui/components/screenshare/component.jsx b/bigbluebutton-html5/imports/ui/components/screenshare/component.jsx index 1367fe8553..6f81651802 100755 --- a/bigbluebutton-html5/imports/ui/components/screenshare/component.jsx +++ b/bigbluebutton-html5/imports/ui/components/screenshare/component.jsx @@ -11,6 +11,7 @@ export default class ScreenshareComponent extends React.Component { } componentWillUnmount() { this.props.presenterScreenshareHasEnded(); + this.props.unshareScreen(); } render() { diff --git a/labs/bbb-webrtc-sfu/lib/ProcessManager.js b/labs/bbb-webrtc-sfu/lib/ProcessManager.js index 0b09a2909b..e584b79128 100644 --- a/labs/bbb-webrtc-sfu/lib/ProcessManager.js +++ b/labs/bbb-webrtc-sfu/lib/ProcessManager.js @@ -105,8 +105,8 @@ module.exports = class ProcessManager { for (var proc in this.processes) { if (this.processes.hasOwnProperty(proc)) { let procObj = this.processes[proc]; - if (typeof procObj.disconnect === 'function' && !procObj.killed) { - await procObj.disconnect() + if (typeof procObj.exit === 'function' && !procObj.killed) { + await procObj.exit() } } } diff --git a/labs/bbb-webrtc-sfu/lib/base/BaseProcess.js b/labs/bbb-webrtc-sfu/lib/base/BaseProcess.js index a2695a4e70..4e3c64a90d 100644 --- a/labs/bbb-webrtc-sfu/lib/base/BaseProcess.js +++ b/labs/bbb-webrtc-sfu/lib/base/BaseProcess.js @@ -39,7 +39,6 @@ module.exports = class BaseProcess { handleException (error) { Logger.error(this.logPrefix, 'TODO => Uncaught exception', error.stack); - process.exit(1); } handleRejection (reason, promise) { diff --git a/labs/bbb-webrtc-sfu/lib/bbb/messages/Constants.js b/labs/bbb-webrtc-sfu/lib/bbb/messages/Constants.js index 911c5b3d58..0936370330 100644 --- a/labs/bbb-webrtc-sfu/lib/bbb/messages/Constants.js +++ b/labs/bbb-webrtc-sfu/lib/bbb/messages/Constants.js @@ -107,7 +107,10 @@ const config = require('config'); SCREENSHARE_PROVIDER_PREFIX: '[ScreenshareProvider]', VIDEO_PROCESS_PREFIX: '[VideoProcess]', VIDEO_MANAGER_PREFIX: '[VideoManager]', - VIDEO_PROVIDER_PREFIX: '[VideoProvider]' + VIDEO_PROVIDER_PREFIX: '[VideoProvider]', + + // MCS error codes + MEDIA_SERVER_OFFLINE: "1000" } } diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js index 83f874a613..5b1a21525c 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js @@ -37,6 +37,7 @@ exports.EVENT.MEDIA_STATE.FLOW_OUT = "MediaFlowOutStateChange" exports.EVENT.MEDIA_STATE.FLOW_IN = "MediaFlowInStateChange" exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream" exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate" +exports.EVENT.SERVER_STATE = "ServerState" // Error codes exports.ERROR = {}; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MCSApiStub.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MCSApiStub.js index f7992ee7f8..4be9eb7f4c 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MCSApiStub.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MCSApiStub.js @@ -61,6 +61,10 @@ module.exports = class MCSApiStub extends EventEmitter{ this.listener.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+sessionId, (event) => { this.emit(C.EVENT.MEDIA_STATE.MEDIA_EVENT+sessionId, event); }); + this.listener.on(C.EVENT.SERVER_STATE+sessionId, (event) => { + this.emit(C.EVENT.SERVER_STATE+sessionId, event); + }); + }); const answer = await this._mediaController.publish(user, room, type, params); return Promise.resolve(answer); @@ -70,7 +74,7 @@ module.exports = class MCSApiStub extends EventEmitter{ return Promise.reject(err); } } - + async unpublish (user, mediaId) { try { await this._mediaController.unpublish(mediaId); diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js index ab6272be36..a10b4573d2 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js @@ -6,6 +6,7 @@ const Logger = require('../../../utils/Logger'); // Model const SfuUser = require('../model/SfuUser'); const Room = require('../model/Room.js'); +const isError = require('../utils/util').isError; /* PUBLIC ELEMENTS */ @@ -57,7 +58,7 @@ module.exports = class MediaController { return Promise.resolve(user.id); } catch (err) { - Logger.error("[mcs-controller] Join returned an error", err); + err = this._handleError(err); return Promise.reject(err); } } @@ -72,20 +73,20 @@ module.exports = class MediaController { return Promise.resolve(); } - const killedSessions = await user.leave(); + const killedMedias = await user.leave(); - for (var session in killedSessions) { - this._mediaSessions[killedSessions[session]] = null; - } + killedMedias.forEach((mediaId) => { + delete this._mediaSessions[killedMedias[mediaId]]; + }); room.destroyUser(user.id); - this._users[user.id] = null; - + delete this._users[user.id]; return Promise.resolve(); } catch (err) { - return Promise.reject(new Error(err)); + err = this._handleError(err); + return Promise.reject(err); } } @@ -94,18 +95,18 @@ module.exports = class MediaController { Logger.debug("[mcs-controler] PublishAndSubscribe descriptor is", params.descriptor); try { - let type = params.type; + const type = params.type; const user = this.getUserMCS(userId); - let userId = user.id; - let session = user.addSdp(sdp, type); - let sessionId = session.id; + const userId = user.id; + const session = user.addSdp(sdp, type); + const sessionId = session.id; - if (typeof this._mediaSessions[session.id] == 'undefined' || + if (typeof this._mediaSessions[session.id] == 'undefined' || !this._mediaSessions[session.id]) { this._mediaSessions[session.id] = {}; } - this._mediaSessions[session.id] = session; + this._mediaSessions[session.id] = session; const answer = await user.startSession(session.id); await user.connect(sourceId, session.id); @@ -114,7 +115,7 @@ module.exports = class MediaController { return Promise.resolve({userId, sessionId}); } catch (err) { - Logger.error("[mcs-controller] PublishAndSubscribe returned an error", err); + err = this._handleError(err); return Promise.reject(err); } } @@ -122,10 +123,7 @@ module.exports = class MediaController { async publish (userId, roomId, type, params) { Logger.info("[mcs-controller] Publish from user", userId, "to room", roomId); Logger.debug("[mcs-controler] Publish descriptor is", params.descriptor); - let session; - // TODO handle mediaType - let mediaType = params.mediaType; let answer; try { @@ -138,20 +136,6 @@ module.exports = class MediaController { case "RtpEndpoint": case "WebRtcEndpoint": session = user.addSdp(params.descriptor, type); - session.emitter.on(C.EVENT.MEDIA_SESSION_STOPPED, (pubId) => { - Logger.info("[mcs-controller] Media session", session.id, "stopped"); - if(pubId === session.id) { - for (var sub in session.subscribedSessions) { - Logger.info("[mcs-controller] Unsubscribing session ", sub); - let subSession = this._mediaSessions[sub]; - if (subSession) { - subSession.stop(); - delete this._mediaSessions[sub]; - } - } - } - }); - answer = await user.startSession(session.id); break; case "URI": @@ -160,11 +144,11 @@ module.exports = class MediaController { answer = await user.startSession(session.id); break; - default: return Promise.reject("[mcs-controller] Invalid media type"); + default: return Promise.reject(new Error("[mcs-controller] Invalid media type")); } } catch (err) { - Logger.error('[mcs-controller] Publish failed with an error', err); + err = this._handleError(err); return Promise.reject(err); } @@ -184,10 +168,8 @@ module.exports = class MediaController { Logger.debug("[mcs-controler] Subscribe descriptor is", params.descriptor); let session; - // TODO handle mediaType - let mediaType = params.mediaType; let answer; - let sourceSession = this._mediaSessions[sourceId]; + const sourceSession = this._mediaSessions[sourceId]; if (typeof sourceSession === 'undefined') { return Promise.reject(new Error("[mcs-controller] Media session", sourceId, "was not found")); @@ -219,7 +201,7 @@ module.exports = class MediaController { } } catch (err) { - Logger.error("[mcs-controller] Subscribe failed with an error", err); + err = this._handleError(err); return Promise.reject(err); } @@ -229,27 +211,23 @@ module.exports = class MediaController { } this._mediaSessions[session.id] = session; - let sessionId = session.id; + const sessionId = session.id; return Promise.resolve({answer, sessionId}); } async unpublish (userId, mediaId) { try { - const session = this._mediaSessions[mediaId]; const user = this.getUserMCS(userId); - - if(typeof session === 'undefined' || !session) { - return Promise.resolve(); + if (user) { + const answer = await user.unpublish(mediaId); + this._mediaSessions[mediaId] = null; } - - - const answer = await user.unpublish(mediaId); - this._mediaSessions[mediaId] = null; return Promise.resolve(answer); } catch (err) { - return Promise.reject(new Error(err)); + err = this._handleError(err); + return Promise.reject(err); } } @@ -263,13 +241,14 @@ module.exports = class MediaController { return Promise.resolve(); } catch (err) { - return Promise.reject(new Error(err)); + err = this._handleError(err); + return Promise.reject(err); } } async addIceCandidate (mediaId, candidate) { - let session = this._mediaSessions[mediaId]; - if (typeof session === 'undefined') { + const session = this._mediaSessions[mediaId]; + if (!session) { return Promise.reject(new Error("[mcs-controller] Media session " + mediaId + " was not found")); } try { @@ -277,7 +256,7 @@ module.exports = class MediaController { return Promise.resolve(ack); } catch (err) { - Logger.error("[mcs-controller] addIceCandidate failed with an error", err); + err = this._handleError(err); return Promise.reject(err); } } @@ -328,4 +307,13 @@ module.exports = class MediaController { getUserMCS (userId) { return this._users[userId]; } + + _handleError (error) { + Logger.error("[mcs-controller] Controller received error", error); + // Checking if the error needs to be wrapped into a JS Error instance + if (isError(error)) { + error = new Error(error); + } + return error; + } } diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js index 4ce846dac4..5634a29812 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js @@ -5,6 +5,7 @@ const config = require('config'); const mediaServerClient = require('kurento-client'); const EventEmitter = require('events').EventEmitter; const Logger = require('../../../utils/Logger'); +const isError = require('../utils/util').isError; let instance = null; @@ -33,7 +34,7 @@ module.exports = class MediaServer extends EventEmitter { _getMediaServerClient (serverUri) { return new Promise((resolve, reject) => { - mediaServerClient(serverUri, {failAfter: 5}, (error, client) => { + mediaServerClient(serverUri, {failAfter: 1}, (error, client) => { if (error) { error = this._handleError(error); return reject(error); @@ -103,12 +104,20 @@ module.exports = class MediaServer extends EventEmitter { } _releasePipeline (room) { - Logger.debug("[mcs-media] Releasing room", room, "pipeline"); - let pipeline = this._mediaPipelines[room]; - if (pipeline && typeof pipeline.release === 'function') { - pipeline.release(); - delete this._mediaPipelines[room]; - } + return new Promise((resolve, reject) => { + Logger.debug("[mcs-media] Releasing room", room, "pipeline"); + let pipeline = this._mediaPipelines[room]; + if (pipeline && typeof pipeline.release === 'function') { + pipeline.release((error) => { + if (error) { + error = this._handleError(error); + return reject(error); + } + delete this._mediaPipelines[room]; + return resolve() + }); + } + }); } _createElement (pipeline, type) { @@ -187,24 +196,42 @@ module.exports = class MediaServer extends EventEmitter { } stop (room, elementId) { - Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room); - let mediaElement = this._mediaElements[elementId]; - let pipeline = this._mediaPipelines[room]; + return new Promise((resolve, reject) => { + try { + Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room); + const mediaElement = this._mediaElements[elementId]; + const pipeline = this._mediaPipelines[room]; - if (mediaElement && typeof mediaElement.release === 'function') { - pipeline.activeElements--; + if (mediaElement && typeof mediaElement.release === 'function') { + mediaElement.release(async (error) => { + if (error) { + error = this._handleError(error); + return reject(error); + } - Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements"); - if (pipeline.activeElements <= 0) { - this._releasePipeline(room); + delete this._mediaElements[elementId]; + + if (pipeline) { + pipeline.activeElements--; + + Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements"); + if (pipeline.activeElements <= 0) { + await this._releasePipeline(room); + } + } + return resolve(); + }); + } + else { + Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop"); + return resolve(); + } } - - mediaElement.release(); - this._mediaElements[elementId] = null; - } - else { - Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop"); - } + catch (err) { + err = this._handleError(err); + resolve(); + } + }); } @@ -360,17 +387,12 @@ module.exports = class MediaServer extends EventEmitter { _handleError(error) { // Checking if the error needs to be wrapped into a JS Error instance - if(!this.isError(error)) { + if (isError(error)) { error = new Error(error); } error.code = C.ERROR.MEDIA_SERVER_ERROR; Logger.error('[mcs-media] Media Server returned an', error.message); - } - - // duck - isError(error) { - return error && error.stack && error.message && typeof error.stack === 'string' - && typeof error.message === 'string'; + return error; } }; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/MediaSession.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/MediaSession.js index 4abe88991f..1e9c1dd2fb 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/MediaSession.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/MediaSession.js @@ -11,6 +11,7 @@ const MediaServer = require('../media/media-server'); const config = require('config'); const kurentoUrl = config.get('kurentoUrl'); const Logger = require('../../../utils/Logger'); +const isError = require('../utils/util').isError; module.exports = class MediaSession { constructor(emitter, room, type) { @@ -31,6 +32,7 @@ module.exports = class MediaSession { Logger.info("[mcs-media-session] Starting new media session", this.id, "in room", this.room ); this._mediaElement = await this._MediaServer.createMediaElement(this.room, this._type); + this._status = C.STATUS.STARTED; this._MediaServer.trackMediaState(this._mediaElement, this._type); this._MediaServer.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+this._mediaElement, (event) => { setTimeout(() => { @@ -39,26 +41,44 @@ module.exports = class MediaSession { }, 50); }); + this._MediaServer.on(C.ERROR.MEDIA_SERVER_OFFLINE, () => { + let event = {}; + event.eventTag = C.ERROR.MEDIA_SERVER_OFFLINE; + event.id = this.id; + this.emitter.emit(C.EVENT.SERVER_STATE+this.id, event); + }); + + this._MediaServer.on(C.EVENT.MEDIA_SERVER_ONLINE, () => { + let event = {}; + event.eventTag = C.EVENT.MEDIA_SERVER_ONLINE; + event.id = this.id; + this.emitter.emit(C.EVENT.SERVER_STATE+this.id); + }); + return Promise.resolve(this._mediaElement); } catch (err) { - this.handleError(err); + err = this._handlerError(err); return Promise.reject(err); } } async stop () { - this._status = C.STATUS.STOPPING; - try { - await this._MediaServer.stop(this.room, this._mediaElement); - this._status = C.STATUS.STOPPED; - Logger.info("[mcs-media-session] Session ", this.id, " is going to stop..."); - this.emitter.emit(C.EVENT.MEDIA_SESSION_STOPPED, this.id); - Promise.resolve(); - } - catch (err) { - this.handleError(err); - Promise.reject(err); + if (this._status === C.STATUS.STARTED) { + this._status = C.STATUS.STOPPING; + try { + await this._MediaServer.stop(this.room, this._mediaElement); + this._status = C.STATUS.STOPPED; + Logger.info("[mcs-media-session] Session", this.id, "stopped with status", this._status); + this.emitter.emit(C.EVENT.MEDIA_SESSION_STOPPED, this.id); + return Promise.resolve(); + } + catch (err) { + err = this._handlerError(err); + return Promise.reject(err); + } + } else { + return Promise.resolve(); } } @@ -70,7 +90,7 @@ module.exports = class MediaSession { return Promise.resolve(); } catch (err) { - this.handleError(err); + err = this._handlerError(err); return Promise.reject(err); } } @@ -79,8 +99,13 @@ module.exports = class MediaSession { this._MediaServer.addMediaEventListener (type, mediaId); } - handleError (err) { - Logger.error("[mcs-media-session] SFU MediaSession received an error", err); + _handlerError (error) { + Logger.error("[mcs-media-session] SFU MediaSession received an error", error); + // Checking if the error needs to be wrapped into a JS Error instance + if (isError(error)) { + error = new Error(error); + } this._status = C.STATUS.STOPPED; + return error; } } diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js index 3d77ff46ba..abec2ea3cf 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js @@ -31,7 +31,7 @@ module.exports = class SdpSession extends MediaSession { async processDescriptor () { try { - const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getPlainSdp()); + const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getPlainSdp()); if (this._type === 'WebRtcEndpoint') { this._MediaServer.gatherCandidates(this._mediaElement); @@ -40,7 +40,7 @@ module.exports = class SdpSession extends MediaSession { return Promise.resolve(answer); } catch (err) { - this.handleError(err); + err = this._handleError(err); return Promise.reject(err); } } @@ -51,6 +51,7 @@ module.exports = class SdpSession extends MediaSession { Promise.resolve(); } catch (err) { + err = this._handleError(err); Promise.reject(err); } } diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js index 3b8cf0e52b..4c606b0bfb 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js @@ -11,6 +11,7 @@ const SdpWrapper = require('../utils/SdpWrapper'); const SdpSession = require('../model/SdpSession'); const UriSession = require('../model/UriSession'); const Logger = require('../../../utils/Logger'); +const isError = require('../utils/util').isError; module.exports = class SfuUser extends User { constructor(_roomId, type, emitter, userAgentString = C.STRING.ANONYMOUS, sdp = null, uri = null) { @@ -32,17 +33,17 @@ module.exports = class SfuUser extends User { // TODO switch from type to children UriSessions (RTSP|HTTP|etc) let session = new UriSession(uri, type); - if (typeof this._mediaSessions[session.id] == 'undefined' || + if (typeof this._mediaSessions[session.id] == 'undefined' || !this._mediaSessions[session.id]) { this._mediaSessions[session.id] = {}; } - this._mediaSessions[session.id] = session; + this._mediaSessions[session.id] = session; try { await this.startSession(session.id); Promise.resolve(session.id); } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } @@ -52,17 +53,17 @@ module.exports = class SfuUser extends User { let session = new SdpSession(this.emitter, sdp, this.roomId, type); this.emitter.emit(C.EVENT.NEW_SESSION+this.id, session.id); session.emitter.on(C.EVENT.MEDIA_SESSION_STOPPED, (sessId) => { - Logger.info("[mcs-sfu-user] Session ", sessId, "stopped, cleaning it..."); if (sessId === session.id) { + Logger.info("[mcs-sfu-user] Session ", sessId, "stopped, cleaning it..."); this._mediaSessions[sessId] = null; } }); - if (typeof this._mediaSessions[session.id] == 'undefined' || + if (typeof this._mediaSessions[session.id] == 'undefined' || !this._mediaSessions[session.id]) { this._mediaSessions[session.id] = {}; } - this._mediaSessions[session.id] = session; + this._mediaSessions[session.id] = session; Logger.info("[mcs-sfu-user] Added new SDP session", session.id, "to user", this.id); return session; @@ -70,14 +71,14 @@ module.exports = class SfuUser extends User { async startSession (sessionId) { let session = this._mediaSessions[sessionId]; - + try { const mediaElement = await session.start(); const answer = await session.processDescriptor(); return Promise.resolve(answer); } catch (err) { - this.handleError(err); + err = this.handleError(err); return Promise.reject(err); } } @@ -88,9 +89,9 @@ module.exports = class SfuUser extends User { await this.startSession(session.id); await this.connect(session.id, mediaId); Promise.resolve(session); - } + } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } @@ -100,9 +101,9 @@ module.exports = class SfuUser extends User { try { await this.startSession(session.id); Promise.resolve(); - } + } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } @@ -110,10 +111,10 @@ module.exports = class SfuUser extends User { async unsubscribe (mediaId) { try { await this.stopSession(mediaId); - Promise.resolve(); - } + Promise.resolve(mediaId); + } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } @@ -122,9 +123,9 @@ module.exports = class SfuUser extends User { try { await this.stopSession(mediaId); Promise.resolve(); - } + } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } @@ -134,12 +135,14 @@ module.exports = class SfuUser extends User { let session = this._mediaSessions[sessionId]; try { - await session.stop(); - this._mediaSessions[sessionId] = null; + if (session) { + await session.stop(); + delete this._mediaSessions[sessionId]; + } return Promise.resolve(); } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } @@ -153,7 +156,7 @@ module.exports = class SfuUser extends User { return Promise.resolve(); } catch (err) { - this.handleError(err); + err = this.handleError(err); return Promise.reject(err); } } @@ -163,24 +166,31 @@ module.exports = class SfuUser extends User { } async leave () { - let sessions = Object.keys(this._mediaSessions); + const sessions = Object.keys(this._mediaSessions); + let stopProcedures = []; Logger.info("[mcs-sfu-user] User sessions will be killed"); try { - for (var session in sessions) { - await this.stopSession(sessions[session]); + for (let i = 0; i < sessions.length; i++) { + stopProcedures.push(this.stopSession(sessions[i])); } - return Promise.resolve(sessions); + return Promise.all(stopProcedures); } catch (err) { - this.handleError(err); + err = this.handleError(err); Promise.reject(err); } } - handleError (err) { - Logger.error("[mcs-sfu-user] SFU User received error", err); + handleError (error) { + Logger.error("[mcs-sfu-user] SFU User received error", error); + // Checking if the error needs to be wrapped into a JS Error instance + if (isError(error)) { + error = new Error(error); + } + error.code = C.ERROR.MEDIA_SERVER_ERROR; this._status = C.STATUS.STOPPED; + return error; } } diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/utils/util.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/utils/util.js new file mode 100644 index 0000000000..d7a9e52d0c --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/utils/util.js @@ -0,0 +1,11 @@ +/** + * * @classdesc + * * Utils class for mcs-core + * * @constructor + * */ + + +exports.isError = function (error) { + return error && error.stack && error.message && typeof error.stack === 'string' + && typeof error.message === 'string'; +} diff --git a/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareManager.js b/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareManager.js index e6cb10ef91..6828365c9c 100644 --- a/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareManager.js +++ b/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareManager.js @@ -17,6 +17,7 @@ module.exports = class ScreenshareManager extends BaseManager { constructor (connectionChannel, additionalChannels, logPrefix) { super(connectionChannel, additionalChannels, logPrefix); this.messageFactory(this._onMessage); + this._iceQueues = {}; } _onMessage(message) { @@ -25,11 +26,21 @@ module.exports = class ScreenshareManager extends BaseManager { let session; let sessionId = message.voiceBridge; let connectionId = message.connectionId; + let iceQueue; if(this._sessions[sessionId]) { session = this._sessions[sessionId]; } + if (!this._iceQueues[sessionId] && message.voiceBridge) { + this._iceQueues[sessionId] = []; + } + + if (this._iceQueues[sessionId]) { + iceQueue = this._iceQueues[sessionId] ; + } + + switch (message.id) { case 'presenter': @@ -73,6 +84,19 @@ module.exports = class ScreenshareManager extends BaseManager { sdpAnswer : sdpAnswer }), C.FROM_SCREENSHARE); + // Empty ice queue after starting session + if (iceQueue) { + let candidate; + while(candidate = iceQueue.pop()) { + session.onIceCandidate(candidate); + } + } + + + session.once(C.MEDIA_SERVER_OFFLINE, (event) => { + this._stopSession(sessionId); + }); + Logger.info(this._logPrefix, "Sending presenterResponse to presenter", sessionId, "for connection", session._id); }); break; @@ -86,6 +110,14 @@ module.exports = class ScreenshareManager extends BaseManager { message.sdpOffer, connectionId, this._sessions[message.voiceBridge]._presenterEndpoint); + // Empty ice queue after starting session + if (iceQueue) { + let candidate; + while(candidate = iceQueue.pop()) { + session.onViewerIceCandidate(candidate); + } + } + } else { // TODO ERROR HANDLING } @@ -103,10 +135,16 @@ module.exports = class ScreenshareManager extends BaseManager { break; case 'onIceCandidate': - if (session) { + if (session.constructor === Screenshare) { session.onIceCandidate(message.candidate); } else { - Logger.warn(this._logPrefix, "There was no screensharing session for onIceCandidate for", sessionId, ". There should be a queue here"); + Logger.info(this._logPrefix, "Queueing ice candidate for later in screenshare", message.voiceBridge); + if (!iceQueue) { + this._iceQueues[sessionId] = []; + iceQueue = this._iceQueues[sessionId]; + } + + iceQueue.push(message.candidate); } break; @@ -114,7 +152,13 @@ module.exports = class ScreenshareManager extends BaseManager { if (session) { session.onViewerIceCandidate(message.candidate, connectionId); } else { - Logger.warn(this._logPrefix, "There was no screensharing session for onIceCandidate for", sessionId + ". There should be a queue here"); + Logger.info(this._logPrefix, "Queueing ice candidate for later in screenshare", message.voiceBridge); + if (!iceQueue) { + this._iceQueues[sessionId] = []; + iceQueue = this._iceQueues[sessionId]; + } + + iceQueue.push(message.candidate); } break; diff --git a/labs/bbb-webrtc-sfu/lib/screenshare/screenshare.js b/labs/bbb-webrtc-sfu/lib/screenshare/screenshare.js index 9f1709dbf1..9553bc1c30 100644 --- a/labs/bbb-webrtc-sfu/lib/screenshare/screenshare.js +++ b/labs/bbb-webrtc-sfu/lib/screenshare/screenshare.js @@ -17,6 +17,7 @@ const MCSApi = require('../mcs-core/lib/media/MCSApiStub'); const config = require('config'); const kurentoIp = config.get('kurentoIp'); const localIpAddress = config.get('localIpAddress'); +const EventEmitter = require('events').EventEmitter; const Logger = require('../utils/Logger'); // Global MCS endpoints mapping. These hashes maps IDs generated by the mcs-core @@ -24,8 +25,9 @@ const Logger = require('../utils/Logger'); var sharedScreens = {}; var rtpEndpoints = {}; -module.exports = class Screenshare { +module.exports = class Screenshare extends EventEmitter { constructor(id, bbbgw, voiceBridge, caller = 'caller', vh, vw, meetingId) { + super(); this.mcs = new MCSApi(); this._id = id; this._BigBlueButtonGW = bbbgw; @@ -145,6 +147,17 @@ module.exports = class Screenshare { } } + serverState (event) { + switch (event && event.eventTag) { + case C.MEDIA_SERVER_OFFLINE: + Logger.error("[screenshare] Screenshare provider received MEDIA_SERVER_OFFLINE event"); + this.emit(C.MEDIA_SERVER_OFFLINE, event); + break; + default: + Logger.warn("[screenshare] Unknown server state", event); + } + } + async _startPresenter(id, sdpOffer, callback) { let presenterSdpAnswer, rtpSdpAnswer; let _callback = callback; @@ -181,6 +194,9 @@ module.exports = class Screenshare { Logger.error("[screenshare] MCS publish returned error =>", err); return callback(err); } + finally { + this.mcs.once('ServerState' + this._presenterEndpoint, this.serverState.bind(this)); + } try { let sendVideoPort = MediaHandler.getVideoPort(); diff --git a/labs/bbb-webrtc-sfu/lib/video/VideoManager.js b/labs/bbb-webrtc-sfu/lib/video/VideoManager.js index 45122ee923..13d114a3a8 100755 --- a/labs/bbb-webrtc-sfu/lib/video/VideoManager.js +++ b/labs/bbb-webrtc-sfu/lib/video/VideoManager.js @@ -86,6 +86,10 @@ module.exports = class VideoManager extends BaseManager { }), C.FROM_VIDEO); } + video.once(C.MEDIA_SERVER_OFFLINE, async (event) => { + this._stopSession(sessionId); + }); + this._bbbGW.publish(JSON.stringify({ connectionId: connectionId, type: 'video', @@ -98,8 +102,8 @@ module.exports = class VideoManager extends BaseManager { break; case 'stop': - if (video.constructor=== Video) { - this._stopSession(sessionId, role, message.cameraId); + if (video.constructor === Video) { + this._stopSession(sessionId); } else { Logger.warn(this._logPrefix, "There is no video instance named", cameraId, "to stop"); } diff --git a/labs/bbb-webrtc-sfu/lib/video/video.js b/labs/bbb-webrtc-sfu/lib/video/video.js index 030515bd89..f9a3305600 100644 --- a/labs/bbb-webrtc-sfu/lib/video/video.js +++ b/labs/bbb-webrtc-sfu/lib/video/video.js @@ -7,11 +7,13 @@ const C = require('../bbb/messages/Constants'); const Logger = require('../utils/Logger'); const Messaging = require('../bbb/messages/Messaging'); const h264_sdp = require('../h264-sdp'); +const EventEmitter = require('events').EventEmitter; var sharedWebcams = {}; -module.exports = class Video { +module.exports = class Video extends EventEmitter { constructor(_bbbGW, _meetingId, _id, _shared, _sessionId) { + super(); this.mcs = new MCSApi(); this.bbbGW = _bbbGW; this.id = _id; @@ -55,6 +57,27 @@ module.exports = class Video { } } + serverState (event) { + switch (event && event.eventTag) { + case C.MEDIA_SERVER_OFFLINE: + Logger.error("[video] Video provider received MEDIA_SERVER_OFFLINE event"); + this.bbbGW.publish(JSON.stringify({ + connectionId: this.sessionId, + type: 'video', + id : 'error', + response : 'rejected', + cameraId : this.id, + message : C.MEDIA_SERVER_OFFLINE + }), C.FROM_VIDEO); + this.emit(C.MEDIA_SERVER_OFFLINE, event); + break; + + default: + Logger.warn("[video] Unknown server state", event); + } + } + + mediaState (event) { let msEvent = event.event; @@ -108,10 +131,9 @@ module.exports = class Video { cameraId: this.id, }), C.FROM_VIDEO); } - break; - default: Logger.warn("[video] Unrecognized event"); + default: Logger.warn("[video] Unrecognized event", event); } } @@ -134,18 +156,20 @@ module.exports = class Video { sdpAnswer = ret.answer; this.flushCandidatesQueue(); this.mcs.on('MediaEvent' + this.mediaId, this.mediaState.bind(this)); + this.mcs.on('ServerState' + this.mediaId, this.serverState.bind(this)); Logger.info("[video] MCS publish for user", this.userId, "returned", this.mediaId); return callback(null, sdpAnswer); } - else { + else if (sharedWebcams[this.id]) { const ret = await this.mcs.subscribe(this.userId, sharedWebcams[this.id], 'WebRtcEndpoint', {descriptor: sdpOffer}); this.mediaId = ret.sessionId; sdpAnswer = ret.answer; this.flushCandidatesQueue(); this.mcs.on('MediaEvent' + this.mediaId, this.mediaState.bind(this)); + this.mcs.on('ServerState' + this.mediaId, this.serverState.bind(this)); Logger.info("[video] MCS subscribe for user", this.userId, "returned", this.mediaId); @@ -159,26 +183,27 @@ module.exports = class Video { }; async stop () { - Logger.info('[video] Releasing endpoints for user', this.userId, 'at room', this.meetingId); + return new Promise(async (resolve, reject) => { + Logger.info('[video] Stopping video session', this.userId, 'at room', this.meetingId); - try { - await this.mcs.leave(this.meetingId, this.userId); - if (this.shared) { - sharedWebcams[this.id] = null; + try { + await this.mcs.leave(this.meetingId, this.userId); + if (this.shared) { + delete sharedWebcams[this.id]; + } + + if (this.notFlowingTimeout) { + clearTimeout(this.notFlowingTimeout); + delete this.notFlowingTimeout; + } + + delete this._candidatesQueue; + resolve(); } - - if (this.notFlowingTimeout) { - clearTimeout(this.notFlowingTimeout); - this.notFlowingTimeout = null; + catch (err) { + // TODO error handling + reject(); } - - this._candidatesQueue = null; - Promise.resolve(); - } - catch (err) { - // TODO error handling - Promise.reject(); - } - return; - }; + }); + } };