Job model refactor

This commit is contained in:
Daniel García Aubert 2016-05-13 18:50:55 +02:00
parent 29287e0cd6
commit cc7dd7a0d2
12 changed files with 726 additions and 561 deletions

View File

@ -37,12 +37,13 @@ function getMaxSizeErrorMessage(sql) {
);
}
function JobController(userDatabaseService, jobBackend, jobCanceller) {
function JobController(userDatabaseService, jobService) {
this.userDatabaseService = userDatabaseService;
this.jobBackend = jobBackend;
this.jobCanceller = jobCanceller;
this.jobService = jobService;
}
module.exports = JobController;
JobController.prototype.route = function (app) {
app.post(global.settings.base_url + '/sql/job', this.createJob.bind(this));
app.get(global.settings.base_url + '/sql/job', this.listJob.bind(this));
@ -84,13 +85,13 @@ JobController.prototype.cancelJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobCanceller.cancel(job_id, function (err, job) {
self.jobService.cancel(job_id, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
job: job.serialize(),
host: userDatabase.host
});
});
@ -149,13 +150,15 @@ JobController.prototype.listJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.list(cdbUsername, function (err, jobs) {
self.jobService.list(cdbUsername, function (err, jobs) {
if (err) {
return next(err);
}
next(null, {
jobs: jobs,
jobs: jobs.map(function (job) {
return job.serialize();
}),
host: userDatabase.host
});
});
@ -215,13 +218,13 @@ JobController.prototype.getJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.get(job_id, function (err, job) {
self.jobService.get(job_id, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
job: job.serialize(),
host: userDatabase.host
});
});
@ -249,23 +252,6 @@ JobController.prototype.getJob = function (req, res) {
);
};
function isValidJob(sql) {
if (_.isArray(sql)) {
for (var i = 0; i < sql.length; i++) {
if (!_.isString(sql[i])) {
return false;
}
}
return true;
}
if (!_.isString(sql)) {
return false;
}
return true;
}
JobController.prototype.createJob = function (req, res) {
// jshint maxcomplexity: 7
var self = this;
@ -274,11 +260,7 @@ JobController.prototype.createJob = function (req, res) {
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!isValidJob(sql)) {
return handleException(new Error('You must indicate a valid SQL'), res);
}
// TODO: in job.validate()
if (reachMaxQuerySizeLimit(sql)) {
return handleException(new Error(getMaxSizeErrorMessage(sql)), res);
}
@ -308,13 +290,19 @@ JobController.prototype.createJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.create(cdbUsername, sql, userDatabase.host, function (err, result) {
var data = {
user: cdbUsername,
query: sql,
host: userDatabase.host
};
self.jobService.create(data, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: result,
job: job.serialize(),
host: userDatabase.host
});
});
@ -342,7 +330,6 @@ JobController.prototype.createJob = function (req, res) {
);
};
JobController.prototype.updateJob = function (req, res) {
// jshint maxcomplexity: 7
var self = this;
@ -352,10 +339,7 @@ JobController.prototype.updateJob = function (req, res) {
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!isValidJob(sql)) {
return handleException(new Error('You must indicate a valid SQL'), res);
}
// TODO: in jobValidate
if (reachMaxQuerySizeLimit(sql)) {
return handleException(new Error(getMaxSizeErrorMessage(sql)), res);
}
@ -385,13 +369,18 @@ JobController.prototype.updateJob = function (req, res) {
req.profiler.done('setDBAuth');
}
self.jobBackend.update(job_id, sql, function (err, job) {
var data = {
job_id: job_id,
query: sql
};
self.jobService.update(data, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
job: job.serialize(),
host: userDatabase.host
});
});
@ -413,9 +402,8 @@ JobController.prototype.updateJob = function (req, res) {
if (result.host) {
res.header('X-Served-By-DB-Host', result.host);
}
res.send(result.job);
}
);
};
module.exports = JobController;

View File

