Merge pull request #301 from CartoDB/job-model-refactor

Job model refactor
This commit is contained in:
Daniel 2016-05-18 11:38:41 +02:00
commit 8c8da8e39c
25 changed files with 830 additions and 621 deletions

View File

@ -28,6 +28,8 @@ var JobQueue = require('../batch/job_queue');
var UserIndexer = require('../batch/user_indexer');
var JobBackend = require('../batch/job_backend');
var JobCanceller = require('../batch/job_canceller');
var JobService = require('../batch/job_service');
var UserDatabaseMetadataService = require('../batch/user_database_metadata_service');
var cors = require('./middlewares/cors');
@ -183,7 +185,9 @@ function App() {
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var genericController = new GenericController();
genericController.route(app);
@ -191,7 +195,7 @@ function App() {
var queryController = new QueryController(userDatabaseService, tableCache, statsd_client);
queryController.route(app);
var jobController = new JobController(userDatabaseService, jobBackend, jobCanceller);
var jobController = new JobController(userDatabaseService, jobService, jobCanceller);
jobController.route(app);
var cacheStatusController = new CacheStatusController(tableCache);

View File

@ -37,12 +37,13 @@ function getMaxSizeErrorMessage(sql) {
);
}
function JobController(userDatabaseService, jobBackend, jobCanceller) {
function JobController(userDatabaseService, jobService) {
this.userDatabaseService = userDatabaseService;
this.jobBackend = jobBackend;
this.jobCanceller = jobCanceller;
this.jobService = jobService;
}
module.exports = JobController;
JobController.prototype.route = function (app) {
app.post(global.settings.base_url + '/sql/job', this.createJob.bind(this));
app.get(global.settings.base_url + '/sql/job', this.listJob.bind(this));
@ -84,13 +85,13 @@ JobController.prototype.cancelJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobCanceller.cancel(job_id, function (err, job) {
self.jobService.cancel(job_id, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
job: job.serialize(),
host: userDatabase.host
});
});
@ -149,13 +150,15 @@ JobController.prototype.listJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.list(cdbUsername, function (err, jobs) {
self.jobService.list(cdbUsername, function (err, jobs) {
if (err) {
return next(err);
}
next(null, {
jobs: jobs,
jobs: jobs.map(function (job) {
return job.serialize();
}),
host: userDatabase.host
});
});
@ -215,13 +218,13 @@ JobController.prototype.getJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.get(job_id, function (err, job) {
self.jobService.get(job_id, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
job: job.serialize(),
host: userDatabase.host
});
});
@ -249,23 +252,6 @@ JobController.prototype.getJob = function (req, res) {
);
};
function isValidJob(sql) {
if (_.isArray(sql)) {
for (var i = 0; i < sql.length; i++) {
if (!_.isString(sql[i])) {
return false;
}
}
return true;
}
if (!_.isString(sql)) {
return false;
}
return true;
}
JobController.prototype.createJob = function (req, res) {
// jshint maxcomplexity: 7
var self = this;
@ -274,11 +260,7 @@ JobController.prototype.createJob = function (req, res) {
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!isValidJob(sql)) {
return handleException(new Error('You must indicate a valid SQL'), res);
}
// TODO: in job.validate()
if (reachMaxQuerySizeLimit(sql)) {
return handleException(new Error(getMaxSizeErrorMessage(sql)), res);
}
@ -308,13 +290,19 @@ JobController.prototype.createJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.create(cdbUsername, sql, userDatabase.host, function (err, result) {
var data = {
user: cdbUsername,
query: sql,
host: userDatabase.host
};
self.jobService.create(data, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: result,
job: job.serialize(),
host: userDatabase.host
});
});
@ -342,7 +330,6 @@ JobController.prototype.createJob = function (req, res) {
);
};
JobController.prototype.updateJob = function (req, res) {
// jshint maxcomplexity: 7
var self = this;
@ -352,10 +339,7 @@ JobController.prototype.updateJob = function (req, res) {
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!isValidJob(sql)) {
return handleException(new Error('You must indicate a valid SQL'), res);
}
// TODO: in jobValidate
if (reachMaxQuerySizeLimit(sql)) {
return handleException(new Error(getMaxSizeErrorMessage(sql)), res);
}
@ -385,13 +369,18 @@ JobController.prototype.updateJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.update(job_id, sql, function (err, job) {
var data = {
job_id: job_id,
query: sql
};
self.jobService.update(data, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
job: job.serialize(),
host: userDatabase.host
});
});
@ -413,9 +402,8 @@ JobController.prototype.updateJob = function (req, res) {
if (result.host) {
res.header('X-Served-By-DB-Host', result.host);
}
res.send(result.job);
}
);
};
module.exports = JobController;

View File

