Separate job draining from processing

This commit is contained in:
Raul Ochoa 2016-10-17 18:42:29 +02:00
parent a8e03f01c9
commit 761fbe5205

View File

@ -20,8 +20,10 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe
this.locker = Locker.create('redis-distlock', { pool: redisPool });
this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool);
// map: host => jobId
this.workingQueues = {};
// map: user => jobId. Will be used for draining jobs.
this.workInProgressJobs = {};
}
util.inherits(Batch, EventEmitter);
@ -97,9 +99,11 @@ Batch.prototype.processNextJob = function (user, callback) {
return callback(emptyQueueError);
}
self.setWorkInProgressJob(user, jobId);
self.setProcessingJobId(user, jobId);
self.jobRunner.run(jobId, function (err, job) {
self.clearWorkInProgressJob(user);
self.setProcessingJobId(user, null);
if (err) {
@ -123,7 +127,7 @@ Batch.prototype.processNextJob = function (user, callback) {
Batch.prototype.drain = function (callback) {
var self = this;
var workingUsers = this.getWorkingUsers();
var workingUsers = this.getWorkInProgressUsers();
var batchQueues = queue(workingUsers.length);
workingUsers.forEach(function (user) {
@ -143,7 +147,7 @@ Batch.prototype.drain = function (callback) {
Batch.prototype._drainJob = function (user, callback) {
var self = this;
var job_id = this.getProcessingJobId(user);
var job_id = this.getWorkInProgressJob(user);
if (!job_id) {
return process.nextTick(function () {
@ -173,18 +177,29 @@ Batch.prototype.isProcessingUser = function(user) {
return this.workingQueues.hasOwnProperty(user);
};
Batch.prototype.getWorkingUsers = function() {
return Object.keys(this.workingQueues);
};
Batch.prototype.setProcessingJobId = function(user, jobId) {
this.workingQueues[user] = jobId;
};
Batch.prototype.getProcessingJobId = function(user) {
return this.workingQueues[user];
};
Batch.prototype.finishedProcessingUser = function(user) {
delete this.workingQueues[user];
};
/* Work in progress jobs */
Batch.prototype.setWorkInProgressJob = function(user, jobId) {
this.workInProgressJobs[user] = jobId;
};
Batch.prototype.getWorkInProgressJob = function(user) {
return this.workInProgressJobs[user];
};
Batch.prototype.clearWorkInProgressJob = function(user) {
delete this.workInProgressJobs[user];
};
Batch.prototype.getWorkInProgressUsers = function() {
return Object.keys(this.workInProgressJobs);
};