From 6cc48bf9dd8f85013ff6e361fa9384ca63a071fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 9 Dec 2015 20:17:45 +0100 Subject: [PATCH] Implemented batch service --- app/controllers/job_controller.js | 4 +- batch/batch_manager.js | 49 ++++--- batch/batch_manager_factory.js | 26 ++++ batch/database_dequeuer.js | 45 ------- batch/index.js | 24 +--- batch/job_counter.js | 25 ---- batch/job_counter_service.js | 53 ++++++++ batch/job_dequeuer.js | 36 ----- batch/job_service.js | 124 ++++++++++++++++++ batch/query_runner.js | 100 -------------- batch/user_database_metadata_service.js | 17 +++ ...er_database_queue.js => username_queue.js} | 8 +- test/acceptance/batch.test.js | 10 +- test/prepare_db.sh | 7 +- test/test.sql | 2 + 15 files changed, 275 insertions(+), 255 deletions(-) create mode 100644 batch/batch_manager_factory.js delete mode 100644 batch/database_dequeuer.js delete mode 100644 batch/job_counter.js create mode 100644 batch/job_counter_service.js delete mode 100644 batch/job_dequeuer.js create mode 100644 batch/job_service.js delete mode 100644 batch/query_runner.js create mode 100644 batch/user_database_metadata_service.js rename batch/{user_database_queue.js => username_queue.js} (74%) diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index 3ab426ec..addf301c 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -6,7 +6,7 @@ var assert = require('assert'); var PSQL = require('cartodb-psql'); var UserDatabaseService = require('../services/user_database_service'); -var UserDatabaseQueue = require('../../batch/user_database_queue'); +var UsernameQueue = require('../../batch/username_queue'); var CdbRequest = require('../models/cartodb_request'); var handleException = require('../utils/error_handler'); @@ -17,7 +17,7 @@ function JobController(metadataBackend, tableCache, statsd_client) { this.metadataBackend = metadataBackend; this.tableCache = tableCache; this.statsd_client = statsd_client; - this.userDatabaseQueue = new UserDatabaseQueue(metadataBackend); + this.userDatabaseQueue = new UsernameQueue(metadataBackend); } JobController.prototype.route = function (app) { diff --git a/batch/batch_manager.js b/batch/batch_manager.js index 6a498118..5aa5fd79 100644 --- a/batch/batch_manager.js +++ b/batch/batch_manager.js @@ -1,33 +1,52 @@ 'use strict'; -function BatchManager(jobDequeuer, queryRunner, jobCounter) { - this.jobDequeuer = jobDequeuer; - this.queryRunner = queryRunner; - this.jobCounter = jobCounter; +function BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService) { + this.usernameQueue = usernameQueue; + this.userDatabaseMetadataService = userDatabaseMetadataService; + this.jobService = jobService; + this.jobCounterService = jobCounterService; } -BatchManager.prototype.run = function () { +BatchManager.prototype.run = function (callback) { var self = this; - this.jobDequeuer.dequeue(function (err, pg, job, host) { + this.usernameQueue.dequeue(function (err, username) { if (err) { - return console.error(err); + return callback(err); } - if (!pg || !job || !host) { - return console.info('No job launched'); + if (!username) { + return callback(new Error('No jobs scheduled')); } - self.queryRunner.run(pg, job, host, function (err) { + self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) { if (err) { - return console.error(err); + return callback(err); } - if (!this.jobCounter.decrement(host)) { - return console.warn('Job counter for instance %s is out of range', host); - } + self.jobCounterService.increment(userDatabaseMetadata.host, function (err) { + if (err) { + return callback(err); + } - console.info('Job %s done successfully', job.job_id); + self.jobService.run(userDatabaseMetadata, function (err) { + if (err) { + callback(err); + self.usernameQueue.enqueue(username, function (err) { + if (err) { + callback(err); + } + }); + } + + self.jobCounterService.decrement(userDatabaseMetadata.host, function (err) { + if (err) { + return callback(err); + } + callback(); + }); + }); + }); }); }); }; diff --git a/batch/batch_manager_factory.js b/batch/batch_manager_factory.js new file mode 100644 index 00000000..7843bcd3 --- /dev/null +++ b/batch/batch_manager_factory.js @@ -0,0 +1,26 @@ +'use strict'; + +var cartoDBRedis = require('cartodb-redis'); +var UserDatabaseMetadataService = require('./user_database_metadata_service'); +var UsernameQueue = require('./username_queue'); +var JobService = require('./job_service'); +var JobCounterService = require('./job_counter_service'); +var BatchManager = require('./batch_manager'); + +module.exports = function (maxJobsPerHost) { + var metadataBackend = cartoDBRedis({ + host: global.settings.redis_host, + port: global.settings.redis_port, + max: global.settings.redisPool, + idleTimeoutMillis: global.settings.redisIdleTimeoutMillis, + reapIntervalMillis: global.settings.redisReapIntervalMillis + }); + + var usernameQueue = new UsernameQueue(metadataBackend); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobService = new JobService(metadataBackend); + var jobCounterService = new JobCounterService(maxJobsPerHost, metadataBackend); + var batchManager = new BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService); + + return batchManager; +}; diff --git a/batch/database_dequeuer.js b/batch/database_dequeuer.js deleted file mode 100644 index 414ca137..00000000 --- a/batch/database_dequeuer.js +++ /dev/null @@ -1,45 +0,0 @@ -'use strict'; - -function DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter) { - this.userDatabaseQueue = userDatabaseQueue; - this.metadataBackend = metadataBackend; - this.jobCounter = jobCounter; -} - -DatabaseDequeuer.prototype.dequeue = function (callback) { - var self = this; - - this.userDatabaseQueue.dequeue(function (err, userDatabaseName) { - if (err) { - return callback(err); - } - - if (!userDatabaseName) { - return callback(); - } - - console.log('>>>>>>>>>>>>>>>>>>>>>>>> 1'); - - self.metadataBackend.getAllUserDBParams(userDatabaseName, function (err, userDatabase) { - console.log('>>>>>>>>>>>>>>>>>>>>>>>> 2'); - if (err) { - return callback(err); - } - - if (this.jobCounter.increment(userDatabase.dbHost)) { - return callback(null, userDatabase); - } - - // host is busy, enqueue job again! - this.userDatabaseQueue.enqueue(userDatabaseName, function (err) { - if (err) { - return callback(err); - } - callback(); - }); - - }); - }); -}; - -module.exports = DatabaseDequeuer; diff --git a/batch/index.js b/batch/index.js index efba1280..b9d9635b 100644 --- a/batch/index.js +++ b/batch/index.js @@ -1,30 +1,10 @@ 'use strict'; var BatchLauncher = require('./batch_launcher'); -var BatchManager = require('./batch_manager'); -var JobDequeuer = require('./job_dequeuer'); -var QueryRunner = require('./query_runner'); -var DatabaseDequeuer = require('./database_dequeuer'); -var UserDatabaseQueue = require('./user_database_queue'); -var cartoDBRedis = require('cartodb-redis'); -var JobCounter = require('./job_counter'); +var batchManagerFactory = require('./batch_manager_factory'); module.exports = function (interval, maxJobsPerHost) { - var jobCounter = new JobCounter(maxJobsPerHost); - - var metadataBackend = cartoDBRedis({ - host: global.settings.redis_host, - port: global.settings.redis_port, - max: global.settings.redisPool, - idleTimeoutMillis: global.settings.redisIdleTimeoutMillis, - reapIntervalMillis: global.settings.redisReapIntervalMillis - }); - - var userDatabaseQueue = new UserDatabaseQueue(metadataBackend); - var databaseDequeuer = new DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter); - var queryRunner = new QueryRunner(); - var jobDequeuer = new JobDequeuer(databaseDequeuer); - var batchManager = new BatchManager(jobDequeuer, queryRunner); + var batchManager = batchManagerFactory(maxJobsPerHost); var batchLauncher = new BatchLauncher(batchManager); // here we go! diff --git a/batch/job_counter.js b/batch/job_counter.js deleted file mode 100644 index 6f5060f4..00000000 --- a/batch/job_counter.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -function JobsCounter(maxJobsPerIntance, metadataBackend) { - this.metadataBackend = metadataBackend; - this.maxJobsPerIntance = maxJobsPerIntance || global.settings.max_jobs_per_instance; - this.hosts = {}; -} - -JobsCounter.prototype.increment = function (host) { - if (this[host] < this.maxJobsPerHost) { - this[host] += 1; - return true; - } - return false; -}; - -JobsCounter.prototype.decrement = function (host) { - if (this[host] > 0) { - this[host] -= 1; - return true; - } - return false; - }; - -module.exports = JobsCounter; diff --git a/batch/job_counter_service.js b/batch/job_counter_service.js new file mode 100644 index 00000000..87476a30 --- /dev/null +++ b/batch/job_counter_service.js @@ -0,0 +1,53 @@ +'use strict'; + +function JobCounterService(maxJobsPerHost, metadataBackend) { + this.metadataBackend = metadataBackend; + this.maxJobsPerHost = maxJobsPerHost || global.settings.max_jobs_per_instance; + this.db = 5; +} + +JobCounterService.prototype.increment = function (host, callback) { + var self = this; + var db = this.db; + + this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) { + if (err) { + return callback(err); + } + + if (hostCounter >= self.maxJobsPerHost) { + return callback(new Error('Limit max job per host is reached: %s jobs', hostCounter)); + } + + self.metadataBackend.redisCmd(db, 'INCR', [host], function (err /*, hostCounter */) { + if (err) { + return callback(err); + } + callback(); + }); + }); +}; + +JobCounterService.prototype.decrement = function (host, callback) { + var self = this; + var db = this.db; + + this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) { + if (err) { + return callback(err); + } + + if (hostCounter < 0) { + return callback(new Error('Limit max job per host is reached')); + } + + self.metadataBackend.redisCmd(db, 'DECR', [host], function (err /*, hostCounter */) { + if (err) { + return callback(err); + } + callback(); + }); + }); + }; + +module.exports = JobCounterService; diff --git a/batch/job_dequeuer.js b/batch/job_dequeuer.js deleted file mode 100644 index 23baeac7..00000000 --- a/batch/job_dequeuer.js +++ /dev/null @@ -1,36 +0,0 @@ -'use strict'; - -var PSQL = require('cartodb-psql'); - -function JobDequeuer(databaseDequeuer) { - this.databaseDequeuer = databaseDequeuer; -} - -JobDequeuer.prototype.dequeue = function (callback) { - - this.databaseDequeuer.dequeue(function (err, userDatabase) { - if (err) { - return callback(err); - } - - if (!userDatabase) { - return callback(); - } - - var pg = new PSQL(userDatabase, {}, { destroyOnError: true }); - - var nextQuery = "select * from cdb_jobs where status='pending' order by updated_at asc limit 1"; - - pg.query(nextQuery, function (err, job) { - if (err) { - return callback(err); - } - - callback(null, pg, job, userDatabase.host); - }); - - }); - -}; - -module.exports = JobDequeuer; diff --git a/batch/job_service.js b/batch/job_service.js new file mode 100644 index 00000000..99077486 --- /dev/null +++ b/batch/job_service.js @@ -0,0 +1,124 @@ +'use strict'; + +var PSQL = require('cartodb-psql'); + +function JobService() { +} + +JobService.prototype.run = function (userDatabaseMetada, callback) { + var self = this; + + var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true }); + + this.getJob(pg, function (err, job) { + if (err) { + return callback(err); + } + + self.setJobRunning(pg, job, function (err) { + if (err) { + return callback(err); + } + + self.runJob(pg, job.query, function (err, jobResult) { + if (err) { + self.setJobFailed(pg, job, err.message, function (err) { + if (err) { + return callback(err); + } + callback(null, jobResult); + }); + } else { + self.setJobDone(pg, job, function (err) { + if (err) { + return callback(err); + } + console.info('Job %s done successfully', job.job_id); + callback(null, jobResult); + }); + } + }); + }); + }); +}; + +JobService.prototype.runJob = function (pg, jobQuery, callback) { + // TODO: wrap select query with select into + pg.query(jobQuery, function (err, jobResult) { + if (err) { + return callback(err); + } + callback(null, jobResult); + }); +}; + +JobService.prototype.setJobRunning = function (pg, job, callback) { + var runningJobQuery = [ + 'UPDATE cdb_jobs SET ', + 'status = \'running\', ', + 'updated_at = now() ', + ' WHERE ', + 'job_id = \'' + job.job_id + '\' ', + ' RETURNING job_id;' + ].join('\n'); + + pg.query(runningJobQuery, function (err, result) { + if (err) { + return callback(err); + } + callback(null, result); + }); +}; + +JobService.prototype.setJobDone = function (pg, job, callback) { + var doneJobQuery = [ + 'UPDATE cdb_jobs SET ', + 'status = \'done\', ', + 'updated_at = now() ', + ' WHERE ', + 'job_id = \'' + job.job_id + '\' ', + ' RETURNING job_id;' + ].join('\n'); + + pg.query(doneJobQuery, function (err, result) { + if (err) { + return callback(err); + } + callback(null, result); + }); +}; + +JobService.prototype.setJobFailed = function (pg, job, message, callback) { + var failedJobQuery = [ + 'UPDATE cdb_jobs SET ', + 'status = \'failed\', ', + 'failed_reason = \'' + message + '\', ', + 'updated_at = now() ', + ' WHERE ', + 'job_id = \'' + job.job_id + '\' ', + ' RETURNING job_id;' + ].join('\n'); + + pg.query(failedJobQuery, function (err, result) { + if (err) { + return callback(err); + } + callback(null, result); + }); +}; + +JobService.prototype.getJob = function (pg, callback) { + + var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1"; + + pg.query(getNextJob, function (err, result) { + if (err) { + return callback(err); + } + + callback(null, result.rows[0]); + }); +}; + + +module.exports = JobService; diff --git a/batch/query_runner.js b/batch/query_runner.js deleted file mode 100644 index c5f0c7d2..00000000 --- a/batch/query_runner.js +++ /dev/null @@ -1,100 +0,0 @@ -'use strict'; - -function QueryRunner() { -} - -QueryRunner.prototype.run = function (pg, job, callback) { - var self = this; - - console.log('QueryRunner.run'); - this.setJobRunning(pg, job, function (err) { - if (err) { - return callback(err); - } - - self.job(pg, job.query, function (err, jobResult) { - if (err) { - self.setJobFailed(err, pg, job, function (err) { - if (err) { - return callback(err); - } - callback(null, jobResult); - }); - } else { - self.setJobDone(pg, job, function (err) { - if (err) { - return callback(err); - } - callback(null, jobResult); - }); - } - }); - }); -}; - -QueryRunner.prototype.job = function (pg, jobQuery, callback) { - // TODO: wrap select query with select into - pg(jobQuery, function (err, jobResult) { - if (err) { - return callback(err); - } - callback(null, jobResult); - }); -}; - -QueryRunner.prototype.setJobRunning = function (pg, job, callback) { - var runningJobQuery = [ - 'UPDATE cdb_jobs SET ', - 'status = \'running\'', - 'updated_at = ' + Date.now(), - ' WHERE ', - 'job_id = \'' + job.job_id + '\', ', - ') RETURNING job_id;' - ].join('\n'); - - pg(runningJobQuery, function (err, result) { - if (err) { - return callback(err); - } - callback(null, result); - }); -}; - -QueryRunner.prototype.setJobDone = function (pg, job, callback) { - var doneJobQuery = [ - 'UPDATE cdb_jobs SET ', - 'status = \'done\'', - 'updated_at = ' + Date.now(), - ' WHERE ', - 'job_id = \'' + job.job_id + '\', ', - ') RETURNING job_id;' - ].join('\n'); - - pg(doneJobQuery, function (err, result) { - if (err) { - return callback(err); - } - callback(null, result); - }); -}; - -QueryRunner.prototype.setJobFailed = function (err, pg, job, callback) { - var failedJobQuery = [ - 'UPDATE cdb_jobs SET ', - 'status = \'failed\'', - 'failed_reason = \'' + err.message + '\'', - 'updated_at = ' + Date.now(), - ' WHERE ', - 'job_id = \'' + job.job_id + '\', ', - ') RETURNING job_id;' - ].join('\n'); - - pg(failedJobQuery, function (err, result) { - if (err) { - return callback(err); - } - callback(null, result); - }); -}; - -module.exports = QueryRunner; diff --git a/batch/user_database_metadata_service.js b/batch/user_database_metadata_service.js new file mode 100644 index 00000000..6482fb28 --- /dev/null +++ b/batch/user_database_metadata_service.js @@ -0,0 +1,17 @@ +'use strict'; + +function UserDatabaseMetadataService(metadataBackend) { + this.metadataBackend = metadataBackend; +} + +UserDatabaseMetadataService.prototype.getUserMetadata = function (username, callback) { + this.metadataBackend.getAllUserDBParams(username, function (err, userDatabaseMetadata) { + if (err) { + return callback(err); + } + + callback(null, userDatabaseMetadata); + }); +}; + +module.exports = UserDatabaseMetadataService; diff --git a/batch/user_database_queue.js b/batch/username_queue.js similarity index 74% rename from batch/user_database_queue.js rename to batch/username_queue.js index 2bd42431..8cbb97ff 100644 --- a/batch/user_database_queue.js +++ b/batch/username_queue.js @@ -1,13 +1,13 @@ 'use strict'; -function UsernameBatchQueue(metadataBackend) { +function UsernameQueue(metadataBackend) { this.metadataBackend = metadataBackend; this.db = 5; this.queueName = 'usernameBatchQueue'; } -UsernameBatchQueue.prototype.enqueue = function (cdbUsername, callback) { +UsernameQueue.prototype.enqueue = function (cdbUsername, callback) { var db = this.db; var queue = this.queueName; @@ -20,7 +20,7 @@ UsernameBatchQueue.prototype.enqueue = function (cdbUsername, callback) { }); }; -UsernameBatchQueue.prototype.dequeue = function (callback) { +UsernameQueue.prototype.dequeue = function (callback) { var db = this.db; var queue = this.queueName; @@ -33,4 +33,4 @@ UsernameBatchQueue.prototype.dequeue = function (callback) { }); }; -module.exports = UsernameBatchQueue; +module.exports = UsernameQueue; diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index 3bbb9178..6f9e8be9 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -1,8 +1,10 @@ -var batch = require('../../batch'); +var batchManagerFactory = require('../../batch/batch_manager_factory'); -describe('batch service', function() { - it.skip('run', function() { - batch(1, 1); +describe('batch manager', function() { + it('run', function (done) { + batchManagerFactory().run(function (err) { + done(err); + }); }); }); diff --git a/test/prepare_db.sh b/test/prepare_db.sh index ed69b309..c9ba73ab 100755 --- a/test/prepare_db.sh +++ b/test/prepare_db.sh @@ -133,8 +133,11 @@ HMSET rails:oauth_access_tokens:l0lPbtP68ao8NfStCiA3V3neqfM03JKhToxhUQTR \ time sometime EOF +# insert in username queue for testin jobs +cat <