Use constants for queues

This commit is contained in:
Raul Ochoa 2016-10-12 17:53:03 +02:00
parent 189aff2aa9
commit f7d1f9426c
2 changed files with 17 additions and 15 deletions

View File

@ -3,14 +3,20 @@
function JobQueue(metadataBackend, jobPublisher) { function JobQueue(metadataBackend, jobPublisher) {
this.metadataBackend = metadataBackend; this.metadataBackend = metadataBackend;
this.jobPublisher = jobPublisher; this.jobPublisher = jobPublisher;
this.db = 5;
this.redisPrefix = 'batch:queues:';
} }
module.exports = JobQueue;
var QUEUE = {
DB: 5,
PREFIX: 'batch:queues:'
};
module.exports.QUEUE = QUEUE;
JobQueue.prototype.enqueue = function (job_id, host, callback) { JobQueue.prototype.enqueue = function (job_id, host, callback) {
var self = this; var self = this;
this.metadataBackend.redisCmd(this.db, 'LPUSH', [ this.redisPrefix + host, job_id ], function (err) { this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + host, job_id ], function (err) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
@ -21,11 +27,9 @@ JobQueue.prototype.enqueue = function (job_id, host, callback) {
}; };
JobQueue.prototype.dequeue = function (host, callback) { JobQueue.prototype.dequeue = function (host, callback) {
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.redisPrefix + host ], callback); this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + host ], callback);
}; };
JobQueue.prototype.enqueueFirst = function (job_id, host, callback) { JobQueue.prototype.enqueueFirst = function (job_id, host, callback) {
this.metadataBackend.redisCmd(this.db, 'RPUSH', [ this.redisPrefix + host, job_id ], callback); this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + host, job_id ], callback);
}; };
module.exports = JobQueue;

View File

@ -1,10 +1,8 @@
'use strict'; 'use strict';
var QUEUE = require('../job_queue').QUEUE;
function QueueSeeker(pool) { function QueueSeeker(pool) {
this.db = 5;
this.channel = 'batch:hosts';
this.redisPrefix = 'batch:queues:';
this.pattern = this.redisPrefix + '*';
this.pool = pool; this.pool = pool;
} }
@ -18,9 +16,9 @@ QueueSeeker.prototype.seek = function (callback) {
QueueSeeker.prototype._seek = function (cursor, hosts, callback) { QueueSeeker.prototype._seek = function (cursor, hosts, callback) {
var self = this; var self = this;
var redisParams = [cursor[0], 'MATCH', self.pattern]; var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];
this.pool.acquire(this.db, function(err, client) { this.pool.acquire(QUEUE.DB, function(err, client) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
@ -28,7 +26,7 @@ QueueSeeker.prototype._seek = function (cursor, hosts, callback) {
client.scan(redisParams, function(err, currentCursor) { client.scan(redisParams, function(err, currentCursor) {
// checks if iteration has ended // checks if iteration has ended
if (currentCursor[0] === '0') { if (currentCursor[0] === '0') {
self.pool.release(self.db, client); self.pool.release(QUEUE.DB, client);
return callback(null, Object.keys(hosts)); return callback(null, Object.keys(hosts));
} }
@ -39,7 +37,7 @@ QueueSeeker.prototype._seek = function (cursor, hosts, callback) {
} }
queues.forEach(function (queue) { queues.forEach(function (queue) {
var host = queue.substr(self.redisPrefix.length); var host = queue.substr(QUEUE.PREFIX.length);
hosts[host] = true; hosts[host] = true;
}); });