Added first prototype of Kurento WebRTC screensharing server module

This commit is contained in:
prlanzarin 2017-06-13 15:53:51 +00:00
parent d6b1dd09b9
commit 5a77b1b69d
25 changed files with 1171 additions and 0 deletions

1
labs/kurento-screenshare/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
node_modules/

View File

View File

@ -0,0 +1,8 @@
kurentoUrl: "wss://kurento-desktopsharing.mconf.com/kurento"
kurentoIp: "104.131.115.8"
localIpAddress: "104.131.5.8"
acceptSelfSignedCertificate: false
redisHost : "127.0.0.1"
redisPort : "6379"
minVideoPort: 30000
maxVideoPort: 33000

View File

@ -0,0 +1 @@
node --inspect --debug-brk server.js

View File

@ -0,0 +1,2 @@
This folder contains a dummy self-signed certificate only for demo purposses,
**DON'T USE IT IN PRODUCTION**.

View File

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDBjCCAe4CCQCuf5QfyX2oDDANBgkqhkiG9w0BAQsFADBFMQswCQYDVQQGEwJB
VTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0
cyBQdHkgTHRkMB4XDTE0MDkyOTA5NDczNVoXDTE1MDkyOTA5NDczNVowRTELMAkG
A1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0
IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AMJOyOHJ+rJWJEQ7P7kKoWa31ff7hKNZxF6sYE5lFi3pBYWIY6kTN/iUaxJLROFo
FhoC/M/STY76rIryix474v/6cRoG8N+GQBEn4IAP1UitWzVO6pVvBaIt5IKlhhfm
YA1IMweCd03vLcaHTddNmFDBTks7QDwfenTaR5VjKYc3OtEhcG8dgLAnOjbbk2Hr
8wter2IeNgkhya3zyoXnTLT8m8IMg2mQaJs62Xlo9gs56urvVDWG4rhdGybj1uwU
ZiDYyP4CFCUHS6UVt12vADP8vjbwmss2ScGsIf0NjaU+MpSdEbB82z4b2NiN8Wq+
rFA/JbvyeoWWHMoa7wkVs1MCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEAYLRwV9fo
AOhJfeK199Tv6oXoNSSSe10pVLnYxPcczCVQ4b9SomKFJFbmwtPVGi6w3m+8mV7F
9I2WKyeBHzmzfW2utZNupVybxgzEjuFLOVytSPdsB+DcJomOi8W/Cf2Vk8Wykb/t
Ctr1gfOcI8rwEGKxm279spBs0u1snzoLyoimbMbiXbC82j1IiN3Jus08U07m/j7N
hRBCpeHjUHT3CRpvYyTRnt+AyBd8BiyJB7nWmcNI1DksXPfehd62MAFS9e1ZE+dH
Aavg/U8VpS7pcCQcPJvIJ2hehrt8L6kUk3YUYqZ0OeRZK27f2R5+wFlDF33esm3N
dCSsLJlXyqAQFg==
-----END CERTIFICATE-----

View File

