2015-12-07 16:40:51 +08:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
var _ = require('underscore');
|
|
|
|
var step = require('step');
|
|
|
|
var assert = require('assert');
|
|
|
|
|
|
|
|
var UserDatabaseService = require('../services/user_database_service');
|
2015-12-16 22:57:58 +08:00
|
|
|
var JobPublisher = require('../../batch/job_publisher');
|
|
|
|
var JobQueueProducer = require('../../batch/job_queue_producer');
|
2015-12-22 02:57:10 +08:00
|
|
|
var JobBackend = require('../../batch/job_backend');
|
2015-12-07 16:40:51 +08:00
|
|
|
var CdbRequest = require('../models/cartodb_request');
|
|
|
|
var handleException = require('../utils/error_handler');
|
|
|
|
|
|
|
|
var cdbReq = new CdbRequest();
|
|
|
|
var userDatabaseService = new UserDatabaseService();
|
2015-12-16 22:57:58 +08:00
|
|
|
var jobPublisher = new JobPublisher();
|
2015-12-07 16:40:51 +08:00
|
|
|
|
|
|
|
/**
 * Controller for the batch job HTTP endpoints.
 *
 * @constructor
 * @param {Object} metadataBackend - kept on the instance and also handed to
 *   the JobQueueProducer and JobBackend created here
 * @param {Object} tableCache - kept on the instance
 * @param {Object} statsd_client - kept on the instance
 */
function JobController(metadataBackend, tableCache, statsd_client) {
    // Injected dependencies, stored untouched.
    this.metadataBackend = metadataBackend;
    this.tableCache = tableCache;
    this.statsd_client = statsd_client;

    // Batch collaborators; both are built on top of the same metadata backend.
    this.jobQueueProducer = new JobQueueProducer(metadataBackend);
    this.jobBackend = new JobBackend(metadataBackend);
}
|
|
|
|
|
|
|
|
/**
 * Registers this controller's routes on the given application.
 *
 * @param {Object} app - object exposing `post(path, handler)`
 */
JobController.prototype.route = function (app) {
    var jobPath = global.settings.base_url + '/job';
    var createJobHandler = this.createJob.bind(this);

    app.post(jobPath, createJobHandler);

    // Fetching a job by id is not enabled yet:
    // app.get(global.settings.base_url + '/job:jobId', this.getJob.bind(this));
};
|
|
|
|
|
2015-12-22 02:57:10 +08:00
|
|
|
// JobController.prototype.getJob = function (req, res) {
|
|
|
|
//
|
|
|
|
// };
|
2015-12-22 18:02:16 +08:00
|
|
|
|
2015-12-07 16:40:51 +08:00
|
|
|
// jshint maxcomplexity:21
|
2015-12-22 02:57:10 +08:00
|
|
|
/**
 * Request handler that creates a batch job.
 *
 * Merges `req.query` and `req.body`, takes their `query` parameter as the SQL
 * of the job, looks up the user database through
 * `userDatabaseService.getUserDatabase`, stores the job with
 * `this.jobBackend.create`, enqueues it with `this.jobQueueProducer.enqueue`,
 * notifies `jobPublisher` and finally sends the created job with `res.send`.
 *
 * Every failure (missing query, aborted request, backend errors) is reported
 * through `handleException(err, res)`; nothing else is written to `res` in
 * that case.
 *
 * @param {Object} req - incoming request; `req.query`, `req.body`, the
 *   optional `req.profiler` and `req.formatter` are read, `req.aborted` is
 *   written
 * @param {Object} res - outgoing response; uses `res.header` and `res.send`
 */
JobController.prototype.createJob = function (req, res) {
    var self = this;
    var body = req.body || {};
    // Merge into a fresh object so req.query / req.body are never modified
    // (modifying them would break oauth).
    var params = _.extend({}, req.query, body);
    var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
    var cdbUsername = cdbReq.userByReq(req);

    if (!_.isString(sql)) {
        return handleException(new Error("You must indicate a sql query"), res);
    }

    if (req.profiler) {
        req.profiler.start('sqlapi.job');
    }

    req.aborted = false;
    req.on("close", function () {
        if (req.formatter && _.isFunction(req.formatter.cancel)) {
            req.formatter.cancel();
        }
        req.aborted = true; // TODO: there must be a builtin way to check this
    });

    // Throws a 499 error when the client has already closed the connection.
    function checkAborted(stage) {
        if (req.aborted) {
            var err = new Error("Request aborted during " + stage);
            // We'll use status 499, same as nginx in these cases
            // see http://en.wikipedia.org/wiki/List_of_HTTP_status_codes#4xx_Client_Error
            err.http_status = 499;
            throw err;
        }
    }

    // Forwards a previous stage's error to the next `step` stage by throwing it.
    // The original error object is rethrown (instead of going through
    // assert.ifError) so custom fields such as `http_status` reach
    // handleException unchanged on every Node version.
    function throwIfError(err) {
        if (err !== null && err !== undefined) {
            throw err;
        }
    }

    if (req.profiler) {
        req.profiler.done('init');
    }

    step(
        function getUserDBInfo() {
            var options = {
                req: req,
                params: params,
                checkAborted: checkAborted,
                metadataBackend: self.metadataBackend,
                cdbUsername: cdbUsername
            };

            userDatabaseService.getUserDatabase(options, this);
        },
        function persistJob(err, userDatabase) {
            throwIfError(err);

            var next = this;

            checkAborted('enqueueJob');

            if (req.profiler) {
                req.profiler.done('setDBAuth');
            }

            self.jobBackend.create(cdbUsername, sql, function (err, result) {
                if (err) {
                    return next(err);
                }

                next(null, {
                    job: result,
                    userDatabase: userDatabase
                });
            });
        },
        function enqueueJob(err, result) {
            throwIfError(err);

            var next = this;

            self.jobQueueProducer.enqueue(result.job.jobId, result.userDatabase.host, function (err) {
                if (err) {
                    return next(err);
                }

                // broadcast to consumers
                jobPublisher.publish(result.userDatabase.host);

                next(null, {
                    job: result.job,
                    host: result.userDatabase.host
                });
            });
        },
        function handleResponse(err, result) {
            if (err) {
                // Stop here: `result` is undefined on failure, and
                // handleException is in charge of the error response.
                return handleException(err, res);
            }

            if (req.profiler) {
                req.profiler.done('enqueueJob');
                res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
            }

            if (global.settings.api_hostname) {
                res.header('X-Served-By-Host', global.settings.api_hostname);
            }

            if (result.host) {
                res.header('X-Served-By-DB-Host', result.host);
            }

            res.send(result.job);
        }
    );
};
|
|
|
|
|
|
|
|
module.exports = JobController;
|