Refactored Batch API using streams instead a interval to consume the job queue. Limited one job running at the same time per queue instead of using a job's counter to limit it.

This commit is contained in:
Daniel García Aubert 2015-12-16 15:57:58 +01:00
parent d0787d03f7
commit 43f759e96a
18 changed files with 346 additions and 363 deletions

View File

@ -28,7 +28,7 @@ var CacheStatusController = require('./controllers/cache_status_controller');
var HealthCheckController = require('./controllers/health_check_controller');
var VersionController = require('./controllers/version_controller');
var batchService = require('../batch');
var batch = require('../batch');
process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi';
@ -181,7 +181,7 @@ function App() {
versionController.route(app);
if (global.settings.environment !== 'test') {
batchService(metadataBackend, 5000, 100);
batch(metadataBackend);
}
return app;

View File

@ -6,18 +6,20 @@ var assert = require('assert');
var PSQL = require('cartodb-psql');
var UserDatabaseService = require('../services/user_database_service');
var UsernameQueue = require('../../batch/username_queue');
var JobPublisher = require('../../batch/job_publisher');
var JobQueueProducer = require('../../batch/job_queue_producer');
var CdbRequest = require('../models/cartodb_request');
var handleException = require('../utils/error_handler');
var cdbReq = new CdbRequest();
var userDatabaseService = new UserDatabaseService();
var jobPublisher = new JobPublisher();
function JobController(metadataBackend, tableCache, statsd_client) {
this.metadataBackend = metadataBackend;
this.tableCache = tableCache;
this.statsd_client = statsd_client;
this.userDatabaseQueue = new UsernameQueue(metadataBackend);
this.jobQueueProducer = new JobQueueProducer(metadataBackend);
}
JobController.prototype.route = function (app) {
@ -113,11 +115,13 @@ JobController.prototype.handleJob = function (req, res) {
var next = this;
self.userDatabaseQueue.enqueue(cdbUsername, function (err) {
self.jobQueueProducer.enqueue(cdbUsername, result.userDatabase.host, function (err) {
if (err) {
return next(err);
}
jobPublisher.publish(result.userDatabase.host);
next(null, {
job: result.job,
host: result.userDatabase.host

View File

@ -1,25 +0,0 @@
'use strict';
function BatchLauncher(batchManager) {
this.batchManager = batchManager;
this.batchInterval = global.settings.batch_interval;
}
BatchLauncher.prototype.start = function (interval) {
var self = this;
interval = this.batchInterval || interval || 5000;
this.intervalCallback = setInterval(function () {
self.batchManager.run(function (err) {
if (err) {
console.log('Error in batch service: ', err);
}
});
}, interval);
};
BatchLauncher.prototype.stop = function () {
clearInterval(this.intervalCallback);
};
module.exports = BatchLauncher;

View File

@ -1,61 +0,0 @@
'use strict';
function BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService) {
this.usernameQueue = usernameQueue;
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.jobService = jobService;
this.jobCounterService = jobCounterService;
}
BatchManager.prototype.run = function (callback) {
var self = this;
this.usernameQueue.dequeue(function (err, username) {
if (err) {
return callback(err);
}
if (!username) {
return callback(); // no jobs scheduled
}
self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
self.jobCounterService.increment(userDatabaseMetadata.host, function (err) {
if (err && err.name === 'JobLimitReachedError') {
self.usernameQueue.enqueue(username, function (err) {
if (err) {
callback(err);
}
callback();
});
} else if (err) {
return callback(err);
}
self.jobService.run(userDatabaseMetadata, function (err) {
if (err) {
self.usernameQueue.enqueue(username, function (err) {
if (err) {
callback(err);
}
callback();
});
}
self.jobCounterService.decrement(userDatabaseMetadata.host, function (err) {
if (err) {
return callback(err);
}
callback();
});
});
});
});
});
};
module.exports = BatchManager;

View File

@ -1,17 +0,0 @@
'use strict';
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var UsernameQueue = require('./username_queue');
var JobService = require('./job_service');
var JobCounterService = require('./job_counter_service');
var BatchManager = require('./batch_manager');
module.exports = function (metadataBackend ,maxJobsPerHost) {
var usernameQueue = new UsernameQueue(metadataBackend);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobService = new JobService(metadataBackend);
var jobCounterService = new JobCounterService(maxJobsPerHost, metadataBackend);
var batchManager = new BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService);
return batchManager;
};

View File

@ -1,12 +1,49 @@
'use strict';
var BatchLauncher = require('./batch_launcher');
var batchManagerFactory = require('./batch_manager_factory');
var Job = require('./job');
var JobQueuePool = require('./job_queue_pool');
var JobQueueConsumer = require('./job_queue_consumer');
var JobSubscriber = require('./job_subscriber');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var JobService = require('./job_service');
module.exports = function (metadataBackend, interval, maxJobsPerHost) {
var batchManager = batchManagerFactory(metadataBackend, maxJobsPerHost);
var batchLauncher = new BatchLauncher(batchManager);
module.exports = function (metadataBackend) {
var jobQueuePool = new JobQueuePool();
var jobSubscriber = new JobSubscriber();
var job = new Job();
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobService = new JobService(userDatabaseMetadataService, job);
// here we go!
batchLauncher.start(interval);
jobSubscriber.subscribe(function onMessage(channel, host) {
var jobQueueConsumer = jobQueuePool.get(host);
// if queue consumer is not registered in batch service
if (!jobQueueConsumer) {
// creates new one
jobQueueConsumer = new JobQueueConsumer(metadataBackend, host);
// register it in batch service
jobQueuePool.add(host, jobQueueConsumer);
// while read from queue then perform job
jobQueueConsumer.on('data', function (username) {
// limit one job at the same time per queue (queue <1:1> db intance)
jobQueueConsumer.pause();
jobService.run(username, function (err) {
if (err) {
console.error(err.stack);
}
// next job
jobQueueConsumer.resume();
});
})
.on('error', function (err) {
console.error(err.stack || err);
});
}
});
};

114
batch/job.js Normal file
View File

@ -0,0 +1,114 @@
'use strict';
var PSQL = require('cartodb-psql');
function Job() {
}
Job.prototype.run = function (userDatabaseMetada, callback) {
var self = this;
var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true });
this.getJob(pg, function (err, job) {
if (err) {
return callback(err);
}
if (!job) {
return callback();
}
self.setJobRunning(pg, job, function (err) {
if (err) {
return callback(err);
}
self.runJob(pg, job, function (err, jobResult) {
if (err) {
return self.setJobFailed(pg, job, err.message, function () {
callback(err);
});
}
self.setJobDone(pg, job, function () {
callback(null, jobResult);
});
});
});
});
};
Job.prototype.getJob = function (pg, callback) {
var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1";
pg.query(getNextJob, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result.rows[0]);
});
};
Job.prototype.runJob = function (pg, job, callback) {
var query = job.query;
if (job.query.match(/SELECT\s.*FROM\s.*/i)) {
query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j';
}
pg.query(query, callback);
};
Job.prototype.setJobRunning = function (pg, job, callback) {
var runningJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'running\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(runningJobQuery, callback);
};
Job.prototype.setJobDone = function (pg, job, callback) {
var doneJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'done\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(doneJobQuery, function (err) {
if (err) {
console.error(err.stack);
}
callback();
});
};
Job.prototype.setJobFailed = function (pg, job, message, callback) {
var failedJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'failed\', ',
'failed_reason = \'' + message + '\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(failedJobQuery, function (err) {
if (err) {
console.error(err.stack);
}
callback();
});
};
module.exports = Job;

