SFU responding to media server offline event and closing active media sessions

This commit is contained in:
prlanzarin 2018-04-10 13:32:03 +00:00
parent a01154dbe9
commit 553ea58fcc
16 changed files with 314 additions and 160 deletions

View File

@ -11,6 +11,7 @@ export default class ScreenshareComponent extends React.Component {
}
componentWillUnmount() {
this.props.presenterScreenshareHasEnded();
this.props.unshareScreen();
}
render() {

View File

@ -105,8 +105,8 @@ module.exports = class ProcessManager {
for (var proc in this.processes) {
if (this.processes.hasOwnProperty(proc)) {
let procObj = this.processes[proc];
if (typeof procObj.disconnect === 'function' && !procObj.killed) {
await procObj.disconnect()
if (typeof procObj.exit === 'function' && !procObj.killed) {
await procObj.exit()
}
}
}

View File

@ -39,7 +39,6 @@ module.exports = class BaseProcess {
handleException (error) {
Logger.error(this.logPrefix, 'TODO => Uncaught exception', error.stack);
process.exit(1);
}
handleRejection (reason, promise) {

View File

@ -107,7 +107,10 @@ const config = require('config');
SCREENSHARE_PROVIDER_PREFIX: '[ScreenshareProvider]',
VIDEO_PROCESS_PREFIX: '[VideoProcess]',
VIDEO_MANAGER_PREFIX: '[VideoManager]',
VIDEO_PROVIDER_PREFIX: '[VideoProvider]'
VIDEO_PROVIDER_PREFIX: '[VideoProvider]',
// MCS error codes
MEDIA_SERVER_OFFLINE: "1000"
}
}

View File

@ -37,6 +37,7 @@ exports.EVENT.MEDIA_STATE.FLOW_OUT = "MediaFlowOutStateChange"
exports.EVENT.MEDIA_STATE.FLOW_IN = "MediaFlowInStateChange"
exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream"
exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate"
exports.EVENT.SERVER_STATE = "ServerState"
// Error codes
exports.ERROR = {};

View File

@ -61,6 +61,10 @@ module.exports = class MCSApiStub extends EventEmitter{
this.listener.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+sessionId, (event) => {
this.emit(C.EVENT.MEDIA_STATE.MEDIA_EVENT+sessionId, event);
});
this.listener.on(C.EVENT.SERVER_STATE+sessionId, (event) => {
this.emit(C.EVENT.SERVER_STATE+sessionId, event);
});
});
const answer = await this._mediaController.publish(user, room, type, params);
return Promise.resolve(answer);
@ -70,7 +74,7 @@ module.exports = class MCSApiStub extends EventEmitter{
return Promise.reject(err);
}
}
async unpublish (user, mediaId) {
try {
await this._mediaController.unpublish(mediaId);

View File

@ -6,6 +6,7 @@ const Logger = require('../../../utils/Logger');
// Model
const SfuUser = require('../model/SfuUser');
const Room = require('../model/Room.js');
const isError = require('../utils/util').isError;
/* PUBLIC ELEMENTS */
@ -57,7 +58,7 @@ module.exports = class MediaController {
return Promise.resolve(user.id);
}
catch (err) {
Logger.error("[mcs-controller] Join returned an error", err);
err = this._handleError(err);
return Promise.reject(err);
}
}
@ -72,20 +73,20 @@ module.exports = class MediaController {
return Promise.resolve();
}
const killedSessions = await user.leave();
const killedMedias = await user.leave();
for (var session in killedSessions) {
this._mediaSessions[killedSessions[session]] = null;
}
killedMedias.forEach((mediaId) => {
delete this._mediaSessions[killedMedias[mediaId]];
});
room.destroyUser(user.id);
this._users[user.id] = null;
delete this._users[user.id];
return Promise.resolve();
}
catch (err) {
return Promise.reject(new Error(err));
err = this._handleError(err);
return Promise.reject(err);
}
}
@ -94,18 +95,18 @@ module.exports = class MediaController {
Logger.debug("[mcs-controler] PublishAndSubscribe descriptor is", params.descriptor);
try {
let type = params.type;
const type = params.type;
const user = this.getUserMCS(userId);
let userId = user.id;
let session = user.addSdp(sdp, type);
let sessionId = session.id;
const userId = user.id;
const session = user.addSdp(sdp, type);
const sessionId = session.id;
if (typeof this._mediaSessions[session.id] == 'undefined' ||
if (typeof this._mediaSessions[session.id] == 'undefined' ||
!this._mediaSessions[session.id]) {
this._mediaSessions[session.id] = {};
}
this._mediaSessions[session.id] = session;
this._mediaSessions[session.id] = session;
const answer = await user.startSession(session.id);
await user.connect(sourceId, session.id);
@ -114,7 +115,7 @@ module.exports = class MediaController {
return Promise.resolve({userId, sessionId});
}
catch (err) {
Logger.error("[mcs-controller] PublishAndSubscribe returned an error", err);
err = this._handleError(err);
return Promise.reject(err);
}
}
@ -122,10 +123,7 @@ module.exports = class MediaController {
async publish (userId, roomId, type, params) {
Logger.info("[mcs-controller] Publish from user", userId, "to room", roomId);
Logger.debug("[mcs-controler] Publish descriptor is", params.descriptor);
let session;
// TODO handle mediaType
let mediaType = params.mediaType;
let answer;
try {
@ -138,20 +136,6 @@ module.exports = class MediaController {
case "RtpEndpoint":
case "WebRtcEndpoint":
session = user.addSdp(params.descriptor, type);
session.emitter.on(C.EVENT.MEDIA_SESSION_STOPPED, (pubId) => {
Logger.info("[mcs-controller] Media session", session.id, "stopped");
if(pubId === session.id) {
for (var sub in session.subscribedSessions) {
Logger.info("[mcs-controller] Unsubscribing session ", sub);
let subSession = this._mediaSessions[sub];
if (subSession) {
subSession.stop();
delete this._mediaSessions[sub];
}
}
}
});
answer = await user.startSession(session.id);
break;
case "URI":
@ -160,11 +144,11 @@ module.exports = class MediaController {
answer = await user.startSession(session.id);
break;
default: return Promise.reject("[mcs-controller] Invalid media type");
default: return Promise.reject(new Error("[mcs-controller] Invalid media type"));
}
}
catch (err) {
Logger.error('[mcs-controller] Publish failed with an error', err);
err = this._handleError(err);
return Promise.reject(err);
}
@ -184,10 +168,8 @@ module.exports = class MediaController {
Logger.debug("[mcs-controler] Subscribe descriptor is", params.descriptor);
let session;
// TODO handle mediaType
let mediaType = params.mediaType;
let answer;
let sourceSession = this._mediaSessions[sourceId];
const sourceSession = this._mediaSessions[sourceId];
if (typeof sourceSession === 'undefined') {
return Promise.reject(new Error("[mcs-controller] Media session", sourceId, "was not found"));
@ -219,7 +201,7 @@ module.exports = class MediaController {
}
}
catch (err) {
Logger.error("[mcs-controller] Subscribe failed with an error", err);
err = this._handleError(err);
return Promise.reject(err);
}
@ -229,27 +211,23 @@ module.exports = class MediaController {
}
this._mediaSessions[session.id] = session;
let sessionId = session.id;
const sessionId = session.id;
return Promise.resolve({answer, sessionId});
}
async unpublish (userId, mediaId) {
try {
const session = this._mediaSessions[mediaId];
const user = this.getUserMCS(userId);
if(typeof session === 'undefined' || !session) {
return Promise.resolve();
if (user) {
const answer = await user.unpublish(mediaId);
this._mediaSessions[mediaId] = null;
}
const answer = await user.unpublish(mediaId);
this._mediaSessions[mediaId] = null;
return Promise.resolve(answer);
}
catch (err) {
return Promise.reject(new Error(err));
err = this._handleError(err);
return Promise.reject(err);
}
}
@ -263,13 +241,14 @@ module.exports = class MediaController {
return Promise.resolve();
}
catch (err) {
return Promise.reject(new Error(err));
err = this._handleError(err);
return Promise.reject(err);
}
}
async addIceCandidate (mediaId, candidate) {
let session = this._mediaSessions[mediaId];
if (typeof session === 'undefined') {
const session = this._mediaSessions[mediaId];
if (!session) {
return Promise.reject(new Error("[mcs-controller] Media session " + mediaId + " was not found"));
}
try {
@ -277,7 +256,7 @@ module.exports = class MediaController {
return Promise.resolve(ack);
}
catch (err) {
Logger.error("[mcs-controller] addIceCandidate failed with an error", err);
err = this._handleError(err);
return Promise.reject(err);
}
}
@ -328,4 +307,13 @@ module.exports = class MediaController {
getUserMCS (userId) {
return this._users[userId];
}
_handleError (error) {
Logger.error("[mcs-controller] Controller received error", error);
// Checking if the error needs to be wrapped into a JS Error instance
if (isError(error)) {
error = new Error(error);
}
return error;
}
}

View File

@ -5,6 +5,7 @@ const config = require('config');
const mediaServerClient = require('kurento-client');
const EventEmitter = require('events').EventEmitter;
const Logger = require('../../../utils/Logger');
const isError = require('../utils/util').isError;
let instance = null;
@ -33,7 +34,7 @@ module.exports = class MediaServer extends EventEmitter {
_getMediaServerClient (serverUri) {
return new Promise((resolve, reject) => {
mediaServerClient(serverUri, {failAfter: 5}, (error, client) => {
mediaServerClient(serverUri, {failAfter: 1}, (error, client) => {
if (error) {
error = this._handleError(error);
return reject(error);
@ -103,12 +104,20 @@ module.exports = class MediaServer extends EventEmitter {
}
_releasePipeline (room) {
Logger.debug("[mcs-media] Releasing room", room, "pipeline");
let pipeline = this._mediaPipelines[room];
if (pipeline && typeof pipeline.release === 'function') {
pipeline.release();
delete this._mediaPipelines[room];
}
return new Promise((resolve, reject) => {
Logger.debug("[mcs-media] Releasing room", room, "pipeline");
let pipeline = this._mediaPipelines[room];
if (pipeline && typeof pipeline.release === 'function') {
pipeline.release((error) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
delete this._mediaPipelines[room];
return resolve()
});
}
});
}
_createElement (pipeline, type) {
@ -187,24 +196,42 @@ module.exports = class MediaServer extends EventEmitter {
}
stop (room, elementId) {
Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room);
let mediaElement = this._mediaElements[elementId];
let pipeline = this._mediaPipelines[room];
return new Promise((resolve, reject) => {
try {
Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room);
const mediaElement = this._mediaElements[elementId];
const pipeline = this._mediaPipelines[room];
if (mediaElement && typeof mediaElement.release === 'function') {
pipeline.activeElements--;
if (mediaElement && typeof mediaElement.release === 'function') {
mediaElement.release(async (error) => {
if (error) {
error = this._handleError(error);
return reject(error);
}
Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements");
if (pipeline.activeElements <= 0) {
this._releasePipeline(room);
delete this._mediaElements[elementId];
if (pipeline) {
pipeline.activeElements--;
Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements");
if (pipeline.activeElements <= 0) {
await this._releasePipeline(room);
}
}
return resolve();
});
}
else {
Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop");
return resolve();
}
}
mediaElement.release();
this._mediaElements[elementId] = null;
}
else {
Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop");
}
catch (err) {
err = this._handleError(err);
resolve();
}
});
}
@ -360,17 +387,12 @@ module.exports = class MediaServer extends EventEmitter {
_handleError(error) {
// Checking if the error needs to be wrapped into a JS Error instance
if(!this.isError(error)) {
if (isError(error)) {
error = new Error(error);
}
error.code = C.ERROR.MEDIA_SERVER_ERROR;
Logger.error('[mcs-media] Media Server returned an', error.message);
}
// duck
isError(error) {
return error && error.stack && error.message && typeof error.stack === 'string'
&& typeof error.message === 'string';
return error;
}
};

View File

@ -11,6 +11,7 @@ const MediaServer = require('../media/media-server');
const config = require('config');
const kurentoUrl = config.get('kurentoUrl');
const Logger = require('../../../utils/Logger');
const isError = require('../utils/util').isError;
module.exports = class MediaSession {
constructor(emitter, room, type) {
@ -31,6 +32,7 @@ module.exports = class MediaSession {
Logger.info("[mcs-media-session] Starting new media session", this.id, "in room", this.room );
this._mediaElement = await this._MediaServer.createMediaElement(this.room, this._type);
this._status = C.STATUS.STARTED;
this._MediaServer.trackMediaState(this._mediaElement, this._type);
this._MediaServer.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+this._mediaElement, (event) => {
setTimeout(() => {
@ -39,26 +41,44 @@ module.exports = class MediaSession {
}, 50);
});
this._MediaServer.on(C.ERROR.MEDIA_SERVER_OFFLINE, () => {
let event = {};
event.eventTag = C.ERROR.MEDIA_SERVER_OFFLINE;
event.id = this.id;
this.emitter.emit(C.EVENT.SERVER_STATE+this.id, event);
});
this._MediaServer.on(C.EVENT.MEDIA_SERVER_ONLINE, () => {
let event = {};
event.eventTag = C.EVENT.MEDIA_SERVER_ONLINE;
event.id = this.id;
this.emitter.emit(C.EVENT.SERVER_STATE+this.id);
});
return Promise.resolve(this._mediaElement);
}
catch (err) {
this.handleError(err);
err = this._handlerError(err);
return Promise.reject(err);
}
}
async stop () {
this._status = C.STATUS.STOPPING;
try {
await this._MediaServer.stop(this.room, this._mediaElement);
this._status = C.STATUS.STOPPED;
Logger.info("[mcs-media-session] Session ", this.id, " is going to stop...");
this.emitter.emit(C.EVENT.MEDIA_SESSION_STOPPED, this.id);
Promise.resolve();
}
catch (err) {
this.handleError(err);
Promise.reject(err);
if (this._status === C.STATUS.STARTED) {
this._status = C.STATUS.STOPPING;
try {
await this._MediaServer.stop(this.room, this._mediaElement);
this._status = C.STATUS.STOPPED;
Logger.info("[mcs-media-session] Session", this.id, "stopped with status", this._status);
this.emitter.emit(C.EVENT.MEDIA_SESSION_STOPPED, this.id);
return Promise.resolve();
}
catch (err) {
err = this._handlerError(err);
return Promise.reject(err);
}
} else {
return Promise.resolve();
}
}
@ -70,7 +90,7 @@ module.exports = class MediaSession {
return Promise.resolve();
}
catch (err) {
this.handleError(err);
err = this._handlerError(err);
return Promise.reject(err);
}
}
@ -79,8 +99,13 @@ module.exports = class MediaSession {
this._MediaServer.addMediaEventListener (type, mediaId);
}
handleError (err) {
Logger.error("[mcs-media-session] SFU MediaSession received an error", err);
_handlerError (error) {
Logger.error("[mcs-media-session] SFU MediaSession received an error", error);
// Checking if the error needs to be wrapped into a JS Error instance
if (isError(error)) {
error = new Error(error);
}
this._status = C.STATUS.STOPPED;
return error;
}
}

View File

@ -31,7 +31,7 @@ module.exports = class SdpSession extends MediaSession {
async processDescriptor () {
try {
const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getPlainSdp());
const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getPlainSdp());
if (this._type === 'WebRtcEndpoint') {
this._MediaServer.gatherCandidates(this._mediaElement);
@ -40,7 +40,7 @@ module.exports = class SdpSession extends MediaSession {
return Promise.resolve(answer);
}
catch (err) {
this.handleError(err);
err = this._handleError(err);
return Promise.reject(err);
}
}
@ -51,6 +51,7 @@ module.exports = class SdpSession extends MediaSession {
Promise.resolve();
}
catch (err) {
err = this._handleError(err);
Promise.reject(err);
}
}

View File

@ -11,6 +11,7 @@ const SdpWrapper = require('../utils/SdpWrapper');
const SdpSession = require('../model/SdpSession');
const UriSession = require('../model/UriSession');
const Logger = require('../../../utils/Logger');
const isError = require('../utils/util').isError;
module.exports = class SfuUser extends User {
constructor(_roomId, type, emitter, userAgentString = C.STRING.ANONYMOUS, sdp = null, uri = null) {
@ -32,17 +33,17 @@ module.exports = class SfuUser extends User {
// TODO switch from type to children UriSessions (RTSP|HTTP|etc)
let session = new UriSession(uri, type);
if (typeof this._mediaSessions[session.id] == 'undefined' ||
if (typeof this._mediaSessions[session.id] == 'undefined' ||
!this._mediaSessions[session.id]) {
this._mediaSessions[session.id] = {};
}
this._mediaSessions[session.id] = session;
this._mediaSessions[session.id] = session;
try {
await this.startSession(session.id);
Promise.resolve(session.id);
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
@ -52,17 +53,17 @@ module.exports = class SfuUser extends User {
let session = new SdpSession(this.emitter, sdp, this.roomId, type);
this.emitter.emit(C.EVENT.NEW_SESSION+this.id, session.id);
session.emitter.on(C.EVENT.MEDIA_SESSION_STOPPED, (sessId) => {
Logger.info("[mcs-sfu-user] Session ", sessId, "stopped, cleaning it...");
if (sessId === session.id) {
Logger.info("[mcs-sfu-user] Session ", sessId, "stopped, cleaning it...");
this._mediaSessions[sessId] = null;
}
});
if (typeof this._mediaSessions[session.id] == 'undefined' ||
if (typeof this._mediaSessions[session.id] == 'undefined' ||
!this._mediaSessions[session.id]) {
this._mediaSessions[session.id] = {};
}
this._mediaSessions[session.id] = session;
this._mediaSessions[session.id] = session;
Logger.info("[mcs-sfu-user] Added new SDP session", session.id, "to user", this.id);
return session;
@ -70,14 +71,14 @@ module.exports = class SfuUser extends User {
async startSession (sessionId) {
let session = this._mediaSessions[sessionId];
try {
const mediaElement = await session.start();
const answer = await session.processDescriptor();
return Promise.resolve(answer);
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
return Promise.reject(err);
}
}
@ -88,9 +89,9 @@ module.exports = class SfuUser extends User {
await this.startSession(session.id);
await this.connect(session.id, mediaId);
Promise.resolve(session);
}
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
@ -100,9 +101,9 @@ module.exports = class SfuUser extends User {
try {
await this.startSession(session.id);
Promise.resolve();
}
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
@ -110,10 +111,10 @@ module.exports = class SfuUser extends User {
async unsubscribe (mediaId) {
try {
await this.stopSession(mediaId);
Promise.resolve();
}
Promise.resolve(mediaId);
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
@ -122,9 +123,9 @@ module.exports = class SfuUser extends User {
try {
await this.stopSession(mediaId);
Promise.resolve();
}
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
@ -134,12 +135,14 @@ module.exports = class SfuUser extends User {
let session = this._mediaSessions[sessionId];
try {
await session.stop();
this._mediaSessions[sessionId] = null;
if (session) {
await session.stop();
delete this._mediaSessions[sessionId];
}
return Promise.resolve();
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
@ -153,7 +156,7 @@ module.exports = class SfuUser extends User {
return Promise.resolve();
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
return Promise.reject(err);
}
}
@ -163,24 +166,31 @@ module.exports = class SfuUser extends User {
}
async leave () {
let sessions = Object.keys(this._mediaSessions);
const sessions = Object.keys(this._mediaSessions);
let stopProcedures = [];
Logger.info("[mcs-sfu-user] User sessions will be killed");
try {
for (var session in sessions) {
await this.stopSession(sessions[session]);
for (let i = 0; i < sessions.length; i++) {
stopProcedures.push(this.stopSession(sessions[i]));
}
return Promise.resolve(sessions);
return Promise.all(stopProcedures);
}
catch (err) {
this.handleError(err);
err = this.handleError(err);
Promise.reject(err);
}
}
handleError (err) {
Logger.error("[mcs-sfu-user] SFU User received error", err);
handleError (error) {
Logger.error("[mcs-sfu-user] SFU User received error", error);
// Checking if the error needs to be wrapped into a JS Error instance
if (isError(error)) {
error = new Error(error);
}
error.code = C.ERROR.MEDIA_SERVER_ERROR;
this._status = C.STATUS.STOPPED;
return error;
}
}

View File

@ -0,0 +1,11 @@
/**
* * @classdesc
* * Utils class for mcs-core
* * @constructor
* */
exports.isError = function (error) {
return error && error.stack && error.message && typeof error.stack === 'string'
&& typeof error.message === 'string';
}

View File

@ -17,6 +17,7 @@ module.exports = class ScreenshareManager extends BaseManager {
constructor (connectionChannel, additionalChannels, logPrefix) {
super(connectionChannel, additionalChannels, logPrefix);
this.messageFactory(this._onMessage);
this._iceQueues = {};
}
_onMessage(message) {
@ -25,11 +26,21 @@ module.exports = class ScreenshareManager extends BaseManager {
let session;
let sessionId = message.voiceBridge;
let connectionId = message.connectionId;
let iceQueue;
if(this._sessions[sessionId]) {
session = this._sessions[sessionId];
}
if (!this._iceQueues[sessionId] && message.voiceBridge) {
this._iceQueues[sessionId] = [];
}
if (this._iceQueues[sessionId]) {
iceQueue = this._iceQueues[sessionId] ;
}
switch (message.id) {
case 'presenter':
@ -73,6 +84,19 @@ module.exports = class ScreenshareManager extends BaseManager {
sdpAnswer : sdpAnswer
}), C.FROM_SCREENSHARE);
// Empty ice queue after starting session
if (iceQueue) {
let candidate;
while(candidate = iceQueue.pop()) {
session.onIceCandidate(candidate);
}
}
session.once(C.MEDIA_SERVER_OFFLINE, (event) => {
this._stopSession(sessionId);
});
Logger.info(this._logPrefix, "Sending presenterResponse to presenter", sessionId, "for connection", session._id);
});
break;
@ -86,6 +110,14 @@ module.exports = class ScreenshareManager extends BaseManager {
message.sdpOffer,
connectionId,
this._sessions[message.voiceBridge]._presenterEndpoint);
// Empty ice queue after starting session
if (iceQueue) {
let candidate;
while(candidate = iceQueue.pop()) {
session.onViewerIceCandidate(candidate);
}
}
} else {
// TODO ERROR HANDLING
}
@ -103,10 +135,16 @@ module.exports = class ScreenshareManager extends BaseManager {
break;
case 'onIceCandidate':
if (session) {
if (session.constructor === Screenshare) {
session.onIceCandidate(message.candidate);
} else {
Logger.warn(this._logPrefix, "There was no screensharing session for onIceCandidate for", sessionId, ". There should be a queue here");
Logger.info(this._logPrefix, "Queueing ice candidate for later in screenshare", message.voiceBridge);
if (!iceQueue) {
this._iceQueues[sessionId] = [];
iceQueue = this._iceQueues[sessionId];
}
iceQueue.push(message.candidate);
}
break;
@ -114,7 +152,13 @@ module.exports = class ScreenshareManager extends BaseManager {
if (session) {
session.onViewerIceCandidate(message.candidate, connectionId);
} else {
Logger.warn(this._logPrefix, "There was no screensharing session for onIceCandidate for", sessionId + ". There should be a queue here");
Logger.info(this._logPrefix, "Queueing ice candidate for later in screenshare", message.voiceBridge);
if (!iceQueue) {
this._iceQueues[sessionId] = [];
iceQueue = this._iceQueues[sessionId];
}
iceQueue.push(message.candidate);
}
break;

View File

@ -17,6 +17,7 @@ const MCSApi = require('../mcs-core/lib/media/MCSApiStub');
const config = require('config');
const kurentoIp = config.get('kurentoIp');
const localIpAddress = config.get('localIpAddress');
const EventEmitter = require('events').EventEmitter;
const Logger = require('../utils/Logger');
// Global MCS endpoints mapping. These hashes maps IDs generated by the mcs-core
@ -24,8 +25,9 @@ const Logger = require('../utils/Logger');
var sharedScreens = {};
var rtpEndpoints = {};
module.exports = class Screenshare {
module.exports = class Screenshare extends EventEmitter {
constructor(id, bbbgw, voiceBridge, caller = 'caller', vh, vw, meetingId) {
super();
this.mcs = new MCSApi();
this._id = id;
this._BigBlueButtonGW = bbbgw;
@ -145,6 +147,17 @@ module.exports = class Screenshare {
}
}
serverState (event) {
switch (event && event.eventTag) {
case C.MEDIA_SERVER_OFFLINE:
Logger.error("[screenshare] Screenshare provider received MEDIA_SERVER_OFFLINE event");
this.emit(C.MEDIA_SERVER_OFFLINE, event);
break;
default:
Logger.warn("[screenshare] Unknown server state", event);
}
}
async _startPresenter(id, sdpOffer, callback) {
let presenterSdpAnswer, rtpSdpAnswer;
let _callback = callback;
@ -181,6 +194,9 @@ module.exports = class Screenshare {
Logger.error("[screenshare] MCS publish returned error =>", err);
return callback(err);
}
finally {
this.mcs.once('ServerState' + this._presenterEndpoint, this.serverState.bind(this));
}
try {
let sendVideoPort = MediaHandler.getVideoPort();

View File

@ -86,6 +86,10 @@ module.exports = class VideoManager extends BaseManager {
}), C.FROM_VIDEO);
}
video.once(C.MEDIA_SERVER_OFFLINE, async (event) => {
this._stopSession(sessionId);
});
this._bbbGW.publish(JSON.stringify({
connectionId: connectionId,
type: 'video',
@ -98,8 +102,8 @@ module.exports = class VideoManager extends BaseManager {
break;
case 'stop':
if (video.constructor=== Video) {
this._stopSession(sessionId, role, message.cameraId);
if (video.constructor === Video) {
this._stopSession(sessionId);
} else {
Logger.warn(this._logPrefix, "There is no video instance named", cameraId, "to stop");
}

View File

@ -7,11 +7,13 @@ const C = require('../bbb/messages/Constants');
const Logger = require('../utils/Logger');
const Messaging = require('../bbb/messages/Messaging');
const h264_sdp = require('../h264-sdp');
const EventEmitter = require('events').EventEmitter;
var sharedWebcams = {};
module.exports = class Video {
module.exports = class Video extends EventEmitter {
constructor(_bbbGW, _meetingId, _id, _shared, _sessionId) {
super();
this.mcs = new MCSApi();
this.bbbGW = _bbbGW;
this.id = _id;
@ -55,6 +57,27 @@ module.exports = class Video {
}
}
serverState (event) {
switch (event && event.eventTag) {
case C.MEDIA_SERVER_OFFLINE:
Logger.error("[video] Video provider received MEDIA_SERVER_OFFLINE event");
this.bbbGW.publish(JSON.stringify({
connectionId: this.sessionId,
type: 'video',
id : 'error',
response : 'rejected',
cameraId : this.id,
message : C.MEDIA_SERVER_OFFLINE
}), C.FROM_VIDEO);
this.emit(C.MEDIA_SERVER_OFFLINE, event);
break;
default:
Logger.warn("[video] Unknown server state", event);
}
}
mediaState (event) {
let msEvent = event.event;
@ -108,10 +131,9 @@ module.exports = class Video {
cameraId: this.id,
}), C.FROM_VIDEO);
}
break;
default: Logger.warn("[video] Unrecognized event");
default: Logger.warn("[video] Unrecognized event", event);
}
}
@ -134,18 +156,20 @@ module.exports = class Video {
sdpAnswer = ret.answer;
this.flushCandidatesQueue();
this.mcs.on('MediaEvent' + this.mediaId, this.mediaState.bind(this));
this.mcs.on('ServerState' + this.mediaId, this.serverState.bind(this));
Logger.info("[video] MCS publish for user", this.userId, "returned", this.mediaId);
return callback(null, sdpAnswer);
}
else {
else if (sharedWebcams[this.id]) {
const ret = await this.mcs.subscribe(this.userId, sharedWebcams[this.id], 'WebRtcEndpoint', {descriptor: sdpOffer});
this.mediaId = ret.sessionId;
sdpAnswer = ret.answer;
this.flushCandidatesQueue();
this.mcs.on('MediaEvent' + this.mediaId, this.mediaState.bind(this));
this.mcs.on('ServerState' + this.mediaId, this.serverState.bind(this));
Logger.info("[video] MCS subscribe for user", this.userId, "returned", this.mediaId);
@ -159,26 +183,27 @@ module.exports = class Video {
};
async stop () {
Logger.info('[video] Releasing endpoints for user', this.userId, 'at room', this.meetingId);
return new Promise(async (resolve, reject) => {
Logger.info('[video] Stopping video session', this.userId, 'at room', this.meetingId);
try {
await this.mcs.leave(this.meetingId, this.userId);
if (this.shared) {
sharedWebcams[this.id] = null;
try {
await this.mcs.leave(this.meetingId, this.userId);
if (this.shared) {
delete sharedWebcams[this.id];
}
if (this.notFlowingTimeout) {
clearTimeout(this.notFlowingTimeout);
delete this.notFlowingTimeout;
}
delete this._candidatesQueue;
resolve();
}
if (this.notFlowingTimeout) {
clearTimeout(this.notFlowingTimeout);
this.notFlowingTimeout = null;
catch (err) {
// TODO error handling
reject();
}
this._candidatesQueue = null;
Promise.resolve();
}
catch (err) {
// TODO error handling
Promise.reject();
}
return;
};
});
}
};