Implementing batch service

Daniel García Aubert 2015-12-09 00:02:08 +01:00
parent 6f741827cd
commit 00721bcd02
13 changed files with 392 additions and 7 deletions


@@ -6,11 +6,13 @@ var assert = require('assert');
 var PSQL = require('cartodb-psql');
 var UserDatabaseService = require('../services/user_database_service');
+var UserDatabaseQueue = require('../../batch/user_database_queue');
 var CdbRequest = require('../models/cartodb_request');
 var handleException = require('../utils/error_handler');
 var cdbReq = new CdbRequest();
 var userDatabaseService = new UserDatabaseService();
+var userDatabaseQueue = new UserDatabaseQueue();
 function JobController(metadataBackend, tableCache, statsd_client) {
     this.metadataBackend = metadataBackend;
@@ -73,7 +75,7 @@ JobController.prototype.handleJob = function (req, res) {
             };
             userDatabaseService.getUserDatabase(options, this);
         },
-        function enqueueJob(err, userDatabase) {
+        function persistJob(err, userDatabase) {
             assert.ifError(err);
             var next = this;
@@ -86,7 +88,7 @@ JobController.prototype.handleJob = function (req, res) {
             pg = new PSQL(userDatabase, {}, { destroyOnError: true });
-            var enqueueJobQuery = [
+            var persistJobQuery = [
                 'INSERT INTO cdb_jobs (',
                 'user_id, query',
                 ') VALUES (',
@@ -95,13 +97,30 @@ JobController.prototype.handleJob = function (req, res) {
                 ') RETURNING job_id;'
             ].join('\n');
-            pg.query(enqueueJobQuery, function (err, result) {
+            pg.query(persistJobQuery, function (err, result) {
                 if (err) {
                     return next(err);
                 }
                 next(null, {
                     job: result,
-                    host: userDatabase.host
+                    userDatabase: userDatabase
+                });
+            });
+        },
+        function enqueueUserDatabase(err, result) {
+            assert.ifError(err);
+            var next = this;
+            userDatabaseQueue.enqueue(cdbUsername, function (err) {
+                if (err) {
+                    return next(err);
+                }
+                next(null, {
+                    job: result.job,
+                    host: result.userDatabase.host
                 });
             });
         },

batch/batch_launcher.js (new file, 21 lines)

@@ -0,0 +1,21 @@
'use strict';

function BatchLauncher(batchManager) {
    this.batchManager = batchManager;
    this.batchInterval = global.settings.batch_interval;
}

BatchLauncher.prototype.start = function (interval) {
    var self = this;
    interval = this.batchInterval || interval || 5000;

    this.intervalCallback = setInterval(function () {
        self.batchManager.run();
    }, interval);
};

BatchLauncher.prototype.stop = function () {
    clearInterval(this.intervalCallback);
};

module.exports = BatchLauncher;

batch/batch_manager.js (new file, 35 lines)

@@ -0,0 +1,35 @@
'use strict';

function BatchManager(jobDequeuer, queryRunner, jobCounter) {
    this.jobDequeuer = jobDequeuer;
    this.queryRunner = queryRunner;
    this.jobCounter = jobCounter;
}

BatchManager.prototype.run = function () {
    var self = this;

    this.jobDequeuer.dequeue(function (err, pg, job, host) {
        if (err) {
            return console.error(err);
        }

        if (!pg || !job || !host) {
            return console.info('No job launched');
        }

        self.queryRunner.run(pg, job, host, function (err) {
            if (err) {
                return console.error(err);
            }

            // use `self`: `this` does not point to the BatchManager inside this callback
            if (!self.jobCounter.decrement(host)) {
                return console.warn('Job counter for instance %s is out of range', host);
            }

            console.info('Job %s done successfully', job.job_id);
        });
    });
};

module.exports = BatchManager;

batch/database_dequeuer.js (new file, 43 lines)

@@ -0,0 +1,43 @@
'use strict';

function DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter) {
    this.userDatabaseQueue = userDatabaseQueue;
    this.metadataBackend = metadataBackend;
    this.jobCounter = jobCounter;
}

DatabaseDequeuer.prototype.dequeue = function (callback) {
    var self = this;

    this.userDatabaseQueue.dequeue(function (err, userDatabaseName) {
        if (err) {
            return callback(err);
        }

        if (!userDatabaseName) {
            return callback();
        }

        self.metadataBackend.getAllUserDBParams(userDatabaseName, function (err, userDatabase) {
            if (err) {
                return callback(err);
            }

            // use `self`: `this` is not the DatabaseDequeuer inside this callback
            if (self.jobCounter.increment(userDatabase.dbHost)) {
                return callback(null, userDatabase);
            }

            // host is busy, enqueue job again!
            self.userDatabaseQueue.enqueue(userDatabaseName, function (err) {
                if (err) {
                    return callback(err);
                }

                callback();
            });
        });
    });
};

module.exports = DatabaseDequeuer;

batch/index.js (new file, 31 lines)

@@ -0,0 +1,31 @@
'use strict';

var BatchLauncher = require('./batch_launcher');
var BatchManager = require('./batch_manager');
var JobDequeuer = require('./job_dequeuer');
var QueryRunner = require('./query_runner');
var DatabaseDequeuer = require('./database_dequeuer');
var UserDatabaseQueue = require('./user_database_queue');
var cartoDBRedis = require('cartodb-redis');
var JobCounter = require('./job_counter');

module.exports = function (interval, maxJobsPerHost) {
    var jobCounter = new JobCounter(maxJobsPerHost);

    var metadataBackend = cartoDBRedis({
        host: global.settings.redis_host,
        port: global.settings.redis_port,
        max: global.settings.redisPool,
        idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
        reapIntervalMillis: global.settings.redisReapIntervalMillis
    });

    var userDatabaseQueue = new UserDatabaseQueue();
    var databaseDequeuer = new DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter);
    var queryRunner = new QueryRunner();
    var jobDequeuer = new JobDequeuer(databaseDequeuer);
    // BatchManager needs the job counter to release the host slot once a job finishes
    var batchManager = new BatchManager(jobDequeuer, queryRunner, jobCounter);
    var batchLauncher = new BatchLauncher(batchManager);

    // here we go!
    batchLauncher.start(interval);
};
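A minimal usage sketch (not part of this commit) of how this exported factory could be wired from the application entry point; the require path and argument values are illustrative assumptions, in line with the skipped test further down that simply calls batch(1, 1):

// hypothetical wiring, e.g. from the app startup script
var batch = require('./batch');

// poll for queued user databases every 5 seconds,
// allowing at most 1 concurrent job per database host
batch(5000, 1);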

batch/job_counter.js (new file, 24 lines)

@@ -0,0 +1,24 @@
'use strict';

function JobsCounter(maxJobsPerInstance) {
    this.maxJobsPerInstance = maxJobsPerInstance || global.settings.max_jobs_per_instance;
    this.hosts = {};
}

JobsCounter.prototype.increment = function (host) {
    // track running jobs per database host, starting at zero for unseen hosts
    var runningJobs = this.hosts[host] || 0;

    if (runningJobs < this.maxJobsPerInstance) {
        this.hosts[host] = runningJobs + 1;
        return true;
    }

    return false;
};

JobsCounter.prototype.decrement = function (host) {
    if (this.hosts[host] > 0) {
        this.hosts[host] -= 1;
        return true;
    }

    return false;
};

module.exports = JobsCounter;

batch/job_dequeuer.js (new file, 36 lines)

@@ -0,0 +1,36 @@
'use strict';

var PSQL = require('cartodb-psql');

function JobDequeuer(databaseDequeuer) {
    this.databaseDequeuer = databaseDequeuer;
}

JobDequeuer.prototype.dequeue = function (callback) {
    this.databaseDequeuer.dequeue(function (err, userDatabase) {
        if (err) {
            return callback(err);
        }

        if (!userDatabase) {
            return callback();
        }

        var pg = new PSQL(userDatabase, {}, { destroyOnError: true });
        var nextQuery = "select * from cdb_jobs where status='pending' order by updated_at asc limit 1";

        pg.query(nextQuery, function (err, job) {
            if (err) {
                return callback(err);
            }

            callback(null, pg, job, userDatabase.host);
        });
    });
};

module.exports = JobDequeuer;

batch/query_runner.js (new file, 100 lines)

@@ -0,0 +1,100 @@
'use strict';

function QueryRunner() {
}

// host is supplied by BatchManager along with pg and job, but is not used here yet
QueryRunner.prototype.run = function (pg, job, host, callback) {
    var self = this;

    this.setJobRunning(pg, job, function (err) {
        if (err) {
            return callback(err);
        }

        self.job(pg, job.query, function (err, jobResult) {
            if (err) {
                self.setJobFailed(err, pg, job, function (err) {
                    if (err) {
                        return callback(err);
                    }

                    callback(null, jobResult);
                });
            } else {
                self.setJobDone(pg, job, function (err) {
                    if (err) {
                        return callback(err);
                    }

                    callback(null, jobResult);
                });
            }
        });
    });
};

QueryRunner.prototype.job = function (pg, jobQuery, callback) {
    // TODO: wrap select query with select into
    pg.query(jobQuery, function (err, jobResult) {
        if (err) {
            return callback(err);
        }

        callback(null, jobResult);
    });
};

QueryRunner.prototype.setJobRunning = function (pg, job, callback) {
    var runningJobQuery = [
        'UPDATE cdb_jobs SET',
        'status = \'running\',',
        'updated_at = now()',
        'WHERE job_id = \'' + job.job_id + '\'',
        'RETURNING job_id;'
    ].join('\n');

    pg.query(runningJobQuery, function (err, result) {
        if (err) {
            return callback(err);
        }

        callback(null, result);
    });
};

QueryRunner.prototype.setJobDone = function (pg, job, callback) {
    var doneJobQuery = [
        'UPDATE cdb_jobs SET',
        'status = \'done\',',
        'updated_at = now()',
        'WHERE job_id = \'' + job.job_id + '\'',
        'RETURNING job_id;'
    ].join('\n');

    pg.query(doneJobQuery, function (err, result) {
        if (err) {
            return callback(err);
        }

        callback(null, result);
    });
};

QueryRunner.prototype.setJobFailed = function (err, pg, job, callback) {
    // NOTE: err.message should be escaped or parameterized before being interpolated into SQL
    var failedJobQuery = [
        'UPDATE cdb_jobs SET',
        'status = \'failed\',',
        'failed_reason = \'' + err.message + '\',',
        'updated_at = now()',
        'WHERE job_id = \'' + job.job_id + '\'',
        'RETURNING job_id;'
    ].join('\n');

    pg.query(failedJobQuery, function (err, result) {
        if (err) {
            return callback(err);
        }

        callback(null, result);
    });
};

module.exports = QueryRunner;

batch/user_database_queue.js (new file, 66 lines)

@@ -0,0 +1,66 @@
'use strict';

var RedisPool = require("redis-mpool");
var _ = require('underscore');

function UserDatabaseQueue(poolOptions) {
    poolOptions = poolOptions || {};

    var defaults = {
        slowQueries: {
            log: false,
            elapsedThreshold: 25
        }
    };

    var options = _.defaults(poolOptions, defaults);

    this.redisPool = (options.pool) ?
        poolOptions.pool :
        new RedisPool(_.extend(poolOptions, {
            name: 'userDatabaseQueue',
            db: 12
        }));

    this.poolOptions = poolOptions;
}

UserDatabaseQueue.prototype.enqueue = function (userDatabaseName, callback) {
    var self = this;
    var db = this.poolOptions.db;
    var queue = this.poolOptions.name;

    this.redisPool.acquire(db, function (err, client) {
        if (err) {
            return callback(err);
        }

        client.lpush(queue, [ userDatabaseName ], function (err, userDataName) {
            if (err) {
                return callback(err);
            }

            self.redisPool.release(db, client);
            callback(null, userDataName);
        });
    });
};

UserDatabaseQueue.prototype.dequeue = function (callback) {
    var self = this;
    var db = this.poolOptions.db;
    var queue = this.poolOptions.name;

    this.redisPool.acquire(db, function (err, client) {
        if (err) {
            return callback(err);
        }

        client.rpop(queue, function (err, userDatabaseName) {
            if (err) {
                return callback(err);
            }

            self.redisPool.release(db, client);
            callback(null, userDatabaseName);
        });
    });
};

module.exports = UserDatabaseQueue;
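A minimal round-trip sketch (not part of this commit) of how the producer (JobController) and the consumer (DatabaseDequeuer) use this queue, assuming the default pool options and a reachable Redis; the username value is illustrative:

var UserDatabaseQueue = require('./user_database_queue');
var queue = new UserDatabaseQueue();

// producer side: remember that this user's database has a pending job
queue.enqueue('vizzuality', function (err) {
    if (err) { throw err; }

    // consumer side: pick the next user database to look for pending jobs in
    queue.dequeue(function (err, userDatabaseName) {
        if (err) { throw err; }
        console.log(userDatabaseName); // 'vizzuality', or null when the queue is empty
    });
});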

package.json

@@ -27,7 +27,8 @@
         "lru-cache": "~2.5.0",
         "log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb",
         "rollbar": "~0.3.2",
-        "node-statsd": "~0.0.7"
+        "node-statsd": "~0.0.7",
+        "redis-mpool": "git://github.com/CartoDB/node-redis-mpool.git#0.1.0"
     },
     "devDependencies": {
         "redis": "0.7.1",


@@ -0,0 +1,8 @@
var batch = require('../../batch');

describe('batch service', function() {

    it.skip('run', function() {
        batch(1, 1);
    });
});


@@ -37,7 +37,7 @@ describe('job.test', function() {
     it('GET /api/v2/job with SQL parameter on SELECT no database param,just id using headers', function(done){
         assert.response(app, {
-            url: '/api/v2/job?q=' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
+            url: '/api/v2/job?' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
             headers: { host: 'vizzuality.cartodb.com' },
             method: 'GET'
         }, {


@@ -28,7 +28,8 @@ CREATE TABLE cdb_jobs (
     status character varying DEFAULT 'pending',
     query character varying,
     updated_at timestamp without time zone DEFAULT now(),
-    created_at timestamp without time zone DEFAULT now()
+    created_at timestamp without time zone DEFAULT now(),
+    failed_reason character varying
 );

 ALTER TABLE ONLY cdb_jobs ADD CONSTRAINT cdb_jobs_pkey PRIMARY KEY (job_id);