View File

@ -1,53 +0,0 @@
'use strict';
function JobCounterService(maxJobsPerHost, metadataBackend) {
this.metadataBackend = metadataBackend;
this.maxJobsPerHost = maxJobsPerHost || global.settings.max_jobs_per_instance;
this.db = 5;
}
JobCounterService.prototype.increment = function (host, callback) {
var self = this;
var db = this.db;
this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) {
if (err) {
return callback(err);
}
if (hostCounter >= self.maxJobsPerHost) {
return callback(new Error('Limit max job per host is reached: %s jobs', hostCounter));
}
self.metadataBackend.redisCmd(db, 'INCR', [host], function (err /*, hostCounter */) {
if (err) {
return callback(err);
}
callback();
});
});
};
JobCounterService.prototype.decrement = function (host, callback) {
var self = this;
var db = this.db;
this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) {
if (err) {
return callback(err);
}
if (hostCounter < 0) {
return callback(new Error('Limit max job per host is reached').name= 'JobLimitReachedError');
}
self.metadataBackend.redisCmd(db, 'DECR', [host], function (err /*, hostCounter */) {
if (err) {
return callback(err);
}
callback();
});
});
};
module.exports = JobCounterService;

14
batch/job_publisher.js Normal file
View File

@ -0,0 +1,14 @@
'use strict';
var redis = require('redis');
function JobPublisher() {
this.channel = 'host:job';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
}
JobPublisher.prototype.publish = function (host) {
this.client.publish(this.channel, host);
};
module.exports = JobPublisher;