@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE REQUEST-----
MIICijCCAXICAQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAMJOyOHJ+rJWJEQ7P7kKoWa31ff7hKNZxF6sYE5l
Fi3pBYWIY6kTN/iUaxJLROFoFhoC/M/STY76rIryix474v/6cRoG8N+GQBEn4IAP
1UitWzVO6pVvBaIt5IKlhhfmYA1IMweCd03vLcaHTddNmFDBTks7QDwfenTaR5Vj
KYc3OtEhcG8dgLAnOjbbk2Hr8wter2IeNgkhya3zyoXnTLT8m8IMg2mQaJs62Xlo
9gs56urvVDWG4rhdGybj1uwUZiDYyP4CFCUHS6UVt12vADP8vjbwmss2ScGsIf0N
jaU+MpSdEbB82z4b2NiN8Wq+rFA/JbvyeoWWHMoa7wkVs1MCAwEAAaAAMA0GCSqG
SIb3DQEBCwUAA4IBAQBMszYHMpklgTF/3h1zAzKXUD9NrtZp8eWhL06nwVjQX8Ai
EaCUiW0ypstokWcH9+30chd2OD++67NbxYUEucH8HrKpOoy6gs5L/mqgQ9Npz3OT
TB1HI4kGtpVuUQ5D7L0596tKzMX/CgW/hRcHWl+PDkwGhQs1qZcJ8QN+YP6AkRrO
5sDdDB/BLrB9PtBQbPrYIQcHQ7ooYWz/G+goqRxzZ6rt0aU2uAB6l7c82ADLAqFJ
qlw+xqVzEETVfqM5TXKK/wV3hgm4oSX5Q4SHLKF94ODOkWcnV4nfIKz7y+5XcQ3p
PrGimI1br07okC5rO9cgLCR0Ks20PPFcM0FvInW/
-----END CERTIFICATE REQUEST-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAwk7I4cn6slYkRDs/uQqhZrfV9/uEo1nEXqxgTmUWLekFhYhj
qRM3+JRrEktE4WgWGgL8z9JNjvqsivKLHjvi//pxGgbw34ZAESfggA/VSK1bNU7q
lW8Foi3kgqWGF+ZgDUgzB4J3Te8txodN102YUMFOSztAPB96dNpHlWMphzc60SFw
bx2AsCc6NtuTYevzC16vYh42CSHJrfPKhedMtPybwgyDaZBomzrZeWj2Cznq6u9U
NYbiuF0bJuPW7BRmINjI/gIUJQdLpRW3Xa8AM/y+NvCayzZJwawh/Q2NpT4ylJ0R
sHzbPhvY2I3xar6sUD8lu/J6hZYcyhrvCRWzUwIDAQABAoIBACwt56TW3MZxqZtN
8WYsUZheUispJ/ZQMcLo5JjOiSV1Jwk+gpJtyTse291z+bxagzP02/CQu4u32UVa
cmE0cp+LHO4zB8964dREwdm8P91fdS6Au/uwG5LNZniCFCQZAFvkv52Ef4XbzQen
uf4rKWerHBck6K0C5z/sZXxE6KtScE2ZLUmkhO0nkHM6MA6gFk2OMnB+oDTOWWPt
1mlreQlzuMYG/D4axviRYrOSYCE5Qu1SOw/DEOLQqqeBjQrKtAyOlFHZsIR6lBfe
KHMChPUcYIwaowt2DcqH/A+AFXRtaifa6DvH8Yul+2vAp47UEpaenVfM5bpN33XV
EzerjtECgYEA+xiXzblek67iQgRpc9eHSoqs4iRLhae8s8kpAG51Jz46Je+Dmium
XV769oiUGUxBeoUb7ryW+4MOzHJaA1BfGejQSvwLIB9e4cnikqnAArcqbcAcOCL1
aYYDiSmSmN/AokNZlPKEBFXP9bzXrU9smQJWNTHlcRl7JXfnwF+jwNsCgYEAxhpE
SBr9vlUVHNh/S6C5i80NIYg6jCy2FgsmuzEqmcqV0pTyzegmq8bru+QmuvoUj2o4
nVv4J9d1fLF6ECUVk9aK8UdJOOB6hAfurOdJCArgrsY/9t4uDzXfbPCdfSNQITE0
XgeNGQX1EzvwwkBmyZKk0kLIr3syP8ZCWfXDROkCgYBR+dF1pJMv++R6UR5sZ20P
9P5ERj0xwXVl7MKqFWXCDhrFz9BTQPTrftrIKgbPy4mFCnf4FTHlov/t11dzxYWG
2+9Ey8yGDDfZ1yNVZn39ZPdBJXsRCLi+XrZAzYXCyyoEz6ArdJGNKMbgH2r6dfeq
bIzgiQ2zQvJlZSQQNiksCQKBgCgwzAmU8EXdHRttEOZXBU3HnBJhgP9PUuHGAWWY
4/uvjhXbAiekIbRX9xt3fiQQ+HrgIfxK3F246K0TlKAR5f7IWAf7Xm+bmz+OHG4X
vklTa6IJtpBvIwkS9PE1H75zm54gTW+GOKoK+12bm4zNZA0hIy9FPVHcvKUTpAJ8
SdGBAoGAHLtJnB1NO4EgO6WtLQMXt7HrIbup8eZi8/82gC3422C+ooKIrYQ07qSw
nBOO/G0OB4yd6vCE2x5+TWSSCYGgG5A8aIv5qP76RP4hovGHxG/y2tfotw5UuOrh
nFWlTP4Urs8PeykvK9ao8r/T8BnPIC16U6ENYvAc0mRlFA2j1GA=
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,67 @@
"use strict";
/**
* @classdesc
* Message constants for the communication with BigBlueButton
* @constructor
*/
function Constants () {
return {
// Redis channels
FROM_BBB_TRANSCODE_SYSTEM_CHAN : "bigbluebutton:from-bbb-transcode:system",
FROM_VOICE_CONF_SYSTEM_CHAN: "bigbluebutton:from_voice_conf:system",
TO_BBB_TRANSCODE_SYSTEM_CHAN: "bigbluebutton:to-bbb-transcode:system",
// RedisWrapper events
REDIS_MESSAGE : "redis_message",
// Message identifiers
START_TRANSCODER_REQUEST: "start_transcoder_request_message",
START_TRANSCODER_REPLY: "start_transcoder_reply_message",
STOP_TRANSCODER_REQUEST: "stop_transcoder_request_message",
STOP_TRANSCODER_REPLY: "stop_transcoder_reply_message",
DESKSHARE_RTMP_BROADCAST_STARTED: "deskshare_rtmp_broadcast_started_message",
DESKSHARE_RTMP_BROADCAST_STOPPED: "deskshare_rtmp_broadcast_stopped_message",
// Redis messages fields
USER_ID : "user_id",
OPTIONS: "options",
VOICE_CONF_ID : "voice_conf_id",
TRANSCODER_ID : "transcoder_id",
CONFERENCE_NAME: "conference_name",
STREAM_URL: "stream_url",
TIMESTAMP: "timestamp",
VIDEO_WIDTH: "vw",
VIDEO_HEIGHT: "vh",
// RTP params
MEETING_ID : "meeting_id",
VOICE_CONF : "voice_conf",
KURENTO_ENDPOINT_ID : "kurento_endpoint_id",
PARAMS : "params",
MEDIA_DESCRIPTION: "media_description",
LOCAL_IP_ADDRESS: "local_ip_address",
LOCAL_VIDEO_PORT: "local_video_port",
DESTINATION_IP_ADDRESS : "destination_ip_address",
DESTINATION_VIDEO_PORT : "destination_video_port",
REMOTE_VIDEO_PORT : "remote_video_port",
CODEC_NAME: "codec_name",
CODEC_ID: "codec_id",
CODEC_RATE: "codec_rate",
RTP_PROFILE: "rtp_profile",
SEND_RECEIVE: "send_receive",
FRAME_RATE: "frame_rate",
INPUT: "input",
KURENTO_TOKEN : "kurento_token",
SCREENSHARE: "deskShare",
STREAM_TYPE: "stream_type",
STREAM_TYPE_SCREENSHARE: "stream_type_deskshare",
STREAM_TYPE_VIDEO: "stream_type_video",
RTP_TO_RTMP: "transcode_rtp_to_rtmp",
TRANSCODER_CODEC: "codec",
TRANSCODER_TYPE: "transcoder_type",
CALLERNAME: "callername"
}
}
module.exports = Constants();