@ -2,19 +2,22 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var debug = require('./util/debug')('batch');
var forever = require('./forever');
var queue = require('queue-async');
var jobStatus = require('./job_status');
function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) {
function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService) {
EventEmitter.call(this);
this.jobSubscriber = jobSubscriber;
this.jobQueuePool = jobQueuePool;
this.jobRunner = jobRunner;
this.jobCanceller = jobCanceller;
this.jobService = jobService;
}
util.inherits(Batch, EventEmitter);
module.exports = Batch;
Batch.prototype.start = function () {
this._subscribe();
};
@ -38,64 +41,14 @@ Batch.prototype._subscribe = function () {
self.jobQueuePool.removeQueue(host);
if (err.name === 'EmptyQueue') {
return console.log(err.message);
return debug(err.message);
}
console.error(err);
debug(err);
});
});
};
Batch.prototype.drain = function (callback) {
var self = this;
var queues = this.jobQueuePool.list();
var batchQueues = queue(queues.length);
queues.forEach(function (host) {
batchQueues.defer(self._drainJob.bind(self), host);
});
batchQueues.awaitAll(function (err) {
if (err) {
console.error('Something went wrong draining', err);
} else {
console.log('Drain complete');
}
callback();
});
};
Batch.prototype._drainJob = function (host, callback) {
var self = this;
var job_id = self.jobQueuePool.getCurrentJobId(host);
if (!job_id) {
return process.nextTick(function () {
return callback();
});
}
var queue = self.jobQueuePool.getQueue(host);
this.jobCanceller.drain(job_id, function (err) {
if (err && err.name === 'CancelNotAllowedError') {
return callback();
}
if (err) {
return callback(err);
}
queue.enqueueFirst(job_id, host, callback);
});
};
Batch.prototype.stop = function () {
this.jobSubscriber.unsubscribe();
};
Batch.prototype._consumeJobs = function (host, queue, callback) {
var self = this;
@ -116,8 +69,8 @@ Batch.prototype._consumeJobs = function (host, queue, callback) {
self.jobRunner.run(job_id, function (err, job) {
self.jobQueuePool.removeCurrentJobId(host);
if (err && err.name === 'InvalidJobStatus') {
console.log(err.message);
if (err && err.name === 'JobNotRunnable') {
debug(err.message);
return callback();
}
@ -125,17 +78,64 @@ Batch.prototype._consumeJobs = function (host, queue, callback) {
return callback(err);
}
if (job.status === jobStatus.FAILED) {
console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason);
if (job.data.status === jobStatus.FAILED) {
debug('Job %s %s in %s due to: %s', job_id, job.data.status, host, job.failed_reason);
} else {
console.log('Job %s %s in %s', job_id, job.status, host);
debug('Job %s %s in %s', job_id, job.data.status, host);
}
self.emit('job:' + job.status, job_id);
self.emit('job:' + job.data.status, job_id);
callback();
});
});
};
module.exports = Batch;
Batch.prototype.drain = function (callback) {
var self = this;
var queues = this.jobQueuePool.list();
var batchQueues = queue(queues.length);
queues.forEach(function (host) {
batchQueues.defer(self._drainJob.bind(self), host);
});
batchQueues.awaitAll(function (err) {
if (err) {
debug('Something went wrong draining', err);
} else {
debug('Drain complete');
}
callback();
});
};
Batch.prototype._drainJob = function (host, callback) {
var self = this;
var job_id = self.jobQueuePool.getCurrentJobId(host);
if (!job_id) {
return process.nextTick(function () {
return callback();
});
}
var queue = self.jobQueuePool.getQueue(host);
this.jobService.drain(job_id, function (err) {
if (err && err.name === 'CancelNotAllowedError') {
return callback();
}
if (err) {
return callback(err);
}
queue.enqueueFirst(job_id, host, callback);
});
};
Batch.prototype.stop = function () {
this.jobSubscriber.unsubscribe();
};

View File

@ -12,6 +12,7 @@ var JobPublisher = require('./job_publisher');
var JobQueue = require('./job_queue');
var UserIndexer = require('./user_indexer');
var JobBackend = require('./job_backend');
var JobService = require('./job_service');
var Batch = require('./batch');
module.exports = function batchFactory (metadataBackend) {
@ -23,9 +24,11 @@ module.exports = function batchFactory (metadataBackend) {
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
// TODO: down userDatabaseMetadataService
var queryRunner = new QueryRunner();
var jobRunner = new JobRunner(jobBackend, jobQueue, queryRunner, userDatabaseMetadataService);
var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService);
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller);
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService);
};

View File

