Passed tests

This commit is contained in:
Daniel García Aubert 2016-05-16 01:22:47 +02:00
parent cc7dd7a0d2
commit d2d3ba8159
17 changed files with 160 additions and 120 deletions

View File

@ -28,6 +28,8 @@ var JobQueue = require('../batch/job_queue');
var UserIndexer = require('../batch/user_indexer');
var JobBackend = require('../batch/job_backend');
var JobCanceller = require('../batch/job_canceller');
var JobService = require('../batch/job_service');
var UserDatabaseMetadataService = require('../batch/user_database_metadata_service');
var cors = require('./middlewares/cors');
@ -183,7 +185,9 @@ function App() {
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var genericController = new GenericController();
genericController.route(app);
@ -191,7 +195,7 @@ function App() {
var queryController = new QueryController(userDatabaseService, tableCache, statsd_client);
queryController.route(app);
var jobController = new JobController(userDatabaseService, jobBackend, jobCanceller);
var jobController = new JobController(userDatabaseService, jobService, jobCanceller);
jobController.route(app);
var cacheStatusController = new CacheStatusController(tableCache);

View File

@ -26,7 +26,7 @@ module.exports = function batchFactory (metadataBackend) {
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
// TODO: down userDatabaseMetadataService
var queryRunner = new QueryRunner();
var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService);

View File

@ -19,23 +19,44 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer
this.userIndexer = userIndexer;
}
JobBackend.prototype.toRedisParams = function (obj) {
var redisParams = [];
JobBackend.prototype.toRedisParams = function (data) {
var redisParams = [this.redisPrefix + data.job_id];
var obj = JSON.parse(JSON.stringify(data));
delete obj.job_id;
for (var property in obj) {
if (obj.hasOwnProperty(property)) {
redisParams.push(property);
if (property === 'query' && typeof obj[property] !== 'string') {
redisParams.push(JSON.stringify(obj[property]));
} else {
redisParams.push(obj[property]);
}
}
}
return redisParams;
};
JobBackend.prototype.toObject = function (redisParams, redisValues) {
JobBackend.prototype.toObject = function (job_id, redisParams, redisValues) {
var obj = {};
redisParams.shift(); // job_id value
redisParams.pop(); // WARN: weird function pushed by metadataBackend
for (var i = 0; i < redisParams.length; i++) {
if (redisParams[i] === 'query') {
try {
obj[redisParams[i]] = JSON.parse(redisValues[i]);
} catch (e) {
obj[redisParams[i]] = redisValues[i];
}
} else {
obj[redisParams[i]] = redisValues[i];
}
}
obj.job_id = job_id; // adds redisKey as object property
return obj;
};
@ -54,6 +75,7 @@ JobBackend.prototype.get = function (job_id, callback) {
'query',
'created_at',
'updated_at',
'host',
'failed_reason'
];
@ -68,8 +90,7 @@ JobBackend.prototype.get = function (job_id, callback) {
return callback(notFoundError);
}
var jobData = self.toObject(redisParams.slice(1), redisValues);
jobData.job_id = job_id;
var jobData = self.toObject(job_id, redisParams, redisValues);
callback(null, jobData);
});

View File

@ -47,7 +47,7 @@ function JobBase(data) {
module.exports = JobBase;
JobBase.isValidStatusTransition = function (initialStatus, finalStatus) {
JobBase.prototype.isValidStatusTransition = function (initialStatus, finalStatus) {
var transition = [ initialStatus, finalStatus ];
for (var i = 0; i < validStatusTransitions.length; i++) {
@ -67,11 +67,12 @@ JobBase.prototype.getNextQuery = function () {
throw new Error('Unimplemented method');
};
// should be implemented by childs
JobBase.prototype.hasNextQuery = function () {
throw new Error('Unimplemented method');
return !!this.getNextQuery();
};
JobBase.prototype.isPending = function () {
return this.data.status === jobStatus.PENDING;
};
@ -96,21 +97,19 @@ JobBase.prototype.isUnknown = function () {
return this.data.status === jobStatus.UNKNOWN;
};
JobBase.prototype.set = function (data) {
JobBase.prototype.setQuery = function (query) {
var now = new Date().toISOString();
if (data.job_id !== this.data.job_id) {
throw new Error('Cannot modify id');
if (!this.isPending()) {
throw new Error('Job is not pending, it cannot be updated');
}
this.data.update_at = now;
};
JobBase.prototype.setQuery = function (/* query */) {
throw new Error('Unimplemented method');
this.data.updated_at = now;
this.data.query = query;
};
JobBase.prototype.setStatus = function (finalStatus) {
var now = new Date().toISOString();
var initialStatus = this.data.status;
var isValid = this.isValidStatusTransition(initialStatus, finalStatus);
@ -118,6 +117,7 @@ JobBase.prototype.setStatus = function (finalStatus) {
throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
}
this.data.updated_at = now;
this.data.status = finalStatus;
};

View File

@ -26,7 +26,7 @@ function doCancel(job_id, userDatabaseMetadata, callback) {
return callback(err);
}
cancelQuery(pg, pid, function (err, isCancelled) {
doCancelQuery(pg, pid, function (err, isCancelled) {
if (err) {
return callback(err);
}
@ -56,7 +56,7 @@ function getQueryPID(pg, job_id, callback) {
});
}
function cancelQuery(pg, pid, callback) {
function doCancelQuery(pg, pid, callback) {
var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')';
pg.query(cancelQuery, function (err, result) {

View File

@ -1,22 +1,22 @@
'use strict';
var JobSimple = require('job_simple');
var JobMultiple = require('job_multiple');
var JobSimple = require('./job_simple');
var JobMultiple = require('./job_multiple');
var jobClasses = [ JobSimple, JobMultiple ];
function JobFactory() {
this.jobClasses = [ JobSimple, JobMultiple ];
}
module.exports = JobFactory;
JobFactory.create = function (data) {
if (!data.query) {
throw new Error('param "query" is mandatory');
throw new Error('You must indicate a valid SQL');
}
for (var i = 0; i < this.jobClasses.length; i++) {
if (this.jobClasses[i].is(data.query)) {
return new this.jobClasses[i](data);
for (var i = 0; i < jobClasses.length; i++) {
if (jobClasses[i].is(data.query)) {
return new jobClasses[i](data);
}
}

View File

@ -13,64 +13,54 @@ util.inherits(JobMultiple, JobBase);
module.exports = JobMultiple;
JobMultiple.prototype.is = function (query) {
JobMultiple.is = function (query) {
if (!Array.isArray(query)) {
return false;
}
// 1. From user: ['select * from ...', 'select * from ...']
// 2. From redis: [ { query: 'select * from ...', status: 'pending' },
// { query: 'select * from ...', status: 'pending' } ]
for (var i = 0; i < query.length; i++) {
if (typeof query[i] !== 'string') {
if (typeof query[i].query !== 'string') {
return false;
}
}
}
return true;
};
JobMultiple.prototype.init = function () {
for (var i = 0; i < this.data.query.length; i++) {
if (!this.data.query[i].query && !this.data.query[i].status) {
this.data.query[i] = {
query: this.data.query[i],
status: jobStatus.PENDING
};
}
};
JobMultiple.prototype.isPending = function (index) {
var isPending = JobMultiple.super_.prototype.isPending.call(this);
if (isPending && index) {
isPending = this.data.query[index].status === jobStatus.PENDING;
}
return isPending;
};
JobMultiple.prototype.hasNextQuery = function () {
return !!this.getNextQuery();
};
JobMultiple.prototype.getNextQuery = function () {
if (this.isPending()) {
for (var i = 0; i < this.data.query.length; i++) {
if (this.isPending(i)) {
if (this.data.query[i].status === jobStatus.PENDING) {
return this.data.query[i].query;
}
}
}
};
JobMultiple.prototype.setQuery = function (query) {
var isMultiple = this.is(query);
if (this.isPending() && isMultiple) {
this.data.query = query;
if (!JobMultiple.is(query)) {
throw new Error('You must indicate a valid SQL');
}
JobMultiple.super_.prototype.setQuery.call(this, query);
};
JobMultiple.prototype.setStatus = function (finalStatus) {
var initialStatus = this.data.status;
// if transition is to "done" and there are more queries to run
// then job status must be "pending" instead of "done"
// else job status transition to done (if "running")
@ -81,7 +71,7 @@ JobMultiple.prototype.setStatus = function (finalStatus) {
}
for (var i = 0; i < this.data.query.length; i++) {
var isValid = JobMultiple.super_.isValidStatusTransition(this.data.query[i].status, finalStatus);
var isValid = JobMultiple.super_.prototype.isValidStatusTransition(this.data.query[i].status, finalStatus);
if (isValid) {
this.data.query[i].status = finalStatus;
@ -91,15 +81,3 @@ JobMultiple.prototype.setStatus = function (finalStatus) {
throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
};
JobMultiple.prototype.set = function (data) {
JobMultiple.super_.prototype.set.call(this, data);
if (data.status) {
this.setStatus(data.status);
}
if (data.query) {
this.setQuery(data.query);
}
};

View File

@ -18,6 +18,8 @@ JobRunner.prototype.run = function (job_id, callback) {
return callback(err);
}
var query = job.getNextQuery();
try {
job.setStatus(jobStatus.RUNNING);
} catch (err) {
@ -29,16 +31,14 @@ JobRunner.prototype.run = function (job_id, callback) {
return callback(err);
}
self._run(job, callback);
self._run(job, query, callback);
});
});
};
JobRunner.prototype._run = function (job, callback) {
JobRunner.prototype._run = function (job, query, callback) {
var self = this;
var query = job.getNextQuery();
// TODO: move to query
self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
if (err) {

View File

@ -57,7 +57,12 @@ JobService.prototype.create = function (data, callback) {
try {
var job = JobFactory.create(data);
job.validate();
this.jobBackend.create(job.data, callback);
this.jobBackend.create(job.data, function (err) {
if (err) {
return callback(err);
}
callback(null, job);
});
} catch (err) {
return callback(err);
}
@ -72,7 +77,7 @@ JobService.prototype.update = function (data, callback) {
}
try {
job.set(data);
job.setQuery(data.query);
self.save(job, callback);
} catch (err) {
return callback(err);
@ -85,10 +90,23 @@ JobService.prototype.save = function (job, callback) {
try {
job.validate();
self.jobBackend.update(job.data, callback);
} catch (err) {
return callback(err);
}
self.jobBackend.update(job.data, function (err, data) {
if (err) {
return callback(err);
}
try {
job = JobFactory.create(data);
} catch (err) {
return callback(err);
}
callback(null, job);
});
};
JobService.prototype.cancel = function (job_id, callback) {
@ -99,18 +117,24 @@ JobService.prototype.cancel = function (job_id, callback) {
return callback(err);
}
var isPending = job.isPending();
try {
job.setStatus(jobStatus.CANCELLED);
} catch (err) {
return callback(err);
}
if (isPending) {
return self.save(job, callback);
}
self.jobCanceller.cancel(job, function (err) {
if (err) {
return callback(err);
}
self.jobBackend.update(job.data, callback);
self.save(job, callback);
});
});
};
@ -125,7 +149,7 @@ JobService.prototype.drain = function (job_id, callback) {
self.jobCanceller.cancel(job, function (err) {
if (err) {
// console.error('There was an error while draining job %s, %s ', job_id, err);
console.error('There was an error while draining job %s, %s ', job_id, err);
return callback(err);
}

View File

@ -10,36 +10,20 @@ util.inherits(JobSimple, JobBase);
module.exports = JobSimple;
JobSimple.prototype.is = function (query) {
JobSimple.is = function (query) {
return typeof query === 'string';
};
JobSimple.prototype.hasNextQuery = function () {
return this.isPending();
};
JobSimple.prototype.getNextQuery = function () {
if (this.hasNextQuery()) {
if (this.isPending()) {
return this.data.query;
}
};
JobSimple.prototype.setQuery = function (query) {
var isSimple = this.is(query);
if (this.isPending() && isSimple) {
this.data.query = query;
}
};
JobSimple.prototype.set = function (data) {
JobSimple.super_.prototype.set.call(this, data);
if (data.status) {
this.setStatus(data.status);
}
if (data.query) {
this.setQuery(data.query);
if (!JobSimple.is(query)) {
throw new Error('You must indicate a valid SQL');
}
JobSimple.super_.prototype.setQuery.call(this, query);
};

View File

@ -3,10 +3,14 @@ var _ = require('underscore');
var redis = require('redis');
var queue = require('queue-async');
var batchFactory = require('../../batch');
var JobPublisher = require('../../batch/job_publisher');
var JobQueue = require('../../batch/job_queue');
var UserIndexer = require('../../batch/user_indexer');
var JobBackend = require('../../batch/job_backend');
var JobService = require('../../batch/job_service');
var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service');
var JobCanceller = require('../../batch/job_canceller');
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
@ -22,6 +26,9 @@ describe('batch module', function() {
var jobPublisher = new JobPublisher(redis);
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var batch = batchFactory(metadataBackend);
@ -37,12 +44,18 @@ describe('batch module', function() {
});
function createJob(sql, done) {
jobBackend.create(username, sql, dbInstance, function (err, job) {
var data = {
user: username,
query: sql,
host: dbInstance
};
jobService.create(data, function (err, job) {
if (err) {
return done(err);
}
done(null, job);
done(null, job.serialize());
});
}
@ -196,8 +209,7 @@ describe('batch module', function() {
});
it('should perform job with array of select', function (done) {
var queries = ['select * from private_table', 'select * from private_table'];
var queries = ['select * from private_table limit 1', 'select * from private_table'];
createJob(queries, function (err, job) {
if (err) {

View File

@ -91,7 +91,7 @@ describe('Use case 1: cancel and modify a done job', function () {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is done, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from done to cancelled");
done();
});
});

View File

@ -26,7 +26,7 @@ var metadataBackend = require('cartodb-redis')({
});
var batchFactory = require('../../batch');
describe('Use case 1: cancel and modify a done multiquery job', function () {
describe('Use case 10: cancel and modify a done multiquery job', function () {
var batch = batchFactory(metadataBackend);
@ -95,7 +95,7 @@ describe('Use case 1: cancel and modify a done multiquery job', function () {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is done, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from done to cancelled");
done();
});
});
@ -106,7 +106,10 @@ describe('Use case 1: cancel and modify a done multiquery job', function () {
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'PUT',
data: querystring.stringify({
query: "SELECT cartodb_id FROM untitle_table_4"
query: [
"SELECT * FROM untitle_table_4",
"SELECT * FROM untitle_table_4"
]
})
}, {
status: 400

View File

@ -121,7 +121,7 @@ describe('Use case 2: cancel a running job', function() {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled");
done();
});
});

View File

@ -97,7 +97,7 @@ describe('Use case 3: cancel a pending job', function() {
}, 50);
});
it('Step 4, cancel a job should be cancelled', function (done){
it('Step 4, cancel a pending job should be cancelled', function (done){
assert.response(app, {
url: '/api/v2/sql/job/' + pendingJob.job_id + '?api_key=1234',
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },

View File

@ -125,7 +125,7 @@ describe('Use case 8: cancel a running multiquery job', function() {
status: 400
}, function(res) {
var errors = JSON.parse(res.body);
assert.equal(errors.error[0], "Job is cancelled, cancel is not allowed");
assert.equal(errors.error[0], "Cannot set status from cancelled to cancelled");
done();
});
});
@ -136,7 +136,12 @@ describe('Use case 8: cancel a running multiquery job', function() {
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'PUT',
data: querystring.stringify({
query: "SELECT cartodb_id FROM untitle_table_4"
query: [
"select pg_sleep(1)",
"select pg_sleep(1)",
"select pg_sleep(1)",
"select pg_sleep(1)"
]
})
}, {
status: 400

View File

@ -109,14 +109,23 @@ describe('Use case 9: modify a pending multiquery job', function() {
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'PUT',
data: querystring.stringify({
query: "SELECT cartodb_id FROM untitle_table_4"
query: [
"SELECT * FROM untitle_table_4",
"SELECT * FROM untitle_table_4 limit 1"
]
})
}, {
status: 200
}, function(res) {
var jobGot = JSON.parse(res.body);
assert.equal(jobGot.job_id, pendingJob.job_id);
assert.equal(jobGot.query, "SELECT cartodb_id FROM untitle_table_4");
assert.deepEqual(jobGot.query, [{
query: 'SELECT * FROM untitle_table_4',
status: 'pending'
}, {
query: 'SELECT * FROM untitle_table_4 limit 1',
status: 'pending'
}]);
done();
});
});