Improved queue consuption avoiding possible stack overflow

This commit is contained in:
Daniel García Aubert 2015-12-31 12:33:11 +01:00
parent ada39d84b8
commit ef354bdb4d
2 changed files with 59 additions and 30 deletions

View File

@ -6,6 +6,7 @@ var JobRunner = require('./job_runner');
var JobQueuePool = require('./job_queue_pool'); var JobQueuePool = require('./job_queue_pool');
var JobSubscriber = require('./job_subscriber'); var JobSubscriber = require('./job_subscriber');
var UserDatabaseMetadataService = require('./user_database_metadata_service'); var UserDatabaseMetadataService = require('./user_database_metadata_service');
var forever = require('./forever');
function Batch(metadataBackend) { function Batch(metadataBackend) {
EventEmitter.call(this); EventEmitter.call(this);
@ -21,41 +22,58 @@ Batch.prototype.start = function () {
this.jobSubscriber.subscribe(function (channel, host) { this.jobSubscriber.subscribe(function (channel, host) {
var queue = self.jobQueuePool.get(host); var queue = self.jobQueuePool.get(host);
if (!queue) { // there is nothing to do. It is already running jobs
queue = self.jobQueuePool.add(host); if (queue) {
consume(queue); return;
} }
function consume(queue) { queue = self.jobQueuePool.add(host);
queue.dequeue(host, function (err, job_id) {
if (err) {
self.jobQueuePool.remove(host);
return console.error(err);
}
if (!job_id) { // do forever, it does not cause a stack overflow
self.jobQueuePool.remove(host); forever(function (next) {
return console.log('Queue %s is empty', host); self._consume(host, queue, next);
} }, function (err) {
self.jobQueuePool.remove(host);
self.jobRunner.run(job_id) if (err.name === 'EmptyQueue') {
.on('done', function (job) { return console.log(err.message);
console.log('Job %s done in %s', job_id, host); }
self.emit('job:done', job.job_id);
consume(queue); // recursive call console.error(err);
}) });
.on('failed', function (job) { });
console.log('Job %s done in %s', job_id, host); };
self.emit('job:failed', job.job_id);
consume(queue); // recursive call Batch.prototype._consume = function consume(host, queue, callback) {
}) var self = this;
.on('error', function (err) {
console.error('Error in job ', err.message || err); queue.dequeue(host, function (err, job_id) {
self.emit('job:failed', job_id); if (err) {
self.jobQueuePool.remove(host); return callback(err);
}); }
if (!job_id) {
var emptyQueueError = new Error('Queue ' + host + ' is empty');
emptyQueueError.name = 'EmptyQueue';
return callback(emptyQueueError);
}
self.jobRunner.run(job_id)
.on('done', function (job) {
console.log('Job %s done in %s', job_id, host);
self.emit('job:done', job.job_id);
callback();
})
.on('failed', function (job) {
console.log('Job %s failed in %s', job_id, host);
self.emit('job:failed', job.job_id);
callback();
})
.on('error', function (err) {
console.error('Error in job %s due to', job_id, err.message || err);
self.emit('job:failed', job_id);
callback(err);
}); });
}
}); });
}; };

11
batch/forever.js Normal file
View File

@ -0,0 +1,11 @@
'use strict';
module.exports = function forever(fn, done) {
function next(err) {
if (err) {
return done(err);
}
fn(next);
}
next();
};