Create sql-router to encapsulate common stuff from api-router. Extract WIP jobs from job-controller and attatch it to api-router.

Note: it modifies routes structure in configuration.
This commit is contained in:
Daniel García Aubert 2019-10-02 16:02:13 +02:00
parent 2951f07ae5
commit 51ea2b066d
13 changed files with 192 additions and 140 deletions

View File

@ -2,8 +2,11 @@
const { Router: router } = require('express'); const { Router: router } = require('express');
const UserDatabaseService = require('../services/user_database_service'); const SqlRouter = require('./sql-router');
const UserLimitsService = require('../services/user_limits');
const HealthCheckController = require('./health_check_controller');
const VersionController = require('./version_controller');
const JobsWipController = require('./jobs_wip_controller');
const BatchLogger = require('../../batch/batch-logger'); const BatchLogger = require('../../batch/batch-logger');
@ -13,44 +16,8 @@ const JobBackend = require('../../batch/job_backend');
const JobCanceller = require('../../batch/job_canceller'); const JobCanceller = require('../../batch/job_canceller');
const JobService = require('../../batch/job_service'); const JobService = require('../../batch/job_service');
const QueryController = require('./query_controller');
const CopyController = require('./copy_controller');
const JobController = require('./job_controller');
const socketTimeout = require('../middlewares/socket-timeout');
const logger = require('../middlewares/logger');
const profiler = require('../middlewares/profiler');
const cors = require('../middlewares/cors');
const servedByHostHeader = require('../middlewares/served-by-host-header');
module.exports = class ApiRouter { module.exports = class ApiRouter {
constructor ({ routes, redisPool, metadataBackend, statsClient, dataIngestionLogger }) { constructor ({ redisPool, metadataBackend, statsClient, dataIngestionLogger }) {
this.routes = routes;
this.statsClient = statsClient;
const userLimitsServiceOptions = {
limits: {
rateLimitsEnabled: global.settings.ratelimits.rateLimitsEnabled
}
};
const userDatabaseService = new UserDatabaseService(metadataBackend);
const userLimitsService = new UserLimitsService(metadataBackend, userLimitsServiceOptions);
this.queryController = new QueryController(
metadataBackend,
userDatabaseService,
statsClient,
userLimitsService
);
this.copyController = new CopyController(
metadataBackend,
userDatabaseService,
userLimitsService,
dataIngestionLogger
);
const logger = new BatchLogger(global.settings.batch_log_filename, 'batch-queries'); const logger = new BatchLogger(global.settings.batch_log_filename, 'batch-queries');
const jobPublisher = new JobPublisher(redisPool); const jobPublisher = new JobPublisher(redisPool);
const jobQueue = new JobQueue(metadataBackend, jobPublisher, logger); const jobQueue = new JobQueue(metadataBackend, jobPublisher, logger);
@ -58,17 +25,20 @@ module.exports = class ApiRouter {
const jobCanceller = new JobCanceller(); const jobCanceller = new JobCanceller();
const jobService = new JobService(jobBackend, jobCanceller, logger); const jobService = new JobService(jobBackend, jobCanceller, logger);
this.jobController = new JobController( this.healthCheckController = new HealthCheckController();
this.versionController = new VersionController();
this.jobsWipController = new JobsWipController({ jobService });
this.sqlRouter = new SqlRouter({
metadataBackend, metadataBackend,
userDatabaseService,
jobService,
statsClient, statsClient,
userLimitsService dataIngestionLogger,
); jobService
});
} }
route (app) { route (app, routes) {
Object.values(this.routes).forEach(route => { routes.forEach(route => {
const apiRouter = router({ mergeParams: true }); const apiRouter = router({ mergeParams: true });
const paths = route.paths; const paths = route.paths;
@ -76,15 +46,17 @@ module.exports = class ApiRouter {
middlewares.forEach(middleware => apiRouter.use(middleware())); middlewares.forEach(middleware => apiRouter.use(middleware()));
apiRouter.use(socketTimeout()); // FIXME: version controller should be atached to the main entry point: "/"
apiRouter.use(logger()); // instead of "/api/:version" or "/user/:user/api/:version"
apiRouter.use(profiler({ statsClient: this.statsClient })); this.healthCheckController.route(apiRouter);
apiRouter.use(cors());
apiRouter.use(servedByHostHeader());
this.queryController.route(apiRouter); // FIXME: version controller should be atached to the main entry point: "/"
this.copyController.route(apiRouter); // instead of "/api/:version" or "/user/:user/api/:version"
this.jobController.route(apiRouter); this.versionController.route(apiRouter);
this.jobsWipController.route(apiRouter);
this.sqlRouter.route(apiRouter, route.sql);
paths.forEach(path => app.use(path, apiRouter)); paths.forEach(path => app.use(path, apiRouter));
}); });

View File

@ -25,7 +25,7 @@ module.exports = class CopyController {
this.logger = logger; this.logger = logger;
} }
route (apiRouter) { route (sqlRouter) {
const copyFromMiddlewares = endpointGroup => { const copyFromMiddlewares = endpointGroup => {
return [ return [
initializeProfilerMiddleware('copyfrom'), initializeProfilerMiddleware('copyfrom'),
@ -56,9 +56,9 @@ module.exports = class CopyController {
]; ];
}; };
apiRouter.post('/sql/copyfrom', copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_FROM)); sqlRouter.post('/copyfrom', copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_FROM));
apiRouter.get('/sql/copyto', copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO)); sqlRouter.get('/copyto', copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
apiRouter.post('/sql/copyto', copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO)); sqlRouter.post('/copyto', copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
} }
}; };

View File

@ -3,17 +3,12 @@
const HealthCheckBackend = require('../monitoring/health_check'); const HealthCheckBackend = require('../monitoring/health_check');
module.exports = class HealthCheckController { module.exports = class HealthCheckController {
constructor ({ routes }) { constructor () {
this.routes = routes;
this.healthCheckBackend = new HealthCheckBackend(global.settings.disabled_file); this.healthCheckBackend = new HealthCheckBackend(global.settings.disabled_file);
} }
route (app) { route (apiRouter) {
const paths = this.routes.paths || []; apiRouter.get('/health', healthCheck({ healthCheckBackend: this.healthCheckBackend }));
paths.forEach(path => app.get(`${path}/health`, healthCheck({
healthCheckBackend: this.healthCheckBackend
})));
} }
}; };

View File

@ -22,7 +22,7 @@ module.exports = class JobController {
this.userLimitsService = userLimitsService; this.userLimitsService = userLimitsService;
} }
route (apiRouter) { route (sqlRouter) {
const jobMiddlewares = composeJobMiddlewares( const jobMiddlewares = composeJobMiddlewares(
this.metadataBackend, this.metadataBackend,
this.userDatabaseService, this.userDatabaseService,
@ -31,34 +31,23 @@ module.exports = class JobController {
this.userLimitsService this.userLimitsService
); );
apiRouter.get( sqlRouter.post('/job', [
'/jobs-wip',
bodyParserMiddleware(),
listWorkInProgressJobs(this.jobService),
sendResponse(),
errorMiddleware()
);
apiRouter.post(
'/sql/job',
bodyParserMiddleware(), bodyParserMiddleware(),
checkBodyPayloadSize(), checkBodyPayloadSize(),
params({ strategy: 'job' }), params({ strategy: 'job' }),
logMiddleware(logMiddleware.TYPES.JOB), logMiddleware(logMiddleware.TYPES.JOB),
jobMiddlewares('create', createJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_CREATE) jobMiddlewares('create', createJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_CREATE)
); ]);
apiRouter.get( sqlRouter.get('/job/:job_id', [
'/sql/job/:job_id',
bodyParserMiddleware(), bodyParserMiddleware(),
jobMiddlewares('retrieve', getJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_GET) jobMiddlewares('retrieve', getJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_GET)
); ]);
apiRouter.delete( sqlRouter.delete('/job/:job_id', [
'/sql/job/:job_id',
bodyParserMiddleware(), bodyParserMiddleware(),
jobMiddlewares('cancel', cancelJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_DELETE) jobMiddlewares('cancel', cancelJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_DELETE)
); ]);
} }
}; };
@ -155,21 +144,6 @@ function createJob (jobService) {
}; };
} }
function listWorkInProgressJobs (jobService) {
return function listWorkInProgressJobsMiddleware (req, res, next) {
jobService.listWorkInProgressJobs((err, list) => {
if (err) {
return next(err);
}
res.body = list;
next();
});
};
}
function checkBodyPayloadSize () { function checkBodyPayloadSize () {
return function checkBodyPayloadSizeMiddleware(req, res, next) { return function checkBodyPayloadSizeMiddleware(req, res, next) {
const payload = JSON.stringify(req.body); const payload = JSON.stringify(req.body);

View File

@ -0,0 +1,39 @@
'use strict';
const bodyParserMiddleware = require('../middlewares/body-parser');
const errorMiddleware = require('../middlewares/error');
module.exports = class JobsWipController {
constructor ({ jobService }) {
this.jobService = jobService;
}
route (apiRouter) {
apiRouter.get('/jobs-wip', [
bodyParserMiddleware(),
listWorkInProgressJobs(this.jobService),
sendResponse(),
errorMiddleware()
]);
}
};
function listWorkInProgressJobs (jobService) {
return function listWorkInProgressJobsMiddleware (req, res, next) {
jobService.listWorkInProgressJobs((err, list) => {
if (err) {
return next(err);
}
res.body = list;
next();
});
};
}
function sendResponse () {
return function sendResponseMiddleware (req, res) {
res.status(res.statusCode || 200).send(res.body);
};
}

View File

@ -32,7 +32,7 @@ module.exports = class QueryController {
this.userLimitsService = userLimitsService; this.userLimitsService = userLimitsService;
} }
route (apiRouter) { route (sqlRouter) {
const forceToBeMaster = false; const forceToBeMaster = false;
const queryMiddlewares = () => { const queryMiddlewares = () => {
@ -61,8 +61,8 @@ module.exports = class QueryController {
]; ];
}; };
apiRouter.all('/sql', queryMiddlewares()); sqlRouter.all('/', queryMiddlewares());
apiRouter.all('/sql.:f', queryMiddlewares()); sqlRouter.all('.:f', queryMiddlewares());
} }
}; };

