a04fbc56e1
New config: settings.private.prometheus.collectRedisMetrics disabled by default
411 lines
14 KiB
JavaScript
Executable File
411 lines
14 KiB
JavaScript
Executable File
import Redis from 'redis';
|
|
import { Meteor } from 'meteor/meteor';
|
|
import { EventEmitter2 } from 'eventemitter2';
|
|
import { check } from 'meteor/check';
|
|
import Logger from './logger';
|
|
import Metrics from './metrics';
|
|
import queue from 'queue';
|
|
import { PrometheusAgent, METRIC_NAMES } from './prom-metrics/index.js'
|
|
|
|
// Fake meetingId used for messages that have no meetingId
const NO_MEETING_ID = '_';

// Both settings sections are optional: fall back to an empty object so that a
// settings file without `redis.metrics` or `prometheus` does not crash the
// module at load time. A missing flag simply leaves the feature disabled.
const { queueMetrics } = Meteor.settings.private.redis.metrics || {};
const {
  collectRedisMetrics: PROM_METRICS_ENABLED = false,
} = Meteor.settings.private.prometheus || {};
|
|
|
|
/**
 * Builds the JSON string published to Redis for an outgoing event.
 *
 * @param {string} channel - Target channel. Not used when building the
 *   envelope; kept so existing callers keep working.
 * @param {string} eventName - Stored as `envelope.name`.
 * @param {Object} header - Stored as `core.header`.
 * @param {Object} body - Stored as `core.body`.
 * @param {Object} [routing] - Stored as `envelope.routing`; any falsy value
 *   falls back to `{ sender: 'html5-server' }`.
 * @returns {string} The serialized `{ envelope, core }` message.
 */
const makeEnvelope = (channel, eventName, header, body, routing) => {
  const envelope = {
    envelope: {
      name: eventName,
      routing: routing || {
        sender: 'html5-server',
      },
      timestamp: Date.now(),
    },
    core: {
      header,
      body,
    },
  };

  return JSON.stringify(envelope);
};
|
|
|
|
/**
 * Reads the html5 instance id carried by a parsed system message.
 *
 * @param {Object} parsedMessage - Parsed message; must have a `core` property.
 * @returns {*} `core.body.props.systemProps.html5InstanceId`, or `undefined`
 *   when any level from `body` downwards is absent.
 */
const getInstanceIdFromMessage = (parsedMessage) => (
  // End meeting message does not seem to have systemProps
  parsedMessage.core.body?.props?.systemProps?.html5InstanceId
);
|
|
|
|
/**
 * Serial (concurrency 1) task queue that re-emits parsed Redis messages on an
 * event emitter, one message at a time.
 */
class MeetingMessageQueue {
  /**
   * @param {Object} eventEmitter - Emitter exposing `emitAsync(name, payload, meetingId)`.
   * @param {string[]} [asyncMessages=[]] - Channels or event names whose
   *   handlers are not awaited before the next task starts.
   * @param {boolean} [redisDebugEnabled=false] - Enables debug logging.
   */
  constructor(eventEmitter, asyncMessages = [], redisDebugEnabled = false) {
    this.asyncMessages = asyncMessages;
    this.emitter = eventEmitter;
    this.queue = queue({ autostart: true, concurrency: 1 });
    this.redisDebugEnabled = redisDebugEnabled;

    this.handleTask = this.handleTask.bind(this);
    this.queue.taskHandler = this.handleTask;
  }

