Refactored batch service to use callbacks instead of events, avoiding event noise

This commit is contained in:
Daniel García Aubert 2016-01-08 15:47:59 +01:00
parent f9f52d2bd1
commit 20f00d58d9
7 changed files with 158 additions and 163 deletions
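
The same pattern is applied across the batch service: methods that used to return an EventEmitter and report results through 'done', 'failed', 'cancelled' and 'error' events now take a Node-style callback and report a single (err, job) pair. A minimal, self-contained sketch of the before/after shape (names are illustrative, not from this repo):

var EventEmitter = require('events').EventEmitter;

// Before: every caller had to wire several listeners per call.
function runWithEvents(job_id) {
    var emitter = new EventEmitter();
    process.nextTick(function () {
        emitter.emit('done', { job_id: job_id, status: 'done' });
    });
    return emitter;
}

// After: one callback carries both the failure and the result.
function runWithCallback(job_id, callback) {
    process.nextTick(function () {
        callback(null, { job_id: job_id, status: 'done' });
    });
}

runWithEvents('some-job').on('done', function (job) {
    console.log('event style:', job.status);
});

runWithCallback('some-job', function (err, job) {
    if (err) {
        return console.error(err);
    }
    console.log('callback style:', job.status);
});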


@@ -28,6 +28,7 @@ function JobController(metadataBackend, tableCache, statsd_client) {
this.statsd_client = statsd_client;
this.jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
this.userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
this.jobCanceller = new JobCanceller(this.metadataBackend, this.userDatabaseMetadataService, this.jobBackend);
}
JobController.prototype.route = function (app) {
@@ -102,21 +103,17 @@ JobController.prototype.cancelJob = function (req, res) {
req.profiler.done('setDBAuth');
}
var jobCanceller = new JobCanceller(self.metadataBackend, self.userDatabaseMetadataService);
jobCanceller.cancel(job_id)
.on('cancelled', function (job) {
// job is cancelled, but jobRunner most likely has not dealt with it yet, so it's not saved
job.status = 'cancelled';
self.jobCanceller.cancel(job_id, function (err, job) {
if (err) {
return next(err);
}
next(null, {
job: job,
host: userDatabase.host
});
})
.on('error', function (err) {
next(err);
next(null, {
job: job,
host: userDatabase.host
});
});
},
function handleResponse(err, result) {
if ( err ) {
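
With JobCanceller built once in the constructor and exposing a callback API, the cancel step above reduces to forwarding either the error or the { job, host } result to the next step. A hedged, self-contained sketch of that flow, with a stub standing in for the injected canceller and handleResponse playing the role of the final step shown above:

var stubCanceller = {
    cancel: function (job_id, callback) {
        callback(null, { job_id: job_id, status: 'cancelled' });
    }
};

function handleResponse(err, result) {
    if (err) {
        return console.error(err.message);
    }
    console.log('job %s is %s on %s', result.job.job_id, result.job.status, result.host);
}

stubCanceller.cancel('some-job', function (err, job) {
    if (err) {
        return handleResponse(err);
    }
    handleResponse(null, { job: job, host: 'localhost' });
});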


@@ -27,7 +27,7 @@ Batch.prototype.start = function () {
// do forever, it does not cause a stack overflow
forever(function (next) {
self._consume(host, queue, next);
self._consumeJobs(host, queue, next);
}, function (err) {
self.jobQueuePool.remove(host);
@@ -44,7 +44,7 @@ Batch.prototype.stop = function () {
this.jobSubscriber.unsubscribe();
};
Batch.prototype._consume = function consume(host, queue, callback) {
Batch.prototype._consumeJobs = function (host, queue, callback) {
var self = this;
queue.dequeue(host, function (err, job_id) {
@@ -58,22 +58,16 @@ Batch.prototype._consume = function consume(host, queue, callback) {
return callback(emptyQueueError);
}
self.jobRunner.run(job_id)
.on('done', function (job) {
console.log('Job %s done in %s', job_id, host);
self.emit('job:done', job.job_id);
callback();
})
.on('failed', function (job) {
console.log('Job %s failed in %s', job_id, host);
self.emit('job:failed', job.job_id);
callback();
})
.on('error', function (err) {
console.error('Error in job %s due to:', job_id, err.message || err);
self.emit('job:failed', job_id);
callback();
});
self.jobRunner.run(job_id, function (err, job) {
if (err) {
return callback(err);
}
console.log('Job %s %s in %s', job_id, job.status, host);
self.emit('job:' + job.status, job_id);
callback();
});
});
};
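
Batch still emits per-job events for its own consumers, but the event name is now derived from the status reported by the callback ('job:done', 'job:failed', or 'job:cancelled'). A small self-contained sketch of what a listener sees, using a bare EventEmitter as a stand-in for a Batch instance:

var EventEmitter = require('events').EventEmitter;

var batch = new EventEmitter(); // stand-in; the real Batch is an EventEmitter too

batch.on('job:done', function (job_id) {
    console.log('job %s finished', job_id);
});

batch.on('job:failed', function (job_id) {
    console.log('job %s failed', job_id);
});

// what _consumeJobs does once jobRunner.run reports back
var job = { job_id: 'some-job', status: 'done' };
batch.emit('job:' + job.status, job.job_id);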


@@ -7,25 +7,19 @@ var UserDatabaseMetadataService = require('./user_database_metadata_service');
var JobPublisher = require('./job_publisher');
var JobQueue = require('./job_queue');
var UserIndexer = require('./user_indexer');
var JobBackend = require('./job_backend');
var Batch = require('./batch');
module.exports = function batchFactory (metadataBackend) {
var jobSubscriber = new JobSubscriber();
var jobQueuePool = new JobQueuePool(metadataBackend);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobPublisher = new JobPublisher();
var jobQueue = new JobQueue(metadataBackend);
var userIndexer = new UserIndexer(metadataBackend);
var jobRunner = new JobRunner(
metadataBackend,
userDatabaseMetadataService,
jobPublisher,
jobQueue,
userIndexer
);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobRunner = new JobRunner(jobBackend, userDatabaseMetadataService);
return new Batch(jobSubscriber, jobQueuePool, jobRunner);
};
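
The factory now builds a single JobBackend and injects it into JobRunner, and the controller does the same for JobCanceller (first file above). A hedged sketch of the full wiring, assuming a metadataBackend instance is in scope; the require paths for JobRunner and JobCanceller are assumptions following the flat layout of the other modules:

var JobQueue = require('./job_queue');
var JobPublisher = require('./job_publisher');
var UserIndexer = require('./user_indexer');
var JobBackend = require('./job_backend');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var JobRunner = require('./job_runner');       // assumed path
var JobCanceller = require('./job_canceller'); // assumed path

var jobQueue = new JobQueue(metadataBackend);
var jobPublisher = new JobPublisher();
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);

// both consumers share the one backend instead of building their own per call
var jobRunner = new JobRunner(jobBackend, userDatabaseMetadataService);
var jobCanceller = new JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend);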


@@ -1,21 +1,17 @@
'use strict';
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var uuid = require('node-uuid');
var queue = require('queue-async');
var JOBS_TTL_AFTER_RESOLUTION = 48 * 3600;
var JOBS_TTL_IN_SECONDS = 48 * 3600;
function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) {
EventEmitter.call(this);
this.db = 5;
this.redisPrefix = 'batch:jobs:';
this.metadataBackend = metadataBackend;
this.jobQueueProducer = jobQueueProducer;
this.jobPublisher = jobPublisher;
this.userIndexer = userIndexer;
this.db = 5;
this.redisPrefix = 'batch:jobs:';
}
util.inherits(JobBackend, EventEmitter);
JobBackend.prototype.create = function (username, sql, host, callback) {
var self = this;
@@ -188,92 +184,107 @@ JobBackend.prototype.get = function (job_id, callback) {
});
};
JobBackend.prototype.setRunning = function (job) {
var self = this;
JobBackend.prototype.setRunning = function (job, callback) {
var now = new Date().toISOString();
var redisParams = [
this.redisPrefix + job.job_id,
'status', 'running',
'updated_at', new Date().toISOString()
'updated_at', now
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.emit('running', job);
job.status = 'running';
job.updated_at = now;
callback(null, job);
});
};
JobBackend.prototype.setDone = function (job) {
JobBackend.prototype.setDone = function (job, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', 'done',
'updated_at', new Date().toISOString()
'updated_at', now
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.emit('done', job);
job.status = 'done';
job.updated_at = now;
callback(null, job);
});
});
};
JobBackend.prototype.setFailed = function (job, err) {
JobBackend.prototype.setFailed = function (job, err, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', 'failed',
'failed_reason', err.message,
'updated_at', new Date().toISOString()
'updated_at', now
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.emit('failed', job);
job.status = 'failed';
job.updated_at = now;
callback(null, job);
});
});
};
JobBackend.prototype.setCancelled = function (job) {
JobBackend.prototype.setCancelled = function (job, callback) {
var self = this;
var now = new Date().toISOString();
var redisKey = this.redisPrefix + job.job_id;
var redisParams = [
redisKey,
'status', 'cancelled',
'updated_at', new Date().toISOString()
'updated_at', now
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) {
if (err) {
return self.emit('error', err);
return callback(err);
}
self.emit('cancelled', job);
job.status = 'cancelled';
job.updated_at = now;
callback(null, job);
});
});
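
The four setters now share one callback shape: HMSET the new status and updated_at, EXPIRE the key for terminal states (setRunning skips that step), then hand the mutated job back to the caller. A hypothetical helper, not part of the commit, that captures the shared pattern:

function setStatus(metadataBackend, db, redisKey, job, status, ttlInSeconds, callback) {
    var now = new Date().toISOString();
    var redisParams = [redisKey, 'status', status, 'updated_at', now];

    metadataBackend.redisCmd(db, 'HMSET', redisParams, function (err) {
        if (err) {
            return callback(err);
        }
        metadataBackend.redisCmd(db, 'EXPIRE', [redisKey, ttlInSeconds], function (err) {
            if (err) {
                return callback(err);
            }
            // mirror the persisted fields on the in-memory job for the caller
            job.status = status;
            job.updated_at = now;
            callback(null, job);
        });
    });
}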


@@ -1,77 +1,75 @@
'use strict';
var JobBackend = require('./job_backend');
var PSQL = require('cartodb-psql');
var JobPublisher = require('./job_publisher');
var JobQueue = require('./job_queue');
var UserIndexer = require('./user_indexer');
function JobCanceller(metadataBackend, userDatabaseMetadataService) {
function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) {
this.metadataBackend = metadataBackend;
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.jobBackend = jobBackend;
}
JobCanceller.prototype.cancel = function (job_id) {
JobCanceller.prototype.cancel = function (job_id, callback) {
var self = this;
var jobQueue = new JobQueue(this.metadataBackend);
var jobPublisher = new JobPublisher();
var userIndexer = new UserIndexer(this.metadataBackend);
var jobBackend = new JobBackend(this.metadataBackend, jobQueue, jobPublisher, userIndexer);
jobBackend.get(job_id, function (err, job) {
self.jobBackend.get(job_id, function (err, job) {
if (err) {
return jobBackend.emit('error', err);
return callback(err);
}
if (job.status === 'pending') {
return jobBackend.setCancelled(job);
return self.jobBackend.setCancelled(job, callback);
}
if (job.status !== 'running') {
return jobBackend.emit('error', new Error('Job is ' + job.status + ' nothing to do'));
return callback(new Error('Job is ' + job.status + ' nothing to do'));
}
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
if (err) {
return jobBackend.emit('error', err);
return callback(err);
}
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
var getPIDQuery = 'SELECT pid FROM pg_stat_activity WHERE query = \'' +
job.query +
' /* ' + job.job_id + ' */\'';
pg.query(getPIDQuery, function(err, result) {
if(err) {
return jobBackend.emit('error', err);
}
if (!result.rows[0] || !result.rows[0].pid) {
return jobBackend.emit('error', new Error('Query not running currently'));
}
var pid = result.rows[0].pid;
var cancelQuery = 'SELECT pg_cancel_backend(' + pid +')';
pg.query(cancelQuery, function (err, result) {
if (err) {
return jobBackend.emit('error', err);
}
var isCancelled = result.rows[0].pg_cancel_backend;
if (!isCancelled) {
return jobBackend.emit('error', new Error('Query has not been cancelled'));
}
jobBackend.emit('cancelled', job);
});
});
self._query(job, userDatabaseMetadata, callback);
});
});
};
return jobBackend;
JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) {
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
var getPIDQuery = 'SELECT pid FROM pg_stat_activity WHERE query = \'' + job.query +
' /* ' + job.job_id + ' */\'';
pg.query(getPIDQuery, function(err, result) {
if(err) {
return callback(err);
}
if (!result.rows[0] || !result.rows[0].pid) {
return callback(new Error('Query not running currently'));
}
var pid = result.rows[0].pid;
var cancelQuery = 'SELECT pg_cancel_backend(' + pid +')';
pg.query(cancelQuery, function (err, result) {
if (err) {
return callback(err);
}
var isCancelled = result.rows[0].pg_cancel_backend;
if (!isCancelled) {
return callback(new Error('Query has not been cancelled'));
}
// JobRunner handles job status through the PG's client error handler (see JobRunner.run:48)
// Because the user needs immediate feedback, set the final status and updated_at on the job here as well
job.updated_at = new Date().toISOString();
job.status = 'cancelled';
callback(null, job);
});
});
};
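
Cancellation still depends on the '/* job_id */' tag that JobRunner appends to every job query (next file): it is what makes the running statement findable in pg_stat_activity. A self-contained sketch of the two SQL statements involved, with illustrative values:

var job = {
    job_id: 'example-job-id',              // illustrative value
    query: 'SELECT count(*) FROM mytable'  // illustrative value
};

// the query as JobRunner actually runs it, tagged with the job id
var taggedQuery = job.query + ' /* ' + job.job_id + ' */';

// first statement: find the backend pid running the tagged query
var getPIDQuery = 'SELECT pid FROM pg_stat_activity WHERE query = \'' + taggedQuery + '\'';

// second statement: ask PostgreSQL to cancel that backend
var pid = 12345; // in the real code this comes from the first query's result
var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')';

console.log(getPIDQuery);
console.log(cancelQuery);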


@@ -1,74 +1,74 @@
'use strict';
var JobBackend = require('./job_backend');
var PSQL = require('cartodb-psql');
var QUERY_CANCELED = '57014';
function JobRunner(metadataBackend, userDatabaseMetadataService, jobPublisher, jobQueue, userIndexer) {
this.metadataBackend = metadataBackend;
function JobRunner(jobBackend, userDatabaseMetadataService) {
this.jobBackend = jobBackend;
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.jobPublisher = jobPublisher;
this.jobQueue = jobQueue;
this.userIndexer = userIndexer;
}
JobRunner.prototype.run = function (job_id) {
JobRunner.prototype.run = function (job_id, callback) {
var self = this;
var jobBackend = new JobBackend(this.metadataBackend, this.jobQueue, this.jobPublisher, this.userIndexer);
jobBackend.get(job_id, function (err, job) {
self.jobBackend.get(job_id, function (err, job) {
if (err) {
return jobBackend.emit('error', err);
return callback(err);
}
if (job.status !== 'pending') {
return jobBackend.emit('error',
new Error('Cannot run job ' + job.job_id + ' due to its status is ' + job.status));
return callback(new Error('Cannot run job ' + job.job_id + ' due to its status is ' + job.status));
}
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
if (err) {
return jobBackend.emit('error', err);
return callback(err);
}
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
jobBackend.setRunning(job);
pg.query('SET statement_timeout=0', function(err) {
if(err) {
return jobBackend.setFailed(job, err);
self.jobBackend.setRunning(job, function (err, job) {
if (err) {
return callback(err);
}
// mark the query so that users can cancel it if they request to
var sql = job.query + ' /* ' + job.job_id + ' */';
pg.eventedQuery(sql, function (err, query /* , queryCanceller */) {
if (err) {
return jobBackend.setFailed(job, err);
}
query.on('error', function (err) {
if (err.code === QUERY_CANCELED) {
return jobBackend.setCancelled(job);
}
jobBackend.setFailed(job, err);
});
query.on('end', function (result) {
if (result) {
jobBackend.setDone(job);
}
});
});
self._query(job, userDatabaseMetadata, callback);
});
});
});
return jobBackend;
};
JobRunner.prototype._query = function (job, userDatabaseMetadata, callback) {
var self = this;
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
pg.query('SET statement_timeout=0', function (err) {
if(err) {
return self.jobBackend.setFailed(job, err, callback);
}
// mark the query so that users can cancel it if they request to
var sql = job.query + ' /* ' + job.job_id + ' */';
pg.eventedQuery(sql, function (err, query) {
if (err) {
return self.jobBackend.setFailed(job, err, callback);
}
query.on('error', function (err) {
if (err.code === QUERY_CANCELED) {
return self.jobBackend.setCancelled(job, callback);
}
self.jobBackend.setFailed(job, err, callback);
});
query.on('end', function (result) {
if (result) {
self.jobBackend.setDone(job, callback);
}
});
});
});
};
module.exports = JobRunner;
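
Seen end to end, run and _query drive a small status machine: pending to running, then done on a successful 'end', cancelled when the query errors with PostgreSQL error code 57014 (what pg_cancel_backend produces), and failed for any other error. A hypothetical helper capturing the error branch used above:

var QUERY_CANCELED = '57014';

// maps an error from the evented query to the final job status
function finalStatusForError(err) {
    return (err && err.code === QUERY_CANCELED) ? 'cancelled' : 'failed';
}

console.log(finalStatusForError({ code: '57014' }));          // 'cancelled'
console.log(finalStatusForError(new Error('syntax error')));  // 'failed'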


@@ -21,6 +21,7 @@ describe('batch module', function() {
var jobPublisher = new JobPublisher();
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer);
var batch = new Batch(metadataBackend);
before(function () {