diff --git a/batch/job_queue.js b/batch/job_queue.js index 647b9c62..97916a8a 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -3,14 +3,20 @@ function JobQueue(metadataBackend, jobPublisher) { this.metadataBackend = metadataBackend; this.jobPublisher = jobPublisher; - this.db = 5; - this.redisPrefix = 'batch:queues:'; } +module.exports = JobQueue; + +var QUEUE = { + DB: 5, + PREFIX: 'batch:queues:' +}; +module.exports.QUEUE = QUEUE; + JobQueue.prototype.enqueue = function (job_id, host, callback) { var self = this; - this.metadataBackend.redisCmd(this.db, 'LPUSH', [ this.redisPrefix + host, job_id ], function (err) { + this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + host, job_id ], function (err) { if (err) { return callback(err); } @@ -21,11 +27,9 @@ JobQueue.prototype.enqueue = function (job_id, host, callback) { }; JobQueue.prototype.dequeue = function (host, callback) { - this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.redisPrefix + host ], callback); + this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + host ], callback); }; JobQueue.prototype.enqueueFirst = function (job_id, host, callback) { - this.metadataBackend.redisCmd(this.db, 'RPUSH', [ this.redisPrefix + host, job_id ], callback); + this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + host, job_id ], callback); }; - -module.exports = JobQueue; diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js index f7a8c88e..da137fb0 100644 --- a/batch/pubsub/queue-seeker.js +++ b/batch/pubsub/queue-seeker.js @@ -1,10 +1,8 @@ 'use strict'; +var QUEUE = require('../job_queue').QUEUE; + function QueueSeeker(pool) { - this.db = 5; - this.channel = 'batch:hosts'; - this.redisPrefix = 'batch:queues:'; - this.pattern = this.redisPrefix + '*'; this.pool = pool; } @@ -18,9 +16,9 @@ QueueSeeker.prototype.seek = function (callback) { QueueSeeker.prototype._seek = function (cursor, hosts, callback) { var self = this; - var redisParams = [cursor[0], 'MATCH', self.pattern]; + var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; - this.pool.acquire(this.db, function(err, client) { + this.pool.acquire(QUEUE.DB, function(err, client) { if (err) { return callback(err); } @@ -28,7 +26,7 @@ QueueSeeker.prototype._seek = function (cursor, hosts, callback) { client.scan(redisParams, function(err, currentCursor) { // checks if iteration has ended if (currentCursor[0] === '0') { - self.pool.release(self.db, client); + self.pool.release(QUEUE.DB, client); return callback(null, Object.keys(hosts)); } @@ -39,7 +37,7 @@ QueueSeeker.prototype._seek = function (cursor, hosts, callback) { } queues.forEach(function (queue) { - var host = queue.substr(self.redisPrefix.length); + var host = queue.substr(QUEUE.PREFIX.length); hosts[host] = true; });