Now Batch API looks for queues with jobs to consume at service initialization time

This commit is contained in:
dgaubert 2016-03-31 17:37:35 +02:00
parent c5610452f9
commit e8a44136c5
3 changed files with 31 additions and 5 deletions

View File

@ -13,7 +13,7 @@ var JobBackend = require('./job_backend');
var Batch = require('./batch');
module.exports = function batchFactory (metadataBackend) {
var jobSubscriber = new JobSubscriber(redis);
var jobSubscriber = new JobSubscriber(redis, metadataBackend);
var jobQueuePool = new JobQueuePool(metadataBackend);
var jobPublisher = new JobPublisher(redis);
var jobQueue = new JobQueue(metadataBackend);

View File

@ -1,13 +1,32 @@
'use strict';
function JobSubscriber(redis) {
function JobSubscriber(redis, metadataBackend) {
this.channel = 'batch:hosts';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
this.metadataBackend = metadataBackend;
this.db = 5;
this.redisPrefix = 'batch:queues:';
}
JobSubscriber.prototype.subscribe = function (onMessage) {
this.client.subscribe(this.channel);
this.client.on('message', onMessage);
var self = this;
var redisParams = [ this.redisPrefix + '*' ];
self.metadataBackend.redisCmd(self.db, 'KEYS', redisParams , function (err, queues) {
if (err) {
console.error(err);
}
if (queues) {
queues.forEach(function (queue) {
var host = queue.substr(queue.lastIndexOf(':') + 1);
onMessage(self.channel, host);
});
}
self.client.subscribe(self.channel);
self.client.on('message', onMessage);
});
};
JobSubscriber.prototype.unsubscribe = function () {

View File

@ -23,8 +23,15 @@ describe('batch API job subscriber', function () {
self.redis.unsubscribeIsCalledWithValidArgs = isValidFirstArg;
}
};
this.metadataBackend = {
redisCmd: function () {
var callback = arguments[3];
this.jobSubscriber = new JobSubscriber(this.redis);
callback(null, []);
}
};
this.jobSubscriber = new JobSubscriber(this.redis, this.metadataBackend);
});
it('.subscribe() should listen for incoming messages', function () {