Implemented job controller to enqueue jobs

This commit is contained in:
Daniel García Aubert 2015-12-07 09:40:51 +01:00
parent a0895d4310
commit a1243ad64d
7 changed files with 330 additions and 86 deletions

View File

@ -23,6 +23,7 @@ var LRU = require('lru-cache');
var GenericController = require('./controllers/generic_controller'); var GenericController = require('./controllers/generic_controller');
var QueryController = require('./controllers/query_controller'); var QueryController = require('./controllers/query_controller');
var JobController = require('./controllers/job_controller');
var CacheStatusController = require('./controllers/cache_status_controller'); var CacheStatusController = require('./controllers/cache_status_controller');
var HealthCheckController = require('./controllers/health_check_controller'); var HealthCheckController = require('./controllers/health_check_controller');
var VersionController = require('./controllers/version_controller'); var VersionController = require('./controllers/version_controller');
@ -165,6 +166,9 @@ function App() {
var queryController = new QueryController(metadataBackend, tableCache, statsd_client); var queryController = new QueryController(metadataBackend, tableCache, statsd_client);
queryController.route(app); queryController.route(app);
var jobController = new JobController(metadataBackend, tableCache, statsd_client);
jobController.route(app);
var cacheStatusController = new CacheStatusController(tableCache); var cacheStatusController = new CacheStatusController(tableCache);
cacheStatusController.route(app); cacheStatusController.route(app);

View File

@ -0,0 +1,134 @@
'use strict';
var _ = require('underscore');
var step = require('step');
var assert = require('assert');
var PSQL = require('cartodb-psql');
var uuid = require('uuid');
var UserDatabaseService = require('../services/user_database_service');
var CdbRequest = require('../models/cartodb_request');
var handleException = require('../utils/error_handler');
var cdbReq = new CdbRequest();
var userDatabaseService = new UserDatabaseService();
function JobController(metadataBackend, tableCache, statsd_client) {
this.metadataBackend = metadataBackend;
this.tableCache = tableCache;
this.statsd_client = statsd_client;
}
JobController.prototype.route = function (app) {
app.all(global.settings.base_url + '/job', this.handleJob.bind(this));
};
// jshint maxcomplexity:21
JobController.prototype.handleJob = function (req, res) {
var self = this;
var body = (req.body) ? req.body : {};
var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken
var sql = (params.q === "" || _.isUndefined(params.q)) ? null : params.q;
var cdbUsername = cdbReq.userByReq(req);
if (!_.isString(sql)) {
return handleException(new Error("You must indicate a sql query"), res);
}
if ( req.profiler ) {
req.profiler.start('sqlapi.job');
}
req.aborted = false;
req.on("close", function() {
if (req.formatter && _.isFunction(req.formatter.cancel)) {
req.formatter.cancel();
}
req.aborted = true; // TODO: there must be a builtin way to check this
});
function checkAborted(step) {
if ( req.aborted ) {
var err = new Error("Request aborted during " + step);
// We'll use status 499, same as ngnix in these cases
// see http://en.wikipedia.org/wiki/List_of_HTTP_status_codes#4xx_Client_Error
err.http_status = 499;
throw err;
}
}
var pg;
if ( req.profiler ) {
req.profiler.done('init');
}
step(
function getUserDBInfo() {
var options = {
req: req,
params: params,
checkAborted: checkAborted,
metadataBackend: self.metadataBackend,
cdbUsername: cdbUsername
};
userDatabaseService.getUserDatabase(options, this);
},
function enqueueJob(err, userDatabase) {
assert.ifError(err);
var next = this;
checkAborted('enqueueJob');
if ( req.profiler ) {
req.profiler.done('setDBAuth');
}
pg = new PSQL(userDatabase, {}, { destroyOnError: true });
var enqueueJobQuery = [
'INSERT INTO cdb_jobs (',
'user_id, query',
') VALUES (',
'\'' + cdbUsername + '\', ',
'\'' + sql + '\' ',
');'
].join('\n');
pg.query(enqueueJobQuery, function (err, result) {
if (err) {
return next(err);
}
next(null, {
job: result,
host: userDatabase.host
});
});
},
function handleResponse(err, result) {
if ( err ) {
handleException(err, res);
}
if ( req.profiler ) {
req.profiler.done('enqueueJob');
res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
}
if (global.settings.api_hostname) {
res.header('X-Served-By-Host', global.settings.api_hostname);
}
if (result.host) {
res.header('X-Served-By-DB-Host', result.host);
}
res.send({
job_id: result.job.job_id
});
}
);
};
module.exports = JobController;

View File

@ -5,7 +5,7 @@ var step = require('step');
var assert = require('assert'); var assert = require('assert');
var PSQL = require('cartodb-psql'); var PSQL = require('cartodb-psql');
var AuthApi = require('../auth/auth_api'); var UserDatabaseService = require('../services/user_database_service');
var CdbRequest = require('../models/cartodb_request'); var CdbRequest = require('../models/cartodb_request');
var formats = require('../models/formats'); var formats = require('../models/formats');
@ -19,6 +19,7 @@ var generateCacheKey = require('../utils/cache_key_generator');
var handleException = require('../utils/error_handler'); var handleException = require('../utils/error_handler');
var cdbReq = new CdbRequest(); var cdbReq = new CdbRequest();
var userDatabaseService = new UserDatabaseService();
function QueryController(metadataBackend, tableCache, statsd_client) { function QueryController(metadataBackend, tableCache, statsd_client) {
this.metadataBackend = metadataBackend; this.metadataBackend = metadataBackend;
@ -33,7 +34,7 @@ QueryController.prototype.route = function (app) {
// jshint maxcomplexity:21 // jshint maxcomplexity:21
QueryController.prototype.handleQuery = function (req, res) { QueryController.prototype.handleQuery = function (req, res) {
var _this = this; var self = this;
// extract input // extract input
var body = (req.body) ? req.body : {}; var body = (req.body) ? req.body : {};
var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken
@ -115,88 +116,34 @@ QueryController.prototype.handleQuery = function (req, res) {
var pg; var pg;
// Database options // Database options
var dbopts = { var dbopts = {};
port: global.settings.db_port,
pass: global.settings.db_pubuser_pass
};
var authenticated = false;
var formatter; var formatter;
var authApi = new AuthApi(req, params),
dbParams;
if ( req.profiler ) { if ( req.profiler ) {
req.profiler.done('init'); req.profiler.done('init');
} }
// 1. Get database from redis via the username stored in the host header subdomain // 1. Get user database and related parameters
// 2. Run the request through OAuth to get R/W user id if signed
// 3. Get the list of tables affected by the query // 3. Get the list of tables affected by the query
// 4. Setup headers // 4. Setup headers
// 5. Send formatted results back // 5. Send formatted results back
step( step(
function getDatabaseConnectionParams() { function getUserDBInfo() {
checkAborted('getDatabaseConnectionParams'); var options = {
// If the request is providing credentials it may require every DB parameters req: req,
if (authApi.hasCredentials()) { params: params,
_this.metadataBackend.getAllUserDBParams(cdbUsername, this); checkAborted: checkAborted,
} else { metadataBackend: self.metadataBackend,
_this.metadataBackend.getUserDBPublicConnectionParams(cdbUsername, this); cdbUsername: cdbUsername
} };
userDatabaseService.getUserDatabase(options, this);
}, },
function authenticate(err, userDBParams) { function queryExplain(err, userDatabase){
if (err) {
err.http_status = 404;
err.message = "Sorry, we can't find CartoDB user '" + cdbUsername + "'. " +
"Please check that you have entered the correct domain.";
throw err;
}
if ( req.profiler ) {
req.profiler.done('getDBParams');
}
dbParams = userDBParams;
dbopts.host = dbParams.dbhost;
dbopts.dbname = dbParams.dbname;
dbopts.user = (!!dbParams.dbpublicuser) ? dbParams.dbpublicuser : global.settings.db_pubuser;
authApi.verifyCredentials({
metadataBackend: _this.metadataBackend,
apiKey: dbParams.apikey
}, this);
},
function setDBAuth(err, isAuthenticated) {
if (err) {
throw err;
}
if ( req.profiler ) {
req.profiler.done('authenticate');
}
if (_.isBoolean(isAuthenticated) && isAuthenticated) {
authenticated = isAuthenticated;
dbopts.user = _.template(global.settings.db_user, {user_id: dbParams.dbuser});
if ( global.settings.hasOwnProperty('db_user_pass') ) {
dbopts.pass = _.template(global.settings.db_user_pass, {
user_id: dbParams.dbuser,
user_password: dbParams.dbpass
});
} else {
delete dbopts.pass;
}
}
return null;
},
function queryExplain(err){
var self = this;
assert.ifError(err); assert.ifError(err);
var next = this;
dbopts = userDatabase;
if ( req.profiler ) { if ( req.profiler ) {
req.profiler.done('setDBAuth'); req.profiler.done('setDBAuth');
} }
@ -205,7 +152,7 @@ QueryController.prototype.handleQuery = function (req, res) {
pg = new PSQL(dbopts, {}, { destroyOnError: true }); pg = new PSQL(dbopts, {}, { destroyOnError: true });
// get all the tables from Cache or SQL // get all the tables from Cache or SQL
tableCacheItem = _this.tableCache.get(sql_md5); tableCacheItem = self.tableCache.get(sql_md5);
if (tableCacheItem) { if (tableCacheItem) {
tableCacheItem.hits++; tableCacheItem.hits++;
return false; return false;
@ -235,7 +182,7 @@ QueryController.prototype.handleQuery = function (req, res) {
console.error("Error on query explain '%s': %s", sql, errorMessage); console.error("Error on query explain '%s': %s", sql, errorMessage);
} }
return self(null, { return next(null, {
affectedTables: tableNames, affectedTables: tableNames,
lastUpdatedTime: lastUpdatedTime lastUpdatedTime: lastUpdatedTime
}); });
@ -261,10 +208,10 @@ QueryController.prototype.handleQuery = function (req, res) {
// initialise hit counter // initialise hit counter
hits: 1 hits: 1
}; };
_this.tableCache.set(sql_md5, tableCacheItem); self.tableCache.set(sql_md5, tableCacheItem);
} }
if ( !authenticated && tableCacheItem ) { if ( !dbopts.authenticated && tableCacheItem ) {
var affected_tables = tableCacheItem.affected_tables; var affected_tables = tableCacheItem.affected_tables;
for ( var i = 0; i < affected_tables.length; ++i ) { for ( var i = 0; i < affected_tables.length; ++i ) {
var t = affected_tables[i]; var t = affected_tables[i];
@ -305,7 +252,7 @@ QueryController.prototype.handleQuery = function (req, res) {
// Only set an X-Cache-Channel for responses we want Varnish to cache. // Only set an X-Cache-Channel for responses we want Varnish to cache.
if ( tableCacheItem && tableCacheItem.affected_tables.length > 0 && !tableCacheItem.may_write ) { if ( tableCacheItem && tableCacheItem.affected_tables.length > 0 && !tableCacheItem.may_write ) {
res.header('X-Cache-Channel', generateCacheKey(dbopts.dbname, tableCacheItem, authenticated)); res.header('X-Cache-Channel', generateCacheKey(dbopts.dbname, tableCacheItem, dbopts.authenticated));
} }
var lastModified = (tableCacheItem && tableCacheItem.last_modified) ? var lastModified = (tableCacheItem && tableCacheItem.last_modified) ?
@ -362,11 +309,11 @@ QueryController.prototype.handleQuery = function (req, res) {
if ( req.profiler ) { if ( req.profiler ) {
req.profiler.sendStats(); // TODO: do on nextTick ? req.profiler.sendStats(); // TODO: do on nextTick ?
} }
if (_this.statsd_client) { if (self.statsd_client) {
if ( err ) { if ( err ) {
_this.statsd_client.increment('sqlapi.query.error'); self.statsd_client.increment('sqlapi.query.error');
} else { } else {
_this.statsd_client.increment('sqlapi.query.success'); self.statsd_client.increment('sqlapi.query.success');
} }
} }
} }
@ -374,8 +321,8 @@ QueryController.prototype.handleQuery = function (req, res) {
} catch (err) { } catch (err) {
handleException(err, res); handleException(err, res);
if (_this.statsd_client) { if (self.statsd_client) {
_this.statsd_client.increment('sqlapi.query.error'); self.statsd_client.increment('sqlapi.query.error');
} }
} }

View File

@ -0,0 +1,96 @@
'use strict';
var step = require('step');
var _ = require('underscore');
var AuthApi = require('../auth/auth_api');
function DbService() {
}
DbService.prototype.getUserDatabase = function (options, callback) {
var req = options.req;
var params = options.params;
var checkAborted = options.checkAborted;
var metadataBackend = options.metadataBackend;
var cdbUsername = options.cdbUsername;
var authApi = new AuthApi(req, params);
var dbParams;
var dbopts = {
port: global.settings.db_port,
pass: global.settings.db_pubuser_pass
};
// 1. Get database from redis via the username stored in the host header subdomain
// 2. Run the request through OAuth to get R/W user id if signed
// 3. Set to user authorization params
step(
function getDatabaseConnectionParams() {
checkAborted('getDatabaseConnectionParams');
// If the request is providing credentials it may require every DB parameters
if (authApi.hasCredentials()) {
metadataBackend.getAllUserDBParams(cdbUsername, this);
} else {
metadataBackend.getUserDBPublicConnectionParams(cdbUsername, this);
}
},
function authenticate(err, userDBParams) {
if (err) {
err.http_status = 404;
err.message = "Sorry, we can't find CartoDB user '" + cdbUsername + "'. " +
"Please check that you have entered the correct domain.";
return callback(err);
}
if ( req.profiler ) {
req.profiler.done('getDBParams');
}
dbParams = userDBParams;
dbopts.host = dbParams.dbhost;
dbopts.dbname = dbParams.dbname;
dbopts.user = (!!dbParams.dbpublicuser) ? dbParams.dbpublicuser : global.settings.db_pubuser;
authApi.verifyCredentials({
metadataBackend: metadataBackend,
apiKey: dbParams.apikey
}, this);
},
function setDBAuth(err, isAuthenticated) {
if (err) {
throw err;
}
if ( req.profiler ) {
req.profiler.done('authenticate');
}
if (_.isBoolean(isAuthenticated) && isAuthenticated) {
dbopts.authenticated = isAuthenticated;
dbopts.user = _.template(global.settings.db_user, {user_id: dbParams.dbuser});
if ( global.settings.hasOwnProperty('db_user_pass') ) {
dbopts.pass = _.template(global.settings.db_user_pass, {
user_id: dbParams.dbuser,
user_password: dbParams.dbpass
});
} else {
delete dbopts.pass;
}
}
return dbopts;
},
function errorHandle(err, dbopts) {
if (err) {
return callback(err);
}
callback(null, dbopts);
}
);
};
module.exports = DbService;

View File

@ -0,0 +1,48 @@
/**
*
* Requires the database and tables setup in config/environments/test.js to exist
* Ensure the user is present in the pgbouncer auth file too
* TODO: Add OAuth tests.
*
* To run this test, ensure that cartodb_test_user_1_db metadata exists
* in Redis for the vizzuality.cartodb.com domain
*
* SELECT 5
* HSET rails:users:vizzuality id 1
* HSET rails:users:vizzuality database_name cartodb_test_user_1_db
*
*/
require('../helper');
var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert');
describe.only('job.test', function() {
it('GET /api/v2/job', function (done){
assert.response(app, {
url: '/api/v2/job',
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
}, {
status: 400
}, function(res) {
assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8');
assert.deepEqual(res.headers['content-disposition'], 'inline');
assert.deepEqual(JSON.parse(res.body), {"error":["You must indicate a sql query"]});
done();
});
});
it('GET /api/v2/job with SQL parameter on SELECT no database param,just id using headers', function(done){
assert.response(app, {
url: '/api/v2/job?q=SELECT%20*%20FROM%20untitle_table_4',
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
}, {
}, function (res) {
assert.equal(res.statusCode, 200, res.body);
done();
});
});
});

View File

@ -67,6 +67,7 @@ if test x"$PREPARE_PGSQL" = xyes; then
echo "preparing postgres..." echo "preparing postgres..."
dropdb ${TEST_DB} # 2> /dev/null # error expected if doesn't exist, but not otherwise dropdb ${TEST_DB} # 2> /dev/null # error expected if doesn't exist, but not otherwise
createdb -Ttemplate_postgis -EUTF8 ${TEST_DB} || die "Could not create test database" createdb -Ttemplate_postgis -EUTF8 ${TEST_DB} || die "Could not create test database"
psql -c 'CREATE EXTENSION "uuid-ossp";' ${TEST_DB}
cat test.sql | cat test.sql |
sed "s/:PUBLICUSER/${PUBLICUSER}/" | sed "s/:PUBLICUSER/${PUBLICUSER}/" |
sed "s/:PUBLICPASS/${PUBLICPASS}/" | sed "s/:PUBLICPASS/${PUBLICPASS}/" |
@ -108,7 +109,7 @@ HMSET rails:users:vizzuality \
id 1 \ id 1 \
database_name ${TEST_DB} \ database_name ${TEST_DB} \
database_host localhost \ database_host localhost \
map_key 1234 map_key 1234
SADD rails:users:vizzuality:map_key 1235 SADD rails:users:vizzuality:map_key 1235
EOF EOF
@ -137,5 +138,3 @@ fi
echo "ok, you can run test now" echo "ok, you can run test now"

View File

@ -1,8 +1,8 @@
-- --
-- sql-api test database -- sql-api test database
-- --
-- To use: -- To use:
-- --
-- > dropdb -Upostgres -hlocalhost cartodb_test_user_1_db -- > dropdb -Upostgres -hlocalhost cartodb_test_user_1_db
-- > createdb -Upostgres -hlocalhost -Ttemplate_postgis -Opostgres -EUTF8 cartodb_test_user_1_db -- > createdb -Upostgres -hlocalhost -Ttemplate_postgis -Opostgres -EUTF8 cartodb_test_user_1_db
-- > psql -Upostgres -hlocalhost cartodb_test_user_1_db < test.sql -- > psql -Upostgres -hlocalhost cartodb_test_user_1_db < test.sql
@ -20,6 +20,20 @@ SET search_path = public, pg_catalog;
SET default_tablespace = ''; SET default_tablespace = '';
SET default_with_oids = false; SET default_with_oids = false;
-- jobs table
DROP TABLE IF EXISTS cdb_jobs;
CREATE TABLE cdb_jobs (
job_id uuid DEFAULT uuid_generate_v4(),
user_id character varying,
status character varying DEFAULT 'pending',
query character varying,
updated_at timestamp without time zone DEFAULT now(),
created_at timestamp without time zone DEFAULT now()
);
ALTER TABLE ONLY cdb_jobs ADD CONSTRAINT cdb_jobs_pkey PRIMARY KEY (job_id);
CREATE INDEX cdb_jobs_idx ON cdb_jobs (created_at, status);
-- first table -- first table
DROP TABLE IF EXISTS untitle_table_4; DROP TABLE IF EXISTS untitle_table_4;
CREATE TABLE untitle_table_4 ( CREATE TABLE untitle_table_4 (
@ -119,6 +133,8 @@ ALTER ROLE :PUBLICUSER SET statement_timeout = 2000;
DROP USER IF EXISTS :TESTUSER; DROP USER IF EXISTS :TESTUSER;
CREATE USER :TESTUSER WITH PASSWORD ':TESTPASS'; CREATE USER :TESTUSER WITH PASSWORD ':TESTPASS';
GRANT ALL ON TABLE cdb_jobs TO :TESTUSER;
GRANT ALL ON TABLE cdb_jobs TO :PUBLICUSER;
GRANT ALL ON TABLE untitle_table_4 TO :TESTUSER; GRANT ALL ON TABLE untitle_table_4 TO :TESTUSER;
GRANT SELECT ON TABLE untitle_table_4 TO :PUBLICUSER; GRANT SELECT ON TABLE untitle_table_4 TO :PUBLICUSER;
GRANT ALL ON TABLE private_table TO :TESTUSER; GRANT ALL ON TABLE private_table TO :TESTUSER;