diff --git a/batch/index.js b/batch/index.js index 985d7204..3947eaa5 100644 --- a/batch/index.js +++ b/batch/index.js @@ -24,11 +24,10 @@ module.exports = function batchFactory (metadataBackend) { var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - // TODO: down userDatabaseMetadataService - var queryRunner = new QueryRunner(); + var queryRunner = new QueryRunner(userDatabaseMetadataService); var jobCanceller = new JobCanceller(userDatabaseMetadataService); var jobService = new JobService(jobBackend, jobCanceller); - var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService); + var jobRunner = new JobRunner(jobService, jobQueue, queryRunner); return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService); }; diff --git a/batch/job_runner.js b/batch/job_runner.js index 6dada4e4..ca8b880b 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -3,11 +3,10 @@ var errorCodes = require('../app/postgresql/error_codes').codeToCondition; var jobStatus = require('./job_status'); -function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) { +function JobRunner(jobService, jobQueue, queryRunner) { this.jobService = jobService; this.jobQueue = jobQueue; this.queryRunner = queryRunner; - this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner } JobRunner.prototype.run = function (job_id, callback) { @@ -39,47 +38,40 @@ JobRunner.prototype.run = function (job_id, callback) { JobRunner.prototype._run = function (job, query, callback) { var self = this; - // TODO: move to query - self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) { + self.queryRunner.run(job.data.job_id, query, job.data.user, function (err /*, result */) { if (err) { + // if query has been cancelled then it's going to get the current + // job status saved by query_canceller + if (errorCodes[err.code.toString()] === 'query_canceled') { + return self.jobService.get(job.data.job_id, callback); + } + } + + try { + if (err) { + job.setStatus(jobStatus.FAILED, err.message); + } else { + job.setStatus(jobStatus.DONE); + } + } catch (err) { return callback(err); } - self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) { + self.jobService.save(job, function (err, job) { if (err) { - // if query has been cancelled then it's going to get the current - // job status saved by query_canceller - if (errorCodes[err.code.toString()] === 'query_canceled') { - return self.jobService.get(job.data.job_id, callback); - } - } - - try { - if (err) { - job.setStatus(jobStatus.FAILED, err.message); - } else { - job.setStatus(jobStatus.DONE); - } - } catch (err) { return callback(err); } - self.jobService.save(job, function (err, job) { + if (!job.hasNextQuery()) { + return callback(null, job); + } + + self.jobQueue.enqueue(job.data.job_id, job.data.host, function (err) { if (err) { return callback(err); } - if (!job.hasNextQuery()) { - return callback(null, job); - } - - self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) { - if (err) { - return callback(err); - } - - callback(null, job); - }); + callback(null, job); }); }); }); diff --git a/batch/models/job_base.js b/batch/models/job_base.js index 5169e560..03c3451d 100644 --- a/batch/models/job_base.js +++ b/batch/models/job_base.js @@ -19,6 +19,7 @@ var mandatoryProperties = [ 'query', 'created_at', 'updated_at', + 'host', 'user' ]; diff --git a/batch/query_runner.js b/batch/query_runner.js index 1d5701ff..27825412 100644 --- a/batch/query_runner.js +++ b/batch/query_runner.js @@ -2,37 +2,43 @@ var PSQL = require('cartodb-psql'); -function QueryRunner() { +function QueryRunner(userDatabaseMetadataService) { + this.userDatabaseMetadataService = userDatabaseMetadataService; } module.exports = QueryRunner; -QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) { - var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); - - pg.query('SET statement_timeout=0', function (err) { - if(err) { +QueryRunner.prototype.run = function (job_id, sql, user, callback) { + this.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) { + if (err) { return callback(err); } - // mark query to allow to users cancel their queries - sql = '/* ' + job_id + ' */ ' + sql; + var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); - pg.eventedQuery(sql, function (err, query) { - if (err) { + pg.query('SET statement_timeout=0', function (err) { + if(err) { return callback(err); } - query.on('error', callback); + // mark query to allow to users cancel their queries + sql = '/* ' + job_id + ' */ ' + sql; - query.on('end', function (result) { - // only if result is present then query is done sucessfully otherwise an error has happened - // and it was handled by error listener - if (result) { - callback(null, result); + pg.eventedQuery(sql, function (err, query) { + if (err) { + return callback(err); } + + query.on('error', callback); + + query.on('end', function (result) { + // only if result is present then query is done sucessfully otherwise an error has happened + // and it was handled by error listener + if (result) { + callback(null, result); + } + }); }); }); }); - };