Now batch publisher sends a ping to server before publishing and create a new connection if error.

Batch publisher and subscriber logs (if debug enabled) both outcoming and incoming messages to give more visibility.
This commit is contained in:
Daniel García Aubert 2016-07-07 10:44:17 +02:00
parent c592d77916
commit 74d83a457e
4 changed files with 85 additions and 10 deletions

View File

@ -1,12 +1,55 @@
'use strict';
function JobPublisher(redis) {
this.channel = 'batch:hosts';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
var debug = require('./util/debug')('pubsub');
var redisServer = global.settings.redis_host + ':' + global.settings.redis_port;
function onReady() {
debug('redis publisher connected to ' + redisServer);
}
function onError(err) {
debug('redis publisher connection error: ' + err.message);
}
function onEnd() {
debug('redis publisher connection ends');
}
function onReconnect() {
debug('redis publisher reconnecting to ' + redisServer);
}
function JobPublisher(redis) {
this.redis = redis;
this.channel = 'batch:hosts';
this._createClient();
}
JobPublisher.prototype._createClient = function () {
if (this.client && this.client.connected) {
this.client.end(true);
}
this.client = this.redis.createClient(global.settings.redis_port, global.settings.redis_host);
this.client.on('ready', onReady);
this.client.on('error', onError);
this.client.on('end', onEnd);
this.client.on('reconnecting', onReconnect);
};
JobPublisher.prototype.publish = function (host) {
this.client.publish(this.channel, host);
var self = this;
this.client.ping(function (err) {
if (err) {
debug('Error sending a ping to server: ' + err.message);
self._createClient();
}
debug('publish to ' + self.channel + ':' + host);
self.client.publish(self.channel, host);
});
};
module.exports = JobPublisher;

View File

@ -1,9 +1,27 @@
'use strict';
var debug = require('./util/debug')('job-subscriber');
var debug = require('./util/debug')('pubsub');
var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes
var redisServer = global.settings.redis_host + ':' + global.settings.redis_port;
function onReady() {
debug('redis subscriber connected to ' + redisServer);
}
function onError(err) {
debug('redis subscriber connection error: ' + err.message);
}
function onEnd() {
debug('redis subscriber connection ends');
}
function onReconnect() {
debug('redis subscriber reconnecting to ' + redisServer);
}
function _subscribe(client, channel, queueSeeker, onMessage) {
queueSeeker.seek(onMessage, function (err) {
if (err) {
debug(err);
@ -12,14 +30,24 @@ function _subscribe(client, channel, queueSeeker, onMessage) {
client.removeAllListeners('message');
client.unsubscribe(channel);
client.subscribe(channel);
client.on('message', onMessage);
client.on('message', function (channel, host) {
debug('message received from: ' + channel + ':' + host);
onMessage(channel, host);
});
});
}
function JobSubscriber(redis, queueSeeker) {
this.channel = 'batch:hosts';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
this.queueSeeker = queueSeeker;
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
this.client.on('ready', onReady);
this.client.on('error', onError);
this.client.on('end', onEnd);
this.client.on('reconnecting', onReconnect);
}
module.exports = JobSubscriber;

View File

@ -13,6 +13,10 @@ describe('batch API job publisher', function () {
var isValidFirstArg = arguments[0] === 'batch:hosts';
var isValidSecondArg = arguments[1] === self.host;
self.redis.publishIsCalledWithValidArgs = isValidFirstArg && isValidSecondArg;
},
on: function () {},
ping: function (cb) {
cb();
}
};

View File

@ -14,9 +14,9 @@ describe('batch API job subscriber', function () {
self.redis.subscribeIsCalledWithValidArgs = isValidFirstArg;
},
on: function () {
var isValidFirstArg = arguments[0] === 'message';
var isValidSecondArg = arguments[1] === self.onMessageListener;
self.redis.onIsCalledWithValidArgs = isValidFirstArg && isValidSecondArg;
if (arguments[0] === 'message') {
self.redis.onIsCalledWithValidArgs = true;
}
},
unsubscribe: function () {
var isValidFirstArg = arguments[0] === 'batch:hosts';