diff --git a/batch/batch.js b/batch/batch.js index 04cd9ecb..0a12051e 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -7,134 +7,106 @@ var forever = require('./util/forever'); var queue = require('queue-async'); var Locker = require('./leader/locker'); -function Batch(name, jobSubscriber, jobQueuePool, jobRunner, jobService, jobPublisher, redisConfig, logger) { +function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublisher, redisConfig, logger) { EventEmitter.call(this); this.name = name || 'batch'; this.jobSubscriber = jobSubscriber; - this.jobQueuePool = jobQueuePool; + this.jobQueue = jobQueue; this.jobRunner = jobRunner; this.jobService = jobService; this.jobPublisher = jobPublisher; this.logger = logger; this.locker = Locker.create('redis-distlock', { redisConfig: redisConfig }); + + // map: host => jobId + this.workingQueues = {}; } util.inherits(Batch, EventEmitter); module.exports = Batch; Batch.prototype.start = function () { - this._subscribe(); -}; - -Batch.prototype._subscribe = function () { var self = this; - this.jobSubscriber.subscribe(function onJobHandler(host) { - var queue = self.jobQueuePool.getQueue(host); - - // there is nothing to do. It is already running jobs - if (queue) { - return; - } - queue = self.jobQueuePool.createQueue(host); - - // do forever, it does not throw a stack overflow - forever(function (next) { - self._consumeJobs(host, queue, next); - }, function (err) { - self.jobQueuePool.removeQueue(host); - - if (err.name === 'EmptyQueue') { - return debug(err.message); + this.jobSubscriber.subscribe( + function onJobHandler(host) { + if (self.isProcessingHost(host)) { + return debug('%s is already processing host=%s', self.name, host); } - debug(err); - }); - }, function (err) { - if (err) { - return self.emit('error', err); - } + // do forever, it does not throw a stack overflow + forever( + function (next) { + self.locker.lock(host, 5000, function(err) { + // we didn't get the lock for the host + if (err) { + debug('Could not lock host=%s from %s. Reason: %s', host, self.name, err.message); + return next(err); + } + debug('Locked host=%s from %s', host, self.name); + self.processNextJob(host, next); + }); + }, + function (err) { + debug(err); + self.finishedProcessingHost(host); + self.locker.unlock(host, debug); + } + ); + }, + function onJobSubscriberReady(err) { + if (err) { + return self.emit('error', err); + } - self.emit('ready'); - }); + self.emit('ready'); + } + ); }; - -Batch.prototype._consumeJobs = function (host, queue, callback) { +Batch.prototype.processNextJob = function (host, callback) { var self = this; - this.locker.lock(host, 5000, function(err) { - // we didn't get the lock for the host + self.jobQueue.dequeue(host, function (err, jobId) { if (err) { - debug('On de-queue could not lock host=%s from %s. Reason: %s', host, self.name, err.message); - // In case we have lost the lock but there are pending jobs we re-announce the host - self.jobPublisher.publish(host); - return callback(new Error('Could not acquire lock for host=' + host)); + return callback(err); } - debug('On de-queue locked host=%s from %s', host, self.name); + if (!jobId) { + var emptyQueueError = new Error('Queue ' + host + ' is empty'); + emptyQueueError.name = 'EmptyQueue'; + return callback(emptyQueueError); + } - var lockRenewalIntervalId = setInterval(function() { - debug('Trying to extend lock host=%s', host); - self.locker.lock(host, 5000, function(err, _lock) { - if (err) { - clearInterval(lockRenewalIntervalId); - return callback(err); - } - if (!err && _lock) { - debug('Extended lock host=%s', host); - } - }); - }, 1000); + self.setProcessingJobId(host, jobId); + + self.jobRunner.run(jobId, function (err, job) { + self.setProcessingJobId(host, null); - queue.dequeue(host, function (err, job_id) { if (err) { + debug(err); + if (err.name === 'JobNotRunnable') { + return callback(); + } return callback(err); } - if (!job_id) { - clearInterval(lockRenewalIntervalId); - return self.locker.unlock(host, function() { - var emptyQueueError = new Error('Queue ' + host + ' is empty'); - emptyQueueError.name = 'EmptyQueue'; - return callback(emptyQueueError); - }); - } + debug('Job[%s] status=%s in host=%s (failed_reason=%s)', jobId, job.data.status, host, job.failed_reason); - self.jobQueuePool.setCurrentJobId(host, job_id); + self.logger.log(job); - self.jobRunner.run(job_id, function (err, job) { - self.jobQueuePool.removeCurrentJobId(host); + self.emit('job:' + job.data.status, jobId); - if (err && err.name === 'JobNotRunnable') { - debug(err.message); - clearInterval(lockRenewalIntervalId); - return callback(); - } - - if (err) { - clearInterval(lockRenewalIntervalId); - return callback(err); - } - - debug('Job[%s] status=%s in host=%s (error=%s)', job_id, job.data.status, host, job.failed_reason); - - self.logger.log(job); - - self.emit('job:' + job.data.status, job_id); - - clearInterval(lockRenewalIntervalId); - callback(); - }); + callback(); }); }); }; Batch.prototype.drain = function (callback) { var self = this; - var queues = this.jobQueuePool.list(); - var batchQueues = queue(queues.length); + var workingHosts = this.getWorkingHosts(); + var batchQueues = queue(workingHosts.length); - queues.forEach(function (host) { + workingHosts.forEach(function (host) { batchQueues.defer(self._drainJob.bind(self), host); }); @@ -151,7 +123,7 @@ Batch.prototype.drain = function (callback) { Batch.prototype._drainJob = function (host, callback) { var self = this; - var job_id = self.jobQueuePool.getCurrentJobId(host); + var job_id = this.getProcessingJobId(host); if (!job_id) { return process.nextTick(function () { @@ -159,8 +131,6 @@ Batch.prototype._drainJob = function (host, callback) { }); } - var queue = self.jobQueuePool.getQueue(host); - this.jobService.drain(job_id, function (err) { if (err && err.name === 'CancelNotAllowedError') { return callback(); @@ -170,10 +140,30 @@ Batch.prototype._drainJob = function (host, callback) { return callback(err); } - queue.enqueueFirst(job_id, host, callback); + self.jobQueue.enqueueFirst(job_id, host, callback); }); }; -Batch.prototype.stop = function () { - this.jobSubscriber.unsubscribe(); +Batch.prototype.stop = function (callback) { + this.jobSubscriber.unsubscribe(callback); +}; + +Batch.prototype.isProcessingHost = function(host) { + return this.workingQueues.hasOwnProperty(host); +}; + +Batch.prototype.getWorkingHosts = function() { + return Object.keys(this.workingQueues); +}; + +Batch.prototype.setProcessingJobId = function(host, jobId) { + this.workingQueues[host] = jobId; +}; + +Batch.prototype.getProcessingJobId = function(host) { + return this.workingQueues[host]; +}; + +Batch.prototype.finishedProcessingHost = function(host) { + delete this.workingQueues[host]; }; diff --git a/batch/index.js b/batch/index.js index d585e985..f8e823be 100644 --- a/batch/index.js +++ b/batch/index.js @@ -5,7 +5,6 @@ var _ = require('underscore'); var JobRunner = require('./job_runner'); var QueryRunner = require('./query_runner'); var JobCanceller = require('./job_canceller'); -var JobQueuePool = require('./job_queue_pool'); var JobSubscriber = require('./pubsub/job-subscriber'); var UserDatabaseMetadataService = require('./user_database_metadata_service'); var JobPublisher = require('./pubsub/job-publisher'); @@ -20,7 +19,6 @@ module.exports = function batchFactory (metadataBackend, redisConfig, name, stat var jobSubscriber = new JobSubscriber(pubSubRedisPool); var jobPublisher = new JobPublisher(pubSubRedisPool); - var jobQueuePool = new JobQueuePool(metadataBackend, jobPublisher); var jobQueue = new JobQueue(metadataBackend, jobPublisher); var jobBackend = new JobBackend(metadataBackend, jobQueue); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); @@ -33,7 +31,7 @@ module.exports = function batchFactory (metadataBackend, redisConfig, name, stat return new Batch( name, jobSubscriber, - jobQueuePool, + jobQueue, jobRunner, jobService, jobPublisher, diff --git a/batch/job_queue_pool.js b/batch/job_queue_pool.js deleted file mode 100644 index adbf45f5..00000000 --- a/batch/job_queue_pool.js +++ /dev/null @@ -1,56 +0,0 @@ -'use strict'; - -var JobQueue = require('./job_queue'); - -function JobQueuePool(metadataBackend, jobPublisher) { - this.metadataBackend = metadataBackend; - this.jobPublisher = jobPublisher; - this.queues = {}; -} - -JobQueuePool.prototype.get = function (host) { - return this.queues[host]; -}; - -JobQueuePool.prototype.getQueue = function (host) { - if (this.get(host)) { - return this.get(host).queue; - } -}; - -JobQueuePool.prototype.removeQueue = function (host) { - if (this.queues[host].queue) { - delete this.queues[host].queue; - } -}; - -JobQueuePool.prototype.list = function () { - return Object.keys(this.queues); -}; - -JobQueuePool.prototype.createQueue = function (host) { - this.queues[host] = { - queue: new JobQueue(this.metadataBackend, this.jobPublisher), - currentJobId: null - }; - - return this.getQueue(host); -}; - -JobQueuePool.prototype.setCurrentJobId = function (host, job_id) { - this.get(host).currentJobId = job_id; -}; - -JobQueuePool.prototype.getCurrentJobId = function (host) { - if (this.get(host).currentJobId) { - return this.get(host).currentJobId; - } -}; - -JobQueuePool.prototype.removeCurrentJobId = function (host) { - if (this.get(host).currentJobId) { - delete this.get(host).currentJobId; - } -}; - -module.exports = JobQueuePool;