Added entry point for main app to batch api

This commit is contained in:
Daniel García Aubert 2015-12-10 15:08:31 +01:00
parent 78ee92dbe5
commit f5ca879ce3
4 changed files with 16 additions and 5 deletions

View File

@ -28,6 +28,8 @@ var CacheStatusController = require('./controllers/cache_status_controller');
var HealthCheckController = require('./controllers/health_check_controller'); var HealthCheckController = require('./controllers/health_check_controller');
var VersionController = require('./controllers/version_controller'); var VersionController = require('./controllers/version_controller');
var batchService = require('../batch');
process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi'; process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi';
// override Date.toJSON // override Date.toJSON
@ -178,6 +180,8 @@ function App() {
var versionController = new VersionController(); var versionController = new VersionController();
versionController.route(app); versionController.route(app);
batchService(5000, 100);
return app; return app;
} }

View File

@ -10,7 +10,11 @@ BatchLauncher.prototype.start = function (interval) {
interval = this.batchInterval || interval || 5000; interval = this.batchInterval || interval || 5000;
this.intervalCallback = setInterval(function () { this.intervalCallback = setInterval(function () {
self.batchManager.run(); self.batchManager.run(function (err) {
if (err) {
console.log('Error in batch service: ', err);
}
});
}, interval); }, interval);
}; };

View File

@ -16,7 +16,7 @@ BatchManager.prototype.run = function (callback) {
} }
if (!username) { if (!username) {
return callback(new Error('No jobs scheduled')); return callback(); // no jobs scheduled
} }
self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) { self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) {
@ -31,11 +31,11 @@ BatchManager.prototype.run = function (callback) {
self.jobService.run(userDatabaseMetadata, function (err) { self.jobService.run(userDatabaseMetadata, function (err) {
if (err) { if (err) {
callback(err);
self.usernameQueue.enqueue(username, function (err) { self.usernameQueue.enqueue(username, function (err) {
if (err) { if (err) {
callback(err); callback(err);
} }
callback();
}); });
} }

View File

@ -22,20 +22,23 @@ JobService.prototype.run = function (userDatabaseMetada, callback) {
self.runJob(pg, job, function (err, jobResult) { self.runJob(pg, job, function (err, jobResult) {
if (err) { if (err) {
self.setJobFailed(pg, job, err.message, function (err) { self.setJobFailed(pg, job, err.message, function (err) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
callback(null, jobResult); callback(null, jobResult);
}); });
} else { } else {
self.setJobDone(pg, job, function (err) { self.setJobDone(pg, job, function (err) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
console.info('Job %s done successfully', job.job_id);
callback(null, jobResult); callback(null, jobResult);
}); });
} }
}); });
}); });
@ -46,7 +49,7 @@ JobService.prototype.runJob = function (pg, job, callback) {
var query = job.query; var query = job.query;
if (job.query.match(/SELECT\s.*FROM\s.*/i)) { if (job.query.match(/SELECT\s.*FROM\s.*/i)) {
query = 'SELECT * INTO job_' + job.job_id.replace(/-/g, '_') + ' FROM (' + job.query + ') as q'; query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j';
} }
pg.query(query, function (err, jobResult) { pg.query(query, function (err, jobResult) {