  /**
   * Processes one queued message: validates it, emits it on the emitter and
   * signals the queue through `next` exactly once.
   *
   * @param {Object} data - Task payload; reads `channel` and `parsedMessage`
   *   (`envelope`, `core.header`, `core.body`).
   * @param {Function} next - Queue callback that releases the next task.
   */
  handleTask(data, next) {
    const { channel, parsedMessage } = data;
    const { envelope } = parsedMessage;
    const { header, body } = parsedMessage.core;
    const { meetingId } = header;
    const eventName = header.name;
    const isAsync = this.asyncMessages.includes(channel)
      || this.asyncMessages.includes(eventName);

    const beginHandleTimestamp = Date.now();
    let called = false;

    const callNext = () => {
      if (called) return;
      called = true;

      try {
        if (this.redisDebugEnabled) {
          Logger.debug(`Redis: ${eventName} completed ${isAsync ? 'async' : 'sync'}`);
        }

        const queueLength = this.queue.length;

        if (queueMetrics || PROM_METRICS_ENABLED) {
          // Serialize once; both metric backends report the same payload size.
          const dataLength = JSON.stringify(data).length;

          if (queueMetrics) {
            const queueId = meetingId || NO_MEETING_ID;
            Metrics.processEvent(queueId, eventName, dataLength, beginHandleTimestamp);
          }

          if (PROM_METRICS_ENABLED) {
            const processTime = Date.now() - beginHandleTimestamp;
            PrometheusAgent.observe(METRIC_NAMES.REDIS_PROCESSING_TIME, processTime, { eventName });
            PrometheusAgent.observe(METRIC_NAMES.REDIS_PAYLOAD_SIZE, dataLength, { eventName });
            if (meetingId) {
              PrometheusAgent.set(METRIC_NAMES.REDIS_MESSAGE_QUEUE, queueLength, { meetingId });
            }
          }
        }

        if (queueLength > 100) {
          Logger.warn(`Redis: MeetingMessageQueue for meetingId=${meetingId} has queue size=${queueLength} `);
        }
      } finally {
        // Always release the queue, even if logging or metrics reporting throws;
        // with concurrency 1 a missed next() would stall every later message.
        next();
      }
    };

    const onError = (reason) => {
      Logger.error(`${eventName}: ${reason.stack ? reason.stack : reason}`);
      callNext();
    };

    try {
      // Validation lives inside the try so a malformed message is logged and
      // skipped instead of throwing without ever calling next().
      check(eventName, String);
      check(body, Object);

      if (this.redisDebugEnabled) {
        // Chat payloads are kept out of the logs unless analytics.includeChat is
        // set. Only the log line is skipped: the event itself must still be
        // emitted and the queue must still advance.
        const isHiddenChatMessage = !Meteor.settings.private.analytics.includeChat
          && eventName === 'GroupChatMessageBroadcastEvtMsg';

        if (!isHiddenChatMessage) {
          Logger.debug(`Redis: ${JSON.stringify(parsedMessage.core)} emitted`);
        }
      }

      if (isAsync) {
        // Async events release the queue before their handlers finish.
        callNext();
      }

      this.emitter
        .emitAsync(eventName, { envelope, header, body }, meetingId)
        .then(callNext)
        .catch(onError);
    } catch (reason) {
      onError(reason);
    }
  }

  /**
   * Enqueues a task; the arguments are forwarded to the task handler followed
   * by the queue's `next` callback.
   */
  add(...args) {
    const { taskHandler } = this.queue;

    this.queue.push((next) => {
      taskHandler(...args, next);
    });
  }
}
|
|
|
|
/**
 * Bridge between Redis pub/sub and the server-side event emitter.
 *
 * Incoming messages are parsed and routed to a per-meeting
 * MeetingMessageQueue (or to the NO_MEETING_ID queue for system messages);
 * the publish* methods wrap outgoing payloads in an envelope and publish them.
 */
class RedisPubSub {
  /**
   * Callback passed to every `publish` call; logs the error when there is one.
   */
  static handlePublishError(err) {
    if (err) {
      Logger.error(err);
    }
  }

