Merge pull request #5701 from pedrobmarin/webhooks-features-2.0

Webhooks features 2.0
This commit is contained in:
Fred Dixon 2018-06-15 15:37:58 -04:00 committed by GitHub
commit 381e4dd7c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 3283 additions and 965 deletions

View File

@ -3,4 +3,5 @@
*.log
node_modules/
config_local.coffee
config_local.js
log/*

View File

@ -1 +1 @@
0.10.33
8.4.0

View File

@ -1,7 +1,5 @@
// This is a simple wrapper to run the app with 'node app.js'
require("coffee-script/register");
Application = require('./application.coffee');
Application = require('./application.js');
application = new Application();
application.start();

View File

@ -1,26 +0,0 @@
redis = require("redis")
config = require("./config")
Hook = require("./hook")
IDMapping = require("./id_mapping")
WebHooks = require("./web_hooks")
WebServer = require("./web_server")
# Class that defines the application. Listens for events on redis and starts the
# process to perform the callback calls.
# TODO: add port (-p) and log level (-l) to the command line args
module.exports = class Application
constructor: ->
# one for pub/sub, another to read/write data
config.redis.pubSubClient = redis.createClient()
config.redis.client = redis.createClient()
@webHooks = new WebHooks()
@webServer = new WebServer()
start: ->
Hook.initialize =>
IDMapping.initialize =>
@webServer.start(config.server.port)
@webHooks.start()

View File

@ -0,0 +1,38 @@
const config = require("./config.js");
const Hook = require("./hook.js");
const IDMapping = require("./id_mapping.js");
const WebHooks = require("./web_hooks.js");
const WebServer = require("./web_server.js");
const redis = require("redis");
const UserMapping = require("./userMapping.js");
const async = require("async");
// Class that defines the application. Listens for events on redis and starts the
// process to perform the callback calls.
// TODO: add port (-p) and log level (-l) to the command line args
module.exports = class Application {
constructor() {
config.redis.pubSubClient = redis.createClient();
config.redis.client = redis.createClient()
this.webHooks = new WebHooks();
this.webServer = new WebServer();
}
start(callback) {
Hook.initialize(() => {
UserMapping.initialize(() => {
IDMapping.initialize(()=> {
async.parallel([
(callback) => { this.webServer.start(config.server.port, callback) },
(callback) => { this.webServer.createPermanents(callback) },
(callback) => { this.webHooks.start(callback) }
], (err,results) => {
if(err != null) {}
typeof callback === 'function' ? callback() : undefined;
});
});
});
});
}
};

View File

@ -1,87 +0,0 @@
_ = require('lodash')
request = require("request")
url = require('url')
EventEmitter = require('events').EventEmitter
config = require("./config")
Logger = require("./logger")
Utils = require("./utils")
# Use to perform a callback. Will try several times until the callback is
# properly emitted and stop when successful (or after a given number of tries).
# Used to emit a single callback. Destroy it and create a new class for a new callback.
# Emits "success" on success, "failure" on error and "stopped" when gave up trying
# to perform the callback.
module.exports = class CallbackEmitter extends EventEmitter
constructor: (@callbackURL, @message) ->
@nextInterval = 0
@timestap = 0
start: ->
@timestamp = new Date().getTime()
@nextInterval = 0
@_scheduleNext 0
_scheduleNext: (timeout) ->
setTimeout( =>
@_emitMessage (error, result) =>
if not error? and result
@emit "success"
else
@emit "failure", error
# get the next interval we have to wait and schedule a new try
interval = config.hooks.retryIntervals[@nextInterval]
if interval?
Logger.warn "xx> Trying the callback again in #{interval/1000.0} secs"
@nextInterval++
@_scheduleNext(interval)
# no intervals anymore, time to give up
else
@nextInterval = 0
@emit "stopped"
, timeout)
_emitMessage: (callback) ->
# data to be sent
# note: keep keys in alphabetical order
data =
event: JSON.stringify(@message)
timestamp: @timestamp
# calculate the checksum
checksum = Utils.checksum("#{@callbackURL}#{JSON.stringify(data)}#{config.bbb.sharedSecret}")
# get the final callback URL, including the checksum
urlObj = url.parse(@callbackURL, true)
callbackURL = @callbackURL
callbackURL += if _.isEmpty(urlObj.search) then "?" else "&"
callbackURL += "checksum=#{checksum}"
requestOptions =
followRedirect: true
maxRedirects: 10
uri: callbackURL
method: "POST"
form: data
request requestOptions, (error, response, body) ->
if error? or not (response?.statusCode >= 200 and response?.statusCode < 300)
Logger.warn "xx> Error in the callback call to: [#{requestOptions.uri}] for #{simplifiedEvent(data.event)}"
Logger.warn "xx> Error:", error
Logger.warn "xx> Status:", response?.statusCode
callback error, false
else
Logger.info "==> Successful callback call to: [#{requestOptions.uri}] for #{simplifiedEvent(data.event)}"
callback null, true
# A simple string that identifies the event
simplifiedEvent = (event) ->
try
eventJs = JSON.parse(event)
"event: { name: #{eventJs.envelope?.name}, timestamp: #{eventJs.envelope?.timestamp} }"
catch e
"event: #{event}"

View File

@ -0,0 +1,138 @@
const _ = require('lodash');
const request = require("request");
const url = require('url');
const EventEmitter = require('events').EventEmitter;
const config = require("./config.js");
const Logger = require("./logger.js");
const Utils = require("./utils.js");
// Use to perform a callback. Will try several times until the callback is
// properly emitted and stop when successful (or after a given number of tries).
// Used to emit a single callback. Destroy it and create a new class for a new callback.
// Emits "success" on success, "failure" on error and "stopped" when gave up trying
// to perform the callback.
module.exports = class CallbackEmitter extends EventEmitter {
constructor(callbackURL, message, permanent) {
super();
this.callbackURL = callbackURL;
this.message = message;
this.nextInterval = 0;
this.timestap = 0;
this.permanent = false;
this.permanent = permanent;
}
start() {
this.timestamp = new Date().getTime();
this.nextInterval = 0;
this._scheduleNext(0);
}
_scheduleNext(timeout) {
setTimeout( () => {
this._emitMessage((error, result) => {
if ((error == null) && result) {
this.emit("success");
} else {
this.emit("failure", error);
// get the next interval we have to wait and schedule a new try
const interval = config.hooks.retryIntervals[this.nextInterval];
if (interval != null) {
Logger.warn(`[Emitter] trying the callback again in ${interval/1000.0} secs`);
this.nextInterval++;
this._scheduleNext(interval);
// no intervals anymore, time to give up
} else {
this.nextInterval = !this.permanent ? 0 : config.hooks.permanentIntervalReset; // Reset interval to permanent hooks
if(this.permanent){
this._scheduleNext(interval);
}
else {
return this.emit("stopped");
}
}
}
});
}
, timeout);
}
_emitMessage(callback) {
let data,requestOptions;
if (config.bbb.auth2_0) {
// Send data as a JSON
data = "[" + this.message + "]";
const callbackURL = this.callbackURL;
requestOptions = {
followRedirect: true,
maxRedirects: 10,
uri: callbackURL,
method: "POST",
form: data,
auth: {
bearer: config.bbb.sharedSecret
}
};
}
else {
// data to be sent
// note: keep keys in alphabetical order
data = {
event: "[" + this.message + "]",
timestamp: this.timestamp
};
// calculate the checksum
const checksum = Utils.checksum(`${this.callbackURL}${JSON.stringify(data)}${config.bbb.sharedSecret}`);
// get the final callback URL, including the checksum
const urlObj = url.parse(this.callbackURL, true);
let callbackURL = this.callbackURL;
callbackURL += _.isEmpty(urlObj.search) ? "?" : "&";
callbackURL += `checksum=${checksum}`;
requestOptions = {
followRedirect: true,
maxRedirects: 10,
uri: callbackURL,
method: "POST",
form: data
};
}
const responseFailed = (response) => {
var statusCode = (response != null ? response.statusCode : undefined)
return !((statusCode >= 200) && (statusCode < 300))
};
request(requestOptions, function(error, response, body) {
if ((error != null) || responseFailed(response)) {
Logger.warn(`[Emitter] error in the callback call to: [${requestOptions.uri}] for ${simplifiedEvent(data)}`, "error:", error, "status:", response != null ? response.statusCode : undefined);
callback(error, false);
} else {
Logger.info(`[Emitter] successful callback call to: [${requestOptions.uri}] for ${simplifiedEvent(data)}`);
callback(null, true);
}
});
}
};
// A simple string that identifies the event
var simplifiedEvent = function(event) {
if (event.event != null) {
event = event.event
}
try {
const eventJs = JSON.parse(event);
return `event: { name: ${(eventJs.data != null ? eventJs.data.id : undefined)}, timestamp: ${(eventJs.data.event != null ? eventJs.data.event.ts : undefined)} }`;
} catch (e) {
return `event: ${event}`;
}
};

View File

@ -1,115 +0,0 @@
# Global configuration file
# load the local configs
config = require("./config_local")
# BigBlueButton configs
config.bbb or= {}
config.bbb.sharedSecret or= "33e06642a13942004fd83b3ba6e4104a"
config.bbb.apiPath or= "/bigbluebutton/api"
# Web server configs
config.server or= {}
config.server.port or= 3005
# Web hooks configs
config.hooks or= {}
# Channels to subscribe to.
config.hooks.channels or= {
mainChannel: 'from-akka-apps-redis-channel',
rapChannel: 'bigbluebutton:from-rap'
}
# Filters to the events we want to generate callback calls for
config.hooks.events or= [
{ channel: config.hooks.channels.mainChannel, name: "MeetingCreatedEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "MeetingEndedEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserJoinedMeetingEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserLeftMeetingEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserJoinedVoiceConfToClientEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserLeftVoiceConfToClientEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserMutedVoiceEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserBroadcastCamStartedEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "UserBroadcastCamStoppedEvtMsg" },
{ channel: config.hooks.channels.mainChannel, name: "RecordingStatusChangedEvtMsg" },
{ channel: config.hooks.channels.rapChannel, name: "sanity_started" },
{ channel: config.hooks.channels.rapChannel, name: "sanity_ended" },
{ channel: config.hooks.channels.rapChannel, name: "archive_started" },
{ channel: config.hooks.channels.rapChannel, name: "archive_ended" },
{ channel: config.hooks.channels.rapChannel, name: "post_archive_started" },
{ channel: config.hooks.channels.rapChannel, name: "post_archive_ended" },
{ channel: config.hooks.channels.rapChannel, name: "process_started" },
{ channel: config.hooks.channels.rapChannel, name: "process_ended" },
{ channel: config.hooks.channels.rapChannel, name: "post_process_started" },
{ channel: config.hooks.channels.rapChannel, name: "post_process_ended" },
{ channel: config.hooks.channels.rapChannel, name: "publish_started" },
{ channel: config.hooks.channels.rapChannel, name: "publish_ended" },
{ channel: config.hooks.channels.rapChannel, name: "post_publish_started" },
{ channel: config.hooks.channels.rapChannel, name: "post_publish_ended" },
{ channel: config.hooks.channels.rapChannel, name: "unpublished" },
{ channel: config.hooks.channels.rapChannel, name: "published" },
{ channel: config.hooks.channels.rapChannel, name: "deleted" }
]
# Retry intervals for failed attempts for perform callback calls.
# In ms. Totals to around 5min.
config.hooks.retryIntervals = [
100, 500, 1000, 2000, 4000, 8000, 10000, 30000, 60000, 60000, 60000, 60000
]
# Mappings of internal to external meeting IDs
config.mappings = {}
config.mappings.cleanupInterval = 10000 # 10 secs, in ms
config.mappings.timeout = 1000*60*60*24 # 24 hours, in ms
# Redis
config.redis = {}
config.redis.keys = {}
config.redis.keys.hook = (id) -> "bigbluebutton:webhooks:hook:#{id}"
config.redis.keys.hooks = "bigbluebutton:webhooks:hooks"
config.redis.keys.mappings = "bigbluebutton:webhooks:mappings"
config.redis.keys.mapping = (id) -> "bigbluebutton:webhooks:mapping:#{id}"
config.api = {}
config.api.responses = {}
config.api.responses.failure = (key, msg) ->
"<response> \
<returncode>FAILED</returncode> \
<messageKey>#{key}</messageKey> \
<message>#{msg}</message> \
</response>"
config.api.responses.checksumError =
config.api.responses.failure("checksumError", "You did not pass the checksum security check.")
config.api.responses.createSuccess = (id) ->
"<response> \
<returncode>SUCCESS</returncode> \
<hookID>#{id}</hookID> \
</response>"
config.api.responses.createFailure =
config.api.responses.failure("createHookError", "An error happened while creating your hook. Check the logs.")
config.api.responses.createDuplicated = (id) ->
"<response> \
<returncode>SUCCESS</returncode> \
<hookID>#{id}</hookID> \
<messageKey>duplicateWarning</messageKey> \
<message>There is already a hook for this callback URL.</message> \
</response>"
config.api.responses.destroySuccess =
"<response> \
<returncode>SUCCESS</returncode> \
<removed>true</removed> \
</response>"
config.api.responses.destroyFailure =
config.api.responses.failure("destroyHookError", "An error happened while removing your hook. Check the logs.")
config.api.responses.destroyNoHook =
config.api.responses.failure("destroyMissingHook", "The hook informed was not found.")
config.api.responses.missingParamCallbackURL =
config.api.responses.failure("missingParamCallbackURL", "You must specify a callbackURL in the parameters.")
config.api.responses.missingParamHookID =
config.api.responses.failure("missingParamHookID", "You must specify a hookID in the parameters.")
module.exports = config

109
bbb-webhooks/config.js Normal file
View File

@ -0,0 +1,109 @@
// Global configuration file
// load the local configs
const config = require("./config_local.js");
// BigBlueButton configs
if (config.bbb == null) { config.bbb = {}; }
if (!config.bbb.sharedSecret) { config.bbb.sharedSecret = "sharedSecret"; }
if (!config.bbb.apiPath) { config.bbb.apiPath = "/bigbluebutton/api"; }
// Whether to use Auth2.0 or not, Auth2.0 sends the sharedSecret whithin an Authorization header as a bearer
// and data as JSON
if (config.bbb.auth2_0 == null) { config.bbb.auth2_0 = false; }
// Web server configs
if (!config.server) { config.server = {}; }
if (config.server.port == null) { config.server.port = 3005; }
// Web hooks configs
if (!config.hooks) { config.hooks = {}; }
if (!config.hooks.channels) {
config.hooks.channels = {
mainChannel: 'from-akka-apps-redis-channel',
rapChannel: 'bigbluebutton:from-rap',
chatChannel: 'from-akka-apps-chat-redis-channel'
}
}
// IP where permanent hook will post data (more than 1 URL means more than 1 permanent hook)
if (!config.hooks.permanentURLs) { config.hooks.permanentURLs = []; }
// How many messages will be enqueued to be processed at the same time
if (config.hooks.queueSize == null) { config.hooks.queueSize = 10000; }
// Allow permanent hooks to receive raw message, which is the message straight from BBB
if (config.hooks.getRaw == null) { config.hooks.getRaw = true; }
// If set to higher than 1, will send events on the format:
// "event=[{event1},{event2}],timestamp=000" or "[{event1},{event2}]" (based on using auth2_0 or not)
// when there are more than 1 event on the queue at the moment of processing the queue.
if (config.hooks.multiEvent == null) { config.hooks.multiEvent = 1; }
// Retry intervals for failed attempts for perform callback calls.
// In ms. Totals to around 5min.
config.hooks.retryIntervals = [
100, 500, 1000, 2000, 4000, 8000, 10000, 30000, 60000, 60000, 60000, 60000
];
// Reset permanent interval when exceeding maximum attemps
config.hooks.permanentURLsIntervalReset = 8;
// Mappings of internal to external meeting IDs
config.mappings = {};
config.mappings.cleanupInterval = 10000; // 10 secs, in ms
config.mappings.timeout = 1000*60*60*24; // 24 hours, in ms
// Redis
config.redis = {};
config.redis.keys = {};
config.redis.keys.hook = id => `bigbluebutton:webhooks:hook:${id}`;
config.redis.keys.hooks = "bigbluebutton:webhooks:hooks";
config.redis.keys.mappings = "bigbluebutton:webhooks:mappings";
config.redis.keys.mapping = id => `bigbluebutton:webhooks:mapping:${id}`;
config.redis.keys.events = id => `bigbluebutton:webhooks:events:${id}`;
config.redis.keys.userMaps = "bigbluebutton:webhooks:userMaps";
config.redis.keys.userMap = id => `bigbluebutton:webhooks:userMap:${id}`;
config.api = {};
config.api.responses = {};
config.api.responses.failure = (key, msg) =>
`<response> \
<returncode>FAILED</returncode> \
<messageKey>${key}</messageKey> \
<message>${msg}</message> \
</response>`
;
config.api.responses.checksumError =
config.api.responses.failure("checksumError", "You did not pass the checksum security check.");
config.api.responses.createSuccess = (id, permanent, getRaw) =>
`<response> \
<returncode>SUCCESS</returncode> \
<hookID>${id}</hookID> \
<permanentHook>${permanent}</permanentHook> \
<rawData>${getRaw}</rawData> \
</response>`
;
config.api.responses.createFailure =
config.api.responses.failure("createHookError", "An error happened while creating your hook. Check the logs.");
config.api.responses.createDuplicated = id =>
`<response> \
<returncode>SUCCESS</returncode> \
<hookID>${id}</hookID> \
<messageKey>duplicateWarning</messageKey> \
<message>There is already a hook for this callback URL.</message> \
</response>`
;
config.api.responses.destroySuccess =
`<response> \
<returncode>SUCCESS</returncode> \
<removed>true</removed> \
</response>`;
config.api.responses.destroyFailure =
config.api.responses.failure("destroyHookError", "An error happened while removing your hook. Check the logs.");
config.api.responses.destroyNoHook =
config.api.responses.failure("destroyMissingHook", "The hook informed was not found.");
config.api.responses.missingParamCallbackURL =
config.api.responses.failure("missingParamCallbackURL", "You must specify a callbackURL in the parameters.");
config.api.responses.missingParamHookID =
config.api.responses.failure("missingParamHookID", "You must specify a hookID in the parameters.");
module.exports = config;

View File

@ -1,33 +0,0 @@
# Local configuration file
config = {}
# Shared secret of your BigBlueButton server.
config.bbb = {}
config.bbb.sharedSecret = "33e06642a13942004fd83b3ba6e4104a"
# The port in which the API server will run.
config.server = {}
config.server.port = 3005
# Web hooks configs
config.hooks = {}
# Channels to subscribe to.
config.hooks.channels = {
mainChannel: 'from-akka-apps-redis-channel',
rapChannel: 'bigbluebutton:from-rap'
}
# Callbacks will be triggered for all the events in this list and only for these events.
# You only need to specify it if you want events that are not used by default or
# if you want to restrict the events used. See `config.coffee` for the default list.
#
# config.hooks.events = [
# { channel: config.hooks.channels.mainChannel, name: "MeetingCreatedEvtMsg" },
# { channel: config.hooks.channels.mainChannel, name: "MeetingEndedEvtMsg" },
# { channel: config.hooks.channels.mainChannel, name: "UserJoinedMeetingEvtMsg" },
# { channel: config.hooks.channels.mainChannel, name: "UserLeftMeetingEvtMsg" }
# ]
module.exports = config

View File

@ -0,0 +1,29 @@
// Local configuration file
const config = {};
// Shared secret of your BigBlueButton server.
config.bbb = {};
config.bbb.sharedSecret = "mysharedsecret";
// Whether to use Auth2.0 or not, Auth2.0 sends the sharedSecret whithin an Authorization header as a bearer
config.bbb.auth2_0 = false
// The port in which the API server will run.
config.server = {};
config.server.port = 3005;
// Callbacks will be triggered for all the events in this list and only for these events.
//config.hooks = {};
//config.hooks.channels = {
// mainChannel: 'from-akka-apps-redis-channel',
// rapChannel: 'bigbluebutton:from-rap',
// chatChannel: 'from-akka-apps-chat-redis-channel'
//}
// IP where permanent hook will post data (more than 1 URL means more than 1 permanent hook)
//config.hooks.permanentURLs = ["request.catcher.url", "another.request.catcher.url"]
// Allow global hook to receive all events with raw data
//config.hooks.getRaw = false;
module.exports = config;

View File

@ -2,8 +2,8 @@
// Uses the first meeting started after the application runs and will list all
// events, but only the first time they happen.
redis = require("redis");
const redis = require("redis");
const config = require('../config.js');
var target_meeting = null;
var events_printed = [];
var subscriber = redis.createClient();
@ -15,32 +15,25 @@ subscriber.on("psubscribe", function(channel, count) {
subscriber.on("pmessage", function(pattern, channel, message) {
try {
message = JSON.parse(message);
if (message !== null && message !== undefined && message.header !== undefined) {
if (message.hasOwnProperty('envelope')) {
var message_meeting_id = message.payload.meeting_id;
var message_name = message.header.name;
var message_name = message.envelope.name;
if (message_name === "meeting_created_message") {
if (target_meeting === null) {
target_meeting = message_meeting_id;
}
if (!containsOrAdd(events_printed, message_name)) {
console.log("\n###", message_name, "\n");
console.log(message);
console.log("\n");
}
if (target_meeting !== null && target_meeting === message_meeting_id) {
if (!containsOrAdd(events_printed, message_name)) {
console.log("\n###", message_name, "\n");
console.log(message);
console.log("\n");
}
}
}
} catch(e) {
console.log("error processing the message", message, ":", e);
}
});
subscriber.psubscribe("bigbluebutton:*");
for (let k in config.hooks.channels) {
const channel = config.hooks.channels[k];
subscriber.psubscribe(channel);
}
var containsOrAdd = function(list, value) {
for (i = 0; i <= list.length-1; i++) {

View File

@ -1,206 +0,0 @@
_ = require("lodash")
async = require("async")
redis = require("redis")
config = require("./config")
CallbackEmitter = require("./callback_emitter")
IDMapping = require("./id_mapping")
Logger = require("./logger")
# The database of hooks.
# Used always from memory, but saved to redis for persistence.
#
# Format:
# { id: Hook }
# Format on redis:
# * a SET "...:hooks" with all ids
# * a HASH "...:hook:<id>" for each hook with some of its attributes
db = {}
nextID = 1
# The representation of a hook and its properties. Stored in memory and persisted
# to redis.
# Hooks can be global, receiving callback calls for events from all meetings on the
# server, or for a specific meeting. If an `externalMeetingID` is set in the hook,
# it will only receive calls related to this meeting, otherwise it will be global.
# Events are kept in a queue to be sent in the order they are received.
# TODO: The queue should be cleared at some point. The hook is destroyed if too many
# callback attempts fail, after ~5min. So the queue is already protected in this case.
# But if the requests are going by but taking too long, the queue might be increasing
# faster than the callbacks are made.
module.exports = class Hook
constructor: ->
@id = null
@callbackURL = null
@externalMeetingID = null
@queue = []
@emitter = null
@redisClient = config.redis.client
save: (callback) ->
@redisClient.hmset config.redis.keys.hook(@id), @toRedis(), (error, reply) =>
Logger.error "Hook: error saving hook to redis!", error, reply if error?
@redisClient.sadd config.redis.keys.hooks, @id, (error, reply) =>
Logger.error "Hook: error saving hookID to the list of hooks!", error, reply if error?
db[@id] = this
callback?(error, db[@id])
destroy: (callback) ->
@redisClient.srem config.redis.keys.hooks, @id, (error, reply) =>
Logger.error "Hook: error removing hookID from the list of hooks!", error, reply if error?
@redisClient.del config.redis.keys.hook(@id), (error) =>
Logger.error "Hook: error removing hook from redis!", error if error?
if db[@id]
delete db[@id]
callback?(error, true)
else
callback?(error, false)
# Is this a global hook?
isGlobal: ->
not @externalMeetingID?
# The meeting from which this hook should receive events.
targetMeetingID: ->
@externalMeetingID
# Puts a new message in the queue. Will also trigger a processing in the queue so this
# message might be processed instantly.
enqueue: (message) ->
Logger.info "Hook: enqueueing message", JSON.stringify(message)
@queue.push message
@_processQueue()
toRedis: ->
r =
"hookID": @id,
"callbackURL": @callbackURL
r.externalMeetingID = @externalMeetingID if @externalMeetingID?
r
fromRedis: (redisData) ->
@id = parseInt(redisData.hookID)
@callbackURL = redisData.callbackURL
if redisData.externalMeetingID?
@externalMeetingID = redisData.externalMeetingID
else
@externalMeetingID = null
# Gets the first message in the queue and start an emitter to send it. Will only do it
# if there is no emitter running already and if there is a message in the queue.
_processQueue: ->
message = @queue[0]
return if not message? or @emitter?
@emitter = new CallbackEmitter(@callbackURL, message)
@emitter.start()
@emitter.on "success", =>
delete @emitter
@queue.shift() # pop the first message just sent
@_processQueue() # go to the next message
# gave up trying to perform the callback, remove the hook forever
@emitter.on "stopped", (error) =>
Logger.warn "Hook: too many failed attempts to perform a callback call, removing the hook for", @callbackURL
@destroy()
@addSubscription = (callbackURL, meetingID=null, callback) ->
hook = Hook.findByCallbackURLSync(callbackURL)
if hook?
callback?(new Error("There is already a subscription for this callback URL"), hook)
else
msg = "Hook: adding a hook with callback URL [#{callbackURL}]"
msg += " for the meeting [#{meetingID}]" if meetingID?
Logger.info msg
hook = new Hook()
hook.id = nextID++
hook.callbackURL = callbackURL
hook.externalMeetingID = meetingID
hook.save (error, hook) -> callback?(error, hook)
@removeSubscription = (hookID, callback) ->
hook = Hook.getSync(hookID)
if hook?
msg = "Hook: removing the hook with callback URL [#{hook.callbackURL}]"
msg += " for the meeting [#{hook.externalMeetingID}]" if hook.externalMeetingID?
Logger.info msg
hook.destroy (error, removed) -> callback?(error, removed)
else
callback?(null, false)
@countSync = ->
Object.keys(db).length
@getSync = (id) ->
db[id]
@firstSync = ->
keys = Object.keys(db)
if keys.length > 0
db[keys[0]]
else
null
@findByExternalMeetingIDSync = (externalMeetingID) ->
hooks = Hook.allSync()
_.filter(hooks, (hook) ->
(externalMeetingID? and externalMeetingID is hook.externalMeetingID)
)
@allGlobalSync = ->
hooks = Hook.allSync()
_.filter(hooks, (hook) -> hook.isGlobal())
@allSync = ->
arr = Object.keys(db).reduce((arr, id) ->
arr.push db[id]
arr
, [])
arr
@clearSync = ->
for id of db
delete db[id]
db = {}
@findByCallbackURLSync = (callbackURL) ->
for id of db
if db[id].callbackURL is callbackURL
return db[id]
@initialize = (callback) ->
Hook.resync(callback)
# Gets all hooks from redis to populate the local database.
# Calls `callback()` when done.
@resync = (callback) ->
client = config.redis.client
tasks = []
client.smembers config.redis.keys.hooks, (error, hooks) =>
Logger.error "Hook: error getting list of hooks from redis", error if error?
hooks.forEach (id) =>
tasks.push (done) =>
client.hgetall config.redis.keys.hook(id), (error, hookData) ->
Logger.error "Hook: error getting information for a hook from redis", error if error?
if hookData?
hook = new Hook()
hook.fromRedis(hookData)
hook.save (error, hook) ->
nextID = hook.id + 1 if hook.id >= nextID
done(null, hook)
else
done(null, null)
async.series tasks, (errors, result) ->
hooks = _.map(Hook.allSync(), (hook) -> "[#{hook.id}] #{hook.callbackURL}")
Logger.info "Hook: finished resync, hooks registered:", hooks
callback?()

322
bbb-webhooks/hook.js Normal file
View File

@ -0,0 +1,322 @@
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const config = require("./config.js");
const CallbackEmitter = require("./callback_emitter.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
// The database of hooks.
// Used always from memory, but saved to redis for persistence.
//
// Format:
// { id: Hook }
// Format on redis:
// * a SET "...:hooks" with all ids
// * a HASH "...:hook:<id>" for each hook with some of its attributes
let db = {};
let nextID = 1;
// The representation of a hook and its properties. Stored in memory and persisted
// to redis.
// Hooks can be global, receiving callback calls for events from all meetings on the
// server, or for a specific meeting. If an `externalMeetingID` is set in the hook,
// it will only receive calls related to this meeting, otherwise it will be global.
// Events are kept in a queue to be sent in the order they are received.
// But if the requests are going by but taking too long, the queue might be increasing
// faster than the callbacks are made. In this case the events will be concatenated
// and send up to 10 events in every post
module.exports = class Hook {
constructor() {
this.id = null;
this.callbackURL = null;
this.externalMeetingID = null;
this.queue = [];
this.emitter = null;
this.redisClient = config.redis.client;
this.permanent = false;
this.getRaw = false;
}
save(callback) {
this.redisClient.hmset(config.redis.keys.hook(this.id), this.toRedis(), (error, reply) => {
if (error != null) { Logger.error("[Hook] error saving hook to redis:", error, reply); }
this.redisClient.sadd(config.redis.keys.hooks, this.id, (error, reply) => {
if (error != null) { Logger.error("[Hook] error saving hookID to the list of hooks:", error, reply); }
db[this.id] = this;
(typeof callback === 'function' ? callback(error, db[this.id]) : undefined);
});
});
}
destroy(callback) {
this.redisClient.srem(config.redis.keys.hooks, this.id, (error, reply) => {
if (error != null) { Logger.error("[Hook] error removing hookID from the list of hooks:", error, reply); }
this.redisClient.del(config.redis.keys.hook(this.id), error => {
if (error != null) { Logger.error("[Hook] error removing hook from redis:", error); }
if (db[this.id]) {
delete db[this.id];
(typeof callback === 'function' ? callback(error, true) : undefined);
} else {
(typeof callback === 'function' ? callback(error, false) : undefined);
}
});
});
}
// Is this a global hook?
isGlobal() {
return (this.externalMeetingID == null);
}
// The meeting from which this hook should receive events.
targetMeetingID() {
return this.externalMeetingID;
}
// Puts a new message in the queue. Will also trigger a processing in the queue so this
// message might be processed instantly.
enqueue(message) {
this.redisClient.llen(config.redis.keys.events(this.id), (error, reply) => {
const length = reply;
if (length < config.hooks.queueSize && this.queue.length < config.hooks.queueSize) {
Logger.info(`[Hook] ${this.callbackURL} enqueueing message:`, JSON.stringify(message));
// Add message to redis queue
this.redisClient.rpush(config.redis.keys.events(this.id), JSON.stringify(message), (error,reply) => {
if (error != null) { Logger.error("[Hook] error pushing event to redis queue:", JSON.stringify(message), error); }
});
this.queue.push(JSON.stringify(message));
this._processQueue();
} else {
Logger.warn(`[Hook] ${this.callbackURL} queue size exceed, event:`, JSON.stringify(message));
}
});
}
toRedis() {
const r = {
"hookID": this.id,
"callbackURL": this.callbackURL,
"permanent": this.permanent,
"getRaw": this.getRaw
};
if (this.externalMeetingID != null) { r.externalMeetingID = this.externalMeetingID; }
return r;
}
fromRedis(redisData) {
this.id = parseInt(redisData.hookID);
this.callbackURL = redisData.callbackURL;
this.permanent = redisData.permanent.toLowerCase() == 'true';
this.getRaw = redisData.getRaw.toLowerCase() == 'true';
if (redisData.externalMeetingID != null) {
this.externalMeetingID = redisData.externalMeetingID;
} else {
this.externalMeetingID = null;
}
}
// Gets the first message in the queue and start an emitter to send it. Will only do it
// if there is no emitter running already and if there is a message in the queue.
_processQueue() {
// Will try to send up to a defined number of messages together if they're enqueued (defined on config.hooks.multiEvent)
const lengthIn = this.queue.length > config.hooks.multiEvent ? config.hooks.multiEvent : this.queue.length;
let num = lengthIn + 1;
// Concat messages
let message = this.queue.slice(0,lengthIn);
message = message.join(",");
if ((message == null) || (this.emitter != null) || (lengthIn <= 0)) { return; }
// Add params so emitter will 'know' when a hook is permanent and have backupURLs
this.emitter = new CallbackEmitter(this.callbackURL, message, this.permanent);
this.emitter.start();
this.emitter.on("success", () => {
delete this.emitter;
while ((num -= 1)) {
// Remove the sent message from redis
this.redisClient.lpop(config.redis.keys.events(this.id), (error, reply) => {
if (error != null) { return Logger.error("[Hook] error removing event from redis queue:", error); }
});
this.queue.shift();
} // pop the first message just sent
this._processQueue(); // go to the next message
});
// gave up trying to perform the callback, remove the hook forever if the hook's not permanent (emmiter will validate that)
return this.emitter.on("stopped", error => {
Logger.warn("[Hook] too many failed attempts to perform a callback call, removing the hook for:", this.callbackURL);
this.destroy();
});
}
static addSubscription(callbackURL, meetingID, getRaw, callback) {
let hook = Hook.findByCallbackURLSync(callbackURL);
if (hook != null) {
return (typeof callback === 'function' ? callback(new Error("There is already a subscription for this callback URL"), hook) : undefined);
} else {
let msg = `[Hook] adding a hook with callback URL: [${callbackURL}],`;
if (meetingID != null) { msg += ` for the meeting: [${meetingID}]`; }
Logger.info(msg);
hook = new Hook();
hook.callbackURL = callbackURL;
hook.externalMeetingID = meetingID;
hook.getRaw = getRaw;
hook.permanent = config.hooks.permanentURLs.some( url => {
return url === callbackURL
});
if (hook.permanent) {
hook.id = config.hooks.permanentURLs.indexOf(callbackURL) + 1;
nextID = config.hooks.permanentURLs.length + 1;
} else {
hook.id = nextID++;
}
// Sync permanent queue
if (hook.permanent) {
hook.redisClient.llen(config.redis.keys.events(hook.id), (error, len) => {
if (len > 0) {
const length = len;
hook.redisClient.lrange(config.redis.keys.events(hook.id), 0, len, (error, elements) => {
elements.forEach(element => {
hook.queue.push(element);
});
if (hook.queue.length > 0) { return hook._processQueue(); }
});
}
});
}
hook.save((error, hook) => { typeof callback === 'function' ? callback(error, hook) : undefined });
}
}
static removeSubscription(hookID, callback) {
let hook = Hook.getSync(hookID);
if (hook != null && !hook.permanent) {
let msg = `[Hook] removing the hook with callback URL: [${hook.callbackURL}],`;
if (hook.externalMeetingID != null) { msg += ` for the meeting: [${hook.externalMeetingID}]`; }
Logger.info(msg);
hook.destroy((error, removed) => { typeof callback === 'function' ? callback(error, removed) : undefined });
} else {
return (typeof callback === 'function' ? callback(null, false) : undefined);
}
}
static countSync() {
return Object.keys(db).length;
}
static getSync(id) {
return db[id];
}
static firstSync() {
const keys = Object.keys(db);
if (keys.length > 0) {
return db[keys[0]];
} else {
return null;
}
}
static findByExternalMeetingIDSync(externalMeetingID) {
const hooks = Hook.allSync();
return _.filter(hooks, hook => (externalMeetingID != null) && (externalMeetingID === hook.externalMeetingID));
}
static allGlobalSync() {
const hooks = Hook.allSync();
return _.filter(hooks, hook => hook.isGlobal());
}
static allSync() {
let arr = Object.keys(db).reduce(function(arr, id) {
arr.push(db[id]);
return arr;
}
, []);
return arr;
}
static clearSync() {
for (let id in db) {
delete db[id];
}
return db = {};
}
static findByCallbackURLSync(callbackURL) {
for (let id in db) {
if (db[id].callbackURL === callbackURL) {
return db[id];
}
}
}
static initialize(callback) {
Hook.resync(callback);
}
// Gets all hooks from redis to populate the local database.
// Calls `callback()` when done.
static resync(callback) {
let client = config.redis.client;
// Remove previous permanent hooks
for (let hk = 1; hk <= config.hooks.permanentURLs.length; hk++) {
client.srem(config.redis.keys.hooks, hk, (error, reply) => {
if (error != null) { Logger.error("[Hook] error removing previous permanent hook from list:", error); }
client.del(config.redis.keys.hook(hk), error => {
if (error != null) { Logger.error("[Hook] error removing previous permanent hook from redis:", error); }
});
});
}
let tasks = [];
client.smembers(config.redis.keys.hooks, (error, hooks) => {
if (error != null) { Logger.error("[Hook] error getting list of hooks from redis:", error); }
hooks.forEach(id => {
tasks.push(done => {
client.hgetall(config.redis.keys.hook(id), function(error, hookData) {
if (error != null) { Logger.error("[Hook] error getting information for a hook from redis:", error); }
if (hookData != null) {
let length;
let hook = new Hook();
hook.fromRedis(hookData);
// sync events queue
client.llen(config.redis.keys.events(hook.id), (error, len) => {
length = len;
client.lrange(config.redis.keys.events(hook.id), 0, len, (error, elements) => {
elements.forEach(element => {
hook.queue.push(element);
});
});
});
// Persist hook to redis
hook.save( (error, hook) => {
if (hook.id >= nextID) { nextID = hook.id + 1; }
if (hook.queue.length > 0) { hook._processQueue(); }
done(null, hook);
});
} else {
done(null, null);
}
});
});
});
async.series(tasks, function(errors, result) {
hooks = _.map(Hook.allSync(), hook => `[${hook.id}] ${hook.callbackURL}`);
Logger.info("[Hook] finished resync, hooks registered:", hooks);
(typeof callback === 'function' ? callback() : undefined);
});
});
}
};

View File

@ -1,163 +0,0 @@
_ = require("lodash")
async = require("async")
redis = require("redis")
config = require("./config")
Logger = require("./logger")
# The database of mappings. Uses the externalID as key because it changes less than
# the internal ID (e.g. the internalID can change for different meetings in the same
# room). Used always from memory, but saved to redis for persistence.
#
# Format:
# {
# externalMeetingID: {
# id: @id
# externalMeetingID: @exnternalMeetingID
# internalMeetingID: @internalMeetingID
# lastActivity: @lastActivity
# }
# }
# Format on redis:
# * a SET "...:mappings" with all ids (not meeting ids, the object id)
# * a HASH "...:mapping:<id>" for each mapping with all its attributes
db = {}
nextID = 1
# A simple model to store mappings for meeting IDs.
module.exports = class IDMapping
constructor: ->
@id = null
@externalMeetingID = null
@internalMeetingID = null
@lastActivity = null
@redisClient = config.redis.client
save: (callback) ->
@redisClient.hmset config.redis.keys.mapping(@id), @toRedis(), (error, reply) =>
Logger.error "Hook: error saving mapping to redis!", error, reply if error?
@redisClient.sadd config.redis.keys.mappings, @id, (error, reply) =>
Logger.error "Hook: error saving mapping ID to the list of mappings!", error, reply if error?
db[@externalMeetingID] = this
callback?(error, db[@externalMeetingID])
destroy: (callback) ->
@redisClient.srem config.redis.keys.mappings, @id, (error, reply) =>
Logger.error "Hook: error removing mapping ID from the list of mappings!", error, reply if error?
@redisClient.del config.redis.keys.mapping(@id), (error) =>
Logger.error "Hook: error removing mapping from redis!", error if error?
if db[@externalMeetingID]
delete db[@externalMeetingID]
callback?(error, true)
else
callback?(error, false)
toRedis: ->
r =
"id": @id,
"internalMeetingID": @internalMeetingID
"externalMeetingID": @externalMeetingID
"lastActivity": @lastActivity
r
fromRedis: (redisData) ->
@id = parseInt(redisData.id)
@externalMeetingID = redisData.externalMeetingID
@internalMeetingID = redisData.internalMeetingID
@lastActivity = redisData.lastActivity
print: ->
JSON.stringify(@toRedis())
@addOrUpdateMapping = (internalMeetingID, externalMeetingID, callback) ->
mapping = new IDMapping()
mapping.id = nextID++
mapping.internalMeetingID = internalMeetingID
mapping.externalMeetingID = externalMeetingID
mapping.lastActivity = new Date().getTime()
mapping.save (error, result) ->
Logger.info "IDMapping: added or changed meeting mapping to the list #{externalMeetingID}:", mapping.print()
callback?(error, result)
@removeMapping = (internalMeetingID, callback) ->
for external, mapping of db
if mapping.internalMeetingID is internalMeetingID
mapping.destroy (error, result) ->
Logger.info "IDMapping: removing meeting mapping from the list #{external}:", mapping.print()
callback?(error, result)
@getInternalMeetingID = (externalMeetingID) ->
db[externalMeetingID].internalMeetingID
@getExternalMeetingID = (internalMeetingID) ->
mapping = IDMapping.findByInternalMeetingID(internalMeetingID)
mapping?.externalMeetingID
@findByInternalMeetingID = (internalMeetingID) ->
if internalMeetingID?
for external, mapping of db
if mapping.internalMeetingID is internalMeetingID
return mapping
null
@allSync = ->
arr = Object.keys(db).reduce((arr, id) ->
arr.push db[id]
arr
, [])
arr
# Sets the last activity of the mapping for `internalMeetingID` to now.
@reportActivity = (internalMeetingID) ->
mapping = IDMapping.findByInternalMeetingID(internalMeetingID)
if mapping?
mapping.lastActivity = new Date().getTime()
mapping.save()
# Checks all current mappings for their last activity and removes the ones that
# are "expired", that had their last activity too long ago.
@cleanup = ->
now = new Date().getTime()
all = IDMapping.allSync()
toRemove = _.filter(all, (mapping) ->
mapping.lastActivity < now - config.mappings.timeout
)
unless _.isEmpty(toRemove)
Logger.info "IDMapping: expiring the mappings:", _.map(toRemove, (map) -> map.print())
toRemove.forEach (mapping) -> mapping.destroy()
# Initializes global methods for this model.
@initialize = (callback) ->
IDMapping.resync(callback)
IDMapping.cleanupInterval = setInterval(IDMapping.cleanup, config.mappings.cleanupInterval)
# Gets all mappings from redis to populate the local database.
# Calls `callback()` when done.
@resync = (callback) ->
client = config.redis.client
tasks = []
client.smembers config.redis.keys.mappings, (error, mappings) =>
Logger.error "Hook: error getting list of mappings from redis", error if error?
mappings.forEach (id) =>
tasks.push (done) =>
client.hgetall config.redis.keys.mapping(id), (error, mappingData) ->
Logger.error "Hook: error getting information for a mapping from redis", error if error?
if mappingData?
mapping = new IDMapping()
mapping.fromRedis(mappingData)
mapping.save (error, hook) ->
nextID = mapping.id + 1 if mapping.id >= nextID
done(null, mapping)
else
done(null, null)
async.series tasks, (errors, result) ->
mappings = _.map(IDMapping.allSync(), (m) -> m.print())
Logger.info "IDMapping: finished resync, mappings registered:", mappings
callback?()

215
bbb-webhooks/id_mapping.js Normal file
View File

@ -0,0 +1,215 @@
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const config = require("./config.js");
const Logger = require("./logger.js");
const UserMapping = require("./userMapping.js");
// The database of mappings. Uses the internal ID as key because it is unique
// unlike the external ID.
// Used always from memory, but saved to redis for persistence.
//
// Format:
// {
// internalMeetingID: {
// id: @id
// externalMeetingID: @externalMeetingID
// internalMeetingID: @internalMeetingID
// lastActivity: @lastActivity
// }
// }
// Format on redis:
// * a SET "...:mappings" with all ids (not meeting ids, the object id)
// * a HASH "...:mapping:<id>" for each mapping with all its attributes
const db = {};
let nextID = 1;
// A simple model to store mappings for meeting IDs.
module.exports = class IDMapping {
constructor() {
this.id = null;
this.externalMeetingID = null;
this.internalMeetingID = null;
this.lastActivity = null;
this.redisClient = config.redis.client;
}
save(callback) {
this.redisClient.hmset(config.redis.keys.mapping(this.id), this.toRedis(), (error, reply) => {
if (error != null) { Logger.error("[IDMapping] error saving mapping to redis:", error, reply); }
this.redisClient.sadd(config.redis.keys.mappings, this.id, (error, reply) => {
if (error != null) { Logger.error("[IDMapping] error saving mapping ID to the list of mappings:", error, reply); }
db[this.internalMeetingID] = this;
(typeof callback === 'function' ? callback(error, db[this.internalMeetingID]) : undefined);
});
});
}
destroy(callback) {
this.redisClient.srem(config.redis.keys.mappings, this.id, (error, reply) => {
if (error != null) { Logger.error("[IDMapping] error removing mapping ID from the list of mappings:", error, reply); }
this.redisClient.del(config.redis.keys.mapping(this.id), error => {
if (error != null) { Logger.error("[IDMapping] error removing mapping from redis:", error); }
if (db[this.internalMeetingID]) {
delete db[this.internalMeetingID];
(typeof callback === 'function' ? callback(error, true) : undefined);
} else {
(typeof callback === 'function' ? callback(error, false) : undefined);
}
});
});
}
toRedis() {
const r = {
"id": this.id,
"internalMeetingID": this.internalMeetingID,
"externalMeetingID": this.externalMeetingID,
"lastActivity": this.lastActivity
};
return r;
}
fromRedis(redisData) {
this.id = parseInt(redisData.id);
this.externalMeetingID = redisData.externalMeetingID;
this.internalMeetingID = redisData.internalMeetingID;
this.lastActivity = redisData.lastActivity;
}
print() {
return JSON.stringify(this.toRedis());
}
static addOrUpdateMapping(internalMeetingID, externalMeetingID, callback) {
let mapping = new IDMapping();
mapping.id = nextID++;
mapping.internalMeetingID = internalMeetingID;
mapping.externalMeetingID = externalMeetingID;
mapping.lastActivity = new Date().getTime();
mapping.save(function(error, result) {
Logger.info(`[IDMapping] added or changed meeting mapping to the list ${externalMeetingID}:`, mapping.print());
(typeof callback === 'function' ? callback(error, result) : undefined);
});
}
static removeMapping(internalMeetingID, callback) {
return (() => {
let result = [];
for (let internal in db) {
var mapping = db[internal];
if (mapping.internalMeetingID === internalMeetingID) {
result.push(mapping.destroy( (error, result) => {
Logger.info(`[IDMapping] removing meeting mapping from the list ${external}:`, mapping.print());
return (typeof callback === 'function' ? callback(error, result) : undefined);
}));
} else {
result.push(undefined);
}
}
return result;
})();
}
static getInternalMeetingID(externalMeetingID) {
const mapping = IDMapping.findByExternalMeetingID(externalMeetingID);
return (mapping != null ? mapping.internalMeetingID : undefined);
}
static getExternalMeetingID(internalMeetingID) {
if (db[internalMeetingID]){
return db[internalMeetingID].externalMeetingID;
}
}
static findByExternalMeetingID(externalMeetingID) {
if (externalMeetingID != null) {
for (let internal in db) {
const mapping = db[internal];
if (mapping.externalMeetingID === externalMeetingID) {
return mapping;
}
}
}
return null;
}
static allSync() {
let arr = Object.keys(db).reduce(function(arr, id) {
arr.push(db[id]);
return arr;
}
, []);
return arr;
}
// Sets the last activity of the mapping for `internalMeetingID` to now.
static reportActivity(internalMeetingID) {
let mapping = db[internalMeetingID];
if (mapping != null) {
mapping.lastActivity = new Date().getTime();
return mapping.save();
}
}
// Checks all current mappings for their last activity and removes the ones that
// are "expired", that had their last activity too long ago.
static cleanup() {
const now = new Date().getTime();
const all = IDMapping.allSync();
const toRemove = _.filter(all, mapping => mapping.lastActivity < (now - config.mappings.timeout));
if (!_.isEmpty(toRemove)) {
Logger.info("[IDMapping] expiring the mappings:", _.map(toRemove, map => map.print()));
toRemove.forEach(mapping => {
UserMapping.removeMappingMeetingId(mapping.internalMeetingID);
mapping.destroy()
});
}
}
// Initializes global methods for this model.
static initialize(callback) {
IDMapping.resync(callback);
IDMapping.cleanupInterval = setInterval(IDMapping.cleanup, config.mappings.cleanupInterval);
}
// Gets all mappings from redis to populate the local database.
// Calls `callback()` when done.
static resync(callback) {
let client = config.redis.client;
let tasks = [];
return client.smembers(config.redis.keys.mappings, (error, mappings) => {
if (error != null) { Logger.error("[IDMapping] error getting list of mappings from redis:", error); }
mappings.forEach(id => {
tasks.push(done => {
client.hgetall(config.redis.keys.mapping(id), function(error, mappingData) {
if (error != null) { Logger.error("[IDMapping] error getting information for a mapping from redis:", error); }
if (mappingData != null) {
let mapping = new IDMapping();
mapping.fromRedis(mappingData);
mapping.save(function(error, hook) {
if (mapping.id >= nextID) { nextID = mapping.id + 1; }
done(null, mapping);
});
} else {
done(null, null);
}
});
});
});
return async.series(tasks, function(errors, result) {
mappings = _.map(IDMapping.allSync(), m => m.print());
Logger.info("[IDMapping] finished resync, mappings registered:", mappings);
return (typeof callback === 'function' ? callback() : undefined);
});
});
}
};

View File

@ -1,10 +1,10 @@
winston = require("winston")
const winston = require("winston");
logger = new (winston.Logger)(
const logger = new (winston.Logger)({
transports: [
new (winston.transports.Console)({ timestamp: true, colorize: true }),
new (winston.transports.File)({ filename: "log/application.log", timestamp: true })
]
)
});
module.exports = logger
module.exports = logger;

View File

@ -0,0 +1,216 @@
const config = require("./config.js");
const Logger = require("./logger.js");
const IDMapping = require("./id_mapping.js");
const UserMapping = require("./userMapping.js");
module.exports = class MessageMapping {
constructor() {
this.mappedObject = {};
this.mappedMessage = {};
this.meetingEvents = ["MeetingCreatedEvtMsg","MeetingDestroyedEvtMsg"];
this.userEvents = ["UserJoinedMeetingEvtMsg","UserLeftMeetingEvtMsg","UserJoinedVoiceConfToClientEvtMsg","UserLeftVoiceConfToClientEvtMsg","PresenterAssignedEvtMsg", "PresenterUnassignedEvtMsg"];
this.chatEvents = ["SendPublicMessageEvtMsg","SendPrivateMessageEvtMsg"];
this.rapEvents = ["archive_started","archive_ended","sanity_started","sanity_ended","post_archive_started","post_archive_ended","process_started","process_ended","post_process_started","post_process_ended","publish_started","publish_ended","post_publish_started","post_publish_ended"];
}
// Map internal message based on it's type
mapMessage(messageObj) {
if (this.mappedEvent(messageObj,this.meetingEvents)) {
this.meetingTemplate(messageObj);
} else if (this.mappedEvent(messageObj,this.userEvents)) {
this.userTemplate(messageObj);
} else if (this.mappedEvent(messageObj,this.chatEvents)) {
this.chatTemplate(messageObj);
} else if (this.mappedEvent(messageObj,this.rapEvents)) {
this.rapTemplate(messageObj);
}
}
mappedEvent(messageObj,events) {
return events.some( event => {
if ((messageObj.header != null ? messageObj.header.name : undefined) === event) {
return true;
}
if ((messageObj.envelope != null ? messageObj.envelope.name : undefined) === event) {
return true;
}
return false;
});
}
// Map internal to external message for meeting information
meetingTemplate(messageObj) {
const props = messageObj.core.body.props;
this.mappedObject.data = {
"type": "event",
"id": this.mapInternalMessage(messageObj),
"attributes":{
"meeting":{
"internal-meeting-id": messageObj.core.body.meetingId,
"external-meeting-id": IDMapping.getExternalMeetingID(messageObj.core.body.meetingId)
}
},
"event":{
"ts": Date.now()
}
};
if (messageObj.envelope.name === "MeetingCreatedEvtMsg") {
this.mappedObject.data.attributes = {
"meeting":{
"internal-meeting-id": props.meetingProp.intId,
"external-meeting-id": props.meetingProp.extId,
"name": props.meetingProp.name,
"is-breakout": props.meetingProp.isBreakout,
"duration": props.durationProps.duration,
"create-time": props.durationProps.createdTime,
"create-date": props.durationProps.createdDate,
"moderator-pass": props.password.moderatorPass,
"viewer-pass": props.password.viewerPass,
"record": props.recordProp.record,
"voice-conf": props.voiceProp.voiceConf,
"dial-number": props.voiceProp.dialNumber,
"max-users": props.usersProp.maxUsers,
"metadata": props.metadataProp.metadata
}
};
}
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}
// Map internal to external message for user information
userTemplate(messageObj) {
const msgBody = messageObj.core.body;
const msgHeader = messageObj.core.header;
const extId = UserMapping.getExternalUserID(msgHeader.userId) ? UserMapping.getExternalUserID(msgHeader.userId) : msgBody.extId;
this.mappedObject.data = {
"type": "event",
"id": this.mapInternalMessage(messageObj),
"attributes":{
"meeting":{
"internal-meeting-id": messageObj.envelope.routing.meetingId,
"external-meeting-id": IDMapping.getExternalMeetingID(messageObj.envelope.routing.meetingId)
},
"user":{
"internal-user-id": msgHeader.userId,
"external-user-id": extId,
"sharing-mic": msgBody.muted,
"name": msgBody.name,
"role": msgBody.role,
"presenter": msgBody.presenter,
"stream": msgBody.stream,
"listening-only": msgBody.listenOnly
}
},
"event":{
"ts": Date.now()
}
};
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}
// Map internal to external message for chat information
chatTemplate(messageObj) {
const message = messageObj.core.body.message;
this.mappedObject.data = {
"type": "event",
"id": this.mapInternalMessage(messageObj),
"attributes":{
"meeting":{
"internal-meeting-id": messageObj.envelope.routing.meetingId,
"external-meeting-id": IDMapping.getExternalMeetingID(messageObj.envelope.routing.meetingId)
},
"chat-message":{
"message": message.message,
"sender":{
"internal-user-id": message.fromUserId,
"external-user-id": message.fromUsername,
"timezone-offset": message.fromTimezoneOffset,
"time": message.fromTime
}
}
},
"event":{
"ts": Date.now()
}
};
if (messageObj.envelope.name.indexOf("Private") !== -1) {
this.mappedObject.data.attributes["chat-message"].receiver = {
"internal-user-id": message.toUserId,
"external-user-id": message.toUsername
};
}
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}
rapTemplate(messageObj) {
data = messageObj.payload
this.mappedObject.data = {
"type": "event",
"id": this.mapInternalMessage(messageObj.header.name),
"attributes": {
"meeting": {
"internal-meeting-id": data.meeting_id,
"external-meeting-id": IDMapping.getExternalMeetingID(data.meeting_id)
},
"recording": {
"name": data.metadata.meetingName,
"isBreakout": data.metadata.isBreakout,
"startTime": data.startTime,
"endTime": data.endTime,
"size": data.playback.size,
"rawSize": data.rawSize,
"metadata": data.metadata,
"playback": data.playback,
"download": data.download
}
},
"event": {
"ts": messageObj.header.current_time
}
};
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}
mapInternalMessage(message) {
if (message.envelope) {
message = message.envelope.name
}
else if (message.header) {
message = message.header.name
}
const mappedMsg = (() => { switch (message) {
case "MeetingCreatedEvtMsg": return "meeting-created";
case "MeetingDestroyedEvtMsg": return "meeting-ended";
case "UserJoinedMeetingEvtMsg": return "user-joined";
case "UserLeftMeetingEvtMsg": return "user-left";
case "UserJoinedVoiceConfToClientEvtMsg": return "user-audio-voice-enabled";
case "UserLeftVoiceConfToClientEvtMsg": return "user-audio-voice-disabled";
case "UserBroadcastCamStartedEvtMsg": return "user-cam-broadcast-start";
case "UserBroadcastCamStoppedEvtMsg": return "user-cam-broadcast-end";
case "PresenterAssignedEvtMsg": return "user-presenter-assigned";
case "PresenterUnassignedEvtMsg": return "user-presenter-unassigned"
case "SendPublicMessageEvtMsg": return "chat-public-message-sent";
case "SendPrivateMessageEvtMsg": return "chat-private-message-sent";
case "archive_started": return "rap-archive-started";
case "archive_ended": return "rap-archive-ended";
case "sanity_started": return "rap-sanity-started";
case "sanity_ended": return "rap-sanity-ended";
case "post_archive_started": return "rap-post-archive-started";
case "post_archive_ended": return "rap-post-archive-ended";
case "process_started": return "rap-process-started";
case "process_ended": return "rap-process-ended";
case "post_process_started": return "rap-post-process-started";
case "post_process_ended": return "rap-post-process-ended";
case "publish_started": return "rap-publish-started";
case "publish_ended": return "rap-publish-ended";
case "post_publish_started": return "rap-post-publish-started";
case "post_publish_ended": return "rap-post-publish-ended";
} })();
return mappedMsg;
}
};

1256
bbb-webhooks/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -13,12 +13,23 @@
"coffee-script": "1.8.0",
"express": "4.10.2",
"lodash": "2.4.1",
"nock": "^9.0.14",
"redis": "0.12.1",
"request": "2.47.0",
"sha1": "1.1.0",
"winston": "0.8.3"
"sinon": "^3.2.1",
"winston": "0.8.3",
"xmldom": "^0.1.27",
"xpath": "0.0.24"
},
"engines": {
"node": "0.10.26"
"node": "8.4.0"
},
"devDependencies": {
"mocha": "^3.5.0",
"supertest": "^3.0.0"
},
"scripts": {
"test": "mocha"
}
}

View File

@ -0,0 +1,46 @@
const helpers = {};
helpers.url = 'http://10.0.3.179'; //serverUrl
helpers.port = ':3005'
helpers.callback = 'http://we2bh.requestcatcher.com'
helpers.callbackURL = '?callbackURL=' + helpers.callback
helpers.apiPath = '/bigbluebutton/api/hooks/'
helpers.createUrl = helpers.port + helpers.apiPath + 'create/' + helpers.callbackURL
helpers.destroyUrl = (id) => { return helpers.port + helpers.apiPath + 'destroy/' + '?hookID=' + id }
helpers.destroyPermanent = helpers.port + helpers.apiPath + 'destroy/' + '?hookID=1'
helpers.createRaw = '&getRaw=true'
helpers.listUrl = 'list/'
helpers.rawMessage = {
envelope: {
name: 'PresenterAssignedEvtMsg',
routing: {
msgType: 'BROADCAST_TO_MEETING',
meetingId: 'a674bb9c6ff92bfa6d5a0a1e530fabb56023932e-1509387833678',
userId: 'w_ysgy0erqgayc'
}
},
core: {
header: {
name: 'PresenterAssignedEvtMsg',
meetingId: 'a674bb9c6ff92bfa6d5a0a1e530fabb56023932e-1509387833678',
userId: 'w_ysgy0erqgayc'
},
body: {
presenterId: 'w_ysgy0erqgayc',
presenterName: 'User 4125097',
assignedBy: 'w_vlnwu1wkhena'
}
}
};
helpers.flushall = (rClient) => {
let client = rClient;
client.flushdb()
}
helpers.flushredis = (hook) => {
hook.redisClient.flushdb();
}
module.exports = helpers;

View File

@ -0,0 +1 @@
--timeout 5000

301
bbb-webhooks/test/test.js Normal file
View File

@ -0,0 +1,301 @@
const request = require('supertest');
const nock = require("nock");
const Application = require('../application.js');
const Logger = require('../logger.js');
const utils = require('../utils.js');
const config = require('../config.js');
const Hook = require('../hook.js');
const Helpers = require('./helpers.js')
const sinon = require('sinon');
const winston = require('winston');
// Block winston from logging
Logger.remove(winston.transports.Console);
describe('bbb-webhooks tests', () => {
before( (done) => {
config.hooks.queueSize = 10;
config.hooks.permanentURLs = ["http://wh.requestcatcher.com"];
application = new Application();
application.start( () => {
done();
});
});
beforeEach( (done) => {
hooks = Hook.allGlobalSync();
Helpers.flushall(config.redis.client);
hooks.forEach( hook => {
Helpers.flushredis(hook);
})
done();
})
after( () => {
hooks = Hook.allGlobalSync();
Helpers.flushall(config.redis.client);
hooks.forEach( hook => {
Helpers.flushredis(hook);
})
});
describe('GET /hooks/list permanent', () => {
it('should list permanent hook', (done) => {
let getUrl = utils.checksumAPI(Helpers.url + Helpers.listUrl, config.bbb.sharedSecret);
getUrl = Helpers.listUrl + '?checksum=' + getUrl
request(Helpers.url)
.get(getUrl)
.expect('Content-Type', /text\/xml/)
.expect(200, (res) => {
const hooks = Hook.allGlobalSync();
if (hooks && hooks.some( hook => { return hook.permanent }) ) {
done();
}
else {
done(new Error ("permanent hook was not created"));
}
})
})
});
describe('GET /hooks/create', () => {
after( (done) => {
const hooks = Hook.allGlobalSync();
Hook.removeSubscription(hooks[hooks.length-1].id, () => { done(); });
});
it('should create a hook', (done) => {
let getUrl = utils.checksumAPI(Helpers.url + Helpers.createUrl, config.bbb.sharedSecret);
getUrl = Helpers.createUrl + '&checksum=' + getUrl
request(Helpers.url)
.get(getUrl)
.expect('Content-Type', /text\/xml/)
.expect(200, (res) => {
const hooks = Hook.allGlobalSync();
if (hooks && hooks.some( hook => { return !hook.permanent }) ) {
done();
}
else {
done(new Error ("hook was not created"));
}
})
})
});
describe('GET /hooks/destroy', () => {
before( (done) => {
Hook.addSubscription(Helpers.callback,null,false,() => { done(); });
});
it('should destroy a hook', (done) => {
const hooks = Hook.allGlobalSync();
const hook = hooks[hooks.length-1].id;
let getUrl = utils.checksumAPI(Helpers.url + Helpers.destroyUrl(hook), config.bbb.sharedSecret);
getUrl = Helpers.destroyUrl(hook) + '&checksum=' + getUrl
request(Helpers.url)
.get(getUrl)
.expect('Content-Type', /text\/xml/)
.expect(200, (res) => {
const hooks = Hook.allGlobalSync();
if(hooks && hooks.every( hook => { return hook.callbackURL != Helpers.callback }))
done();
})
})
});
describe('GET /hooks/destroy permanent hook', () => {
it('should not destroy the permanent hook', (done) => {
let getUrl = utils.checksumAPI(Helpers.url + Helpers.destroyPermanent, config.bbb.sharedSecret);
getUrl = Helpers.destroyPermanent + '&checksum=' + getUrl
request(Helpers.url)
.get(getUrl)
.expect('Content-Type', /text\/xml/)
.expect(200, (res) => {
const hooks = Hook.allGlobalSync();
if (hooks && hooks[0].callbackURL == config.hooks.permanentURLs[0]) {
done();
}
else {
done(new Error("should not delete permanent"));
}
})
})
});
describe('GET /hooks/create getRaw hook', () => {
after( (done) => {
const hooks = Hook.allGlobalSync();
Hook.removeSubscription(hooks[hooks.length-1].id, () => { done(); });
});
it('should create a hook with getRaw=true', (done) => {
let getUrl = utils.checksumAPI(Helpers.url + Helpers.createUrl + Helpers.createRaw, config.bbb.sharedSecret);
getUrl = Helpers.createUrl + '&checksum=' + getUrl + Helpers.createRaw
request(Helpers.url)
.get(getUrl)
.expect('Content-Type', /text\/xml/)
.expect(200, (res) => {
const hooks = Hook.allGlobalSync();
if (hooks && hooks.some( (hook) => { return hook.getRaw })) {
done();
}
else {
done(new Error("getRaw hook was not created"))
}
})
})
});
describe('Hook queues', () => {
before( () => {
config.redis.pubSubClient.psubscribe("test-channel");
Hook.addSubscription(Helpers.callback,null,false, (err,reply) => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
const hook2 = hooks[hooks.length -1];
sinon.stub(hook, '_processQueue');
sinon.stub(hook2, '_processQueue');
});
});
after( () => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
const hook2 = hooks[hooks.length -1];
hook._processQueue.restore();
hook2._processQueue.restore();
Hook.removeSubscription(hooks[hooks.length-1].id);
config.redis.pubSubClient.unsubscribe("test-channel");
});
it('should have different queues for each hook', (done) => {
config.redis.client.publish("test-channel", JSON.stringify(Helpers.rawMessage));
const hooks = Hook.allGlobalSync();
if (hooks && hooks[0].queue != hooks[hooks.length-1].queue) {
done();
}
else {
done(new Error("hooks using same queue"))
}
})
});
// reduce queue size, fill queue with requests, try to add another one, if queue does not exceed, OK
describe('Hook queues', () => {
before( () => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
sinon.stub(hook, '_processQueue');
});
after( () => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
hook._processQueue.restore();
Helpers.flushredis(hook);
})
it('should limit queue size to defined in config', (done) => {
let hook = Hook.allGlobalSync();
hook = hook[0];
for(i=0;i<=9;i++) { hook.enqueue("message" + i); }
if (hook && hook.queue.length <= config.hooks.queueSize) {
done();
}
else {
done(new Error("hooks exceeded max queue size"))
}
})
});
describe('/POST mapped message', () => {
before( () => {
config.redis.pubSubClient.psubscribe("test-channel");
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
hook.queue = [];
Helpers.flushredis(hook);
});
after( () => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
Helpers.flushredis(hook);
config.redis.pubSubClient.unsubscribe("test-channel");
})
it('should post mapped message ', (done) => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
const getpost = nock(config.hooks.permanentURLs[0])
.filteringRequestBody( (body) => {
let parsed = JSON.parse(body)
return parsed[0].data.id ? "mapped" : "not mapped";
})
.post("/", "mapped")
.reply(200, (res) => {
done();
});
config.redis.client.publish("test-channel", JSON.stringify(Helpers.rawMessage));
})
});
describe('/POST raw message', () => {
before( () => {
config.redis.pubSubClient.psubscribe("test-channel");
Hook.addSubscription(Helpers.callback,null,true, (err,hook) => {
Helpers.flushredis(hook);
})
});
after( () => {
const hooks = Hook.allGlobalSync();
Hook.removeSubscription(hooks[hooks.length-1].id);
Helpers.flushredis(hooks[hooks.length-1]);
config.redis.pubSubClient.unsubscribe("test-channel");
});
it('should post raw message ', (done) => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
const getpost = nock(Helpers.callback)
.filteringRequestBody( (body) => {
if (body.indexOf("PresenterAssignedEvtMsg")) {
return "raw message";
}
else { return "not raw"; }
})
.post("/", "raw message")
.reply(200, () => {
done();
});
const permanent = nock(config.hooks.permanentURLs[0])
.post("/")
.reply(200)
config.redis.client.publish("test-channel", JSON.stringify(Helpers.rawMessage));
})
});
describe('/POST multi message', () => {
before( () =>{
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
Helpers.flushredis(hook);
hook.queue = ["multiMessage1"];
});
it('should post multi message ', (done) => {
const hooks = Hook.allGlobalSync();
const hook = hooks[0];
hook.enqueue("multiMessage2")
const getpost = nock(config.hooks.permanentURLs[0])
.filteringPath( (path) => {
return path.split('?')[0];
})
.filteringRequestBody( (body) => {
if (body.indexOf("multiMessage1") != -1 && body.indexOf("multiMessage2") != -1) {
return "multiMess"
}
else {
return "not multi"
}
})
.post("/", "multiMess")
.reply(200, (res) => {
done();
});
})
});
});

189
bbb-webhooks/userMapping.js Normal file
View File

@ -0,0 +1,189 @@
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const config = require("./config.js");
const Logger = require("./logger.js");
// The database of mappings. Uses the internal ID as key because it is unique
// unlike the external ID.
// Used always from memory, but saved to redis for persistence.
//
// Format:
// {
// internalMeetingID: {
// id: @id
// externalMeetingID: @externalMeetingID
// internalMeetingID: @internalMeetingID
// lastActivity: @lastActivity
// }
// }
// Format on redis:
// * a SET "...:mappings" with all ids (not meeting ids, the object id)
// * a HASH "...:mapping:<id>" for each mapping with all its attributes
const db = {};
let nextID = 1;
// A simple model to store mappings for user extIDs.
module.exports = class UserMapping {
constructor() {
this.id = null;
this.externalUserID = null;
this.internalUserID = null;
this.meetingId = null;
this.redisClient = config.redis.client;
}
save(callback) {
this.redisClient.hmset(config.redis.keys.userMap(this.id), this.toRedis(), (error, reply) => {
if (error != null) { Logger.error("[UserMapping] error saving mapping to redis:", error, reply); }
this.redisClient.sadd(config.redis.keys.userMaps, this.id, (error, reply) => {
if (error != null) { Logger.error("[UserMapping] error saving mapping ID to the list of mappings:", error, reply); }
db[this.internalUserID] = this;
(typeof callback === 'function' ? callback(error, db[this.internalUserID]) : undefined);
});
});
}
destroy(callback) {
this.redisClient.srem(config.redis.keys.userMaps, this.id, (error, reply) => {
if (error != null) { Logger.error("[UserMapping] error removing mapping ID from the list of mappings:", error, reply); }
this.redisClient.del(config.redis.keys.userMap(this.id), error => {
if (error != null) { Logger.error("[UserMapping] error removing mapping from redis:", error); }
if (db[this.internalUserID]) {
delete db[this.internalUserID];
(typeof callback === 'function' ? callback(error, true) : undefined);
} else {
(typeof callback === 'function' ? callback(error, false) : undefined);
}
});
});
}
toRedis() {
const r = {
"id": this.id,
"internalUserID": this.internalUserID,
"externalUserID": this.externalUserID,
"meetingId": this.meetingId
};
return r;
}
fromRedis(redisData) {
this.id = parseInt(redisData.id);
this.externalUserID = redisData.externalUserID;
this.internalUserID = redisData.internalUserID;
this.meetingId = redisData.meetingId;
}
print() {
return JSON.stringify(this.toRedis());
}
static addMapping(internalUserID, externalUserID, meetingId, callback) {
let mapping = new UserMapping();
mapping.id = nextID++;
mapping.internalUserID = internalUserID;
mapping.externalUserID = externalUserID;
mapping.meetingId = meetingId;
mapping.save(function(error, result) {
Logger.info(`[UserMapping] added user mapping to the list ${internalUserID}:`, mapping.print());
(typeof callback === 'function' ? callback(error, result) : undefined);
});
}
static removeMapping(internalUserID, callback) {
return (() => {
let result = [];
for (let internal in db) {
var mapping = db[internal];
if (mapping.internalUserID === internalUserID) {
result.push(mapping.destroy( (error, result) => {
Logger.info(`[UserMapping] removing user mapping from the list ${internalUserID}:`, mapping.print());
return (typeof callback === 'function' ? callback(error, result) : undefined);
}));
} else {
result.push(undefined);
}
}
return result;
})();
}
static removeMappingMeetingId(meetingId, callback) {
return (() => {
let result = [];
for (let internal in db) {
var mapping = db[internal];
if (mapping.meetingId === meetingId) {
result.push(mapping.destroy( (error, result) => {
Logger.info(`[UserMapping] removing user mapping from the list ${mapping.internalUserID}:`, mapping.print());
}));
} else {
result.push(undefined);
}
}
return (typeof callback === 'function' ? callback() : undefined);
})();
}
static getExternalUserID(internalUserID) {
if (db[internalUserID]){
return db[internalUserID].externalUserID;
}
}
static allSync() {
let arr = Object.keys(db).reduce(function(arr, id) {
arr.push(db[id]);
return arr;
}
, []);
return arr;
}
// Initializes global methods for this model.
static initialize(callback) {
UserMapping.resync(callback);
}
// Gets all mappings from redis to populate the local database.
// Calls `callback()` when done.
static resync(callback) {
let client = config.redis.client;
let tasks = [];
return client.smembers(config.redis.keys.userMaps, (error, mappings) => {
if (error != null) { Logger.error("[UserMapping] error getting list of mappings from redis:", error); }
mappings.forEach(id => {
tasks.push(done => {
client.hgetall(config.redis.keys.userMap(id), function(error, mappingData) {
if (error != null) { Logger.error("[UserMapping] error getting information for a mapping from redis:", error); }
if (mappingData != null) {
let mapping = new UserMapping();
mapping.fromRedis(mappingData);
mapping.save(function(error, hook) {
if (mapping.id >= nextID) { nextID = mapping.id + 1; }
done(null, mapping);
});
} else {
done(null, null);
}
});
});
});
return async.series(tasks, function(errors, result) {
mappings = _.map(UserMapping.allSync(), m => m.print());
Logger.info("[UserMapping] finished resync, mappings registered:", mappings);
return (typeof callback === 'function' ? callback() : undefined);
});
});
}
};

View File

@ -1,62 +0,0 @@
sha1 = require("sha1")
url = require("url")
config = require("./config")
Utils = exports
# Calculates the checksum given a url `fullUrl` and a `salt`, as calculate by bbb-web.
Utils.checksumAPI = (fullUrl, salt) ->
query = Utils.queryFromUrl(fullUrl)
method = Utils.methodFromUrl(fullUrl)
Utils.checksum(method + query + salt)
# Calculates the checksum for a string.
# Just a wrapper for the method that actually does it.
Utils.checksum = (string) ->
sha1(string)
# Get the query of an API call from the url object (from url.parse())
# Example:
#
# * `fullUrl` = `http://bigbluebutton.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo`
# * returns: `name=Demo+Meeting&meetingID=Demo`
Utils.queryFromUrl = (fullUrl) ->
# Returns the query without the checksum.
# We can't use url.parse() because it would change the encoding
# and the checksum wouldn't match. We need the url exactly as
# the client sent us.
query = fullUrl.replace(/&checksum=[^&]*/, '')
query = query.replace(/checksum=[^&]*&/, '')
query = query.replace(/checksum=[^&]*$/, '')
matched = query.match(/\?(.*)/)
if matched?
matched[1]
else
''
# Get the method name of an API call from the url object (from url.parse())
# Example:
#
# * `fullUrl` = `http://mconf.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo`
# * returns: `create`
Utils.methodFromUrl = (fullUrl) ->
urlObj = url.parse(fullUrl, true)
urlObj.pathname.substr (config.bbb.apiPath + "/").length
# Returns the IP address of the client that made a request `req`.
# If can not determine the IP, returns `127.0.0.1`.
Utils.ipFromRequest = (req) ->
# the first ip in the list if the ip of the client
# the others are proxys between him and us
if req.headers?["x-forwarded-for"]?
ips = req.headers["x-forwarded-for"].split(",")
ipAddress = ips[0]?.trim()
# fallbacks
ipAddress ||= req.headers?["x-real-ip"] # when behind nginx
ipAddress ||= req.connection?.remoteAddress
ipAddress ||= "127.0.0.1"
ipAddress