View File

@ -0,0 +1,48 @@
var Constants = require('./Constants.js');
// Messages
var OutMessage = require('./OutMessage.js');
var StartTranscoderRequestMessage =
require('./transcode/StartTranscoderRequestMessage.js')(Constants);
var StopTranscoderRequestMessage =
require('./transcode/StopTranscoderRequestMessage.js')(Constants);
var DeskShareRTMPBroadcastStartedEventMessage =
require('./screenshare/DeskShareRTMPBroadcastStartedEventMessage.js')(Constants);
var DeskShareRTMPBroadcastStoppedEventMessage =
require('./screenshare/DeskShareRTMPBroadcastStoppedEventMessage.js')(Constants);
/**
* @classdesc
* Messaging utils to assemble JSON/Redis BigBlueButton messages
* @constructor
*/
function Messaging() {}
Messaging.prototype.generateStartTranscoderRequestMessage =
function(meetingId, transcoderId, params) {
var statrm = new StartTranscoderRequestMessage(meetingId, transcoderId, params);
return statrm.toJson();
}
Messaging.prototype.generateStopTranscoderRequestMessage =
function(meetingId, transcoderId) {
var stotrm = new StopTranscoderRequestMessage(meetingId, transcoderId);
return stotrm.toJson();
}
Messaging.prototype.generateDeskShareRTMPBroadcastStartedEvent =
function(conferenceName, streamUrl, timestamp) {
var stadrbem = new DeskShareRTMPBroadcastStartedEventMessage(conferenceName, streamUrl, timestamp);
return stadrbem.toJson();
}
Messaging.prototype.generateDeskShareRTMPBroadcastStoppedEvent =
function(conferenceName, streamUrl, timestamp) {
var stodrbem = new DeskShareRTMPBroadcastStoppedEventMessage(conferenceName, streamUrl, timestamp);
return stodrbem.toJson();
}
module.exports = new Messaging();
module.exports.Constants = Constants;

View File

@ -0,0 +1,35 @@
/*
* (C) Copyright 2016 Mconf Tecnologia (http://mconf.com/)
*/
/**
* @classdesc
* Base class for output messages sent to BBB
* @constructor
*/
function OutMessage(messageName) {
/**
* The header template of the message
* @type {Object}
*/
this.header = {
version: "0.0.1",
name: messageName
};
/**
* The payload of the message
* @type {Object}
*/
this.payload = null;
/**
* Generates the JSON representation of the message
* @return {String} The JSON string of this message
*/
this.toJson = function () {
return JSON.stringify(this);
}
};
module.exports = OutMessage;

View File

@ -0,0 +1,22 @@
/*
*
*/
var inherits = require('inherits');
var OutMessage = require('../OutMessage');
module.exports = function (Constants) {
function DeskShareRTMPBroadcastStartedEventMessage (conferenceName, streamUrl, vw, vh, timestamp) {
DeskShareRTMPBroadcastStartedEventMessage.super_.call(this, Constants.DESKSHARE_RTMP_BROADCAST_STARTED);
this.payload = {};
this.payload[Constants.CONFERENCE_NAME] = conferenceName;
this.payload[Constants.STREAM_URL] = streamUrl;
this.payload[Constants.TIMESTAMP] = timestamp;
this.payload[Constants.VIDEO_WIDTH] = vw;
this.payload[Constants.VIDEO_HEIGHT] = vh;
};
inherits(DeskShareRTMPBroadcastStartedEventMessage, OutMessage);
return DeskShareRTMPBroadcastStartedEventMessage;
}

View File

@ -0,0 +1,22 @@
/*
*
*/
var inherits = require('inherits');
var OutMessage = require('../OutMessage');
module.exports = function (Constants) {
function DeskShareRTMPBroadcastStoppedEventMessage (conferenceName, streamUrl, vw, vh, timestamp) {
DeskShareRTMPBroadcastStoppedEventMessage.super_.call(this, Constants.DESKSHARE_RTMP_BROADCAST_STOPPED);
this.payload = {};
this.payload[Constants.CONFERENCE_NAME] = conferenceName;
this.payload[Constants.STREAM_URL] = streamUrl;
this.payload[Constants.TIMESTAMP] = timestamp;
this.payload[Constants.VIDEO_WIDTH] = vw;
this.payload[Constants.VIDEO_HEIGHT] = vh;
};
inherits(DeskShareRTMPBroadcastStoppedEventMessage, OutMessage);
return DeskShareRTMPBroadcastStoppedEventMessage;
}

View File

