bigbluebutton-Github/bbb-webhooks/web_hooks.js
2019-03-29 20:52:58 +00:00

150 lines
5.7 KiB
JavaScript

const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const request = require("request");
const config = require("config");
const Hook = require("./hook.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
const MessageMapping = require("./messageMapping.js");
const UserMapping = require("./userMapping.js");
// Web hooks will listen for events on redis coming from BigBlueButton and
// perform HTTP calls with them to all registered hooks.
module.exports = class WebHooks {
constructor() {
this.subscriberEvents = Application.redisPubSubClient();
}
start(callback) {
this._subscribeToEvents();
typeof callback === 'function' ? callback(null,"w") : undefined;
}
// Subscribe to the events on pubsub that might need to be sent in callback calls.
_subscribeToEvents() {
this.subscriberEvents.on("psubscribe", (channel, count) => Logger.info(`[WebHooks] subscribed to:${channel}`));
this.subscriberEvents.on("pmessage", (pattern, channel, message) => {
let raw;
const processMessage = () => {
Logger.info(`[WebHooks] processing message on [${channel}]:`, JSON.stringify(message));
this._processEvent(message, raw);
};
try {
raw = JSON.parse(message);
let messageMapped = new MessageMapping();
messageMapped.mapMessage(JSON.parse(message));
message = messageMapped.mappedObject;
if (!_.isEmpty(message)) {
const intId = message.data.attributes.meeting["internal-meeting-id"];
IDMapping.reportActivity(intId);
// First treat meeting events to add/remove ID mappings
switch (message.data.id) {
case "meeting-created":
Logger.info(`[WebHooks] got create message on meetings channel [${channel}]:`, message);
IDMapping.addOrUpdateMapping(intId, message.data.attributes.meeting["external-meeting-id"], (error, result) => {
// has to be here, after the meeting was created, otherwise create calls won't generate
// callback calls for meeting hooks
processMessage();
});
break;
case "user-joined":
UserMapping.addOrUpdateMapping(message.data.attributes.user["internal-user-id"],message.data.attributes.user["external-user-id"], intId, message.data.attributes.user, () => {
processMessage();
});
break;
case "user-left":
UserMapping.removeMapping(message.data.attributes.user["internal-user-id"], () => { processMessage(); });
break;
case "meeting-ended":
UserMapping.removeMappingMeetingId(intId, () => { processMessage(); });
break;
default:
processMessage();
}
}
} catch (e) {
Logger.error("[WebHooks] error processing the message:", JSON.stringify(raw), ":", e.message);
}
});
config.get("hooks.channels").forEach((channel) => {
this.subscriberEvents.psubscribe(channel);
});
}
// Send raw data to hooks that are not expecting mapped messages
_processRaw(message) {
let idFromMessage;
let hooks = Hook.allGlobalSync();
// Add hooks for the specific meeting that expect raw data
// Get meetingId for a raw message that was previously mapped by another webhook application or if it's straight from redis
idFromMessage = this._findMeetingID(message);
if (idFromMessage != null) {
const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
// Notify the hooks that expect raw data
async.forEach(hooks, (hook) => {
if (hook.getRaw) {
Logger.info("[WebHooks] enqueueing a raw message in the hook:", hook.callbackURL);
hook.enqueue(message);
}
});
} // Put foreach inside the if to avoid pingpong events
}
_findMeetingID(message) {
if (message.data) {
return message.data.attributes.meeting["internal-meeting-id"];
}
if (message.payload) {
return message.payload.meeting_id;
}
if (message.envelope && message.envelope.routing && message.envelope.routing.meetingId) {
return message.envelope.routing.meetingId;
}
if (message.header && message.header.body && message.header.body.meetingId) {
return message.header.body.meetingId;
}
if (message.core && message.core.body) {
return message.core.body.props ? message.core.body.props.meetingProp.intId : message.core.body.meetingId;
}
return undefined;
}
// Processes an event received from redis. Will get all hook URLs that
// should receive this event and start the process to perform the callback.
_processEvent(message, raw) {
// Get all global hooks
let hooks = Hook.allGlobalSync();
// filter the hooks that need to receive this event
// add hooks that are registered for this specific meeting
const idFromMessage = message.data != null ? message.data.attributes.meeting["internal-meeting-id"] : undefined;
if (idFromMessage != null) {
const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
}
// Notify every hook asynchronously, if hook N fails, it won't block hook N+k from receiving its message
async.forEach(hooks, (hook) => {
if (!hook.getRaw) {
Logger.info("[WebHooks] enqueueing a message in the hook:", hook.callbackURL);
hook.enqueue(message);
}
});
const sendRaw = hooks.some(hook => { return hook.getRaw });
if (sendRaw && config.get("hooks.getRaw")) {
this._processRaw(raw);
}
}
};