Refactored websocket connection manager and fixed event emitter leak

This commit is contained in:
prlanzarin 2018-05-08 17:29:04 +00:00
parent 5cee4c6e5a
commit d092edc6b1

View File

@ -7,11 +7,6 @@ const Logger = require('../utils/Logger');
// ID counter
let connectionIDCounter = 0;
ws.prototype.setErrorCallback = function(callback) {
this._errorCallback = callback;
};
ws.prototype.sendMessage = function(json) {
let websocket = this;
@ -38,107 +33,69 @@ module.exports = class WebsocketConnectionManager {
path
});
this.wss.on ('connection', (ws) => {
let self = this;
this.webSockets = {};
ws.id = connectionIDCounter++;
Logger.info("[WebsocketConnectionManager] New connection with id [ " + ws.id + " ]");
ws.on('message', (data) => {
let message = {};
try {
message = JSON.parse(data);
if (message.id === 'ping') {
ws.sendMessage({id: 'pong'});
return;
}
message.connectionId = ws.id;
if (!ws.sessionId) {
ws.sessionId = message.voiceBridge;
}
if (!ws.route) {
ws.route = message.type;
}
if (!ws.role) {
ws.role = message.role;
}
} catch(e) {
console.error(" [WebsocketConnectionManager] JSON message parse error " + e);
message = {};
}
// Test for empty or invalid JSON
if (Object.getOwnPropertyNames(message).length !== 0) {
this.emitter.emit(C.WEBSOCKET_MESSAGE, message);
}
});
//ws.on('message', this._onMessage.bind(this));
ws.setErrorCallback(this._onError.bind(this));
ws.on('close', (ev) => {
Logger.info('[WebsocketConnectionManager] Closed connection on [' + ws.id + ']');
let message = {
id: 'close',
type: ws.route,
role: ws.role,
voiceBridge: ws.sessionId,
connectionId: ws.id
}
this.emitter.emit(C.WEBSOCKET_MESSAGE, message);
ws = null;
});
ws.on('error', (err) => {
Logger.error('[WebsocketConnectionManager] Connection error [' + ws.id + ']');
let message = {
id: 'error',
type: ws.route,
role: ws.role,
voiceBridge: ws.sessionId,
connectionId: ws.id
}
this.emitter.emit(C.WEBSOCKET_MESSAGE, message);
ws = null;
});
// TODO: should we delete this listener after websocket dies?
this.emitter.on('response', (data) => {
if (ws && ws.id == data.connectionId) {
ws.sendMessage(data);
}
});
});
this.wss.on('connection', this._onNewConnection.bind(this));
}
setEventEmitter (emitter) {
this.emitter = emitter;
this.emitter.on('response', this._onServerResponse.bind(this));
}
_onServerResponse (data) {
// Here this is the 'ws' instance
this.sendMessage(data);
const connectionId = data? data.connectionId : null;
const ws = this.webSockets[connectionId];
if (ws) {
ws.sendMessage(data);
}
}
_onMessage (data) {
_onNewConnection (ws) {
ws.id = connectionIDCounter++;
this.webSockets[ws.id] = ws;
Logger.info("[WebsocketConnectionManager] New connection with id [ " + ws.id + " ]");
ws.on('message', (data) => {
this._onMessage(ws, data);
});
ws.on('close', (err) => {
this._onClose(ws, err);
});
ws.on('error', (err) => {
this._onError(ws, err);
});
};
_onMessage (ws, data) {
let message = {};
try {
message = JSON.parse(data);
if (message.id === 'ping') {
ws.sendMessage({id: 'pong'});
return;
}
message.connectionId = ws.id;
if (!ws.sessionId) {
ws.sessionId = message.voiceBridge;
}
if (!ws.route) {
ws.route = message.type;
}
if (!ws.role) {
ws.role = message.role;
}
} catch(e) {
console.error(" [WebsocketConnectionManager] JSON message parse error " + e);
Logger.error(" [WebsocketConnectionManager] JSON message parse error " + e);
message = {};
}
@ -148,27 +105,33 @@ module.exports = class WebsocketConnectionManager {
}
}
_onError (err) {
Logger.error('[WebsocketConnectionManager] Connection error');
_onError (ws, err) {
Logger.error('[WebsocketConnectionManager] Connection error [' + ws.id + ']', err);
let message = {
id: 'error',
type: ws.route,
role: ws.role,
voiceBridge: ws.sessionId,
connectionId: ws.id
}
this.emitter.emit(C.WEBSOCKET_MESSAGE, message);
delete this.webSockets[ws.id];
}
_onClose (err) {
Logger.info('[WebsocketConnectionManager] Closed connection [' + this.id + ']');
_onClose (ws, err) {
Logger.info('[WebsocketConnectionManager] Closed connection on [' + ws.id + ']');
let message = {
id: 'close',
voiceBridge: this.sessionId,
connectionId: this.id
type: ws.route,
role: ws.role,
voiceBridge: ws.sessionId,
connectionId: ws.id
}
this.emitter.emit(C.WEBSOCKET_MESSAGE, message);
}
_stop () {
delete this.webSockets[ws.id];
}
}