View File

@ -0,0 +1,28 @@
'use strict';
var util = require('util');
var Readable = require('stream').Readable;
function JobQueueConsumer(metadataBackend, host) {
Readable.call(this, {
encoding: 'utf8',
objectMode: true
});
this.db = 5;
this.queueName = 'queue:' + host;
this.metadataBackend = metadataBackend;
}
util.inherits(JobQueueConsumer, Readable);
JobQueueConsumer.prototype._read = function () {
var self = this;
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.queueName ], function (err, username) {
if (err) {
return self.emit('error', err);
}
self.push(username);
});
};
module.exports = JobQueueConsumer;

23
batch/job_queue_pool.js Normal file
View File

@ -0,0 +1,23 @@
'use strict';
function JobQueuePool() {
this.queues = {};
}
JobQueuePool.prototype.get = function (host) {
return this.queues[host];
};
JobQueuePool.prototype.list = function () {
return Object.keys(this.queues);
};
JobQueuePool.prototype.add = function (host, queue) {
this.queues[host] = queue;
};
JobQueuePool.prototype.remove = function (host) {
delete this.queues[host];
};
module.exports = JobQueuePool;

View File

@ -0,0 +1,15 @@
'use strict';
function JobQueueProducer(metadataBackend) {
this.metadataBackend = metadataBackend;
this.db = 5;
}
JobQueueProducer.prototype.enqueue = function (cdbUsername, host, callback) {
var db = this.db;
var queue = 'queue:' + host;
this.metadataBackend.redisCmd(db, 'LPUSH', [queue, cdbUsername], callback);
};
module.exports = JobQueueProducer;

View File

