diff --git a/app/app.js b/app/app.js index c942a212..b339bade 100644 --- a/app/app.js +++ b/app/app.js @@ -23,6 +23,7 @@ var LRU = require('lru-cache'); var GenericController = require('./controllers/generic_controller'); var QueryController = require('./controllers/query_controller'); +var JobController = require('./controllers/job_controller'); var CacheStatusController = require('./controllers/cache_status_controller'); var HealthCheckController = require('./controllers/health_check_controller'); var VersionController = require('./controllers/version_controller'); @@ -165,6 +166,9 @@ function App() { var queryController = new QueryController(metadataBackend, tableCache, statsd_client); queryController.route(app); + var jobController = new JobController(metadataBackend, tableCache, statsd_client); + jobController.route(app); + var cacheStatusController = new CacheStatusController(tableCache); cacheStatusController.route(app); diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js new file mode 100644 index 00000000..6588b7b2 --- /dev/null +++ b/app/controllers/job_controller.js @@ -0,0 +1,134 @@ +'use strict'; + +var _ = require('underscore'); +var step = require('step'); +var assert = require('assert'); +var PSQL = require('cartodb-psql'); +var uuid = require('uuid'); + +var UserDatabaseService = require('../services/user_database_service'); +var CdbRequest = require('../models/cartodb_request'); +var handleException = require('../utils/error_handler'); + +var cdbReq = new CdbRequest(); +var userDatabaseService = new UserDatabaseService(); + +function JobController(metadataBackend, tableCache, statsd_client) { + this.metadataBackend = metadataBackend; + this.tableCache = tableCache; + this.statsd_client = statsd_client; +} + +JobController.prototype.route = function (app) { + app.all(global.settings.base_url + '/job', this.handleJob.bind(this)); +}; + +// jshint maxcomplexity:21 +JobController.prototype.handleJob = function (req, res) { + var self = this; + var body = (req.body) ? req.body : {}; + var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken + var sql = (params.q === "" || _.isUndefined(params.q)) ? null : params.q; + var cdbUsername = cdbReq.userByReq(req); + + if (!_.isString(sql)) { + return handleException(new Error("You must indicate a sql query"), res); + } + + if ( req.profiler ) { + req.profiler.start('sqlapi.job'); + } + + req.aborted = false; + req.on("close", function() { + if (req.formatter && _.isFunction(req.formatter.cancel)) { + req.formatter.cancel(); + } + req.aborted = true; // TODO: there must be a builtin way to check this + }); + + function checkAborted(step) { + if ( req.aborted ) { + var err = new Error("Request aborted during " + step); + // We'll use status 499, same as ngnix in these cases + // see http://en.wikipedia.org/wiki/List_of_HTTP_status_codes#4xx_Client_Error + err.http_status = 499; + throw err; + } + } + + var pg; + + if ( req.profiler ) { + req.profiler.done('init'); + } + + step( + function getUserDBInfo() { + var options = { + req: req, + params: params, + checkAborted: checkAborted, + metadataBackend: self.metadataBackend, + cdbUsername: cdbUsername + }; + userDatabaseService.getUserDatabase(options, this); + }, + function enqueueJob(err, userDatabase) { + assert.ifError(err); + + var next = this; + + checkAborted('enqueueJob'); + + if ( req.profiler ) { + req.profiler.done('setDBAuth'); + } + + pg = new PSQL(userDatabase, {}, { destroyOnError: true }); + + var enqueueJobQuery = [ + 'INSERT INTO cdb_jobs (', + 'user_id, query', + ') VALUES (', + '\'' + cdbUsername + '\', ', + '\'' + sql + '\' ', + ');' + ].join('\n'); + + pg.query(enqueueJobQuery, function (err, result) { + if (err) { + return next(err); + } + next(null, { + job: result, + host: userDatabase.host + }); + }); + }, + function handleResponse(err, result) { + if ( err ) { + handleException(err, res); + } + + if ( req.profiler ) { + req.profiler.done('enqueueJob'); + res.header('X-SQLAPI-Profiler', req.profiler.toJSONString()); + } + + if (global.settings.api_hostname) { + res.header('X-Served-By-Host', global.settings.api_hostname); + } + + if (result.host) { + res.header('X-Served-By-DB-Host', result.host); + } + + res.send({ + job_id: result.job.job_id + }); + } + ); +}; + +module.exports = JobController; diff --git a/app/controllers/query_controller.js b/app/controllers/query_controller.js index 87cd0270..81421061 100644 --- a/app/controllers/query_controller.js +++ b/app/controllers/query_controller.js @@ -5,7 +5,7 @@ var step = require('step'); var assert = require('assert'); var PSQL = require('cartodb-psql'); -var AuthApi = require('../auth/auth_api'); +var UserDatabaseService = require('../services/user_database_service'); var CdbRequest = require('../models/cartodb_request'); var formats = require('../models/formats'); @@ -19,6 +19,7 @@ var generateCacheKey = require('../utils/cache_key_generator'); var handleException = require('../utils/error_handler'); var cdbReq = new CdbRequest(); +var userDatabaseService = new UserDatabaseService(); function QueryController(metadataBackend, tableCache, statsd_client) { this.metadataBackend = metadataBackend; @@ -33,7 +34,7 @@ QueryController.prototype.route = function (app) { // jshint maxcomplexity:21 QueryController.prototype.handleQuery = function (req, res) { - var _this = this; + var self = this; // extract input var body = (req.body) ? req.body : {}; var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken @@ -115,88 +116,34 @@ QueryController.prototype.handleQuery = function (req, res) { var pg; // Database options - var dbopts = { - port: global.settings.db_port, - pass: global.settings.db_pubuser_pass - }; - - var authenticated = false; - + var dbopts = {}; var formatter; - var authApi = new AuthApi(req, params), - dbParams; - if ( req.profiler ) { req.profiler.done('init'); } - // 1. Get database from redis via the username stored in the host header subdomain - // 2. Run the request through OAuth to get R/W user id if signed + // 1. Get user database and related parameters // 3. Get the list of tables affected by the query // 4. Setup headers // 5. Send formatted results back step( - function getDatabaseConnectionParams() { - checkAborted('getDatabaseConnectionParams'); - // If the request is providing credentials it may require every DB parameters - if (authApi.hasCredentials()) { - _this.metadataBackend.getAllUserDBParams(cdbUsername, this); - } else { - _this.metadataBackend.getUserDBPublicConnectionParams(cdbUsername, this); - } + function getUserDBInfo() { + var options = { + req: req, + params: params, + checkAborted: checkAborted, + metadataBackend: self.metadataBackend, + cdbUsername: cdbUsername + }; + userDatabaseService.getUserDatabase(options, this); }, - function authenticate(err, userDBParams) { - if (err) { - err.http_status = 404; - err.message = "Sorry, we can't find CartoDB user '" + cdbUsername + "'. " + - "Please check that you have entered the correct domain."; - throw err; - } - - if ( req.profiler ) { - req.profiler.done('getDBParams'); - } - - dbParams = userDBParams; - - dbopts.host = dbParams.dbhost; - dbopts.dbname = dbParams.dbname; - dbopts.user = (!!dbParams.dbpublicuser) ? dbParams.dbpublicuser : global.settings.db_pubuser; - - authApi.verifyCredentials({ - metadataBackend: _this.metadataBackend, - apiKey: dbParams.apikey - }, this); - }, - function setDBAuth(err, isAuthenticated) { - if (err) { - throw err; - } - - if ( req.profiler ) { - req.profiler.done('authenticate'); - } - - if (_.isBoolean(isAuthenticated) && isAuthenticated) { - authenticated = isAuthenticated; - dbopts.user = _.template(global.settings.db_user, {user_id: dbParams.dbuser}); - if ( global.settings.hasOwnProperty('db_user_pass') ) { - dbopts.pass = _.template(global.settings.db_user_pass, { - user_id: dbParams.dbuser, - user_password: dbParams.dbpass - }); - } else { - delete dbopts.pass; - } - } - return null; - }, - function queryExplain(err){ - var self = this; - + function queryExplain(err, userDatabase){ assert.ifError(err); + var next = this; + dbopts = userDatabase; + if ( req.profiler ) { req.profiler.done('setDBAuth'); } @@ -205,7 +152,7 @@ QueryController.prototype.handleQuery = function (req, res) { pg = new PSQL(dbopts, {}, { destroyOnError: true }); // get all the tables from Cache or SQL - tableCacheItem = _this.tableCache.get(sql_md5); + tableCacheItem = self.tableCache.get(sql_md5); if (tableCacheItem) { tableCacheItem.hits++; return false; @@ -235,7 +182,7 @@ QueryController.prototype.handleQuery = function (req, res) { console.error("Error on query explain '%s': %s", sql, errorMessage); } - return self(null, { + return next(null, { affectedTables: tableNames, lastUpdatedTime: lastUpdatedTime }); @@ -261,10 +208,10 @@ QueryController.prototype.handleQuery = function (req, res) { // initialise hit counter hits: 1 }; - _this.tableCache.set(sql_md5, tableCacheItem); + self.tableCache.set(sql_md5, tableCacheItem); } - if ( !authenticated && tableCacheItem ) { + if ( !dbopts.authenticated && tableCacheItem ) { var affected_tables = tableCacheItem.affected_tables; for ( var i = 0; i < affected_tables.length; ++i ) { var t = affected_tables[i]; @@ -305,7 +252,7 @@ QueryController.prototype.handleQuery = function (req, res) { // Only set an X-Cache-Channel for responses we want Varnish to cache. if ( tableCacheItem && tableCacheItem.affected_tables.length > 0 && !tableCacheItem.may_write ) { - res.header('X-Cache-Channel', generateCacheKey(dbopts.dbname, tableCacheItem, authenticated)); + res.header('X-Cache-Channel', generateCacheKey(dbopts.dbname, tableCacheItem, dbopts.authenticated)); } var lastModified = (tableCacheItem && tableCacheItem.last_modified) ? @@ -362,11 +309,11 @@ QueryController.prototype.handleQuery = function (req, res) { if ( req.profiler ) { req.profiler.sendStats(); // TODO: do on nextTick ? } - if (_this.statsd_client) { + if (self.statsd_client) { if ( err ) { - _this.statsd_client.increment('sqlapi.query.error'); + self.statsd_client.increment('sqlapi.query.error'); } else { - _this.statsd_client.increment('sqlapi.query.success'); + self.statsd_client.increment('sqlapi.query.success'); } } } @@ -374,8 +321,8 @@ QueryController.prototype.handleQuery = function (req, res) { } catch (err) { handleException(err, res); - if (_this.statsd_client) { - _this.statsd_client.increment('sqlapi.query.error'); + if (self.statsd_client) { + self.statsd_client.increment('sqlapi.query.error'); } } diff --git a/app/services/user_database_service.js b/app/services/user_database_service.js new file mode 100644 index 00000000..dfb8c33d --- /dev/null +++ b/app/services/user_database_service.js @@ -0,0 +1,96 @@ +'use strict'; + +var step = require('step'); +var _ = require('underscore'); +var AuthApi = require('../auth/auth_api'); + +function DbService() { +} + +DbService.prototype.getUserDatabase = function (options, callback) { + var req = options.req; + var params = options.params; + var checkAborted = options.checkAborted; + var metadataBackend = options.metadataBackend; + var cdbUsername = options.cdbUsername; + + var authApi = new AuthApi(req, params); + + var dbParams; + var dbopts = { + port: global.settings.db_port, + pass: global.settings.db_pubuser_pass + }; + + // 1. Get database from redis via the username stored in the host header subdomain + // 2. Run the request through OAuth to get R/W user id if signed + // 3. Set to user authorization params + step( + function getDatabaseConnectionParams() { + checkAborted('getDatabaseConnectionParams'); + // If the request is providing credentials it may require every DB parameters + if (authApi.hasCredentials()) { + metadataBackend.getAllUserDBParams(cdbUsername, this); + } else { + metadataBackend.getUserDBPublicConnectionParams(cdbUsername, this); + } + }, + function authenticate(err, userDBParams) { + if (err) { + err.http_status = 404; + err.message = "Sorry, we can't find CartoDB user '" + cdbUsername + "'. " + + "Please check that you have entered the correct domain."; + return callback(err); + } + + if ( req.profiler ) { + req.profiler.done('getDBParams'); + } + + dbParams = userDBParams; + + dbopts.host = dbParams.dbhost; + dbopts.dbname = dbParams.dbname; + dbopts.user = (!!dbParams.dbpublicuser) ? dbParams.dbpublicuser : global.settings.db_pubuser; + + authApi.verifyCredentials({ + metadataBackend: metadataBackend, + apiKey: dbParams.apikey + }, this); + }, + function setDBAuth(err, isAuthenticated) { + if (err) { + throw err; + } + + if ( req.profiler ) { + req.profiler.done('authenticate'); + } + + if (_.isBoolean(isAuthenticated) && isAuthenticated) { + dbopts.authenticated = isAuthenticated; + dbopts.user = _.template(global.settings.db_user, {user_id: dbParams.dbuser}); + if ( global.settings.hasOwnProperty('db_user_pass') ) { + dbopts.pass = _.template(global.settings.db_user_pass, { + user_id: dbParams.dbuser, + user_password: dbParams.dbpass + }); + } else { + delete dbopts.pass; + } + } + + return dbopts; + }, + function errorHandle(err, dbopts) { + if (err) { + return callback(err); + } + + callback(null, dbopts); + } + ); + +}; + +module.exports = DbService; diff --git a/test/acceptance/job.test.js b/test/acceptance/job.test.js new file mode 100644 index 00000000..4e51f2ed --- /dev/null +++ b/test/acceptance/job.test.js @@ -0,0 +1,48 @@ +/** + * + * Requires the database and tables setup in config/environments/test.js to exist + * Ensure the user is present in the pgbouncer auth file too + * TODO: Add OAuth tests. + * + * To run this test, ensure that cartodb_test_user_1_db metadata exists + * in Redis for the vizzuality.cartodb.com domain + * + * SELECT 5 + * HSET rails:users:vizzuality id 1 + * HSET rails:users:vizzuality database_name cartodb_test_user_1_db + * + */ +require('../helper'); + +var app = require(global.settings.app_root + '/app/app')(); +var assert = require('../support/assert'); + +describe.only('job.test', function() { + + it('GET /api/v2/job', function (done){ + assert.response(app, { + url: '/api/v2/job', + headers: { host: 'vizzuality.cartodb.com' }, + method: 'GET' + }, { + status: 400 + }, function(res) { + assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8'); + assert.deepEqual(res.headers['content-disposition'], 'inline'); + assert.deepEqual(JSON.parse(res.body), {"error":["You must indicate a sql query"]}); + done(); + }); + }); + + it('GET /api/v2/job with SQL parameter on SELECT no database param,just id using headers', function(done){ + assert.response(app, { + url: '/api/v2/job?q=SELECT%20*%20FROM%20untitle_table_4', + headers: { host: 'vizzuality.cartodb.com' }, + method: 'GET' + }, { + }, function (res) { + assert.equal(res.statusCode, 200, res.body); + done(); + }); + }); +}); diff --git a/test/prepare_db.sh b/test/prepare_db.sh index bb233fb4..ed69b309 100755 --- a/test/prepare_db.sh +++ b/test/prepare_db.sh @@ -67,6 +67,7 @@ if test x"$PREPARE_PGSQL" = xyes; then echo "preparing postgres..." dropdb ${TEST_DB} # 2> /dev/null # error expected if doesn't exist, but not otherwise createdb -Ttemplate_postgis -EUTF8 ${TEST_DB} || die "Could not create test database" + psql -c 'CREATE EXTENSION "uuid-ossp";' ${TEST_DB} cat test.sql | sed "s/:PUBLICUSER/${PUBLICUSER}/" | sed "s/:PUBLICPASS/${PUBLICPASS}/" | @@ -108,7 +109,7 @@ HMSET rails:users:vizzuality \ id 1 \ database_name ${TEST_DB} \ database_host localhost \ - map_key 1234 + map_key 1234 SADD rails:users:vizzuality:map_key 1235 EOF @@ -137,5 +138,3 @@ fi echo "ok, you can run test now" - - diff --git a/test/test.sql b/test/test.sql index 7dc09320..0adfbc13 100644 --- a/test/test.sql +++ b/test/test.sql @@ -1,8 +1,8 @@ -- -- sql-api test database --- +-- -- To use: --- +-- -- > dropdb -Upostgres -hlocalhost cartodb_test_user_1_db -- > createdb -Upostgres -hlocalhost -Ttemplate_postgis -Opostgres -EUTF8 cartodb_test_user_1_db -- > psql -Upostgres -hlocalhost cartodb_test_user_1_db < test.sql @@ -20,6 +20,20 @@ SET search_path = public, pg_catalog; SET default_tablespace = ''; SET default_with_oids = false; +-- jobs table +DROP TABLE IF EXISTS cdb_jobs; +CREATE TABLE cdb_jobs ( + job_id uuid DEFAULT uuid_generate_v4(), + user_id character varying, + status character varying DEFAULT 'pending', + query character varying, + updated_at timestamp without time zone DEFAULT now(), + created_at timestamp without time zone DEFAULT now() +); + +ALTER TABLE ONLY cdb_jobs ADD CONSTRAINT cdb_jobs_pkey PRIMARY KEY (job_id); +CREATE INDEX cdb_jobs_idx ON cdb_jobs (created_at, status); + -- first table DROP TABLE IF EXISTS untitle_table_4; CREATE TABLE untitle_table_4 ( @@ -119,6 +133,8 @@ ALTER ROLE :PUBLICUSER SET statement_timeout = 2000; DROP USER IF EXISTS :TESTUSER; CREATE USER :TESTUSER WITH PASSWORD ':TESTPASS'; +GRANT ALL ON TABLE cdb_jobs TO :TESTUSER; +GRANT ALL ON TABLE cdb_jobs TO :PUBLICUSER; GRANT ALL ON TABLE untitle_table_4 TO :TESTUSER; GRANT SELECT ON TABLE untitle_table_4 TO :PUBLICUSER; GRANT ALL ON TABLE private_table TO :TESTUSER;