/* global PowerQueue */ import Redis from 'redis'; import { Meteor } from 'meteor/meteor'; import { EventEmitter2 } from 'eventemitter2'; import { check } from 'meteor/check'; import Logger from './logger'; // Fake meetingId used for messages that have no meetingId const NO_MEETING_ID = '_'; const makeEnvelope = (channel, eventName, header, body, routing) => { const envelope = { envelope: { name: eventName, routing: routing || { sender: 'bbb-apps-akka', // sender: 'html5-server', // TODO }, timestamp: Date.now(), }, core: { header, body, }, }; return JSON.stringify(envelope); }; const makeDebugger = enabled => (message) => { if (!enabled) return; Logger.debug(`REDIS: ${message}`); }; class MeetingMessageQueue { constructor(eventEmitter, asyncMessages = [], debug = () => { }) { this.asyncMessages = asyncMessages; this.emitter = eventEmitter; this.queue = new PowerQueue(); this.debug = debug; this.handleTask = this.handleTask.bind(this); this.queue.taskHandler = this.handleTask; } handleTask(data, next) { const { channel } = data; const { envelope } = data.parsedMessage; const { header } = data.parsedMessage.core; const { body } = data.parsedMessage.core; const { meetingId } = header; const eventName = header.name; const isAsync = this.asyncMessages.includes(channel) || this.asyncMessages.includes(eventName); let called = false; check(eventName, String); check(body, Object); const callNext = () => { if (called) return; this.debug(`${eventName} completed ${isAsync ? 'async' : 'sync'}`); called = true; const queueLength = this.queue.length(); if (queueLength > 100) { Logger.error(`prev queue size=${queueLength} `); } next(); }; const onError = (reason) => { Logger.error(`${eventName}: ${reason.stack ? reason.stack : reason}`); callNext(); }; try { this.debug(`${JSON.stringify(data.parsedMessage.core)} emitted`); if (isAsync) { callNext(); } this.emitter .emitAsync(eventName, { envelope, header, body }, meetingId) .then(callNext) .catch(onError); } catch (reason) { onError(reason); } } add(...args) { return this.queue.add(...args); } } class RedisPubSub { static handlePublishError(err) { if (err) { Logger.error(err); } } constructor(config = {}) { this.config = config; this.didSendRequestEvent = false; const host = process.env.REDIS_HOST || Meteor.settings.private.redis.host; const redisConf = Meteor.settings.private.redis; this.instanceMax = parseInt(process.env.INSTANCE_MAX || 1); this.instanceId = parseInt(process.env.INSTANCE_ID) || 1; const { password, port } = redisConf; if (password) { this.pub = Redis.createClient({ host, port, password }); this.sub = Redis.createClient({ host, port, password }); this.pub.auth(password); this.sub.auth(password); } else { this.pub = Redis.createClient({ host, port }); this.sub = Redis.createClient({ host, port }); } this.emitter = new EventEmitter2(); this.mettingsQueues = {}; this.mettingsQueues[NO_MEETING_ID] = new MeetingMessageQueue(this.emitter, this.config.async, this.config.debug); this.handleSubscribe = this.handleSubscribe.bind(this); this.handleMessage = this.handleMessage.bind(this); this.debug = makeDebugger(this.config.debug); } init() { this.sub.on('psubscribe', Meteor.bindEnvironment(this.handleSubscribe)); this.sub.on('pmessage', Meteor.bindEnvironment(this.handleMessage)); const channelsToSubscribe = this.config.subscribeTo; channelsToSubscribe.forEach((channel) => { this.sub.psubscribe(channel); }); this.debug(`Subscribed to '${channelsToSubscribe}'`); } updateConfig(config) { this.config = Object.assign({}, this.config, config); this.debug = makeDebugger(this.config.debug); } // TODO: Move this out of this class, maybe pass as a callback to init? handleSubscribe() { if (this.didSendRequestEvent) return; // populate collections with pre-existing data const REDIS_CONFIG = Meteor.settings.private.redis; const CHANNEL = REDIS_CONFIG.channels.toAkkaApps; const EVENT_NAME = 'GetAllMeetingsReqMsg'; const body = { requesterId: 'nodeJSapp', }; this.publishSystemMessage(CHANNEL, EVENT_NAME, body); this.didSendRequestEvent = true; } handleMessage(pattern, channel, message) { const parsedMessage = JSON.parse(message); const { name: eventName, meetingId } = parsedMessage.core.header; const { ignored: ignoredMessages, async } = this.config; if (ignoredMessages.includes(channel) || ignoredMessages.includes(eventName)) { if (eventName === 'CheckAlivePongSysMsg') { return; } this.debug(`${eventName} skipped`); return; } const queueId = meetingId || NO_MEETING_ID; if (eventName === 'MeetingCreatedEvtMsg'){ const newIntId = parsedMessage.core.body.props.meetingProp.intId; const metadata = parsedMessage.core.body.props.metadataProp.metadata; const instanceId = parseInt(metadata['bbb-meetinginstance']) || 1; Logger.warn("MeetingCreatedEvtMsg received with meetingInstance: " + instanceId + " -- this is instance: " + this.instanceId); if (instanceId === this.instanceId){ this.mettingsQueues[newIntId] = new MeetingMessageQueue(this.emitter, async, this.debug); } else { // Logger.error('THIS NODEJS ' + this.instanceId + ' IS **NOT** PROCESSING EVENTS FOR THIS MEETING ' + instanceId) } } if (queueId in this.mettingsQueues) { this.mettingsQueues[queueId].add({ pattern, channel, eventName, parsedMessage, }); } //else { //Logger.info("Skipping redis message for " + queueId); //} } destroyMeetingQueue(id) { delete this.mettingsQueues[id]; } on(...args) { return this.emitter.on(...args); } publishVoiceMessage(channel, eventName, voiceConf, payload) { const header = { name: eventName, voiceConf, }; const envelope = makeEnvelope(channel, eventName, header, payload); return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError); } publishSystemMessage(channel, eventName, payload) { const header = { name: eventName, }; const envelope = makeEnvelope(channel, eventName, header, payload); return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError); } publishMeetingMessage(channel, eventName, meetingId, payload) { const header = { name: eventName, meetingId, }; const envelope = makeEnvelope(channel, eventName, header, payload); return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError); } publishUserMessage(channel, eventName, meetingId, userId, payload) { const header = { name: eventName, meetingId, userId, }; if (!meetingId || !userId) { Logger.warn(`Publishing ${eventName} with potentially missing data userId=${userId} meetingId=${meetingId}`); } const envelope = makeEnvelope(channel, eventName, header, payload, { meetingId, userId }); return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError); } } const RedisPubSubSingleton = new RedisPubSub(); Meteor.startup(() => { const REDIS_CONFIG = Meteor.settings.private.redis; RedisPubSubSingleton.updateConfig(REDIS_CONFIG); RedisPubSubSingleton.init(); }); export default RedisPubSubSingleton;