68
bbb-webhooks/utils.js Normal file
View File

@ -0,0 +1,68 @@
const sha1 = require("sha1");
const url = require("url");
const config = require("./config");
const Utils = exports;
// Calculates the checksum given a url `fullUrl` and a `salt`, as calculate by bbb-web.
Utils.checksumAPI = function(fullUrl, salt) {
const query = Utils.queryFromUrl(fullUrl);
const method = Utils.methodFromUrl(fullUrl);
return Utils.checksum(method + query + salt);
};
// Calculates the checksum for a string.
// Just a wrapper for the method that actually does it.
Utils.checksum = string => sha1(string);
// Get the query of an API call from the url object (from url.parse())
// Example:
//
// * `fullUrl` = `http://bigbluebutton.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo`
// * returns: `name=Demo+Meeting&meetingID=Demo`
Utils.queryFromUrl = function(fullUrl) {
// Returns the query without the checksum.
// We can't use url.parse() because it would change the encoding
// and the checksum wouldn't match. We need the url exactly as
// the client sent us.
let query = fullUrl.replace(/&checksum=[^&]*/, '');
query = query.replace(/checksum=[^&]*&/, '');
query = query.replace(/checksum=[^&]*$/, '');
const matched = query.match(/\?(.*)/);
if (matched != null) {
return matched[1];
} else {
return '';
}
};
// Get the method name of an API call from the url object (from url.parse())
// Example:
//
// * `fullUrl` = `http://mconf.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo`
// * returns: `create`
Utils.methodFromUrl = function(fullUrl) {
const urlObj = url.parse(fullUrl, true);
return urlObj.pathname.substr((config.bbb.apiPath + "/").length);
};
// Returns the IP address of the client that made a request `req`.
// If can not determine the IP, returns `127.0.0.1`.
Utils.ipFromRequest = function(req) {
// the first ip in the list if the ip of the client
// the others are proxys between him and us
let ipAddress;
if ((req.headers != null ? req.headers["x-forwarded-for"] : undefined) != null) {
let ips = req.headers["x-forwarded-for"].split(",");
ipAddress = ips[0] != null ? ips[0].trim() : undefined;
}
// fallbacks
if (!ipAddress) { ipAddress = req.headers != null ? req.headers["x-real-ip"] : undefined; } // when behind nginx
if (!ipAddress) { ipAddress = req.connection != null ? req.connection.remoteAddress : undefined; }
if (!ipAddress) { ipAddress = "127.0.0.1"; }
return ipAddress;
};

