bigbluebutton-Github/bigbluebutton-html5/imports/startup/server/redis.js
Pedro Beschorner Marin 0c4cf0135d Handle pad's events and Meteor's instances
Since Meteor was split into multiple processes and events started to be
filtered by instance, all of Etherpad's Redis events were being discarded.

Etherpad has a Redis publisher plugin that is unaware of BigBlueButton's
existence. All the communication between them is kept simple, with minimal
internal data exchange. The concept of distinct subscribers on Meteor's
side broke part of this simplicity and, now, Etherpad has to know which
instance must receive its messages. To provide this information I decided
to include Meteor's instance as part of the pad's id. It should look like:

 - [instanceId]padId for the shared notes
 - [instanceId]padId_cc_(locale) for the closed captions

With those changes the pad id generation made at the recording scripts had to
be re-done because there is no instance id available. Pad id is now recorded at
akka-apps and queried while archiving the shared notes.
2021-03-23 18:03:50 -03:00

403 lines
13 KiB
JavaScript
Executable File

/* global PowerQueue */
import Redis from 'redis';
import { Meteor } from 'meteor/meteor';
import { EventEmitter2 } from 'eventemitter2';
import {
isPadMessage,
getInstanceIdFromPadMessage,
} from './etherpad';
import { check } from 'meteor/check';
import fs from 'fs';
import Logger from './logger';
import Metrics from './metrics';
// Fake meetingId used for messages that have no meetingId.
// It also keys the queue that RedisPubSub creates up front for system
// messages, and is the queue id reported to Metrics when a handled
// message carries no meetingId.
const NO_MEETING_ID = '_';
// Settings flag: when truthy, RedisPubSub starts the metrics dump file and
// MeetingMessageQueue reports every handled event to Metrics.
const { queueMetrics } = Meteor.settings.private.redis.metrics;
/**
 * Serialises an outgoing event as the JSON string that gets published to
 * Redis: an `envelope` section (name, routing, timestamp) followed by a
 * `core` section (header, body).
 *
 * @param {string} channel - Destination channel. Accepted for signature
 *   compatibility; it is not part of the serialised payload.
 * @param {string} eventName - Name stored in `envelope.name`.
 * @param {Object} header - Stored as `core.header`.
 * @param {Object} body - Stored as `core.body`.
 * @param {Object} [routing] - Routing info; when falsy the default
 *   `{ sender: 'html5-server' }` is used instead.
 * @returns {string} The JSON-encoded message.
 */
const makeEnvelope = (channel, eventName, header, body, routing) => {
  const defaultRouting = { sender: 'html5-server' };

  const meta = {
    name: eventName,
    routing: routing ? routing : defaultRouting,
    timestamp: Date.now(),
  };
  const core = { header, body };

  return JSON.stringify({ envelope: meta, core });
};
/**
 * Extracts the html5 instance id a message is addressed to.
 *
 * The id is read from `core.body.props.systemProps.html5InstanceId` first.
 * When that value is missing or falsy and the message is a pad message,
 * the id is taken from the pad message instead.
 *
 * @param {Object} parsedMessage - Parsed Redis message; must have `core.body`.
 * @returns {*} The instance id, or the falsy value read from systemProps
 *   when none could be determined.
 */
const getInstanceIdFromMessage = (parsedMessage) => {
  // End meeting message does not seem to have systemProps
  const idFromSystemProps = parsedMessage.core.body.props?.systemProps?.html5InstanceId;
  if (idFromSystemProps) {
    return idFromSystemProps;
  }

  // Pad messages do not have systemProps; their id comes from the pad helper
  if (isPadMessage(parsedMessage)) {
    return getInstanceIdFromPadMessage(parsedMessage);
  }

  return idFromSystemProps;
};
/**
 * Queue of Redis messages backed by a PowerQueue. Each queued task is
 * re-emitted on the shared event emitter, and the queue is told to move on
 * (`next`) once the listeners settle — or immediately for messages flagged
 * as async.
 */
class MeetingMessageQueue {
  /**
   * @param {Object} eventEmitter - Emitter exposing `emitAsync(name, payload, meetingId)`.
   * @param {string[]} [asyncMessages=[]] - Channels or event names that must
   *   not block the queue while their listeners run.
   * @param {boolean} [redisDebugEnabled=false] - Enables verbose debug logs.
   */
  constructor(eventEmitter, asyncMessages = [], redisDebugEnabled = false) {
    this.asyncMessages = asyncMessages;
    this.emitter = eventEmitter;
    this.queue = new PowerQueue();
    this.redisDebugEnabled = redisDebugEnabled;

    this.handleTask = this.handleTask.bind(this);
    this.queue.taskHandler = this.handleTask;
  }