View File

@ -0,0 +1,75 @@
'use strict';
const { Router: router } = require('express');
const UserDatabaseService = require('../services/user_database_service');
const UserLimitsService = require('../services/user_limits');
const socketTimeout = require('../middlewares/socket-timeout');
const logger = require('../middlewares/logger');
const profiler = require('../middlewares/profiler');
const cors = require('../middlewares/cors');
const servedByHostHeader = require('../middlewares/served-by-host-header');
const QueryController = require('./query_controller');
const CopyController = require('./copy_controller');
const JobController = require('./job_controller');
module.exports = class SqlRouter {
constructor ({ routes, metadataBackend, statsClient, dataIngestionLogger, jobService }) {
this.routes = routes;
const userLimitsServiceOptions = {
limits: {
rateLimitsEnabled: global.settings.ratelimits.rateLimitsEnabled
}
};
const userDatabaseService = new UserDatabaseService(metadataBackend);
const userLimitsService = new UserLimitsService(metadataBackend, userLimitsServiceOptions);
this.queryController = new QueryController(
metadataBackend,
userDatabaseService,
statsClient,
userLimitsService
);
this.copyController = new CopyController(
metadataBackend,
userDatabaseService,
userLimitsService,
dataIngestionLogger
);
this.jobController = new JobController(
metadataBackend,
userDatabaseService,
jobService,
statsClient,
userLimitsService
);
}
route (apiRouter, routes) {
routes.forEach(route => {
const sqlRouter = router({ mergeParams: true });
const paths = route.paths;
const middlewares = route.middlewares || [];
middlewares.forEach(middleware => sqlRouter.use(middleware()));
sqlRouter.use(socketTimeout());
sqlRouter.use(logger());
sqlRouter.use(profiler({ statsClient: this.statsClient }));
sqlRouter.use(cors());
sqlRouter.use(servedByHostHeader());
this.queryController.route(sqlRouter);
this.copyController.route(sqlRouter);
this.jobController.route(sqlRouter);
paths.forEach(path => apiRouter.use(path, sqlRouter));
});
}
};

