Merge pull request #286 from CartoDB/batch-use-constants

Batch use constants
This commit is contained in:
Daniel 2016-05-10 10:56:53 +02:00
commit bc69ddc1d6
5 changed files with 39 additions and 24 deletions

View File

@ -4,6 +4,7 @@ var util = require('util');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var forever = require('./forever'); var forever = require('./forever');
var queue = require('queue-async'); var queue = require('queue-async');
var jobStatus = require('./job_status');
function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) { function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) {
EventEmitter.call(this); EventEmitter.call(this);
@ -124,7 +125,7 @@ Batch.prototype._consumeJobs = function (host, queue, callback) {
return callback(err); return callback(err);
} }
if (job.status === 'failed') { if (job.status === jobStatus.FAILED) {
console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason); console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason);
} else { } else {
console.log('Job %s %s in %s', job_id, job.status, host); console.log('Job %s %s in %s', job_id, job.status, host);

View File

@ -3,13 +3,14 @@
var uuid = require('node-uuid'); var uuid = require('node-uuid');
var queue = require('queue-async'); var queue = require('queue-async');
var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours
var jobStatus = require('./job_status');
function setPendingIfMutiqueryJob(sql) { function setPendingIfMutiqueryJob(sql) {
if (Array.isArray(sql)) { if (Array.isArray(sql)) {
for (var j = 0; j < sql.length; j++) { for (var j = 0; j < sql.length; j++) {
sql[j] = { sql[j] = {
query: sql[j], query: sql[j],
status: 'pending' status: jobStatus.PENDING
}; };
} }
} }
@ -36,7 +37,7 @@ JobBackend.prototype.create = function (username, sql, host, callback) {
var redisParams = [ var redisParams = [
this.redisPrefix + job_id, this.redisPrefix + job_id,
'user', username, 'user', username,
'status', 'pending', 'status', jobStatus.PENDING,
'query', JSON.stringify(sql), 'query', JSON.stringify(sql),
'created_at', now, 'created_at', now,
'updated_at', now 'updated_at', now
@ -74,13 +75,13 @@ JobBackend.prototype.update = function (job_id, sql, callback) {
return callback(err); return callback(err);
} }
if (job.status !== 'pending') { if (job.status !== jobStatus.PENDING) {
return callback(new Error('Job is not pending, it cannot be updated')); return callback(new Error('Job is not pending, it cannot be updated'));
} }
if (Array.isArray(job.query)) { if (Array.isArray(job.query)) {
for (var i = 0; i < job.query.length; i++) { for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status !== 'pending') { if (job.query[i].status !== jobStatus.PENDING) {
return callback(new Error('Job is not pending, it cannot be updated')); return callback(new Error('Job is not pending, it cannot be updated'));
} }
} }
@ -224,14 +225,14 @@ JobBackend.prototype.setRunning = function (job, index, callback) {
var now = new Date().toISOString(); var now = new Date().toISOString();
var redisParams = [ var redisParams = [
this.redisPrefix + job.job_id, this.redisPrefix + job.job_id,
'status', 'running', 'status', jobStatus.RUNNING,
'updated_at', now, 'updated_at', now,
]; ];
if (!callback) { if (!callback) {
callback = index; callback = index;
} else if (index >= 0 && index < job.query.length) { } else if (index >= 0 && index < job.query.length) {
job.query[index].status = 'running'; job.query[index].status = jobStatus.RUNNING;
redisParams = redisParams.concat('query', JSON.stringify(job.query)); redisParams = redisParams.concat('query', JSON.stringify(job.query));
} }
@ -250,14 +251,14 @@ JobBackend.prototype.setPending = function (job, index, callback) {
var redisKey = this.redisPrefix + job.job_id; var redisKey = this.redisPrefix + job.job_id;
var redisParams = [ var redisParams = [
redisKey, redisKey,
'status', 'pending', 'status', jobStatus.PENDING,
'updated_at', now 'updated_at', now
]; ];
if (!callback) { if (!callback) {
callback = index; callback = index;
} else if (index >= 0 && index < job.query.length) { } else if (index >= 0 && index < job.query.length) {
job.query[index].status = 'pending'; job.query[index].status = jobStatus.PENDING;
redisParams = redisParams.concat('query', JSON.stringify(job.query)); redisParams = redisParams.concat('query', JSON.stringify(job.query));
} }
@ -276,14 +277,14 @@ JobBackend.prototype.setDone = function (job, index, callback) {
var redisKey = this.redisPrefix + job.job_id; var redisKey = this.redisPrefix + job.job_id;
var redisParams = [ var redisParams = [
redisKey, redisKey,
'status', 'done', 'status', jobStatus.DONE,
'updated_at', now 'updated_at', now
]; ];
if (!callback) { if (!callback) {
callback = index; callback = index;
} else if (index >= 0 && index < job.query.length) { } else if (index >= 0 && index < job.query.length) {
job.query[index].status = 'done'; job.query[index].status = jobStatus.DONE;
redisParams = redisParams.concat('query', JSON.stringify(job.query)); redisParams = redisParams.concat('query', JSON.stringify(job.query));
} }
@ -307,11 +308,11 @@ JobBackend.prototype.setJobPendingAndQueryDone = function (job, index, callback)
var now = new Date().toISOString(); var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id; var redisKey = this.redisPrefix + job.job_id;
job.query[index].status = 'done'; job.query[index].status = jobStatus.DONE;
var redisParams = [ var redisParams = [
redisKey, redisKey,
'status', 'pending', 'status', jobStatus.PENDING,
'updated_at', now, 'updated_at', now,
'query', JSON.stringify(job.query) 'query', JSON.stringify(job.query)
]; ];
@ -331,7 +332,7 @@ JobBackend.prototype.setFailed = function (job, error, index, callback) {
var redisKey = this.redisPrefix + job.job_id; var redisKey = this.redisPrefix + job.job_id;
var redisParams = [ var redisParams = [
redisKey, redisKey,
'status', 'failed', 'status', jobStatus.FAILED,
'failed_reason', error.message, 'failed_reason', error.message,
'updated_at', now 'updated_at', now
]; ];
@ -339,7 +340,7 @@ JobBackend.prototype.setFailed = function (job, error, index, callback) {
if (!callback) { if (!callback) {
callback = index; callback = index;
} else if (index >= 0 && index < job.query.length) { } else if (index >= 0 && index < job.query.length) {
job.query[index].status = 'failed'; job.query[index].status = jobStatus.FAILED;
job.query[index].failed_reason = error.message; job.query[index].failed_reason = error.message;
redisParams = redisParams.concat('query', JSON.stringify(job.query)); redisParams = redisParams.concat('query', JSON.stringify(job.query));
} }
@ -365,14 +366,14 @@ JobBackend.prototype.setCancelled = function (job, index, callback) {
var redisKey = this.redisPrefix + job.job_id; var redisKey = this.redisPrefix + job.job_id;
var redisParams = [ var redisParams = [
redisKey, redisKey,
'status', 'cancelled', 'status', jobStatus.CANCELLED,
'updated_at', now 'updated_at', now
]; ];
if (!callback) { if (!callback) {
callback = index; callback = index;
} else if (index >= 0 && index < job.query.length) { } else if (index >= 0 && index < job.query.length) {
job.query[index].status = 'cancelled'; job.query[index].status = jobStatus.CANCELLED;
redisParams = redisParams.concat('query', JSON.stringify(job.query)); redisParams = redisParams.concat('query', JSON.stringify(job.query));
} }
@ -404,7 +405,7 @@ JobBackend.prototype.setUnknown = function (job_id, callback) {
var redisKey = self.redisPrefix + job.job_id; var redisKey = self.redisPrefix + job.job_id;
var redisParams = [ var redisParams = [
redisKey, redisKey,
'status', 'unknown', 'status', jobStatus.UNKNOWN,
'updated_at', now 'updated_at', now
]; ];

View File

@ -1,6 +1,7 @@
'use strict'; 'use strict';
var PSQL = require('cartodb-psql'); var PSQL = require('cartodb-psql');
var jobStatus = require('./job_status');
function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) { function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) {
this.metadataBackend = metadataBackend; this.metadataBackend = metadataBackend;
@ -11,7 +12,7 @@ function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend)
function getIndexOfRunningQuery(job) { function getIndexOfRunningQuery(job) {
if (Array.isArray(job.query)) { if (Array.isArray(job.query)) {
for (var i = 0; i < job.query.length; i++) { for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status === 'running') { if (job.query[i].status === jobStatus.RUNNING) {
return i; return i;
} }
} }
@ -26,11 +27,11 @@ JobCanceller.prototype.cancel = function (job_id, callback) {
return callback(err); return callback(err);
} }
if (job.status === 'pending') { if (job.status === jobStatus.PENDING) {
return self.jobBackend.setCancelled(job, callback); return self.jobBackend.setCancelled(job, callback);
} }
if (job.status !== 'running') { if (job.status !== jobStatus.RUNNING) {
var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed'); var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed');
cancelNotAllowedError.name = 'CancelNotAllowedError'; cancelNotAllowedError.name = 'CancelNotAllowedError';
return callback(cancelNotAllowedError); return callback(cancelNotAllowedError);

View File

@ -1,6 +1,7 @@
'use strict'; 'use strict';
var errorCodes = require('../app/postgresql/error_codes').codeToCondition; var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status');
function getNextQuery(job) { function getNextQuery(job) {
if (!Array.isArray(job.query)) { if (!Array.isArray(job.query)) {
@ -10,7 +11,7 @@ function getNextQuery(job) {
} }
for (var i = 0; i < job.query.length; i++) { for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status === 'pending') { if (job.query[i].status === jobStatus.PENDING) {
return { return {
index: i, index: i,
query: job.query[i].query query: job.query[i].query
@ -46,7 +47,7 @@ JobRunner.prototype.run = function (job_id, callback) {
return callback(err); return callback(err);
} }
if (job.status !== 'pending') { if (job.status !== jobStatus.PENDING) {
var invalidJobStatusError = new Error([ var invalidJobStatusError = new Error([
'Cannot run job', 'Cannot run job',
job.job_id, job.job_id,
@ -98,7 +99,6 @@ JobRunner.prototype._run = function (job, query, callback) {
} }
if (isLastQuery(job, query.index)) { if (isLastQuery(job, query.index)) {
console.log('set done', query.index);
return self.jobBackend.setDone(job, query.index, callback); return self.jobBackend.setDone(job, query.index, callback);
} }

12
batch/job_status.js Normal file
View File

@ -0,0 +1,12 @@
'use strict';
var JOB_STATUS_ENUM = {
PENDING: 'pending',
RUNNING: 'running',
DONE: 'done',
CANCELLED: 'cancelled',
FAILED: 'failed',
UNKNOWN: 'unknown'
};
module.exports = JOB_STATUS_ENUM;