@ -0,0 +1,21 @@
/*
*
*/
var inherits = require('inherits');
var OutMessage = require('../OutMessage');
module.exports = function (Constants) {
function DeskShareStartRTMPBroadcastEventMessage (conferenceName, streamUrl, timestamp) {
DeskShareStartRTMPBroadcastEventMessage.super_.call(this, Constants.DESKSHARE_START_BROADCAST_EVENT);
this.payload = {};
this.payload[Constants.CONFERENCE_NAME] = conferenceName;
this.payload[Constants.STREAM_URL] = streamUrl;
this.payload[Constants.TIMESTAMP] = timestamp;
};
inherits(DeskShareStartRTMPBroadcastEventMessage, OutMessage);
return DeskShareStartRTMPBroadcastEventMessage;
}

View File

@ -0,0 +1,20 @@
/*
*
*/
var inherits = require('inherits');
var OutMessage = require('../OutMessage');
module.exports = function (Constants) {
function DeskShareStopRTMPBroadcastEventMessage (conferenceName, streamUrl, timestamp) {
DeskShareStopRTMPBroadcastEventMessage.super_.call(this, Constants.DESKSHARE_STOP_BROADCAST_EVENT);
this.payload = {};
this.payload[Constants.CONFERENCE_NAME] = conferenceName;
this.payload[Constants.STREAM_URL] = streamUrl;
this.payload[Constants.TIMESTAMP] = timestamp;
};
inherits(DeskShareStopRTMPBroadcastEventMessage, OutMessage);
return DeskShareStopRTMPBroadcastEventMessage;
}

View File

@ -0,0 +1,20 @@
/*
*
*/
var inherits = require('inherits');
var OutMessage = require('../OutMessage');
module.exports = function (Constants) {
function StartTranscoderRequestMessage (meetingId, transcoderId, params) {
StartTranscoderRequestMessage.super_.call(this, Constants.START_TRANSCODER_REQUEST);
this.payload = {};
this.payload[Constants.MEETING_ID] = meetingId;
this.payload[Constants.TRANSCODER_ID] = transcoderId;
this.payload[Constants.PARAMS] = params;
};
inherits(StartTranscoderRequestMessage, OutMessage);
return StartTranscoderRequestMessage;
}

View File

@ -0,0 +1,15 @@
var inherits = require('inherits');
var OutMessage = require('../OutMessage');
module.exports = function (Constants) {
function StopTranscoderRequestMessage (meetingId, transcoderId) {
StopTranscoderRequestMessage.super_.call(this, Constants.STOP_TRANSCODER_REQUEST);
this.payload = {};
this.payload[Constants.MEETING_ID] = meetingId;
this.payload[Constants.TRANSCODER_ID] = transcoderId;
};
inherits(StopTranscoderRequestMessage, OutMessage);
return StopTranscoderRequestMessage;
}

View File

@ -0,0 +1,102 @@
/**
* @classdesc
* Redis wrapper class for connecting to Redis channels
*/
/* Modules */
var redis = require('redis');
var config = require('config');
var Constants = require('../messages/Constants.js');
var util = require('util');
const EventEmitter = require('events').EventEmitter;
const _retryThreshold = 1000 * 60 * 60;
const _maxRetries = 10;
/* Public members */
var RedisWrapper = function(subpattern) {
// Redis PubSub client holders
this.redisCli = null;
this.redisPub = null;
// Pub and Sub channels/patterns
this.subpattern = subpattern;
EventEmitter.call(this);
}
util.inherits(RedisWrapper, EventEmitter);
RedisWrapper.prototype.startRedis = function(callback) {
var self = this;
if (this.redisCli) {
console.log(" [RedisWrapper] Redis Client already exists");
callback(false, this);
}
var options = {
host : config.get('redisHost'),
port : config.get('redisPort'),
//password: config.get('redis.password')
retry_strategy: redisRetry
};
this.redisCli = redis.createClient(options);
this.redisPub = redis.createClient(options);
console.log(" [RedisWrapper] Trying to subscribe to redis channel");
this.redisCli.on("psubscribe", function (channel, count) {
console.log(" [RedisWrapper] Successfully subscribed to pattern [" + channel + "]");
});
this.redisCli.on("pmessage", function(pattern, channel, message) {
console.log(" [RedisWrapper] Message received from channel [" + channel + "] : " + message);
// use event emitter to throw new message
self.emit(Constants.REDIS_MESSAGE, message);
});
this.redisCli.psubscribe(this.subpattern);
console.log(" [RedisWrapper] Started Redis client at " + options.host + ":" + options.port +
" for subscription pattern: " + this.subpattern);
callback(false, this);
};
RedisWrapper.prototype.stopRedis = function(callback) {
if (this.redisCli){
this.redisCli.quit();
}
callback(false);
};
RedisWrapper.prototype.publishToChannel = function(message, channel) {
if(this.redisPub) {
console.log(" [RedisWrapper] Sending message to channel: " + message);
this.redisPub.publish(channel, message);
}
};
RedisWrapper.prototype.onMessage = function(pattern, channel, message) {
console.log(" [RedisWrapper] Message received from channel [" + channel + "] : " + message);
// use event emitter to throw new message
this.emit(Constants.REDIS_MESSAGE, message);
}
/* Private members */
function redisRetry(options) {
if (options.error && options.error.code === 'ECONNREFUSED') {
return new Error('The server refused the connection');
}
if (options.total_retry_time > _retryThreshold) {
return new Error('Retry time exhausted');
}
if (options.times_connected > _maxRetries) {
return undefined;
}
return Math.max(options.attempt * 100, 3000);
};
module.exports = RedisWrapper;

