Implemented job cancelation in Batch API
This commit is contained in:
parent
41a01bc0e3
commit
ada39d84b8
@ -9,6 +9,9 @@ var JobPublisher = require('../../batch/job_publisher');
|
|||||||
var JobQueue = require('../../batch/job_queue');
|
var JobQueue = require('../../batch/job_queue');
|
||||||
var UserIndexer = require('../../batch/user_indexer');
|
var UserIndexer = require('../../batch/user_indexer');
|
||||||
var JobBackend = require('../../batch/job_backend');
|
var JobBackend = require('../../batch/job_backend');
|
||||||
|
var JobCanceller = require('../../batch/job_canceller');
|
||||||
|
var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service');
|
||||||
|
|
||||||
var CdbRequest = require('../models/cartodb_request');
|
var CdbRequest = require('../models/cartodb_request');
|
||||||
var handleException = require('../utils/error_handler');
|
var handleException = require('../utils/error_handler');
|
||||||
|
|
||||||
@ -24,12 +27,116 @@ function JobController(metadataBackend, tableCache, statsd_client) {
|
|||||||
this.tableCache = tableCache;
|
this.tableCache = tableCache;
|
||||||
this.statsd_client = statsd_client;
|
this.statsd_client = statsd_client;
|
||||||
this.jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
|
this.jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
|
||||||
|
this.userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
|
||||||
}
|
}
|
||||||
|
|
||||||
JobController.prototype.route = function (app) {
|
JobController.prototype.route = function (app) {
|
||||||
app.post(global.settings.base_url + '/job', this.createJob.bind(this));
|
app.post(global.settings.base_url + '/job', this.createJob.bind(this));
|
||||||
app.get(global.settings.base_url + '/job', this.listJob.bind(this));
|
app.get(global.settings.base_url + '/job', this.listJob.bind(this));
|
||||||
app.get(global.settings.base_url + '/job/:job_id', this.getJob.bind(this));
|
app.get(global.settings.base_url + '/job/:job_id', this.getJob.bind(this));
|
||||||
|
app.delete(global.settings.base_url + '/job/:job_id', this.cancelJob.bind(this));
|
||||||
|
};
|
||||||
|
|
||||||
|
JobController.prototype.cancelJob = function (req, res) {
|
||||||
|
var self = this;
|
||||||
|
var job_id = req.params.job_id;
|
||||||
|
var body = (req.body) ? req.body : {};
|
||||||
|
var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken
|
||||||
|
var cdbUsername = cdbReq.userByReq(req);
|
||||||
|
|
||||||
|
if (!_.isString(job_id)) {
|
||||||
|
return handleException(new Error("You must indicate a job id"), res);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( req.profiler ) {
|
||||||
|
req.profiler.start('sqlapi.job');
|
||||||
|
}
|
||||||
|
|
||||||
|
req.aborted = false;
|
||||||
|
req.on("close", function() {
|
||||||
|
if (req.formatter && _.isFunction(req.formatter.cancel)) {
|
||||||
|
req.formatter.cancel();
|
||||||
|
}
|
||||||
|
req.aborted = true; // TODO: there must be a builtin way to check this
|
||||||
|
});
|
||||||
|
|
||||||
|
function checkAborted(step) {
|
||||||
|
if ( req.aborted ) {
|
||||||
|
var err = new Error("Request aborted during " + step);
|
||||||
|
// We'll use status 499, same as ngnix in these cases
|
||||||
|
// see http://en.wikipedia.org/wiki/List_of_HTTP_status_codes#4xx_Client_Error
|
||||||
|
err.http_status = 499;
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( req.profiler ) {
|
||||||
|
req.profiler.done('init');
|
||||||
|
}
|
||||||
|
|
||||||
|
step(
|
||||||
|
function getUserDBInfo() {
|
||||||
|
var options = {
|
||||||
|
req: req,
|
||||||
|
params: params,
|
||||||
|
checkAborted: checkAborted,
|
||||||
|
metadataBackend: self.metadataBackend,
|
||||||
|
cdbUsername: cdbUsername
|
||||||
|
};
|
||||||
|
userDatabaseService.getUserDatabase(options, this);
|
||||||
|
},
|
||||||
|
function cancelJob(err, userDatabase) {
|
||||||
|
assert.ifError(err);
|
||||||
|
|
||||||
|
if (!userDatabase.authenticated) {
|
||||||
|
throw new Error('permission denied');
|
||||||
|
}
|
||||||
|
|
||||||
|
var next = this;
|
||||||
|
|
||||||
|
checkAborted('cancelJob');
|
||||||
|
|
||||||
|
if ( req.profiler ) {
|
||||||
|
req.profiler.done('setDBAuth');
|
||||||
|
}
|
||||||
|
|
||||||
|
var jobCanceller = new JobCanceller(self.metadataBackend, self.userDatabaseMetadataService);
|
||||||
|
|
||||||
|
jobCanceller.cancel(job_id)
|
||||||
|
.on('cancelled', function () {
|
||||||
|
next(null, {
|
||||||
|
cancelled: true,
|
||||||
|
host: userDatabase.host
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.on('error', function (err) {
|
||||||
|
next(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
},
|
||||||
|
function handleResponse(err, result) {
|
||||||
|
if ( err ) {
|
||||||
|
return handleException(err, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( req.profiler ) {
|
||||||
|
req.profiler.done('cancelJob');
|
||||||
|
res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (global.settings.api_hostname) {
|
||||||
|
res.header('X-Served-By-Host', global.settings.api_hostname);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.host) {
|
||||||
|
res.header('X-Served-By-DB-Host', result.host);
|
||||||
|
}
|
||||||
|
|
||||||
|
res.send({
|
||||||
|
cancelled: result.cancelled
|
||||||
|
});
|
||||||
|
}
|
||||||
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
JobController.prototype.listJob = function (req, res) {
|
JobController.prototype.listJob = function (req, res) {
|
||||||
@ -97,7 +204,7 @@ JobController.prototype.listJob = function (req, res) {
|
|||||||
|
|
||||||
next(null, {
|
next(null, {
|
||||||
jobs: jobs,
|
jobs: jobs,
|
||||||
userDatabase: userDatabase
|
host: userDatabase.host
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
@ -194,7 +301,7 @@ JobController.prototype.getJob = function (req, res) {
|
|||||||
|
|
||||||
next(null, {
|
next(null, {
|
||||||
job: job,
|
job: job,
|
||||||
userDatabase: userDatabase
|
host: userDatabase.host
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
@ -41,15 +41,16 @@ Batch.prototype.start = function () {
|
|||||||
self.jobRunner.run(job_id)
|
self.jobRunner.run(job_id)
|
||||||
.on('done', function (job) {
|
.on('done', function (job) {
|
||||||
console.log('Job %s done in %s', job_id, host);
|
console.log('Job %s done in %s', job_id, host);
|
||||||
self.emit('job:done', job_id);
|
self.emit('job:done', job.job_id);
|
||||||
consume(queue); // recursive call
|
consume(queue); // recursive call
|
||||||
})
|
})
|
||||||
.on('failed', function (job) {
|
.on('failed', function (job) {
|
||||||
console.log('Job %s done in %s', job_id, host);
|
console.log('Job %s done in %s', job_id, host);
|
||||||
self.emit('job:failed', job_id);
|
self.emit('job:failed', job.job_id);
|
||||||
consume(queue); // recursive call
|
consume(queue); // recursive call
|
||||||
})
|
})
|
||||||
.on('error', function (err) {
|
.on('error', function (err) {
|
||||||
|
console.error('Error in job ', err.message || err);
|
||||||
self.emit('job:failed', job_id);
|
self.emit('job:failed', job_id);
|
||||||
self.jobQueuePool.remove(host);
|
self.jobQueuePool.remove(host);
|
||||||
});
|
});
|
||||||
|
@ -163,4 +163,22 @@ JobBackend.prototype.setFailed = function (job, err) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
JobBackend.prototype.setCancelled = function (job) {
|
||||||
|
var self = this;
|
||||||
|
var redisParams = [
|
||||||
|
this.redisPrefix + job.job_id,
|
||||||
|
'status', 'cancelled',
|
||||||
|
'updated_at', new Date().toISOString()
|
||||||
|
];
|
||||||
|
|
||||||
|
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
|
||||||
|
if (err) {
|
||||||
|
return self.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.emit('cancelled', job);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
module.exports = JobBackend;
|
module.exports = JobBackend;
|
||||||
|
79
batch/job_canceller.js
Normal file
79
batch/job_canceller.js
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
var JobBackend = require('./job_backend');
|
||||||
|
var PSQL = require('cartodb-psql');
|
||||||
|
var JobPublisher = require('./job_publisher');
|
||||||
|
var JobQueue = require('./job_queue');
|
||||||
|
var UserIndexer = require('./user_indexer');
|
||||||
|
|
||||||
|
function JobCanceller(metadataBackend, userDatabaseMetadataService) {
|
||||||
|
this.metadataBackend = metadataBackend;
|
||||||
|
this.userDatabaseMetadataService = userDatabaseMetadataService;
|
||||||
|
}
|
||||||
|
|
||||||
|
JobCanceller.prototype.cancel = function (job_id) {
|
||||||
|
var self = this;
|
||||||
|
var jobQueue = new JobQueue(this.metadataBackend);
|
||||||
|
var jobPublisher = new JobPublisher();
|
||||||
|
var userIndexer = new UserIndexer(this.metadataBackend);
|
||||||
|
var jobBackend = new JobBackend(this.metadataBackend, jobQueue, jobPublisher, userIndexer);
|
||||||
|
|
||||||
|
jobBackend.get(job_id, function (err, job) {
|
||||||
|
if (err) {
|
||||||
|
return jobBackend.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (job.status === 'pending') {
|
||||||
|
return jobBackend.setCancelled(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (job.status !== 'running') {
|
||||||
|
return jobBackend.emit('error', new Error('Job has been dispatched previously, nothing to do'));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
|
||||||
|
if (err) {
|
||||||
|
return jobBackend.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
|
||||||
|
|
||||||
|
var getPIDQuery = 'SELECT pid FROM pg_stat_activity WHERE query = \'' +
|
||||||
|
job.query +
|
||||||
|
' /* ' + job.job_id + ' */\'';
|
||||||
|
|
||||||
|
pg.query(getPIDQuery, function(err, result) {
|
||||||
|
if(err) {
|
||||||
|
return jobBackend.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
var pid = result.rows[0].pid;
|
||||||
|
|
||||||
|
if (!pid) {
|
||||||
|
return jobBackend.emit('error', new Error('Query not running currently'));
|
||||||
|
}
|
||||||
|
|
||||||
|
var cancelQuery = 'SELECT pg_cancel_backend(' + pid +')';
|
||||||
|
|
||||||
|
pg.query(cancelQuery, function (err, result) {
|
||||||
|
if (err) {
|
||||||
|
return jobBackend.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
var isCancelled = result.rows[0].pg_cancel_backend;
|
||||||
|
|
||||||
|
if (!isCancelled) {
|
||||||
|
return jobBackend.emit('error', new Error('Query has not been cancelled'));
|
||||||
|
}
|
||||||
|
|
||||||
|
jobBackend.emit('cancelled');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return jobBackend;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
module.exports = JobCanceller;
|
@ -3,15 +3,15 @@
|
|||||||
function JobQueue(metadataBackend) {
|
function JobQueue(metadataBackend) {
|
||||||
this.metadataBackend = metadataBackend;
|
this.metadataBackend = metadataBackend;
|
||||||
this.db = 5;
|
this.db = 5;
|
||||||
this.prefixRedis = 'batch:queues:';
|
this.redisPrefix = 'batch:queues:';
|
||||||
}
|
}
|
||||||
|
|
||||||
JobQueue.prototype.enqueue = function (job_id, host, callback) {
|
JobQueue.prototype.enqueue = function (job_id, host, callback) {
|
||||||
this.metadataBackend.redisCmd(this.db, 'LPUSH', [ this.prefixRedis + host, job_id ], callback);
|
this.metadataBackend.redisCmd(this.db, 'LPUSH', [ this.redisPrefix + host, job_id ], callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
JobQueue.prototype.dequeue = function (host, callback) {
|
JobQueue.prototype.dequeue = function (host, callback) {
|
||||||
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.prefixRedis + host ], callback);
|
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.redisPrefix + host ], callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = JobQueue;
|
module.exports = JobQueue;
|
||||||
|
@ -23,6 +23,11 @@ JobRunner.prototype.run = function (job_id) {
|
|||||||
return jobBackend.emit('error', err);
|
return jobBackend.emit('error', err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (job.status !== 'pending') {
|
||||||
|
return jobBackend.emit('error',
|
||||||
|
new Error('Cannot run job ' + job.job_id + ' due to its status is ' + job.status));
|
||||||
|
}
|
||||||
|
|
||||||
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
|
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return jobBackend.emit('error', err);
|
return jobBackend.emit('error', err);
|
||||||
@ -37,7 +42,10 @@ JobRunner.prototype.run = function (job_id) {
|
|||||||
return jobBackend.setFailed(job, err);
|
return jobBackend.setFailed(job, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
pg.eventedQuery(job.query, function (err, query /* , queryCanceller */) {
|
// mark query to allow to users cancel their queries whether users request for that
|
||||||
|
var sql = job.query + ' /* ' + job.job_id + ' */';
|
||||||
|
|
||||||
|
pg.eventedQuery(sql, function (err, query /* , queryCanceller */) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return jobBackend.setFailed(job, err);
|
return jobBackend.setFailed(job, err);
|
||||||
}
|
}
|
||||||
|
@ -3,15 +3,15 @@
|
|||||||
function UserIndexer(metadataBackend) {
|
function UserIndexer(metadataBackend) {
|
||||||
this.metadataBackend = metadataBackend;
|
this.metadataBackend = metadataBackend;
|
||||||
this.db = 5;
|
this.db = 5;
|
||||||
this.prefixRedis = 'batch:users:';
|
this.redisPrefix = 'batch:users:';
|
||||||
}
|
}
|
||||||
|
|
||||||
UserIndexer.prototype.add = function (username, job_id, callback) {
|
UserIndexer.prototype.add = function (username, job_id, callback) {
|
||||||
this.metadataBackend.redisCmd(this.db, 'SADD', [ this.prefixRedis + username, job_id ] , callback);
|
this.metadataBackend.redisCmd(this.db, 'SADD', [ this.redisPrefix + username, job_id ] , callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
UserIndexer.prototype.list = function (username, callback) {
|
UserIndexer.prototype.list = function (username, callback) {
|
||||||
this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [ this.prefixRedis + username ] , callback);
|
this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [ this.redisPrefix + username ] , callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = UserIndexer;
|
module.exports = UserIndexer;
|
||||||
|
Loading…
Reference in New Issue
Block a user