View File

@ -1,117 +0,0 @@
_ = require("lodash")
async = require("async")
redis = require("redis")
request = require("request")
config = require("./config")
Hook = require("./hook")
IDMapping = require("./id_mapping")
Logger = require("./logger")
# Web hooks will listen for events on redis coming from BigBlueButton and
# perform HTTP calls with them to all registered hooks.
module.exports = class WebHooks
constructor: ->
@subscriberEvents = config.redis.pubSubClient
start: ->
@_subscribeToEvents()
# Subscribe to the events on pubsub that might need to be sent in callback calls.
_subscribeToEvents: ->
@subscriberEvents.on "psubscribe", (channel, count) ->
Logger.info "WebHooks: subscribed to " + channel
@subscriberEvents.on "pmessage", (pattern, channel, message) =>
processMessage = =>
if @_filterMessage(channel, message)
Logger.info "WebHooks: processing message on [#{channel}]:", JSON.stringify(message)
@_processEvent(message)
try
message = JSON.parse(message)
if message?
id = @_findMeetingId(message)
IDMapping.reportActivity(id)
# First treat meeting events to add/remove ID mappings
if message.envelope?.name is "MeetingCreatedEvtMsg"
Logger.info "WebHooks: got create message on meetings channel [#{channel}]", message
meetingProp = message.core?.body?.props?.meetingProp
if meetingProp
IDMapping.addOrUpdateMapping meetingProp.intId, meetingProp.extId, (error, result) ->
# has to be here, after the meeting was created, otherwise create calls won't generate
# callback calls for meeting hooks
processMessage()
# TODO: Temporarily commented because we still need the mapping for recording events,
# after the meeting ended.
# else if message.envelope?.name is "MeetingEndedEvtMessage"
# Logger.info "WebHooks: got destroy message on meetings channel [#{channel}]", message
# IDMapping.removeMapping @_findMeetingId(message), (error, result) ->
# processMessage()
else
processMessage()
catch e
Logger.error "WebHooks: error processing the message", message, ":", e
# Subscribe to the neccesary channels.
for k, channel of config.hooks.channels
@subscriberEvents.psubscribe channel
# Returns whether the message read from redis should generate a callback
# call or not.
_filterMessage: (channel, message) ->
messageName = @_messageNameFromChannel(channel, message)
for event in config.hooks.events
if channel? and messageName? and
event.channel.match(channel) and event.name.match(messageName)
return true
false
# BigBlueButton 2.0 changed where the message name is located, but it didn't
# change for the Record and Playback events. Thus, we need to handle both.
_messageNameFromChannel: (channel, message) ->
if channel == 'bigbluebutton:from-rap'
return message.header?.name
message.envelope?.name
# Find the meetingId in the message.
# This is neccesary because the new message in BigBlueButton 2.0 have
# their meetingId in different locations.
_findMeetingId: (message) ->
# Various 2.0 meetingId locations.
return message.envelope.routing.meetingId if message.envelope?.routing?.meetingId?
return message.header.body.meetingId if message.header?.body?.meetingId?
return message.core.body.meetingId if message.core?.body?.meetingId?
return message.core.body.props.meetingProp.intId if message.core?.body?.props?.meetingProp?.intId?
# Record and Playback 1.1 meeting_id location.
return message.payload.meeting_id if message.payload?.meeting_id?
return undefined
# Processes an event received from redis. Will get all hook URLs that
# should receive this event and start the process to perform the callback.
_processEvent: (message) ->
hooks = Hook.allGlobalSync()
# TODO: events that happen after the meeting ended will never trigger the hooks
# below, since the mapping is removed when the meeting ends
# filter the hooks that need to receive this event
# only global hooks or hooks for this specific meeting
# All the messages have the meetingId in different locations now.
# Depending on the event, it could appear within header, core or envelope.
# It always appears in atleast one, so we just need to search for it.
idFromMessage = @_findMeetingId(message)
if idFromMessage?
eMeetingID = IDMapping.getExternalMeetingID(idFromMessage)
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID))
hooks.forEach (hook) ->
Logger.info "WebHooks: enqueueing a message in the hook:", hook.callbackURL
hook.enqueue message