View File

@ -5,14 +5,8 @@ const versions = {
}; };
module.exports = class VersionController { module.exports = class VersionController {
constructor ({ routes }) { route (apiRouter) {
this.routes = routes; apiRouter.get('/version', version());
}
route (app) {
const paths = this.routes.paths || [];
paths.forEach(path => app.get(`${path}/version`, version()));
} }
}; };

View File

@ -7,8 +7,6 @@ const RedisPool = require('redis-mpool');
const cartodbRedis = require('cartodb-redis'); const cartodbRedis = require('cartodb-redis');
const Logger = require('./services/logger'); const Logger = require('./services/logger');
const ApiRouter = require('./controllers/api-router'); const ApiRouter = require('./controllers/api-router');
const HealthCheckController = require('./controllers/health_check_controller');
const VersionController = require('./controllers/version_controller');
const batchFactory = require('../batch'); const batchFactory = require('../batch');
process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi'; process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi';
@ -61,28 +59,13 @@ module.exports = function createServer (statsClient) {
const dataIngestionLogger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion'); const dataIngestionLogger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion');
app.dataIngestionLogger = dataIngestionLogger; app.dataIngestionLogger = dataIngestionLogger;
// FIXME: health controller should be atached to the main entry point: "/"
// instead of "/api/v1/" or "/user/:user/api/:version"
const healthCheckController = new HealthCheckController({
routes: global.settings.routes.api
});
healthCheckController.route(app);
// FIXME: version controller should be atached to the main entry point: "/"
// instead of "/api/:version" or "/user/:user/api/:version"
const versionController = new VersionController({
routes: global.settings.routes.api
});
versionController.route(app);
const apiRouter = new ApiRouter({ const apiRouter = new ApiRouter({
routes: global.settings.routes,
redisPool, redisPool,
metadataBackend, metadataBackend,
statsClient, statsClient,
dataIngestionLogger dataIngestionLogger
}); });
apiRouter.route(app); apiRouter.route(app, global.settings.routes.api);
const isBatchProcess = process.argv.indexOf('--no-batch') === -1; const isBatchProcess = process.argv.indexOf('--no-batch') === -1;

