bigbluebutton-Github/bbb-webhooks/hook.js
2019-03-28 20:17:20 +00:00

323 lines
12 KiB
JavaScript

const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const config = require('config');
const CallbackEmitter = require("./callback_emitter.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
// The database of hooks.
// Used always from memory, but saved to redis for persistence.
//
// Format:
// { id: Hook }
// Format on redis:
// * a SET "...:hooks" with all ids
// * a HASH "...:hook:<id>" for each hook with some of its attributes
let db = {};
let nextID = 1;
// The representation of a hook and its properties. Stored in memory and persisted
// to redis.
// Hooks can be global, receiving callback calls for events from all meetings on the
// server, or for a specific meeting. If an `externalMeetingID` is set in the hook,
// it will only receive calls related to this meeting, otherwise it will be global.
// Events are kept in a queue to be sent in the order they are received.
// But if the requests are going by but taking too long, the queue might be increasing
// faster than the callbacks are made. In this case the events will be concatenated
// and send up to 10 events in every post
module.exports = class Hook {
constructor() {
this.id = null;
this.callbackURL = null;
this.externalMeetingID = null;
this.queue = [];
this.emitter = null;
this.redisClient = Application.redisClient();
this.permanent = false;
this.getRaw = false;
}
save(callback) {
this.redisClient.hmset(config.get("redis.keys.hookPrefix") + ":" + this.id, this.toRedis(), (error, reply) => {
if (error != null) { Logger.error("[Hook] error saving hook to redis:", error, reply); }
this.redisClient.sadd(config.get("redis.keys.hooks"), this.id, (error, reply) => {
if (error != null) { Logger.error("[Hook] error saving hookID to the list of hooks:", error, reply); }
db[this.id] = this;
(typeof callback === 'function' ? callback(error, db[this.id]) : undefined);
});
});
}
destroy(callback) {
this.redisClient.srem(config.get("redis.keys.hooks"), this.id, (error, reply) => {
if (error != null) { Logger.error("[Hook] error removing hookID from the list of hooks:", error, reply); }
this.redisClient.del(config.get("redis.keys.hookPrefix") + ":" + this.id, error => {
if (error != null) { Logger.error("[Hook] error removing hook from redis:", error); }
if (db[this.id]) {
delete db[this.id];
(typeof callback === 'function' ? callback(error, true) : undefined);
} else {
(typeof callback === 'function' ? callback(error, false) : undefined);
}
});
});
}
// Is this a global hook?
isGlobal() {
return (this.externalMeetingID == null);
}
// The meeting from which this hook should receive events.
targetMeetingID() {
return this.externalMeetingID;
}
// Puts a new message in the queue. Will also trigger a processing in the queue so this
// message might be processed instantly.
enqueue(message) {
this.redisClient.llen(config.get("redis.keys.eventsPrefix") + ":" + this.id, (error, reply) => {
const length = reply;
if (length < config.get("hooks.queueSize") && this.queue.length < config.get("hooks.queueSize")) {
Logger.info(`[Hook] ${this.callbackURL} enqueueing message:`, JSON.stringify(message));
// Add message to redis queue
this.redisClient.rpush(config.get("redis.keys.eventsPrefix") + ":" + this.id, JSON.stringify(message), (error,reply) => {
if (error != null) { Logger.error("[Hook] error pushing event to redis queue:", JSON.stringify(message), error); }
});
this.queue.push(JSON.stringify(message));
this._processQueue();
} else {
Logger.warn(`[Hook] ${this.callbackURL} queue size exceed, event:`, JSON.stringify(message));
}
});
}
toRedis() {
const r = {
"hookID": this.id,
"callbackURL": this.callbackURL,
"permanent": this.permanent,
"getRaw": this.getRaw
};
if (this.externalMeetingID != null) { r.externalMeetingID = this.externalMeetingID; }
return r;
}
fromRedis(redisData) {
this.id = parseInt(redisData.hookID);
this.callbackURL = redisData.callbackURL;
this.permanent = redisData.permanent.toLowerCase() == 'true';
this.getRaw = redisData.getRaw.toLowerCase() == 'true';
if (redisData.externalMeetingID != null) {
this.externalMeetingID = redisData.externalMeetingID;
} else {
this.externalMeetingID = null;
}
}
// Gets the first message in the queue and start an emitter to send it. Will only do it
// if there is no emitter running already and if there is a message in the queue.
_processQueue() {
// Will try to send up to a defined number of messages together if they're enqueued (defined on config.get("hooks.multiEvent"))
const lengthIn = this.queue.length > config.get("hooks.multiEvent") ? config.get("hooks.multiEvent") : this.queue.length;
let num = lengthIn + 1;
// Concat messages
let message = this.queue.slice(0,lengthIn);
message = message.join(",");
if ((message == null) || (this.emitter != null) || (lengthIn <= 0)) { return; }
// Add params so emitter will 'know' when a hook is permanent and have backupURLs
this.emitter = new CallbackEmitter(this.callbackURL, message, this.permanent);
this.emitter.start();
this.emitter.on("success", () => {
delete this.emitter;
while ((num -= 1)) {
// Remove the sent message from redis
this.redisClient.lpop(config.get("redis.keys.eventsPrefix") + ":" + this.id, (error, reply) => {
if (error != null) { return Logger.error("[Hook] error removing event from redis queue:", error); }
});
this.queue.shift();
} // pop the first message just sent
this._processQueue(); // go to the next message
});
// gave up trying to perform the callback, remove the hook forever if the hook's not permanent (emmiter will validate that)
return this.emitter.on("stopped", error => {
Logger.warn("[Hook] too many failed attempts to perform a callback call, removing the hook for:", this.callbackURL);
this.destroy();
});
}
static addSubscription(callbackURL, meetingID, getRaw, callback) {
let hook = Hook.findByCallbackURLSync(callbackURL);
if (hook != null) {
return (typeof callback === 'function' ? callback(new Error("There is already a subscription for this callback URL"), hook) : undefined);
} else {
let msg = `[Hook] adding a hook with callback URL: [${callbackURL}],`;
if (meetingID != null) { msg += ` for the meeting: [${meetingID}]`; }
Logger.info(msg);
hook = new Hook();
hook.callbackURL = callbackURL;
hook.externalMeetingID = meetingID;
hook.getRaw = getRaw;
hook.permanent = config.get("hooks.permanentURLs").some( obj => {
return obj.url === callbackURL
});
if (hook.permanent) {
hook.id = config.get("hooks.permanentURLs").map(obj => obj.url).indexOf(callbackURL) + 1;
nextID = config.get("hooks.permanentURLs").length + 1;
} else {
hook.id = nextID++;
}
// Sync permanent queue
if (hook.permanent) {
hook.redisClient.llen(config.get("redis.keys.eventsPrefix") + ":" + hook.id, (error, len) => {
if (len > 0) {
const length = len;
hook.redisClient.lrange(config.get("redis.keys.eventsPrefix") + ":" + hook.id, 0, len, (error, elements) => {
elements.forEach(element => {
hook.queue.push(element);
});
if (hook.queue.length > 0) { return hook._processQueue(); }
});
}
});
}
hook.save((error, hook) => { typeof callback === 'function' ? callback(error, hook) : undefined });
}
}
static removeSubscription(hookID, callback) {
let hook = Hook.getSync(hookID);
if (hook != null && !hook.permanent) {
let msg = `[Hook] removing the hook with callback URL: [${hook.callbackURL}],`;
if (hook.externalMeetingID != null) { msg += ` for the meeting: [${hook.externalMeetingID}]`; }
Logger.info(msg);
hook.destroy((error, removed) => { typeof callback === 'function' ? callback(error, removed) : undefined });
} else {
return (typeof callback === 'function' ? callback(null, false) : undefined);
}
}
static countSync() {
return Object.keys(db).length;
}
static getSync(id) {
return db[id];
}
static firstSync() {
const keys = Object.keys(db);
if (keys.length > 0) {
return db[keys[0]];
} else {
return null;
}
}
static findByExternalMeetingIDSync(externalMeetingID) {
const hooks = Hook.allSync();
return _.filter(hooks, hook => (externalMeetingID != null) && (externalMeetingID === hook.externalMeetingID));
}
static allGlobalSync() {
const hooks = Hook.allSync();
return _.filter(hooks, hook => hook.isGlobal());
}
static allSync() {
let arr = Object.keys(db).reduce(function(arr, id) {
arr.push(db[id]);
return arr;
}
, []);
return arr;
}
static clearSync() {
for (let id in db) {
delete db[id];
}
return db = {};
}
static findByCallbackURLSync(callbackURL) {
for (let id in db) {
if (db[id].callbackURL === callbackURL) {
return db[id];
}
}
}
static initialize(callback) {
Hook.resync(callback);
}
// Gets all hooks from redis to populate the local database.
// Calls `callback()` when done.
static resync(callback) {
let client = Application.redisClient();
// Remove previous permanent hooks
for (let hk = 1; hk <= config.get("hooks.permanentURLs").length; hk++) {
client.srem(config.get("redis.keys.hooks"), hk, (error, reply) => {
if (error != null) { Logger.error("[Hook] error removing previous permanent hook from list:", error); }
client.del(config.get("redis.keys.hookPrefix") + ":" + hk, error => {
if (error != null) { Logger.error("[Hook] error removing previous permanent hook from redis:", error); }
});
});
}
let tasks = [];
client.smembers(config.get("redis.keys.hooks"), (error, hooks) => {
if (error != null) { Logger.error("[Hook] error getting list of hooks from redis:", error); }
hooks.forEach(id => {
tasks.push(done => {
client.hgetall(config.get("redis.keys.hookPrefix") + ":" + id, function(error, hookData) {
if (error != null) { Logger.error("[Hook] error getting information for a hook from redis:", error); }
if (hookData != null) {
let length;
let hook = new Hook();
hook.fromRedis(hookData);
// sync events queue
client.llen(config.get("redis.keys.eventsPrefix") + ":" + hook.id, (error, len) => {
length = len;
client.lrange(config.get("redis.keys.eventsPrefix") + ":" + hook.id, 0, len, (error, elements) => {
elements.forEach(element => {
hook.queue.push(element);
});
});
});
// Persist hook to redis
hook.save( (error, hook) => {
if (hook.id >= nextID) { nextID = hook.id + 1; }
if (hook.queue.length > 0) { hook._processQueue(); }
done(null, hook);
});
} else {
done(null, null);
}
});
});
});
async.series(tasks, function(errors, result) {
hooks = _.map(Hook.allSync(), hook => `[${hook.id}] ${hook.callbackURL}`);
Logger.info("[Hook] finished resync, hooks registered:", hooks);
(typeof callback === 'function' ? callback() : undefined);
});
});
}
};