From 43f759e96a5984642c925491701dbdea77490276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 16 Dec 2015 15:57:58 +0100 Subject: [PATCH] Refactored Batch API using streams instead a interval to consume the job queue. Limited one job running at the same time per queue instead of using a job's counter to limit it. --- app/app.js | 4 +- app/controllers/job_controller.js | 10 +- batch/batch_launcher.js | 25 ----- batch/batch_manager.js | 61 ------------ batch/batch_manager_factory.js | 17 ---- batch/index.js | 51 ++++++++-- batch/job.js | 114 ++++++++++++++++++++++ batch/job_counter_service.js | 53 ---------- batch/job_publisher.js | 14 +++ batch/job_queue_consumer.js | 28 ++++++ batch/job_queue_pool.js | 23 +++++ batch/job_queue_producer.js | 15 +++ batch/job_service.js | 124 ++---------------------- batch/job_subscriber.js | 15 +++ batch/user_database_metadata_service.js | 1 - batch/username_queue.js | 24 ----- package.json | 95 +++++++++--------- test/acceptance/batch.test.js | 35 +++++-- 18 files changed, 346 insertions(+), 363 deletions(-) delete mode 100644 batch/batch_launcher.js delete mode 100644 batch/batch_manager.js delete mode 100644 batch/batch_manager_factory.js create mode 100644 batch/job.js delete mode 100644 batch/job_counter_service.js create mode 100644 batch/job_publisher.js create mode 100644 batch/job_queue_consumer.js create mode 100644 batch/job_queue_pool.js create mode 100644 batch/job_queue_producer.js create mode 100644 batch/job_subscriber.js delete mode 100644 batch/username_queue.js diff --git a/app/app.js b/app/app.js index 17e8e798..8424c09d 100644 --- a/app/app.js +++ b/app/app.js @@ -28,7 +28,7 @@ var CacheStatusController = require('./controllers/cache_status_controller'); var HealthCheckController = require('./controllers/health_check_controller'); var VersionController = require('./controllers/version_controller'); -var batchService = require('../batch'); +var batch = require('../batch'); process.env.PGAPPNAME = process.env.PGAPPNAME || 'cartodb_sqlapi'; @@ -181,7 +181,7 @@ function App() { versionController.route(app); if (global.settings.environment !== 'test') { - batchService(metadataBackend, 5000, 100); + batch(metadataBackend); } return app; diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index addf301c..fd296c5d 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -6,18 +6,20 @@ var assert = require('assert'); var PSQL = require('cartodb-psql'); var UserDatabaseService = require('../services/user_database_service'); -var UsernameQueue = require('../../batch/username_queue'); +var JobPublisher = require('../../batch/job_publisher'); +var JobQueueProducer = require('../../batch/job_queue_producer'); var CdbRequest = require('../models/cartodb_request'); var handleException = require('../utils/error_handler'); var cdbReq = new CdbRequest(); var userDatabaseService = new UserDatabaseService(); +var jobPublisher = new JobPublisher(); function JobController(metadataBackend, tableCache, statsd_client) { this.metadataBackend = metadataBackend; this.tableCache = tableCache; this.statsd_client = statsd_client; - this.userDatabaseQueue = new UsernameQueue(metadataBackend); + this.jobQueueProducer = new JobQueueProducer(metadataBackend); } JobController.prototype.route = function (app) { @@ -113,11 +115,13 @@ JobController.prototype.handleJob = function (req, res) { var next = this; - self.userDatabaseQueue.enqueue(cdbUsername, function (err) { + self.jobQueueProducer.enqueue(cdbUsername, result.userDatabase.host, function (err) { if (err) { return next(err); } + jobPublisher.publish(result.userDatabase.host); + next(null, { job: result.job, host: result.userDatabase.host diff --git a/batch/batch_launcher.js b/batch/batch_launcher.js deleted file mode 100644 index 8c03e0f3..00000000 --- a/batch/batch_launcher.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -function BatchLauncher(batchManager) { - this.batchManager = batchManager; - this.batchInterval = global.settings.batch_interval; -} - -BatchLauncher.prototype.start = function (interval) { - var self = this; - interval = this.batchInterval || interval || 5000; - - this.intervalCallback = setInterval(function () { - self.batchManager.run(function (err) { - if (err) { - console.log('Error in batch service: ', err); - } - }); - }, interval); -}; - -BatchLauncher.prototype.stop = function () { - clearInterval(this.intervalCallback); -}; - -module.exports = BatchLauncher; diff --git a/batch/batch_manager.js b/batch/batch_manager.js deleted file mode 100644 index ebca6ecb..00000000 --- a/batch/batch_manager.js +++ /dev/null @@ -1,61 +0,0 @@ -'use strict'; - -function BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService) { - this.usernameQueue = usernameQueue; - this.userDatabaseMetadataService = userDatabaseMetadataService; - this.jobService = jobService; - this.jobCounterService = jobCounterService; -} - -BatchManager.prototype.run = function (callback) { - var self = this; - - this.usernameQueue.dequeue(function (err, username) { - if (err) { - return callback(err); - } - - if (!username) { - return callback(); // no jobs scheduled - } - - self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) { - if (err) { - return callback(err); - } - - self.jobCounterService.increment(userDatabaseMetadata.host, function (err) { - if (err && err.name === 'JobLimitReachedError') { - self.usernameQueue.enqueue(username, function (err) { - if (err) { - callback(err); - } - callback(); - }); - } else if (err) { - return callback(err); - } - - self.jobService.run(userDatabaseMetadata, function (err) { - if (err) { - self.usernameQueue.enqueue(username, function (err) { - if (err) { - callback(err); - } - callback(); - }); - } - - self.jobCounterService.decrement(userDatabaseMetadata.host, function (err) { - if (err) { - return callback(err); - } - callback(); - }); - }); - }); - }); - }); -}; - -module.exports = BatchManager; diff --git a/batch/batch_manager_factory.js b/batch/batch_manager_factory.js deleted file mode 100644 index 8eaf8d66..00000000 --- a/batch/batch_manager_factory.js +++ /dev/null @@ -1,17 +0,0 @@ -'use strict'; - -var UserDatabaseMetadataService = require('./user_database_metadata_service'); -var UsernameQueue = require('./username_queue'); -var JobService = require('./job_service'); -var JobCounterService = require('./job_counter_service'); -var BatchManager = require('./batch_manager'); - -module.exports = function (metadataBackend ,maxJobsPerHost) { - var usernameQueue = new UsernameQueue(metadataBackend); - var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - var jobService = new JobService(metadataBackend); - var jobCounterService = new JobCounterService(maxJobsPerHost, metadataBackend); - var batchManager = new BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService); - - return batchManager; -}; diff --git a/batch/index.js b/batch/index.js index 89373233..d5d09ef6 100644 --- a/batch/index.js +++ b/batch/index.js @@ -1,12 +1,49 @@ 'use strict'; -var BatchLauncher = require('./batch_launcher'); -var batchManagerFactory = require('./batch_manager_factory'); +var Job = require('./job'); +var JobQueuePool = require('./job_queue_pool'); +var JobQueueConsumer = require('./job_queue_consumer'); +var JobSubscriber = require('./job_subscriber'); +var UserDatabaseMetadataService = require('./user_database_metadata_service'); +var JobService = require('./job_service'); -module.exports = function (metadataBackend, interval, maxJobsPerHost) { - var batchManager = batchManagerFactory(metadataBackend, maxJobsPerHost); - var batchLauncher = new BatchLauncher(batchManager); +module.exports = function (metadataBackend) { + var jobQueuePool = new JobQueuePool(); + var jobSubscriber = new JobSubscriber(); + var job = new Job(); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobService = new JobService(userDatabaseMetadataService, job); - // here we go! - batchLauncher.start(interval); + jobSubscriber.subscribe(function onMessage(channel, host) { + var jobQueueConsumer = jobQueuePool.get(host); + + // if queue consumer is not registered in batch service + if (!jobQueueConsumer) { + + // creates new one + jobQueueConsumer = new JobQueueConsumer(metadataBackend, host); + + // register it in batch service + jobQueuePool.add(host, jobQueueConsumer); + + // while read from queue then perform job + jobQueueConsumer.on('data', function (username) { + + // limit one job at the same time per queue (queue <1:1> db intance) + jobQueueConsumer.pause(); + + jobService.run(username, function (err) { + if (err) { + console.error(err.stack); + } + + // next job + jobQueueConsumer.resume(); + }); + }) + .on('error', function (err) { + console.error(err.stack || err); + }); + } + }); }; diff --git a/batch/job.js b/batch/job.js new file mode 100644 index 00000000..41e81500 --- /dev/null +++ b/batch/job.js @@ -0,0 +1,114 @@ +'use strict'; + +var PSQL = require('cartodb-psql'); + +function Job() { +} + +Job.prototype.run = function (userDatabaseMetada, callback) { + var self = this; + + var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true }); + + this.getJob(pg, function (err, job) { + if (err) { + return callback(err); + } + + if (!job) { + return callback(); + } + + self.setJobRunning(pg, job, function (err) { + if (err) { + return callback(err); + } + + self.runJob(pg, job, function (err, jobResult) { + if (err) { + return self.setJobFailed(pg, job, err.message, function () { + callback(err); + }); + } + + self.setJobDone(pg, job, function () { + callback(null, jobResult); + }); + }); + }); + }); +}; + +Job.prototype.getJob = function (pg, callback) { + var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1"; + + pg.query(getNextJob, function (err, result) { + if (err) { + return callback(err); + } + + callback(null, result.rows[0]); + }); +}; + +Job.prototype.runJob = function (pg, job, callback) { + var query = job.query; + + if (job.query.match(/SELECT\s.*FROM\s.*/i)) { + query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j'; + } + + pg.query(query, callback); +}; + +Job.prototype.setJobRunning = function (pg, job, callback) { + var runningJobQuery = [ + 'UPDATE cdb_jobs SET ', + 'status = \'running\', ', + 'updated_at = now() ', + ' WHERE ', + 'job_id = \'' + job.job_id + '\' ', + ' RETURNING job_id;' + ].join('\n'); + + pg.query(runningJobQuery, callback); +}; + +Job.prototype.setJobDone = function (pg, job, callback) { + var doneJobQuery = [ + 'UPDATE cdb_jobs SET ', + 'status = \'done\', ', + 'updated_at = now() ', + ' WHERE ', + 'job_id = \'' + job.job_id + '\' ', + ' RETURNING job_id;' + ].join('\n'); + + pg.query(doneJobQuery, function (err) { + if (err) { + console.error(err.stack); + } + callback(); + }); +}; + +Job.prototype.setJobFailed = function (pg, job, message, callback) { + var failedJobQuery = [ + 'UPDATE cdb_jobs SET ', + 'status = \'failed\', ', + 'failed_reason = \'' + message + '\', ', + 'updated_at = now() ', + ' WHERE ', + 'job_id = \'' + job.job_id + '\' ', + ' RETURNING job_id;' + ].join('\n'); + + pg.query(failedJobQuery, function (err) { + if (err) { + console.error(err.stack); + } + callback(); + }); +}; + +module.exports = Job; diff --git a/batch/job_counter_service.js b/batch/job_counter_service.js deleted file mode 100644 index cd7f340a..00000000 --- a/batch/job_counter_service.js +++ /dev/null @@ -1,53 +0,0 @@ -'use strict'; - -function JobCounterService(maxJobsPerHost, metadataBackend) { - this.metadataBackend = metadataBackend; - this.maxJobsPerHost = maxJobsPerHost || global.settings.max_jobs_per_instance; - this.db = 5; -} - -JobCounterService.prototype.increment = function (host, callback) { - var self = this; - var db = this.db; - - this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) { - if (err) { - return callback(err); - } - - if (hostCounter >= self.maxJobsPerHost) { - return callback(new Error('Limit max job per host is reached: %s jobs', hostCounter)); - } - - self.metadataBackend.redisCmd(db, 'INCR', [host], function (err /*, hostCounter */) { - if (err) { - return callback(err); - } - callback(); - }); - }); -}; - -JobCounterService.prototype.decrement = function (host, callback) { - var self = this; - var db = this.db; - - this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) { - if (err) { - return callback(err); - } - - if (hostCounter < 0) { - return callback(new Error('Limit max job per host is reached').name= 'JobLimitReachedError'); - } - - self.metadataBackend.redisCmd(db, 'DECR', [host], function (err /*, hostCounter */) { - if (err) { - return callback(err); - } - callback(); - }); - }); - }; - -module.exports = JobCounterService; diff --git a/batch/job_publisher.js b/batch/job_publisher.js new file mode 100644 index 00000000..e6372ab9 --- /dev/null +++ b/batch/job_publisher.js @@ -0,0 +1,14 @@ +'use strict'; + +var redis = require('redis'); + +function JobPublisher() { + this.channel = 'host:job'; + this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host); +} + +JobPublisher.prototype.publish = function (host) { + this.client.publish(this.channel, host); +}; + +module.exports = JobPublisher; diff --git a/batch/job_queue_consumer.js b/batch/job_queue_consumer.js new file mode 100644 index 00000000..072ab555 --- /dev/null +++ b/batch/job_queue_consumer.js @@ -0,0 +1,28 @@ +'use strict'; + +var util = require('util'); +var Readable = require('stream').Readable; + +function JobQueueConsumer(metadataBackend, host) { + Readable.call(this, { + encoding: 'utf8', + objectMode: true + }); + this.db = 5; + this.queueName = 'queue:' + host; + this.metadataBackend = metadataBackend; +} +util.inherits(JobQueueConsumer, Readable); + +JobQueueConsumer.prototype._read = function () { + var self = this; + this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.queueName ], function (err, username) { + if (err) { + return self.emit('error', err); + } + + self.push(username); + }); +}; + +module.exports = JobQueueConsumer; diff --git a/batch/job_queue_pool.js b/batch/job_queue_pool.js new file mode 100644 index 00000000..0896e149 --- /dev/null +++ b/batch/job_queue_pool.js @@ -0,0 +1,23 @@ +'use strict'; + +function JobQueuePool() { + this.queues = {}; +} + +JobQueuePool.prototype.get = function (host) { + return this.queues[host]; +}; + +JobQueuePool.prototype.list = function () { + return Object.keys(this.queues); +}; + +JobQueuePool.prototype.add = function (host, queue) { + this.queues[host] = queue; +}; + +JobQueuePool.prototype.remove = function (host) { + delete this.queues[host]; +}; + +module.exports = JobQueuePool; diff --git a/batch/job_queue_producer.js b/batch/job_queue_producer.js new file mode 100644 index 00000000..da0d9689 --- /dev/null +++ b/batch/job_queue_producer.js @@ -0,0 +1,15 @@ +'use strict'; + +function JobQueueProducer(metadataBackend) { + this.metadataBackend = metadataBackend; + this.db = 5; +} + +JobQueueProducer.prototype.enqueue = function (cdbUsername, host, callback) { + var db = this.db; + var queue = 'queue:' + host; + + this.metadataBackend.redisCmd(db, 'LPUSH', [queue, cdbUsername], callback); +}; + +module.exports = JobQueueProducer; diff --git a/batch/job_service.js b/batch/job_service.js index 27394dc5..1023c03c 100644 --- a/batch/job_service.js +++ b/batch/job_service.js @@ -1,130 +1,20 @@ 'use strict'; -var PSQL = require('cartodb-psql'); - -function JobService() { +function JobService(userDatabaseMetadataService, job) { + this.userDatabaseMetadataService = userDatabaseMetadataService; + this.job = job; } -JobService.prototype.run = function (userDatabaseMetada, callback) { +JobService.prototype.run = function (username, callback) { var self = this; - var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true }); - - this.getJob(pg, function (err, job) { + this.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) { if (err) { return callback(err); } - self.setJobRunning(pg, job, function (err) { - if (err) { - return callback(err); - } - - self.runJob(pg, job, function (err, jobResult) { - if (err) { - - self.setJobFailed(pg, job, err.message, function (err) { - if (err) { - return callback(err); - } - callback(null, jobResult); - }); - - } else { - - self.setJobDone(pg, job, function (err) { - if (err) { - return callback(err); - } - callback(null, jobResult); - }); - - } - }); - }); + self.job.run(userDatabaseMetadata, callback); }); }; -JobService.prototype.getJob = function (pg, callback) { - var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1"; - - pg.query(getNextJob, function (err, result) { - if (err) { - return callback(err); - } - - callback(null, result.rows[0]); - }); -}; - -JobService.prototype.runJob = function (pg, job, callback) { - var query = job.query; - - if (job.query.match(/SELECT\s.*FROM\s.*/i)) { - query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j'; - } - - pg.query(query, function (err, jobResult) { - if (err) { - return callback(err); - } - callback(null, jobResult); - }); -}; - -JobService.prototype.setJobRunning = function (pg, job, callback) { - var runningJobQuery = [ - 'UPDATE cdb_jobs SET ', - 'status = \'running\', ', - 'updated_at = now() ', - ' WHERE ', - 'job_id = \'' + job.job_id + '\' ', - ' RETURNING job_id;' - ].join('\n'); - - pg.query(runningJobQuery, function (err, result) { - if (err) { - return callback(err); - } - callback(null, result); - }); -}; - -JobService.prototype.setJobDone = function (pg, job, callback) { - var doneJobQuery = [ - 'UPDATE cdb_jobs SET ', - 'status = \'done\', ', - 'updated_at = now() ', - ' WHERE ', - 'job_id = \'' + job.job_id + '\' ', - ' RETURNING job_id;' - ].join('\n'); - - pg.query(doneJobQuery, function (err, result) { - if (err) { - return callback(err); - } - callback(null, result); - }); -}; - -JobService.prototype.setJobFailed = function (pg, job, message, callback) { - var failedJobQuery = [ - 'UPDATE cdb_jobs SET ', - 'status = \'failed\', ', - 'failed_reason = \'' + message + '\', ', - 'updated_at = now() ', - ' WHERE ', - 'job_id = \'' + job.job_id + '\' ', - ' RETURNING job_id;' - ].join('\n'); - - pg.query(failedJobQuery, function (err, result) { - if (err) { - return callback(err); - } - callback(null, result); - }); -}; - -module.exports = JobService; +module.exports = JobService; \ No newline at end of file diff --git a/batch/job_subscriber.js b/batch/job_subscriber.js new file mode 100644 index 00000000..57e880c6 --- /dev/null +++ b/batch/job_subscriber.js @@ -0,0 +1,15 @@ +'use strict'; + +var redis = require('redis'); + +function JobSubscriber() { + this.channel = 'host:job'; + this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host); +} + +JobSubscriber.prototype.subscribe = function (onMessage) { + this.client.subscribe(this.channel); + this.client.on('message', onMessage); +}; + +module.exports = JobSubscriber; diff --git a/batch/user_database_metadata_service.js b/batch/user_database_metadata_service.js index 962005ba..0dd78444 100644 --- a/batch/user_database_metadata_service.js +++ b/batch/user_database_metadata_service.js @@ -39,7 +39,6 @@ UserDatabaseMetadataService.prototype.parseMetadaToDatabase = function (userData }); return dbopts; - }; module.exports = UserDatabaseMetadataService; diff --git a/batch/username_queue.js b/batch/username_queue.js deleted file mode 100644 index 380a4df6..00000000 --- a/batch/username_queue.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict'; - - -function UsernameQueue(metadataBackend) { - this.metadataBackend = metadataBackend; - this.db = 5; - this.queueName = 'usernameBatchQueue'; -} - -UsernameQueue.prototype.enqueue = function (cdbUsername, callback) { - var db = this.db; - var queue = this.queueName; - - this.metadataBackend.redisCmd(db, 'LPUSH', [queue, cdbUsername], callback); -}; - -UsernameQueue.prototype.dequeue = function (callback) { - var db = this.db; - var queue = this.queueName; - - this.metadataBackend.redisCmd(db, 'RPOP', [queue], callback); -}; - -module.exports = UsernameQueue; diff --git a/package.json b/package.json index 8bb21efe..241a770e 100644 --- a/package.json +++ b/package.json @@ -1,49 +1,50 @@ { - "private": true, - "name": "cartodb_sql_api", - "description": "high speed SQL api for cartodb", - "keywords": [ - "cartodb" - ], - "version": "1.25.0", - "repository": { - "type": "git", - "url": "git://github.com/CartoDB/CartoDB-SQL-API.git" - }, - "author": "Vizzuality (http://vizzuality.com)", - "contributors": [ - "Simon Tokumine ", - "Sandro Santilli " - ], - "dependencies": { - "cartodb-redis": "~0.11.0", - "cartodb-psql": "~0.6.0", - "step-profiler": "~0.1.0", - "underscore": "~1.6.0", - "express": "~2.5.11", - "step": "~0.0.5", - "topojson": "0.0.8", - "oauth-client": "0.3.0", - "lru-cache":"~2.5.0", - "log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb", - "rollbar": "~0.3.2", - "node-statsd": "~0.0.7" - }, - "devDependencies": { - "redis": "0.7.1", - "request": "~2.60.0", - "shapefile": "0.3.0", - "mocha": "~1.21.4", - "jshint": "~2.6.0", - "zipfile": "~0.5.0", - "libxmljs": "~0.8.1", - "sqlite3": "~3.0.8" - }, - "scripts": { - "test": "make test-all" - }, - "engines": { - "node": ">=0.8 <0.11", - "npm": ">=1.2.1" - } + "private": true, + "name": "cartodb_sql_api", + "description": "high speed SQL api for cartodb", + "keywords": [ + "cartodb" + ], + "version": "1.25.0", + "repository": { + "type": "git", + "url": "git://github.com/CartoDB/CartoDB-SQL-API.git" + }, + "author": "Vizzuality (http://vizzuality.com)", + "contributors": [ + "Simon Tokumine ", + "Sandro Santilli " + ], + "dependencies": { + "cartodb-psql": "~0.6.0", + "cartodb-redis": "~0.11.0", + "express": "~2.5.11", + "log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb", + "lru-cache": "~2.5.0", + "node-statsd": "~0.0.7", + "oauth-client": "0.3.0", + "redis": "^2.4.2", + "rollbar": "~0.3.2", + "step": "~0.0.5", + "step-profiler": "~0.1.0", + "topojson": "0.0.8", + "underscore": "~1.6.0" + }, + "devDependencies": { + "redis": "0.7.1", + "request": "~2.60.0", + "shapefile": "0.3.0", + "mocha": "~1.21.4", + "jshint": "~2.6.0", + "zipfile": "~0.5.0", + "libxmljs": "~0.8.1", + "sqlite3": "~3.0.8" + }, + "scripts": { + "test": "make test-all" + }, + "engines": { + "node": ">=0.8 <0.11", + "npm": ">=1.2.1" + } } diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index 73992276..5b1c2110 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -1,8 +1,10 @@ -var batchManagerFactory = require('../../batch/batch_manager_factory'); +var batch = require('../../batch/'); +var JobPublisher = require('../../batch/job_publisher'); +var JobQueueProducer = require('../../batch/job_queue_producer'); -describe('batch manager', function() { - it('run', function (done) { +describe('batch', function() { + it('should be initialized successfuly', function () { var metadataBackend = require('cartodb-redis')({ host: global.settings.redis_host, port: global.settings.redis_port, @@ -10,10 +12,31 @@ describe('batch manager', function() { idleTimeoutMillis: global.settings.redisIdleTimeoutMillis, reapIntervalMillis: global.settings.redisReapIntervalMillis }); - var maxJobsPerHost = 100; - batchManagerFactory(metadataBackend, maxJobsPerHost).run(function (err) { - done(err); + batch(metadataBackend); + }); + + it.skip('should perform one job', function (done) { + var metadataBackend = require('cartodb-redis')({ + host: global.settings.redis_host, + port: global.settings.redis_port, + max: global.settings.redisPool, + idleTimeoutMillis: global.settings.redisIdleTimeoutMillis, + reapIntervalMillis: global.settings.redisReapIntervalMillis + }); + var jobQueueProducer = new JobQueueProducer(metadataBackend); + var jobPublisher = new JobPublisher(); + + batch(metadataBackend); + + jobQueueProducer.enqueue('vizzuality', '127.0.0.1', function (err) { + if (err) { + return done(err); + } + jobPublisher.publish('127.0.0.1'); + setTimeout(function () { + done(); + }, 4000); }); }); });