From 00721bcd02e54cad47b5e0a7d39e709617ceb215 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?=
Date: Wed, 9 Dec 2015 00:02:08 +0100
Subject: [PATCH] Implementing batch service

---
 app/controllers/job_controller.js |  27 ++++++--
 batch/batch_launcher.js           |  21 +++++++
 batch/batch_manager.js            |  39 +++++++++++
 batch/database_dequeuer.js        |  44 +++++++++++++
 batch/index.js                    |  32 +++++++++
 batch/job_counter.js              |  29 ++++++++
 batch/job_dequeuer.js             |  36 +++++++++++
 batch/query_runner.js             | 104 ++++++++++++++++++++++++++++++
 batch/user_database_queue.js      |  68 ++++++++++++++++++++
 package.json                      |   3 +-
 test/acceptance/batch.test.js     |   8 +++
 test/acceptance/job.test.js       |   2 +-
 test/test.sql                     |   3 +-
 13 files changed, 409 insertions(+), 7 deletions(-)
 create mode 100644 batch/batch_launcher.js
 create mode 100644 batch/batch_manager.js
 create mode 100644 batch/database_dequeuer.js
 create mode 100644 batch/index.js
 create mode 100644 batch/job_counter.js
 create mode 100644 batch/job_dequeuer.js
 create mode 100644 batch/query_runner.js
 create mode 100644 batch/user_database_queue.js
 create mode 100644 test/acceptance/batch.test.js

diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js
index 8e725d1c..9c879031 100644
--- a/app/controllers/job_controller.js
+++ b/app/controllers/job_controller.js
@@ -6,11 +6,13 @@ var assert = require('assert');
 var PSQL = require('cartodb-psql');
 
 var UserDatabaseService = require('../services/user_database_service');
+var UserDatabaseQueue = require('../../batch/user_database_queue');
 var CdbRequest = require('../models/cartodb_request');
 var handleException = require('../utils/error_handler');
 
 var cdbReq = new CdbRequest();
 var userDatabaseService = new UserDatabaseService();
+var userDatabaseQueue = new UserDatabaseQueue();
 
 function JobController(metadataBackend, tableCache, statsd_client) {
     this.metadataBackend = metadataBackend;
@@ -73,7 +75,7 @@ JobController.prototype.handleJob = function (req, res) {
             };
             userDatabaseService.getUserDatabase(options, this);
         },
