make publish function private to RedisPubSub2x class

This commit is contained in:
KDSBrowne 2017-09-13 08:29:23 -07:00
parent 4b7d790a57
commit 41b629b232

View File

@ -5,80 +5,12 @@ import { EventEmitter2 } from 'eventemitter2';
import { check } from 'meteor/check'; import { check } from 'meteor/check';
import Logger from './logger'; import Logger from './logger';
class RedisPubSub2x { const RedisPubSub2xWrapper = ( () => {
constructor(config = {}) { this.pub = Redis.createClient();
this.config = config;
this.didSendRequestEvent = false;
this.pub = Redis.createClient();
this.sub = Redis.createClient();
this.emitter = new EventEmitter2();
this.queue = new PowerQueue();
this.handleTask = this.handleTask.bind(this);
this.handleSubscribe = this.handleSubscribe.bind(this);
this.handleMessage = this.handleMessage.bind(this);
}
init() {
this.queue.taskHandler = this.handleTask;
this.sub.on('psubscribe', Meteor.bindEnvironment(this.handleSubscribe));
this.sub.on('pmessage', Meteor.bindEnvironment(this.handleMessage));
this.queue.reset();
this.sub.psubscribe(this.config.channels.fromAkkaApps); // 2.0
this.sub.psubscribe(this.config.channels.toHTML5); // 2.0
Logger.info(`Subscribed to '${this.config.channels.fromAkkaApps}'`);
}
updateConfig(config) {
this.config = Object.assign({}, this.config, config);
}
on(...args) {
return this.emitter.on(...args);
}
publishVoiceMessage(channel, eventName, voiceConf, payload) {
const header = {
name: eventName,
voiceConf
}
this.publish(channel, eventName, header, payload);
}
publishSystemMessage(channel, eventName, payload) {
const header = {
name: eventName
}
this.publish(channel, eventName, header, payload);
}
publishMeetingMessage(channel, eventName, meetingId, payload) {
const header = {
name: eventName,
meetingId
}
this.publish(channel, eventName, header, payload);
}
publishUserMessage(channel, eventName, meetingId, userId, payload) {
const header = {
name: eventName,
meetingId,
userId
}
this.publish(channel, eventName, header, payload);
}
// this publish function should only be called from either publishSystemMessage, // this publish function should only be called from either publishSystemMessage,
// publishMeetingMessage or publishUserMessage // publishMeetingMessage, publishVoiceMessage or publishUserMessage
publish(channel, eventName, header, payload) { const publish = (channel, eventName, header, body) => {
const envelope = { const envelope = {
envelope: { envelope: {
name: eventName, name: eventName,
@ -89,11 +21,11 @@ class RedisPubSub2x {
}, },
core: { core: {
header, header,
body: payload, body,
} }
}; };
Logger.warn(`<<<<<<Publishing 2.0 ${eventName} to ${channel} ${JSON.stringify(envelope)}`); Logger.warn(`<<<<<<Publishing 2.0 ${eventName} to ${channel} ${JSON.stringify(envelope)}`);
return this.pub.publish(channel, JSON.stringify(envelope), (err) => { return this.pub.publish(channel, JSON.stringify(envelope), (err) => {
if (err) { if (err) {
Logger.error('Tried to publish to %s', channel, envelope); Logger.error('Tried to publish to %s', channel, envelope);
@ -101,85 +33,158 @@ class RedisPubSub2x {
}); });
} }
handleSubscribe() { class RedisPubSub2x {
if (this.didSendRequestEvent) return; constructor(config = {}) {
this.config = config;
// populate collections with pre-existing data
const REDIS_CONFIG = Meteor.settings.redis; this.didSendRequestEvent = false;
const CHANNEL = REDIS_CONFIG.channels.toAkkaApps; this.sub = Redis.createClient();
const EVENT_NAME = 'GetAllMeetingsReqMsg'; this.emitter = new EventEmitter2();
this.queue = new PowerQueue();
const body = {
requesterId: 'nodeJSapp', this.handleTask = this.handleTask.bind(this);
}; this.handleSubscribe = this.handleSubscribe.bind(this);
this.handleMessage = this.handleMessage.bind(this);
this.publishSystemMessage(CHANNEL, EVENT_NAME, body);
this.didSendRequestEvent = true;
}
handleMessage(pattern, channel, message) {
Logger.warn(`2.0 handleMessage: ${message}`);
const REDIS_CONFIG = Meteor.settings.redis;
const { fromAkkaApps, toHTML5 } = REDIS_CONFIG.channels;
const parsedMessage = JSON.parse(message);
const { header } = parsedMessage.core;
const eventName = header.name;
Logger.info(`2.0 QUEUE | PROGRESS ${this.queue.progress()}% | LENGTH ${this.queue.length()}} ${eventName} | CHANNEL ${channel}`);
const regex = new RegExp(fromAkkaApps);
// We should only handle messages from this two channels, else, we simple ignore them.
if (!regex.test(channel) && channel !== toHTML5) {
Logger.warn(`The following message was ignored: CHANNEL ${channel} MESSAGE ${message}`);
return;
} }
this.queue.add({
pattern, init() {
channel, this.queue.taskHandler = this.handleTask;
eventName, this.sub.on('psubscribe', Meteor.bindEnvironment(this.handleSubscribe));
parsedMessage, this.sub.on('pmessage', Meteor.bindEnvironment(this.handleMessage));
});
} this.queue.reset();
this.sub.psubscribe(this.config.channels.fromAkkaApps); // 2.0
handleTask(data, next) { this.sub.psubscribe(this.config.channels.toHTML5); // 2.0
const { header } = data.parsedMessage.core;
const { body } = data.parsedMessage.core; Logger.info(`Subscribed to '${this.config.channels.fromAkkaApps}'`);
const { envelope } = data.parsedMessage; }
const eventName = header.name;
const meetingId = header.meetingId; updateConfig(config) {
this.config = Object.assign({}, this.config, config);
check(eventName, String); }
check(body, Object);
on(...args) {
try { return this.emitter.on(...args);
this._debug(`${eventName} emitted`); }
return this.emitter
.emitAsync(eventName, { envelope, header, body }, meetingId) publishVoiceMessage(channel, eventName, voiceConf, payload) {
.then(() => { const header = {
this._debug(`${eventName} completed`); name: eventName,
return next(); voiceConf
}) }
.catch((reason) => {
this._debug(`${eventName} completed with error`); return publish(channel, eventName, header, payload);
Logger.error(`${eventName}: ${reason}`); }
return next();
}); publishSystemMessage(channel, eventName, payload) {
} catch (reason) { const header = {
this._debug(`${eventName} completed with error`); name: eventName
Logger.error(`${eventName}: ${reason}`); }
return next();
return publish(channel, eventName, header, payload);
}
publishMeetingMessage(channel, eventName, meetingId, payload) {
const header = {
name: eventName,
meetingId
}
return publish(channel, eventName, header, payload);
}
publishUserMessage(channel, eventName, meetingId, userId, payload) {
const header = {
name: eventName,
meetingId,
userId
}
return publish(channel, eventName, header, payload);
}
handleSubscribe() {
if (this.didSendRequestEvent) return;
// populate collections with pre-existing data
const REDIS_CONFIG = Meteor.settings.redis;
const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
const EVENT_NAME = 'GetAllMeetingsReqMsg';
const body = {
requesterId: 'nodeJSapp',
};
this.publishSystemMessage(CHANNEL, EVENT_NAME, body);
this.didSendRequestEvent = true;
}
handleMessage(pattern, channel, message) {
Logger.warn(`2.0 handleMessage: ${message}`);
const REDIS_CONFIG = Meteor.settings.redis;
const { fromAkkaApps, toHTML5 } = REDIS_CONFIG.channels;
const parsedMessage = JSON.parse(message);
const { header } = parsedMessage.core;
const eventName = header.name;
Logger.info(`2.0 QUEUE | PROGRESS ${this.queue.progress()}% | LENGTH ${this.queue.length()}} ${eventName} | CHANNEL ${channel}`);
const regex = new RegExp(fromAkkaApps);
// We should only handle messages from this two channels, else, we simple ignore them.
if (!regex.test(channel) && channel !== toHTML5) {
Logger.warn(`The following message was ignored: CHANNEL ${channel} MESSAGE ${message}`);
return;
}
this.queue.add({
pattern,
channel,
eventName,
parsedMessage,
});
}
handleTask(data, next) {
const { header } = data.parsedMessage.core;
const { body } = data.parsedMessage.core;
const { envelope } = data.parsedMessage;
const eventName = header.name;
const meetingId = header.meetingId;
check(eventName, String);
check(body, Object);
try {
this._debug(`${eventName} emitted`);
return this.emitter
.emitAsync(eventName, { envelope, header, body }, meetingId)
.then(() => {
this._debug(`${eventName} completed`);
return next();
})
.catch((reason) => {
this._debug(`${eventName} completed with error`);
Logger.error(`${eventName}: ${reason}`);
return next();
});
} catch (reason) {
this._debug(`${eventName} completed with error`);
Logger.error(`${eventName}: ${reason}`);
return next();
}
}
_debug(message) {
if (this.config.debug) {
Logger.info(message);
}
} }
} }
return RedisPubSub2x;
})();
_debug(message) {
if (this.config.debug) {
Logger.info(message);
}
}
}
const RedisPubSubSingleton = new RedisPubSub2x(); const RedisPubSubSingleton = new RedisPubSub2xWrapper;
Meteor.startup(() => { Meteor.startup(() => {
const REDIS_CONFIG = Meteor.settings.redis; const REDIS_CONFIG = Meteor.settings.redis;