diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index bd81d86d..1ba79c44 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -221,6 +221,23 @@ JobController.prototype.getJob = function (req, res) { ); }; +function isValidJob(sql) { + if (_.isArray(sql)) { + for (var i = 0; i < sql.length; i++) { + if (!_.isString(sql[i])) { + return false; + } + } + return true; + } + + if (!_.isString(sql)) { + return false; + } + + return true; +} + JobController.prototype.createJob = function (req, res) { var self = this; var body = (req.body) ? req.body : {}; @@ -228,8 +245,8 @@ JobController.prototype.createJob = function (req, res) { var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query; var cdbUsername = cdbReq.userByReq(req); - if (!_.isString(sql)) { - return handleException(new Error("You must indicate a sql query"), res); + if (!isValidJob(sql)) { + return handleException(new Error("You must indicate a valid SQL query"), res); } if ( req.profiler ) { @@ -300,7 +317,7 @@ JobController.prototype.updateJob = function (req, res) { var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query; var cdbUsername = cdbReq.userByReq(req); - if (!_.isString(sql)) { + if (!isValidJob(sql)) { return handleException(new Error("You must indicate a sql query"), res); } diff --git a/batch/job_backend.js b/batch/job_backend.js index 6437c044..b1d1e070 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -21,7 +21,7 @@ JobBackend.prototype.create = function (username, sql, host, callback) { this.redisPrefix + job_id, 'user', username, 'status', 'pending', - 'query', sql, + 'query', JSON.stringify(sql), 'created_at', now, 'updated_at', now ]; @@ -99,6 +99,8 @@ JobBackend.prototype.list = function (username, callback) { return self.list(username, callback); } + + callback(null, jobs); }); }); @@ -173,11 +175,19 @@ JobBackend.prototype.get = function (job_id, callback) { return callback(notFoundError); } + var query; + + try { + query = JSON.parse(jobValues[2]); + } catch (err) { + query = jobValues[2]; + } + callback(null, { job_id: job_id, user: jobValues[0], status: jobValues[1], - query: jobValues[2], + query: query, created_at: jobValues[3], updated_at: jobValues[4], failed_reason: jobValues[5] ? jobValues[5] : undefined diff --git a/batch/job_runner.js b/batch/job_runner.js index 2027bcd4..aecc8a1b 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -2,6 +2,7 @@ var errorCodes = require('../app/postgresql/error_codes').codeToCondition; var PSQL = require('cartodb-psql'); +var queue = require('queue-async'); function JobRunner(jobBackend, userDatabaseMetadataService) { @@ -33,13 +34,38 @@ JobRunner.prototype.run = function (job_id, callback) { return callback(err); } - self._query(job, userDatabaseMetadata, callback); + self._series(job, userDatabaseMetadata, callback); }); }); }); }; -JobRunner.prototype._query = function (job, userDatabaseMetadata, callback) { +JobRunner.prototype._series = function(job, userDatabaseMetadata, callback) { + var jobQueue = queue(1); // performs in series + + if (!Array.isArray(job.query)) { + job.query = [ job.query ]; + } + + for (var i = 0; i < job.query.length; i++) { + jobQueue.defer(this._query.bind(this), job, userDatabaseMetadata, i); + } + + jobQueue.await(function (err, result) { + if (err) { + return callback(err); + } + + // last result is the good one + if (Array.isArray(result)) { + return callback(null, result[result.length - 1]); + } + + callback(null, result); + }) +}; + +JobRunner.prototype._query = function (job, userDatabaseMetadata, index, callback) { var self = this; var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); @@ -50,7 +76,7 @@ JobRunner.prototype._query = function (job, userDatabaseMetadata, callback) { } // mark query to allow to users cancel their queries whether users request for it - var sql = job.query + ' /* ' + job.job_id + ' */'; + var sql = job.query[index] + ' /* ' + job.job_id + ' */'; pg.eventedQuery(sql, function (err, query) { if (err) { diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index ea803cd0..45cd5605 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -195,4 +195,27 @@ describe('batch module', function() { }); }); + it.skip('should perform job with array of select', function (done) { + var jobs = ['select * from private_table', 'select * from private_table']; + var queriesDone = 0 + + createJob(jobs, function (err, job) { + if (err) { + return done(err); + } + + batch.on('job:done', function (job_id) { + if (job_id === job.job_id) { + queriesDone++; + + if (queriesDone === jobs.length) { + done(); + } + } + }); + + + }); + }); + });