Implemented multi-jobs, user is able to send an array of jobs and batch service will run them in series

This commit is contained in:
Daniel García Aubert 2016-03-18 14:57:18 +01:00
parent dbc7e27f94
commit 056f22b156
4 changed files with 84 additions and 8 deletions

View File

@ -221,6 +221,23 @@ JobController.prototype.getJob = function (req, res) {
);
};
function isValidJob(sql) {
if (_.isArray(sql)) {
for (var i = 0; i < sql.length; i++) {
if (!_.isString(sql[i])) {
return false;
}
}
return true;
}
if (!_.isString(sql)) {
return false;
}
return true;
}
JobController.prototype.createJob = function (req, res) {
var self = this;
var body = (req.body) ? req.body : {};
@ -228,8 +245,8 @@ JobController.prototype.createJob = function (req, res) {
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!_.isString(sql)) {
return handleException(new Error("You must indicate a sql query"), res);
if (!isValidJob(sql)) {
return handleException(new Error("You must indicate a valid SQL query"), res);
}
if ( req.profiler ) {
@ -300,7 +317,7 @@ JobController.prototype.updateJob = function (req, res) {
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!_.isString(sql)) {
if (!isValidJob(sql)) {
return handleException(new Error("You must indicate a sql query"), res);
}

View File

@ -21,7 +21,7 @@ JobBackend.prototype.create = function (username, sql, host, callback) {
this.redisPrefix + job_id,
'user', username,
'status', 'pending',
'query', sql,
'query', JSON.stringify(sql),
'created_at', now,
'updated_at', now
];
@ -99,6 +99,8 @@ JobBackend.prototype.list = function (username, callback) {
return self.list(username, callback);
}
callback(null, jobs);
});
});
@ -173,11 +175,19 @@ JobBackend.prototype.get = function (job_id, callback) {
return callback(notFoundError);
}
var query;
try {
query = JSON.parse(jobValues[2]);
} catch (err) {
query = jobValues[2];
}
callback(null, {
job_id: job_id,
user: jobValues[0],
status: jobValues[1],
query: jobValues[2],
query: query,
created_at: jobValues[3],
updated_at: jobValues[4],
failed_reason: jobValues[5] ? jobValues[5] : undefined

View File

@ -2,6 +2,7 @@
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var PSQL = require('cartodb-psql');
var queue = require('queue-async');
function JobRunner(jobBackend, userDatabaseMetadataService) {
@ -33,13 +34,38 @@ JobRunner.prototype.run = function (job_id, callback) {
return callback(err);
}
self._query(job, userDatabaseMetadata, callback);
self._series(job, userDatabaseMetadata, callback);
});
});
});
};
JobRunner.prototype._query = function (job, userDatabaseMetadata, callback) {
JobRunner.prototype._series = function(job, userDatabaseMetadata, callback) {
var jobQueue = queue(1); // performs in series
if (!Array.isArray(job.query)) {
job.query = [ job.query ];
}
for (var i = 0; i < job.query.length; i++) {
jobQueue.defer(this._query.bind(this), job, userDatabaseMetadata, i);
}
jobQueue.await(function (err, result) {
if (err) {
return callback(err);
}
// last result is the good one
if (Array.isArray(result)) {
return callback(null, result[result.length - 1]);
}
callback(null, result);
})
};
JobRunner.prototype._query = function (job, userDatabaseMetadata, index, callback) {
var self = this;
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
@ -50,7 +76,7 @@ JobRunner.prototype._query = function (job, userDatabaseMetadata, callback) {
}
// mark query to allow to users cancel their queries whether users request for it
var sql = job.query + ' /* ' + job.job_id + ' */';
var sql = job.query[index] + ' /* ' + job.job_id + ' */';
pg.eventedQuery(sql, function (err, query) {
if (err) {

View File

@ -195,4 +195,27 @@ describe('batch module', function() {
});
});
it.skip('should perform job with array of select', function (done) {
var jobs = ['select * from private_table', 'select * from private_table'];
var queriesDone = 0
createJob(jobs, function (err, job) {
if (err) {
return done(err);
}
batch.on('job:done', function (job_id) {
if (job_id === job.job_id) {
queriesDone++;
if (queriesDone === jobs.length) {
done();
}
}
});
});
});
});