View File

@ -2,7 +2,7 @@
// Disable by using <=0 value. // Disable by using <=0 value.
module.exports.gc_interval = 10000; module.exports.gc_interval = 10000;
module.exports.routes = { module.exports.routes = {
api: { api: [{
paths: [ paths: [
// In case the path has a :user param the username will be the one specified in the URL, // In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header. // otherwise it will fallback to extract the username from the host header.
@ -17,8 +17,13 @@ module.exports.routes = {
next(); next();
} }
} }
] ],
} sql: [{
paths: [
'/sql'
]
}]
}]
}; };
// If useProfiler is true every response will be served with an // If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various // X-SQLAPI-Profile header containing elapsed timing for various

View File

@ -2,7 +2,7 @@
// Disable by using <=0 value. // Disable by using <=0 value.
module.exports.gc_interval = 10000; module.exports.gc_interval = 10000;
module.exports.routes = { module.exports.routes = {
api: { api: [{
paths: [ paths: [
// In case the path has a :user param the username will be the one specified in the URL, // In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header. // otherwise it will fallback to extract the username from the host header.
@ -17,8 +17,13 @@ module.exports.routes = {
next(); next();
} }
} }
] ],
} sql: [{
paths: [
'/sql'
]
}]
}]
}; };
// If useProfiler is true every response will be served with an // If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various // X-SQLAPI-Profile header containing elapsed timing for various

View File

@ -2,7 +2,7 @@
// Disable by using <=0 value. // Disable by using <=0 value.
module.exports.gc_interval = 10000; module.exports.gc_interval = 10000;
module.exports.routes = { module.exports.routes = {
api: { api: [{
paths: [ paths: [
// In case the path has a :user param the username will be the one specified in the URL, // In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header. // otherwise it will fallback to extract the username from the host header.
@ -17,8 +17,13 @@ module.exports.routes = {
next(); next();
} }
} }
] ],
} sql: [{
paths: [
'/sql'
]
}]
}]
}; };
// If useProfiler is true every response will be served with an // If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various // X-SQLAPI-Profile header containing elapsed timing for various

View File

@ -2,7 +2,7 @@
// Disable by using <=0 value. // Disable by using <=0 value.
module.exports.gc_interval = 10000; module.exports.gc_interval = 10000;
module.exports.routes = { module.exports.routes = {
api: { api: [{
paths: [ paths: [
// In case the path has a :user param the username will be the one specified in the URL, // In case the path has a :user param the username will be the one specified in the URL,
// otherwise it will fallback to extract the username from the host header. // otherwise it will fallback to extract the username from the host header.
@ -17,8 +17,13 @@ module.exports.routes = {
next(); next();
} }
} }
] ],
} sql: [{
paths: [
'/sql'
]
}]
}]
}; };
// If useProfiler is true every response will be served with an // If useProfiler is true every response will be served with an
// X-SQLAPI-Profile header containing elapsed timing for various // X-SQLAPI-Profile header containing elapsed timing for various