@ -6,15 +6,17 @@ var forever = require('./forever');
var queue = require('queue-async');
var jobStatus = require('./job_status');
function Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller) {
function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService) {
EventEmitter.call(this);
this.jobSubscriber = jobSubscriber;
this.jobQueuePool = jobQueuePool;
this.jobRunner = jobRunner;
this.jobCanceller = jobCanceller;
this.jobService = jobService;
}
util.inherits(Batch, EventEmitter);
module.exports = Batch;
Batch.prototype.start = function () {
this._subscribe();
};
@ -46,11 +48,51 @@ Batch.prototype._subscribe = function () {
});
};
Batch.prototype.drain = function (callback) {
Batch.prototype._consumeJobs = function (host, queue, callback) {
var self = this;
var queues = this.jobQueuePool.list();
queue.dequeue(host, function (err, job_id) {
if (err) {
return callback(err);
}
if (!job_id) {
var emptyQueueError = new Error('Queue ' + host + ' is empty');
emptyQueueError.name = 'EmptyQueue';
return callback(emptyQueueError);
}
self.jobQueuePool.setCurrentJobId(host, job_id);
self.jobRunner.run(job_id, function (err, job) {
self.jobQueuePool.removeCurrentJobId(host);
if (err && err.name === 'JobNotRunnable') {
console.log(err.message);
return callback();
}
if (err) {
return callback(err);
}
if (job.data.status === jobStatus.FAILED) {
console.log('Job %s %s in %s due to: %s', job_id, job.data.status, host, job.failed_reason);
} else {
console.log('Job %s %s in %s', job_id, job.data.status, host);
}
self.emit('job:' + job.data.status, job_id);
callback();
});
});
};
Batch.prototype.drain = function (callback) {
var self = this;
var queues = this.jobQueuePool.list();
var batchQueues = queue(queues.length);
queues.forEach(function (host) {
@ -80,7 +122,7 @@ Batch.prototype._drainJob = function (host, callback) {
var queue = self.jobQueuePool.getQueue(host);
this.jobCanceller.drain(job_id, function (err) {
this.jobService.drain(job_id, function (err) {
if (err && err.name === 'CancelNotAllowedError') {
return callback();
}
@ -96,46 +138,3 @@ Batch.prototype._drainJob = function (host, callback) {
Batch.prototype.stop = function () {
this.jobSubscriber.unsubscribe();
};
Batch.prototype._consumeJobs = function (host, queue, callback) {
var self = this;
queue.dequeue(host, function (err, job_id) {
if (err) {
return callback(err);
}
if (!job_id) {
var emptyQueueError = new Error('Queue ' + host + ' is empty');
emptyQueueError.name = 'EmptyQueue';
return callback(emptyQueueError);
}
self.jobQueuePool.setCurrentJobId(host, job_id);
self.jobRunner.run(job_id, function (err, job) {
self.jobQueuePool.removeCurrentJobId(host);
if (err && err.name === 'InvalidJobStatus') {
console.log(err.message);
return callback();
}
if (err) {
return callback(err);
}
if (job.status === jobStatus.FAILED) {
console.log('Job %s %s in %s due to: %s', job_id, job.status, host, job.failed_reason);
} else {
console.log('Job %s %s in %s', job_id, job.status, host);
}
self.emit('job:' + job.status, job_id);
callback();
});
});
};
module.exports = Batch;

View File

@ -12,6 +12,7 @@ var JobPublisher = require('./job_publisher');
var JobQueue = require('./job_queue');
var UserIndexer = require('./user_indexer');
var JobBackend = require('./job_backend');
var JobService = require('./job_service');
var Batch = require('./batch');
module.exports = function batchFactory (metadataBackend) {
@ -23,9 +24,11 @@ module.exports = function batchFactory (metadataBackend) {
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
// TODO: down userDatabaseMetadataService
var queryRunner = new QueryRunner();
var jobRunner = new JobRunner(jobBackend, jobQueue, queryRunner, userDatabaseMetadataService);
var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend);
var jobService = new JobService(jobBackend, jobCanceller);
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService);
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobCanceller);
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService);
};

View File

@ -1,22 +1,14 @@
'use strict';
var uuid = require('node-uuid');
var queue = require('queue-async');
var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours
var jobStatus = require('./job_status');
function setPendingIfMutiqueryJob(sql) {
if (Array.isArray(sql)) {
for (var j = 0; j < sql.length; j++) {
sql[j] = {
query: sql[j],
status: jobStatus.PENDING
};
}
}
return sql;
}
var finalStatus = [
jobStatus.CANCELLED,
jobStatus.DONE,
jobStatus.FAILED,
jobStatus.UNKNOWN
];
function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) {
this.db = 5;
@ -27,103 +19,164 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer
this.userIndexer = userIndexer;
}
JobBackend.prototype.create = function (username, sql, host, callback) {
JobBackend.prototype.toRedisParams = function (obj) {
var redisParams = [];
for (var property in obj) {
if (obj.hasOwnProperty(property)) {
redisParams.push(property);
redisParams.push(obj[property]);
}
}
return redisParams;
};
JobBackend.prototype.toObject = function (redisParams, redisValues) {
var obj = {};
for (var i = 0; i < redisParams.length; i++) {
obj[redisParams[i]] = redisValues[i];
}
return obj;
};
// TODO: is it really necessary??
function isJobFound(redisValues) {
return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4];
}
JobBackend.prototype.get = function (job_id, callback) {
var self = this;
var job_id = uuid.v4();
var now = new Date().toISOString();
sql = setPendingIfMutiqueryJob(sql);
var redisParams = [
this.redisPrefix + job_id,
'user', username,
'status', jobStatus.PENDING,
'query', JSON.stringify(sql),
'created_at', now,
'updated_at', now
'user',
'status',
'query',
'created_at',
'updated_at',
'failed_reason'
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, redisValues) {
if (err) {
return callback(err);
}
self.jobQueueProducer.enqueue(job_id, host, function (err) {
if (!isJobFound(redisValues)) {
var notFoundError = new Error('Job with id ' + job_id + ' not found');
notFoundError.name = 'NotFoundError';
return callback(notFoundError);
}
var jobData = self.toObject(redisParams.slice(1), redisValues);
jobData.job_id = job_id;
callback(null, jobData);
});
};
JobBackend.prototype.create = function (data, callback) {
var self = this;
self.get(data.job_id, function (err) {
if (err && err.name !== 'NotFoundError') {
return callback(err);
}
self.save(data, function (err, job) {
if (err) {
return callback(err);
}
// broadcast to consumers
self.jobPublisher.publish(host);
self.jobQueueProducer.enqueue(data.job_id, data.host, function (err) {
if (err) {
return callback(err);
}
self.userIndexer.add(username, job_id, function (err) {
if (err) {
return callback(err);
}
// broadcast to consumers
self.jobPublisher.publish(data.host);
self.get(job_id, callback);
self.userIndexer.add(data.user, data.job_id, function (err) {
if (err) {
return callback(err);
}
callback(null, job);
});
});
});
});
};
JobBackend.prototype.update = function (job_id, sql, callback) {
JobBackend.prototype.update = function (data, callback) {
var self = this;
this.get(job_id, function (err, job) {
self.get(data.job_id, function (err) {
if (err) {
return callback(err);
}
if (job.status !== jobStatus.PENDING) {
return callback(new Error('Job is not pending, it cannot be updated'));
self.save(data, callback);
});
};
JobBackend.prototype.save = function (data, callback) {
var self = this;
var redisParams = self.toRedisParams(data);
self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
if (Array.isArray(job.query)) {
for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status !== jobStatus.PENDING) {
return callback(new Error('Job is not pending, it cannot be updated'));
}
}
}
sql = setPendingIfMutiqueryJob(sql);
var now = new Date().toISOString();
var redisParams = [
self.redisPrefix + job_id,
'query', JSON.stringify(sql),
'updated_at', now
];
self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) {
self.setTTL(data, function (err) {
if (err) {
return callback(err);
}
self.get(job_id, callback);
});
self.get(data.job_id, function (err, job) {
if (err) {
return callback(err);
}
callback(null, job);
});
});
});
};
JobBackend.prototype.list = function (username, callback) {
function isFrozen(status) {
return finalStatus.indexOf(status) !== -1;
}
JobBackend.prototype.setTTL = function (data, callback) {
var self = this;
var redisKey = this.redisPrefix + data.job_id;
if (!isFrozen(data.status)) {
return callback();
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], callback);
};
JobBackend.prototype.list = function (user, callback) {
var self = this;
this.userIndexer.list(username, function (err, job_ids) {
this.userIndexer.list(user, function (err, job_ids) {
if (err) {
return callback(err);
}
var initialLength = job_ids.length;
self._getCleanedList(username, job_ids, function (err, jobs) {
self._getCleanedList(user, job_ids, function (err, jobs) {
if (err) {
return callback(err);
}
if (jobs.length < initialLength) {
return self.list(username, callback);
return self.list(user, callback);
}
callback(null, jobs);
@ -131,13 +184,13 @@ JobBackend.prototype.list = function (username, callback) {
});
};
JobBackend.prototype._getCleanedList = function (username, job_ids, callback) {
JobBackend.prototype._getCleanedList = function (user, job_ids, callback) {
var self = this;
var jobsQueue = queue(job_ids.length);
job_ids.forEach(function(job_id) {
jobsQueue.defer(self._getIndexedJob.bind(self), job_id, username);
jobsQueue.defer(self._getIndexedJob.bind(self), job_id, user);
});
jobsQueue.awaitAll(function (err, jobs) {
@ -151,13 +204,12 @@ JobBackend.prototype._getCleanedList = function (username, job_ids, callback) {
});
};
JobBackend.prototype._getIndexedJob = function (job_id, username, callback) {
JobBackend.prototype._getIndexedJob = function (job_id, user, callback) {
var self = this;
this.get(job_id, function (err, job) {
if (err && err.name === 'NotFoundError') {
return self.userIndexer.remove(username, job_id, function (err) {
return self.userIndexer.remove(user, job_id, function (err) {
if (err) {
console.error('Error removing key %s in user set', job_id, err);
}
@ -173,257 +225,4 @@ JobBackend.prototype._getIndexedJob = function (job_id, username, callback) {
});
};
JobBackend.prototype._isJobFound = function (jobValues) {
return jobValues[0] && jobValues[1] && jobValues[2] && jobValues[3] && jobValues[4];
};
JobBackend.prototype.get = function (job_id, callback) {
var self = this;
var redisParams = [
this.redisPrefix + job_id,
'user',
'status',
'query',
'created_at',
'updated_at',
'failed_reason'
];
this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, jobValues) {
if (err) {
return callback(err);
}
if (!self._isJobFound(jobValues)) {
var notFoundError = new Error('Job with id ' + job_id + ' not found');
notFoundError.name = 'NotFoundError';
return callback(notFoundError);
}
var query;
try {
query = JSON.parse(jobValues[2]);
} catch (err) {
query = jobValues[2];
}
callback(null, {
job_id: job_id,
user: jobValues[0],
status: jobValues[1],
query: query,
created_at: jobValues[3],
updated_at: jobValues[4],
failed_reason: jobValues[5] ? jobValues[5] : undefined
});
});
};
JobBackend.prototype.setRunning = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisParams = [
this.redisPrefix + job.job_id,
'status', jobStatus.RUNNING,
'updated_at', now,
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.RUNNING;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
};
JobBackend.prototype.setPending = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.PENDING,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.PENDING;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
};
JobBackend.prototype.setDone = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.DONE,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.DONE;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
};
JobBackend.prototype.setJobPendingAndQueryDone = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
job.query[index].status = jobStatus.DONE;
var redisParams = [
redisKey,
'status', jobStatus.PENDING,
'updated_at', now,
'query', JSON.stringify(job.query)
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
};
JobBackend.prototype.setFailed = function (job, error, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.FAILED,
'failed_reason', error.message,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.FAILED;
job.query[index].failed_reason = error.message;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
};
JobBackend.prototype.setCancelled = function (job, index, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.CANCELLED,
'updated_at', now
];
if (!callback) {
callback = index;
} else if (index >= 0 && index < job.query.length) {
job.query[index].status = jobStatus.CANCELLED;
redisParams = redisParams.concat('query', JSON.stringify(job.query));
}
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'PERSIST', [ redisKey ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
};
JobBackend.prototype.setUnknown = function (job_id, callback) {
var self = this;
this.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
var now = new Date().toISOString();
var redisKey = self.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', jobStatus.UNKNOWN,
'updated_at', now
];
self.metadataBackend.redisCmd(self.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return callback(err);
}
self.get(job.job_id, callback);
});
});
});
};
module.exports = JobBackend;

137
batch/job_base.js Normal file
View File

@ -0,0 +1,137 @@
'use strict';
var assert = require('assert');
var uuid = require('node-uuid');
var jobStatus = require('./job_status');
var validStatusTransitions = [
[jobStatus.PENDING, jobStatus.RUNNING],
[jobStatus.PENDING, jobStatus.CANCELLED],
[jobStatus.PENDING, jobStatus.UNKNOWN],
[jobStatus.RUNNING, jobStatus.DONE],
[jobStatus.RUNNING, jobStatus.FAILED],
[jobStatus.RUNNING, jobStatus.CANCELLED],
[jobStatus.RUNNING, jobStatus.PENDING],
[jobStatus.RUNNING, jobStatus.UNKNOWN]
];
var mandatoryProperties = [
'job_id',
'status',
'query',
'created_at',
'updated_at',
'host',
'user'
];
function JobBase(data) {
var now = new Date().toISOString();
this.data = data;
if (!this.data.job_id) {
this.data.job_id = uuid.v4();
}
if (!this.data.created_at) {
this.data.created_at = now;
}
if (!this.data.updated_at) {
this.data.updated_at = now;
}
if (!this.data.status) {
this.data.status = jobStatus.PENDING;
}
}
module.exports = JobBase;
JobBase.isValidStatusTransition = function (initialStatus, finalStatus) {
var transition = [ initialStatus, finalStatus ];
for (var i = 0; i < validStatusTransitions.length; i++) {
try {
assert.deepEqual(transition, validStatusTransitions[i]);
return true;
} catch (e) {
continue;
}
}
return false;
};
// should be implemented by childs
JobBase.prototype.getNextQuery = function () {
throw new Error('Unimplemented method');
};
// should be implemented by childs
JobBase.prototype.hasNextQuery = function () {
throw new Error('Unimplemented method');
};
JobBase.prototype.isPending = function () {
return this.data.status === jobStatus.PENDING;
};
JobBase.prototype.isRunning = function () {
return this.data.status === jobStatus.RUNNING;
};
JobBase.prototype.isDone = function () {
return this.data.status === jobStatus.DONE;
};
JobBase.prototype.isCancelled = function () {
return this.data.status === jobStatus.CANCELLED;
};
JobBase.prototype.isFailed = function () {
return this.data.status === jobStatus.FAILED;
};
JobBase.prototype.isUnknown = function () {
return this.data.status === jobStatus.UNKNOWN;
};
JobBase.prototype.set = function (data) {
var now = new Date().toISOString();
if (data.job_id !== this.data.job_id) {
throw new Error('Cannot modify id');
}
this.data.update_at = now;
};
JobBase.prototype.setQuery = function (/* query */) {
throw new Error('Unimplemented method');
};
JobBase.prototype.setStatus = function (finalStatus) {
var initialStatus = this.data.status;
var isValid = this.isValidStatusTransition(initialStatus, finalStatus);
if (!isValid) {
throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
}
this.data.status = finalStatus;
};
JobBase.prototype.validate = function () {
for (var i = 0; i < mandatoryProperties.length; i++) {
if (!this.data[mandatoryProperties[i]]) {
throw new Error('property "' + mandatoryProperties[i] + '" is mandatory');
}
}
};
JobBase.prototype.serialize = function () {
var data = JSON.parse(JSON.stringify(this.data));
delete data.host;
return data;
};

View File

@ -1,87 +1,47 @@
'use strict';
var PSQL = require('cartodb-psql');
var jobStatus = require('./job_status');
function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) {
this.metadataBackend = metadataBackend;
function JobCanceller(userDatabaseMetadataService) {
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.jobBackend = jobBackend;
}
function getIndexOfRunningQuery(job) {
if (Array.isArray(job.query)) {
for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status === jobStatus.RUNNING) {
return i;
}
}
}
}
module.exports = JobCanceller;
JobCanceller.prototype.cancel = function (job_id, callback) {
var self = this;
self.jobBackend.get(job_id, function (err, job) {
JobCanceller.prototype.cancel = function (job, callback) {
this.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
if (job.status === jobStatus.PENDING) {
return self.jobBackend.setCancelled(job, callback);
doCancel(job.data.job_id, userDatabaseMetadata, callback);
});
};
function doCancel(job_id, userDatabaseMetadata, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
getQueryPID(pg, job_id, function (err, pid) {
if (err) {
return callback(err);
}
if (job.status !== jobStatus.RUNNING) {
var cancelNotAllowedError = new Error('Job is ' + job.status + ', cancel is not allowed');
cancelNotAllowedError.name = 'CancelNotAllowedError';
return callback(cancelNotAllowedError);
}
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
cancelQuery(pg, pid, function (err, isCancelled) {
if (err) {
return callback(err);
}
self._query(job, userDatabaseMetadata, function (err, job) {
if (err) {
return callback(err);
}
if (!isCancelled) {
return callback(new Error('Query has not been cancelled'));
}
var queryIndex = getIndexOfRunningQuery(job);
self.jobBackend.setCancelled(job, queryIndex, function (err, job) {
if (err) {
return callback(err);
}
callback(null, job, queryIndex);
});
});
callback(null);
});
});
};
}
JobCanceller.prototype.drain = function (job_id, callback) {
var self = this;
this.cancel(job_id, function (err, job, queryIndex) {
if (err && err.name === 'CancelNotAllowedError') {
return callback(err);
}
if (err) {
console.error('There was an error while draining job %s, %s ', job_id, err);
return self.jobBackend.setUnknown(job_id, callback);
}
self.jobBackend.setPending(job, queryIndex, callback);
});
};
JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job.job_id + " */%'";
function getQueryPID(pg, job_id, callback) {
var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job_id + " */%'";
pg.query(getPIDQuery, function(err, result) {
if (err) {
@ -92,24 +52,20 @@ JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) {
return callback(new Error('Query is not running currently'));
}
var pid = result.rows[0].pid;
var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')';
pg.query(cancelQuery, function (err, result) {
if (err) {
return callback(err);
}
var isCancelled = result.rows[0].pg_cancel_backend;
if (!isCancelled) {
return callback(new Error('Query has not been cancelled'));
}
callback(null, job);
});
callback(null, result.rows[0].pid);
});
};
}
function cancelQuery(pg, pid, callback) {
var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')';
module.exports = JobCanceller;
pg.query(cancelQuery, function (err, result) {
if (err) {
return callback(err);
}
var isCancelled = result.rows[0].pg_cancel_backend;
callback(null, isCancelled);
});
}

24
batch/job_factory.js Normal file
View File

@ -0,0 +1,24 @@
'use strict';
var JobSimple = require('job_simple');
var JobMultiple = require('job_multiple');
function JobFactory() {
this.jobClasses = [ JobSimple, JobMultiple ];
}
module.exports = JobFactory;
JobFactory.create = function (data) {
if (!data.query) {
throw new Error('param "query" is mandatory');
}
for (var i = 0; i < this.jobClasses.length; i++) {
if (this.jobClasses[i].is(data.query)) {
return new this.jobClasses[i](data);
}
}
throw new Error('there is no job class for the provided query');
};

105
batch/job_multiple.js Normal file
View File

@ -0,0 +1,105 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('./job_status');
function JobMultiple(data) {
JobBase.call(this, data);
this.init();
}
util.inherits(JobMultiple, JobBase);
module.exports = JobMultiple;
JobMultiple.prototype.is = function (query) {
if (!Array.isArray(query)) {
return false;
}
for (var i = 0; i < query.length; i++) {
if (typeof query[i] !== 'string') {
return false;
}
}
return true;
};
JobMultiple.prototype.init = function () {
for (var i = 0; i < this.data.query.length; i++) {
this.data.query[i] = {
query: this.data.query[i],
status: jobStatus.PENDING
};
}
};
JobMultiple.prototype.isPending = function (index) {
var isPending = JobMultiple.super_.prototype.isPending.call(this);
if (isPending && index) {
isPending = this.data.query[index].status === jobStatus.PENDING;
}
return isPending;
};
JobMultiple.prototype.hasNextQuery = function () {
return !!this.getNextQuery();
};
JobMultiple.prototype.getNextQuery = function () {
if (this.isPending()) {
for (var i = 0; i < this.data.query.length; i++) {
if (this.isPending(i)) {
return this.data.query[i].query;
}
}
}
};
JobMultiple.prototype.setQuery = function (query) {
var isMultiple = this.is(query);
if (this.isPending() && isMultiple) {
this.data.query = query;
}
};
JobMultiple.prototype.setStatus = function (finalStatus) {
var initialStatus = this.data.status;
// if transition is to "done" and there are more queries to run
// then job status must be "pending" instead of "done"
// else job status transition to done (if "running")
if (finalStatus === jobStatus.DONE && this.hasNextQuery()) {
JobMultiple.super_.prototype.setStatus.call(this, jobStatus.PENDING);
} else {
JobMultiple.super_.prototype.setStatus.call(this, finalStatus);
}
for (var i = 0; i < this.data.query.length; i++) {
var isValid = JobMultiple.super_.isValidStatusTransition(this.data.query[i].status, finalStatus);
if (isValid) {
this.data.query[i].status = finalStatus;
return;
}
}
throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
};
JobMultiple.prototype.set = function (data) {
JobMultiple.super_.prototype.set.call(this, data);
if (data.status) {
this.setStatus(data.status);
}
if (data.query) {
this.setQuery(data.query);
}
};

View File

@ -3,111 +3,81 @@
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status');
function getNextQuery(job) {
if (!Array.isArray(job.query)) {
return {
query: job.query
};
}
for (var i = 0; i < job.query.length; i++) {
if (job.query[i].status === jobStatus.PENDING) {
return {
index: i,
query: job.query[i].query
};
}
}
}
function isLastQuery(job, index) {
if (!Array.isArray(job.query)) {
return true;
}
if (index >= (job.query.length -1)) {
return true;
}
return false;
}
function JobRunner(jobBackend, jobQueue, queryRunner,userDatabaseMetadataService) {
this.jobBackend = jobBackend;
function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) {
this.jobService = jobService;
this.jobQueue = jobQueue;
this.queryRunner = queryRunner;
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner
}
JobRunner.prototype.run = function (job_id, callback) {
var self = this;
self.jobBackend.get(job_id, function (err, job) {
self.jobService.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
if (job.status !== jobStatus.PENDING) {
var invalidJobStatusError = new Error([
'Cannot run job',
job.job_id,
'due to its status is',
job.status
].join(' '));
invalidJobStatusError.name = 'InvalidJobStatus';
return callback(invalidJobStatusError);
try {
job.setStatus(jobStatus.RUNNING);
} catch (err) {
return callback(err);
}
var query = getNextQuery(job);
if (!query) {
var queryNotFoundError = new Error([
'Cannot run job',
job.job_id,
', there is no query to run'
].join(' '));
queryNotFoundError.name = 'QueryNotFound';
return callback(queryNotFoundError);
}
self.jobBackend.setRunning(job, query.index, function (err, job) {
self.jobService.save(job, function (err, job) {
if (err) {
return callback(err);
}
self._run(job, query, callback);
self._run(job, callback);
});
});
};
JobRunner.prototype._run = function (job, query, callback) {
JobRunner.prototype._run = function (job, callback) {
var self = this;
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
var query = job.getNextQuery();
// TODO: move to query
self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
self.queryRunner.run(job.job_id, query.query, userDatabaseMetadata, function (err /*, result */) {
self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) {
if (err) {
// if query has been cancelled then it's going to get the current
// job status saved by query_canceller
if (errorCodes[err.code.toString()] === 'query_canceled') {
return self.jobBackend.get(job.job_id, callback);
return self.jobService.get(job.data.job_id, callback);
}
return self.jobBackend.setFailed(job, err, query.index, callback);
try {
job.setStatus(jobStatus.FAILED);
} catch (err) {
return callback(err);
}
return self.jobService.save(job, callback);
}
if (isLastQuery(job, query.index)) {
return self.jobBackend.setDone(job, query.index, callback);
try {
job.setStatus(jobStatus.DONE);
} catch (err) {
return callback(err);
}
self.jobBackend.setJobPendingAndQueryDone(job, query.index, function (err, job) {
self.jobService.save(job, function (err, job) {
if (err) {
return callback(err);
}
self.jobQueue.enqueue(job.job_id, userDatabaseMetadata.host, function (err){
if (job.isDone()) {
return callback(null, job);
}
self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) {
if (err) {
return callback(err);
}

141
batch/job_service.js Normal file
View File

@ -0,0 +1,141 @@
'use strict';
var JobFactory = require('./job_factory');
var jobStatus = require('./job_status');
function JobService(jobBackend, jobCanceller) {
this.jobBackend = jobBackend;
this.jobCanceller = jobCanceller;
}
module.exports = JobService;
JobService.prototype.get = function (job_id, callback) {
this.jobBackend.get(job_id, function (err, data) {
if (err) {
return callback(err);
}
var job;
try {
job = JobFactory.create(data);
} catch (err) {
return callback(err);
}
callback(null, job);
});
};
JobService.prototype.list = function (user, callback) {
this.jobBackend.list(user, function (err, dataList) {
if (err) {
return callback(err);
}
var jobList = dataList.map(function (data) {
var job;
try {
job = JobFactory.create(data);
} catch (err) {
return console.err(err);
}
return job;
})
.filter(function (job) {
return job !== undefined;
});
callback(null, jobList);
});
};
JobService.prototype.create = function (data, callback) {
try {
var job = JobFactory.create(data);
job.validate();
this.jobBackend.create(job.data, callback);
} catch (err) {
return callback(err);
}
};
JobService.prototype.update = function (data, callback) {
var self = this;
self.get(data.job_id, function (err, job) {
if (err) {
return callback(err);
}
try {
job.set(data);
self.save(job, callback);
} catch (err) {
return callback(err);
}
});
};
JobService.prototype.save = function (job, callback) {
var self = this;
try {
job.validate();
self.jobBackend.update(job.data, callback);
} catch (err) {
return callback(err);
}
};
JobService.prototype.cancel = function (job_id, callback) {
var self = this;
self.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
try {
job.setStatus(jobStatus.CANCELLED);
} catch (err) {
return callback(err);
}
self.jobCanceller.cancel(job, function (err) {
if (err) {
return callback(err);
}
self.jobBackend.update(job.data, callback);
});
});
};
JobService.prototype.drain = function (job_id, callback) {
var self = this;
self.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
self.jobCanceller.cancel(job, function (err) {
if (err) {
// console.error('There was an error while draining job %s, %s ', job_id, err);
return callback(err);
}
try {
job.setStatus(jobStatus.PENDING);
} catch (err) {
return callback(err);
}
self.jobBackend.update(job.data, callback);
});
});
};

45
batch/job_simple.js Normal file
View File

@ -0,0 +1,45 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
function JobSimple(data) {
JobBase.call(this, data);
}
util.inherits(JobSimple, JobBase);
module.exports = JobSimple;
JobSimple.prototype.is = function (query) {
return typeof query === 'string';
};
JobSimple.prototype.hasNextQuery = function () {
return this.isPending();
};
JobSimple.prototype.getNextQuery = function () {
if (this.hasNextQuery()) {
return this.data.query;
}
};
JobSimple.prototype.setQuery = function (query) {
var isSimple = this.is(query);
if (this.isPending() && isSimple) {
this.data.query = query;
}
};
JobSimple.prototype.set = function (data) {
JobSimple.super_.prototype.set.call(this, data);
if (data.status) {
this.setStatus(data.status);
}
if (data.query) {
this.setQuery(data.query);
}
};

View File

@ -5,8 +5,9 @@ var PSQL = require('cartodb-psql');
function QueryRunner() {
}
QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) {
module.exports = QueryRunner;
QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
pg.query('SET statement_timeout=0', function (err) {
@ -35,6 +36,3 @@ QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callbac
});
};
module.exports = QueryRunner;