-        function enqueueJob(err, userDatabase) {
+        function persistJob(err, userDatabase) {
             assert.ifError(err);
 
             var next = this;
@@ -86,7 +88,7 @@ JobController.prototype.handleJob = function (req, res) {
 
             pg = new PSQL(userDatabase, {}, { destroyOnError: true });
 
-            var enqueueJobQuery = [
+            var persistJobQuery = [
                 'INSERT INTO cdb_jobs (',
                     'user_id, query',
                 ') VALUES (',
@@ -95,13 +97,30 @@ JobController.prototype.handleJob = function (req, res) {
                 ') RETURNING job_id;'
             ].join('\n');
 
-            pg.query(enqueueJobQuery, function (err, result) {
+            pg.query(persistJobQuery, function (err, result) {
                 if (err) {
                     return next(err);
                 }
+
                 next(null, {
                     job: result,
-                    host: userDatabase.host
+                    userDatabase: userDatabase
+                });
+            });
+        },
+        function enqueueUserDatabase(err, result) {
+            assert.ifError(err);
+
+            var next = this;
+
+            userDatabaseQueue.enqueue(cdbUsername, function (err) {
+                if (err) {
+                    return next(err);
+                }
+
+                next(null, {
+                    job: result.job,
+                    host: result.userDatabase.host
                 });
             });
         },
diff --git a/batch/batch_launcher.js b/batch/batch_launcher.js
new file mode 100644
index 00000000..66686ad3
--- /dev/null
+++ b/batch/batch_launcher.js
@@ -0,0 +1,21 @@
+'use strict';
+
+function BatchLauncher(batchManager) {
+    this.batchManager = batchManager;
+    this.batchInterval = global.settings.batch_interval;
+}
+
+BatchLauncher.prototype.start = function (interval) {
+    var self = this;
+    interval = this.batchInterval || interval || 5000;
+
+    this.intervalCallback = setInterval(function () {
+        self.batchManager.run();
+    }, interval);
+};
+
+BatchLauncher.prototype.stop = function () {
+    clearInterval(this.intervalCallback);
+};
+
+module.exports = BatchLauncher;
diff --git a/batch/batch_manager.js b/batch/batch_manager.js
new file mode 100644
index 00000000..6a498118
--- /dev/null
+++ b/batch/batch_manager.js
@@ -0,0 +1,39 @@
+'use strict';
+
+function BatchManager(jobDequeuer, queryRunner, jobCounter) {
+    this.jobDequeuer = jobDequeuer;
+    this.queryRunner = queryRunner;
+    this.jobCounter = jobCounter;
+}
+
+// Dequeues at most one job and runs it; results are reported through the console only.
+BatchManager.prototype.run = function () {
+    var self = this;
+
+    this.jobDequeuer.dequeue(function (err, pg, job, host) {
+        if (err) {
+            return console.error(err);
+        }
+
+        if (!pg || !job || !host) {
+            return console.info('No job launched');
+        }
+
+        // QueryRunner.run takes (pg, job, callback): passing host would land in the callback slot
+        self.queryRunner.run(pg, job, function (err) {
+            // `this` is not the manager inside this callback, so go through `self`;
+            // release the host slot before the error check so failed runs free it too
+            if (!self.jobCounter.decrement(host)) {
+                console.warn('Job counter for instance %s is out of range', host);
+            }
+
+            if (err) {
+                return console.error(err);
+            }
+
+            console.info('Job %s done successfully', job.job_id);
+        });
+    });
+};
+
+module.exports = BatchManager;
diff --git a/batch/database_dequeuer.js b/batch/database_dequeuer.js
new file mode 100644
index 00000000..64f1651f
--- /dev/null
+++ b/batch/database_dequeuer.js
@@ -0,0 +1,44 @@
+'use strict';
+
+function DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter) {
+    this.userDatabaseQueue = userDatabaseQueue;
+    this.metadataBackend = metadataBackend;
+    this.jobCounter = jobCounter;
+}
+
+DatabaseDequeuer.prototype.dequeue = function (callback) {
+    var self = this;
+
+    this.userDatabaseQueue.dequeue(function (err, userDatabaseName) {
+        if (err) {
+            return callback(err);
+        }
+
+        if (!userDatabaseName) {
+            return callback();
+        }
+
+        self.metadataBackend.getAllUserDBParams(userDatabaseName, function (err, userDatabase) {
+            if (err) {
+                return callback(err);
+            }
+
+            // `this` is not the dequeuer inside this callback, so go through `self`
+            // NOTE(review): JobDequeuer reports userDatabase.host while this counts userDatabase.dbHost - confirm the key
+            if (self.jobCounter.increment(userDatabase.dbHost)) {
+                return callback(null, userDatabase);
+            }
+
+            // host is busy, enqueue job again!
+            self.userDatabaseQueue.enqueue(userDatabaseName, function (err) {
+                if (err) {
+                    return callback(err);
+                }
+                callback();
+            });
+
+        });
+    });
+};
+
+module.exports = DatabaseDequeuer;
diff --git a/batch/index.js b/batch/index.js
new file mode 100644
index 00000000..b0de8e20
--- /dev/null
+++ b/batch/index.js
@@ -0,0 +1,32 @@
+'use strict';
+
+var BatchLauncher = require('./batch_launcher');
+var BatchManager = require('./batch_manager');
+var JobDequeuer = require('./job_dequeuer');
+var QueryRunner = require('./query_runner');
+var DatabaseDequeuer = require('./database_dequeuer');
+var UserDatabaseQueue = require('./user_database_queue');
+var cartoDBRedis = require('cartodb-redis');
+var JobCounter = require('./job_counter');
+
+module.exports = function (interval, maxJobsPerHost) {
+    var jobCounter = new JobCounter(maxJobsPerHost);
+    var metadataBackend = cartoDBRedis({
+        host: global.settings.redis_host,
+        port: global.settings.redis_port,
+        max: global.settings.redisPool,
+        idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
+        reapIntervalMillis: global.settings.redisReapIntervalMillis
+    });
+
+    var userDatabaseQueue = new UserDatabaseQueue();
+    var databaseDequeuer = new DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter);
+    var queryRunner = new QueryRunner();
+    var jobDequeuer = new JobDequeuer(databaseDequeuer);
+    // the manager shares the counter so finished jobs free the slot the dequeuer reserved
+    var batchManager = new BatchManager(jobDequeuer, queryRunner, jobCounter);
+    var batchLauncher = new BatchLauncher(batchManager);
+
+    // here we go!
+    batchLauncher.start(interval);
+};
diff --git a/batch/job_counter.js b/batch/job_counter.js
new file mode 100644
index 00000000..5e252f1c
--- /dev/null
+++ b/batch/job_counter.js
@@ -0,0 +1,29 @@
+'use strict';
+
+// Tracks how many jobs are running on each host, capped at maxJobsPerHost.
+function JobsCounter(maxJobsPerHost) {
+    this.maxJobsPerHost = maxJobsPerHost || global.settings.max_jobs_per_instance;
+    this.hosts = {};
+}
+
+// Reserves a slot on host; returns false when the host is already at its limit.
+JobsCounter.prototype.increment = function (host) {
+    // hosts seen for the first time start at zero (undefined would never compare below the limit)
+    var running = this.hosts[host] || 0;
+    if (running < this.maxJobsPerHost) {
+        this.hosts[host] = running + 1;
+        return true;
+    }
+    return false;
+};
+
+// Frees a slot on host; returns false when the host has no running jobs.
+JobsCounter.prototype.decrement = function (host) {
+    if (this.hosts[host] > 0) {
+        this.hosts[host] -= 1;
+        return true;
+    }
+    return false;
+};
+
+module.exports = JobsCounter;
diff --git a/batch/job_dequeuer.js b/batch/job_dequeuer.js
new file mode 100644
index 00000000..23baeac7
--- /dev/null
+++ b/batch/job_dequeuer.js
@@ -0,0 +1,36 @@
+'use strict';
+
+var PSQL = require('cartodb-psql');
+
+function JobDequeuer(databaseDequeuer) {
+    this.databaseDequeuer = databaseDequeuer;
+}
+
+JobDequeuer.prototype.dequeue = function (callback) {
+
+    this.databaseDequeuer.dequeue(function (err, userDatabase) {
+        if (err) {
+            return callback(err);
+        }
+
+        if (!userDatabase) {
+            return callback();
+        }
+
+        var pg = new PSQL(userDatabase, {}, { destroyOnError: true });
+
+        var nextQuery = "select * from cdb_jobs where status='pending' order by updated_at asc limit 1";
+
+        pg.query(nextQuery, function (err, job) {
+            if (err) {
+                return callback(err);
+            }
+
+            callback(null, pg, job, userDatabase.host);
+        });
+
+    });
+
+};
+
+module.exports = JobDequeuer;
diff --git a/batch/query_runner.js b/batch/query_runner.js
new file mode 100644
index 00000000..c5f0c7d2
--- /dev/null
+++ b/batch/query_runner.js
@@ -0,0 +1,104 @@
+'use strict';
+
+function QueryRunner() {
+}
+
+// Marks the job as running, executes its query and records the outcome as done or failed.
+QueryRunner.prototype.run = function (pg, job, callback) {
+    var self = this;
+
+    console.log('QueryRunner.run');
+    this.setJobRunning(pg, job, function (err) {
+        if (err) {
+            return callback(err);
+        }
+
+        self.job(pg, job.query, function (err, jobResult) {
+            if (err) {
+                self.setJobFailed(err, pg, job, function (err) {
+                    if (err) {
+                        return callback(err);
+                    }
+                    callback(null, jobResult);
+                });
+            } else {
+                self.setJobDone(pg, job, function (err) {
+                    if (err) {
+                        return callback(err);
+                    }
+                    callback(null, jobResult);
+                });
+            }
+        });
+    });
+};
+
+QueryRunner.prototype.job = function (pg, jobQuery, callback) {
+    // TODO: wrap select query with select into
+    pg.query(jobQuery, function (err, jobResult) {
+        if (err) {
+            return callback(err);
+        }
+        callback(null, jobResult);
+    });
+};
+
+// updated_at is a timestamp column, so the status updates let the database stamp it with now()
+QueryRunner.prototype.setJobRunning = function (pg, job, callback) {
+    var runningJobQuery = [
+        'UPDATE cdb_jobs SET ',
+            'status = \'running\', ',
+            'updated_at = now()',
+        ' WHERE ',
+            'job_id = \'' + job.job_id + '\'',
+        ' RETURNING job_id;'
+    ].join('\n');
+
+    pg.query(runningJobQuery, function (err, result) {
+        if (err) {
+            return callback(err);
+        }
+        callback(null, result);
+    });
+};
+
+QueryRunner.prototype.setJobDone = function (pg, job, callback) {
+    var doneJobQuery = [
+        'UPDATE cdb_jobs SET ',
+            'status = \'done\', ',
+            'updated_at = now()',
+        ' WHERE ',
+            'job_id = \'' + job.job_id + '\'',
+        ' RETURNING job_id;'
+    ].join('\n');
+
+    pg.query(doneJobQuery, function (err, result) {
+        if (err) {
+            return callback(err);
+        }
+        callback(null, result);
+    });
+};
+
+QueryRunner.prototype.setJobFailed = function (err, pg, job, callback) {
+    // double the single quotes so the error text cannot terminate the SQL literal early
+    var failedReason = String(err.message).replace(/'/g, '\'\'');
+    var failedJobQuery = [
+        'UPDATE cdb_jobs SET ',
+            'status = \'failed\', ',
+            'failed_reason = \'' + failedReason + '\', ',
+            'updated_at = now()',
+        ' WHERE ',
+            'job_id = \'' + job.job_id + '\'',
+        ' RETURNING job_id;'
+    ].join('\n');
+
+    pg.query(failedJobQuery, function (err, result) {
+        if (err) {
+            return callback(err);
+        }
+        callback(null, result);
+    });
+};
+
+module.exports = QueryRunner;
diff --git a/batch/user_database_queue.js b/batch/user_database_queue.js
new file mode 100644
index 00000000..cabc5de7
--- /dev/null
+++ b/batch/user_database_queue.js
@@ -0,0 +1,68 @@
+'use strict';
+
+var RedisPool = require("redis-mpool");
+var _ = require('underscore');
+
+function UserDatabaseQueue(poolOptions) {
+    poolOptions = poolOptions || {};
+    var defaults = {
+        slowQueries: {
+            log: false,
+            elapsedThreshold: 25
+        }
+    };
+
+    var options = _.defaults(poolOptions, defaults);
+
+    this.redisPool = (options.pool) ?
+        poolOptions.pool :
+        new RedisPool(_.extend(poolOptions, {
+            name: 'userDatabaseQueue',
+            db: 12
+        }));
+    this.poolOptions = poolOptions;
+}
+
+UserDatabaseQueue.prototype.enqueue = function (userDatabaseName, callback) {
+    var self = this;
+    var db = this.poolOptions.db;
+    var queue = this.poolOptions.name;
+
+    this.redisPool.acquire(db, function (err, client) {
+        if (err) {
+            return callback(err);
+        }
+
+        client.lpush(queue, [ userDatabaseName ], function (err, userDataName) {
+            // hand the client back before the error check so a failed push does not leak it
+            self.redisPool.release(db, client);
+            if (err) {
+                return callback(err);
+            }
+            callback(null, userDataName);
+        });
+    });
+};
+
+UserDatabaseQueue.prototype.dequeue = function (callback) {
+    var self = this;
+    var db = this.poolOptions.db;
+    var queue = this.poolOptions.name;
+
+    this.redisPool.acquire(db, function (err, client) {
+        if (err) {
+            return callback(err);
+        }
+
+        client.rpop(queue, function (err, userDatabaseName) {
+            // hand the client back before the error check so a failed pop does not leak it
+            self.redisPool.release(db, client);
+            if (err) {
+                return callback(err);
+            }
+            callback(null, userDatabaseName);
+        });
+    });
+};
+
+module.exports = UserDatabaseQueue;
diff --git a/package.json b/package.json
index 8bb21efe..4211e683 100644
--- a/package.json
+++ b/package.json
@@ -27,7 +27,8 @@
     "lru-cache":"~2.5.0",
     "log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb",
     "rollbar": "~0.3.2",
-    "node-statsd": "~0.0.7"
+    "node-statsd": "~0.0.7",
+    "redis-mpool": "git://github.com/CartoDB/node-redis-mpool.git#0.1.0"
   },
   "devDependencies": {
     "redis": "0.7.1",
diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js
new file mode 100644
index 00000000..3bbb9178
--- /dev/null
+++ b/test/acceptance/batch.test.js
@@ -0,0 +1,8 @@
+
+var batch = require('../../batch');
+
+describe('batch service', function() {
+    it.skip('run', function() {
+        batch(1, 1);
+    });
+});
diff --git a/test/acceptance/job.test.js b/test/acceptance/job.test.js
index e5e03d20..fef203fd 100644
--- a/test/acceptance/job.test.js
+++ b/test/acceptance/job.test.js
@@ -37,7 +37,7 @@ describe('job.test', function() {
 
     it('GET /api/v2/job with SQL parameter on SELECT no database param,just id using headers', function(done){
         assert.response(app, {
-            url: '/api/v2/job?q=' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
+            url: '/api/v2/job?' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
             headers: { host: 'vizzuality.cartodb.com' },
             method: 'GET'
         }, {
diff --git a/test/test.sql b/test/test.sql
index 0adfbc13..cb1b3c46 100644
--- a/test/test.sql
+++ b/test/test.sql
@@ -28,7 +28,8 @@ CREATE TABLE cdb_jobs (
     status character varying DEFAULT 'pending',
     query character varying,
     updated_at timestamp without time zone DEFAULT now(),
-    created_at timestamp without time zone DEFAULT now()
+    created_at timestamp without time zone DEFAULT now(),
+    failed_reason character varying
 );
 
 ALTER TABLE ONLY cdb_jobs ADD CONSTRAINT cdb_jobs_pkey PRIMARY KEY (job_id);