@ -1,129 +1,19 @@
'use strict';
var PSQL = require('cartodb-psql');
function JobService() {
function JobService(userDatabaseMetadataService, job) {
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.job = job;
}
JobService.prototype.run = function (userDatabaseMetada, callback) {
JobService.prototype.run = function (username, callback) {
var self = this;
var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true });
this.getJob(pg, function (err, job) {
this.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
self.setJobRunning(pg, job, function (err) {
if (err) {
return callback(err);
}
self.runJob(pg, job, function (err, jobResult) {
if (err) {
self.setJobFailed(pg, job, err.message, function (err) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
} else {
self.setJobDone(pg, job, function (err) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
}
});
});
});
};
JobService.prototype.getJob = function (pg, callback) {
var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1";
pg.query(getNextJob, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result.rows[0]);
});
};
JobService.prototype.runJob = function (pg, job, callback) {
var query = job.query;
if (job.query.match(/SELECT\s.*FROM\s.*/i)) {
query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j';
}
pg.query(query, function (err, jobResult) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
};
JobService.prototype.setJobRunning = function (pg, job, callback) {
var runningJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'running\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(runningJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
JobService.prototype.setJobDone = function (pg, job, callback) {
var doneJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'done\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(doneJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
JobService.prototype.setJobFailed = function (pg, job, message, callback) {
var failedJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'failed\', ',
'failed_reason = \'' + message + '\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(failedJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
self.job.run(userDatabaseMetadata, callback);
});
};

15
batch/job_subscriber.js Normal file
View File

@ -0,0 +1,15 @@
'use strict';
var redis = require('redis');
function JobSubscriber() {
this.channel = 'host:job';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
}
JobSubscriber.prototype.subscribe = function (onMessage) {
this.client.subscribe(this.channel);
this.client.on('message', onMessage);
};
module.exports = JobSubscriber;

View File

@ -39,7 +39,6 @@ UserDatabaseMetadataService.prototype.parseMetadaToDatabase = function (userData
});
return dbopts;
};
module.exports = UserDatabaseMetadataService;

View File

@ -1,24 +0,0 @@
'use strict';
function UsernameQueue(metadataBackend) {
this.metadataBackend = metadataBackend;
this.db = 5;
this.queueName = 'usernameBatchQueue';
}
UsernameQueue.prototype.enqueue = function (cdbUsername, callback) {
var db = this.db;
var queue = this.queueName;
this.metadataBackend.redisCmd(db, 'LPUSH', [queue, cdbUsername], callback);
};
UsernameQueue.prototype.dequeue = function (callback) {
var db = this.db;
var queue = this.queueName;
this.metadataBackend.redisCmd(db, 'RPOP', [queue], callback);
};
module.exports = UsernameQueue;

View File

@ -16,18 +16,19 @@
"Sandro Santilli <strk@vizzuality.com>"
],
"dependencies": {
"cartodb-redis": "~0.11.0",
"cartodb-psql": "~0.6.0",
"step-profiler": "~0.1.0",
"underscore": "~1.6.0",
"cartodb-redis": "~0.11.0",
"express": "~2.5.11",
"step": "~0.0.5",
"topojson": "0.0.8",
"oauth-client": "0.3.0",
"lru-cache":"~2.5.0",
"log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb",
"lru-cache": "~2.5.0",
"node-statsd": "~0.0.7",
"oauth-client": "0.3.0",
"redis": "^2.4.2",
"rollbar": "~0.3.2",
"node-statsd": "~0.0.7"
"step": "~0.0.5",
"step-profiler": "~0.1.0",
"topojson": "0.0.8",
"underscore": "~1.6.0"
},
"devDependencies": {
"redis": "0.7.1",

View File

@ -1,8 +1,10 @@
var batchManagerFactory = require('../../batch/batch_manager_factory');
var batch = require('../../batch/');
var JobPublisher = require('../../batch/job_publisher');
var JobQueueProducer = require('../../batch/job_queue_producer');
describe('batch manager', function() {
it('run', function (done) {
describe('batch', function() {
it('should be initialized successfuly', function () {
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
@ -10,10 +12,31 @@ describe('batch manager', function() {
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
var maxJobsPerHost = 100;
batchManagerFactory(metadataBackend, maxJobsPerHost).run(function (err) {
done(err);
batch(metadataBackend);
});
it.skip('should perform one job', function (done) {
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
var jobQueueProducer = new JobQueueProducer(metadataBackend);
var jobPublisher = new JobPublisher();
batch(metadataBackend);
jobQueueProducer.enqueue('vizzuality', '127.0.0.1', function (err) {
if (err) {
return done(err);
}
jobPublisher.publish('127.0.0.1');
setTimeout(function () {
done();
}, 4000);
});
});
});