View File

@ -0,0 +1,99 @@
/**
* @classdesc
* BigBlueButton redis gateway for bbb-screenshare node app
*/
/* Modules */
var Constants = require('../messages/Constants.js');
var RedisWrapper = require('./RedisWrapper.js');
var config = require('config');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
/* Public members */
var BigBlueButtonGW = function () {
this.redisClients = null
EventEmitter.call(this);
};
util.inherits(BigBlueButtonGW, EventEmitter);
BigBlueButtonGW.prototype.addSubscribeChannel = function (channel, callback) {
var self = this;
if (this.redisClients === null) {
this.redisClients = {};
}
if (this.redisClients[channel]) {
return callback(null, this.redisClients[channel]);
}
var wrobj = new RedisWrapper(channel);
this.redisClients[channel] = {};
this.redisClients[channel] = wrobj;
wrobj.startRedis(function(error, redisCli) {
if(error) {
console.log(" [BigBlueButtonGW] Could not start redis client for channel " + channel);
return callback(error);
}
console.log(" [BigBlueButtonGW] Added redis client to this.redisClients[" + channel + "]");
wrobj.on(Constants.REDIS_MESSAGE, function(message) { var msg = JSON.parse(message);
var header = msg.header;
var payload = msg.payload;
if (header){
switch (header.name) {
case Constants.START_TRANSCODER_REPLY:
console.log("Received TRANSCODER REPLY => " + payload);
self.emit(Constants.START_TRANSCODER_REPLY, payload);
break;
case Constants.STOP_TRANSCODER_REPLY:
self.emit(Constants.STOP_TRANSCODER_REPLY, payload);
break;
default:
console.log(" [BigBlueButtonGW] Unknown Redis message with ID =>" + header.name);
}
}
});
return callback(null, wrobj);
});
};
/**
* Capture messages from subscribed channels and emit an event with it's
* identifier and payload. Check Constants.js for the identifiers.
*
* @param {Object} message Redis message
*/
BigBlueButtonGW.prototype.incomingMessage = function (message) {
var msg = JSON.parse(message);
var header = msg.header;
var payload = msg.payload;
if (header){
switch (header.name) {
case Constants.START_TRANSCODER_REPLY:
console.log("Received TRANSCODER REPLY => " + payload);
this.emit(Constants.START_TRANSCODER_REPLY, payload);
break;
case Constants.STOP_TRANSCODER_REPLY:
this.emit(Constants.STOP_TRANSCODER_REPLY, payload);
break;
default:
console.log(" [BigBlueButtonGW] Unknown Redis message with ID =>" + header.name);
}
}
};
BigBlueButtonGW.prototype.publish = function (message, channel, callback) {
for(var client in this.redisClients) {
if(typeof this.redisClients[client].publishToChannel === 'function') {
this.redisClients[client].publishToChannel(message, channel);
return callback(null);
}
}
return callback("Client not found");
};
module.exports = BigBlueButtonGW;

View File

@ -0,0 +1,56 @@
/*
* A module with the sole purpose of removing all non h264 options from an sdpOffer
*
* We use this to prevent any transcoding from the Kurento side if Firefox or Chrome offer VP8/VP9 as
* the default format.
*/
var sdpTransform = require('sdp-transform');
exports.transform = function(sdp) {
var mediaIndex = 0;
var res = sdpTransform.parse(sdp);
var validPayloads;
if (res.media[0].type === 'audio') {
// Audio
res.media[mediaIndex].rtp = res.media[mediaIndex].rtp.filter(function(elem) {
return elem.codec === 'opus';
});
validPayloads = res.media[mediaIndex].rtp.map(function(elem) {
return elem.payload;
});
res.media[mediaIndex].fmtp = res.media[mediaIndex].fmtp.filter(function(elem) {
return validPayloads.indexOf(elem.payload) >= 0;
});
res.media[mediaIndex].payloads = validPayloads.join(' ');
mediaIndex += 1;
}
// Video
res.media[mediaIndex].rtp = res.media[mediaIndex].rtp.filter(function(elem) {
return elem.codec === 'H264';
});
validPayloads = res.media[mediaIndex].rtp.map(function(elem) {
return elem.payload;
});
res.media[mediaIndex].fmtp = res.media[mediaIndex].fmtp.filter(function(elem) {
return validPayloads.indexOf(elem.payload) >= 0;
});
res.media[mediaIndex].rtcpFb = res.media[mediaIndex].rtcpFb.filter(function(elem) {
return validPayloads.indexOf(elem.payload) >= 0;
});
res.media[mediaIndex].payloads = validPayloads.join(' ');
return sdpTransform.write(res);
};

View File