@ -1,129 +1,205 @@
'use strict';
var uuid = require('node-uuid');
var queue = require('queue-async');
var debug = require('./util/debug')('job-backend');
var REDIS_PREFIX = 'batch:jobs:';
var REDIS_DB = 5;
var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours
var jobStatus = require('./job_status');
function setPendingIfMutiqueryJob(sql) {
if (Array.isArray(sql)) {
for (var j = 0; j < sql.length; j++) {
sql[j] = {
query: sql[j],
status: jobStatus.PENDING
};
}
}
return sql;
}
var finalStatus = [
jobStatus.CANCELLED,
jobStatus.DONE,
jobStatus.FAILED,
jobStatus.UNKNOWN
];
function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) {
this.db = 5;
this.redisPrefix = 'batch:jobs:';
this.metadataBackend = metadataBackend;
this.jobQueueProducer = jobQueueProducer;
this.jobPublisher = jobPublisher;
this.userIndexer = userIndexer;
}
JobBackend.prototype.create = function (username, sql, host, callback) {
function toRedisParams(job) {
var redisParams = [REDIS_PREFIX + job.job_id];
var obj = JSON.parse(JSON.stringify(job));
delete obj.job_id;
for (var property in obj) {
if (obj.hasOwnProperty(property)) {
redisParams.push(property);
if (property === 'query' && typeof obj[property] !== 'string') {
redisParams.push(JSON.stringify(obj[property]));
} else {
redisParams.push(obj[property]);
}
}
}
return redisParams;
}
function toObject(job_id, redisParams, redisValues) {
var obj = {};
redisParams.shift(); // job_id value
redisParams.pop(); // WARN: weird function pushed by metadataBackend
for (var i = 0; i < redisParams.length; i++) {
if (redisParams[i] === 'query') {
try {
obj[redisParams[i]] = JSON.parse(redisValues[i]);
} catch (e) {
obj[redisParams[i]] = redisValues[i];
}
} else {
obj[redisParams[i]] = redisValues[i];
}
}
obj.job_id = job_id; // adds redisKey as object property
return obj;
}
// TODO: is it really necessary??
function isJobFound(redisValues) {
return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4];
}
JobBackend.prototype.get = function (job_id, callback) {
var self = this;
var job_id = uuid.v4();
var now = new Date().toISOString();
sql = setPendingIfMutiqueryJob(sql);
var redisParams = [
this.redisPrefix + job_id,
'user', username,
'status', jobStatus.PENDING,
'query', JSON.stringify(sql),
'created_at', now,
'updated_at', now
REDIS_PREFIX + job_id,
'user',
'status',
'query',
'created_at',
'updated_at',
'host',
'failed_reason'
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
self.metadataBackend.redisCmd(REDIS_DB, 'HMGET', redisParams , function (err, redisValues) {
if (err) {
return callback(err);
}
self.jobQueueProducer.enqueue(job_id, host, function (err) {
if (!isJobFound(redisValues)) {
var notFoundError = new Error('Job with id ' + job_id + ' not found');
notFoundError.name = 'NotFoundError';
return callback(notFoundError);
}
var jobData = toObject(job_id, redisParams, redisValues);
callback(null, jobData);
});
};
JobBackend.prototype.create = function (job, callback) {
var self = this;
self.get(job.job_id, function (err) {
if (err && err.name !== 'NotFoundError') {
return callback(err);
}
self.save(job, function (err, jobSaved) {
if (err) {
return callback(err);
}
self.jobQueueProducer.enqueue(job.job_id, job.host, function (err) {
if (err) {
return callback(err);
}
// broadcast to consumers
self.jobPublisher.publish(host);
self.jobPublisher.publish(job.host);
self.userIndexer.add(username, job_id, function (err) {
self.userIndexer.add(job.user, job.job_id, function (err) {
if (err) {
return callback(err);
}
self.get(job_id, callback);
callback(null, jobSaved);
});
});
});
});
};
JobBackend.prototype.update = function (job_id, sql, callback) {
JobBackend.prototype.update = function (job, callback) {
var self = this;
this.get(job_id, function (err, job) {
self.get(job.job_id, function (err) {
if (err) {
return callback(err);
}
if (job.status !== jobStatus.PENDING) {
return callback(new Error('Job is not pending, it cannot be updated'));
}
if (Array.isArray(job.query)) {
for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status !== jobStatus.PENDING) {
return callback(new Error('Job is not pending, it cannot be updated'));
}
}
}
sql = setPendingIfMutiqueryJob(sql);
var now = new Date().toISOString();
var redisParams = [
self.redisPrefix + job_id,
'query', JSON.stringify(sql),
'updated_at', now
];
self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.get(job_id, callback);
});
self.save(job, callback);
});
};
JobBackend.prototype.list = function (username, callback) {
JobBackend.prototype.save = function (job, callback) {
var self = this;
var redisParams = toRedisParams(job);
self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.setTTL(job, function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, function (err, job) {
if (err) {
return callback(err);
}
callback(null, job);
});
});
});
};
function isFrozen(status) {
return finalStatus.indexOf(status) !== -1;
}
JobBackend.prototype.setTTL = function (job, callback) {
var self = this;
var redisKey = REDIS_PREFIX + job.job_id;
if (!isFrozen(job.status)) {
return callback();
}
self.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], callback);
};
JobBackend.prototype.list = function (user, callback) {
var self = this;
this.userIndexer.list(username, function (err, job_ids) {
this.userIndexer.list(user, function (err, job_ids) {
if (err) {
return callback(err);
}
var initialLength = job_ids.length;
self._getCleanedList(username, job_ids, function (err, jobs) {
self._getCleanedList(user, job_ids, function (err, jobs) {
if (err) {
return callback(err);
}
if (jobs.length < initialLength) {
return self.list(username, callback);
return self.list(user, callback);
}
callback(null, jobs);
@ -131,13 +207,13 @@ JobBackend.prototype.list = function (username, callback) {
});
};
JobBackend.prototype._getCleanedList = function (username, job_ids, callback) {
JobBackend.prototype._getCleanedList = function (user, job_ids, callback) {
var self = this;
var jobsQueue = queue(job_ids.length);
job_ids.forEach(function(job_id) {
jobsQueue.defer(self._getIndexedJob.bind(self), job_id, username);
jobsQueue.defer(self._getIndexedJob.bind(self), job_id, user);
});
jobsQueue.awaitAll(function (err, jobs) {
@ -151,15 +227,14 @@ JobBackend.prototype._getCleanedList = function (username, job_ids, callback) {
});
};
JobBackend.prototype._getIndexedJob = function (job_id, username, callback) {
JobBackend.prototype._getIndexedJob = function (job_id, user, callback) {
var self = this;
this.get(job_id, function (err, job) {
if (err && err.name === 'NotFoundError') {
return self.userIndexer.remove(username, job_id, function (err) {
return self.userIndexer.remove(user, job_id, function (err) {
if (err) {
console.error('Error removing key %s in user set', job_id, err);
debug('Error removing key %s in user set', job_id, err);
}
callback();
});
@ -173,257 +248,4 @@ JobBackend.prototype._getIndexedJob = function (job_id, username, callback) {
});
};
JobBackend.prototype._isJobFound = function (jobValues) {
return jobValues[0] && jobValues[1] && jobValues[2] && jobValues[3] && jobValues[4];
};
JobBackend.prototype.get = function (job_id, callback) {
var self = this;
var redisParams = [
this.redisPrefix + job_id,
'user',
'status',
'query',
'created_at',
'updated_at',
'failed_reason'
];
this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, jobValues) {
if (err) {
return callback(err);
}
if (!self._isJobFound(jobValues)) {
var notFoundError = new Error('Job with id ' + job_id + ' not found');
notFoundError.name = 'NotFoundError';
return callback(notFoundError);
}
var query;
try {
query = JSON.parse(jobValues[2]);
} catch (err) {
query = jobValues[2];
}
callback(null, {
job_id: job_id,
user: jobValues[0],
status: jobValues[1],
query: query,
created_at: jobValues[3],
updated_at: jobValues[4],
failed_reason: jobValues[5] ? jobValues[5] : undefined
});
});
};
JobBackend.prototype.setRunning = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisParams = [
this.redisPrefix + job.job_id,
'status', jobStatus.RUNNING,
'updated_at', now,
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.RUNNING;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
};
JobBackend.prototype.setPending = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.PENDING,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.PENDING;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
};
JobBackend.prototype.setDone = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.DONE,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.DONE;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
};
JobBackend.prototype.setJobPendingAndQueryDone = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
job.query[index].status = jobStatus.DONE;
var redisParams = [
redisKey,
'status', jobStatus.PENDING,
'updated_at', now,
'query', JSON.stringify(job.query)
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
};
JobBackend.prototype.setFailed = function (job, error, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.FAILED,
'failed_reason', error.message,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.FAILED;
job.query[index].failed_reason = error.message;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
};
JobBackend.prototype.setCancelled = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.CANCELLED,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.CANCELLED;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'PERSIST', [ redisKey ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
};
JobBackend.prototype.setUnknown = function (job_id, callback) {
var self = this;
this.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
var now = new Date().toISOString();
var redisKey = self.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.UNKNOWN,
'updated_at', now
];
self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
});
};
module.exports = JobBackend;

View File

@ -1,87 +1,47 @@
'use strict';
var PSQL = require('cartodb-psql');
var jobStatus = require('./job_status');
function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) {
this.metadataBackend = metadataBackend;
function JobCanceller(userDatabaseMetadataService) {
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.jobBackend = jobBackend;
}
function getIndexOfRunningQuery(job) {
if (Array.isArray(job.query)) {
for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status === jobStatus.RUNNING) {
return i;
}
}
}
}
module.exports = JobCanceller;
JobCanceller.prototype.cancel = function (job_id, callback) {
var self = this;
self.jobBackend.get(job_id, function (err, job) {
JobCanceller.prototype.cancel = function (job, callback) {
this.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
if (job.status === jobStatus.PENDING) {
return self.jobBackend.setCancelled(job, callback);
}
if (job.status !== jobStatus.RUNNING) {
var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed');
cancelNotAllowedError.name = 'CancelNotAllowedError';
return callback(cancelNotAllowedError);
}
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
self._query(job, userDatabaseMetadata, function (err, job) {
if (err) {
return callback(err);
}
var queryIndex = getIndexOfRunningQuery(job);
self.jobBackend.setCancelled(job, queryIndex, function (err, job) {
if (err) {
return callback(err);
}
callback(null, job, queryIndex);
});
});
});
doCancel(job.data.job_id, userDatabaseMetadata, callback);
});
};
JobCanceller.prototype.drain = function (job_id, callback) {
var self = this;
this.cancel(job_id, function (err, job, queryIndex) {
if (err && err.name === 'CancelNotAllowedError') {
return callback(err);
}
if (err) {
console.error('There was an error while draining job %s, %s ', job_id, err);
return self.jobBackend.setUnknown(job_id, callback);
}
self.jobBackend.setPending(job, queryIndex, callback);
});
};
JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) {
function doCancel(job_id, userDatabaseMetadata, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job.job_id + " */%'";
getQueryPID(pg, job_id, function (err, pid) {
if (err) {
return callback(err);
}
doCancelQuery(pg, pid, function (err, isCancelled) {
if (err) {
return callback(err);
}
if (!isCancelled) {
return callback(new Error('Query has not been cancelled'));
}
callback(null);
});
});
}
function getQueryPID(pg, job_id, callback) {
var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job_id + " */%'";
pg.query(getPIDQuery, function(err, result) {
if (err) {
@ -92,7 +52,11 @@ JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) {
return callback(new Error('Query is not running currently'));
}
var pid = result.rows[0].pid;
callback(null, result.rows[0].pid);
});
}
function doCancelQuery(pg, pid, callback) {
var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')';
pg.query(cancelQuery, function (err, result) {
@ -102,14 +66,6 @@ JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) {
var isCancelled = result.rows[0].pg_cancel_backend;
if (!isCancelled) {
return callback(new Error('Query has not been cancelled'));
}
callback(null, job);
callback(null, isCancelled);
});
});
};
module.exports = JobCanceller;
}

22
batch/job_factory.js Normal file
View File

@ -0,0 +1,22 @@
'use strict';
var jobModels = require('./models');
function JobFactory() {
}
module.exports = JobFactory;
JobFactory.create = function (data) {
if (!data.query) {
throw new Error('You must indicate a valid SQL');
}
for (var i = 0; i < jobModels.length; i++) {
if (jobModels[i].is(data.query)) {
return new jobModels[i](data);
}
}
throw new Error('there is no job class for the provided query');
};

View File

@ -3,74 +3,30 @@
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status');
function getNextQuery(job) {
if (!Array.isArray(job.query)) {
return {
query: job.query
};
}
for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status === jobStatus.PENDING) {
return {
index: i,
query: job.query[i].query
};
}
}
}
function isLastQuery(job, index) {
if (!Array.isArray(job.query)) {
return true;
}
if (index >= (job.query.length -1)) {
return true;
}
return false;
}
function JobRunner(jobBackend, jobQueue, queryRunner,userDatabaseMetadataService) {
this.jobBackend = jobBackend;
function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) {
this.jobService = jobService;
this.jobQueue = jobQueue;
this.queryRunner = queryRunner;
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner
}
JobRunner.prototype.run = function (job_id, callback) {
var self = this;
self.jobBackend.get(job_id, function (err, job) {
self.jobService.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
if (job.status !== jobStatus.PENDING) {
var invalidJobStatusError = new Error([
'Cannot run job',
job.job_id,
'due to its status is',
job.status
].join(' '));
invalidJobStatusError.name = 'InvalidJobStatus';
return callback(invalidJobStatusError);
var query = job.getNextQuery();
try {
job.setStatus(jobStatus.RUNNING);
} catch (err) {
return callback(err);
}
var query = getNextQuery(job);
if (!query) {
var queryNotFoundError = new Error([
'Cannot run job',
job.job_id,
', there is no query to run'
].join(' '));
queryNotFoundError.name = 'QueryNotFound';
return callback(queryNotFoundError);
}
self.jobBackend.setRunning(job, query.index, function (err, job) {
self.jobService.save(job, function (err, job) {
if (err) {
return callback(err);
}
@ -82,32 +38,46 @@ JobRunner.prototype.run = function (job_id, callback) {
JobRunner.prototype._run = function (job, query, callback) {
var self = this;
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
// TODO: move to query
self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
self.queryRunner.run(job.job_id, query.query, userDatabaseMetadata, function (err /*, result */) {
self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) {
if (err) {
// if query has been cancelled then it's going to get the current
// job status saved by query_canceller
if (errorCodes[err.code.toString()] === 'query_canceled') {
return self.jobBackend.get(job.job_id, callback);
return self.jobService.get(job.data.job_id, callback);
}
return self.jobBackend.setFailed(job, err, query.index, callback);
try {
job.setStatus(jobStatus.FAILED);
} catch (err) {
return callback(err);
}
if (isLastQuery(job, query.index)) {
return self.jobBackend.setDone(job, query.index, callback);
return self.jobService.save(job, callback);
}
self.jobBackend.setJobPendingAndQueryDone(job, query.index, function (err, job) {
try {
job.setStatus(jobStatus.DONE);
} catch (err) {
return callback(err);
}
self.jobService.save(job, function (err, job) {
if (err) {
return callback(err);
}
self.jobQueue.enqueue(job.job_id, userDatabaseMetadata.host, function (err){
if (job.isDone()) {
return callback(null, job);
}
self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) {
if (err) {
return callback(err);
}

166
batch/job_service.js Normal file
View File

@ -0,0 +1,166 @@
'use strict';
var debug = require('./util/debug')('job-service');
var JobFactory = require('./job_factory');
var jobStatus = require('./job_status');
function JobService(jobBackend, jobCanceller) {
this.jobBackend = jobBackend;
this.jobCanceller = jobCanceller;
}
module.exports = JobService;
JobService.prototype.get = function (job_id, callback) {
this.jobBackend.get(job_id, function (err, data) {
if (err) {
return callback(err);
}
var job;
try {
job = JobFactory.create(data);
} catch (err) {
return callback(err);
}
callback(null, job);
});
};
JobService.prototype.list = function (user, callback) {
this.jobBackend.list(user, function (err, dataList) {
if (err) {
return callback(err);
}
var jobList = dataList.map(function (data) {
var job;
try {
job = JobFactory.create(data);
} catch (err) {
return debug(err);
}
return job;
})
.filter(function (job) {
return job !== undefined;
});
callback(null, jobList);
});
};
JobService.prototype.create = function (data, callback) {
try {
var job = JobFactory.create(data);
job.validate();
this.jobBackend.create(job.data, function (err) {
if (err) {
return callback(err);
}
callback(null, job);
});
} catch (err) {
return callback(err);
}
};
JobService.prototype.update = function (data, callback) {
var self = this;
self.get(data.job_id, function (err, job) {
if (err) {
return callback(err);
}
try {
job.setQuery(data.query);
self.save(job, callback);
} catch (err) {
return callback(err);
}
});
};
JobService.prototype.save = function (job, callback) {
var self = this;
try {
job.validate();
} catch (err) {
return callback(err);
}
self.jobBackend.update(job.data, function (err, data) {
if (err) {
return callback(err);
}
try {
job = JobFactory.create(data);
} catch (err) {
return callback(err);
}
callback(null, job);
});
};
JobService.prototype.cancel = function (job_id, callback) {
var self = this;
self.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
var isPending = job.isPending();
try {
job.setStatus(jobStatus.CANCELLED);
} catch (err) {
return callback(err);
}
if (isPending) {
return self.save(job, callback);
}
self.jobCanceller.cancel(job, function (err) {
if (err) {
return callback(err);
}
self.save(job, callback);
});
});
};
JobService.prototype.drain = function (job_id, callback) {
var self = this;
self.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
self.jobCanceller.cancel(job, function (err) {
if (err) {
debug('There was an error while draining job %s, %s ', job_id, err);
return callback(err);
}
try {
job.setStatus(jobStatus.PENDING);
} catch (err) {
return callback(err);
}
self.jobBackend.update(job.data, callback);
});
});
};

View File

@ -1,11 +1,12 @@
'use strict';
var debug = require('./util/debug')('job-subscriber');
var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes
function _subscribe(client, channel, queueSeeker, onMessage) {
queueSeeker.seek(onMessage, function (err) {
if (err) {
console.error(err);
debug(err);
}
client.removeAllListeners('message');

6
batch/models/index.js Normal file
View File

@ -0,0 +1,6 @@
'use strict';
var JobSimple = require('./job_simple');
var JobMultiple = require('./job_multiple');
module.exports = [ JobSimple, JobMultiple ];

137
batch/models/job_base.js Normal file
View File

@ -0,0 +1,137 @@
'use strict';
var assert = require('assert');
var uuid = require('node-uuid');
var jobStatus = require('../job_status');
var validStatusTransitions = [
[jobStatus.PENDING, jobStatus.RUNNING],
[jobStatus.PENDING, jobStatus.CANCELLED],
[jobStatus.PENDING, jobStatus.UNKNOWN],
[jobStatus.RUNNING, jobStatus.DONE],
[jobStatus.RUNNING, jobStatus.FAILED],
[jobStatus.RUNNING, jobStatus.CANCELLED],
[jobStatus.RUNNING, jobStatus.PENDING],
[jobStatus.RUNNING, jobStatus.UNKNOWN]
];
var mandatoryProperties = [
'job_id',
'status',
'query',
'created_at',
'updated_at',
'host',
'user'
];
function JobBase(data) {
var now = new Date().toISOString();
this.data = data;
if (!this.data.job_id) {
this.data.job_id = uuid.v4();
}
if (!this.data.created_at) {
this.data.created_at = now;
}
if (!this.data.updated_at) {
this.data.updated_at = now;
}
if (!this.data.status) {
this.data.status = jobStatus.PENDING;
}
}
module.exports = JobBase;
JobBase.prototype.isValidStatusTransition = function (initialStatus, finalStatus) {
var transition = [ initialStatus, finalStatus ];
for (var i = 0; i < validStatusTransitions.length; i++) {
try {
assert.deepEqual(transition, validStatusTransitions[i]);
return true;
} catch (e) {
continue;
}
}
return false;
};
// should be implemented by childs
JobBase.prototype.getNextQuery = function () {
throw new Error('Unimplemented method');
};
JobBase.prototype.hasNextQuery = function () {
return !!this.getNextQuery();
};
JobBase.prototype.isPending = function () {
return this.data.status === jobStatus.PENDING;
};
JobBase.prototype.isRunning = function () {
return this.data.status === jobStatus.RUNNING;
};
JobBase.prototype.isDone = function () {
return this.data.status === jobStatus.DONE;
};
JobBase.prototype.isCancelled = function () {
return this.data.status === jobStatus.CANCELLED;
};
JobBase.prototype.isFailed = function () {
return this.data.status === jobStatus.FAILED;
};
JobBase.prototype.isUnknown = function () {
return this.data.status === jobStatus.UNKNOWN;
};
JobBase.prototype.setQuery = function (query) {
var now = new Date().toISOString();
if (!this.isPending()) {
throw new Error('Job is not pending, it cannot be updated');
}
this.data.updated_at = now;
this.data.query = query;
};
JobBase.prototype.setStatus = function (finalStatus) {
var now = new Date().toISOString();
var initialStatus = this.data.status;
var isValid = this.isValidStatusTransition(initialStatus, finalStatus);
if (!isValid) {
throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
}
this.data.updated_at = now;
this.data.status = finalStatus;
};
JobBase.prototype.validate = function () {
for (var i = 0; i < mandatoryProperties.length; i++) {
if (!this.data[mandatoryProperties[i]]) {
throw new Error('property "' + mandatoryProperties[i] + '" is mandatory');
}
}
};
JobBase.prototype.serialize = function () {
var data = JSON.parse(JSON.stringify(this.data));
delete data.host;
return data;
};

View File

@ -0,0 +1,83 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('../job_status');
function JobMultiple(data) {
JobBase.call(this, data);
this.init();
}
util.inherits(JobMultiple, JobBase);
module.exports = JobMultiple;
JobMultiple.is = function (query) {
if (!Array.isArray(query)) {
return false;
}
// 1. From user: ['select * from ...', 'select * from ...']
// 2. From redis: [ { query: 'select * from ...', status: 'pending' },
// { query: 'select * from ...', status: 'pending' } ]
for (var i = 0; i < query.length; i++) {
if (typeof query[i] !== 'string') {
if (typeof query[i].query !== 'string') {
return false;
}
}
}
return true;
};
JobMultiple.prototype.init = function () {
for (var i = 0; i < this.data.query.length; i++) {
if (!this.data.query[i].query && !this.data.query[i].status) {
this.data.query[i] = {
query: this.data.query[i],
status: jobStatus.PENDING
};
}
}
};
JobMultiple.prototype.getNextQuery = function () {
for (var i = 0; i < this.data.query.length; i++) {
if (this.data.query[i].status === jobStatus.PENDING) {
return this.data.query[i].query;
}
}
};
JobMultiple.prototype.setQuery = function (query) {
if (!JobMultiple.is(query)) {
throw new Error('You must indicate a valid SQL');
}
JobMultiple.super_.prototype.setQuery.call(this, query);
};
JobMultiple.prototype.setStatus = function (finalStatus) {
var initialStatus = this.data.status;
// if transition is to "done" and there are more queries to run
// then job status must be "pending" instead of "done"
// else job status transition to done (if "running")
if (finalStatus === jobStatus.DONE && this.hasNextQuery()) {
JobMultiple.super_.prototype.setStatus.call(this, jobStatus.PENDING);
} else {
JobMultiple.super_.prototype.setStatus.call(this, finalStatus);
}
for (var i = 0; i < this.data.query.length; i++) {
var isValid = JobMultiple.super_.prototype.isValidStatusTransition(this.data.query[i].status, finalStatus);
if (isValid) {
this.data.query[i].status = finalStatus;
return;
}
}
throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
};

View File

@ -0,0 +1,29 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
function JobSimple(data) {
JobBase.call(this, data);
}
util.inherits(JobSimple, JobBase);
module.exports = JobSimple;
JobSimple.is = function (query) {
return typeof query === 'string';
};
JobSimple.prototype.getNextQuery = function () {
if (this.isPending()) {
return this.data.query;
}
};
JobSimple.prototype.setQuery = function (query) {
if (!JobSimple.is(query)) {
throw new Error('You must indicate a valid SQL');
}
JobSimple.super_.prototype.setQuery.call(this, query);
};

View File

@ -5,8 +5,9 @@ var PSQL = require('cartodb-psql');
function QueryRunner() {
}
QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) {
module.exports = QueryRunner;
QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
pg.query('SET statement_timeout=0', function (err) {
@ -35,6 +36,3 @@ QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callbac
});
};
module.exports = QueryRunner;

7
batch/util/debug.js Normal file
View File

@ -0,0 +1,7 @@
'use strict';
var debug = require('debug');
module.exports = function batchDebug (ns) {
return debug(['batch', ns].join(':'));
};

39
npm-shrinkwrap.json generated
View File

@ -23,18 +23,6 @@
"resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-1.0.0.tgz"
}
}
},
"debug": {
"version": "2.2.0",
"from": "debug@>=2.2.0 <2.3.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz",
"dependencies": {
"ms": {
"version": "0.7.1",
"from": "ms@0.7.1",
"resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz"
}
}
}
}
},
@ -94,6 +82,18 @@
}
}
},
"debug": {
"version": "2.2.0",
"from": "debug@2.2.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz",
"dependencies": {
"ms": {
"version": "0.7.1",
"from": "ms@0.7.1",
"resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz"
}
}
},
"express": {
"version": "2.5.11",
"from": "express@>=2.5.11 <2.6.0",
@ -256,20 +256,7 @@
"step-profiler": {
"version": "0.3.0",
"from": "step-profiler@>=0.3.0 <0.4.0",
"dependencies": {
"debug": {
"version": "2.2.0",
"from": "debug@>=2.2.0 <2.3.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.2.0.tgz",
"dependencies": {
"ms": {
"version": "0.7.1",
"from": "ms@0.7.1",
"resolved": "https://registry.npmjs.org/ms/-/ms-0.7.1.tgz"
}
}
}
}
"resolved": "https://registry.npmjs.org/step-profiler/-/step-profiler-0.3.0.tgz"
},
"topojson": {
"version": "0.0.8",

View File

@ -19,6 +19,7 @@
"dependencies": {
"cartodb-psql": "~0.6.0",
"cartodb-redis": "~0.11.0",
"debug": "2.2.0",
"express": "~2.5.11",
"log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb",
"lru-cache": "~2.5.0",

View File

@ -3,10 +3,14 @@ var _ = require('underscore');
var redis = require('redis');
var queue = require('queue-async');
var batchFactory = require('../../batch');
var JobPublisher = require('../../batch/job_publisher');
var JobQueue = require('../../batch/job_queue');
var UserIndexer = require('../../batch/user_indexer');
var JobBackend = require('../../batch/job_backend');
var JobService = require('../../batch/job_service');
var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service');
var JobCanceller = require('../../batch/job_canceller');
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
@ -22,6 +26,9 @@ describe('batch module', function() {
var jobPublisher = new JobPublisher(redis);
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var batch = batchFactory(metadataBackend);
@ -37,12 +44,18 @@ describe('batch module', function() {
});
function createJob(sql, done) {
jobBackend.create(username, sql, dbInstance, function (err, job) {
var data = {
user: username,
query: sql,
host: dbInstance
};
jobService.create(data, function (err, job) {
if (err) {
return done(err);
}
done(null, job);
done(null, job.serialize());
});
}
@ -196,8 +209,7 @@ describe('batch module', function() {
});
it('should perform job with array of select', function (done) {
var queries = ['select * from private_table', 'select * from private_table'];
var queries = ['select * from private_table limit 1', 'select * from private_table'];
createJob(queries, function (err, job) {
if (err) {

View File

@ -91,7 +91,7 @@ describe('Use case 1: cancel and modify a done job', function () {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is done, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from done to cancelled");
done();
});
});

View File

@ -26,7 +26,7 @@ var metadataBackend = require('cartodb-redis')({
});
var batchFactory = require('../../batch');
describe('Use case 1: cancel and modify a done multiquery job', function () {
describe('Use case 10: cancel and modify a done multiquery job', function () {
var batch = batchFactory(metadataBackend);
@ -95,7 +95,7 @@ describe('Use case 1: cancel and modify a done multiquery job', function () {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is done, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from done to cancelled");
done();
});
});
@ -106,7 +106,10 @@ describe('Use case 1: cancel and modify a done multiquery job', function () {
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'PUT',
data: querystring.stringify({
query: "SELECT cartodb_id FROM untitle_table_4"
query: [
"SELECT * FROM untitle_table_4",
"SELECT * FROM untitle_table_4"
]
})
}, {
status: 400

View File

@ -121,7 +121,7 @@ describe('Use case 2: cancel a running job', function() {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled");
done();
});
});

View File

@ -97,7 +97,7 @@ describe('Use case 3: cancel a pending job', function() {
}, 50);
});
it('Step 4, cancel a job should be cancelled', function (done){
it('Step 4, cancel a pending job should be cancelled', function (done){
assert.response(app, {
url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234',
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },

View File

@ -125,7 +125,7 @@ describe('Use case 8: cancel a running multiquery job', function() {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled");
done();
});
});
@ -136,7 +136,12 @@ describe('Use case 8: cancel a running multiquery job', function() {
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'PUT',
data: querystring.stringify({
query: "SELECT cartodb_id FROM untitle_table_4"
query: [
"select pg_sleep(1)",
"select pg_sleep(1)",
"select pg_sleep(1)",
"select pg_sleep(1)"
]
})
}, {
status: 400

View File

@ -103,20 +103,29 @@ describe('Use case 9: modify a pending multiquery job', function() {
}, 50);
});
it('Step 4, multiquery job should be modified', function (done){
it('Step 4, multiquery job should be modified', function (done) {
assert.response(app, {
url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234',
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'PUT',
data: querystring.stringify({
query: "SELECT cartodb_id FROM untitle_table_4"
query: [
"SELECT * FROM untitle_table_4",
"SELECT * FROM untitle_table_4 limit 1"
]
})
}, {
status: 200
}, function(res) {
var jobGot = JSON.parse(res.body);
assert.equal(jobGot.job_id, pendingJob.job_id);
assert.equal(jobGot.query, "SELECT cartodb_id FROM untitle_table_4");
assert.deepEqual(jobGot.query, [{
query: 'SELECT * FROM untitle_table_4',
status: 'pending'
}, {
query: 'SELECT * FROM untitle_table_4 limit 1',
status: 'pending'
}]);
done();
});
});