Fixed conflicts in merge

This commit is contained in:
Daniel García Aubert 2016-06-03 11:43:21 +02:00
commit cc1a5641ea
5 changed files with 53 additions and 57 deletions

View File

@ -24,11 +24,10 @@ module.exports = function batchFactory (metadataBackend) {
var userIndexer = new UserIndexer(metadataBackend); var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
// TODO: down userDatabaseMetadataService var queryRunner = new QueryRunner(userDatabaseMetadataService);
var queryRunner = new QueryRunner();
var jobCanceller = new JobCanceller(userDatabaseMetadataService); var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller); var jobService = new JobService(jobBackend, jobCanceller);
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService); var jobRunner = new JobRunner(jobService, jobQueue, queryRunner);
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService); return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService);
}; };

View File

@ -29,8 +29,7 @@ function toRedisParams(job) {
for (var property in obj) { for (var property in obj) {
if (obj.hasOwnProperty(property)) { if (obj.hasOwnProperty(property)) {
redisParams.push(property); redisParams.push(property);
// TODO: this should be moved to job model ?? if (property === 'query' && typeof obj[property] !== 'string') {
if ((property === 'query' || property === 'status') && typeof obj[property] !== 'string') {
redisParams.push(JSON.stringify(obj[property])); redisParams.push(JSON.stringify(obj[property]));
} else { } else {
redisParams.push(obj[property]); redisParams.push(obj[property]);
@ -65,7 +64,6 @@ function toObject(job_id, redisParams, redisValues) {
return obj; return obj;
} }
// TODO: is it really necessary??
function isJobFound(redisValues) { function isJobFound(redisValues) {
return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4]; return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4];
} }

View File

@ -3,11 +3,10 @@
var errorCodes = require('../app/postgresql/error_codes').codeToCondition; var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status'); var jobStatus = require('./job_status');
function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) { function JobRunner(jobService, jobQueue, queryRunner) {
this.jobService = jobService; this.jobService = jobService;
this.jobQueue = jobQueue; this.jobQueue = jobQueue;
this.queryRunner = queryRunner; this.queryRunner = queryRunner;
this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner
} }
JobRunner.prototype.run = function (job_id, callback) { JobRunner.prototype.run = function (job_id, callback) {
@ -39,50 +38,43 @@ JobRunner.prototype.run = function (job_id, callback) {
JobRunner.prototype._run = function (job, query, callback) { JobRunner.prototype._run = function (job, query, callback) {
var self = this; var self = this;
// TODO: move to query self.queryRunner.run(job.data.job_id, query, job.data.user, function (err /*, result */) {
self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
if (err) { if (err) {
if (!err.code) {
return callback(err);
}
// if query has been cancelled then it's going to get the current
// job status saved by query_canceller
if (errorCodes[err.code.toString()] === 'query_canceled') {
return self.jobService.get(job.data.job_id, callback);
}
}
try {
if (err) {
job.setStatus(jobStatus.FAILED, err.message);
} else {
job.setStatus(jobStatus.DONE);
}
} catch (err) {
return callback(err); return callback(err);
} }
self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) { self.jobService.save(job, function (err, job) {
if (err) { if (err) {
if (!err.code) {
return callback(err);
}
// if query has been cancelled then it's going to get the current
// job status saved by query_canceller
if (errorCodes[err.code.toString()] === 'query_canceled') {
return self.jobService.get(job.data.job_id, callback);
}
}
try {
if (err) {
job.setStatus(jobStatus.FAILED, err.message);
} else {
job.setStatus(jobStatus.DONE);
}
} catch (err) {
return callback(err); return callback(err);
} }
self.jobService.save(job, function (err, job) { if (!job.hasNextQuery()) {
return callback(null, job);
}
self.jobQueue.enqueue(job.data.job_id, job.data.host, function (err) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
if (!job.hasNextQuery()) { callback(null, job);
return callback(null, job);
}
self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) {
if (err) {
return callback(err);
}
callback(null, job);
});
}); });
}); });
}); });

View File

@ -19,6 +19,7 @@ var mandatoryProperties = [
'query', 'query',
'created_at', 'created_at',
'updated_at', 'updated_at',
'host',
'user' 'user'
]; ];

View File

@ -2,37 +2,43 @@
var PSQL = require('cartodb-psql'); var PSQL = require('cartodb-psql');
function QueryRunner() { function QueryRunner(userDatabaseMetadataService) {
this.userDatabaseMetadataService = userDatabaseMetadataService;
} }
module.exports = QueryRunner; module.exports = QueryRunner;
QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) { QueryRunner.prototype.run = function (job_id, sql, user, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); this.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
if (err) {
pg.query('SET statement_timeout=0', function (err) {
if(err) {
return callback(err); return callback(err);
} }
// mark query to allow to users cancel their queries var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
sql = '/* ' + job_id + ' */ ' + sql;
pg.eventedQuery(sql, function (err, query) { pg.query('SET statement_timeout=0', function (err) {
if (err) { if(err) {
return callback(err); return callback(err);
} }
query.on('error', callback); // mark query to allow to users cancel their queries
sql = '/* ' + job_id + ' */ ' + sql;
query.on('end', function (result) { pg.eventedQuery(sql, function (err, query) {
// only if result is present then query is done sucessfully otherwise an error has happened if (err) {
// and it was handled by error listener return callback(err);
if (result) {
callback(null, result);
} }
query.on('error', callback);
query.on('end', function (result) {
// only if result is present then query is done sucessfully otherwise an error has happened
// and it was handled by error listener
if (result) {
callback(null, result);
}
});
}); });
}); });
}); });
}; };