@ -0,0 +1,43 @@
var config = require('config');
var Constants = require('./bbb/messages/Constants');
module.exports.generateSdp = function(remote_ip_address, remote_video_port) {
return "v=0\r\n"
+ "o=- 0 0 IN IP4 " + remote_ip_address + "\r\n"
+ "s=Kurento-SCREENSHARE\r\n"
+ "c=IN IP4 " + remote_ip_address + "\r\n"
+ "t=0 0\r\n"
+ "m=video " + remote_video_port + " RTP/AVP 96\r\n"
+ "a=rtpmap:96 H264/90000\r\n"
+ "a=ftmp:96\r\n";
}
module.exports.generateVideoSdp = function (sourceIpAddress, sourceVideoPort, codecId, sendReceive, rtpProfile, codecName, codecRate, fmtp) {
return 'm=video ' + sourceVideoPort + ' ' + rtpProfile + ' ' + codecId + '\r\n'
+ 'a=' + sendReceive + '\r\n'
+ 'c=IN IP4 ' + sourceIpAddress + '\r\n'
+ 'a=rtpmap:' + codecId + ' ' + codecName + '/' + codecRate + '\r\n'
+ 'a=fmtp:' + codecId + ' ' + fmtp + '\r\n';
};
module.exports.generateTranscoderParams = function (localIp, destIp, sendPort, recvPort, input, streamType, transcoderType, codec, callername) {
var rtpParams = {};
rtpParams[Constants.LOCAL_IP_ADDRESS] = localIp;
rtpParams[Constants.LOCAL_VIDEO_PORT] = sendVideoPort;
rtpParams[Constants.DESTINATION_IP_ADDRESS] = destIp;
rtpParams[Constants.REMOTE_VIDEO_PORT] = recvPort;
rtpParams[Constants.INPUT] = input;
rtpParams[Constants.STREAM_TYPE] = streamType;
rtpParams[Constants.TRANSCODER_TYPE] = transcoderType;
rtpParams[Constants.TRANSCODER_CODEC] = codec;
rtpParams[Constants.CALLERNAME] = callername;
return rtpParams;
}
module.exports.getPort = function (min_port, max_port) {
return Math.floor((Math.random() * (max_port - min_port + 1) + min_port));
}
module.exports.getVideoPort = function () {
return this.getPort(config.get('minVideoPort'), config.get('maxVideoPort'));
}

View File