151
bbb-webhooks/web_hooks.js Normal file
View File

@ -0,0 +1,151 @@
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const request = require("request");
const config = require("./config.js");
const Hook = require("./hook.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
const MessageMapping = require("./messageMapping.js");
const UserMapping = require("./userMapping.js");
// Web hooks will listen for events on redis coming from BigBlueButton and
// perform HTTP calls with them to all registered hooks.
module.exports = class WebHooks {
constructor() {
this.subscriberEvents = config.redis.pubSubClient;
}
start(callback) {
this._subscribeToEvents();
typeof callback === 'function' ? callback(null,"w") : undefined;
}
// Subscribe to the events on pubsub that might need to be sent in callback calls.
_subscribeToEvents() {
this.subscriberEvents.on("psubscribe", (channel, count) => Logger.info(`[WebHooks] subscribed to:${channel}`));
this.subscriberEvents.on("pmessage", (pattern, channel, message) => {
let raw;
const processMessage = () => {
Logger.info(`[WebHooks] processing message on [${channel}]:`, JSON.stringify(message));
this._processEvent(message, raw);
};
try {
raw = JSON.parse(message);
let messageMapped = new MessageMapping();
messageMapped.mapMessage(JSON.parse(message));
message = messageMapped.mappedObject;
if (!_.isEmpty(message) && !config.hooks.getRaw) {
const intId = message.data.attributes.meeting["internal-meeting-id"];
IDMapping.reportActivity(intId);
// First treat meeting events to add/remove ID mappings
switch (message.data.id) {
case "meeting-created":
Logger.info(`[WebHooks] got create message on meetings channel [${channel}]:`, message);
IDMapping.addOrUpdateMapping(intId, message.data.attributes.meeting["external-meeting-id"], (error, result) => {
// has to be here, after the meeting was created, otherwise create calls won't generate
// callback calls for meeting hooks
processMessage();
});
break;
case "user-joined":
UserMapping.addMapping(message.data.attributes.user["internal-user-id"],message.data.attributes.user["external-user-id"], intId, () => {
processMessage();
});
break;
case "user-left":
UserMapping.removeMapping(message.data.attributes.user["internal-user-id"], () => { processMessage(); });
break;
case "meeting-ended":
UserMapping.removeMappingMeetingId(intId, () => { processMessage(); });
break;
default:
processMessage();
}
} else {
this._processRaw(raw);
}
} catch (e) {
Logger.error("[WebHooks] error processing the message:", JSON.stringify(raw), ":", e);
}
});
for (let k in config.hooks.channels) {
const channel = config.hooks.channels[k];
this.subscriberEvents.psubscribe(channel);
}
}
// Send raw data to hooks that are not expecting mapped messages
_processRaw(message) {
let idFromMessage;
let hooks = Hook.allGlobalSync();
// Add hooks for the specific meeting that expect raw data
// Get meetingId for a raw message that was previously mapped by another webhook application or if it's straight from redis
idFromMessage = this._findMeetingID(message);
if (idFromMessage != null) {
const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
// Notify the hooks that expect raw data
async.forEach(hooks, (hook) => {
if (hook.getRaw) {
Logger.info("[WebHooks] enqueueing a raw message in the hook:", hook.callbackURL);
hook.enqueue(message);
}
});
} // Put foreach inside the if to avoid pingpong events
}
_findMeetingID(message) {
if (message.data) {
return message.data.attributes.meeting["internal-meeting-id"];
}
if (message.payload) {
return message.payload.meeting_id;
}
if (message.envelope && message.envelope.routing && message.envelope.routing.meetingId) {
return message.envelope.routing.meetingId;
}
if (message.header && message.header.body && message.header.body.meetingId) {
return message.header.body.meetingId;
}
if (message.core && message.core.body) {
return message.core.body.props ? message.core.body.props.meetingProp.intId : message.core.body.meetingId;
}
return undefined;
}
// Processes an event received from redis. Will get all hook URLs that
// should receive this event and start the process to perform the callback.
_processEvent(message, raw) {
// Get all global hooks
let hooks = Hook.allGlobalSync();
// filter the hooks that need to receive this event
// add hooks that are registered for this specific meeting
const idFromMessage = message.data != null ? message.data.attributes.meeting["internal-meeting-id"] : undefined;
if (idFromMessage != null) {
const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
}
// Notify every hook asynchronously, if hook N fails, it won't block hook N+k from receiving its message
async.forEach(hooks, (hook) => {
if (!hook.getRaw) {
Logger.info("[WebHooks] enqueueing a message in the hook:", hook.callbackURL);
hook.enqueue(message);
}
});
const sendRaw = hooks.some(hook => { return hook.getRaw });
if (sendRaw) {
this._processRaw(raw);
}
}
};

View File

@ -1,127 +0,0 @@
_ = require("lodash")
express = require("express")
url = require("url")
config = require("./config")
Hook = require("./hook")
Logger = require("./logger")
Utils = require("./utils")
# Web server that listens for API calls and process them.
module.exports = class WebServer
constructor: ->
@app = express()
@_registerRoutes()
start: (port) ->
@server = @app.listen(port)
unless @server.address()?
Logger.error "Could not bind to port", port
Logger.error "Aborting."
process.exit(1)
Logger.info "Server listening on port", port, "in", @app.settings.env.toUpperCase(), "mode"
_registerRoutes: ->
# Request logger
@app.all "*", (req, res, next) ->
unless fromMonit(req)
Logger.info "<==", req.method, "request to", req.url, "from:", clientDataSimple(req)
next()
@app.get "/bigbluebutton/api/hooks/create", @_validateChecksum, @_create
@app.get "/bigbluebutton/api/hooks/destroy", @_validateChecksum, @_destroy
@app.get "/bigbluebutton/api/hooks/list", @_validateChecksum, @_list
@app.get "/bigbluebutton/api/hooks/ping", (req, res) ->
res.write "bbb-webhooks up!"
res.end()
_create: (req, res, next) ->
urlObj = url.parse(req.url, true)
callbackURL = urlObj.query["callbackURL"]
meetingID = urlObj.query["meetingID"]
unless callbackURL?
respondWithXML(res, config.api.responses.missingParamCallbackURL)
else
Hook.addSubscription callbackURL, meetingID, (error, hook) ->
if error? # the only error for now is for duplicated callbackURL
msg = config.api.responses.createDuplicated(hook.id)
else if hook?
msg = config.api.responses.createSuccess(hook.id)
else
msg = config.api.responses.createFailure
respondWithXML(res, msg)
_destroy: (req, res, next) ->
urlObj = url.parse(req.url, true)
hookID = urlObj.query["hookID"]
unless hookID?
respondWithXML(res, config.api.responses.missingParamHookID)
else
Hook.removeSubscription hookID, (error, result) ->
if error?
msg = config.api.responses.destroyFailure
else if !result
msg = config.api.responses.destroyNoHook
else
msg = config.api.responses.destroySuccess
respondWithXML(res, msg)
_list: (req, res, next) ->
urlObj = url.parse(req.url, true)
meetingID = urlObj.query["meetingID"]
if meetingID?
# all the hooks that receive events from this meeting
hooks = Hook.allGlobalSync()
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(meetingID))
hooks = _.sortBy(hooks, (hook) -> hook.id)
else
# no meetingID, return all hooks
hooks = Hook.allSync()
msg = "<response><returncode>SUCCESS</returncode><hooks>"
hooks.forEach (hook) ->
msg += "<hook>"
msg += "<hookID>#{hook.id}</hookID>"
msg += "<callbackURL><![CDATA[#{hook.callbackURL}]]></callbackURL>"
msg += "<meetingID><![CDATA[#{hook.externalMeetingID}]]></meetingID>" unless hook.isGlobal()
msg += "</hook>"
msg += "</hooks></response>"
respondWithXML(res, msg)
# Validates the checksum in the request `req`.
# If it doesn't match BigBlueButton's shared secret, will send an XML response
# with an error code just like BBB does.
_validateChecksum: (req, res, next) =>
urlObj = url.parse(req.url, true)
checksum = urlObj.query["checksum"]
if checksum is Utils.checksumAPI(req.url, config.bbb.sharedSecret)
next()
else
Logger.info "checksum check failed, sending a checksumError response"
res.setHeader("Content-Type", "text/xml")
res.send cleanupXML(config.api.responses.checksumError)
respondWithXML = (res, msg) ->
msg = cleanupXML(msg)
Logger.info "==> respond with:", msg
res.setHeader("Content-Type", "text/xml")
res.send msg
# Returns a simple string with a description of the client that made
# the request. It includes the IP address and the user agent.
clientDataSimple = (req) ->
"ip " + Utils.ipFromRequest(req) + ", using " + req.headers["user-agent"]
# Cleans up a string with an XML in it removing spaces and new lines from between the tags.
cleanupXML = (string) ->
string.trim().replace(/>\s*/g, '>')
# Was this request made by monit?
fromMonit = (req) ->
req.headers["user-agent"]? and req.headers["user-agent"].match(/^monit/)

