Merge pull request #619 from CartoDB/routes-hierarchy

Implement router hierarchy to be able to attach middlewares to different routers
This commit is contained in:
Daniel G. Aubert 2019-10-07 17:53:36 +02:00 committed by GitHub
commit df6d6b01e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
207 changed files with 1241 additions and 669 deletions

View File

@ -11,7 +11,7 @@ check:
jshint:
@echo "***jshint***"
@./node_modules/.bin/jshint app/ batch/ test/ app.js
@./node_modules/.bin/jshint lib/ test/ app.js
TEST_SUITE := $(shell find test/{unit,integration,acceptance} -name "*.js")
TEST_SUITE_UNIT := $(shell find test/unit -name "*.js")

61
app.js
View File

@ -2,24 +2,15 @@
'use strict';
/*
* SQL API loader
* ===============
*
* node app [environment]
*
* environments: [development, test, production]
*
*/
var fs = require('fs');
var path = require('path');
const fs = require('fs');
const path = require('path');
const fqdn = require('@carto/fqdn-sync');
var argv = require('yargs')
.usage('Usage: $0 <environment> [options]')
const argv = require('yargs')
.usage('Usage: node $0 <environment> [options]')
.help('h')
.example(
'$0 production -c /etc/sql-api/config.js',
'node $0 production -c /etc/sql-api/config.js',
'start server in production environment with /etc/sql-api/config.js as config file'
)
.alias('h', 'help')
@ -28,18 +19,20 @@ var argv = require('yargs')
.describe('c', 'Load configuration from path')
.argv;
var environmentArg = argv._[0] || process.env.NODE_ENV || 'development';
var configurationFile = path.resolve(argv.config || './config/environments/' + environmentArg + '.js');
const environmentArg = argv._[0] || process.env.NODE_ENV || 'development';
const configurationFile = path.resolve(argv.config || './config/environments/' + environmentArg + '.js');
if (!fs.existsSync(configurationFile)) {
console.error('Configuration file "%s" does not exist', configurationFile);
process.exit(1);
}
global.settings = require(configurationFile);
var ENVIRONMENT = argv._[0] || process.env.NODE_ENV || global.settings.environment;
const ENVIRONMENT = argv._[0] || process.env.NODE_ENV || global.settings.environment;
process.env.NODE_ENV = ENVIRONMENT;
var availableEnvironments = ['development', 'production', 'test', 'staging'];
const availableEnvironments = ['development', 'production', 'test', 'staging'];
// sanity check arguments
if (availableEnvironments.indexOf(ENVIRONMENT) === -1) {
@ -51,14 +44,14 @@ if (availableEnvironments.indexOf(ENVIRONMENT) === -1) {
global.settings.api_hostname = fqdn.hostname();
global.log4js = require('log4js');
var log4jsConfig = {
const log4jsConfig = {
appenders: [],
replaceConsole: true
};
if ( global.settings.log_filename ) {
var logFilename = path.resolve(global.settings.log_filename);
var logDirectory = path.dirname(logFilename);
if (global.settings.log_filename) {
const logFilename = path.resolve(global.settings.log_filename);
const logDirectory = path.dirname(logFilename);
if (!fs.existsSync(logDirectory)) {
console.error("Log filename directory does not exist: " + logDirectory);
process.exit(1);
@ -72,28 +65,26 @@ if ( global.settings.log_filename ) {
{ type: "console", layout: { type:'basic' } }
);
}
global.log4js.configure(log4jsConfig);
global.logger = global.log4js.getLogger();
const version = require("./package").version;
// kick off controller
if ( ! global.settings.base_url ) {
global.settings.base_url = '/api/*';
}
const StatsClient = require('./lib/stats/client');
var version = require("./package").version;
var StatsClient = require('./app/stats/client');
if (global.settings.statsd) {
// Perform keyword substitution in statsd
if (global.settings.statsd.prefix) {
global.settings.statsd.prefix = global.settings.statsd.prefix.replace(/:host/, fqdn.reverse());
}
}
var statsClient = StatsClient.getInstance(global.settings.statsd);
const statsClient = StatsClient.getInstance(global.settings.statsd);
var server = require('./app/server')(statsClient);
var listener = server.listen(global.settings.node_port, global.settings.node_host);
const createServer = require('./lib/server');
const server = createServer(statsClient);
const listener = server.listen(global.settings.node_port, global.settings.node_host);
listener.on('listening', function() {
console.info("Using Node.js %s", process.version);
console.info('Using configuration file "%s"', configurationFile);
@ -181,9 +172,9 @@ function scheduleForcedExit ({ killTimeout }) {
}
function isGteMinVersion(version, minVersion) {
var versionMatch = /[a-z]?([0-9]*)/.exec(version);
const versionMatch = /[a-z]?([0-9]*)/.exec(version);
if (versionMatch) {
var majorVersion = parseInt(versionMatch[1], 10);
const majorVersion = parseInt(versionMatch[1], 10);
if (Number.isFinite(majorVersion)) {
return majorVersion >= minVersion;
}
@ -234,7 +225,7 @@ setInterval(function cpuUsageMetrics () {
}, 5000);
if (global.gc && isGteMinVersion(process.version, 6)) {
var gcInterval = Number.isFinite(global.settings.gc_interval) ?
const gcInterval = Number.isFinite(global.settings.gc_interval) ?
global.settings.gc_interval :
10000;

View File

@ -1,14 +0,0 @@
'use strict';
function GenericController() {
}
GenericController.prototype.route = function (app) {
app.options('*', this.handleRequest.bind(this));
};
GenericController.prototype.handleRequest = function(req, res) {
res.end();
};
module.exports = GenericController;

View File

@ -1,35 +0,0 @@
'use strict';
var HealthCheck = require('../monitoring/health_check');
function HealthCheckController() {
this.healthCheck = new HealthCheck(global.settings.disabled_file);
}
HealthCheckController.prototype.route = function (app) {
app.get(global.settings.base_url + '/health', this.handleHealthCheck.bind(this));
};
HealthCheckController.prototype.handleHealthCheck = function (req, res) {
var healthConfig = global.settings.health || {};
if (!!healthConfig.enabled) {
var startTime = Date.now();
this.healthCheck.check(function(err) {
var ok = !err;
var response = {
enabled: true,
ok: ok,
elapsed: Date.now() - startTime
};
if (err) {
response.err = err.message;
}
res.status(ok ? 200 : 503).send(response);
});
} else {
res.status(200).send({enabled: false, ok: true});
}
};
module.exports = HealthCheckController;

View File

@ -1,18 +0,0 @@
'use strict';
var version = {
cartodb_sql_api: require(__dirname + '/../../package.json').version
};
function VersionController() {
}
VersionController.prototype.route = function (app) {
app.get(global.settings.base_url + '/version', this.handleVersion.bind(this));
};
VersionController.prototype.handleVersion = function (req, res) {
res.send(version);
};
module.exports = VersionController;

View File

@ -1,207 +0,0 @@
'use strict';
// CartoDB SQL API
//
// all requests expect the following URL args:
// - `sql` {String} SQL to execute
//
// for private (read/write) queries:
// - OAuth. Must have proper OAuth 1.1 headers. For OAuth 1.1 spec see Google
//
// eg. /api/v1/?sql=SELECT 1 as one (with a load of OAuth headers or URL arguments)
//
// for public (read only) queries:
// - sql only, provided the subdomain exists in CartoDB and the table's sharing options are public
//
// eg. vizzuality.cartodb.com/api/v1/?sql=SELECT * from my_table
//
var express = require('express');
var Profiler = require('./stats/profiler-proxy');
var _ = require('underscore');
var fs = require('fs');
var mkdirp = require('mkdirp');
var RedisPool = require('redis-mpool');
var cartodbRedis = require('cartodb-redis');
var UserDatabaseService = require('./services/user_database_service');
var UserLimitsService = require('./services/user_limits');
var BatchLogger = require('../batch/batch-logger');
var JobPublisher = require('../batch/pubsub/job-publisher');
var JobQueue = require('../batch/job_queue');
var JobBackend = require('../batch/job_backend');
var JobCanceller = require('../batch/job_canceller');
var JobService = require('../batch/job_service');
const Logger = require('./services/logger');
var cors = require('./middlewares/cors');
var GenericController = require('./controllers/generic_controller');
var QueryController = require('./controllers/query_controller');
var CopyController = require('./controllers/copy_controller');
var JobController = require('./controllers/job_controller');
var HealthCheckController = require('./controllers/health_check_controller');
var VersionController = require('./controllers/version_controller');
var batchFactory = require('../batch');
process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi';
// override Date.toJSON
require('./utils/date_to_json');
// jshint maxcomplexity:9
function App(statsClient) {
var app = express();
var redisPool = new RedisPool({
name: 'sql-api',
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
var metadataBackend = cartodbRedis({ pool: redisPool });
// Set default configuration
global.settings.db_pubuser = global.settings.db_pubuser || "publicuser";
global.settings.bufferedRows = global.settings.bufferedRows || 1000;
global.settings.ratelimits = Object.assign(
{
rateLimitsEnabled: false,
endpoints: {
query: false,
job_create: false,
job_get: false,
job_delete: false,
copy_from: false,
copy_to: false
}
},
global.settings.ratelimits
);
global.settings.tmpDir = global.settings.tmpDir || '/tmp';
if (!fs.existsSync(global.settings.tmpDir)) {
mkdirp.sync(global.settings.tmpDir);
}
if ( global.log4js ) {
var loggerOpts = {
buffer: true,
// log4js provides a tokens solution as expess but in does not provide the request/response in the callback.
// Thus it is not possible to extract relevant information from them.
// This is a workaround to be able to access request/response.
format: function(req, res, format) {
var logFormat = global.settings.log_format ||
':remote-addr :method :req[Host]:url :status :response-time ms -> :res[Content-Type]';
return format(logFormat);
}
};
app.use(global.log4js.connectLogger(global.log4js.getLogger(), _.defaults(loggerOpts, {level:'info'})));
}
app.use(cors());
// Use step-profiler
app.use(function bootstrap$prepareRequestResponse(req, res, next) {
res.locals = res.locals || {};
if (global.settings.api_hostname) {
res.header('X-Served-By-Host', global.settings.api_hostname);
}
var profile = global.settings.useProfiler;
req.profiler = new Profiler({
profile: profile,
statsd_client: statsClient
});
next();
});
// Set connection timeout
if ( global.settings.hasOwnProperty('node_socket_timeout') ) {
var timeout = parseInt(global.settings.node_socket_timeout);
app.use(function(req, res, next) {
req.connection.setTimeout(timeout);
next();
});
}
app.enable('jsonp callback');
app.set("trust proxy", true);
app.disable('x-powered-by');
app.disable('etag');
// basic routing
var userDatabaseService = new UserDatabaseService(metadataBackend);
const userLimitsServiceOptions = {
limits: {
rateLimitsEnabled: global.settings.ratelimits.rateLimitsEnabled
}
};
const userLimitsService = new UserLimitsService(metadataBackend, userLimitsServiceOptions);
const dataIngestionLogger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion');
app.dataIngestionLogger = dataIngestionLogger;
var logger = new BatchLogger(global.settings.batch_log_filename, 'batch-queries');
var jobPublisher = new JobPublisher(redisPool);
var jobQueue = new JobQueue(metadataBackend, jobPublisher, logger);
var jobBackend = new JobBackend(metadataBackend, jobQueue, logger);
var jobCanceller = new JobCanceller();
var jobService = new JobService(jobBackend, jobCanceller, logger);
var genericController = new GenericController();
genericController.route(app);
var queryController = new QueryController(
metadataBackend,
userDatabaseService,
statsClient,
userLimitsService
);
queryController.route(app);
var copyController = new CopyController(
metadataBackend,
userDatabaseService,
userLimitsService,
dataIngestionLogger
);
copyController.route(app);
var jobController = new JobController(
metadataBackend,
userDatabaseService,
jobService,
statsClient,
userLimitsService
);
jobController.route(app);
var healthCheckController = new HealthCheckController();
healthCheckController.route(app);
var versionController = new VersionController();
versionController.route(app);
var isBatchProcess = process.argv.indexOf('--no-batch') === -1;
if (global.settings.environment !== 'test' && isBatchProcess) {
var batchName = global.settings.api_hostname || 'batch';
app.batch = batchFactory(
metadataBackend, redisPool, batchName, statsClient, global.settings.batch_log_filename
);
app.batch.start();
}
return app;
}
module.exports = App;

View File

@ -1,9 +1,36 @@
// Time in milliseconds to force GC cycle.
// Disable by using <=0 value.
module.exports.gc_interval = 10000;
// In case the base_url has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
module.exports.base_url = '(?:/api/:version|/user/:user/api/:version)';
module.exports.routes = {
// Each entry corresponds with an express' router.
// You must define at least one path. However, middlewares are optional.
api: [{
// Required: path where other "routers" or "controllers" will be attached to.
paths: [
// In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
'/api/:version',
'/user/:user/api/:version',
],
// Optional: attach middlewares at the begining of the router
// to perform custom operations.
middlewares: [
function noop () {
return function noopMiddleware (req, res, next) {
next();
}
}
],
sql: [{
// Required
paths: [
'/sql'
],
// Optional
middlewares: []
}]
}]
};
// If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various
// steps taken for producing the response.

View File

@ -1,9 +1,36 @@
// Time in milliseconds to force GC cycle.
// Disable by using <=0 value.
module.exports.gc_interval = 10000;
// In case the base_url has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
module.exports.base_url = '(?:/api/:version|/user/:user/api/:version)';
module.exports.routes = {
// Each entry corresponds with an express' router.
// You must define at least one path. However, middlewares are optional.
api: [{
// Required: path where other "routers" or "controllers" will be attached to.
paths: [
// In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
'/api/:version',
'/user/:user/api/:version',
],
// Optional: attach middlewares at the begining of the router
// to perform custom operations.
middlewares: [
function noop () {
return function noopMiddleware (req, res, next) {
next();
}
}
],
sql: [{
// Required
paths: [
'/sql'
],
// Optional
middlewares: []
}]
}]
};
// If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various
// steps taken for producing the response.

View File

@ -1,9 +1,36 @@
// Time in milliseconds to force GC cycle.
// Disable by using <=0 value.
module.exports.gc_interval = 10000;
// In case the base_url has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
module.exports.base_url = '(?:/api/:version|/user/:user/api/:version)';
module.exports.routes = {
// Each entry corresponds with an express' router.
// You must define at least one path. However, middlewares are optional.
api: [{
// Required: path where other "routers" or "controllers" will be attached to.
paths: [
// In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
'/api/:version',
'/user/:user/api/:version',
],
// Optional: attach middlewares at the begining of the router
// to perform custom operations.
middlewares: [
function noop () {
return function noopMiddleware (req, res, next) {
next();
}
}
],
sql: [{
// Required
paths: [
'/sql'
],
// Optional
middlewares: []
}]
}]
};
// If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various
// steps taken for producing the response.

View File

@ -1,9 +1,36 @@
// Time in milliseconds to force GC cycle.
// Disable by using <=0 value.
module.exports.gc_interval = 10000;
// In case the base_url has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
module.exports.base_url = '(?:/api/:version|/user/:user/api/:version)';
module.exports.routes = {
// Each entry corresponds with an express' router.
// You must define at least one path. However, middlewares are optional.
api: [{
// Required: path where other "routers" or "controllers" will be attached to.
paths: [
// In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header.
'/api/:version',
'/user/:user/api/:version',
],
// Optional: attach middlewares at the begining of the router
// to perform custom operations.
middlewares: [
function noop () {
return function noopMiddleware (req, res, next) {
next();
}
}
],
sql: [{
// Required
paths: [
'/sql'
],
// Optional
middlewares: []
}]
}]
};
// If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various
// steps taken for producing the response.

64
lib/api/api-router.js Normal file
View File

@ -0,0 +1,64 @@
'use strict';
const { Router: router } = require('express');
const SqlRouter = require('./sql/sql-router');
const HealthCheckController = require('./health-check-controller');
const VersionController = require('./version-controller');
const JobsWipController = require('./jobs-wip-controller');
const BatchLogger = require('../batch/batch-logger');
const JobPublisher = require('../batch/pubsub/job-publisher');
const JobQueue = require('../batch/job-queue');
const JobBackend = require('../batch/job-backend');
const JobCanceller = require('../batch/job-canceller');
const JobService = require('../batch/job-service');
module.exports = class ApiRouter {
constructor ({ redisPool, metadataBackend, statsClient, dataIngestionLogger }) {
const logger = new BatchLogger(global.settings.batch_log_filename, 'batch-queries');
const jobPublisher = new JobPublisher(redisPool);
const jobQueue = new JobQueue(metadataBackend, jobPublisher, logger);
const jobBackend = new JobBackend(metadataBackend, jobQueue, logger);
const jobCanceller = new JobCanceller();
const jobService = new JobService(jobBackend, jobCanceller, logger);
this.healthCheckController = new HealthCheckController();
this.versionController = new VersionController();
this.jobsWipController = new JobsWipController({ jobService });
this.sqlRouter = new SqlRouter({
metadataBackend,
statsClient,
dataIngestionLogger,
jobService
});
}
route (app, routes) {
routes.forEach(route => {
const apiRouter = router({ mergeParams: true });
const paths = route.paths;
const middlewares = route.middlewares || [];
middlewares.forEach(middleware => apiRouter.use(middleware()));
// FIXME: version controller should be atached to the main entry point: "/"
// instead of "/api/:version" or "/user/:user/api/:version"
this.healthCheckController.route(apiRouter);
// FIXME: version controller should be atached to the main entry point: "/"
// instead of "/api/:version" or "/user/:user/api/:version"
this.versionController.route(apiRouter);
this.jobsWipController.route(apiRouter);
this.sqlRouter.route(apiRouter, route.sql);
paths.forEach(path => app.use(path, apiRouter));
});
}
};

View File

@ -0,0 +1,40 @@
'use strict';
const HealthCheckBackend = require('../monitoring/health-check');
module.exports = class HealthCheckController {
constructor () {
this.healthCheckBackend = new HealthCheckBackend(global.settings.disabled_file);
}
route (apiRouter) {
apiRouter.get('/health', healthCheck({ healthCheckBackend: this.healthCheckBackend }));
}
};
function healthCheck ({ healthCheckBackend }) {
return function healthCheckMiddleware (req, res) {
const healthConfig = global.settings.health || {};
if (!healthConfig.enabled) {
return res.status(200).send({enabled: false, ok: true});
}
const startTime = Date.now();
healthCheckBackend.check((err) => {
const ok = !err;
const response = {
enabled: true,
ok,
elapsed: Date.now() - startTime
};
if (err) {
response.err = err.message;
}
res.status(ok ? 200 : 503).send(response);
});
};
}

View File

@ -0,0 +1,39 @@
'use strict';
const bodyParser = require('./middlewares/body-parser');
const error = require('./middlewares/error');
module.exports = class JobsWipController {
constructor ({ jobService }) {
this.jobService = jobService;
}
route (apiRouter) {
apiRouter.get('/jobs-wip', [
bodyParser(),
listWorkInProgressJobs(this.jobService),
sendResponse(),
error()
]);
}
};
function listWorkInProgressJobs (jobService) {
return function listWorkInProgressJobsMiddleware (req, res, next) {
jobService.listWorkInProgressJobs((err, list) => {
if (err) {
return next(err);
}
res.body = list;
next();
});
};
}
function sendResponse () {
return function sendResponseMiddleware (req, res) {
res.status(res.statusCode || 200).send(res.body);
};
}

View File

@ -1,6 +1,6 @@
'use strict';
const pgEntitiesAccessValidator = require('../services/pg-entities-access-validator');
const pgEntitiesAccessValidator = require('../../services/pg-entities-access-validator');
module.exports = function accessValidator () {
return function accessValidatorMiddleware (req, res, next) {

View File

@ -1,6 +1,6 @@
'use strict';
const AuthApi = require('../auth/auth_api');
const AuthApi = require('../../auth/auth-api');
const basicAuth = require('basic-auth');
module.exports = function authorization (metadataBackend, forceToBeMaster = false) {

View File

@ -1,6 +1,6 @@
'use strict';
const getContentDisposition = require('../utils/content_disposition');
const getContentDisposition = require('../../utils/content-disposition');
module.exports = function content () {
return function contentMiddleware (req, res, next) {

View File

@ -21,6 +21,10 @@ module.exports = function cors(extraHeaders = []) {
res.header('Access-Control-Allow-Headers', headers.join(', '));
res.header('Access-Control-Expose-Headers', exposedHeaders.join(', '));
if (req.method === 'OPTIONS') {
return res.send();
}
next();
};
};

View File

@ -1,7 +1,7 @@
'use strict';
const errorHandlerFactory = require('../services/error_handler_factory');
const { stringifyForLogs } = require('../utils/logs');
const errorHandlerFactory = require('../../services/error-handler-factory');
const { stringifyForLogs } = require('../../utils/logs');
const MAX_ERROR_STRING_LENGTH = 1024;
module.exports = function error() {

View File

@ -1,6 +1,6 @@
'use strict';
const formats = require('../models/formats');
const formats = require('../../models/formats');
module.exports = function formatter () {
return function formatterMiddleware (req, res, next) {

View File

@ -1,6 +1,6 @@
'use strict';
const { stringifyForLogs } = require('../utils/logs');
const { stringifyForLogs } = require('../../utils/logs');
const MAX_SQL_LENGTH = (global.settings.logQueries && global.settings.maxQueriesLogLength) || 1024;

View File

@ -0,0 +1,27 @@
'use strict';
module.exports = function logger () {
if (!global.log4js) {
return function dummyLoggerMiddleware (req, res, next) {
next();
};
}
const options = {
level: 'info',
buffer: true,
// log4js provides a tokens solution as expess but in does not provide the request/response in the callback.
// Thus it is not possible to extract relevant information from them.
// This is a workaround to be able to access request/response.
format: function (req, res, format) {
const defaultFormat = ':remote-addr :method :req[Host]:url :status :response-time ms -> :res[Content-Type]';
const logFormat = global.settings.log_format || defaultFormat;
return format(logFormat);
}
};
const logger = global.log4js.getLogger();
return global.log4js.connectLogger(logger, options);
};

View File

@ -1,7 +1,7 @@
'use strict';
const sanitizeFilename = require('../utils/filename_sanitizer');
const formats = require('../models/formats');
const sanitizeFilename = require('../../utils/filename-sanitizer');
const formats = require('../../models/formats');
module.exports = function params ({ strategy = 'query' } = {}) {
const getParams = getParamsFromStrategy(strategy);

View File

@ -1,6 +1,19 @@
'use strict';
module.exports.initializeProfilerMiddleware = function initializeProfiler (label) {
const Profiler = require('../../stats/profiler-proxy');
module.exports = function profiler ({ statsClient }) {
return function profilerMiddleware (req, res, next) {
req.profiler = new Profiler({
profile: global.settings.useProfiler,
statsd_client: statsClient
});
next();
};
};
module.exports.initializeProfiler = function initializeProfiler (label) {
return function initializeProfilerMiddleware (req, res, next) {
if (req.profiler) {
req.profiler.start(`sqlapi.${label}`);
@ -10,7 +23,7 @@ module.exports.initializeProfilerMiddleware = function initializeProfiler (label
};
};
module.exports.finishProfilerMiddleware = function finishProfiler () {
module.exports.finishProfiler = function finishProfiler () {
return function finishProfilerMiddleware (req, res, next) {
if (req.profiler) {
req.profiler.end();

View File

@ -1,6 +1,6 @@
'use strict';
const queryMayWrite = require('../utils/query_may_write');
const queryMayWrite = require('../../utils/query-may-write');
module.exports = function mayWrite () {
return function mayWriteMiddleware (req, res, next) {

View File

@ -0,0 +1,13 @@
'use strict';
const os = require('os');
module.exports = function servedByHostHeader () {
const hostname = global.settings.api_hostname || os.hostname().split('.')[0];
return function servedByHostHeaderMiddleware (req, res, next) {
res.set('X-Served-By-Host', hostname);
next();
};
};

View File

@ -0,0 +1,18 @@
'use strict';
module.exports = function socketTimeout () {
if (!global.settings.hasOwnProperty('node_socket_timeout')) {
return function dummySocketTimeoutMiddleware (req, res, next) {
next();
};
}
const timeout = parseInt(global.settings.node_socket_timeout);
return function socketTimeoutMiddleware (req, res, next) {
// Set connection timeout
req.connection.setTimeout(timeout);
next();
};
};

View File

@ -1,8 +1,8 @@
'use strict';
const CdbRequest = require('../models/cartodb_request');
const CdbRequest = require('../../models/cartodb-request');
module.exports = function user(metadataBackend) {
module.exports = function user (metadataBackend) {
const cdbRequest = new CdbRequest();
return function userMiddleware (req, res, next) {
@ -23,16 +23,16 @@ module.exports = function user(metadataBackend) {
};
};
function getUserNameFromRequest(req, cdbRequest) {
function getUserNameFromRequest (req, cdbRequest) {
return cdbRequest.userByReq(req);
}
function checkUserExists(metadataBackend, userName, callback) {
function checkUserExists (metadataBackend, userName, callback) {
metadataBackend.getUserId(userName, function(err) {
callback(err, !err);
});
}
function errorUserNotFoundMessageTemplate(user) {
function errorUserNotFoundMessageTemplate (user) {
return `Sorry, we can't find CARTO user '${user}'. Please check that you have entered the correct domain.`;
}

View File

@ -1,65 +1,65 @@
'use strict';
const userMiddleware = require('../middlewares/user');
const errorMiddleware = require('../middlewares/error');
const authorizationMiddleware = require('../middlewares/authorization');
const connectionParamsMiddleware = require('../middlewares/connection-params');
const { initializeProfilerMiddleware } = require('../middlewares/profiler');
const rateLimitsMiddleware = require('../middlewares/rate-limit');
const dbQuotaMiddleware = require('../middlewares/db-quota');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
const errorHandlerFactory = require('../services/error_handler_factory');
const StreamCopy = require('../services/stream_copy');
const StreamCopyMetrics = require('../services/stream_copy_metrics');
const Throttler = require('../services/throttler-stream');
const user = require('../middlewares/user');
const error = require('../middlewares/error');
const authorization = require('../middlewares/authorization');
const connectionParams = require('../middlewares/connection-params');
const { initializeProfiler } = require('../middlewares/profiler');
const dbQuota = require('../middlewares/db-quota');
const bodyParser = require('../middlewares/body-parser');
const rateLimits = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimits;
const errorHandlerFactory = require('../../services/error-handler-factory');
const StreamCopy = require('../../services/stream-copy');
const StreamCopyMetrics = require('../../services/stream-copy-metrics');
const Throttler = require('../../services/throttler-stream');
const zlib = require('zlib');
const { PassThrough } = require('stream');
const params = require('../middlewares/params');
const bodyParserMiddleware = require('../middlewares/body-parser');
function CopyController(metadataBackend, userDatabaseService, userLimitsService, logger) {
this.metadataBackend = metadataBackend;
this.userDatabaseService = userDatabaseService;
this.userLimitsService = userLimitsService;
this.logger = logger;
}
module.exports = class CopyController {
constructor (metadataBackend, userDatabaseService, userLimitsService, logger) {
this.metadataBackend = metadataBackend;
this.userDatabaseService = userDatabaseService;
this.userLimitsService = userLimitsService;
this.logger = logger;
}
CopyController.prototype.route = function (app) {
const { base_url } = global.settings;
route (sqlRouter) {
const copyFromMiddlewares = endpointGroup => {
return [
initializeProfiler('copyfrom'),
user(this.metadataBackend),
rateLimits(this.userLimitsService, endpointGroup),
authorization(this.metadataBackend),
connectionParams(this.userDatabaseService),
dbQuota(),
params({ strategy: 'copyfrom' }),
handleCopyFrom(this.logger),
errorHandler(this.logger),
error()
];
};
const copyFromMiddlewares = endpointGroup => {
return [
initializeProfilerMiddleware('copyfrom'),
userMiddleware(this.metadataBackend),
rateLimitsMiddleware(this.userLimitsService, endpointGroup),
authorizationMiddleware(this.metadataBackend),
connectionParamsMiddleware(this.userDatabaseService),
dbQuotaMiddleware(),
params({ strategy: 'copyfrom' }),
handleCopyFrom(this.logger),
errorHandler(this.logger),
errorMiddleware()
];
};
const copyToMiddlewares = endpointGroup => {
return [
bodyParser(),
initializeProfiler('copyto'),
user(this.metadataBackend),
rateLimits(this.userLimitsService, endpointGroup),
authorization(this.metadataBackend),
connectionParams(this.userDatabaseService),
params({ strategy: 'copyto' }),
handleCopyTo(this.logger),
errorHandler(this.logger),
error()
];
};
const copyToMiddlewares = endpointGroup => {
return [
bodyParserMiddleware(),
initializeProfilerMiddleware('copyto'),
userMiddleware(this.metadataBackend),
rateLimitsMiddleware(this.userLimitsService, endpointGroup),
authorizationMiddleware(this.metadataBackend),
connectionParamsMiddleware(this.userDatabaseService),
params({ strategy: 'copyto' }),
handleCopyTo(this.logger),
errorHandler(this.logger),
errorMiddleware()
];
};
app.post(`${base_url}/sql/copyfrom`, copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_FROM));
app.get(`${base_url}/sql/copyto`, copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
app.post(`${base_url}/sql/copyto`, copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
sqlRouter.post('/copyfrom', copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_FROM));
sqlRouter.get('/copyto', copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
sqlRouter.post('/copyto', copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
}
};
function handleCopyTo (logger) {
@ -180,5 +180,3 @@ function errorHandler (logger) {
}
};
}
module.exports = CopyController;

View File

@ -2,82 +2,73 @@
const util = require('util');
const bodyParserMiddleware = require('../middlewares/body-parser');
const userMiddleware = require('../middlewares/user');
const { initializeProfilerMiddleware, finishProfilerMiddleware } = require('../middlewares/profiler');
const authorizationMiddleware = require('../middlewares/authorization');
const connectionParamsMiddleware = require('../middlewares/connection-params');
const errorMiddleware = require('../middlewares/error');
const rateLimitsMiddleware = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
const bodyParser = require('../middlewares/body-parser');
const user = require('../middlewares/user');
const { initializeProfiler, finishProfiler } = require('../middlewares/profiler');
const authorization = require('../middlewares/authorization');
const connectionParams = require('../middlewares/connection-params');
const error = require('../middlewares/error');
const rateLimits = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimits;
const params = require('../middlewares/params');
const logMiddleware = require('../middlewares/log');
const log = require('../middlewares/log');
function JobController(metadataBackend, userDatabaseService, jobService, statsdClient, userLimitsService) {
this.metadataBackend = metadataBackend;
this.userDatabaseService = userDatabaseService;
this.jobService = jobService;
this.statsdClient = statsdClient;
this.userLimitsService = userLimitsService;
}
module.exports = class JobController {
constructor (metadataBackend, userDatabaseService, jobService, statsdClient, userLimitsService) {
this.metadataBackend = metadataBackend;
this.userDatabaseService = userDatabaseService;
this.jobService = jobService;
this.statsdClient = statsdClient;
this.userLimitsService = userLimitsService;
}
module.exports = JobController;
route (sqlRouter) {
const jobMiddlewares = composeJobMiddlewares(
this.metadataBackend,
this.userDatabaseService,
this.jobService,
this.statsdClient,
this.userLimitsService
);
JobController.prototype.route = function (app) {
const { base_url } = global.settings;
const jobMiddlewares = composeJobMiddlewares(
this.metadataBackend,
this.userDatabaseService,
this.jobService,
this.statsdClient,
this.userLimitsService
);
sqlRouter.post('/job', [
bodyParser(),
checkBodyPayloadSize(),
params({ strategy: 'job' }),
log(log.TYPES.JOB),
jobMiddlewares('create', createJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_CREATE)
]);
app.get(
`${base_url}/jobs-wip`,
bodyParserMiddleware(),
listWorkInProgressJobs(this.jobService),
sendResponse(),
errorMiddleware()
);
app.post(
`${base_url}/sql/job`,
bodyParserMiddleware(),
checkBodyPayloadSize(),
params({ strategy: 'job' }),
logMiddleware(logMiddleware.TYPES.JOB),
jobMiddlewares('create', createJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_CREATE)
);
app.get(
`${base_url}/sql/job/:job_id`,
bodyParserMiddleware(),
jobMiddlewares('retrieve', getJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_GET)
);
app.delete(
`${base_url}/sql/job/:job_id`,
bodyParserMiddleware(),
jobMiddlewares('cancel', cancelJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_DELETE)
);
sqlRouter.get('/job/:job_id', [
bodyParser(),
jobMiddlewares('retrieve', getJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_GET)
]);
sqlRouter.delete('/job/:job_id', [
bodyParser(),
jobMiddlewares('cancel', cancelJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_DELETE)
]);
}
};
function composeJobMiddlewares (metadataBackend, userDatabaseService, jobService, statsdClient, userLimitsService) {
return function jobMiddlewares (action, jobMiddleware, endpointGroup) {
return function jobMiddlewares (action, job, endpointGroup) {
const forceToBeMaster = true;
return [
initializeProfilerMiddleware('job'),
userMiddleware(metadataBackend),
rateLimitsMiddleware(userLimitsService, endpointGroup),
authorizationMiddleware(metadataBackend, forceToBeMaster),
connectionParamsMiddleware(userDatabaseService),
jobMiddleware(jobService),
initializeProfiler('job'),
user(metadataBackend),
rateLimits(userLimitsService, endpointGroup),
authorization(metadataBackend, forceToBeMaster),
connectionParams(userDatabaseService),
job(jobService),
setServedByDBHostHeader(),
finishProfilerMiddleware(),
finishProfiler(),
logJobResult(action),
incrementSuccessMetrics(statsdClient),
sendResponse(),
incrementErrorMetrics(statsdClient),
errorMiddleware()
error()
];
};
}
@ -153,21 +144,6 @@ function createJob (jobService) {
};
}
function listWorkInProgressJobs (jobService) {
return function listWorkInProgressJobsMiddleware (req, res, next) {
jobService.listWorkInProgressJobs((err, list) => {
if (err) {
return next(err);
}
res.body = list;
next();
});
};
}
function checkBodyPayloadSize () {
return function checkBodyPayloadSizeMiddleware(req, res, next) {
const payload = JSON.stringify(req.body);

View File

@ -1,7 +1,7 @@
'use strict';
const bodyParser = require('../middlewares/body-parser');
const { initializeProfilerMiddleware: initializeProfiler } = require('../middlewares/profiler');
const { initializeProfiler } = require('../middlewares/profiler');
const user = require('../middlewares/user');
const rateLimits = require('../middlewares/rate-limit');
const authorization = require('../middlewares/authorization');
@ -32,8 +32,7 @@ module.exports = class QueryController {
this.userLimitsService = userLimitsService;
}
route (app) {
const { base_url } = global.settings;
route (sqlRouter) {
const forceToBeMaster = false;
const queryMiddlewares = () => {
@ -62,8 +61,8 @@ module.exports = class QueryController {
];
};
app.all(`${base_url}/sql`, queryMiddlewares());
app.all(`${base_url}/sql.:f`, queryMiddlewares());
sqlRouter.all('/', queryMiddlewares());
sqlRouter.all('.:f', queryMiddlewares());
}
};

73
lib/api/sql/sql-router.js Normal file
View File

@ -0,0 +1,73 @@
'use strict';
const { Router: router } = require('express');
const UserDatabaseService = require('../../services/user-database-service');
const UserLimitsService = require('../../services/user-limits');
const socketTimeout = require('../middlewares/socket-timeout');
const logger = require('../middlewares/logger');
const profiler = require('../middlewares/profiler');
const cors = require('../middlewares/cors');
const servedByHostHeader = require('../middlewares/served-by-host-header');
const QueryController = require('./query-controller');
const CopyController = require('./copy-controller');
const JobController = require('./job-controller');
module.exports = class SqlRouter {
constructor ({ metadataBackend, statsClient, dataIngestionLogger, jobService }) {
const userLimitsServiceOptions = {
limits: {
rateLimitsEnabled: global.settings.ratelimits.rateLimitsEnabled
}
};
const userDatabaseService = new UserDatabaseService(metadataBackend);
const userLimitsService = new UserLimitsService(metadataBackend, userLimitsServiceOptions);
this.queryController = new QueryController(
metadataBackend,
userDatabaseService,
statsClient,
userLimitsService
);
this.copyController = new CopyController(
metadataBackend,
userDatabaseService,
userLimitsService,
dataIngestionLogger
);
this.jobController = new JobController(
metadataBackend,
userDatabaseService,
jobService,
statsClient,
userLimitsService
);
}
route (apiRouter, routes) {
routes.forEach(route => {
const sqlRouter = router({ mergeParams: true });
const paths = route.paths;
const middlewares = route.middlewares || [];
middlewares.forEach(middleware => sqlRouter.use(middleware()));
sqlRouter.use(socketTimeout());
sqlRouter.use(logger());
sqlRouter.use(profiler({ statsClient: this.statsClient }));
sqlRouter.use(cors());
sqlRouter.use(servedByHostHeader());
this.queryController.route(sqlRouter);
this.copyController.route(sqlRouter);
this.jobController.route(sqlRouter);
paths.forEach(path => apiRouter.use(path, sqlRouter));
});
}
};

View File

@ -0,0 +1,17 @@
'use strict';
const versions = {
cartodb_sql_api: require('./../../package.json').version
};
module.exports = class VersionController {
route (apiRouter) {
apiRouter.get('/version', version());
}
};
function version () {
return function versionMiddleware (req, res) {
res.send(versions);
};
}

View File

@ -4,7 +4,7 @@
var _ = require('underscore');
var OAuthUtil = require('oauth-client');
var step = require('step');
var CdbRequest = require('../models/cartodb_request');
var CdbRequest = require('../models/cartodb-request');
var cdbReq = new CdbRequest();
var oAuth = (function(){

View File

@ -1,6 +1,6 @@
'use strict';
const Logger = require('../app/services/logger');
const Logger = require('../services/logger');
class BatchLogger extends Logger {
constructor (path, name) {
@ -10,7 +10,6 @@ class BatchLogger extends Logger {
log (job) {
return job.log(this.logger);
}
}
module.exports = BatchLogger;

View File

@ -1,14 +1,14 @@
'use strict';
var JobRunner = require('./job_runner');
var QueryRunner = require('./query_runner');
var JobCanceller = require('./job_canceller');
var JobRunner = require('./job-runner');
var QueryRunner = require('./query-runner');
var JobCanceller = require('./job-canceller');
var JobSubscriber = require('./pubsub/job-subscriber');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var UserDatabaseMetadataService = require('./user-database-metadata-service');
var JobPublisher = require('./pubsub/job-publisher');
var JobQueue = require('./job_queue');
var JobBackend = require('./job_backend');
var JobService = require('./job_service');
var JobQueue = require('./job-queue');
var JobBackend = require('./job-backend');
var JobService = require('./job-service');
var BatchLogger = require('./batch-logger');
var Batch = require('./batch');

View File

@ -2,7 +2,7 @@
var REDIS_PREFIX = 'batch:jobs:';
var REDIS_DB = 5;
var JobStatus = require('./job_status');
var JobStatus = require('./job-status');
var queue = require('queue-async');
function JobBackend(metadataBackend, jobQueue, logger) {

View File

@ -1,7 +1,7 @@
'use strict';
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status');
var errorCodes = require('../postgresql/error-codes').codeToCondition;
var jobStatus = require('./job-status');
var Profiler = require('step-profiler');
var _ = require('underscore');

View File

@ -1,7 +1,7 @@
'use strict';
var JobFactory = require('./models/job_factory');
var jobStatus = require('./job_status');
var JobFactory = require('./models/job-factory');
var jobStatus = require('./job-status');
function JobService(jobBackend, jobCanceller, logger) {
this.jobBackend = jobBackend;

View File

@ -2,8 +2,8 @@
var util = require('util');
var uuid = require('node-uuid');
var JobStateMachine = require('./job_state_machine');
var jobStatus = require('../job_status');
var JobStateMachine = require('./job-state-machine');
var jobStatus = require('../job-status');
var mandatoryProperties = [
'job_id',
'status',

View File

@ -1,8 +1,8 @@
'use strict';
var JobSimple = require('./job_simple');
var JobMultiple = require('./job_multiple');
var JobFallback = require('./job_fallback');
var JobSimple = require('./job-simple');
var JobMultiple = require('./job-multiple');
var JobFallback = require('./job-fallback');
var Models = [ JobSimple, JobMultiple, JobFallback ];

View File

@ -1,11 +1,11 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var JobStatus = require('../job_status');
var QueryFallback = require('./query/query_fallback');
var MainFallback = require('./query/main_fallback');
var QueryFactory = require('./query/query_factory');
var JobBase = require('./job-base');
var JobStatus = require('../job-status');
var QueryFallback = require('./query/query-fallback');
var MainFallback = require('./query/main-fallback');
var QueryFactory = require('./query/query-factory');
function JobFallback(jobDefinition) {
JobBase.call(this, jobDefinition);

View File

@ -1,8 +1,8 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('../job_status');
var JobBase = require('./job-base');
var jobStatus = require('../job-status');
function JobMultiple(jobDefinition) {
JobBase.call(this, jobDefinition);

View File

@ -1,8 +1,8 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('../job_status');
var JobBase = require('./job-base');
var jobStatus = require('../job-status');
function JobSimple(jobDefinition) {
JobBase.call(this, jobDefinition);

View File

@ -1,7 +1,7 @@
'use strict';
var assert = require('assert');
var JobStatus = require('../job_status');
var JobStatus = require('../job-status');
var validStatusTransitions = [
[JobStatus.PENDING, JobStatus.RUNNING],
[JobStatus.PENDING, JobStatus.CANCELLED],

View File

@ -1,8 +1,8 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var jobStatus = require('../../job_status');
var QueryBase = require('./query-base');
var jobStatus = require('../../job-status');
function Fallback(index) {
QueryBase.call(this, index);

View File

@ -1,8 +1,8 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var jobStatus = require('../../job_status');
var QueryBase = require('./query-base');
var jobStatus = require('../../job-status');
function MainFallback() {
QueryBase.call(this);

View File

@ -1,7 +1,7 @@
'use strict';
var util = require('util');
var JobStateMachine = require('../job_state_machine');
var JobStateMachine = require('../job-state-machine');
function QueryBase(index) {
JobStateMachine.call(this);

View File

@ -1,6 +1,6 @@
'use strict';
var QueryFallback = require('./query_fallback');
var QueryFallback = require('./query-fallback');
function QueryFactory() {
}

View File

@ -1,10 +1,10 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var QueryBase = require('./query-base');
var Query = require('./query');
var Fallback = require('./fallback');
var jobStatus = require('../../job_status');
var jobStatus = require('../../job-status');
function QueryFallback(job, index) {
QueryBase.call(this, index);

View File

@ -1,8 +1,8 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var jobStatus = require('../../job_status');
var QueryBase = require('./query-base');
var jobStatus = require('../../job-status');
function Query(index) {
QueryBase.call(this, index);

View File

@ -3,7 +3,7 @@
var _ = require('underscore');
var Pg = require('./../pg');
var ArrayBufferSer = require("../../bin_encoder");
var ArrayBufferSer = require("../../bin-encoder");
function BinaryFormat() {}

View File

@ -2,8 +2,8 @@
var _ = require('underscore');
var Pg = require('./../pg');
const errorHandlerFactory = require('../../../services/error_handler_factory');
var Pg = require('./../pg');
const errorHandlerFactory = require('../../../services/error-handler-factory');
function GeoJsonFormat() {
this.buffer = '';

View File

@ -3,7 +3,7 @@
var _ = require('underscore');
var Pg = require('./../pg');
const errorHandlerFactory = require('../../../services/error_handler_factory');
const errorHandlerFactory = require('../../../services/error-handler-factory');
function JsonFormat() {
this.buffer = '';

View File

@ -1,5 +1,7 @@
'use strict';
// jshint ignore:start
var Pg = require('./../pg');
var _ = require('underscore');
var geojson = require('./geojson');
@ -133,6 +135,6 @@ TopoJsonFormat.prototype.cancel = function() {
}
};
module.exports = TopoJsonFormat;
// jshint ignore:end

Some files were not shown because too many files have changed in this diff Show More