From cc7dd7a0d2d78e2c14ff9a89e8d5538cf07306c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Fri, 13 May 2016 18:50:55 +0200 Subject: [PATCH 1/9] Job model refactor --- app/controllers/job_controller.js | 72 ++--- batch/batch.js | 95 ++++--- batch/index.js | 7 +- batch/job_backend.js | 439 ++++++++---------------------- batch/job_base.js | 137 ++++++++++ batch/job_canceller.js | 116 +++----- batch/job_factory.js | 24 ++ batch/job_multiple.js | 105 +++++++ batch/job_runner.js | 100 +++---- batch/job_service.js | 141 ++++++++++ batch/job_simple.js | 45 +++ batch/query_runner.js | 6 +- 12 files changed, 726 insertions(+), 561 deletions(-) create mode 100644 batch/job_base.js create mode 100644 batch/job_factory.js create mode 100644 batch/job_multiple.js create mode 100644 batch/job_service.js create mode 100644 batch/job_simple.js diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index 892a976d..2d5b186c 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -37,12 +37,13 @@ function getMaxSizeErrorMessage(sql) { ); } -function JobController(userDatabaseService, jobBackend, jobCanceller) { +function JobController(userDatabaseService, jobService) { this.userDatabaseService = userDatabaseService; - this.jobBackend = jobBackend; - this.jobCanceller = jobCanceller; + this.jobService = jobService; } +module.exports = JobController; + JobController.prototype.route = function (app) { app.post(global.settings.base_url + '/sql/job', this.createJob.bind(this)); app.get(global.settings.base_url + '/sql/job', this.listJob.bind(this)); @@ -84,13 +85,13 @@ JobController.prototype.cancelJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobCanceller.cancel(job_id, function (err, job) { + self.jobService.cancel(job_id, function (err, job) { if (err) { return next(err); } next(null, { - job: job, + job: job.serialize(), host: userDatabase.host }); }); @@ -149,13 +150,15 @@ JobController.prototype.listJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.list(cdbUsername, function (err, jobs) { + self.jobService.list(cdbUsername, function (err, jobs) { if (err) { return next(err); } next(null, { - jobs: jobs, + jobs: jobs.map(function (job) { + return job.serialize(); + }), host: userDatabase.host }); }); @@ -215,13 +218,13 @@ JobController.prototype.getJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.get(job_id, function (err, job) { + self.jobService.get(job_id, function (err, job) { if (err) { return next(err); } next(null, { - job: job, + job: job.serialize(), host: userDatabase.host }); }); @@ -249,23 +252,6 @@ JobController.prototype.getJob = function (req, res) { ); }; -function isValidJob(sql) { - if (_.isArray(sql)) { - for (var i = 0; i < sql.length; i++) { - if (!_.isString(sql[i])) { - return false; - } - } - return true; - } - - if (!_.isString(sql)) { - return false; - } - - return true; -} - JobController.prototype.createJob = function (req, res) { // jshint maxcomplexity: 7 var self = this; @@ -274,11 +260,7 @@ JobController.prototype.createJob = function (req, res) { var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query; var cdbUsername = cdbReq.userByReq(req); - - if (!isValidJob(sql)) { - return handleException(new Error('You must indicate a valid SQL'), res); - } - + // TODO: in job.validate() if (reachMaxQuerySizeLimit(sql)) { return handleException(new Error(getMaxSizeErrorMessage(sql)), res); } @@ -308,13 +290,19 @@ JobController.prototype.createJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.create(cdbUsername, sql, userDatabase.host, function (err, result) { + var data = { + user: cdbUsername, + query: sql, + host: userDatabase.host + }; + + self.jobService.create(data, function (err, job) { if (err) { return next(err); } next(null, { - job: result, + job: job.serialize(), host: userDatabase.host }); }); @@ -342,7 +330,6 @@ JobController.prototype.createJob = function (req, res) { ); }; - JobController.prototype.updateJob = function (req, res) { // jshint maxcomplexity: 7 var self = this; @@ -352,10 +339,7 @@ JobController.prototype.updateJob = function (req, res) { var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query; var cdbUsername = cdbReq.userByReq(req); - if (!isValidJob(sql)) { - return handleException(new Error('You must indicate a valid SQL'), res); - } - + // TODO: in jobValidate if (reachMaxQuerySizeLimit(sql)) { return handleException(new Error(getMaxSizeErrorMessage(sql)), res); } @@ -385,13 +369,18 @@ JobController.prototype.updateJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.update(job_id, sql, function (err, job) { + var data = { + job_id: job_id, + query: sql + }; + + self.jobService.update(data, function (err, job) { if (err) { return next(err); } next(null, { - job: job, + job: job.serialize(), host: userDatabase.host }); }); @@ -413,9 +402,8 @@ JobController.prototype.updateJob = function (req, res) { if (result.host) { res.header('X-Served-By-DB-Host', result.host); } + res.send(result.job); } ); }; - -module.exports = JobController; diff --git a/batch/batch.js b/batch/batch.js index 25a37a3b..87025cfe 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -6,15 +6,17 @@ var forever = require('./forever'); var queue = require('queue-async'); var jobStatus = require('./job_status'); -function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) { +function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService) { EventEmitter.call(this); this.jobSubscriber = jobSubscriber; this.jobQueuePool = jobQueuePool; this.jobRunner = jobRunner; - this.jobCanceller = jobCanceller; + this.jobService = jobService; } util.inherits(Batch, EventEmitter); +module.exports = Batch; + Batch.prototype.start = function () { this._subscribe(); }; @@ -46,11 +48,51 @@ Batch.prototype._subscribe = function () { }); }; -Batch.prototype.drain = function (callback) { + +Batch.prototype._consumeJobs = function (host, queue, callback) { var self = this; - var queues = this.jobQueuePool.list(); + queue.dequeue(host, function (err, job_id) { + if (err) { + return callback(err); + } + if (!job_id) { + var emptyQueueError = new Error('Queue ' + host + ' is empty'); + emptyQueueError.name = 'EmptyQueue'; + return callback(emptyQueueError); + } + + self.jobQueuePool.setCurrentJobId(host, job_id); + + self.jobRunner.run(job_id, function (err, job) { + self.jobQueuePool.removeCurrentJobId(host); + + if (err && err.name === 'JobNotRunnable') { + console.log(err.message); + return callback(); + } + + if (err) { + return callback(err); + } + + if (job.data.status === jobStatus.FAILED) { + console.log('Job %s %s in %s due to: %s', job_id, job.data.status, host, job.failed_reason); + } else { + console.log('Job %s %s in %s', job_id, job.data.status, host); + } + + self.emit('job:' + job.data.status, job_id); + + callback(); + }); + }); +}; + +Batch.prototype.drain = function (callback) { + var self = this; + var queues = this.jobQueuePool.list(); var batchQueues = queue(queues.length); queues.forEach(function (host) { @@ -80,7 +122,7 @@ Batch.prototype._drainJob = function (host, callback) { var queue = self.jobQueuePool.getQueue(host); - this.jobCanceller.drain(job_id, function (err) { + this.jobService.drain(job_id, function (err) { if (err && err.name === 'CancelNotAllowedError') { return callback(); } @@ -96,46 +138,3 @@ Batch.prototype._drainJob = function (host, callback) { Batch.prototype.stop = function () { this.jobSubscriber.unsubscribe(); }; - -Batch.prototype._consumeJobs = function (host, queue, callback) { - var self = this; - - queue.dequeue(host, function (err, job_id) { - if (err) { - return callback(err); - } - - if (!job_id) { - var emptyQueueError = new Error('Queue ' + host + ' is empty'); - emptyQueueError.name = 'EmptyQueue'; - return callback(emptyQueueError); - } - - self.jobQueuePool.setCurrentJobId(host, job_id); - - self.jobRunner.run(job_id, function (err, job) { - self.jobQueuePool.removeCurrentJobId(host); - - if (err && err.name === 'InvalidJobStatus') { - console.log(err.message); - return callback(); - } - - if (err) { - return callback(err); - } - - if (job.status === jobStatus.FAILED) { - console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason); - } else { - console.log('Job %s %s in %s', job_id, job.status, host); - } - - self.emit('job:' + job.status, job_id); - - callback(); - }); - }); -}; - -module.exports = Batch; diff --git a/batch/index.js b/batch/index.js index 54c83c92..3d996c63 100644 --- a/batch/index.js +++ b/batch/index.js @@ -12,6 +12,7 @@ var JobPublisher = require('./job_publisher'); var JobQueue = require('./job_queue'); var UserIndexer = require('./user_indexer'); var JobBackend = require('./job_backend'); +var JobService = require('./job_service'); var Batch = require('./batch'); module.exports = function batchFactory (metadataBackend) { @@ -23,9 +24,11 @@ module.exports = function batchFactory (metadataBackend) { var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + // TODO: down userDatabaseMetadataService var queryRunner = new QueryRunner(); - var jobRunner = new JobRunner(jobBackend, jobQueue, queryRunner, userDatabaseMetadataService); var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend); + var jobService = new JobService(jobBackend, jobCanceller); + var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService); - return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller); + return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService); }; diff --git a/batch/job_backend.js b/batch/job_backend.js index 80064d12..7134d7f5 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -1,22 +1,14 @@ 'use strict'; -var uuid = require('node-uuid'); var queue = require('queue-async'); var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours var jobStatus = require('./job_status'); - -function setPendingIfMutiqueryJob(sql) { - if (Array.isArray(sql)) { - for (var j = 0; j < sql.length; j++) { - sql[j] = { - query: sql[j], - status: jobStatus.PENDING - }; - } - } - - return sql; -} +var finalStatus = [ + jobStatus.CANCELLED, + jobStatus.DONE, + jobStatus.FAILED, + jobStatus.UNKNOWN +]; function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) { this.db = 5; @@ -27,103 +19,164 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer this.userIndexer = userIndexer; } -JobBackend.prototype.create = function (username, sql, host, callback) { +JobBackend.prototype.toRedisParams = function (obj) { + var redisParams = []; + for (var property in obj) { + if (obj.hasOwnProperty(property)) { + redisParams.push(property); + redisParams.push(obj[property]); + } + } + + return redisParams; +}; + +JobBackend.prototype.toObject = function (redisParams, redisValues) { + var obj = {}; + for (var i = 0; i < redisParams.length; i++) { + obj[redisParams[i]] = redisValues[i]; + } + + return obj; +}; + +// TODO: is it really necessary?? +function isJobFound(redisValues) { + return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4]; +} + +JobBackend.prototype.get = function (job_id, callback) { var self = this; - var job_id = uuid.v4(); - var now = new Date().toISOString(); - - sql = setPendingIfMutiqueryJob(sql); - var redisParams = [ this.redisPrefix + job_id, - 'user', username, - 'status', jobStatus.PENDING, - 'query', JSON.stringify(sql), - 'created_at', now, - 'updated_at', now + 'user', + 'status', + 'query', + 'created_at', + 'updated_at', + 'failed_reason' ]; - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { + this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, redisValues) { if (err) { return callback(err); } - self.jobQueueProducer.enqueue(job_id, host, function (err) { + if (!isJobFound(redisValues)) { + var notFoundError = new Error('Job with id ' + job_id + ' not found'); + notFoundError.name = 'NotFoundError'; + return callback(notFoundError); + } + + var jobData = self.toObject(redisParams.slice(1), redisValues); + jobData.job_id = job_id; + + callback(null, jobData); + }); +}; + +JobBackend.prototype.create = function (data, callback) { + var self = this; + + self.get(data.job_id, function (err) { + if (err && err.name !== 'NotFoundError') { + return callback(err); + } + + self.save(data, function (err, job) { if (err) { return callback(err); } - // broadcast to consumers - self.jobPublisher.publish(host); + self.jobQueueProducer.enqueue(data.job_id, data.host, function (err) { + if (err) { + return callback(err); + } - self.userIndexer.add(username, job_id, function (err) { - if (err) { - return callback(err); - } + // broadcast to consumers + self.jobPublisher.publish(data.host); - self.get(job_id, callback); + self.userIndexer.add(data.user, data.job_id, function (err) { + if (err) { + return callback(err); + } + + callback(null, job); + }); }); }); }); }; -JobBackend.prototype.update = function (job_id, sql, callback) { +JobBackend.prototype.update = function (data, callback) { var self = this; - this.get(job_id, function (err, job) { + self.get(data.job_id, function (err) { if (err) { return callback(err); } - if (job.status !== jobStatus.PENDING) { - return callback(new Error('Job is not pending, it cannot be updated')); + self.save(data, callback); + }); +}; + +JobBackend.prototype.save = function (data, callback) { + var self = this; + var redisParams = self.toRedisParams(data); + + self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { + if (err) { + return callback(err); } - if (Array.isArray(job.query)) { - for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status !== jobStatus.PENDING) { - return callback(new Error('Job is not pending, it cannot be updated')); - } - } - } - - sql = setPendingIfMutiqueryJob(sql); - - var now = new Date().toISOString(); - var redisParams = [ - self.redisPrefix + job_id, - 'query', JSON.stringify(sql), - 'updated_at', now - ]; - - self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { + self.setTTL(data, function (err) { if (err) { return callback(err); } - self.get(job_id, callback); - }); + self.get(data.job_id, function (err, job) { + if (err) { + return callback(err); + } + callback(null, job); + }); + }); }); }; -JobBackend.prototype.list = function (username, callback) { +function isFrozen(status) { + return finalStatus.indexOf(status) !== -1; +} + +JobBackend.prototype.setTTL = function (data, callback) { + var self = this; + var redisKey = this.redisPrefix + data.job_id; + + if (!isFrozen(data.status)) { + return callback(); + } + + self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], callback); +}; + +JobBackend.prototype.list = function (user, callback) { var self = this; - this.userIndexer.list(username, function (err, job_ids) { + this.userIndexer.list(user, function (err, job_ids) { if (err) { return callback(err); } var initialLength = job_ids.length; - self._getCleanedList(username, job_ids, function (err, jobs) { + self._getCleanedList(user, job_ids, function (err, jobs) { if (err) { return callback(err); } if (jobs.length < initialLength) { - return self.list(username, callback); + return self.list(user, callback); } callback(null, jobs); @@ -131,13 +184,13 @@ JobBackend.prototype.list = function (username, callback) { }); }; -JobBackend.prototype._getCleanedList = function (username, job_ids, callback) { +JobBackend.prototype._getCleanedList = function (user, job_ids, callback) { var self = this; var jobsQueue = queue(job_ids.length); job_ids.forEach(function(job_id) { - jobsQueue.defer(self._getIndexedJob.bind(self), job_id, username); + jobsQueue.defer(self._getIndexedJob.bind(self), job_id, user); }); jobsQueue.awaitAll(function (err, jobs) { @@ -151,13 +204,12 @@ JobBackend.prototype._getCleanedList = function (username, job_ids, callback) { }); }; -JobBackend.prototype._getIndexedJob = function (job_id, username, callback) { +JobBackend.prototype._getIndexedJob = function (job_id, user, callback) { var self = this; this.get(job_id, function (err, job) { - if (err && err.name === 'NotFoundError') { - return self.userIndexer.remove(username, job_id, function (err) { + return self.userIndexer.remove(user, job_id, function (err) { if (err) { console.error('Error removing key %s in user set', job_id, err); } @@ -173,257 +225,4 @@ JobBackend.prototype._getIndexedJob = function (job_id, username, callback) { }); }; -JobBackend.prototype._isJobFound = function (jobValues) { - return jobValues[0] && jobValues[1] && jobValues[2] && jobValues[3] && jobValues[4]; -}; - -JobBackend.prototype.get = function (job_id, callback) { - var self = this; - var redisParams = [ - this.redisPrefix + job_id, - 'user', - 'status', - 'query', - 'created_at', - 'updated_at', - 'failed_reason' - ]; - - this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, jobValues) { - if (err) { - return callback(err); - } - - if (!self._isJobFound(jobValues)) { - var notFoundError = new Error('Job with id ' + job_id + ' not found'); - notFoundError.name = 'NotFoundError'; - return callback(notFoundError); - } - - var query; - - try { - query = JSON.parse(jobValues[2]); - } catch (err) { - query = jobValues[2]; - } - - callback(null, { - job_id: job_id, - user: jobValues[0], - status: jobValues[1], - query: query, - created_at: jobValues[3], - updated_at: jobValues[4], - failed_reason: jobValues[5] ? jobValues[5] : undefined - }); - }); -}; - -JobBackend.prototype.setRunning = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisParams = [ - this.redisPrefix + job.job_id, - 'status', jobStatus.RUNNING, - 'updated_at', now, - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.RUNNING; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); -}; - -JobBackend.prototype.setPending = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.PENDING, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.PENDING; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); -}; - -JobBackend.prototype.setDone = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.DONE, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.DONE; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - }); -}; - -JobBackend.prototype.setJobPendingAndQueryDone = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - - job.query[index].status = jobStatus.DONE; - - var redisParams = [ - redisKey, - 'status', jobStatus.PENDING, - 'updated_at', now, - 'query', JSON.stringify(job.query) - ]; - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); -}; - -JobBackend.prototype.setFailed = function (job, error, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.FAILED, - 'failed_reason', error.message, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.FAILED; - job.query[index].failed_reason = error.message; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - }); -}; - -JobBackend.prototype.setCancelled = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.CANCELLED, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.CANCELLED; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'PERSIST', [ redisKey ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - - }); -}; - -JobBackend.prototype.setUnknown = function (job_id, callback) { - var self = this; - - this.get(job_id, function (err, job) { - if (err) { - return callback(err); - } - - var now = new Date().toISOString(); - var redisKey = self.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.UNKNOWN, - 'updated_at', now - ]; - - self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - }); - }); -}; - - module.exports = JobBackend; diff --git a/batch/job_base.js b/batch/job_base.js new file mode 100644 index 00000000..53b16fe1 --- /dev/null +++ b/batch/job_base.js @@ -0,0 +1,137 @@ +'use strict'; + +var assert = require('assert'); +var uuid = require('node-uuid'); +var jobStatus = require('./job_status'); +var validStatusTransitions = [ + [jobStatus.PENDING, jobStatus.RUNNING], + [jobStatus.PENDING, jobStatus.CANCELLED], + [jobStatus.PENDING, jobStatus.UNKNOWN], + [jobStatus.RUNNING, jobStatus.DONE], + [jobStatus.RUNNING, jobStatus.FAILED], + [jobStatus.RUNNING, jobStatus.CANCELLED], + [jobStatus.RUNNING, jobStatus.PENDING], + [jobStatus.RUNNING, jobStatus.UNKNOWN] +]; +var mandatoryProperties = [ + 'job_id', + 'status', + 'query', + 'created_at', + 'updated_at', + 'host', + 'user' +]; + +function JobBase(data) { + var now = new Date().toISOString(); + + this.data = data; + + if (!this.data.job_id) { + this.data.job_id = uuid.v4(); + } + + if (!this.data.created_at) { + this.data.created_at = now; + } + + if (!this.data.updated_at) { + this.data.updated_at = now; + } + + if (!this.data.status) { + this.data.status = jobStatus.PENDING; + } +} + +module.exports = JobBase; + +JobBase.isValidStatusTransition = function (initialStatus, finalStatus) { + var transition = [ initialStatus, finalStatus ]; + + for (var i = 0; i < validStatusTransitions.length; i++) { + try { + assert.deepEqual(transition, validStatusTransitions[i]); + return true; + } catch (e) { + continue; + } + } + + return false; +}; + +// should be implemented by childs +JobBase.prototype.getNextQuery = function () { + throw new Error('Unimplemented method'); +}; + +// should be implemented by childs +JobBase.prototype.hasNextQuery = function () { + throw new Error('Unimplemented method'); +}; + +JobBase.prototype.isPending = function () { + return this.data.status === jobStatus.PENDING; +}; + +JobBase.prototype.isRunning = function () { + return this.data.status === jobStatus.RUNNING; +}; + +JobBase.prototype.isDone = function () { + return this.data.status === jobStatus.DONE; +}; + +JobBase.prototype.isCancelled = function () { + return this.data.status === jobStatus.CANCELLED; +}; + +JobBase.prototype.isFailed = function () { + return this.data.status === jobStatus.FAILED; +}; + +JobBase.prototype.isUnknown = function () { + return this.data.status === jobStatus.UNKNOWN; +}; + +JobBase.prototype.set = function (data) { + var now = new Date().toISOString(); + + if (data.job_id !== this.data.job_id) { + throw new Error('Cannot modify id'); + } + + this.data.update_at = now; +}; + +JobBase.prototype.setQuery = function (/* query */) { + throw new Error('Unimplemented method'); +}; + +JobBase.prototype.setStatus = function (finalStatus) { + var initialStatus = this.data.status; + var isValid = this.isValidStatusTransition(initialStatus, finalStatus); + + if (!isValid) { + throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); + } + + this.data.status = finalStatus; +}; + +JobBase.prototype.validate = function () { + for (var i = 0; i < mandatoryProperties.length; i++) { + if (!this.data[mandatoryProperties[i]]) { + throw new Error('property "' + mandatoryProperties[i] + '" is mandatory'); + } + } +}; + +JobBase.prototype.serialize = function () { + var data = JSON.parse(JSON.stringify(this.data)); + delete data.host; + + return data; +}; diff --git a/batch/job_canceller.js b/batch/job_canceller.js index dfeb122a..794f1dbd 100644 --- a/batch/job_canceller.js +++ b/batch/job_canceller.js @@ -1,87 +1,47 @@ 'use strict'; var PSQL = require('cartodb-psql'); -var jobStatus = require('./job_status'); -function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) { - this.metadataBackend = metadataBackend; +function JobCanceller(userDatabaseMetadataService) { this.userDatabaseMetadataService = userDatabaseMetadataService; - this.jobBackend = jobBackend; } -function getIndexOfRunningQuery(job) { - if (Array.isArray(job.query)) { - for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status === jobStatus.RUNNING) { - return i; - } - } - } -} +module.exports = JobCanceller; -JobCanceller.prototype.cancel = function (job_id, callback) { - var self = this; - - self.jobBackend.get(job_id, function (err, job) { +JobCanceller.prototype.cancel = function (job, callback) { + this.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) { if (err) { return callback(err); } - if (job.status === jobStatus.PENDING) { - return self.jobBackend.setCancelled(job, callback); + doCancel(job.data.job_id, userDatabaseMetadata, callback); + }); +}; + +function doCancel(job_id, userDatabaseMetadata, callback) { + var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); + + getQueryPID(pg, job_id, function (err, pid) { + if (err) { + return callback(err); } - if (job.status !== jobStatus.RUNNING) { - var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed'); - cancelNotAllowedError.name = 'CancelNotAllowedError'; - return callback(cancelNotAllowedError); - } - - self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) { + cancelQuery(pg, pid, function (err, isCancelled) { if (err) { return callback(err); } - self._query(job, userDatabaseMetadata, function (err, job) { - if (err) { - return callback(err); - } + if (!isCancelled) { + return callback(new Error('Query has not been cancelled')); + } - var queryIndex = getIndexOfRunningQuery(job); - - self.jobBackend.setCancelled(job, queryIndex, function (err, job) { - if (err) { - return callback(err); - } - - callback(null, job, queryIndex); - }); - }); + callback(null); }); }); -}; +} -JobCanceller.prototype.drain = function (job_id, callback) { - var self = this; - - this.cancel(job_id, function (err, job, queryIndex) { - if (err && err.name === 'CancelNotAllowedError') { - return callback(err); - } - - if (err) { - console.error('There was an error while draining job %s, %s ', job_id, err); - return self.jobBackend.setUnknown(job_id, callback); - } - - self.jobBackend.setPending(job, queryIndex, callback); - }); - -}; - -JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) { - var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); - var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job.job_id + " */%'"; +function getQueryPID(pg, job_id, callback) { + var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job_id + " */%'"; pg.query(getPIDQuery, function(err, result) { if (err) { @@ -92,24 +52,20 @@ JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) { return callback(new Error('Query is not running currently')); } - var pid = result.rows[0].pid; - var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')'; - - pg.query(cancelQuery, function (err, result) { - if (err) { - return callback(err); - } - - var isCancelled = result.rows[0].pg_cancel_backend; - - if (!isCancelled) { - return callback(new Error('Query has not been cancelled')); - } - - callback(null, job); - }); + callback(null, result.rows[0].pid); }); -}; +} +function cancelQuery(pg, pid, callback) { + var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')'; -module.exports = JobCanceller; + pg.query(cancelQuery, function (err, result) { + if (err) { + return callback(err); + } + + var isCancelled = result.rows[0].pg_cancel_backend; + + callback(null, isCancelled); + }); +} diff --git a/batch/job_factory.js b/batch/job_factory.js new file mode 100644 index 00000000..c8772442 --- /dev/null +++ b/batch/job_factory.js @@ -0,0 +1,24 @@ +'use strict'; + +var JobSimple = require('job_simple'); +var JobMultiple = require('job_multiple'); + +function JobFactory() { + this.jobClasses = [ JobSimple, JobMultiple ]; +} + +module.exports = JobFactory; + +JobFactory.create = function (data) { + if (!data.query) { + throw new Error('param "query" is mandatory'); + } + + for (var i = 0; i < this.jobClasses.length; i++) { + if (this.jobClasses[i].is(data.query)) { + return new this.jobClasses[i](data); + } + } + + throw new Error('there is no job class for the provided query'); +}; diff --git a/batch/job_multiple.js b/batch/job_multiple.js new file mode 100644 index 00000000..51a30cea --- /dev/null +++ b/batch/job_multiple.js @@ -0,0 +1,105 @@ +'use strict'; + +var util = require('util'); +var JobBase = require('./job_base'); +var jobStatus = require('./job_status'); + +function JobMultiple(data) { + JobBase.call(this, data); + + this.init(); +} +util.inherits(JobMultiple, JobBase); + +module.exports = JobMultiple; + +JobMultiple.prototype.is = function (query) { + if (!Array.isArray(query)) { + return false; + } + + for (var i = 0; i < query.length; i++) { + if (typeof query[i] !== 'string') { + return false; + } + } + + return true; +}; + +JobMultiple.prototype.init = function () { + for (var i = 0; i < this.data.query.length; i++) { + this.data.query[i] = { + query: this.data.query[i], + status: jobStatus.PENDING + }; + } +}; + +JobMultiple.prototype.isPending = function (index) { + var isPending = JobMultiple.super_.prototype.isPending.call(this); + + if (isPending && index) { + isPending = this.data.query[index].status === jobStatus.PENDING; + } + + return isPending; +}; + +JobMultiple.prototype.hasNextQuery = function () { + return !!this.getNextQuery(); +}; + +JobMultiple.prototype.getNextQuery = function () { + if (this.isPending()) { + for (var i = 0; i < this.data.query.length; i++) { + if (this.isPending(i)) { + return this.data.query[i].query; + } + } + } +}; + +JobMultiple.prototype.setQuery = function (query) { + var isMultiple = this.is(query); + + if (this.isPending() && isMultiple) { + this.data.query = query; + } +}; + +JobMultiple.prototype.setStatus = function (finalStatus) { + var initialStatus = this.data.status; + + // if transition is to "done" and there are more queries to run + // then job status must be "pending" instead of "done" + // else job status transition to done (if "running") + if (finalStatus === jobStatus.DONE && this.hasNextQuery()) { + JobMultiple.super_.prototype.setStatus.call(this, jobStatus.PENDING); + } else { + JobMultiple.super_.prototype.setStatus.call(this, finalStatus); + } + + for (var i = 0; i < this.data.query.length; i++) { + var isValid = JobMultiple.super_.isValidStatusTransition(this.data.query[i].status, finalStatus); + + if (isValid) { + this.data.query[i].status = finalStatus; + return; + } + } + + throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); +}; + +JobMultiple.prototype.set = function (data) { + JobMultiple.super_.prototype.set.call(this, data); + + if (data.status) { + this.setStatus(data.status); + } + + if (data.query) { + this.setQuery(data.query); + } +}; diff --git a/batch/job_runner.js b/batch/job_runner.js index 64a269e9..5f8b8942 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -3,111 +3,81 @@ var errorCodes = require('../app/postgresql/error_codes').codeToCondition; var jobStatus = require('./job_status'); -function getNextQuery(job) { - if (!Array.isArray(job.query)) { - return { - query: job.query - }; - } - - for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status === jobStatus.PENDING) { - return { - index: i, - query: job.query[i].query - }; - } - } -} - -function isLastQuery(job, index) { - if (!Array.isArray(job.query)) { - return true; - } - - if (index >= (job.query.length -1)) { - return true; - } - - return false; -} - -function JobRunner(jobBackend, jobQueue, queryRunner,userDatabaseMetadataService) { - this.jobBackend = jobBackend; +function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) { + this.jobService = jobService; this.jobQueue = jobQueue; this.queryRunner = queryRunner; - this.userDatabaseMetadataService = userDatabaseMetadataService; + this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner } JobRunner.prototype.run = function (job_id, callback) { var self = this; - self.jobBackend.get(job_id, function (err, job) { + self.jobService.get(job_id, function (err, job) { if (err) { return callback(err); } - if (job.status !== jobStatus.PENDING) { - var invalidJobStatusError = new Error([ - 'Cannot run job', - job.job_id, - 'due to its status is', - job.status - ].join(' ')); - invalidJobStatusError.name = 'InvalidJobStatus'; - return callback(invalidJobStatusError); + try { + job.setStatus(jobStatus.RUNNING); + } catch (err) { + return callback(err); } - var query = getNextQuery(job); - - if (!query) { - var queryNotFoundError = new Error([ - 'Cannot run job', - job.job_id, - ', there is no query to run' - ].join(' ')); - queryNotFoundError.name = 'QueryNotFound'; - return callback(queryNotFoundError); - } - - self.jobBackend.setRunning(job, query.index, function (err, job) { + self.jobService.save(job, function (err, job) { if (err) { return callback(err); } - self._run(job, query, callback); + self._run(job, callback); }); }); }; -JobRunner.prototype._run = function (job, query, callback) { +JobRunner.prototype._run = function (job, callback) { var self = this; - self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) { + + var query = job.getNextQuery(); + + // TODO: move to query + self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) { if (err) { return callback(err); } - self.queryRunner.run(job.job_id, query.query, userDatabaseMetadata, function (err /*, result */) { + self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) { if (err) { // if query has been cancelled then it's going to get the current // job status saved by query_canceller if (errorCodes[err.code.toString()] === 'query_canceled') { - return self.jobBackend.get(job.job_id, callback); + return self.jobService.get(job.data.job_id, callback); } - return self.jobBackend.setFailed(job, err, query.index, callback); + try { + job.setStatus(jobStatus.FAILED); + } catch (err) { + return callback(err); + } + + return self.jobService.save(job, callback); } - if (isLastQuery(job, query.index)) { - return self.jobBackend.setDone(job, query.index, callback); + try { + job.setStatus(jobStatus.DONE); + } catch (err) { + return callback(err); } - self.jobBackend.setJobPendingAndQueryDone(job, query.index, function (err, job) { + self.jobService.save(job, function (err, job) { if (err) { return callback(err); } - self.jobQueue.enqueue(job.job_id, userDatabaseMetadata.host, function (err){ + if (job.isDone()) { + return callback(null, job); + } + + self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) { if (err) { return callback(err); } diff --git a/batch/job_service.js b/batch/job_service.js new file mode 100644 index 00000000..5c4d8bb3 --- /dev/null +++ b/batch/job_service.js @@ -0,0 +1,141 @@ +'use strict'; + +var JobFactory = require('./job_factory'); +var jobStatus = require('./job_status'); + +function JobService(jobBackend, jobCanceller) { + this.jobBackend = jobBackend; + this.jobCanceller = jobCanceller; +} + +module.exports = JobService; + +JobService.prototype.get = function (job_id, callback) { + this.jobBackend.get(job_id, function (err, data) { + if (err) { + return callback(err); + } + + var job; + + try { + job = JobFactory.create(data); + } catch (err) { + return callback(err); + } + + callback(null, job); + }); +}; + +JobService.prototype.list = function (user, callback) { + this.jobBackend.list(user, function (err, dataList) { + if (err) { + return callback(err); + } + + var jobList = dataList.map(function (data) { + var job; + + try { + job = JobFactory.create(data); + } catch (err) { + return console.err(err); + } + + return job; + }) + .filter(function (job) { + return job !== undefined; + }); + + callback(null, jobList); + }); +}; + +JobService.prototype.create = function (data, callback) { + try { + var job = JobFactory.create(data); + job.validate(); + this.jobBackend.create(job.data, callback); + } catch (err) { + return callback(err); + } +}; + +JobService.prototype.update = function (data, callback) { + var self = this; + + self.get(data.job_id, function (err, job) { + if (err) { + return callback(err); + } + + try { + job.set(data); + self.save(job, callback); + } catch (err) { + return callback(err); + } + }); +}; + +JobService.prototype.save = function (job, callback) { + var self = this; + + try { + job.validate(); + self.jobBackend.update(job.data, callback); + } catch (err) { + return callback(err); + } +}; + +JobService.prototype.cancel = function (job_id, callback) { + var self = this; + + self.get(job_id, function (err, job) { + if (err) { + return callback(err); + } + + try { + job.setStatus(jobStatus.CANCELLED); + } catch (err) { + return callback(err); + } + + self.jobCanceller.cancel(job, function (err) { + if (err) { + return callback(err); + } + + self.jobBackend.update(job.data, callback); + }); + }); +}; + +JobService.prototype.drain = function (job_id, callback) { + var self = this; + + self.get(job_id, function (err, job) { + if (err) { + return callback(err); + } + + self.jobCanceller.cancel(job, function (err) { + if (err) { + // console.error('There was an error while draining job %s, %s ', job_id, err); + return callback(err); + } + + try { + job.setStatus(jobStatus.PENDING); + } catch (err) { + return callback(err); + } + + self.jobBackend.update(job.data, callback); + }); + }); +}; diff --git a/batch/job_simple.js b/batch/job_simple.js new file mode 100644 index 00000000..304d3427 --- /dev/null +++ b/batch/job_simple.js @@ -0,0 +1,45 @@ +'use strict'; + +var util = require('util'); +var JobBase = require('./job_base'); + +function JobSimple(data) { + JobBase.call(this, data); +} +util.inherits(JobSimple, JobBase); + +module.exports = JobSimple; + +JobSimple.prototype.is = function (query) { + return typeof query === 'string'; +}; + +JobSimple.prototype.hasNextQuery = function () { + return this.isPending(); +}; + +JobSimple.prototype.getNextQuery = function () { + if (this.hasNextQuery()) { + return this.data.query; + } +}; + +JobSimple.prototype.setQuery = function (query) { + var isSimple = this.is(query); + + if (this.isPending() && isSimple) { + this.data.query = query; + } +}; + +JobSimple.prototype.set = function (data) { + JobSimple.super_.prototype.set.call(this, data); + + if (data.status) { + this.setStatus(data.status); + } + + if (data.query) { + this.setQuery(data.query); + } +}; diff --git a/batch/query_runner.js b/batch/query_runner.js index 9e545514..1d5701ff 100644 --- a/batch/query_runner.js +++ b/batch/query_runner.js @@ -5,8 +5,9 @@ var PSQL = require('cartodb-psql'); function QueryRunner() { } -QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) { +module.exports = QueryRunner; +QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) { var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); pg.query('SET statement_timeout=0', function (err) { @@ -35,6 +36,3 @@ QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callbac }); }; - - -module.exports = QueryRunner; From d2d3ba8159e01b7797a3912b711d3b01f3544d05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 16 May 2016 01:22:47 +0200 Subject: [PATCH 2/9] Passed tests --- app/app.js | 8 +++- batch/index.js | 2 +- batch/job_backend.js | 35 +++++++++++--- batch/job_base.js | 22 ++++----- batch/job_canceller.js | 4 +- batch/job_factory.js | 14 +++--- batch/job_multiple.js | 64 ++++++++----------------- batch/job_runner.js | 10 ++-- batch/job_service.js | 34 +++++++++++-- batch/job_simple.js | 28 +++-------- test/acceptance/batch.test.js | 20 ++++++-- test/acceptance/job.use-case-1.test.js | 2 +- test/acceptance/job.use-case-10.test.js | 9 ++-- test/acceptance/job.use-case-2.test.js | 2 +- test/acceptance/job.use-case-3.test.js | 2 +- test/acceptance/job.use-case-8.test.js | 9 +++- test/acceptance/job.use-case-9.test.js | 15 ++++-- 17 files changed, 160 insertions(+), 120 deletions(-) diff --git a/app/app.js b/app/app.js index 5b05e4be..173c2e86 100644 --- a/app/app.js +++ b/app/app.js @@ -28,6 +28,8 @@ var JobQueue = require('../batch/job_queue'); var UserIndexer = require('../batch/user_indexer'); var JobBackend = require('../batch/job_backend'); var JobCanceller = require('../batch/job_canceller'); +var JobService = require('../batch/job_service'); + var UserDatabaseMetadataService = require('../batch/user_database_metadata_service'); var cors = require('./middlewares/cors'); @@ -183,7 +185,9 @@ function App() { var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); + var genericController = new GenericController(); genericController.route(app); @@ -191,7 +195,7 @@ function App() { var queryController = new QueryController(userDatabaseService, tableCache, statsd_client); queryController.route(app); - var jobController = new JobController(userDatabaseService, jobBackend, jobCanceller); + var jobController = new JobController(userDatabaseService, jobService, jobCanceller); jobController.route(app); var cacheStatusController = new CacheStatusController(tableCache); diff --git a/batch/index.js b/batch/index.js index 3d996c63..985d7204 100644 --- a/batch/index.js +++ b/batch/index.js @@ -26,7 +26,7 @@ module.exports = function batchFactory (metadataBackend) { var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); // TODO: down userDatabaseMetadataService var queryRunner = new QueryRunner(); - var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); var jobService = new JobService(jobBackend, jobCanceller); var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService); diff --git a/batch/job_backend.js b/batch/job_backend.js index 7134d7f5..21f0f908 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -19,24 +19,45 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer this.userIndexer = userIndexer; } -JobBackend.prototype.toRedisParams = function (obj) { - var redisParams = []; +JobBackend.prototype.toRedisParams = function (data) { + var redisParams = [this.redisPrefix + data.job_id]; + var obj = JSON.parse(JSON.stringify(data)); + delete obj.job_id; + for (var property in obj) { if (obj.hasOwnProperty(property)) { redisParams.push(property); - redisParams.push(obj[property]); + if (property === 'query' && typeof obj[property] !== 'string') { + redisParams.push(JSON.stringify(obj[property])); + } else { + redisParams.push(obj[property]); + } } } return redisParams; }; -JobBackend.prototype.toObject = function (redisParams, redisValues) { +JobBackend.prototype.toObject = function (job_id, redisParams, redisValues) { var obj = {}; + + redisParams.shift(); // job_id value + redisParams.pop(); // WARN: weird function pushed by metadataBackend + for (var i = 0; i < redisParams.length; i++) { - obj[redisParams[i]] = redisValues[i]; + if (redisParams[i] === 'query') { + try { + obj[redisParams[i]] = JSON.parse(redisValues[i]); + } catch (e) { + obj[redisParams[i]] = redisValues[i]; + } + } else { + obj[redisParams[i]] = redisValues[i]; + } } + obj.job_id = job_id; // adds redisKey as object property + return obj; }; @@ -54,6 +75,7 @@ JobBackend.prototype.get = function (job_id, callback) { 'query', 'created_at', 'updated_at', + 'host', 'failed_reason' ]; @@ -68,8 +90,7 @@ JobBackend.prototype.get = function (job_id, callback) { return callback(notFoundError); } - var jobData = self.toObject(redisParams.slice(1), redisValues); - jobData.job_id = job_id; + var jobData = self.toObject(job_id, redisParams, redisValues); callback(null, jobData); }); diff --git a/batch/job_base.js b/batch/job_base.js index 53b16fe1..350739c6 100644 --- a/batch/job_base.js +++ b/batch/job_base.js @@ -47,7 +47,7 @@ function JobBase(data) { module.exports = JobBase; -JobBase.isValidStatusTransition = function (initialStatus, finalStatus) { +JobBase.prototype.isValidStatusTransition = function (initialStatus, finalStatus) { var transition = [ initialStatus, finalStatus ]; for (var i = 0; i < validStatusTransitions.length; i++) { @@ -67,11 +67,12 @@ JobBase.prototype.getNextQuery = function () { throw new Error('Unimplemented method'); }; -// should be implemented by childs JobBase.prototype.hasNextQuery = function () { - throw new Error('Unimplemented method'); + return !!this.getNextQuery(); }; + + JobBase.prototype.isPending = function () { return this.data.status === jobStatus.PENDING; }; @@ -96,21 +97,19 @@ JobBase.prototype.isUnknown = function () { return this.data.status === jobStatus.UNKNOWN; }; -JobBase.prototype.set = function (data) { +JobBase.prototype.setQuery = function (query) { var now = new Date().toISOString(); - if (data.job_id !== this.data.job_id) { - throw new Error('Cannot modify id'); + if (!this.isPending()) { + throw new Error('Job is not pending, it cannot be updated'); } - this.data.update_at = now; -}; - -JobBase.prototype.setQuery = function (/* query */) { - throw new Error('Unimplemented method'); + this.data.updated_at = now; + this.data.query = query; }; JobBase.prototype.setStatus = function (finalStatus) { + var now = new Date().toISOString(); var initialStatus = this.data.status; var isValid = this.isValidStatusTransition(initialStatus, finalStatus); @@ -118,6 +117,7 @@ JobBase.prototype.setStatus = function (finalStatus) { throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); } + this.data.updated_at = now; this.data.status = finalStatus; }; diff --git a/batch/job_canceller.js b/batch/job_canceller.js index 794f1dbd..fb41b683 100644 --- a/batch/job_canceller.js +++ b/batch/job_canceller.js @@ -26,7 +26,7 @@ function doCancel(job_id, userDatabaseMetadata, callback) { return callback(err); } - cancelQuery(pg, pid, function (err, isCancelled) { + doCancelQuery(pg, pid, function (err, isCancelled) { if (err) { return callback(err); } @@ -56,7 +56,7 @@ function getQueryPID(pg, job_id, callback) { }); } -function cancelQuery(pg, pid, callback) { +function doCancelQuery(pg, pid, callback) { var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')'; pg.query(cancelQuery, function (err, result) { diff --git a/batch/job_factory.js b/batch/job_factory.js index c8772442..ca90d5f1 100644 --- a/batch/job_factory.js +++ b/batch/job_factory.js @@ -1,22 +1,22 @@ 'use strict'; -var JobSimple = require('job_simple'); -var JobMultiple = require('job_multiple'); +var JobSimple = require('./job_simple'); +var JobMultiple = require('./job_multiple'); +var jobClasses = [ JobSimple, JobMultiple ]; function JobFactory() { - this.jobClasses = [ JobSimple, JobMultiple ]; } module.exports = JobFactory; JobFactory.create = function (data) { if (!data.query) { - throw new Error('param "query" is mandatory'); + throw new Error('You must indicate a valid SQL'); } - for (var i = 0; i < this.jobClasses.length; i++) { - if (this.jobClasses[i].is(data.query)) { - return new this.jobClasses[i](data); + for (var i = 0; i < jobClasses.length; i++) { + if (jobClasses[i].is(data.query)) { + return new jobClasses[i](data); } } diff --git a/batch/job_multiple.js b/batch/job_multiple.js index 51a30cea..2f32089b 100644 --- a/batch/job_multiple.js +++ b/batch/job_multiple.js @@ -13,14 +13,19 @@ util.inherits(JobMultiple, JobBase); module.exports = JobMultiple; -JobMultiple.prototype.is = function (query) { +JobMultiple.is = function (query) { if (!Array.isArray(query)) { return false; } + // 1. From user: ['select * from ...', 'select * from ...'] + // 2. From redis: [ { query: 'select * from ...', status: 'pending' }, + // { query: 'select * from ...', status: 'pending' } ] for (var i = 0; i < query.length; i++) { if (typeof query[i] !== 'string') { - return false; + if (typeof query[i].query !== 'string') { + return false; + } } } @@ -29,48 +34,33 @@ JobMultiple.prototype.is = function (query) { JobMultiple.prototype.init = function () { for (var i = 0; i < this.data.query.length; i++) { - this.data.query[i] = { - query: this.data.query[i], - status: jobStatus.PENDING - }; + if (!this.data.query[i].query && !this.data.query[i].status) { + this.data.query[i] = { + query: this.data.query[i], + status: jobStatus.PENDING + }; + } } }; -JobMultiple.prototype.isPending = function (index) { - var isPending = JobMultiple.super_.prototype.isPending.call(this); - - if (isPending && index) { - isPending = this.data.query[index].status === jobStatus.PENDING; - } - - return isPending; -}; - -JobMultiple.prototype.hasNextQuery = function () { - return !!this.getNextQuery(); -}; - JobMultiple.prototype.getNextQuery = function () { - if (this.isPending()) { - for (var i = 0; i < this.data.query.length; i++) { - if (this.isPending(i)) { - return this.data.query[i].query; - } + for (var i = 0; i < this.data.query.length; i++) { + if (this.data.query[i].status === jobStatus.PENDING) { + return this.data.query[i].query; } } }; JobMultiple.prototype.setQuery = function (query) { - var isMultiple = this.is(query); - - if (this.isPending() && isMultiple) { - this.data.query = query; + if (!JobMultiple.is(query)) { + throw new Error('You must indicate a valid SQL'); } + + JobMultiple.super_.prototype.setQuery.call(this, query); }; JobMultiple.prototype.setStatus = function (finalStatus) { var initialStatus = this.data.status; - // if transition is to "done" and there are more queries to run // then job status must be "pending" instead of "done" // else job status transition to done (if "running") @@ -81,7 +71,7 @@ JobMultiple.prototype.setStatus = function (finalStatus) { } for (var i = 0; i < this.data.query.length; i++) { - var isValid = JobMultiple.super_.isValidStatusTransition(this.data.query[i].status, finalStatus); + var isValid = JobMultiple.super_.prototype.isValidStatusTransition(this.data.query[i].status, finalStatus); if (isValid) { this.data.query[i].status = finalStatus; @@ -91,15 +81,3 @@ JobMultiple.prototype.setStatus = function (finalStatus) { throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); }; - -JobMultiple.prototype.set = function (data) { - JobMultiple.super_.prototype.set.call(this, data); - - if (data.status) { - this.setStatus(data.status); - } - - if (data.query) { - this.setQuery(data.query); - } -}; diff --git a/batch/job_runner.js b/batch/job_runner.js index 5f8b8942..7c8bd73e 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -18,6 +18,8 @@ JobRunner.prototype.run = function (job_id, callback) { return callback(err); } + var query = job.getNextQuery(); + try { job.setStatus(jobStatus.RUNNING); } catch (err) { @@ -29,16 +31,14 @@ JobRunner.prototype.run = function (job_id, callback) { return callback(err); } - self._run(job, callback); + self._run(job, query, callback); }); }); }; -JobRunner.prototype._run = function (job, callback) { +JobRunner.prototype._run = function (job, query, callback) { var self = this; - var query = job.getNextQuery(); - // TODO: move to query self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) { if (err) { @@ -59,7 +59,7 @@ JobRunner.prototype._run = function (job, callback) { return callback(err); } - return self.jobService.save(job, callback); + return self.jobService.save(job, callback); } try { diff --git a/batch/job_service.js b/batch/job_service.js index 5c4d8bb3..d9cc3ca1 100644 --- a/batch/job_service.js +++ b/batch/job_service.js @@ -57,7 +57,12 @@ JobService.prototype.create = function (data, callback) { try { var job = JobFactory.create(data); job.validate(); - this.jobBackend.create(job.data, callback); + this.jobBackend.create(job.data, function (err) { + if (err) { + return callback(err); + } + callback(null, job); + }); } catch (err) { return callback(err); } @@ -72,7 +77,7 @@ JobService.prototype.update = function (data, callback) { } try { - job.set(data); + job.setQuery(data.query); self.save(job, callback); } catch (err) { return callback(err); @@ -85,10 +90,23 @@ JobService.prototype.save = function (job, callback) { try { job.validate(); - self.jobBackend.update(job.data, callback); } catch (err) { return callback(err); } + + self.jobBackend.update(job.data, function (err, data) { + if (err) { + return callback(err); + } + + try { + job = JobFactory.create(data); + } catch (err) { + return callback(err); + } + + callback(null, job); + }); }; JobService.prototype.cancel = function (job_id, callback) { @@ -99,18 +117,24 @@ JobService.prototype.cancel = function (job_id, callback) { return callback(err); } + var isPending = job.isPending(); + try { job.setStatus(jobStatus.CANCELLED); } catch (err) { return callback(err); } + if (isPending) { + return self.save(job, callback); + } + self.jobCanceller.cancel(job, function (err) { if (err) { return callback(err); } - self.jobBackend.update(job.data, callback); + self.save(job, callback); }); }); }; @@ -125,7 +149,7 @@ JobService.prototype.drain = function (job_id, callback) { self.jobCanceller.cancel(job, function (err) { if (err) { - // console.error('There was an error while draining job %s, %s ', job_id, err); + console.error('There was an error while draining job %s, %s ', job_id, err); return callback(err); } diff --git a/batch/job_simple.js b/batch/job_simple.js index 304d3427..6e56fedb 100644 --- a/batch/job_simple.js +++ b/batch/job_simple.js @@ -10,36 +10,20 @@ util.inherits(JobSimple, JobBase); module.exports = JobSimple; -JobSimple.prototype.is = function (query) { +JobSimple.is = function (query) { return typeof query === 'string'; }; -JobSimple.prototype.hasNextQuery = function () { - return this.isPending(); -}; - JobSimple.prototype.getNextQuery = function () { - if (this.hasNextQuery()) { + if (this.isPending()) { return this.data.query; } }; JobSimple.prototype.setQuery = function (query) { - var isSimple = this.is(query); - - if (this.isPending() && isSimple) { - this.data.query = query; - } -}; - -JobSimple.prototype.set = function (data) { - JobSimple.super_.prototype.set.call(this, data); - - if (data.status) { - this.setStatus(data.status); - } - - if (data.query) { - this.setQuery(data.query); + if (!JobSimple.is(query)) { + throw new Error('You must indicate a valid SQL'); } + + JobSimple.super_.prototype.setQuery.call(this, query); }; diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index 02f51f75..5035c011 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -3,10 +3,14 @@ var _ = require('underscore'); var redis = require('redis'); var queue = require('queue-async'); var batchFactory = require('../../batch'); + var JobPublisher = require('../../batch/job_publisher'); var JobQueue = require('../../batch/job_queue'); var UserIndexer = require('../../batch/user_indexer'); var JobBackend = require('../../batch/job_backend'); +var JobService = require('../../batch/job_service'); +var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service'); +var JobCanceller = require('../../batch/job_canceller'); var metadataBackend = require('cartodb-redis')({ host: global.settings.redis_host, port: global.settings.redis_port, @@ -22,6 +26,9 @@ describe('batch module', function() { var jobPublisher = new JobPublisher(redis); var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); var batch = batchFactory(metadataBackend); @@ -37,12 +44,18 @@ describe('batch module', function() { }); function createJob(sql, done) { - jobBackend.create(username, sql, dbInstance, function (err, job) { + var data = { + user: username, + query: sql, + host: dbInstance + }; + + jobService.create(data, function (err, job) { if (err) { return done(err); } - done(null, job); + done(null, job.serialize()); }); } @@ -196,8 +209,7 @@ describe('batch module', function() { }); it('should perform job with array of select', function (done) { - var queries = ['select * from private_table', 'select * from private_table']; - + var queries = ['select * from private_table limit 1', 'select * from private_table']; createJob(queries, function (err, job) { if (err) { diff --git a/test/acceptance/job.use-case-1.test.js b/test/acceptance/job.use-case-1.test.js index ab1dd41b..2c965bac 100644 --- a/test/acceptance/job.use-case-1.test.js +++ b/test/acceptance/job.use-case-1.test.js @@ -91,7 +91,7 @@ describe('Use case 1: cancel and modify a done job', function () { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is done, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from done to cancelled"); done(); }); }); diff --git a/test/acceptance/job.use-case-10.test.js b/test/acceptance/job.use-case-10.test.js index 444e56d8..a327bd9f 100644 --- a/test/acceptance/job.use-case-10.test.js +++ b/test/acceptance/job.use-case-10.test.js @@ -26,7 +26,7 @@ var metadataBackend = require('cartodb-redis')({ }); var batchFactory = require('../../batch'); -describe('Use case 1: cancel and modify a done multiquery job', function () { +describe('Use case 10: cancel and modify a done multiquery job', function () { var batch = batchFactory(metadataBackend); @@ -95,7 +95,7 @@ describe('Use case 1: cancel and modify a done multiquery job', function () { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is done, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from done to cancelled"); done(); }); }); @@ -106,7 +106,10 @@ describe('Use case 1: cancel and modify a done multiquery job', function () { headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, method: 'PUT', data: querystring.stringify({ - query: "SELECT cartodb_id FROM untitle_table_4" + query: [ + "SELECT * FROM untitle_table_4", + "SELECT * FROM untitle_table_4" + ] }) }, { status: 400 diff --git a/test/acceptance/job.use-case-2.test.js b/test/acceptance/job.use-case-2.test.js index 205a9920..c2552d63 100644 --- a/test/acceptance/job.use-case-2.test.js +++ b/test/acceptance/job.use-case-2.test.js @@ -121,7 +121,7 @@ describe('Use case 2: cancel a running job', function() { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled"); done(); }); }); diff --git a/test/acceptance/job.use-case-3.test.js b/test/acceptance/job.use-case-3.test.js index 9d666132..a4d36055 100644 --- a/test/acceptance/job.use-case-3.test.js +++ b/test/acceptance/job.use-case-3.test.js @@ -97,7 +97,7 @@ describe('Use case 3: cancel a pending job', function() { }, 50); }); - it('Step 4, cancel a job should be cancelled', function (done){ + it('Step 4, cancel a pending job should be cancelled', function (done){ assert.response(app, { url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234', headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, diff --git a/test/acceptance/job.use-case-8.test.js b/test/acceptance/job.use-case-8.test.js index ff48a326..2ea0fa19 100644 --- a/test/acceptance/job.use-case-8.test.js +++ b/test/acceptance/job.use-case-8.test.js @@ -125,7 +125,7 @@ describe('Use case 8: cancel a running multiquery job', function() { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled"); done(); }); }); @@ -136,7 +136,12 @@ describe('Use case 8: cancel a running multiquery job', function() { headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, method: 'PUT', data: querystring.stringify({ - query: "SELECT cartodb_id FROM untitle_table_4" + query: [ + "select pg_sleep(1)", + "select pg_sleep(1)", + "select pg_sleep(1)", + "select pg_sleep(1)" + ] }) }, { status: 400 diff --git a/test/acceptance/job.use-case-9.test.js b/test/acceptance/job.use-case-9.test.js index b9378fbe..ab56ee7c 100644 --- a/test/acceptance/job.use-case-9.test.js +++ b/test/acceptance/job.use-case-9.test.js @@ -103,20 +103,29 @@ describe('Use case 9: modify a pending multiquery job', function() { }, 50); }); - it('Step 4, multiquery job should be modified', function (done){ + it('Step 4, multiquery job should be modified', function (done) { assert.response(app, { url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234', headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, method: 'PUT', data: querystring.stringify({ - query: "SELECT cartodb_id FROM untitle_table_4" + query: [ + "SELECT * FROM untitle_table_4", + "SELECT * FROM untitle_table_4 limit 1" + ] }) }, { status: 200 }, function(res) { var jobGot = JSON.parse(res.body); assert.equal(jobGot.job_id, pendingJob.job_id); - assert.equal(jobGot.query, "SELECT cartodb_id FROM untitle_table_4"); + assert.deepEqual(jobGot.query, [{ + query: 'SELECT * FROM untitle_table_4', + status: 'pending' + }, { + query: 'SELECT * FROM untitle_table_4 limit 1', + status: 'pending' + }]); done(); }); }); From 40d82112a8336818451ce43e2923f7f2696f95c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 17 May 2016 19:41:31 +0200 Subject: [PATCH 3/9] Used REDIS_PREFIX as constant --- batch/job_backend.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/batch/job_backend.js b/batch/job_backend.js index 21f0f908..22f184ce 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -1,6 +1,7 @@ 'use strict'; var queue = require('queue-async'); +var REDIS_PREFIX = 'batch:jobs:'; var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours var jobStatus = require('./job_status'); var finalStatus = [ @@ -12,7 +13,6 @@ var finalStatus = [ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) { this.db = 5; - this.redisPrefix = 'batch:jobs:'; this.metadataBackend = metadataBackend; this.jobQueueProducer = jobQueueProducer; this.jobPublisher = jobPublisher; @@ -20,7 +20,7 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer } JobBackend.prototype.toRedisParams = function (data) { - var redisParams = [this.redisPrefix + data.job_id]; + var redisParams = [REDIS_PREFIX + data.job_id]; var obj = JSON.parse(JSON.stringify(data)); delete obj.job_id; @@ -69,7 +69,7 @@ function isJobFound(redisValues) { JobBackend.prototype.get = function (job_id, callback) { var self = this; var redisParams = [ - this.redisPrefix + job_id, + REDIS_PREFIX + job_id, 'user', 'status', 'query', @@ -172,7 +172,7 @@ function isFrozen(status) { JobBackend.prototype.setTTL = function (data, callback) { var self = this; - var redisKey = this.redisPrefix + data.job_id; + var redisKey = REDIS_PREFIX + data.job_id; if (!isFrozen(data.status)) { return callback(); From 672b8ef5376a3da89abe86166c5e19e02babd4ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 17 May 2016 19:44:45 +0200 Subject: [PATCH 4/9] Made pure functions --- batch/job_backend.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/batch/job_backend.js b/batch/job_backend.js index 22f184ce..fc35961c 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -19,7 +19,7 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer this.userIndexer = userIndexer; } -JobBackend.prototype.toRedisParams = function (data) { +function toRedisParams(data) { var redisParams = [REDIS_PREFIX + data.job_id]; var obj = JSON.parse(JSON.stringify(data)); delete obj.job_id; @@ -36,9 +36,9 @@ JobBackend.prototype.toRedisParams = function (data) { } return redisParams; -}; +} -JobBackend.prototype.toObject = function (job_id, redisParams, redisValues) { +function toObject(job_id, redisParams, redisValues) { var obj = {}; redisParams.shift(); // job_id value @@ -59,7 +59,7 @@ JobBackend.prototype.toObject = function (job_id, redisParams, redisValues) { obj.job_id = job_id; // adds redisKey as object property return obj; -}; +} // TODO: is it really necessary?? function isJobFound(redisValues) { @@ -79,7 +79,7 @@ JobBackend.prototype.get = function (job_id, callback) { 'failed_reason' ]; - this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, redisValues) { + self.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, redisValues) { if (err) { return callback(err); } @@ -90,7 +90,7 @@ JobBackend.prototype.get = function (job_id, callback) { return callback(notFoundError); } - var jobData = self.toObject(job_id, redisParams, redisValues); + var jobData = toObject(job_id, redisParams, redisValues); callback(null, jobData); }); @@ -143,7 +143,7 @@ JobBackend.prototype.update = function (data, callback) { JobBackend.prototype.save = function (data, callback) { var self = this; - var redisParams = self.toRedisParams(data); + var redisParams = toRedisParams(data); self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { if (err) { From b06eaae5e6753cf916e00e6e3319f0bb21f94a8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 17 May 2016 19:48:55 +0200 Subject: [PATCH 5/9] Use REDIS_DB as constant --- batch/job_backend.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/batch/job_backend.js b/batch/job_backend.js index fc35961c..a4945a2b 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -2,6 +2,7 @@ var queue = require('queue-async'); var REDIS_PREFIX = 'batch:jobs:'; +var REDIS_DB = 5; var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours var jobStatus = require('./job_status'); var finalStatus = [ @@ -12,7 +13,6 @@ var finalStatus = [ ]; function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) { - this.db = 5; this.metadataBackend = metadataBackend; this.jobQueueProducer = jobQueueProducer; this.jobPublisher = jobPublisher; @@ -79,7 +79,7 @@ JobBackend.prototype.get = function (job_id, callback) { 'failed_reason' ]; - self.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, redisValues) { + self.metadataBackend.redisCmd(REDIS_DB, 'HMGET', redisParams , function (err, redisValues) { if (err) { return callback(err); } @@ -145,7 +145,7 @@ JobBackend.prototype.save = function (data, callback) { var self = this; var redisParams = toRedisParams(data); - self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { + self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', redisParams , function (err) { if (err) { return callback(err); } @@ -178,7 +178,7 @@ JobBackend.prototype.setTTL = function (data, callback) { return callback(); } - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], callback); + self.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], callback); }; JobBackend.prototype.list = function (user, callback) { From 26cd7785830a363d76e6edc0d67fb8f63978a710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 17 May 2016 19:55:33 +0200 Subject: [PATCH 6/9] Improved param name --- batch/job_backend.js | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/batch/job_backend.js b/batch/job_backend.js index a4945a2b..31d7cdd4 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -19,9 +19,9 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer this.userIndexer = userIndexer; } -function toRedisParams(data) { - var redisParams = [REDIS_PREFIX + data.job_id]; - var obj = JSON.parse(JSON.stringify(data)); +function toRedisParams(job) { + var redisParams = [REDIS_PREFIX + job.job_id]; + var obj = JSON.parse(JSON.stringify(job)); delete obj.job_id; for (var property in obj) { @@ -96,66 +96,66 @@ JobBackend.prototype.get = function (job_id, callback) { }); }; -JobBackend.prototype.create = function (data, callback) { +JobBackend.prototype.create = function (job, callback) { var self = this; - self.get(data.job_id, function (err) { + self.get(job.job_id, function (err) { if (err && err.name !== 'NotFoundError') { return callback(err); } - self.save(data, function (err, job) { + self.save(job, function (err, jobSaved) { if (err) { return callback(err); } - self.jobQueueProducer.enqueue(data.job_id, data.host, function (err) { + self.jobQueueProducer.enqueue(job.job_id, job.host, function (err) { if (err) { return callback(err); } // broadcast to consumers - self.jobPublisher.publish(data.host); + self.jobPublisher.publish(job.host); - self.userIndexer.add(data.user, data.job_id, function (err) { + self.userIndexer.add(job.user, job.job_id, function (err) { if (err) { return callback(err); } - callback(null, job); + callback(null, jobSaved); }); }); }); }); }; -JobBackend.prototype.update = function (data, callback) { +JobBackend.prototype.update = function (job, callback) { var self = this; - self.get(data.job_id, function (err) { + self.get(job.job_id, function (err) { if (err) { return callback(err); } - self.save(data, callback); + self.save(job, callback); }); }; -JobBackend.prototype.save = function (data, callback) { +JobBackend.prototype.save = function (job, callback) { var self = this; - var redisParams = toRedisParams(data); + var redisParams = toRedisParams(job); self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', redisParams , function (err) { if (err) { return callback(err); } - self.setTTL(data, function (err) { + self.setTTL(job, function (err) { if (err) { return callback(err); } - self.get(data.job_id, function (err, job) { + self.get(job.job_id, function (err, job) { if (err) { return callback(err); } @@ -170,11 +170,11 @@ function isFrozen(status) { return finalStatus.indexOf(status) !== -1; } -JobBackend.prototype.setTTL = function (data, callback) { +JobBackend.prototype.setTTL = function (job, callback) { var self = this; - var redisKey = REDIS_PREFIX + data.job_id; + var redisKey = REDIS_PREFIX + job.job_id; - if (!isFrozen(data.status)) { + if (!isFrozen(job.status)) { return callback(); } From 9441017ed38a9d1b4168997f93a64d3cd88ddf17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 17 May 2016 20:00:45 +0200 Subject: [PATCH 7/9] Moved job classes to a models folder --- batch/job_factory.js | 10 ++++------ batch/models/index.js | 6 ++++++ batch/{ => models}/job_base.js | 2 +- batch/{ => models}/job_multiple.js | 2 +- batch/{ => models}/job_simple.js | 0 5 files changed, 12 insertions(+), 8 deletions(-) create mode 100644 batch/models/index.js rename batch/{ => models}/job_base.js (98%) rename batch/{ => models}/job_multiple.js (98%) rename batch/{ => models}/job_simple.js (100%) diff --git a/batch/job_factory.js b/batch/job_factory.js index ca90d5f1..6fa8aa80 100644 --- a/batch/job_factory.js +++ b/batch/job_factory.js @@ -1,8 +1,6 @@ 'use strict'; -var JobSimple = require('./job_simple'); -var JobMultiple = require('./job_multiple'); -var jobClasses = [ JobSimple, JobMultiple ]; +var jobModels = require('./models'); function JobFactory() { } @@ -14,9 +12,9 @@ JobFactory.create = function (data) { throw new Error('You must indicate a valid SQL'); } - for (var i = 0; i < jobClasses.length; i++) { - if (jobClasses[i].is(data.query)) { - return new jobClasses[i](data); + for (var i = 0; i < jobModels.length; i++) { + if (jobModels[i].is(data.query)) { + return new jobModels[i](data); } } diff --git a/batch/models/index.js b/batch/models/index.js new file mode 100644 index 00000000..0e80c124 --- /dev/null +++ b/batch/models/index.js @@ -0,0 +1,6 @@ +'use strict'; + +var JobSimple = require('./job_simple'); +var JobMultiple = require('./job_multiple'); + +module.exports = [ JobSimple, JobMultiple ]; diff --git a/batch/job_base.js b/batch/models/job_base.js similarity index 98% rename from batch/job_base.js rename to batch/models/job_base.js index 350739c6..8fd6d9d2 100644 --- a/batch/job_base.js +++ b/batch/models/job_base.js @@ -2,7 +2,7 @@ var assert = require('assert'); var uuid = require('node-uuid'); -var jobStatus = require('./job_status'); +var jobStatus = require('../job_status'); var validStatusTransitions = [ [jobStatus.PENDING, jobStatus.RUNNING], [jobStatus.PENDING, jobStatus.CANCELLED], diff --git a/batch/job_multiple.js b/batch/models/job_multiple.js similarity index 98% rename from batch/job_multiple.js rename to batch/models/job_multiple.js index 2f32089b..7bce92ae 100644 --- a/batch/job_multiple.js +++ b/batch/models/job_multiple.js @@ -2,7 +2,7 @@ var util = require('util'); var JobBase = require('./job_base'); -var jobStatus = require('./job_status'); +var jobStatus = require('../job_status'); function JobMultiple(data) { JobBase.call(this, data); diff --git a/batch/job_simple.js b/batch/models/job_simple.js similarity index 100% rename from batch/job_simple.js rename to batch/models/job_simple.js From 8b7d481b9ad1e08745f3ab48d9339c486b98517c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 18 May 2016 11:06:49 +0200 Subject: [PATCH 8/9] Avoided console usage, favor debug --- batch/batch.js | 15 ++++++++------- batch/job_backend.js | 4 +++- batch/job_service.js | 5 +++-- batch/job_subscriber.js | 3 ++- batch/util/debug.js | 7 +++++++ package.json | 1 + 6 files changed, 24 insertions(+), 11 deletions(-) create mode 100644 batch/util/debug.js diff --git a/batch/batch.js b/batch/batch.js index 87025cfe..326b3a81 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -2,6 +2,7 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; +var debug = require('./util/debug')('batch'); var forever = require('./forever'); var queue = require('queue-async'); var jobStatus = require('./job_status'); @@ -40,10 +41,10 @@ Batch.prototype._subscribe = function () { self.jobQueuePool.removeQueue(host); if (err.name === 'EmptyQueue') { - return console.log(err.message); + return debug(err.message); } - console.error(err); + debug(err); }); }); }; @@ -69,7 +70,7 @@ Batch.prototype._consumeJobs = function (host, queue, callback) { self.jobQueuePool.removeCurrentJobId(host); if (err && err.name === 'JobNotRunnable') { - console.log(err.message); + debug(err.message); return callback(); } @@ -78,9 +79,9 @@ Batch.prototype._consumeJobs = function (host, queue, callback) { } if (job.data.status === jobStatus.FAILED) { - console.log('Job %s %s in %s due to: %s', job_id, job.data.status, host, job.failed_reason); + debug('Job %s %s in %s due to: %s', job_id, job.data.status, host, job.failed_reason); } else { - console.log('Job %s %s in %s', job_id, job.data.status, host); + debug('Job %s %s in %s', job_id, job.data.status, host); } self.emit('job:' + job.data.status, job_id); @@ -101,9 +102,9 @@ Batch.prototype.drain = function (callback) { batchQueues.awaitAll(function (err) { if (err) { - console.error('Something went wrong draining', err); + debug('Something went wrong draining', err); } else { - console.log('Drain complete'); + debug('Drain complete'); } callback(); diff --git a/batch/job_backend.js b/batch/job_backend.js index 31d7cdd4..996afb12 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -1,6 +1,8 @@ 'use strict'; + var queue = require('queue-async'); +var debug = require('./util/debug')('job-backend'); var REDIS_PREFIX = 'batch:jobs:'; var REDIS_DB = 5; var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours @@ -232,7 +234,7 @@ JobBackend.prototype._getIndexedJob = function (job_id, user, callback) { if (err && err.name === 'NotFoundError') { return self.userIndexer.remove(user, job_id, function (err) { if (err) { - console.error('Error removing key %s in user set', job_id, err); + debug('Error removing key %s in user set', job_id, err); } callback(); }); diff --git a/batch/job_service.js b/batch/job_service.js index d9cc3ca1..99ad458c 100644 --- a/batch/job_service.js +++ b/batch/job_service.js @@ -1,5 +1,6 @@ 'use strict'; +var debug = require('./util/debug')('job-service'); var JobFactory = require('./job_factory'); var jobStatus = require('./job_status'); @@ -40,7 +41,7 @@ JobService.prototype.list = function (user, callback) { try { job = JobFactory.create(data); } catch (err) { - return console.err(err); + return debug(err); } return job; @@ -149,7 +150,7 @@ JobService.prototype.drain = function (job_id, callback) { self.jobCanceller.cancel(job, function (err) { if (err) { - console.error('There was an error while draining job %s, %s ', job_id, err); + debug('There was an error while draining job %s, %s ', job_id, err); return callback(err); } diff --git a/batch/job_subscriber.js b/batch/job_subscriber.js index 63d22f5a..2c411c29 100644 --- a/batch/job_subscriber.js +++ b/batch/job_subscriber.js @@ -1,11 +1,12 @@ 'use strict'; +var debug = require('./util/debug')('job-subscriber'); var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes function _subscribe(client, channel, queueSeeker, onMessage) { queueSeeker.seek(onMessage, function (err) { if (err) { - console.error(err); + debug(err); } client.removeAllListeners('message'); diff --git a/batch/util/debug.js b/batch/util/debug.js new file mode 100644 index 00000000..2d9116ea --- /dev/null +++ b/batch/util/debug.js @@ -0,0 +1,7 @@ +'use strict'; + +var debug = require('debug'); + +module.exports = function batchDebug (ns) { + return debug(['batch', ns].join(':')); +}; diff --git a/package.json b/package.json index 4f73554a..21bde5b3 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "dependencies": { "cartodb-psql": "~0.6.0", "cartodb-redis": "~0.11.0", + "debug": "2.2.0", "express": "~2.5.11", "log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb", "lru-cache": "~2.5.0", From 016d39d2b6d34e61416a25b7a1ffc7836c31dc3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 18 May 2016 11:18:20 +0200 Subject: [PATCH 9/9] Updated dependency-version-locker --- npm-shrinkwrap.json | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 41f765a0..d4771fb6 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -23,18 +23,6 @@ "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-1.0.0.tgz" } } - }, - "debug": { - "version": "2.2.0", - "from": "debug@>=2.2.0 <2.3.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", - "dependencies": { - "ms": { - "version": "0.7.1", - "from": "ms@0.7.1", - "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz" - } - } } } }, @@ -94,6 +82,18 @@ } } }, + "debug": { + "version": "2.2.0", + "from": "debug@2.2.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", + "dependencies": { + "ms": { + "version": "0.7.1", + "from": "ms@0.7.1", + "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz" + } + } + }, "express": { "version": "2.5.11", "from": "express@>=2.5.11 <2.6.0", @@ -256,20 +256,7 @@ "step-profiler": { "version": "0.3.0", "from": "step-profiler@>=0.3.0 <0.4.0", - "dependencies": { - "debug": { - "version": "2.2.0", - "from": "debug@>=2.2.0 <2.3.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", - "dependencies": { - "ms": { - "version": "0.7.1", - "from": "ms@0.7.1", - "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz" - } - } - } - } + "resolved": "https://registry.npmjs.org/step-profiler/-/step-profiler-0.3.0.tgz" }, "topojson": { "version": "0.0.8",