@ -0,0 +1,308 @@
/*
* Lucas Fialho Zawacki
* Paulo Renato Lanzarin
* (C) Copyright 2017 Bigbluebutton
*
*/
// Imports
var Constants = require('./bbb/messages/Constants');
var MediaHandler = require('./media-handler');
var Messaging = require('./bbb/messages/Messaging');
var moment = require('moment');
var h264_sdp = require('./h264-sdp');
var now = moment();
// Global stuff
var mediaPipelines = {};
var sharedScreens = {};
var rtpEndpoints = {};
const kurento = require('kurento-client');
const config = require('config');
const kurentoUrl = config.get('kurentoUrl');
const kurentoIp = config.get('kurentoIp');
const localIpAddress = config.get('localIpAddress');
if (config.get('acceptSelfSignedCertificate')) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED=0;
}
var kurentoClient = null;
function getKurentoClient(callback) {
if (kurentoClient !== null) {
return callback(null, kurentoClient);
}
kurento(kurentoUrl, function(error, _kurentoClient) {
if (error) {
console.log("Could not find media server at address " + kurentoUrl);
return callback("Could not find media server at address" + kurentoUrl + ". Exiting with error " + error);
}
console.log(" [server] Initiating kurento client. Connecting to: " + kurentoUrl);
kurentoClient = _kurentoClient;
callback(null, kurentoClient);
});
}
function getMediaPipeline(id, callback) {
console.log(' [media] Creating media pipeline for ' + id);
if (mediaPipelines[id]) {
console.log(' [media] Pipeline already exists.');
callback(null, mediaPipelines[id]);
} else {
kurentoClient.create('MediaPipeline', function(err, pipeline) {
mediaPipelines[id] = pipeline;
return callback(err, pipeline);
});
}
}
function Screenshare(_ws, _id, _bbbgw, _voiceBridge, _caller) {
var ws = _ws;
var id = _id;
var BigBlueButtonGW = _bbbgw
var webRtcEndpoint = null;
var rtpEndpoint = null;
var voiceBridge = _voiceBridge;
var caller = _caller;
var streamUrl = "";
// TODO fetch those parameters from BBB
var vw = 1920;
var vh = 1200;
var candidatesQueue = [];
this.onIceCandidate = function(_candidate) {
var candidate = kurento.getComplexType('IceCandidate')(_candidate);
if (webRtcEndpoint) {
webRtcEndpoint.addIceCandidate(candidate);
}
else {
candidatesQueue.push(candidate);
}
};
// TODO this method should be refactored
this.startPresenter = function(id, ws, sdpOffer, callback) {
var self = this;
var theCallback = callback;
// Force H264 on Firefox and Chrome
sdpOffer = h264_sdp.transform(sdpOffer);
console.log("Starting presenter for " + sdpOffer);
getKurentoClient(function(error, kurentoClient) {
if (error) {
console.log("Kurento client error " + error);
return theCallback(error);
}
console.log("Got kurento client");
getMediaPipeline(id, function(error, pipeline) {
if (error) {
console.log("Media pipeline client error" + error);
return theCallback(error);
}
console.log("Got pipeline " + pipeline.id);
createMediaElements(pipeline, function(error, _webRtcEndpoint, _rtpEndpoint) {
if (error) {
console.log("Media elements error" + error);
pipeline.release();
return theCallback(error);
}
console.log("Got WebRTC endpoint " + _webRtcEndpoint.id);
while(candidatesQueue.length) {
var candidate = candidatesQueue.shift();
_webRtcEndpoint.addIceCandidate(candidate);
}
var flowInOut = function(event) {
console.log(' [=] ' + event.type + ' for endpoint ' + id);
if (event.state === 'NOT_FLOWING') {
} else if (event.state === 'FLOWING') {
}
};
_webRtcEndpoint.on('MediaFlowInStateChange', flowInOut);
_webRtcEndpoint.on('MediaFlowOutStateChange', flowInOut);
connectMediaElements(_webRtcEndpoint, _rtpEndpoint, function(error) {
if (error) {
console.log("Media elements CONNECT error" + error);
pipeline.release();
return theCallback(error);
}
console.log("Elements connected");
// It's a user sharing a Screen
sharedScreens[id] = _webRtcEndpoint;
rtpEndpoints[id] = _rtpEndpoint;
// Store our endpoint
webRtcEndpoint = _webRtcEndpoint;
rtpEndpoint = _rtpEndpoint;
_webRtcEndpoint.on('OnIceCandidate', function(event) {
var candidate = kurento.getComplexType('IceCandidate')(event.candidate);
ws.sendMessage({ id : 'iceCandidate', cameraId: id, candidate : candidate });
});
_webRtcEndpoint.processOffer(sdpOffer, function(error, webRtcSdpAnswer) {
if (error) {
console.log(" [webrtc] processOffer error => " + error + "for SDP " + sdpOffer);
pipeline.release();
return theCallback(error);
}
sendVideoPort = MediaHandler.getVideoPort();
var rtpSdpOffer = MediaHandler.generateSdp(localIpAddress, sendVideoPort);
console.log(" [rtpendpoint] RtpEndpoint processing => " + rtpSdpOffer);
_rtpEndpoint.processOffer(rtpSdpOffer, function(error, rtpSdpAnswer) {
if (error) {
console.log(" [rtpendpoint] processOffer error => " + error + "for SDP " + rtpSdpOffer);
pipeline.release();
return theCallback(error);
}
console.log(" [rtpendpoint] KMS answer SDP => " + rtpSdpAnswer);
var recvVideoPort = rtpSdpAnswer.match(/m=video\s(\d*)/)[1];
var rtpParams = MediaHandler.generateTranscoderParams(localIpAddress, kurentoIp,
sendVideoPort, recvVideoPort, voiceBridge, "stream_type_video", Constants.RTP_TO_RTMP, "copy", "caller");
_rtpEndpoint.on('MediaFlowInStateChange', function(event) {
if (event.state === 'NOT_FLOWING') {
} else if (event.state === 'FLOWING') {
var strm = Messaging.generateStartTranscoderRequestMessage(voiceBridge, voiceBridge, rtpParams);
BigBlueButtonGW.publish(strm, Constants.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {});
BigBlueButtonGW.on(Constants.START_TRANSCODER_REPLY, function(payload) {
console.log("REPLY PAYLOAD => " + JSON.stringify(payload, null, 2));
streamUrl = payload.streamUrl?streamUrl:'';
var timestamp = now.format('hhmmss');
var dsrbstam = Messaging.generateDeskShareRTMPBroadcastStartedEvent(voiceBridge, streamUrl, vw, vh, timestamp);
BigBlueButtonGW.publish(dsrbstam, Constants.FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {});
});
}
});
return theCallback(null, webRtcSdpAnswer);
});
});
_webRtcEndpoint.gatherCandidates(function(error) {
if (error) { return theCallback(error);
}
});
});
});
});
});
};
var createMediaElements = function(pipeline, callback) {
console.log(" [webrtc] Creating webrtc and rtp endpoints");
pipeline.create('WebRtcEndpoint', function(error, _webRtcEndpoint) {
if (error) {
return callback(error);
}
webRtcEndpoint = _webRtcEndpoint;
pipeline.create('RtpEndpoint', function(error, _rtpEndpoint) {
if (error) {
return callback(error);
}
rtpEndpoint = _rtpEndpoint;
return callback(null, _webRtcEndpoint, _rtpEndpoint);
});
});
};
var connectMediaElements = function(webRtcEndpoint, rtpEndpoint, callback) {
// User is sharing Screen (sendOnly connection from the client)
console.log(" [webrtc] User wants to receive Screen ");
webRtcEndpoint.connect(rtpEndpoint, function(error) {
if (error) {
return callback(error);
}
return callback(null);
});
};
this.stop = function() {
console.log(' [stop] Releasing endpoints for ' + id);
this.stopKurentoScreenshare();
if (webRtcEndpoint) {
webRtcEndpoint.release();
webRtcEndpoint = null;
} else {
console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE");
}
if (rtpEndpoint) {
rtpEndpoint.release();
rtpEndpoint = null;
} else {
console.log(" [rtpEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE");
}
console.log(' [stop] Screen is shared, releasing ' + id);
if (mediaPipelines[id]) {
mediaPipelines[id].release();
} else {
console.log(" [mediaPipeline] PLEASE DONT TRY STOPPING THINGS TWICE");
}
delete mediaPipelines[id];
delete sharedScreens[id];
delete candidatesQueue;
};
this.stopKurentoScreenshare = function () {
var strm = Messaging.generateStopTranscoderRequestMessage(voiceBridge, voiceBridge);
BigBlueButtonGW.publish(strm, Constants.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {});
BigBlueButtonGW.on(Constants.STOP_TRANSCODER_REPLY, function(payload) {
var meetingId = payload[Constants.MEETING_ID];
var transcoderId = payload[Constants.TRANSCODER_ID];
if(voiceBridge === meetingId) {
var dsrstom = Messaging.generateDeskShareRTMPBroadcastStoppedEvent(voiceBridge,
streamUrl, vw, vh, streamUrl);
BigblueButton.publish(dsrstom, FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {});
}
});
}
return this;
};
module.exports = Screenshare;