  /**
   * Handles one queued message: validates it, emits it and signals completion.
   *
   * @param {Object} data - Task as queued by RedisPubSub; reads `channel`
   *   and `parsedMessage` (`envelope`, `core.header`, `core.body`).
   * @param {Function} next - Completion callback; invoked at most once per task.
   */
  handleTask(data, next) {
    const { channel } = data;
    const { envelope } = data.parsedMessage;
    const { header, body } = data.parsedMessage.core;
    const { meetingId } = header;
    const eventName = header.name;
    const isAsync = this.asyncMessages.includes(channel)
      || this.asyncMessages.includes(eventName);

    const beginHandleTimestamp = Date.now();
    // Guards against calling next() twice (async shortcut + promise settle)
    let called = false;

    const callNext = () => {
      if (called) return;
      if (this.redisDebugEnabled) {
        Logger.debug(`Redis: ${eventName} completed ${isAsync ? 'async' : 'sync'}`);
      }
      called = true;

      if (queueMetrics) {
        const queueId = meetingId || NO_MEETING_ID;
        const dataLength = JSON.stringify(data).length;
        Metrics.processEvent(queueId, eventName, dataLength, beginHandleTimestamp);
      }

      const queueLength = this.queue.length();
      if (queueLength > 100) {
        Logger.warn(`Redis: MeetingMessageQueue for meetingId=${meetingId} has queue size=${queueLength} `);
      }
      next();
    };

    const onError = (reason) => {
      // reason may be null/undefined for a bare rejection; never throw here,
      // otherwise next() would be skipped
      Logger.error(`${eventName}: ${reason?.stack || reason}`);
      callNext();
    };

    try {
      // Validation lives inside the try so that a malformed message is logged
      // and next() is still called instead of the exception escaping the
      // task handler. NOTE(review): presumably an escaped exception leaves
      // the PowerQueue waiting on this task — confirm against PowerQueue.
      check(eventName, String);
      check(body, Object);

      if (this.redisDebugEnabled) {
        Logger.debug(`Redis: ${JSON.stringify(data.parsedMessage.core)} emitted`);
      }

      // Async messages release the queue before their listeners finish
      if (isAsync) {
        callNext();
      }

      this.emitter
        .emitAsync(eventName, { envelope, header, body }, meetingId)
        .then(callNext)
        .catch(onError);
    } catch (reason) {
      onError(reason);
    }
  }

  /**
   * Enqueues a task; arguments are forwarded unchanged to the PowerQueue.
   *
   * @returns {*} Whatever the underlying queue's `add` returns.
   */
  add(...args) {
    return this.queue.add(...args);
  }
}
/**
 * Bridge between Redis pub/sub and the server-side event emitter.
 *
 * Incoming messages are parsed, filtered (ignored list, instance/role) and
 * pushed onto a per-meeting MeetingMessageQueue; outgoing messages are wrapped
 * with makeEnvelope and published through a dedicated publisher client.
 */
class RedisPubSub {
  /**
   * Callback handed to every publish call; logs the error, if any.
   *
   * @param {*} err - Error reported by the Redis client, or a falsy value.
   */
  static handlePublishError(err) {
    if (err) {
      Logger.error(err);
    }
  }

  /**
   * Creates the publisher/subscriber clients and the system-message queue.
   *
   * @param {Object} [config={}] - Initial config; `async` and `debug` are
   *   read here, the rest (`subscribeTo`, `ignored`) when messages flow.
   */
  constructor(config = {}) {
    this.config = config;
    // Initialised here so debug checks are well-defined before updateConfig()
    this.redisDebugEnabled = this.config.debug;

    this.didSendRequestEvent = false;
    const redisConf = Meteor.settings.private.redis;
    const host = process.env.REDIS_HOST || redisConf.host;
    this.instanceId = parseInt(process.env.INSTANCE_ID, 10) || 1; // 1 also handles running in dev mode
    this.role = process.env.BBB_HTML5_ROLE;
    this.customRedisChannel = `to-html5-redis-channel${this.instanceId}`;

    const { password, port } = redisConf;

    if (password) {
      this.pub = Redis.createClient({ host, port, password });
      this.sub = Redis.createClient({ host, port, password });
      this.pub.auth(password);
      this.sub.auth(password);
    } else {
      this.pub = Redis.createClient({ host, port });
      this.sub = Redis.createClient({ host, port });
    }

    if (queueMetrics) {
      Metrics.startDumpFile();
    }

    this.emitter = new EventEmitter2();
    this.meetingsQueues = {};
    // We create this _ meeting queue because we need to be able to handle system messages (no meetingId in core.header)
    this.meetingsQueues[NO_MEETING_ID] = new MeetingMessageQueue(this.emitter, this.config.async, this.config.debug);

    this.handleSubscribe = this.handleSubscribe.bind(this);
    this.handleMessage = this.handleMessage.bind(this);
  }