  /**
   * @param {Object} [config={}] - Reads `debug`, `async`, `ignored` and
   *   `subscribeTo`. Connection details come from Meteor settings and the
   *   REDIS_HOST / INSTANCE_ID / BBB_HTML5_ROLE environment variables.
   */
  constructor(config = {}) {
    this.config = config;
    // Keep the flag in sync with the initial config; it used to be assigned
    // only by updateConfig().
    this.redisDebugEnabled = this.config.debug;

    this.didSendRequestEvent = false;
    const redisConf = Meteor.settings.private.redis;
    const host = process.env.REDIS_HOST || redisConf.host;
    this.instanceId = parseInt(process.env.INSTANCE_ID, 10) || 1; // 1 also handles running in dev mode
    this.role = process.env.BBB_HTML5_ROLE;
    this.customRedisChannel = `to-html5-redis-channel${this.instanceId}`;

    const { password, port } = redisConf;
    const clientOptions = password ? { host, port, password } : { host, port };

    this.pub = Redis.createClient(clientOptions);
    this.sub = Redis.createClient(clientOptions);

    if (password) {
      this.pub.auth(password);
      this.sub.auth(password);
    }

    if (queueMetrics) {
      Metrics.startDumpFile();
    }

    this.emitter = new EventEmitter2();
    this.meetingsQueues = {};
    // We create this _ meeting queue because we need to be able to handle system messages (no meetingId in core.header)
    // NOTE(review): this queue is built from the constructor-time config, so
    // `async` / `debug` values applied later through updateConfig() never
    // reach it. Confirm whether that is intended.
    this.meetingsQueues[NO_MEETING_ID] = new MeetingMessageQueue(
      this.emitter,
      this.config.async,
      this.config.debug,
    );

    this.handleSubscribe = this.handleSubscribe.bind(this);
    this.handleMessage = this.handleMessage.bind(this);
  }