View File

@ -0,0 +1,18 @@
/*
* Simple wrapper around the ws library
*
*/
var ws = require('ws');
ws.prototype.sendMessage = function(json) {
return this.send(JSON.stringify(json), function(error) {
if(error)
console.log(' [server] Websocket error "' + error + '" on message "' + json.id + '"');
});
};
module.exports = ws;

View File

@ -0,0 +1,23 @@
{
"name": "bbb-screenshare-video-kurento-bridge",
"version": "1.0.0",
"private": true,
"scripts": {
"start": "nodejs server.js",
"postinstall": "npm start"
},
"dependencies": {
"cookie-parser": "^1.3.5",
"express": "~4.12.4",
"express-session": "~1.10.3",
"ws": "~1.0.1",
"kurento-client": "6.6.0",
"redis": "^2.6.2",
"sdp-transform": "*",
"moment": "*"
},
"devDependencies": {
"config": "^1.26.1",
"js-yaml": "^3.8.3"
}
}

View File

@ -0,0 +1,178 @@
/*
* Lucas Fialho Zawacki
* Paulo Renato Lanzarin
* (C) Copyright 2017 Bigbluebutton
*
*/
var cookieParser = require('cookie-parser')
var express = require('express');
var session = require('express-session')
var wsModule = require('./lib/websocket');
var http = require('http');
var fs = require('fs');
var Screenshare = require('./lib/screenshare');
var Constants = require('./lib/bbb/messages/Constants');
// Global variables
var app = express();
var sessions = {};
var BigBlueButtonGW = require('./lib/bbb/pubsub/bbb-gw');
/*
* Management of sessions
*/
app.use(cookieParser());
var sessionHandler = session({
secret : 'Shawarma', rolling : true, resave : true, saveUninitialized : true
});
app.use(sessionHandler);
/*
* Server startup
*/
var server = http.createServer(app).listen(3008, function() {
console.log(' [*] Running bbb-screenshare kurento screenshare service.');
});
var wss = new wsModule.Server({
server : server,
path : '/kurento-screenshare'
});
var clientId = 0;
var bbbGW = new BigBlueButtonGW();
bbbGW.addSubscribeChannel(Constants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, function(error, redisWrapper) {
if(error) {
console.log(' Could not connect to transcoder redis channel, finishing app...');
stopAll();
}
console.log(' [server] Successfully subscribed to redis channel');
});
wss.on('connection', function(ws) {
var sessionId;
var request = ws.upgradeReq;
var response = {
writeHead : {}
};
sessionHandler(request, response, function(err) {
sessionId = request.session.id + "_" + clientId++;
if (!sessions[sessionId]) {
sessions[sessionId] = {};
}
console.log('Connection received with sessionId ' + sessionId);
});
ws.on('error', function(error) {
console.log('Connection ' + sessionId + ' error');
// stop(sessionId);
});
ws.on('close', function() {
console.log('Connection ' + sessionId + ' closed');
stopSession(sessionId);
});
ws.on('message', function(_message) {
var message = JSON.parse(_message);
var screenshare;
if (message.presenterId && sessions[sessionId][message.presenterId]) {
screenshare = sessions[sessionId][message.presenterId];
}
switch (message.id) {
case 'presenter':
console.log('[' + message.id + '] connection [' + sessionId + '][' + message.presenterId + '][' + message.voiceBridge + '][' + message.callerName + ']');
screenshare = new Screenshare(ws, message.presenterId, bbbGW, message.voiceBridge, message.callerName);
sessions[sessionId][message.presenterId] = screenshare;
// starts presenter by sending sessionID, websocket and sdpoffer
screenshare.startPresenter(sessionId, ws, message.sdpOffer, function(error, sdpAnswer) {
console.log(" Started presenter " + sessionId);
if (error) {
return ws.send(JSON.stringify({
id : 'presenterResponse',
response : 'rejected',
message : error
}));
}
ws.send(JSON.stringify({
id : 'presenterResponse',
response : 'accepted',
sdpAnswer : sdpAnswer
}));
console.log(" [websocket] Sending presenterResponse \n" + sdpAnswer);
});
break;
case 'stop':
console.log('[' + message.id + '] connection ' + sessionId);
if (screenshare) {
screenshare.stop(sessionId);
} else {
console.log(" [stop] Why is there no screenshare on STOP?");
}
break;
case 'onIceCandidate':
if (screenshare) {
screenshare.onIceCandidate(message.candidate);
} else {
console.log(" [iceCandidate] Why is there no screenshare on ICE CANDIDATE?");
}
break;
default:
ws.sendMessage({ id : 'error', message : 'Invalid message ' + message });
break;
}
});
});
var stopSession = function(sessionId) {
console.log(' [>] Stopping session ' + sessionId);
var screenshareIds = Object.keys(sessions[sessionId]);
for (var i = 0; i < screenshareIds.length; i++) {
var screenshare = sessions[sessionId][screenshareIds[i]];
screenshare.stop();
delete sessions[sessionId][screenshareIds[i]];
}
delete sessions[sessionId];
}
var stopAll = function() {
console.log('\n [x] Stopping everything! ');
var sessionIds = Object.keys(sessions);
for (var i = 0; i < sessionIds.length; i++) {
stopSession(sessionIds[i]);
}
setTimeout(process.exit, 1000);
}
process.on('SIGTERM', stopAll);
process.on('SIGINT', stopAll);