diff --git a/app/app.js b/app/app.js index 5b05e4be..173c2e86 100644 --- a/app/app.js +++ b/app/app.js @@ -28,6 +28,8 @@ var JobQueue = require('../batch/job_queue'); var UserIndexer = require('../batch/user_indexer'); var JobBackend = require('../batch/job_backend'); var JobCanceller = require('../batch/job_canceller'); +var JobService = require('../batch/job_service'); + var UserDatabaseMetadataService = require('../batch/user_database_metadata_service'); var cors = require('./middlewares/cors'); @@ -183,7 +185,9 @@ function App() { var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); + var genericController = new GenericController(); genericController.route(app); @@ -191,7 +195,7 @@ function App() { var queryController = new QueryController(userDatabaseService, tableCache, statsd_client); queryController.route(app); - var jobController = new JobController(userDatabaseService, jobBackend, jobCanceller); + var jobController = new JobController(userDatabaseService, jobService, jobCanceller); jobController.route(app); var cacheStatusController = new CacheStatusController(tableCache); diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index 892a976d..2d5b186c 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -37,12 +37,13 @@ function getMaxSizeErrorMessage(sql) { ); } -function JobController(userDatabaseService, jobBackend, jobCanceller) { +function JobController(userDatabaseService, jobService) { this.userDatabaseService = userDatabaseService; - this.jobBackend = jobBackend; - this.jobCanceller = jobCanceller; + this.jobService = jobService; } +module.exports = JobController; + JobController.prototype.route = function (app) { app.post(global.settings.base_url + '/sql/job', this.createJob.bind(this)); app.get(global.settings.base_url + '/sql/job', this.listJob.bind(this)); @@ -84,13 +85,13 @@ JobController.prototype.cancelJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobCanceller.cancel(job_id, function (err, job) { + self.jobService.cancel(job_id, function (err, job) { if (err) { return next(err); } next(null, { - job: job, + job: job.serialize(), host: userDatabase.host }); }); @@ -149,13 +150,15 @@ JobController.prototype.listJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.list(cdbUsername, function (err, jobs) { + self.jobService.list(cdbUsername, function (err, jobs) { if (err) { return next(err); } next(null, { - jobs: jobs, + jobs: jobs.map(function (job) { + return job.serialize(); + }), host: userDatabase.host }); }); @@ -215,13 +218,13 @@ JobController.prototype.getJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.get(job_id, function (err, job) { + self.jobService.get(job_id, function (err, job) { if (err) { return next(err); } next(null, { - job: job, + job: job.serialize(), host: userDatabase.host }); }); @@ -249,23 +252,6 @@ JobController.prototype.getJob = function (req, res) { ); }; -function isValidJob(sql) { - if (_.isArray(sql)) { - for (var i = 0; i < sql.length; i++) { - if (!_.isString(sql[i])) { - return false; - } - } - return true; - } - - if (!_.isString(sql)) { - return false; - } - - return true; -} - JobController.prototype.createJob = function (req, res) { // jshint maxcomplexity: 7 var self = this; @@ -274,11 +260,7 @@ JobController.prototype.createJob = function (req, res) { var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query; var cdbUsername = cdbReq.userByReq(req); - - if (!isValidJob(sql)) { - return handleException(new Error('You must indicate a valid SQL'), res); - } - + // TODO: in job.validate() if (reachMaxQuerySizeLimit(sql)) { return handleException(new Error(getMaxSizeErrorMessage(sql)), res); } @@ -308,13 +290,19 @@ JobController.prototype.createJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.create(cdbUsername, sql, userDatabase.host, function (err, result) { + var data = { + user: cdbUsername, + query: sql, + host: userDatabase.host + }; + + self.jobService.create(data, function (err, job) { if (err) { return next(err); } next(null, { - job: result, + job: job.serialize(), host: userDatabase.host }); }); @@ -342,7 +330,6 @@ JobController.prototype.createJob = function (req, res) { ); }; - JobController.prototype.updateJob = function (req, res) { // jshint maxcomplexity: 7 var self = this; @@ -352,10 +339,7 @@ JobController.prototype.updateJob = function (req, res) { var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query; var cdbUsername = cdbReq.userByReq(req); - if (!isValidJob(sql)) { - return handleException(new Error('You must indicate a valid SQL'), res); - } - + // TODO: in jobValidate if (reachMaxQuerySizeLimit(sql)) { return handleException(new Error(getMaxSizeErrorMessage(sql)), res); } @@ -385,13 +369,18 @@ JobController.prototype.updateJob = function (req, res) { req.profiler.done('setDBAuth'); } - self.jobBackend.update(job_id, sql, function (err, job) { + var data = { + job_id: job_id, + query: sql + }; + + self.jobService.update(data, function (err, job) { if (err) { return next(err); } next(null, { - job: job, + job: job.serialize(), host: userDatabase.host }); }); @@ -413,9 +402,8 @@ JobController.prototype.updateJob = function (req, res) { if (result.host) { res.header('X-Served-By-DB-Host', result.host); } + res.send(result.job); } ); }; - -module.exports = JobController; diff --git a/batch/batch.js b/batch/batch.js index 25a37a3b..326b3a81 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -2,19 +2,22 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; +var debug = require('./util/debug')('batch'); var forever = require('./forever'); var queue = require('queue-async'); var jobStatus = require('./job_status'); -function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) { +function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService) { EventEmitter.call(this); this.jobSubscriber = jobSubscriber; this.jobQueuePool = jobQueuePool; this.jobRunner = jobRunner; - this.jobCanceller = jobCanceller; + this.jobService = jobService; } util.inherits(Batch, EventEmitter); +module.exports = Batch; + Batch.prototype.start = function () { this._subscribe(); }; @@ -38,64 +41,14 @@ Batch.prototype._subscribe = function () { self.jobQueuePool.removeQueue(host); if (err.name === 'EmptyQueue') { - return console.log(err.message); + return debug(err.message); } - console.error(err); + debug(err); }); }); }; -Batch.prototype.drain = function (callback) { - var self = this; - - var queues = this.jobQueuePool.list(); - - var batchQueues = queue(queues.length); - - queues.forEach(function (host) { - batchQueues.defer(self._drainJob.bind(self), host); - }); - - batchQueues.awaitAll(function (err) { - if (err) { - console.error('Something went wrong draining', err); - } else { - console.log('Drain complete'); - } - - callback(); - }); -}; - -Batch.prototype._drainJob = function (host, callback) { - var self = this; - var job_id = self.jobQueuePool.getCurrentJobId(host); - - if (!job_id) { - return process.nextTick(function () { - return callback(); - }); - } - - var queue = self.jobQueuePool.getQueue(host); - - this.jobCanceller.drain(job_id, function (err) { - if (err && err.name === 'CancelNotAllowedError') { - return callback(); - } - - if (err) { - return callback(err); - } - - queue.enqueueFirst(job_id, host, callback); - }); -}; - -Batch.prototype.stop = function () { - this.jobSubscriber.unsubscribe(); -}; Batch.prototype._consumeJobs = function (host, queue, callback) { var self = this; @@ -116,8 +69,8 @@ Batch.prototype._consumeJobs = function (host, queue, callback) { self.jobRunner.run(job_id, function (err, job) { self.jobQueuePool.removeCurrentJobId(host); - if (err && err.name === 'InvalidJobStatus') { - console.log(err.message); + if (err && err.name === 'JobNotRunnable') { + debug(err.message); return callback(); } @@ -125,17 +78,64 @@ Batch.prototype._consumeJobs = function (host, queue, callback) { return callback(err); } - if (job.status === jobStatus.FAILED) { - console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason); + if (job.data.status === jobStatus.FAILED) { + debug('Job %s %s in %s due to: %s', job_id, job.data.status, host, job.failed_reason); } else { - console.log('Job %s %s in %s', job_id, job.status, host); + debug('Job %s %s in %s', job_id, job.data.status, host); } - self.emit('job:' + job.status, job_id); + self.emit('job:' + job.data.status, job_id); callback(); }); }); }; -module.exports = Batch; +Batch.prototype.drain = function (callback) { + var self = this; + var queues = this.jobQueuePool.list(); + var batchQueues = queue(queues.length); + + queues.forEach(function (host) { + batchQueues.defer(self._drainJob.bind(self), host); + }); + + batchQueues.awaitAll(function (err) { + if (err) { + debug('Something went wrong draining', err); + } else { + debug('Drain complete'); + } + + callback(); + }); +}; + +Batch.prototype._drainJob = function (host, callback) { + var self = this; + var job_id = self.jobQueuePool.getCurrentJobId(host); + + if (!job_id) { + return process.nextTick(function () { + return callback(); + }); + } + + var queue = self.jobQueuePool.getQueue(host); + + this.jobService.drain(job_id, function (err) { + if (err && err.name === 'CancelNotAllowedError') { + return callback(); + } + + if (err) { + return callback(err); + } + + queue.enqueueFirst(job_id, host, callback); + }); +}; + +Batch.prototype.stop = function () { + this.jobSubscriber.unsubscribe(); +}; diff --git a/batch/index.js b/batch/index.js index 54c83c92..985d7204 100644 --- a/batch/index.js +++ b/batch/index.js @@ -12,6 +12,7 @@ var JobPublisher = require('./job_publisher'); var JobQueue = require('./job_queue'); var UserIndexer = require('./user_indexer'); var JobBackend = require('./job_backend'); +var JobService = require('./job_service'); var Batch = require('./batch'); module.exports = function batchFactory (metadataBackend) { @@ -23,9 +24,11 @@ module.exports = function batchFactory (metadataBackend) { var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + // TODO: down userDatabaseMetadataService var queryRunner = new QueryRunner(); - var jobRunner = new JobRunner(jobBackend, jobQueue, queryRunner, userDatabaseMetadataService); - var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); + var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService); - return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller); + return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService); }; diff --git a/batch/job_backend.js b/batch/job_backend.js index 80064d12..996afb12 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -1,129 +1,205 @@ 'use strict'; -var uuid = require('node-uuid'); + var queue = require('queue-async'); +var debug = require('./util/debug')('job-backend'); +var REDIS_PREFIX = 'batch:jobs:'; +var REDIS_DB = 5; var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours var jobStatus = require('./job_status'); - -function setPendingIfMutiqueryJob(sql) { - if (Array.isArray(sql)) { - for (var j = 0; j < sql.length; j++) { - sql[j] = { - query: sql[j], - status: jobStatus.PENDING - }; - } - } - - return sql; -} +var finalStatus = [ + jobStatus.CANCELLED, + jobStatus.DONE, + jobStatus.FAILED, + jobStatus.UNKNOWN +]; function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) { - this.db = 5; - this.redisPrefix = 'batch:jobs:'; this.metadataBackend = metadataBackend; this.jobQueueProducer = jobQueueProducer; this.jobPublisher = jobPublisher; this.userIndexer = userIndexer; } -JobBackend.prototype.create = function (username, sql, host, callback) { +function toRedisParams(job) { + var redisParams = [REDIS_PREFIX + job.job_id]; + var obj = JSON.parse(JSON.stringify(job)); + delete obj.job_id; + + for (var property in obj) { + if (obj.hasOwnProperty(property)) { + redisParams.push(property); + if (property === 'query' && typeof obj[property] !== 'string') { + redisParams.push(JSON.stringify(obj[property])); + } else { + redisParams.push(obj[property]); + } + } + } + + return redisParams; +} + +function toObject(job_id, redisParams, redisValues) { + var obj = {}; + + redisParams.shift(); // job_id value + redisParams.pop(); // WARN: weird function pushed by metadataBackend + + for (var i = 0; i < redisParams.length; i++) { + if (redisParams[i] === 'query') { + try { + obj[redisParams[i]] = JSON.parse(redisValues[i]); + } catch (e) { + obj[redisParams[i]] = redisValues[i]; + } + } else { + obj[redisParams[i]] = redisValues[i]; + } + } + + obj.job_id = job_id; // adds redisKey as object property + + return obj; +} + +// TODO: is it really necessary?? +function isJobFound(redisValues) { + return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4]; +} + +JobBackend.prototype.get = function (job_id, callback) { var self = this; - var job_id = uuid.v4(); - var now = new Date().toISOString(); - - sql = setPendingIfMutiqueryJob(sql); - var redisParams = [ - this.redisPrefix + job_id, - 'user', username, - 'status', jobStatus.PENDING, - 'query', JSON.stringify(sql), - 'created_at', now, - 'updated_at', now + REDIS_PREFIX + job_id, + 'user', + 'status', + 'query', + 'created_at', + 'updated_at', + 'host', + 'failed_reason' ]; - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { + self.metadataBackend.redisCmd(REDIS_DB, 'HMGET', redisParams , function (err, redisValues) { if (err) { return callback(err); } - self.jobQueueProducer.enqueue(job_id, host, function (err) { + if (!isJobFound(redisValues)) { + var notFoundError = new Error('Job with id ' + job_id + ' not found'); + notFoundError.name = 'NotFoundError'; + return callback(notFoundError); + } + + var jobData = toObject(job_id, redisParams, redisValues); + + callback(null, jobData); + }); +}; + +JobBackend.prototype.create = function (job, callback) { + var self = this; + + self.get(job.job_id, function (err) { + if (err && err.name !== 'NotFoundError') { + return callback(err); + } + + self.save(job, function (err, jobSaved) { if (err) { return callback(err); } - // broadcast to consumers - self.jobPublisher.publish(host); + self.jobQueueProducer.enqueue(job.job_id, job.host, function (err) { + if (err) { + return callback(err); + } - self.userIndexer.add(username, job_id, function (err) { - if (err) { - return callback(err); - } + // broadcast to consumers + self.jobPublisher.publish(job.host); - self.get(job_id, callback); + self.userIndexer.add(job.user, job.job_id, function (err) { + if (err) { + return callback(err); + } + + callback(null, jobSaved); + }); }); }); }); }; -JobBackend.prototype.update = function (job_id, sql, callback) { +JobBackend.prototype.update = function (job, callback) { var self = this; - this.get(job_id, function (err, job) { + self.get(job.job_id, function (err) { if (err) { return callback(err); } - if (job.status !== jobStatus.PENDING) { - return callback(new Error('Job is not pending, it cannot be updated')); + self.save(job, callback); + }); +}; + +JobBackend.prototype.save = function (job, callback) { + var self = this; + var redisParams = toRedisParams(job); + + self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', redisParams , function (err) { + if (err) { + return callback(err); } - if (Array.isArray(job.query)) { - for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status !== jobStatus.PENDING) { - return callback(new Error('Job is not pending, it cannot be updated')); - } - } - } - - sql = setPendingIfMutiqueryJob(sql); - - var now = new Date().toISOString(); - var redisParams = [ - self.redisPrefix + job_id, - 'query', JSON.stringify(sql), - 'updated_at', now - ]; - - self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { + self.setTTL(job, function (err) { if (err) { return callback(err); } - self.get(job_id, callback); - }); + self.get(job.job_id, function (err, job) { + if (err) { + return callback(err); + } + callback(null, job); + }); + }); }); }; -JobBackend.prototype.list = function (username, callback) { +function isFrozen(status) { + return finalStatus.indexOf(status) !== -1; +} + +JobBackend.prototype.setTTL = function (job, callback) { + var self = this; + var redisKey = REDIS_PREFIX + job.job_id; + + if (!isFrozen(job.status)) { + return callback(); + } + + self.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], callback); +}; + +JobBackend.prototype.list = function (user, callback) { var self = this; - this.userIndexer.list(username, function (err, job_ids) { + this.userIndexer.list(user, function (err, job_ids) { if (err) { return callback(err); } var initialLength = job_ids.length; - self._getCleanedList(username, job_ids, function (err, jobs) { + self._getCleanedList(user, job_ids, function (err, jobs) { if (err) { return callback(err); } if (jobs.length < initialLength) { - return self.list(username, callback); + return self.list(user, callback); } callback(null, jobs); @@ -131,13 +207,13 @@ JobBackend.prototype.list = function (username, callback) { }); }; -JobBackend.prototype._getCleanedList = function (username, job_ids, callback) { +JobBackend.prototype._getCleanedList = function (user, job_ids, callback) { var self = this; var jobsQueue = queue(job_ids.length); job_ids.forEach(function(job_id) { - jobsQueue.defer(self._getIndexedJob.bind(self), job_id, username); + jobsQueue.defer(self._getIndexedJob.bind(self), job_id, user); }); jobsQueue.awaitAll(function (err, jobs) { @@ -151,15 +227,14 @@ JobBackend.prototype._getCleanedList = function (username, job_ids, callback) { }); }; -JobBackend.prototype._getIndexedJob = function (job_id, username, callback) { +JobBackend.prototype._getIndexedJob = function (job_id, user, callback) { var self = this; this.get(job_id, function (err, job) { - if (err && err.name === 'NotFoundError') { - return self.userIndexer.remove(username, job_id, function (err) { + return self.userIndexer.remove(user, job_id, function (err) { if (err) { - console.error('Error removing key %s in user set', job_id, err); + debug('Error removing key %s in user set', job_id, err); } callback(); }); @@ -173,257 +248,4 @@ JobBackend.prototype._getIndexedJob = function (job_id, username, callback) { }); }; -JobBackend.prototype._isJobFound = function (jobValues) { - return jobValues[0] && jobValues[1] && jobValues[2] && jobValues[3] && jobValues[4]; -}; - -JobBackend.prototype.get = function (job_id, callback) { - var self = this; - var redisParams = [ - this.redisPrefix + job_id, - 'user', - 'status', - 'query', - 'created_at', - 'updated_at', - 'failed_reason' - ]; - - this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, jobValues) { - if (err) { - return callback(err); - } - - if (!self._isJobFound(jobValues)) { - var notFoundError = new Error('Job with id ' + job_id + ' not found'); - notFoundError.name = 'NotFoundError'; - return callback(notFoundError); - } - - var query; - - try { - query = JSON.parse(jobValues[2]); - } catch (err) { - query = jobValues[2]; - } - - callback(null, { - job_id: job_id, - user: jobValues[0], - status: jobValues[1], - query: query, - created_at: jobValues[3], - updated_at: jobValues[4], - failed_reason: jobValues[5] ? jobValues[5] : undefined - }); - }); -}; - -JobBackend.prototype.setRunning = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisParams = [ - this.redisPrefix + job.job_id, - 'status', jobStatus.RUNNING, - 'updated_at', now, - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.RUNNING; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); -}; - -JobBackend.prototype.setPending = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.PENDING, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.PENDING; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); -}; - -JobBackend.prototype.setDone = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.DONE, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.DONE; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - }); -}; - -JobBackend.prototype.setJobPendingAndQueryDone = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - - job.query[index].status = jobStatus.DONE; - - var redisParams = [ - redisKey, - 'status', jobStatus.PENDING, - 'updated_at', now, - 'query', JSON.stringify(job.query) - ]; - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); -}; - -JobBackend.prototype.setFailed = function (job, error, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.FAILED, - 'failed_reason', error.message, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.FAILED; - job.query[index].failed_reason = error.message; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - }); -}; - -JobBackend.prototype.setCancelled = function (job, index, callback) { - var self = this; - var now = new Date().toISOString(); - var redisKey = this.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.CANCELLED, - 'updated_at', now - ]; - - if (!callback) { - callback = index; - } else if (index >= 0 && index < job.query.length) { - job.query[index].status = jobStatus.CANCELLED; - redisParams = redisParams.concat('query', JSON.stringify(job.query)); - } - - this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'PERSIST', [ redisKey ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - - }); -}; - -JobBackend.prototype.setUnknown = function (job_id, callback) { - var self = this; - - this.get(job_id, function (err, job) { - if (err) { - return callback(err); - } - - var now = new Date().toISOString(); - var redisKey = self.redisPrefix + job.job_id; - var redisParams = [ - redisKey, - 'status', jobStatus.UNKNOWN, - 'updated_at', now - ]; - - self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) { - if (err) { - return callback(err); - } - - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { - if (err) { - return callback(err); - } - - self.get(job.job_id, callback); - }); - }); - }); -}; - - module.exports = JobBackend; diff --git a/batch/job_canceller.js b/batch/job_canceller.js index dfeb122a..fb41b683 100644 --- a/batch/job_canceller.js +++ b/batch/job_canceller.js @@ -1,87 +1,47 @@ 'use strict'; var PSQL = require('cartodb-psql'); -var jobStatus = require('./job_status'); -function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) { - this.metadataBackend = metadataBackend; +function JobCanceller(userDatabaseMetadataService) { this.userDatabaseMetadataService = userDatabaseMetadataService; - this.jobBackend = jobBackend; } -function getIndexOfRunningQuery(job) { - if (Array.isArray(job.query)) { - for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status === jobStatus.RUNNING) { - return i; - } - } - } -} +module.exports = JobCanceller; -JobCanceller.prototype.cancel = function (job_id, callback) { - var self = this; - - self.jobBackend.get(job_id, function (err, job) { +JobCanceller.prototype.cancel = function (job, callback) { + this.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) { if (err) { return callback(err); } - if (job.status === jobStatus.PENDING) { - return self.jobBackend.setCancelled(job, callback); + doCancel(job.data.job_id, userDatabaseMetadata, callback); + }); +}; + +function doCancel(job_id, userDatabaseMetadata, callback) { + var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); + + getQueryPID(pg, job_id, function (err, pid) { + if (err) { + return callback(err); } - if (job.status !== jobStatus.RUNNING) { - var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed'); - cancelNotAllowedError.name = 'CancelNotAllowedError'; - return callback(cancelNotAllowedError); - } - - self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) { + doCancelQuery(pg, pid, function (err, isCancelled) { if (err) { return callback(err); } - self._query(job, userDatabaseMetadata, function (err, job) { - if (err) { - return callback(err); - } + if (!isCancelled) { + return callback(new Error('Query has not been cancelled')); + } - var queryIndex = getIndexOfRunningQuery(job); - - self.jobBackend.setCancelled(job, queryIndex, function (err, job) { - if (err) { - return callback(err); - } - - callback(null, job, queryIndex); - }); - }); + callback(null); }); }); -}; +} -JobCanceller.prototype.drain = function (job_id, callback) { - var self = this; - - this.cancel(job_id, function (err, job, queryIndex) { - if (err && err.name === 'CancelNotAllowedError') { - return callback(err); - } - - if (err) { - console.error('There was an error while draining job %s, %s ', job_id, err); - return self.jobBackend.setUnknown(job_id, callback); - } - - self.jobBackend.setPending(job, queryIndex, callback); - }); - -}; - -JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) { - var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); - var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job.job_id + " */%'"; +function getQueryPID(pg, job_id, callback) { + var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job_id + " */%'"; pg.query(getPIDQuery, function(err, result) { if (err) { @@ -92,24 +52,20 @@ JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) { return callback(new Error('Query is not running currently')); } - var pid = result.rows[0].pid; - var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')'; - - pg.query(cancelQuery, function (err, result) { - if (err) { - return callback(err); - } - - var isCancelled = result.rows[0].pg_cancel_backend; - - if (!isCancelled) { - return callback(new Error('Query has not been cancelled')); - } - - callback(null, job); - }); + callback(null, result.rows[0].pid); }); -}; +} +function doCancelQuery(pg, pid, callback) { + var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')'; -module.exports = JobCanceller; + pg.query(cancelQuery, function (err, result) { + if (err) { + return callback(err); + } + + var isCancelled = result.rows[0].pg_cancel_backend; + + callback(null, isCancelled); + }); +} diff --git a/batch/job_factory.js b/batch/job_factory.js new file mode 100644 index 00000000..6fa8aa80 --- /dev/null +++ b/batch/job_factory.js @@ -0,0 +1,22 @@ +'use strict'; + +var jobModels = require('./models'); + +function JobFactory() { +} + +module.exports = JobFactory; + +JobFactory.create = function (data) { + if (!data.query) { + throw new Error('You must indicate a valid SQL'); + } + + for (var i = 0; i < jobModels.length; i++) { + if (jobModels[i].is(data.query)) { + return new jobModels[i](data); + } + } + + throw new Error('there is no job class for the provided query'); +}; diff --git a/batch/job_runner.js b/batch/job_runner.js index 64a269e9..7c8bd73e 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -3,74 +3,30 @@ var errorCodes = require('../app/postgresql/error_codes').codeToCondition; var jobStatus = require('./job_status'); -function getNextQuery(job) { - if (!Array.isArray(job.query)) { - return { - query: job.query - }; - } - - for (var i = 0; i < job.query.length; i++) { - if (job.query[i].status === jobStatus.PENDING) { - return { - index: i, - query: job.query[i].query - }; - } - } -} - -function isLastQuery(job, index) { - if (!Array.isArray(job.query)) { - return true; - } - - if (index >= (job.query.length -1)) { - return true; - } - - return false; -} - -function JobRunner(jobBackend, jobQueue, queryRunner,userDatabaseMetadataService) { - this.jobBackend = jobBackend; +function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) { + this.jobService = jobService; this.jobQueue = jobQueue; this.queryRunner = queryRunner; - this.userDatabaseMetadataService = userDatabaseMetadataService; + this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner } JobRunner.prototype.run = function (job_id, callback) { var self = this; - self.jobBackend.get(job_id, function (err, job) { + self.jobService.get(job_id, function (err, job) { if (err) { return callback(err); } - if (job.status !== jobStatus.PENDING) { - var invalidJobStatusError = new Error([ - 'Cannot run job', - job.job_id, - 'due to its status is', - job.status - ].join(' ')); - invalidJobStatusError.name = 'InvalidJobStatus'; - return callback(invalidJobStatusError); + var query = job.getNextQuery(); + + try { + job.setStatus(jobStatus.RUNNING); + } catch (err) { + return callback(err); } - var query = getNextQuery(job); - - if (!query) { - var queryNotFoundError = new Error([ - 'Cannot run job', - job.job_id, - ', there is no query to run' - ].join(' ')); - queryNotFoundError.name = 'QueryNotFound'; - return callback(queryNotFoundError); - } - - self.jobBackend.setRunning(job, query.index, function (err, job) { + self.jobService.save(job, function (err, job) { if (err) { return callback(err); } @@ -82,32 +38,46 @@ JobRunner.prototype.run = function (job_id, callback) { JobRunner.prototype._run = function (job, query, callback) { var self = this; - self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) { + + // TODO: move to query + self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) { if (err) { return callback(err); } - self.queryRunner.run(job.job_id, query.query, userDatabaseMetadata, function (err /*, result */) { + self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) { if (err) { // if query has been cancelled then it's going to get the current // job status saved by query_canceller if (errorCodes[err.code.toString()] === 'query_canceled') { - return self.jobBackend.get(job.job_id, callback); + return self.jobService.get(job.data.job_id, callback); } - return self.jobBackend.setFailed(job, err, query.index, callback); + try { + job.setStatus(jobStatus.FAILED); + } catch (err) { + return callback(err); + } + + return self.jobService.save(job, callback); } - if (isLastQuery(job, query.index)) { - return self.jobBackend.setDone(job, query.index, callback); + try { + job.setStatus(jobStatus.DONE); + } catch (err) { + return callback(err); } - self.jobBackend.setJobPendingAndQueryDone(job, query.index, function (err, job) { + self.jobService.save(job, function (err, job) { if (err) { return callback(err); } - self.jobQueue.enqueue(job.job_id, userDatabaseMetadata.host, function (err){ + if (job.isDone()) { + return callback(null, job); + } + + self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) { if (err) { return callback(err); } diff --git a/batch/job_service.js b/batch/job_service.js new file mode 100644 index 00000000..99ad458c --- /dev/null +++ b/batch/job_service.js @@ -0,0 +1,166 @@ +'use strict'; + +var debug = require('./util/debug')('job-service'); +var JobFactory = require('./job_factory'); +var jobStatus = require('./job_status'); + +function JobService(jobBackend, jobCanceller) { + this.jobBackend = jobBackend; + this.jobCanceller = jobCanceller; +} + +module.exports = JobService; + +JobService.prototype.get = function (job_id, callback) { + this.jobBackend.get(job_id, function (err, data) { + if (err) { + return callback(err); + } + + var job; + + try { + job = JobFactory.create(data); + } catch (err) { + return callback(err); + } + + callback(null, job); + }); +}; + +JobService.prototype.list = function (user, callback) { + this.jobBackend.list(user, function (err, dataList) { + if (err) { + return callback(err); + } + + var jobList = dataList.map(function (data) { + var job; + + try { + job = JobFactory.create(data); + } catch (err) { + return debug(err); + } + + return job; + }) + .filter(function (job) { + return job !== undefined; + }); + + callback(null, jobList); + }); +}; + +JobService.prototype.create = function (data, callback) { + try { + var job = JobFactory.create(data); + job.validate(); + this.jobBackend.create(job.data, function (err) { + if (err) { + return callback(err); + } + callback(null, job); + }); + } catch (err) { + return callback(err); + } +}; + +JobService.prototype.update = function (data, callback) { + var self = this; + + self.get(data.job_id, function (err, job) { + if (err) { + return callback(err); + } + + try { + job.setQuery(data.query); + self.save(job, callback); + } catch (err) { + return callback(err); + } + }); +}; + +JobService.prototype.save = function (job, callback) { + var self = this; + + try { + job.validate(); + } catch (err) { + return callback(err); + } + + self.jobBackend.update(job.data, function (err, data) { + if (err) { + return callback(err); + } + + try { + job = JobFactory.create(data); + } catch (err) { + return callback(err); + } + + callback(null, job); + }); +}; + +JobService.prototype.cancel = function (job_id, callback) { + var self = this; + + self.get(job_id, function (err, job) { + if (err) { + return callback(err); + } + + var isPending = job.isPending(); + + try { + job.setStatus(jobStatus.CANCELLED); + } catch (err) { + return callback(err); + } + + if (isPending) { + return self.save(job, callback); + } + + self.jobCanceller.cancel(job, function (err) { + if (err) { + return callback(err); + } + + self.save(job, callback); + }); + }); +}; + +JobService.prototype.drain = function (job_id, callback) { + var self = this; + + self.get(job_id, function (err, job) { + if (err) { + return callback(err); + } + + self.jobCanceller.cancel(job, function (err) { + if (err) { + debug('There was an error while draining job %s, %s ', job_id, err); + return callback(err); + } + + try { + job.setStatus(jobStatus.PENDING); + } catch (err) { + return callback(err); + } + + self.jobBackend.update(job.data, callback); + }); + }); +}; diff --git a/batch/job_subscriber.js b/batch/job_subscriber.js index 63d22f5a..2c411c29 100644 --- a/batch/job_subscriber.js +++ b/batch/job_subscriber.js @@ -1,11 +1,12 @@ 'use strict'; +var debug = require('./util/debug')('job-subscriber'); var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes function _subscribe(client, channel, queueSeeker, onMessage) { queueSeeker.seek(onMessage, function (err) { if (err) { - console.error(err); + debug(err); } client.removeAllListeners('message'); diff --git a/batch/models/index.js b/batch/models/index.js new file mode 100644 index 00000000..0e80c124 --- /dev/null +++ b/batch/models/index.js @@ -0,0 +1,6 @@ +'use strict'; + +var JobSimple = require('./job_simple'); +var JobMultiple = require('./job_multiple'); + +module.exports = [ JobSimple, JobMultiple ]; diff --git a/batch/models/job_base.js b/batch/models/job_base.js new file mode 100644 index 00000000..8fd6d9d2 --- /dev/null +++ b/batch/models/job_base.js @@ -0,0 +1,137 @@ +'use strict'; + +var assert = require('assert'); +var uuid = require('node-uuid'); +var jobStatus = require('../job_status'); +var validStatusTransitions = [ + [jobStatus.PENDING, jobStatus.RUNNING], + [jobStatus.PENDING, jobStatus.CANCELLED], + [jobStatus.PENDING, jobStatus.UNKNOWN], + [jobStatus.RUNNING, jobStatus.DONE], + [jobStatus.RUNNING, jobStatus.FAILED], + [jobStatus.RUNNING, jobStatus.CANCELLED], + [jobStatus.RUNNING, jobStatus.PENDING], + [jobStatus.RUNNING, jobStatus.UNKNOWN] +]; +var mandatoryProperties = [ + 'job_id', + 'status', + 'query', + 'created_at', + 'updated_at', + 'host', + 'user' +]; + +function JobBase(data) { + var now = new Date().toISOString(); + + this.data = data; + + if (!this.data.job_id) { + this.data.job_id = uuid.v4(); + } + + if (!this.data.created_at) { + this.data.created_at = now; + } + + if (!this.data.updated_at) { + this.data.updated_at = now; + } + + if (!this.data.status) { + this.data.status = jobStatus.PENDING; + } +} + +module.exports = JobBase; + +JobBase.prototype.isValidStatusTransition = function (initialStatus, finalStatus) { + var transition = [ initialStatus, finalStatus ]; + + for (var i = 0; i < validStatusTransitions.length; i++) { + try { + assert.deepEqual(transition, validStatusTransitions[i]); + return true; + } catch (e) { + continue; + } + } + + return false; +}; + +// should be implemented by childs +JobBase.prototype.getNextQuery = function () { + throw new Error('Unimplemented method'); +}; + +JobBase.prototype.hasNextQuery = function () { + return !!this.getNextQuery(); +}; + + + +JobBase.prototype.isPending = function () { + return this.data.status === jobStatus.PENDING; +}; + +JobBase.prototype.isRunning = function () { + return this.data.status === jobStatus.RUNNING; +}; + +JobBase.prototype.isDone = function () { + return this.data.status === jobStatus.DONE; +}; + +JobBase.prototype.isCancelled = function () { + return this.data.status === jobStatus.CANCELLED; +}; + +JobBase.prototype.isFailed = function () { + return this.data.status === jobStatus.FAILED; +}; + +JobBase.prototype.isUnknown = function () { + return this.data.status === jobStatus.UNKNOWN; +}; + +JobBase.prototype.setQuery = function (query) { + var now = new Date().toISOString(); + + if (!this.isPending()) { + throw new Error('Job is not pending, it cannot be updated'); + } + + this.data.updated_at = now; + this.data.query = query; +}; + +JobBase.prototype.setStatus = function (finalStatus) { + var now = new Date().toISOString(); + var initialStatus = this.data.status; + var isValid = this.isValidStatusTransition(initialStatus, finalStatus); + + if (!isValid) { + throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); + } + + this.data.updated_at = now; + this.data.status = finalStatus; +}; + +JobBase.prototype.validate = function () { + for (var i = 0; i < mandatoryProperties.length; i++) { + if (!this.data[mandatoryProperties[i]]) { + throw new Error('property "' + mandatoryProperties[i] + '" is mandatory'); + } + } +}; + +JobBase.prototype.serialize = function () { + var data = JSON.parse(JSON.stringify(this.data)); + delete data.host; + + return data; +}; diff --git a/batch/models/job_multiple.js b/batch/models/job_multiple.js new file mode 100644 index 00000000..7bce92ae --- /dev/null +++ b/batch/models/job_multiple.js @@ -0,0 +1,83 @@ +'use strict'; + +var util = require('util'); +var JobBase = require('./job_base'); +var jobStatus = require('../job_status'); + +function JobMultiple(data) { + JobBase.call(this, data); + + this.init(); +} +util.inherits(JobMultiple, JobBase); + +module.exports = JobMultiple; + +JobMultiple.is = function (query) { + if (!Array.isArray(query)) { + return false; + } + + // 1. From user: ['select * from ...', 'select * from ...'] + // 2. From redis: [ { query: 'select * from ...', status: 'pending' }, + // { query: 'select * from ...', status: 'pending' } ] + for (var i = 0; i < query.length; i++) { + if (typeof query[i] !== 'string') { + if (typeof query[i].query !== 'string') { + return false; + } + } + } + + return true; +}; + +JobMultiple.prototype.init = function () { + for (var i = 0; i < this.data.query.length; i++) { + if (!this.data.query[i].query && !this.data.query[i].status) { + this.data.query[i] = { + query: this.data.query[i], + status: jobStatus.PENDING + }; + } + } +}; + +JobMultiple.prototype.getNextQuery = function () { + for (var i = 0; i < this.data.query.length; i++) { + if (this.data.query[i].status === jobStatus.PENDING) { + return this.data.query[i].query; + } + } +}; + +JobMultiple.prototype.setQuery = function (query) { + if (!JobMultiple.is(query)) { + throw new Error('You must indicate a valid SQL'); + } + + JobMultiple.super_.prototype.setQuery.call(this, query); +}; + +JobMultiple.prototype.setStatus = function (finalStatus) { + var initialStatus = this.data.status; + // if transition is to "done" and there are more queries to run + // then job status must be "pending" instead of "done" + // else job status transition to done (if "running") + if (finalStatus === jobStatus.DONE && this.hasNextQuery()) { + JobMultiple.super_.prototype.setStatus.call(this, jobStatus.PENDING); + } else { + JobMultiple.super_.prototype.setStatus.call(this, finalStatus); + } + + for (var i = 0; i < this.data.query.length; i++) { + var isValid = JobMultiple.super_.prototype.isValidStatusTransition(this.data.query[i].status, finalStatus); + + if (isValid) { + this.data.query[i].status = finalStatus; + return; + } + } + + throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); +}; diff --git a/batch/models/job_simple.js b/batch/models/job_simple.js new file mode 100644 index 00000000..6e56fedb --- /dev/null +++ b/batch/models/job_simple.js @@ -0,0 +1,29 @@ +'use strict'; + +var util = require('util'); +var JobBase = require('./job_base'); + +function JobSimple(data) { + JobBase.call(this, data); +} +util.inherits(JobSimple, JobBase); + +module.exports = JobSimple; + +JobSimple.is = function (query) { + return typeof query === 'string'; +}; + +JobSimple.prototype.getNextQuery = function () { + if (this.isPending()) { + return this.data.query; + } +}; + +JobSimple.prototype.setQuery = function (query) { + if (!JobSimple.is(query)) { + throw new Error('You must indicate a valid SQL'); + } + + JobSimple.super_.prototype.setQuery.call(this, query); +}; diff --git a/batch/query_runner.js b/batch/query_runner.js index 9e545514..1d5701ff 100644 --- a/batch/query_runner.js +++ b/batch/query_runner.js @@ -5,8 +5,9 @@ var PSQL = require('cartodb-psql'); function QueryRunner() { } -QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) { +module.exports = QueryRunner; +QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) { var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); pg.query('SET statement_timeout=0', function (err) { @@ -35,6 +36,3 @@ QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callbac }); }; - - -module.exports = QueryRunner; diff --git a/batch/util/debug.js b/batch/util/debug.js new file mode 100644 index 00000000..2d9116ea --- /dev/null +++ b/batch/util/debug.js @@ -0,0 +1,7 @@ +'use strict'; + +var debug = require('debug'); + +module.exports = function batchDebug (ns) { + return debug(['batch', ns].join(':')); +}; diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index f8242968..62121384 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -23,18 +23,6 @@ "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-1.0.0.tgz" } } - }, - "debug": { - "version": "2.2.0", - "from": "debug@>=2.2.0 <2.3.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", - "dependencies": { - "ms": { - "version": "0.7.1", - "from": "ms@0.7.1", - "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz" - } - } } } }, @@ -94,6 +82,18 @@ } } }, + "debug": { + "version": "2.2.0", + "from": "debug@2.2.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", + "dependencies": { + "ms": { + "version": "0.7.1", + "from": "ms@0.7.1", + "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz" + } + } + }, "express": { "version": "2.5.11", "from": "express@>=2.5.11 <2.6.0", @@ -256,20 +256,7 @@ "step-profiler": { "version": "0.3.0", "from": "step-profiler@>=0.3.0 <0.4.0", - "dependencies": { - "debug": { - "version": "2.2.0", - "from": "debug@>=2.2.0 <2.3.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz", - "dependencies": { - "ms": { - "version": "0.7.1", - "from": "ms@0.7.1", - "resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz" - } - } - } - } + "resolved": "https://registry.npmjs.org/step-profiler/-/step-profiler-0.3.0.tgz" }, "topojson": { "version": "0.0.8", diff --git a/package.json b/package.json index 96ed25de..8590a00f 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "dependencies": { "cartodb-psql": "~0.6.0", "cartodb-redis": "~0.11.0", + "debug": "2.2.0", "express": "~2.5.11", "log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb", "lru-cache": "~2.5.0", diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index 02f51f75..5035c011 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -3,10 +3,14 @@ var _ = require('underscore'); var redis = require('redis'); var queue = require('queue-async'); var batchFactory = require('../../batch'); + var JobPublisher = require('../../batch/job_publisher'); var JobQueue = require('../../batch/job_queue'); var UserIndexer = require('../../batch/user_indexer'); var JobBackend = require('../../batch/job_backend'); +var JobService = require('../../batch/job_service'); +var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service'); +var JobCanceller = require('../../batch/job_canceller'); var metadataBackend = require('cartodb-redis')({ host: global.settings.redis_host, port: global.settings.redis_port, @@ -22,6 +26,9 @@ describe('batch module', function() { var jobPublisher = new JobPublisher(redis); var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); var batch = batchFactory(metadataBackend); @@ -37,12 +44,18 @@ describe('batch module', function() { }); function createJob(sql, done) { - jobBackend.create(username, sql, dbInstance, function (err, job) { + var data = { + user: username, + query: sql, + host: dbInstance + }; + + jobService.create(data, function (err, job) { if (err) { return done(err); } - done(null, job); + done(null, job.serialize()); }); } @@ -196,8 +209,7 @@ describe('batch module', function() { }); it('should perform job with array of select', function (done) { - var queries = ['select * from private_table', 'select * from private_table']; - + var queries = ['select * from private_table limit 1', 'select * from private_table']; createJob(queries, function (err, job) { if (err) { diff --git a/test/acceptance/job.use-case-1.test.js b/test/acceptance/job.use-case-1.test.js index ab1dd41b..2c965bac 100644 --- a/test/acceptance/job.use-case-1.test.js +++ b/test/acceptance/job.use-case-1.test.js @@ -91,7 +91,7 @@ describe('Use case 1: cancel and modify a done job', function () { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is done, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from done to cancelled"); done(); }); }); diff --git a/test/acceptance/job.use-case-10.test.js b/test/acceptance/job.use-case-10.test.js index 444e56d8..a327bd9f 100644 --- a/test/acceptance/job.use-case-10.test.js +++ b/test/acceptance/job.use-case-10.test.js @@ -26,7 +26,7 @@ var metadataBackend = require('cartodb-redis')({ }); var batchFactory = require('../../batch'); -describe('Use case 1: cancel and modify a done multiquery job', function () { +describe('Use case 10: cancel and modify a done multiquery job', function () { var batch = batchFactory(metadataBackend); @@ -95,7 +95,7 @@ describe('Use case 1: cancel and modify a done multiquery job', function () { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is done, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from done to cancelled"); done(); }); }); @@ -106,7 +106,10 @@ describe('Use case 1: cancel and modify a done multiquery job', function () { headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, method: 'PUT', data: querystring.stringify({ - query: "SELECT cartodb_id FROM untitle_table_4" + query: [ + "SELECT * FROM untitle_table_4", + "SELECT * FROM untitle_table_4" + ] }) }, { status: 400 diff --git a/test/acceptance/job.use-case-2.test.js b/test/acceptance/job.use-case-2.test.js index 205a9920..c2552d63 100644 --- a/test/acceptance/job.use-case-2.test.js +++ b/test/acceptance/job.use-case-2.test.js @@ -121,7 +121,7 @@ describe('Use case 2: cancel a running job', function() { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled"); done(); }); }); diff --git a/test/acceptance/job.use-case-3.test.js b/test/acceptance/job.use-case-3.test.js index 9d666132..a4d36055 100644 --- a/test/acceptance/job.use-case-3.test.js +++ b/test/acceptance/job.use-case-3.test.js @@ -97,7 +97,7 @@ describe('Use case 3: cancel a pending job', function() { }, 50); }); - it('Step 4, cancel a job should be cancelled', function (done){ + it('Step 4, cancel a pending job should be cancelled', function (done){ assert.response(app, { url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234', headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, diff --git a/test/acceptance/job.use-case-8.test.js b/test/acceptance/job.use-case-8.test.js index ff48a326..2ea0fa19 100644 --- a/test/acceptance/job.use-case-8.test.js +++ b/test/acceptance/job.use-case-8.test.js @@ -125,7 +125,7 @@ describe('Use case 8: cancel a running multiquery job', function() { status: 400 }, function(res) { var errors = JSON.parse(res.body); - assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed"); + assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled"); done(); }); }); @@ -136,7 +136,12 @@ describe('Use case 8: cancel a running multiquery job', function() { headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, method: 'PUT', data: querystring.stringify({ - query: "SELECT cartodb_id FROM untitle_table_4" + query: [ + "select pg_sleep(1)", + "select pg_sleep(1)", + "select pg_sleep(1)", + "select pg_sleep(1)" + ] }) }, { status: 400 diff --git a/test/acceptance/job.use-case-9.test.js b/test/acceptance/job.use-case-9.test.js index b9378fbe..ab56ee7c 100644 --- a/test/acceptance/job.use-case-9.test.js +++ b/test/acceptance/job.use-case-9.test.js @@ -103,20 +103,29 @@ describe('Use case 9: modify a pending multiquery job', function() { }, 50); }); - it('Step 4, multiquery job should be modified', function (done){ + it('Step 4, multiquery job should be modified', function (done) { assert.response(app, { url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234', headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, method: 'PUT', data: querystring.stringify({ - query: "SELECT cartodb_id FROM untitle_table_4" + query: [ + "SELECT * FROM untitle_table_4", + "SELECT * FROM untitle_table_4 limit 1" + ] }) }, { status: 200 }, function(res) { var jobGot = JSON.parse(res.body); assert.equal(jobGot.job_id, pendingJob.job_id); - assert.equal(jobGot.query, "SELECT cartodb_id FROM untitle_table_4"); + assert.deepEqual(jobGot.query, [{ + query: 'SELECT * FROM untitle_table_4', + status: 'pending' + }, { + query: 'SELECT * FROM untitle_table_4 limit 1', + status: 'pending' + }]); done(); }); });