diff --git a/batch/batch.js b/batch/batch.js index 266e8de2..25a37a3b 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -4,6 +4,7 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var forever = require('./forever'); var queue = require('queue-async'); +var jobStatus = require('./job_status'); function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) { EventEmitter.call(this); @@ -124,7 +125,7 @@ Batch.prototype._consumeJobs = function (host, queue, callback) { return callback(err); } - if (job.status === 'failed') { + if (job.status === jobStatus.FAILED) { console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason); } else { console.log('Job %s %s in %s', job_id, job.status, host); diff --git a/batch/job_backend.js b/batch/job_backend.js index fef104bc..80064d12 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -3,13 +3,14 @@ var uuid = require('node-uuid'); var queue = require('queue-async'); var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours +var jobStatus = require('./job_status'); function setPendingIfMutiqueryJob(sql) { if (Array.isArray(sql)) { for (var j = 0; j < sql.length; j++) { sql[j] = { query: sql[j], - status: 'pending' + status: jobStatus.PENDING }; } } @@ -36,7 +37,7 @@ JobBackend.prototype.create = function (username, sql, host, callback) { var redisParams = [ this.redisPrefix + job_id, 'user', username, - 'status', 'pending', + 'status', jobStatus.PENDING, 'query', JSON.stringify(sql), 'created_at', now, 'updated_at', now @@ -74,13 +75,13 @@ JobBackend.prototype.update = function (job_id, sql, callback) { return callback(err); } - if (job.status !== 'pending') { + if (job.status !== jobStatus.PENDING) { return callback(new Error('Job is not pending, it cannot be updated')); } if (Array.isArray(job.query)) { for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status !== 'pending') { + if (job.query[i].status !== jobStatus.PENDING) { return callback(new Error('Job is not pending, it cannot be updated')); } } @@ -224,14 +225,14 @@ JobBackend.prototype.setRunning = function (job, index, callback) { var now = new Date().toISOString(); var redisParams = [ this.redisPrefix + job.job_id, - 'status', 'running', + 'status', jobStatus.RUNNING, 'updated_at', now, ]; if (!callback) { callback = index; } else if (index >= 0 && index < job.query.length) { - job.query[index].status = 'running'; + job.query[index].status = jobStatus.RUNNING; redisParams = redisParams.concat('query', JSON.stringify(job.query)); } @@ -250,14 +251,14 @@ JobBackend.prototype.setPending = function (job, index, callback) { var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, - 'status', 'pending', + 'status', jobStatus.PENDING, 'updated_at', now ]; if (!callback) { callback = index; } else if (index >= 0 && index < job.query.length) { - job.query[index].status = 'pending'; + job.query[index].status = jobStatus.PENDING; redisParams = redisParams.concat('query', JSON.stringify(job.query)); } @@ -276,14 +277,14 @@ JobBackend.prototype.setDone = function (job, index, callback) { var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, - 'status', 'done', + 'status', jobStatus.DONE, 'updated_at', now ]; if (!callback) { callback = index; } else if (index >= 0 && index < job.query.length) { - job.query[index].status = 'done'; + job.query[index].status = jobStatus.DONE; redisParams = redisParams.concat('query', JSON.stringify(job.query)); } @@ -307,11 +308,11 @@ JobBackend.prototype.setJobPendingAndQueryDone = function (job, index, callback) var now = new Date().toISOString(); var redisKey = this.redisPrefix + job.job_id; - job.query[index].status = 'done'; + job.query[index].status = jobStatus.DONE; var redisParams = [ redisKey, - 'status', 'pending', + 'status', jobStatus.PENDING, 'updated_at', now, 'query', JSON.stringify(job.query) ]; @@ -331,7 +332,7 @@ JobBackend.prototype.setFailed = function (job, error, index, callback) { var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, - 'status', 'failed', + 'status', jobStatus.FAILED, 'failed_reason', error.message, 'updated_at', now ]; @@ -339,7 +340,7 @@ JobBackend.prototype.setFailed = function (job, error, index, callback) { if (!callback) { callback = index; } else if (index >= 0 && index < job.query.length) { - job.query[index].status = 'failed'; + job.query[index].status = jobStatus.FAILED; job.query[index].failed_reason = error.message; redisParams = redisParams.concat('query', JSON.stringify(job.query)); } @@ -365,14 +366,14 @@ JobBackend.prototype.setCancelled = function (job, index, callback) { var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, - 'status', 'cancelled', + 'status', jobStatus.CANCELLED, 'updated_at', now ]; if (!callback) { callback = index; } else if (index >= 0 && index < job.query.length) { - job.query[index].status = 'cancelled'; + job.query[index].status = jobStatus.CANCELLED; redisParams = redisParams.concat('query', JSON.stringify(job.query)); } @@ -404,7 +405,7 @@ JobBackend.prototype.setUnknown = function (job_id, callback) { var redisKey = self.redisPrefix + job.job_id; var redisParams = [ redisKey, - 'status', 'unknown', + 'status', jobStatus.UNKNOWN, 'updated_at', now ]; diff --git a/batch/job_canceller.js b/batch/job_canceller.js index f18a266d..dfeb122a 100644 --- a/batch/job_canceller.js +++ b/batch/job_canceller.js @@ -1,6 +1,7 @@ 'use strict'; var PSQL = require('cartodb-psql'); +var jobStatus = require('./job_status'); function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) { this.metadataBackend = metadataBackend; @@ -11,7 +12,7 @@ function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) function getIndexOfRunningQuery(job) { if (Array.isArray(job.query)) { for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status === 'running') { + if (job.query[i].status === jobStatus.RUNNING) { return i; } } @@ -26,11 +27,11 @@ JobCanceller.prototype.cancel = function (job_id, callback) { return callback(err); } - if (job.status === 'pending') { + if (job.status === jobStatus.PENDING) { return self.jobBackend.setCancelled(job, callback); } - if (job.status !== 'running') { + if (job.status !== jobStatus.RUNNING) { var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed'); cancelNotAllowedError.name = 'CancelNotAllowedError'; return callback(cancelNotAllowedError); diff --git a/batch/job_runner.js b/batch/job_runner.js index f0df31e6..64a269e9 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -1,6 +1,7 @@ 'use strict'; var errorCodes = require('../app/postgresql/error_codes').codeToCondition; +var jobStatus = require('./job_status'); function getNextQuery(job) { if (!Array.isArray(job.query)) { @@ -10,7 +11,7 @@ function getNextQuery(job) { } for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status === 'pending') { + if (job.query[i].status === jobStatus.PENDING) { return { index: i, query: job.query[i].query @@ -46,7 +47,7 @@ JobRunner.prototype.run = function (job_id, callback) { return callback(err); } - if (job.status !== 'pending') { + if (job.status !== jobStatus.PENDING) { var invalidJobStatusError = new Error([ 'Cannot run job', job.job_id, @@ -98,7 +99,6 @@ JobRunner.prototype._run = function (job, query, callback) { } if (isLastQuery(job, query.index)) { - console.log('set done', query.index); return self.jobBackend.setDone(job, query.index, callback); } diff --git a/batch/job_status.js b/batch/job_status.js new file mode 100644 index 00000000..d212679f --- /dev/null +++ b/batch/job_status.js @@ -0,0 +1,12 @@ +'use strict'; + +var JOB_STATUS_ENUM = { + PENDING: 'pending', + RUNNING: 'running', + DONE: 'done', + CANCELLED: 'cancelled', + FAILED: 'failed', + UNKNOWN: 'unknown' +}; + +module.exports = JOB_STATUS_ENUM;