172
bbb-webhooks/web_server.js Normal file
View File

@ -0,0 +1,172 @@
const _ = require("lodash");
const express = require("express");
const url = require("url");
const config = require("./config.js");
const Hook = require("./hook.js");
const Logger = require("./logger.js");
const Utils = require("./utils.js");
// Web server that listens for API calls and process them.
module.exports = class WebServer {
constructor() {
this._validateChecksum = this._validateChecksum.bind(this);
this.app = express();
this._registerRoutes();
}
start(port, callback) {
this.server = this.app.listen(port);
if (this.server.address() == null) {
Logger.error("[WebServer] aborting, could not bind to port", port,
process.exit(1));
}
Logger.info("[WebServer] listening on port", port, "in", this.app.settings.env.toUpperCase(), "mode");
typeof callback === 'function' ? callback(null,"k") : undefined;
}
_registerRoutes() {
// Request logger
this.app.all("*", function(req, res, next) {
if (!fromMonit(req)) {
Logger.info("[WebServer]", req.method, "request to", req.url, "from:", clientDataSimple(req));
}
next();
});
this.app.get("/bigbluebutton/api/hooks/create", this._validateChecksum, this._create);
this.app.get("/bigbluebutton/api/hooks/destroy", this._validateChecksum, this._destroy);
this.app.get("/bigbluebutton/api/hooks/list", this._validateChecksum, this._list);
this.app.get("/bigbluebutton/api/hooks/ping", function(req, res) {
res.write("bbb-webhooks up!");
res.end();
});
}
_create(req, res, next) {
const urlObj = url.parse(req.url, true);
const callbackURL = urlObj.query["callbackURL"];
const meetingID = urlObj.query["meetingID"];
let getRaw = urlObj.query["getRaw"];
if(getRaw){
getRaw = JSON.parse(getRaw.toLowerCase());
}
else getRaw = false
if (callbackURL == null) {
respondWithXML(res, config.api.responses.missingParamCallbackURL);
} else {
Hook.addSubscription(callbackURL, meetingID, getRaw, function(error, hook) {
let msg;
if (error != null) { // the only error for now is for duplicated callbackURL
msg = config.api.responses.createDuplicated(hook.id);
} else if (hook != null) {
msg = config.api.responses.createSuccess(hook.id, hook.permanent, hook.getRaw);
} else {
msg = config.api.responses.createFailure;
}
respondWithXML(res, msg);
});
}
}
// Create a permanent hook. Permanent hooks can't be deleted via API and will try to emit a message until it succeed
createPermanents(callback) {
for (let i = 0; i < config.hooks.permanentURLs.length; i++) {
Hook.addSubscription(config.hooks.permanentURLs[i], null, config.hooks.getRaw, function(error, hook) {
if (error != null) { // there probably won't be any errors here
Logger.info("[WebServer] duplicated permanent hook", error);
} else if (hook != null) {
Logger.info("[WebServer] permanent hook created successfully");
} else {
Logger.info("[WebServer] error creating permanent hook");
}
});
}
typeof callback === 'function' ? callback(null,"p") : undefined;
}
_destroy(req, res, next) {
const urlObj = url.parse(req.url, true);
const hookID = urlObj.query["hookID"];
if (hookID == null) {
respondWithXML(res, config.api.responses.missingParamHookID);
} else {
Hook.removeSubscription(hookID, function(error, result) {
let msg;
if (error != null) {
msg = config.api.responses.destroyFailure;
} else if (!result) {
msg = config.api.responses.destroyNoHook;
} else {
msg = config.api.responses.destroySuccess;
}
respondWithXML(res, msg);
});
}
}
_list(req, res, next) {
let hooks;
const urlObj = url.parse(req.url, true);
const meetingID = urlObj.query["meetingID"];
if (meetingID != null) {
// all the hooks that receive events from this meeting
hooks = Hook.allGlobalSync();
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(meetingID));
hooks = _.sortBy(hooks, hook => hook.id);
} else {
// no meetingID, return all hooks
hooks = Hook.allSync();
}
let msg = "<response><returncode>SUCCESS</returncode><hooks>";
hooks.forEach(function(hook) {
msg += "<hook>";
msg += `<hookID>${hook.id}</hookID>`;
msg += `<callbackURL><![CDATA[${hook.callbackURL}]]></callbackURL>`;
if (!hook.isGlobal()) { msg += `<meetingID><![CDATA[${hook.externalMeetingID}]]></meetingID>`; }
msg += `<permanentHook>${hook.permanent}</permanentHook>`;
msg += `<rawData>${hook.getRaw}</rawData>`;
msg += "</hook>";
});
msg += "</hooks></response>";
respondWithXML(res, msg);
}
// Validates the checksum in the request `req`.
// If it doesn't match BigBlueButton's shared secret, will send an XML response
// with an error code just like BBB does.
_validateChecksum(req, res, next) {
const urlObj = url.parse(req.url, true);
const checksum = urlObj.query["checksum"];
if (checksum === Utils.checksumAPI(req.url, config.bbb.sharedSecret)) {
next();
} else {
Logger.info("[WebServer] checksum check failed, sending a checksumError response");
res.setHeader("Content-Type", "text/xml");
res.send(cleanupXML(config.api.responses.checksumError));
}
}
};
var respondWithXML = function(res, msg) {
msg = cleanupXML(msg);
Logger.info("[WebServer] respond with:", msg);
res.setHeader("Content-Type", "text/xml");
res.send(msg);
};
// Returns a simple string with a description of the client that made
// the request. It includes the IP address and the user agent.
var clientDataSimple = req => `ip ${Utils.ipFromRequest(req)}, using ${req.headers["user-agent"]}`;
// Cleans up a string with an XML in it removing spaces and new lines from between the tags.
var cleanupXML = string => string.trim().replace(/>\s*/g, '>');
// Was this request made by monit?
var fromMonit = req => (req.headers["user-agent"] != null) && req.headers["user-agent"].match(/^monit/);