  /**
   * Registers the subscription handlers and subscribes to the channels that
   * match this process's role ('frontend', 'backend', or both when unset).
   */
  init() {
    this.sub.on('psubscribe', Meteor.bindEnvironment(this.handleSubscribe));
    this.sub.on('pmessage', Meteor.bindEnvironment(this.handleMessage));

    // Work on a copy: config.subscribeTo is the array owned by the settings
    // object, and pushing into it would leak (and duplicate on re-init) the
    // instance channel.
    const channelsToSubscribe = [...(this.config.subscribeTo || []), this.customRedisChannel];
    const subscribeToAll = () => {
      channelsToSubscribe.forEach((channel) => {
        this.sub.psubscribe(channel);
      });
    };

    switch (this.role) {
      case 'frontend':
        this.sub.psubscribe('from-akka-apps-frontend-redis-channel');
        if (this.redisDebugEnabled) {
          Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role: frontend. Subscribed to 'from-akka-apps-frontend-redis-channel'`);
        }
        break;
      case 'backend':
        subscribeToAll();
        // Logged once; the message already lists every channel.
        if (this.redisDebugEnabled) {
          Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role: backend. Subscribed to '${channelsToSubscribe}'`);
        }
        break;
      default:
        this.sub.psubscribe('from-akka-apps-frontend-redis-channel');
        subscribeToAll();
        if (this.redisDebugEnabled) {
          Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role:${this.role} (likely only one nodejs running, doing both frontend and backend. Dev env? ). Subscribed to '${channelsToSubscribe}'`);
        }
        break;
    }
  }

  /**
   * Shallow-merges `config` over the current configuration and refreshes the
   * debug flag.
   */
  updateConfig(config) {
    this.config = Object.assign({}, this.config, config);
    this.redisDebugEnabled = this.config.debug;
  }

  // TODO: Move this out of this class, maybe pass as a callback to init?
  /**
   * On the first subscription of a non-frontend process, publishes
   * GetAllMeetingsReqMsg so collections get populated with pre-existing data.
   */
  handleSubscribe() {
    if (this.didSendRequestEvent || this.role === 'frontend') return;

    // populate collections with pre-existing data
    const REDIS_CONFIG = Meteor.settings.private.redis;
    const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
    const EVENT_NAME = 'GetAllMeetingsReqMsg';

    const body = {
      requesterId: 'nodeJSapp',
      html5InstanceId: this.instanceId,
    };

    this.publishSystemMessage(CHANNEL, EVENT_NAME, body);
    this.didSendRequestEvent = true;
  }

  /**
   * Entry point for every Redis `pmessage`: parses the payload, drops ignored
   * events and hands the rest to the role-specific router.
   *
   * @param {string} pattern - Subscription pattern that matched.
   * @param {string} channel - Channel the message arrived on.
   * @param {string} message - Raw JSON payload.
   */
  handleMessage(pattern, channel, message) {
    let parsedMessage;
    try {
      parsedMessage = JSON.parse(message);
    } catch (error) {
      Logger.error(`Redis: could not parse message received on ${channel}: ${error.message}`);
      return;
    }

    const header = parsedMessage?.core?.header;
    if (!header) {
      Logger.error(`Redis: message received on ${channel} has no core.header, dropping it`);
      return;
    }

    const { ignored: ignoredMessages = [] } = this.config;
    const eventName = header.name;

    if (ignoredMessages.includes(channel) || ignoredMessages.includes(eventName)) {
      // Pong messages are skipped silently, even in debug mode.
      if (this.redisDebugEnabled && eventName !== 'CheckAlivePongSysMsg') {
        Logger.debug(`Redis: ${eventName} skipped`);
      }
      return;
    }

    if (this.redisDebugEnabled) {
      Logger.warn('Received event to handle', { date: new Date().toISOString(), eventName });
    }

    // System messages like Create / Destroy Meeting, etc do not have core.header.meetingId.
    // Process them in MeetingQueue['_'] --- the NO_MEETING queueId
    const meetingIdFromMessageCoreHeader = header.meetingId || NO_MEETING_ID;
    const task = {
      pattern,
      channel,
      eventName,
      parsedMessage,
    };

    if (this.role === 'frontend') {
      this.routeFrontendMessage(task, meetingIdFromMessageCoreHeader, message);
    } else {
      this.routeBackendMessage(task, meetingIdFromMessageCoreHeader);
    }
  }

  /**
   * Creates (or replaces) the queue of the meeting announced by a
   * MeetingCreatedEvtMsg / SyncGetMeetingInfoRespMsg. Does nothing but warn
   * when the message carries no meeting id.
   *
   * @param {string} [meetingIdFromMessageMeetingProp] - `props.meetingProp.intId`.
   * @param {string} eventName - Event that triggered the creation.
   * @param {string} roleLabel - 'frontend' or 'backend'; only used in the log.
   */
  createMeetingQueue(meetingIdFromMessageMeetingProp, eventName, roleLabel) {
    if (!meetingIdFromMessageMeetingProp) {
      Logger.warn(`Redis: ${eventName} has no props.meetingProp.intId, no ${roleLabel} queue created`);
      return;
    }

    this.meetingsQueues[meetingIdFromMessageMeetingProp] = new MeetingMessageQueue(
      this.emitter,
      this.config.async,
      this.redisDebugEnabled,
    );

    if (this.redisDebugEnabled) {
      Logger.warn(`Created ${roleLabel} queue for meeting`, { date: new Date().toISOString(), eventName, meetingIdFromMessageMeetingProp });
    }
  }

  /**
   * Frontend routing: every message is processed. Messages for a meeting
   * without a queue fall back to the NO_MEETING_ID queue with a warning.
   */
  routeFrontendMessage(task, meetingIdFromMessageCoreHeader, message) {
    const { eventName, parsedMessage } = task;
    const createsQueue = eventName === 'MeetingCreatedEvtMsg'
      || eventName === 'SyncGetMeetingInfoRespMsg';

    // receiving this message means we need to look at it. Frontends do not have instanceId.
    if (meetingIdFromMessageCoreHeader === NO_MEETING_ID && createsQueue) {
      this.createMeetingQueue(parsedMessage.core.body?.props?.meetingProp?.intId, eventName, 'frontend');
    }

    const meetingQueue = this.meetingsQueues[meetingIdFromMessageCoreHeader];
    if (!meetingQueue) {
      Logger.warn(`Frontend meeting queue had not been initialized ${message}`, { eventName, meetingIdFromMessageCoreHeader });
      this.meetingsQueues[NO_MEETING_ID].add(task);
      return;
    }

    // process the event - whether it's a system message or not, the meetingIdFromMessageCoreHeader value is adjusted
    meetingQueue.add(task);
  }

  /**
   * Backend routing (also used when no role is set): only messages that
   * belong to this instance, or to a meeting it already has a queue for, are
   * processed.
   */
  routeBackendMessage(task, meetingIdFromMessageCoreHeader) {
    const { eventName, parsedMessage } = task;

    if (meetingIdFromMessageCoreHeader !== NO_MEETING_ID) {
      // only handle message if we have a queue for the meeting. If we don't have a queue,
      // it means the meeting is fully handled by another bbb-html5-backend.
      const meetingQueue = this.meetingsQueues[meetingIdFromMessageCoreHeader];
      if (meetingQueue) {
        meetingQueue.add(task);
      }
      return;
    }

    // System message
    const { body } = parsedMessage.core;

    if (this.instanceId === getInstanceIdFromMessage(parsedMessage)) {
      // create queue or destroy queue
      if (eventName === 'MeetingCreatedEvtMsg' || eventName === 'SyncGetMeetingInfoRespMsg') {
        this.createMeetingQueue(body?.props?.meetingProp?.intId, eventName, 'backend');
      }
      this.meetingsQueues[NO_MEETING_ID].add(task);
      return;
    }

    if (eventName === 'MeetingEndedEvtMsg' || eventName === 'MeetingDestroyedEvtMsg') {
      // MeetingEndedEvtMsg does not follow the system message pattern for meetingId
      // but we still need to process it on the backend which is processing the rest of the events
      // for this meetingId (it does not contain instanceId either, so we cannot compare that)
      if (this.meetingsQueues[body?.meetingId]) {
        this.meetingsQueues[NO_MEETING_ID].add(task);
      }
    }
    // anything else is ignored
  }

  /**
   * Forgets the queue registered under `id`.
   */
  destroyMeetingQueue(id) {
    delete this.meetingsQueues[id];
  }

  /**
   * Registers a listener on the internal emitter; arguments are forwarded as-is.
   */
  on(...args) {
    return this.emitter.on(...args);
  }

  /**
   * Publishes an event whose header carries `name` and `voiceConf`.
   */
  publishVoiceMessage(channel, eventName, voiceConf, payload) {
    const header = {
      name: eventName,
      voiceConf,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes an event whose header carries only `name`.
   */
  publishSystemMessage(channel, eventName, payload) {
    const header = {
      name: eventName,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes an event whose header carries `name` and `meetingId`.
   */
  publishMeetingMessage(channel, eventName, meetingId, payload) {
    const header = {
      name: eventName,
      meetingId,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes an event whose header carries `name`, `meetingId` and `userId`;
   * the same ids are used as the envelope routing. Warns (but still
   * publishes) when either id is missing.
   */
  publishUserMessage(channel, eventName, meetingId, userId, payload) {
    const header = {
      name: eventName,
      meetingId,
      userId,
    };

    if (!meetingId || !userId) {
      Logger.warn(`Publishing ${eventName} with potentially missing data userId=${userId} meetingId=${meetingId}`);
    }
    const envelope = makeEnvelope(channel, eventName, header, payload, { meetingId, userId });

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }
}
|
|
|
|
// Single shared instance. It is created with an empty config at import time;
// the real Redis settings are applied once Meteor has started.
const RedisPubSubSingleton = new RedisPubSub();

Meteor.startup(() => {
  const REDIS_CONFIG = Meteor.settings.private.redis;

  RedisPubSubSingleton.updateConfig(REDIS_CONFIG);
  RedisPubSubSingleton.init();
});

export default RedisPubSubSingleton;
|