  /**
   * Registers the subscriber handlers and subscribes to the channels that
   * match this process' role (`frontend`, `backend`, or both when unset).
   */
  init() {
    this.sub.on('psubscribe', Meteor.bindEnvironment(this.handleSubscribe));
    this.sub.on('pmessage', Meteor.bindEnvironment(this.handleMessage));

    // Build a local list: pushing onto config.subscribeTo would mutate the
    // shared settings array and duplicate the entry if init() ran again.
    const channelsToSubscribe = [...(this.config.subscribeTo ?? []), this.customRedisChannel];

    switch (this.role) {
      case 'frontend':
        this.sub.psubscribe('from-akka-apps-frontend-redis-channel');
        if (this.redisDebugEnabled) {
          Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role: frontend. Subscribed to 'from-akka-apps-frontend-redis-channel'`);
        }
        break;
      case 'backend':
        channelsToSubscribe.forEach((channel) => {
          this.sub.psubscribe(channel);
          if (this.redisDebugEnabled) {
            // Log the channel just subscribed, not the whole list
            Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role: backend. Subscribed to '${channel}'`);
          }
        });
        break;
      default:
        this.sub.psubscribe('from-akka-apps-frontend-redis-channel');
        channelsToSubscribe.forEach((channel) => {
          this.sub.psubscribe(channel);
          if (this.redisDebugEnabled) {
            Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role:${this.role} (likely only one nodejs running, doing both frontend and backend. Dev env? ). Subscribed to '${channel}'`);
          }
        });
        break;
    }
  }

  /**
   * Merges `config` over the current configuration and refreshes the
   * debug flag.
   *
   * @param {Object} config - Settings to merge in.
   */
  updateConfig(config) {
    this.config = Object.assign({}, this.config, config);
    this.redisDebugEnabled = this.config.debug;
  }

  // TODO: Move this out of this class, maybe pass as a callback to init?
  /**
   * On the first subscription (non-frontend roles only) asks akka-apps for
   * all meetings so collections are populated with pre-existing data.
   */
  handleSubscribe() {
    if (this.didSendRequestEvent || this.role === 'frontend') return;

    // populate collections with pre-existing data
    const REDIS_CONFIG = Meteor.settings.private.redis;
    const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
    const EVENT_NAME = 'GetAllMeetingsReqMsg';

    const body = {
      requesterId: 'nodeJSapp',
      html5InstanceId: this.instanceId,
    };

    this.publishSystemMessage(CHANNEL, EVENT_NAME, body);
    this.didSendRequestEvent = true;
  }

  /**
   * Routes one incoming Redis message to the right meeting queue.
   *
   * Malformed payloads (invalid JSON or no `core.header`) are logged and
   * dropped. Ignored channels/events are skipped. Everything else is queued
   * according to role, meetingId and instance id.
   *
   * @param {string} pattern - Subscription pattern that matched.
   * @param {string} channel - Channel the message arrived on.
   * @param {string} message - Raw JSON payload.
   */
  handleMessage(pattern, channel, message) {
    let parsedMessage;
    try {
      parsedMessage = JSON.parse(message);
    } catch (error) {
      Logger.error(`Redis: could not parse message received on channel=${channel}: ${error}`);
      return;
    }

    const header = parsedMessage?.core?.header;
    if (!header) {
      Logger.error(`Redis: message received on channel=${channel} has no core.header`);
      return;
    }

    const { ignored: ignoredMessages, async } = this.config;
    const eventName = header.name;

    if (ignoredMessages.includes(channel)
      || ignoredMessages.includes(eventName)) {
      // Pongs are dropped without even a debug line
      if (eventName === 'CheckAlivePongSysMsg') {
        return;
      }
      if (this.redisDebugEnabled) {
        Logger.debug(`Redis: ${eventName} skipped`);
      }
      return;
    }

    if (this.redisDebugEnabled) {
      Logger.warn('Received event to handle', { date: new Date().toISOString(), eventName });
    }

    // System messages like Create / Destroy Meeting, etc do not have core.header.meetingId.
    // Process them in MeetingQueue['_'] --- the NO_MEETING queueId
    const meetingIdFromMessageCoreHeader = header.meetingId || NO_MEETING_ID;
    const isSystemMessage = meetingIdFromMessageCoreHeader === NO_MEETING_ID;
    const createsMeetingQueue = eventName === 'MeetingCreatedEvtMsg'
      || eventName === 'SyncGetMeetingInfoRespMsg';

    // Every code path below queues this task at most once
    const task = {
      pattern,
      channel,
      eventName,
      parsedMessage,
    };

    if (this.role === 'frontend') {
      // receiving this message means we need to look at it. Frontends do not have instanceId.
      if (isSystemMessage && createsMeetingQueue) {
        const meetingIdFromMessageMeetingProp = parsedMessage.core.body.props.meetingProp.intId;
        this.meetingsQueues[meetingIdFromMessageMeetingProp] = new MeetingMessageQueue(this.emitter, async, this.redisDebugEnabled);
        if (this.redisDebugEnabled) {
          Logger.warn('Created frontend queue for meeting', { date: new Date().toISOString(), eventName, meetingIdFromMessageMeetingProp });
        }
      }

      if (!this.meetingsQueues[meetingIdFromMessageCoreHeader]) {
        Logger.warn(`Frontend meeting queue had not been initialized ${message}`, { eventName, meetingIdFromMessageCoreHeader });
        this.meetingsQueues[NO_MEETING_ID].add(task);
      } else {
        // process the event - whether it's a system message or not, the meetingIdFromMessageCoreHeader value is adjusted
        this.meetingsQueues[meetingIdFromMessageCoreHeader].add(task);
      }
      return;
    }

    if (!isSystemMessage) {
      // only handle message if we have a queue for the meeting. If we don't have a queue, it means it's for a different instanceId
      const meetingQueue = this.meetingsQueues[meetingIdFromMessageCoreHeader];
      if (meetingQueue) {
        meetingQueue.add(task);
      } else {
        Logger.warn('Backend meeting queue had not been initialized', { eventName, meetingIdFromMessageCoreHeader });
      }
      return;
    }

    // System message on a backend (or combined) process
    const meetingIdFromMessageMeetingProp = parsedMessage.core.body.props?.meetingProp?.intId;
    const instanceIdFromMessage = getInstanceIdFromMessage(parsedMessage);

    if (this.instanceId === instanceIdFromMessage) {
      // create queue or destroy queue
      if (createsMeetingQueue) {
        this.meetingsQueues[meetingIdFromMessageMeetingProp] = new MeetingMessageQueue(this.emitter, async, this.redisDebugEnabled);
        if (this.redisDebugEnabled) {
          Logger.warn('Created backend queue for meeting', { date: new Date().toISOString(), eventName, meetingIdFromMessageMeetingProp });
        }
      }
      this.meetingsQueues[NO_MEETING_ID].add(task);
      return;
    }

    if (eventName === 'MeetingEndedEvtMsg' || eventName === 'MeetingDestroyedEvtMsg') {
      // MeetingEndedEvtMsg does not follow the system message pattern for meetingId
      // but we still need to process it on the backend which is processing the rest of the events
      // for this meetingId (it does not contain instanceId either, so we cannot compare that)
      const meetingIdForMeetingEnded = parsedMessage.core.body.meetingId;
      if (this.meetingsQueues[meetingIdForMeetingEnded]) {
        this.meetingsQueues[NO_MEETING_ID].add(task);
      }
    }
    // anything else is addressed to another instance: ignore
  }

  /**
   * Drops the queue registered under `id`.
   *
   * @param {string} id - Meeting id used as queue key.
   */
  destroyMeetingQueue(id) {
    delete this.meetingsQueues[id];
  }

  /**
   * Registers a listener on the internal emitter; arguments are forwarded
   * unchanged.
   */
  on(...args) {
    return this.emitter.on(...args);
  }

  /**
   * Publishes a message whose header carries `voiceConf`.
   *
   * @returns {*} Result of the publisher client's `publish`.
   */
  publishVoiceMessage(channel, eventName, voiceConf, payload) {
    const header = {
      name: eventName,
      voiceConf,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes a message whose header carries only the event name.
   *
   * @returns {*} Result of the publisher client's `publish`.
   */
  publishSystemMessage(channel, eventName, payload) {
    const header = {
      name: eventName,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes a message whose header carries `meetingId`.
   *
   * @returns {*} Result of the publisher client's `publish`.
   */
  publishMeetingMessage(channel, eventName, meetingId, payload) {
    const header = {
      name: eventName,
      meetingId,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes a message whose header and routing carry `meetingId` and
   * `userId`; warns (but still publishes) when either is missing.
   *
   * @returns {*} Result of the publisher client's `publish`.
   */
  publishUserMessage(channel, eventName, meetingId, userId, payload) {
    const header = {
      name: eventName,
      meetingId,
      userId,
    };

    if (!meetingId || !userId) {
      Logger.warn(`Publishing ${eventName} with potentially missing data userId=${userId} meetingId=${meetingId}`);
    }
    const envelope = makeEnvelope(channel, eventName, header, payload, { meetingId, userId });

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }
}
// Single shared instance for the whole server process. It is created with an
// empty config at import time; the real Redis settings are applied and the
// subscriptions opened once Meteor has started.
const RedisPubSubSingleton = new RedisPubSub();

Meteor.startup(() => {
  const { redis: redisSettings } = Meteor.settings.private;

  RedisPubSubSingleton.updateConfig(redisSettings);
  RedisPubSubSingleton.init();
});
export default RedisPubSubSingleton;