SFU VideoManager using ES6 class and VideoProcess with BaseProcess

This commit is contained in:
prlanzarin 2018-03-14 00:22:32 +00:00
parent 4149e106f8
commit 36aebf39b0
3 changed files with 176 additions and 195 deletions

View File

@ -27,9 +27,8 @@ module.exports = class BaseProcess {
async stop () {
try {
await this.manager.stopAll();
Logger.info(this.logPrefix, "Exiting screenshare process");
Logger.info(this.logPrefix, "Exiting process");
process.exit(0);
}
catch (err) {
Logger.error(this.logPrefix, err);
@ -38,11 +37,11 @@ module.exports = class BaseProcess {
}
handleException (error) {
process.exit(1);
Logger.error(this.logPrefix, 'TODO => Uncaught exception', error.stack);
process.exit(1);
}
handleRejection (reason, promise) {
Logger.error(this.logPrefix, 'Unhandled Rejection at: Promise', p, 'reason:', reason);
Logger.error(this.logPrefix, 'TODO => Unhandled Rejection at: Promise', promise, 'reason:', reason);
}
}

View File

@ -11,201 +11,203 @@ const Video = require('./video');
const C = require('../bbb/messages/Constants');
const Logger = require('../utils/Logger');
let sessions = {};
var clientId = 0;
let bbbGW = new BigBlueButtonGW("MANAGER");
let redisGateway;
bbbGW.addSubscribeChannel(C.TO_VIDEO).then((gw) => {
redisGateway = gw;
redisGateway.on(C.REDIS_MESSAGE, _onMessage);
});
let _onMessage = async function (_message) {
let message = _message;
let sessionId = message.connectionId;
let video;
let role = message.role? message.role : 'any';
let cameraId = message.cameraId;
let shared = false;
let iceQueues = {};
let iceQueue;
if (!message.cameraId) {
console.log(" [VideoManager] Undefined message.cameraId for session ", sessionId);
return;
module.exports = class VideoManager {
constructor() {
this.sessions = {};
this.bbbGW = new BigBlueButtonGW();
this.redisGateway;
this.iceQueues = {};
}
if (message.role === 'share') {
shared = true;
cameraId += '-shared';
start () {
this.bbbGW.addSubscribeChannel(C.TO_VIDEO).then((gw) => {
this.redisGateway = gw;
this.redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this));
});
}
if (!sessions[sessionId]) {
sessions[sessionId] = {};
}
async _onMessage (_message) {
let message = _message;
let sessionId = message.connectionId;
let video;
let role = message.role? message.role : 'any';
let cameraId = message.cameraId;
let shared = false;
let iceQueue;
if (!iceQueues[sessionId]) {
iceQueues[sessionId] = {};
}
if (!message.cameraId) {
Logger.warn(" [VideoManager] Undefined message.cameraId for session ", sessionId);
return;
}
if (sessions[sessionId][cameraId]) {
video = sessions[sessionId][cameraId];
}
if (message.role === 'share') {
shared = true;
cameraId += '-shared';
}
if (iceQueues[sessionId][cameraId]) {
iceQueue = iceQueues[sessionId][cameraId] ;
}
if (!this.sessions[sessionId]) {
this.sessions[sessionId] = {};
}
switch (message.id) {
case 'start':
Logger.info('[VideoManager] Received message [' + message.id + '] from connection ' + sessionId);
Logger.debug('[VideoManager] Message =>', JSON.stringify(message, null, 2));
if (!this.iceQueues[sessionId]) {
this.iceQueues[sessionId] = {};
}
video = new Video(bbbGW, message.cameraId, shared, message.connectionId);
if (this.sessions[sessionId][cameraId]) {
video = this.sessions[sessionId][cameraId];
}
// Empty ice queue after starting video
if (iceQueue) {
let candidate;
while(candidate = iceQueue.pop()) {
video.onIceCandidate(cand);
if (this.iceQueues[sessionId][cameraId]) {
iceQueue = this.iceQueues[sessionId][cameraId] ;
}
switch (message.id) {
case 'start':
Logger.info('[VideoManager] Received message [' + message.id + '] from connection ' + sessionId);
Logger.debug('[VideoManager] Message =>', JSON.stringify(message, null, 2));
video = new Video(this.bbbGW, message.cameraId, shared, message.connectionId);
// Empty ice queue after starting video
if (iceQueue) {
let candidate;
while(candidate = iceQueue.pop()) {
video.onIceCandidate(cand);
}
}
}
sessions[sessionId][cameraId] = video;
this.sessions[sessionId][cameraId] = video;
video.start(message.sdpOffer, (error, sdpAnswer) => {
if (error) {
return bbbGW.publish(JSON.stringify({
video.start(message.sdpOffer, (error, sdpAnswer) => {
if (error) {
return this.bbbGW.publish(JSON.stringify({
connectionId: sessionId,
type: 'video',
role: role,
id : 'error',
response : 'rejected',
cameraId : message.cameraId,
message : error
}), C.FROM_VIDEO);
}
this.bbbGW.publish(JSON.stringify({
connectionId: sessionId,
type: 'video',
role: role,
id : 'error',
response : 'rejected',
cameraId : message.cameraId,
message : error
id : 'startResponse',
cameraId: message.cameraId,
sdpAnswer : sdpAnswer
}), C.FROM_VIDEO);
}
});
break;
bbbGW.publish(JSON.stringify({
case 'stop':
if (video) {
this._stopVideo(sessionId, role, message.cameraId);
} else {
Logger.warn("[VideoManager] There is no video instance named", cameraId, "to stop");
}
break;
case 'onIceCandidate':
if (video) {
video.onIceCandidate(message.candidate);
} else {
Logger.info("[VideoManager] Queueing ice candidate for later in video", cameraId);
if (!iceQueue) {
this.iceQueues[sessionId][cameraId] = [];
iceQueue = this.iceQueues[sessionId][cameraId];
}
iceQueue.push(message.candidate);
}
break;
case 'close':
Logger.info("[VideoManager] Closing session for sessionId: ", sessionId);
this._stopSession(sessionId);
break;
default:
this.bbbGW.publish(JSON.stringify({
connectionId: sessionId,
type: 'video',
role: role,
id : 'startResponse',
cameraId: message.cameraId,
sdpAnswer : sdpAnswer
id : 'error',
response : 'rejected',
message : 'Invalid message ' + JSON.stringify(message)
}), C.FROM_VIDEO);
});
break;
break;
}
}
case 'stop':
if (video) {
stopVideo(sessionId, role, message.cameraId);
} else {
Logger.warn("[VideoManager] There is no video instance named", cameraId, "to stop");
}
break;
async _stopSession (sessionId) {
case 'onIceCandidate':
let videoIds = Object.keys(this.sessions[sessionId]);
if (video) {
video.onIceCandidate(message.candidate);
} else {
Logger.info("[VideoManager] Queueing ice candidate for later in video", cameraId);
if (!iceQueue) {
iceQueues[sessionId][cameraId] = [];
iceQueue = iceQueues[sessionId][cameraId];
for (let i=0; i < videoIds.length; i++) {
let camId = videoIds[i].split('-')[0], role = videoIds[i].split('-')[1];
await this._stopVideo(sessionId, role ? 'share' : 'viewer', camId);
}
delete this.sessions[sessionId];
this.logAvailableSessions();
}
async _stopVideo (sessionId, role, cameraId) {
Logger.info('[VideoManager/x] Stopping session ' + sessionId + " with role " + role + " for camera " + cameraId);
try {
if (role === 'share') {
var sharedVideo = this.sessions[sessionId][cameraId+'-shared'];
if (sharedVideo) {
Logger.info('[VideoManager] Stopping sharer [', sessionId, '][', cameraId,']');
await sharedVideo.stop();
delete this.sessions[sessionId][cameraId+'-shared'];
}
iceQueue.push(message.candidate);
}
break;
case 'close':
Logger.info("[VideoManager] Closing session for sessionId: ", sessionId);
stopSession(sessionId);
break;
default:
bbbGW.publish(JSON.stringify({
connectionId: sessionId,
type: 'video',
id : 'error',
response : 'rejected',
message : 'Invalid message ' + JSON.stringify(message)
}), C.FROM_VIDEO);
break;
}
};
let stopSession = async function(sessionId) {
let videoIds = Object.keys(sessions[sessionId]);
for (let i=0; i < videoIds.length; i++) {
let camId = videoIds[i].split('-')[0], role = videoIds[i].split('-')[1];
await stopVideo(sessionId, role ? 'share' : 'viewer', camId);
}
delete sessions[sessionId];
logAvailableSessions();
}
let stopVideo = async function(sessionId, role, cameraId) {
Logger.info('[VideoManager/x] Stopping session ' + sessionId + " with role " + role + " for camera " + cameraId);
try {
if (role === 'share') {
var sharedVideo = sessions[sessionId][cameraId+'-shared'];
if (sharedVideo) {
Logger.info('[VideoManager] Stopping sharer [', sessionId, '][', cameraId,']');
await sharedVideo.stop();
delete sessions[sessionId][cameraId+'-shared'];
else if (role === 'viewer') {
var video = this.sessions[sessionId][cameraId];
if (video) {
Logger.info('[VideoManager] Stopping viewer [', sessionId, '][', cameraId,']');
await video.stop();
delete this.sessions[sessionId][cameraId];
}
}
}
else if (role === 'viewer') {
var video = sessions[sessionId][cameraId];
if (video) {
Logger.info('[VideoManager] Stopping viewer [', sessionId, '][', cameraId,']');
await video.stop();
delete sessions[sessionId][cameraId];
}
}
}
catch (err) {
Logger.error("[VideoManager] Stop error => ", err);
}
}
let stopAll = function() {
Logger.info('[VideoManager] Stopping everything! ');
if (sessions == null) {
return;
}
let sessionIds = Object.keys(sessions);
for (var i = 0; i < sessionIds.length; i++) {
stopSession(sessionIds[i]);
}
setTimeout(process.exit, 100);
}
let logAvailableSessions = function() {
if(sessions) {
Logger.info("[VideoManager] Available sessions are =>");
let sessionMainKeys = Object.keys(sessions);
for (var k in sessions) {
if(sessions[k]) {
Logger.info('[VideoManager] Session[', k,'] => ', Object.keys(sessions[k]));
}
catch (err) {
Logger.error("[VideoManager] Stop error => ", err);
}
}
stopAll () {
Logger.info('[VideoManager] Stopping everything! ');
if (this.sessions == null) {
return;
}
let sessionIds = Object.keys(this.sessions);
for (var i = 0; i < sessionIds.length; i++) {
this._stopSession(sessionIds[i]);
}
setTimeout(process.exit, 100);
}
_logAvailableSessions () {
if(this.sessions) {
Logger.info("[VideoManager] Available sessions are =>");
let sessionMainKeys = Object.keys(this.sessions);
for (var k in this.sessions) {
if(this.sessions[k]) {
Logger.info('[VideoManager] Session[', k,'] => ', Object.keys(this.sessions[k]));
}
}
}
}
}

View File

@ -1,29 +1,9 @@
'use strict';
const Logger = require('../utils/Logger');
const config = require('config');
const VideoManager= require('./VideoManager');
const BaseProcess = require('../base/BaseProcess');
if (config.get('acceptSelfSignedCertificate')) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED=0;
}
let manager = new VideoManager();
let newProcess = new BaseProcess(manager, '[VideoManager]');
// This basically starts the video Manager routines which listens the connection
// manager messages routed to "to-sfu-video"
const VideoManager = require('./VideoManager');
process.on('uncaughtException', (error) => {
Logger.error('[VideoProcess] Uncaught exception ', error.stack);
});
process.on('disconnect', async () => {
Logger.warn('[VideoProcess] Parent process exited, cleaning things up and finishing child now...');
//TODO below
//async VideoManager.stopAll();
process.exit(0);
});
// Added this listener to identify unhandled promises, but we should start making
// sense of those as we find them
process.on('unhandledRejection', (reason, p) => {
Logger.error('[VideoProcess] Unhandled Rejection at: Promise', p, 'reason:', reason);
});
newProcess.start();