Moved userDatabaseMetadataService from job_runner to query_runner
This commit is contained in:
parent
a60c0e1927
commit
636686386e
@ -24,11 +24,10 @@ module.exports = function batchFactory (metadataBackend) {
|
||||
var userIndexer = new UserIndexer(metadataBackend);
|
||||
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
|
||||
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
|
||||
// TODO: down userDatabaseMetadataService
|
||||
var queryRunner = new QueryRunner();
|
||||
var queryRunner = new QueryRunner(userDatabaseMetadataService);
|
||||
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
|
||||
var jobService = new JobService(jobBackend, jobCanceller);
|
||||
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService);
|
||||
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner);
|
||||
|
||||
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService);
|
||||
};
|
||||
|
@ -3,11 +3,10 @@
|
||||
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
|
||||
var jobStatus = require('./job_status');
|
||||
|
||||
function JobRunner(jobService, jobQueue, queryRunner, userDatabaseMetadataService) {
|
||||
function JobRunner(jobService, jobQueue, queryRunner) {
|
||||
this.jobService = jobService;
|
||||
this.jobQueue = jobQueue;
|
||||
this.queryRunner = queryRunner;
|
||||
this.userDatabaseMetadataService = userDatabaseMetadataService; // TODO: move to queryRunner
|
||||
}
|
||||
|
||||
JobRunner.prototype.run = function (job_id, callback) {
|
||||
@ -39,47 +38,40 @@ JobRunner.prototype.run = function (job_id, callback) {
|
||||
JobRunner.prototype._run = function (job, query, callback) {
|
||||
var self = this;
|
||||
|
||||
// TODO: move to query
|
||||
self.userDatabaseMetadataService.getUserMetadata(job.data.user, function (err, userDatabaseMetadata) {
|
||||
self.queryRunner.run(job.data.job_id, query, job.data.user, function (err /*, result */) {
|
||||
if (err) {
|
||||
// if query has been cancelled then it's going to get the current
|
||||
// job status saved by query_canceller
|
||||
if (errorCodes[err.code.toString()] === 'query_canceled') {
|
||||
return self.jobService.get(job.data.job_id, callback);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (err) {
|
||||
job.setStatus(jobStatus.FAILED, err.message);
|
||||
} else {
|
||||
job.setStatus(jobStatus.DONE);
|
||||
}
|
||||
} catch (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
self.queryRunner.run(job.data.job_id, query, userDatabaseMetadata, function (err /*, result */) {
|
||||
self.jobService.save(job, function (err, job) {
|
||||
if (err) {
|
||||
// if query has been cancelled then it's going to get the current
|
||||
// job status saved by query_canceller
|
||||
if (errorCodes[err.code.toString()] === 'query_canceled') {
|
||||
return self.jobService.get(job.data.job_id, callback);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (err) {
|
||||
job.setStatus(jobStatus.FAILED, err.message);
|
||||
} else {
|
||||
job.setStatus(jobStatus.DONE);
|
||||
}
|
||||
} catch (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
self.jobService.save(job, function (err, job) {
|
||||
if (!job.hasNextQuery()) {
|
||||
return callback(null, job);
|
||||
}
|
||||
|
||||
self.jobQueue.enqueue(job.data.job_id, job.data.host, function (err) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (!job.hasNextQuery()) {
|
||||
return callback(null, job);
|
||||
}
|
||||
|
||||
self.jobQueue.enqueue(job.data.job_id, userDatabaseMetadata.host, function (err) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
callback(null, job);
|
||||
});
|
||||
callback(null, job);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -19,6 +19,7 @@ var mandatoryProperties = [
|
||||
'query',
|
||||
'created_at',
|
||||
'updated_at',
|
||||
'host',
|
||||
'user'
|
||||
];
|
||||
|
||||
|
@ -2,37 +2,43 @@
|
||||
|
||||
var PSQL = require('cartodb-psql');
|
||||
|
||||
function QueryRunner() {
|
||||
function QueryRunner(userDatabaseMetadataService) {
|
||||
this.userDatabaseMetadataService = userDatabaseMetadataService;
|
||||
}
|
||||
|
||||
module.exports = QueryRunner;
|
||||
|
||||
QueryRunner.prototype.run = function (job_id, sql, userDatabaseMetadata, callback) {
|
||||
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
|
||||
|
||||
pg.query('SET statement_timeout=0', function (err) {
|
||||
if(err) {
|
||||
QueryRunner.prototype.run = function (job_id, sql, user, callback) {
|
||||
this.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
// mark query to allow to users cancel their queries
|
||||
sql = '/* ' + job_id + ' */ ' + sql;
|
||||
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
|
||||
|
||||
pg.eventedQuery(sql, function (err, query) {
|
||||
if (err) {
|
||||
pg.query('SET statement_timeout=0', function (err) {
|
||||
if(err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
query.on('error', callback);
|
||||
// mark query to allow to users cancel their queries
|
||||
sql = '/* ' + job_id + ' */ ' + sql;
|
||||
|
||||
query.on('end', function (result) {
|
||||
// only if result is present then query is done sucessfully otherwise an error has happened
|
||||
// and it was handled by error listener
|
||||
if (result) {
|
||||
callback(null, result);
|
||||
pg.eventedQuery(sql, function (err, query) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
query.on('error', callback);
|
||||
|
||||
query.on('end', function (result) {
|
||||
// only if result is present then query is done sucessfully otherwise an error has happened
|
||||
// and it was handled by error listener
|
||||
if (result) {
|
||||
callback(null, result);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user