Making the screenshare process have its own websocket

This is temporary while we fix the server-side message routing
This commit is contained in:
prlanzarin 2017-11-11 01:39:04 +00:00
parent 76f09e16ad
commit 4dc8085648
4 changed files with 227 additions and 150 deletions

View File

@ -8,149 +8,221 @@
"use strict";
const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw');
const cookieParser = require('cookie-parser')
const express = require('express');
const session = require('express-session')
const wsModule = require('../websocket');
const http = require('http');
const fs = require('fs');
const MediaController = require('../media-controller');
var Screenshare = require('./screenshare');
var C = require('../bbb/messages/Constants');
// Global variables
module.exports = class ScreenshareManager {
constructor (logger) {
constructor (settings, logger) {
this._logger = logger;
this._clientId = 0;
this._app = express();
this._sessions = {};
this._screenshareSessions = {};
this._bbbGW = new BigBlueButtonGW("MANAGER");
this._redisGateway;
this._setupExpressSession();
this._setupHttpServer();
}
async start() {
let self = this;
_setupExpressSession() {
this._app.use(cookieParser());
try {
this._redisGateway = await this._bbbGW.addSubscribeChannel(C.TO_SCREENSHARE);
const transcode = await this._bbbGW.addSubscribeChannel(C.FROM_BBB_TRANSCODE_SYSTEM_CHAN);
this._redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this));
process.on('message', this._onMessage.bind(this));
console.log(' [ScreenshareManager] Successfully subscribed to redis channel');
}
catch (error) {
console.log(' [ScreenshareManager] Could not connect to transcoder redis channel, finishing app...');
console.log(error);
self._stopAll();
}
this._sessionHandler = session({
secret : 'Shawarma', rolling : true, resave : true, saveUninitialized : true
});
this._app.use(this._sessionHandler);
}
_onMessage(_message) {
console.log(' [ScreenshareManager] Received message => ');
_setupHttpServer() {
let self = this;
let session;
let message = _message;
/*
* Server startup
*/
this._httpServer = http.createServer(this._app).listen(3008, function() {
console.log(' [*] Running node-apps connection manager.');
});
// The sessionId is voiceBridge for screensharing sessions
let sessionId = message.voiceBridge;
if(this._screenshareSessions[sessionId]) {
session = this._screenshareSessions[sessionId];
}
/*
* Management of sessions
*/
this._wss = new wsModule.Server({
server : this._httpServer,
path : '/kurento-screenshare'
});
switch (message.id) {
case 'presenter':
// TODO isolate this
this._bbbGW = new BigBlueButtonGW();
// Checking if there's already a Screenshare session started
// because we shouldn't overwrite it
this._bbbGW.addSubscribeChannel(C.FROM_BBB_TRANSCODE_SYSTEM_CHAN, function(error, redisWrapper) {
if(error) {
console.log(' Could not connect to transcoder redis channel, finishing app...');
self._stopAll();
}
console.log(' [server] Successfully subscribed to redis channel');
});
if (!self._screenshareSessions[message.voiceBridge]) {
self._screenshareSessions[message.voiceBridge] = {}
self._screenshareSessions[message.voiceBridge] = session;
}
if(session) {
break;
}
this._wss.on('connection', self._onNewConnection.bind(self));
}
session = new Screenshare(sessionId, self._bbbGW,
sessionId, message.callerName, message.vh, message.vw,
message.internalMeetingId);
_onNewConnection(webSocket) {
let self = this;
let connectionId;
let request = webSocket.upgradeReq;
let sessionId;
let callerName;
let response = {
writeHead : {}
};
self._screenshareSessions[sessionId] = {}
self._screenshareSessions[sessionId] = session;
this._sessionHandler(request, response, function(err) {
connectionId = request.session.id + "_" + self._clientId++;
console.log('Connection received with connectionId ' + connectionId);
});
// starts presenter by sending sessionID, websocket and sdpoffer
session._startPresenter(sessionId, message.sdpOffer, function(error, sdpAnswer) {
console.log(" [ScreenshareManager] Started presenter " + sessionId);
if (error) {
self._bbbGW.publish(JSON.stringify({
webSocket.on('error', function(error) {
console.log('Connection ' + connectionId + ' error');
self._stopSession(sessionId);
});
webSocket.on('close', function() {
console.log('Connection ' + connectionId + ' closed');
console.log(webSocket.presenter);
if (webSocket.presenter && self._screenshareSessions[sessionId]) { // if presenter // FIXME (this conditional was added to prevent screenshare stop when an iOS user quits)
console.log(" [CM] Stopping presenter " + sessionId);
self._stopSession(sessionId);
}
if (webSocket.viewer && typeof webSocket.session !== 'undefined') {
console.log(" [CM] Stopping viewer " + webSocket.viewerId);
webSocket.session._stopViewer(webSocket.viewerId);
}
});
webSocket.on('message', function(_message) {
let message = JSON.parse(_message);
let session;
// The sessionId is voiceBridge for screensharing sessions
sessionId = message.voiceBridge;
if(self._screenshareSessions[sessionId]) {
session = self._screenshareSessions[sessionId];
webSocket.session = session;
}
switch (message.id) {
case 'presenter':
// Checking if there's already a Screenshare session started
// because we shouldn't overwrite it
webSocket.presenter = true;
if (!self._screenshareSessions[message.voiceBridge]) {
self._screenshareSessions[message.voiceBridge] = {}
self._screenshareSessions[message.voiceBridge] = session;
}
//session.on('message', self._assembleSessionMessage.bind(self));
if(session) {
break;
}
session = new Screenshare(webSocket, connectionId, self._bbbGW,
sessionId, message.callerName, message.vh, message.vw,
message.internalMeetingId);
self._screenshareSessions[sessionId] = {}
self._screenshareSessions[sessionId] = session;
// starts presenter by sending sessionID, websocket and sdpoffer
session._startPresenter(connectionId, webSocket, message.sdpOffer, function(error, sdpAnswer) {
console.log(" Started presenter " + connectionId);
if (error) {
return webSocket.send(JSON.stringify({
id : 'presenterResponse',
response : 'rejected',
message : error
}));
}
webSocket.send(JSON.stringify({
id : 'presenterResponse',
response : 'rejected',
message : error
}), C.FROM_SCREENSHARE);
return error;
response : 'accepted',
sdpAnswer : sdpAnswer
}));
console.log(" [websocket] Sending presenterResponse \n" + sdpAnswer);
});
break;
case 'viewer':
console.log("[viewer] Session output \n " + session);
webSocket.viewer = true;
webSocket.viewerId = message.callerName;
if (message.sdpOffer && message.voiceBridge) {
if (session) {
session._startViewer(webSocket, message.voiceBridge, message.sdpOffer, message.callerName, self._screenshareSessions[message.voiceBridge]._presenterEndpoint);
} else {
webSocket.sendMessage("voiceBridge not recognized");
webSocket.sendMessage(Object.keys(self._screenshareSessions));
webSocket.sendMessage(message.voiceBridge);
}
}
break;
self._bbbGW.publish(JSON.stringify({
id : 'presenterResponse',
response : 'accepted',
sdpAnswer : sdpAnswer
}), C.FROM_SCREENSHARE);
case 'stop':
console.log('[' + message.id + '] connection ' + connectionId);
console.log(" [ScreenshareManager] [websocket] Sending presenterResponse \n" + sdpAnswer);
});
break;
case 'viewer':
console.log(" [ScreenshareManager][viewer] Session output \n " + session);
if (message.sdpOffer && message.voiceBridge) {
if (session) {
session._startViewer(message.voiceBridge, message.sdpOffer, message.callerName, self._screenshareSessions[message.voiceBridge]._presenterEndpoint);
session._stop(sessionId);
} else {
// TODO ERROR HANDLING
console.log(" [stop] Why is there no session on STOP?");
}
}
break;
break;
case 'stop':
console.log('[' + message.id + '] connection ' + sessionId);
case 'onIceCandidate':
if (session) {
console.log(" [CM] What the fluff is happening");
session._onIceCandidate(message.candidate);
} else {
console.log(" [iceCandidate] Why is there no session on ICE CANDIDATE?");
}
break;
if (session) {
session._stop(sessionId);
} else {
console.log(" [stop] Why is there no session on STOP?");
}
break;
case 'onIceCandidate':
if (session) {
session._onIceCandidate(message.candidate);
} else {
console.log(" [iceCandidate] Why is there no session on ICE CANDIDATE?");
}
break;
case 'ping':
self._bbbGW.publish(JSON.stringify({
id : 'pong',
response : 'accepted'
}), C.FROM_SCREENSHARE);
break;
case 'ping':
webSocket.send(JSON.stringify({
id : 'pong',
response : 'accepted'
}));
break;
case 'viewerIceCandidate':
console.log("[viewerIceCandidate] Session output => " + session);
if (session) {
session._onViewerIceCandidate(message.candidate, message.callerName);
} else {
console.log("[iceCandidate] Why is there no session on ICE CANDIDATE?");
}
break;
case 'viewerIceCandidate':
console.log("[viewerIceCandidate] Session output => " + session);
if (session) {
session._onViewerIceCandidate(message.candidate, message.callerName);
} else {
console.log("[iceCandidate] Why is there no session on ICE CANDIDATE?");
}
break;
default:
self._bbbGW.publish(JSON.stringify({
id : 'error',
message: 'Invald message ' + message
}), C.FROM_SCREENSHARE);
break;
}
default:
webSocket.sendMessage({ id : 'error', message : 'Invalid message ' + message });
break;
}
});
}
_stopSession(sessionId) {

View File

@ -10,4 +10,3 @@ process.on('disconnect',function() {
});
c = new ScreenshareManager();
c.start();

View File

@ -31,7 +31,8 @@ if (config.get('acceptSelfSignedCertificate')) {
}
module.exports = class Screenshare {
constructor(id, bbbgw, voiceBridge, caller, vh, vw, meetingId) {
constructor(ws, id, bbbgw, voiceBridge, caller, vh, vw, meetingId) {
this._ws = ws;
this._id = id;
this._BigBlueButtonGW = bbbgw;
this._presenterEndpoint = null;
@ -52,17 +53,18 @@ module.exports = class Screenshare {
let candidate = kurento.getComplexType('IceCandidate')(_candidate);
if (this._presenterEndpoint) {
console.log(" [screenshare] Adding ICE candidate to presenter");
this._presenterEndpoint.addIceCandidate(candidate);
}
else {
this._candidatesQueue.push(candidate);
}
};
_onViewerIceCandidate(_candidate, callerName) {
console.log("onviewericecandidate callerName = " + callerName);
let candidate = kurento.getComplexType('IceCandidate')(_candidate);
if (this._viewersEndpoint[callerName]) {
this._viewersEndpoint[callerName].addIceCandidate(candidate);
}
@ -74,14 +76,14 @@ module.exports = class Screenshare {
}
}
_startViewer(voiceBridge, sdp, callerName, presenterEndpoint, callback) {
_startViewer(ws, voiceBridge, sdp, callerName, presenterEndpoint, callback) {
let self = this;
let _callback = function(){};
console.log("startviewer callerName = " + callerName);
self._viewersCandidatesQueue[callerName] = [];
console.log("VIEWER VOICEBRIDGE: "+self._voiceBridge);
MediaController.createMediaElement(voiceBridge, C.WebRTC, function(error, webRtcEndpoint) {
if (error) {
console.log("Media elements error" + error);
@ -106,11 +108,7 @@ module.exports = class Screenshare {
// ICE NEGOTIATION WITH THE ENDPOINT
self._viewersEndpoint[callerName].on('OnIceCandidate', function(event) {
let candidate = kurento.getComplexType('IceCandidate')(event.candidate);
self._BigBlueButtonGW.publish(JSON.stringify({
id : 'iceCandidate',
candidate : candidate
}), C.FROM_SCREENSHARE);
let candidate = kurento.getComplexType('IceCandidate')(event.candidate); ws.sendMessage({ id : 'iceCandidate', candidate : candidate });
});
sdp = h264_sdp.transform(sdp);
@ -121,11 +119,7 @@ module.exports = class Screenshare {
//pipeline.release();
return _callback(error);
}
self._BigBlueButtonGW.publish(JSON.stringify({
id: "viewerResponse",
sdpAnswer: webRtcSdpAnswer,
response: "accepted"
}), C.FROM_SCREENSHARE);
ws.sendMessage({id: "viewerResponse", sdpAnswer: webRtcSdpAnswer, response: "accepted"});
console.log(" Sent sdp message to client with callerName:" + callerName);
MediaController.gatherCandidates(webRtcEndpoint.id, function(error) {
@ -134,12 +128,12 @@ module.exports = class Screenshare {
}
self._viewersEndpoint[callerName].on('MediaFlowInStateChange', function(event) {
if (event.state === 'NOT_FLOWING') {
console.log(" NOT FLOWING ");
}
else if (event.state === 'FLOWING') {
console.log(" FLOWING ");
}
if (event.state === 'NOT_FLOWING') {
console.log(" NOT FLOWING ");
}
else if (event.state === 'FLOWING') {
console.log(" FLOWING ");
}
});
});
});
@ -147,7 +141,7 @@ module.exports = class Screenshare {
}
_startPresenter(id, sdpOffer, callback) {
_startPresenter(id, ws, sdpOffer, callback) {
let self = this;
let _callback = callback;
@ -189,11 +183,7 @@ module.exports = class Screenshare {
self._presenterEndpoint.on('OnIceCandidate', function(event) {
let candidate = kurento.getComplexType('IceCandidate')(event.candidate);
self._BigBlueButtonGW.publish(JSON.stringify({
id : 'iceCandidate',
cameraId: id,
candidate : candidate
}), C.FROM_SCREENSHARE);
ws.sendMessage({ id : 'iceCandidate', cameraId: id, candidate : candidate });
});
MediaController.processOffer(webRtcEndpoint.id, sdpOffer, function(error, webRtcSdpAnswer) {
@ -254,7 +244,6 @@ module.exports = class Screenshare {
} else {
console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE");
}
if (this._ffmpegRtpEndpoint) {
MediaController.releaseMediaElement(this._ffmpegRtpEndpoint.id);
this._ffmpegRtpEndpoint = null;
@ -273,7 +262,7 @@ module.exports = class Screenshare {
let self = this;
let strm = Messaging.generateStopTranscoderRequestMessage(this._meetingId, this._meetingId);
self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN);
self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {});
// Interoperability: capturing 1.1 stop_transcoder_reply messages
self._BigBlueButtonGW.once(C.STOP_TRANSCODER_REPLY, function(payload) {
@ -290,6 +279,7 @@ module.exports = class Screenshare {
}
_onRtpMediaFlowing(meetingId, rtpParams) {
console.log(" [screenshare] Media FLOWING for meeting => " + meetingId);
let self = this;
let strm = Messaging.generateStartTranscoderRequestMessage(meetingId, meetingId, rtpParams);
@ -308,21 +298,23 @@ module.exports = class Screenshare {
});
self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN);
self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {});
};
_stopRtmpBroadcast (meetingId) {
var self = this;
console.log(" [screenshare] _stopRtmpBroadcast for meeting => " + meetingId);
let self = this;
if(self._meetingId === meetingId) {
// TODO correctly assemble this timestamp
let timestamp = now.format('hhmmss');
let dsrstom = Messaging.generateScreenshareRTMPBroadcastStoppedEvent2x(self._voiceBridge,
self._voiceBridge, self._streamUrl, self._vw, self._vh, timestamp);
self._BigBlueButtonGW.publish(dsrstom, C.FROM_VOICE_CONF_SYSTEM_CHAN);
self._BigBlueButtonGW.publish(dsrstom, C.FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {});
}
}
_startRtmpBroadcast (meetingId, output) {
console.log(" [screenshare] _startRtmpBroadcast for meeting => " + meetingId);
var self = this;
if(self._meetingId === meetingId) {
// TODO correctly assemble this timestamp
@ -331,11 +323,25 @@ module.exports = class Screenshare {
let dsrbstam = Messaging.generateScreenshareRTMPBroadcastStartedEvent2x(self._voiceBridge,
self._voiceBridge, self._streamUrl, self._vw, self._vh, timestamp);
self._BigBlueButtonGW.publish(dsrbstam, C.FROM_VOICE_CONF_SYSTEM_CHAN);
self._BigBlueButtonGW.publish(dsrbstam, C.FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {});
}
}
_onRtpMediaNotFlowing() {
console.log(" [screenshare] TODO RTP NOT_FLOWING");
};
_stopViewer(id) {
let viewer = this._viewersEndpoint[id];
console.log(' [stop] Releasing endpoints for ' + id);
if (viewer) {
MediaController.releaseMediaElement(viewer.id);
this._viewersEndpoint[viewer.id] = null;
} else {
console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE");
}
delete this._viewersCandidatesQueue[id];
};
};

View File

@ -7,8 +7,8 @@
const ConnectionManager = require('./lib/connection-manager/ConnectionManager');
const HttpServer = require('./lib/connection-manager/HttpServer');
const server = new HttpServer();
const WebsocketConnectionManager = require('./lib/connection-manager/WebsocketConnectionManager');
//const server = new HttpServer();
//const WebsocketConnectionManager = require('./lib/connection-manager/WebsocketConnectionManager');
const cp = require('child_process');
let screenshareProc = cp.fork('./lib/screenshare/ScreenshareProcess', {
@ -47,9 +47,9 @@ videoProc.on('message',onMessage);
videoProc.on('error',onError);
videoProc.on('disconnect',onDisconnect);
const CM = new ConnectionManager(screenshareProc, videoProc);
//const CM = new ConnectionManager(screenshareProc, videoProc);
let websocketManager = new WebsocketConnectionManager(server.getServerObject(), '/kurento-screenshare');
//let websocketManager = new WebsocketConnectionManager(server.getServerObject(), '/kurento-screenshare');
process.on('SIGTERM', process.exit)
process.on('SIGINT', process.exit)
@ -59,9 +59,9 @@ process.on('uncaughtException', function (error) {
});
CM.setHttpServer(server);
CM.addAdapter(websocketManager);
CM.listen(() => {
console.log(" [SERVER] Server started");
});
//CM.setHttpServer(server);
//CM.addAdapter(websocketManager);
//
//CM.listen(() => {
// console.log(" [SERVER] Server started");
//});