From f5ca879ce396b5c64ab2d85698f03553e0829cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Thu, 10 Dec 2015 15:08:31 +0100 Subject: [PATCH] Added entry point for main app to batch api --- app/app.js | 4 ++++ batch/batch_launcher.js | 6 +++++- batch/batch_manager.js | 4 ++-- batch/job_service.js | 7 +++++-- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/app/app.js b/app/app.js index b339bade..3148ceae 100644 --- a/app/app.js +++ b/app/app.js @@ -28,6 +28,8 @@ var CacheStatusController = require('./controllers/cache_status_controller'); var HealthCheckController = require('./controllers/health_check_controller'); var VersionController = require('./controllers/version_controller'); +var batchService = require('../batch'); + process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi'; // override Date.toJSON @@ -178,6 +180,8 @@ function App() { var versionController = new VersionController(); versionController.route(app); + batchService(5000, 100); + return app; } diff --git a/batch/batch_launcher.js b/batch/batch_launcher.js index 66686ad3..8c03e0f3 100644 --- a/batch/batch_launcher.js +++ b/batch/batch_launcher.js @@ -10,7 +10,11 @@ BatchLauncher.prototype.start = function (interval) { interval = this.batchInterval || interval || 5000; this.intervalCallback = setInterval(function () { - self.batchManager.run(); + self.batchManager.run(function (err) { + if (err) { + console.log('Error in batch service: ', err); + } + }); }, interval); }; diff --git a/batch/batch_manager.js b/batch/batch_manager.js index 5aa5fd79..1f36c313 100644 --- a/batch/batch_manager.js +++ b/batch/batch_manager.js @@ -16,7 +16,7 @@ BatchManager.prototype.run = function (callback) { } if (!username) { - return callback(new Error('No jobs scheduled')); + return callback(); // no jobs scheduled } self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) { @@ -31,11 +31,11 @@ BatchManager.prototype.run = function (callback) { self.jobService.run(userDatabaseMetadata, function (err) { if (err) { - callback(err); self.usernameQueue.enqueue(username, function (err) { if (err) { callback(err); } + callback(); }); } diff --git a/batch/job_service.js b/batch/job_service.js index a7b7f6a4..e6a415e1 100644 --- a/batch/job_service.js +++ b/batch/job_service.js @@ -22,20 +22,23 @@ JobService.prototype.run = function (userDatabaseMetada, callback) { self.runJob(pg, job, function (err, jobResult) { if (err) { + self.setJobFailed(pg, job, err.message, function (err) { if (err) { return callback(err); } callback(null, jobResult); }); + } else { + self.setJobDone(pg, job, function (err) { if (err) { return callback(err); } - console.info('Job %s done successfully', job.job_id); callback(null, jobResult); }); + } }); }); @@ -46,7 +49,7 @@ JobService.prototype.runJob = function (pg, job, callback) { var query = job.query; if (job.query.match(/SELECT\s.*FROM\s.*/i)) { - query = 'SELECT * INTO job_' + job.job_id.replace(/-/g, '_') + ' FROM (' + job.query + ') as q'; + query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j'; } pg.query(query, function (err, jobResult) {