From 34b352a0a6bae910dfb825ee74ec8c15848e5cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 29 Mar 2017 19:38:36 +0200 Subject: [PATCH 01/16] Use max number of attempts to scan user queues --- batch/pubsub/queue-seeker.js | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js index 0e1b7a4b..f220a761 100644 --- a/batch/pubsub/queue-seeker.js +++ b/batch/pubsub/queue-seeker.js @@ -1,6 +1,7 @@ 'use strict'; var QUEUE = require('../job_queue').QUEUE; +var MAX_SCAN_ATTEMPTS = 50; function QueueSeeker(pool) { this.pool = pool; @@ -10,6 +11,7 @@ module.exports = QueueSeeker; QueueSeeker.prototype.seek = function (callback) { var initialCursor = ['0']; + var attemps = 0; var users = {}; var self = this; @@ -17,14 +19,14 @@ QueueSeeker.prototype.seek = function (callback) { if (err) { return callback(err); } - self._seek(client, initialCursor, users, function(err, users) { + self._seek(client, initialCursor, users, attemps, function(err, users) { self.pool.release(QUEUE.DB, client); return callback(err, Object.keys(users)); }); }); }; -QueueSeeker.prototype._seek = function (client, cursor, users, callback) { +QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback) { var self = this; var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; @@ -41,11 +43,13 @@ QueueSeeker.prototype._seek = function (client, cursor, users, callback) { }); } - var hasMore = currentCursor[0] !== '0'; + var hasMore = currentCursor[0] !== '0' && attemps < MAX_SCAN_ATTEMPTS; if (!hasMore) { return callback(null, users); } - self._seek(client, currentCursor, users, callback); + attemps += 1; + + self._seek(client, currentCursor, users, attemps, callback); }); }; From 2edfd1fad55dd0fe3a9c7709efb1b98193f8e8e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 29 Mar 2017 19:51:19 +0200 Subject: [PATCH 02/16] Compare interger instead of strings --- batch/pubsub/queue-seeker.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js index f220a761..46ed0c6f 100644 --- a/batch/pubsub/queue-seeker.js +++ b/batch/pubsub/queue-seeker.js @@ -43,7 +43,8 @@ QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback }); } - var hasMore = currentCursor[0] !== '0' && attemps < MAX_SCAN_ATTEMPTS; + var hasMore = (parseInt(currentCursor[0], 10) > 0) && (attemps < MAX_SCAN_ATTEMPTS); + if (!hasMore) { return callback(null, users); } From 56429ffebfbd83a36495669fa485f3efd0a06a93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 29 Mar 2017 19:54:08 +0200 Subject: [PATCH 03/16] Use traditional for loop instead of .forEach() --- batch/pubsub/queue-seeker.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js index 46ed0c6f..00d672f0 100644 --- a/batch/pubsub/queue-seeker.js +++ b/batch/pubsub/queue-seeker.js @@ -37,10 +37,10 @@ QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback var queues = currentCursor[1]; if (queues) { - queues.forEach(function (queue) { - var user = queue.substr(QUEUE.PREFIX.length); + for (var i = 0; i < queues.length; i++) { + var user = queues[i].substr(QUEUE.PREFIX.length); users[user] = true; - }); + } } var hasMore = (parseInt(currentCursor[0], 10) > 0) && (attemps < MAX_SCAN_ATTEMPTS); From 09d5a14adee09656d2a49f23520aa550833ae073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 29 Mar 2017 20:04:19 +0200 Subject: [PATCH 04/16] Check data from redis scan is an array --- batch/pubsub/queue-seeker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js index 00d672f0..8e8bee66 100644 --- a/batch/pubsub/queue-seeker.js +++ b/batch/pubsub/queue-seeker.js @@ -36,7 +36,7 @@ QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback } var queues = currentCursor[1]; - if (queues) { + if (Array.isArray(queues)) { for (var i = 0; i < queues.length; i++) { var user = queues[i].substr(QUEUE.PREFIX.length); users[user] = true; From 6592435e87310c80332b3859d105dc3d7513b748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Wed, 29 Mar 2017 20:40:40 +0200 Subject: [PATCH 05/16] Use COUNT option for scan command --- batch/pubsub/queue-seeker.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js index 8e8bee66..ac501841 100644 --- a/batch/pubsub/queue-seeker.js +++ b/batch/pubsub/queue-seeker.js @@ -2,6 +2,7 @@ var QUEUE = require('../job_queue').QUEUE; var MAX_SCAN_ATTEMPTS = 50; +var SCAN_COUNT_VALUE = 50; function QueueSeeker(pool) { this.pool = pool; @@ -28,7 +29,7 @@ QueueSeeker.prototype.seek = function (callback) { QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback) { var self = this; - var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; + var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*', 'COUNT', SCAN_COUNT_VALUE]; client.scan(redisParams, function(err, currentCursor) { if (err) { From 8012fe26aa28243cc1a5f19f9ba91ec5e3286592 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Fri, 31 Mar 2017 14:30:33 +0200 Subject: [PATCH 06/16] Avoid scan behaviour to discover active queues of user's jobs, now keeps an index to know which queues are available --- batch/job_queue.js | 31 +++++++- batch/pubsub/job-subscriber.js | 75 ++++++++----------- batch/pubsub/queue-discover.js | 27 +++++++ batch/pubsub/queue-seeker.js | 57 -------------- .../{queue-seeker.js => queue-discover.js} | 41 ++++++---- test/unit/batch/job_queue.js | 6 ++ test/unit/batch/job_subscriber.js | 5 +- 7 files changed, 122 insertions(+), 120 deletions(-) create mode 100644 batch/pubsub/queue-discover.js delete mode 100644 batch/pubsub/queue-seeker.js rename test/integration/batch/{queue-seeker.js => queue-discover.js} (54%) diff --git a/batch/job_queue.js b/batch/job_queue.js index 7ed1cc59..4b38f504 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -2,22 +2,29 @@ var debug = require('./util/debug')('queue'); -function JobQueue(metadataBackend, jobPublisher) { +function JobQueue(metadataBackend, jobPublisher, queueIndex) { this.metadataBackend = metadataBackend; this.jobPublisher = jobPublisher; + this.queueIndex = queueIndex; } module.exports = JobQueue; var QUEUE = { DB: 5, - PREFIX: 'batch:queue:' + PREFIX: 'batch:queue:', + INDEX: 'batch:indexes:queue' }; + module.exports.QUEUE = QUEUE; JobQueue.prototype.enqueue = function (user, jobId, callback) { debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId); - this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + user, jobId ], function (err) { + + this.metadataBackend.redisMultiCmd(QUEUE.DB, [ + [ 'LPUSH', QUEUE.PREFIX + user, jobId ], + [ 'SADD', QUEUE.INDEX, user ] + ], function (err) { if (err) { return callback(err); } @@ -32,7 +39,23 @@ JobQueue.prototype.size = function (user, callback) { }; JobQueue.prototype.dequeue = function (user, callback) { - this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], function(err, jobId) { + var dequeueScript = [ + 'local job_id = redis.call("RPOP", KEYS[1])', + 'if redis.call("LLEN", KEYS[1]) == 0 then', + ' redis.call("SREM", KEYS[2], ARGV[1])', + 'end', + 'return job_id' + ].join('\n'); + + var redisParams = [ + dequeueScript, //lua source code + 2, // Two "keys" to pass + QUEUE.PREFIX + user, //KEYS[1], the key of the queue + QUEUE.INDEX, //KEYS[2], the key of the index + user // ARGV[1] - value of the element to remove form the index + ]; + + this.metadataBackend.redisCmd(QUEUE.DB, 'EVAL', redisParams, function (err, jobId) { debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId); return callback(err, jobId); }); diff --git a/batch/pubsub/job-subscriber.js b/batch/pubsub/job-subscriber.js index f0a3c641..5f109c24 100644 --- a/batch/pubsub/job-subscriber.js +++ b/batch/pubsub/job-subscriber.js @@ -1,7 +1,7 @@ 'use strict'; var Channel = require('./channel'); -var QueueSeeker = require('./queue-seeker'); +var queueDiscover = require('./queue-discover'); var debug = require('./../util/debug')('pubsub:subscriber'); var error = require('./../util/debug')('pubsub:subscriber:error'); @@ -11,74 +11,61 @@ var SUBSCRIBE_INTERVAL = 5 * MINUTE; function JobSubscriber(pool, userDatabaseMetadataService) { this.pool = pool; this.userDatabaseMetadataService = userDatabaseMetadataService; - this.queueSeeker = new QueueSeeker(pool); } module.exports = JobSubscriber; -function seeker(queueSeeker, onJobHandler, callback) { - queueSeeker.seek(function (err, users) { - if (err) { - if (callback) { - callback(err); - } - return error(err); - } - debug('queues found successfully'); - users.forEach(onJobHandler); - - if (callback) { - return callback(null); - } - }); -} - JobSubscriber.prototype.subscribe = function (onJobHandler, callback) { var self = this; function wrappedJobHandlerListener(user) { self.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) { if (err) { - return callback(err); + if (callback) { + callback(err); + } + return error('Error getting user\'s host: ' + err.message); } return onJobHandler(user, userDatabaseMetadata.host); }); } - seeker(this.queueSeeker, wrappedJobHandlerListener, function(err) { - if (callback) { - callback(err); + queueDiscover(self.pool, wrappedJobHandlerListener, function (err, client) { + if (err) { + if (callback) { + callback(err); + } + + return error('Error discovering user\'s queues: ' + err.message); } // do not start any pooling until first seek has finished - self.seekerInterval = setInterval(seeker, SUBSCRIBE_INTERVAL, self.queueSeeker, wrappedJobHandlerListener); + self.discoverInterval = setInterval(queueDiscover, SUBSCRIBE_INTERVAL, self.pool, wrappedJobHandlerListener); - self.pool.acquire(Channel.DB, function (err, client) { - if (err) { - return error('Error adquiring redis client: ' + err.message); - } + self.client = client; + client.removeAllListeners('message'); + client.unsubscribe(Channel.NAME); + client.subscribe(Channel.NAME); - self.client = client; - client.removeAllListeners('message'); - client.unsubscribe(Channel.NAME); - client.subscribe(Channel.NAME); - - client.on('message', function (channel, user) { - debug('message received in channel=%s from user=%s', channel, user); - wrappedJobHandlerListener(user); - }); - - client.on('error', function () { - self.unsubscribe(); - self.pool.release(Channel.DB, client); - self.subscribe(onJobHandler); - }); + client.on('message', function (channel, user) { + debug('message received in channel=%s from user=%s', channel, user); + wrappedJobHandlerListener(user); }); + + client.on('error', function () { + self.unsubscribe(); + self.pool.release(Channel.DB, client); + self.subscribe(onJobHandler); + }); + + if (callback) { + callback(); + } }); }; JobSubscriber.prototype.unsubscribe = function (callback) { - clearInterval(this.seekerInterval); + clearInterval(this.discoverInterval); if (this.client && this.client.connected) { this.client.unsubscribe(Channel.NAME, callback); } else { diff --git a/batch/pubsub/queue-discover.js b/batch/pubsub/queue-discover.js new file mode 100644 index 00000000..7a0474a8 --- /dev/null +++ b/batch/pubsub/queue-discover.js @@ -0,0 +1,27 @@ +'use strict'; + +var error = require('./../util/debug')('pubsub:queue-discover:error'); +var QUEUE = require('../job_queue').QUEUE; + +module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) { + pool.acquire(QUEUE.DB, function (err, client) { + if (err) { + if (callback) { + callback(err); + } + return error('Error adquiring redis client: ' + err.message); + } + + client.smembers(QUEUE.INDEX, function (err, queues) { + if (err) { + return error('Error getting queues from index: ' + err.message); + } + + queues.forEach(wrappedJobHandlerListener); + + if (callback) { + return callback(null, client, queues); + } + }); + }); +}; diff --git a/batch/pubsub/queue-seeker.js b/batch/pubsub/queue-seeker.js deleted file mode 100644 index ac501841..00000000 --- a/batch/pubsub/queue-seeker.js +++ /dev/null @@ -1,57 +0,0 @@ -'use strict'; - -var QUEUE = require('../job_queue').QUEUE; -var MAX_SCAN_ATTEMPTS = 50; -var SCAN_COUNT_VALUE = 50; - -function QueueSeeker(pool) { - this.pool = pool; -} - -module.exports = QueueSeeker; - -QueueSeeker.prototype.seek = function (callback) { - var initialCursor = ['0']; - var attemps = 0; - var users = {}; - var self = this; - - this.pool.acquire(QUEUE.DB, function(err, client) { - if (err) { - return callback(err); - } - self._seek(client, initialCursor, users, attemps, function(err, users) { - self.pool.release(QUEUE.DB, client); - return callback(err, Object.keys(users)); - }); - }); -}; - -QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback) { - var self = this; - var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*', 'COUNT', SCAN_COUNT_VALUE]; - - client.scan(redisParams, function(err, currentCursor) { - if (err) { - return callback(null, users); - } - - var queues = currentCursor[1]; - if (Array.isArray(queues)) { - for (var i = 0; i < queues.length; i++) { - var user = queues[i].substr(QUEUE.PREFIX.length); - users[user] = true; - } - } - - var hasMore = (parseInt(currentCursor[0], 10) > 0) && (attemps < MAX_SCAN_ATTEMPTS); - - if (!hasMore) { - return callback(null, users); - } - - attemps += 1; - - self._seek(client, currentCursor, users, attemps, callback); - }); -}; diff --git a/test/integration/batch/queue-seeker.js b/test/integration/batch/queue-discover.js similarity index 54% rename from test/integration/batch/queue-seeker.js rename to test/integration/batch/queue-discover.js index e5ade0e2..b82e85c2 100644 --- a/test/integration/batch/queue-seeker.js +++ b/test/integration/batch/queue-discover.js @@ -6,13 +6,12 @@ var redisUtils = require('../../support/redis_utils'); var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); var JobPublisher = require('../../../batch/pubsub/job-publisher'); -var QueueSeeker = require('../../../batch/pubsub/queue-seeker'); var JobQueue = require('../../../batch/job_queue'); var jobPublisher = new JobPublisher(redisUtils.getPool()); +var queueDiscover = require('../../../batch/pubsub/queue-discover'); - -describe('queue seeker', function() { +describe('queue discover', function () { var userA = 'userA'; var userB = 'userB'; @@ -25,15 +24,22 @@ describe('queue seeker', function() { }); it('should find queues for one user', function (done) { - var seeker = new QueueSeeker(redisUtils.getPool()); this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) { if (err) { return done(err); } - seeker.seek(function(err, users) { - assert.ok(!err); - assert.equal(users.length, 1); - assert.equal(users[0], userA); + + var onQueueDiscoveredCalledNumber = 0; + + function onQueueDiscovered () { + onQueueDiscoveredCalledNumber += 1; + } + + queueDiscover(redisUtils.getPool(), onQueueDiscovered, function (err, client, queues) { + assert.ifError(err); + assert.equal(queues.length, 1); + assert.equal(onQueueDiscoveredCalledNumber, queues.length); + assert.equal(queues[0], userA); return done(); }); @@ -42,7 +48,6 @@ describe('queue seeker', function() { it('should find queues for more than one user', function (done) { var self = this; - var seeker = new QueueSeeker(redisUtils.getPool()); this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) { if (err) { return done(err); @@ -51,11 +56,19 @@ describe('queue seeker', function() { if (err) { return done(err); } - seeker.seek(function(err, users) { - assert.ok(!err); - assert.equal(users.length, 2); - assert.ok(users[0] === userA || users[0] === userB); - assert.ok(users[1] === userA || users[1] === userB); + + var onQueueDiscoveredCalledNumber = 0; + + function onQueueDiscovered () { + onQueueDiscoveredCalledNumber += 1; + } + + queueDiscover(redisUtils.getPool(), onQueueDiscovered, function (err, client, queues) { + assert.ifError(err); + assert.equal(queues.length, 2); + assert.equal(onQueueDiscoveredCalledNumber, queues.length); + assert.ok(queues[0] === userA || queues[0] === userB); + assert.ok(queues[1] === userA || queues[1] === userB); return done(); }); diff --git a/test/unit/batch/job_queue.js b/test/unit/batch/job_queue.js index c3f376e3..e001e736 100644 --- a/test/unit/batch/job_queue.js +++ b/test/unit/batch/job_queue.js @@ -9,6 +9,12 @@ describe('batch API job queue', function () { process.nextTick(function () { callback(null, 'irrelevantJob'); }); + }, + redisMultiCmd: function () { + var callback = arguments[arguments.length -1]; + process.nextTick(function () { + callback(null, 'irrelevantJob'); + }); } }; this.jobPublisher = { diff --git a/test/unit/batch/job_subscriber.js b/test/unit/batch/job_subscriber.js index 70caa1a9..45f2931c 100644 --- a/test/unit/batch/job_subscriber.js +++ b/test/unit/batch/job_subscriber.js @@ -30,7 +30,10 @@ describe('batch API job subscriber', function () { removeAllListeners: function () { return this; }, - connected: true + smembers: function (key, callback) { + callback(null, []); + }, + connected: true, }; this.pool = { acquire: function (db, cb) { From dd7cb733374f05d881077dd70bb3a8430a10fb79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Fri, 31 Mar 2017 14:34:24 +0200 Subject: [PATCH 07/16] Remove unused dependency --- batch/job_queue.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/batch/job_queue.js b/batch/job_queue.js index 4b38f504..7f373ac3 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -2,10 +2,9 @@ var debug = require('./util/debug')('queue'); -function JobQueue(metadataBackend, jobPublisher, queueIndex) { +function JobQueue(metadataBackend, jobPublisher) { this.metadataBackend = metadataBackend; this.jobPublisher = jobPublisher; - this.queueIndex = queueIndex; } module.exports = JobQueue; From 8f35e1d1af25672d994f6173be4f7774bd64d50c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 11:38:20 +0200 Subject: [PATCH 08/16] Fix typo --- batch/job_queue.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/job_queue.js b/batch/job_queue.js index 7f373ac3..62bad8f2 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -51,7 +51,7 @@ JobQueue.prototype.dequeue = function (user, callback) { 2, // Two "keys" to pass QUEUE.PREFIX + user, //KEYS[1], the key of the queue QUEUE.INDEX, //KEYS[2], the key of the index - user // ARGV[1] - value of the element to remove form the index + user // ARGV[1] - value of the element to remove from the index ]; this.metadataBackend.redisCmd(QUEUE.DB, 'EVAL', redisParams, function (err, jobId) { From 24ff1cf8087fcb1145a86f87097ccf6f1f944577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 12:34:30 +0200 Subject: [PATCH 09/16] Discover and add to index queues of users with jobs on batch service startup --- batch/batch.js | 10 ++++++- batch/pubsub/queue-discover.js | 55 ++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/batch/batch.js b/batch/batch.js index ad4ec064..e73ba2c0 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -5,6 +5,7 @@ var EventEmitter = require('events').EventEmitter; var debug = require('./util/debug')('batch'); var queue = require('queue-async'); var HostScheduler = require('./scheduler/host-scheduler'); +var startupQueueDiscover = require('./pubsub/queue-discover').startupQueueDiscover; var EMPTY_QUEUE = true; @@ -18,6 +19,7 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.jobPublisher = jobPublisher; this.logger = logger; this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool); + this.pool = redisPool; // map: user => jobId. Will be used for draining jobs. this.workInProgressJobs = {}; @@ -46,7 +48,13 @@ Batch.prototype.start = function () { return self.emit('error', err); } - self.emit('ready'); + startupQueueDiscover(self.pool, function (err) { + if (err) { + return self.emit('error', err); + } + + self.emit('ready'); + }); } ); }; diff --git a/batch/pubsub/queue-discover.js b/batch/pubsub/queue-discover.js index 7a0474a8..09f20eaf 100644 --- a/batch/pubsub/queue-discover.js +++ b/batch/pubsub/queue-discover.js @@ -2,6 +2,7 @@ var error = require('./../util/debug')('pubsub:queue-discover:error'); var QUEUE = require('../job_queue').QUEUE; +var queueAsync = require('queue-async'); module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) { pool.acquire(QUEUE.DB, function (err, client) { @@ -25,3 +26,57 @@ module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callba }); }); }; + +module.exports.startupQueueDiscover = function startupQueueDiscover (pool, callback) { + var initialCursor = ['0']; + var users = {}; + + pool.acquire(QUEUE.DB, function(err, client) { + if (err) { + return callback(err); + } + + scanQueues(client, initialCursor, users, function(err, users) { + var usernames = Object.keys(users); + var usersQueues = queueAsync(usernames.length); + + usernames.forEach(function (username) { + usersQueues.defer(client.sadd.bind(client), QUEUE.INDEX, username); + }); + + usersQueues.awaitAll(function (err) { + if (err) { + return callback(err); + } + + pool.release(QUEUE.DB, client); + callback(null); + }); + }); + }); +}; + +function scanQueues (client, cursor, users, callback) { + var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; + + client.scan(redisParams, function(err, currentCursor) { + if (err) { + return callback(null, users); + } + + var queues = currentCursor[1]; + if (queues) { + queues.forEach(function (queue) { + var user = queue.substr(QUEUE.PREFIX.length); + users[user] = true; + }); + } + + var hasMore = currentCursor[0] !== '0'; + if (!hasMore) { + return callback(null, users); + } + + scanQueues(client, currentCursor, users, callback); + }); +} From e4e6207565dd18e05ff29259bfdd019f2405e92a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 12:59:23 +0200 Subject: [PATCH 10/16] Add test --- .../batch/batch-queue-discover.test.js | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 test/acceptance/batch/batch-queue-discover.test.js diff --git a/test/acceptance/batch/batch-queue-discover.test.js b/test/acceptance/batch/batch-queue-discover.test.js new file mode 100644 index 00000000..042ce8ec --- /dev/null +++ b/test/acceptance/batch/batch-queue-discover.test.js @@ -0,0 +1,72 @@ +require('../../helper'); +var assert = require('../../support/assert'); +var redisUtils = require('../../support/redis_utils'); +var batchFactory = require('../../../batch/index'); + +var JobPublisher = require('../../../batch/pubsub/job-publisher'); +var JobQueue = require('../../../batch/job_queue'); +var JobBackend = require('../../../batch/job_backend'); +var JobService = require('../../../batch/job_service'); +var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service'); +var JobCanceller = require('../../../batch/job_canceller'); +var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); +var queueDiscover = require('../../../batch/pubsub/queue-discover'); + +describe('batch startup', function() { + var dbInstance = 'localhost'; + var username = 'vizzuality'; + var pool = redisUtils.getPool(); + var jobPublisher = new JobPublisher(pool); + var jobQueue = new JobQueue(metadataBackend, jobPublisher); + var jobBackend = new JobBackend(metadataBackend, jobQueue); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); + + var batch = batchFactory(metadataBackend, pool); + + after(function (done) { + batch.stop(); + redisUtils.clean('batch:*', done); + }); + + function createJob(sql, done) { + var data = { + user: username, + query: sql, + host: dbInstance + }; + + jobService.create(data, function (err, job) { + if (err) { + return done(err); + } + + done(null, job.serialize()); + }); + } + + it('should feed queue index at startup', function (done) { + createJob('select pg_sleep(3)', function (err) { + if (err) { + return done(err); + } + + batch.start(); + + batch.on('ready', function () { + var onDiscoveredQueue = function () {}; + queueDiscover(pool, onDiscoveredQueue, function (err, client, queues) { + if (err) { + done(err); + } + + assert.equal(queues.length, 1); + assert.equal(queues[0], 'vizzuality'); + done(); + }); + }); + }); + }); + +}); From dbdfe861717d9d202a9244565bd829e745c325e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 13:07:27 +0200 Subject: [PATCH 11/16] Remove mocha hooks --- .../batch/batch-queue-discover.test.js | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/test/acceptance/batch/batch-queue-discover.test.js b/test/acceptance/batch/batch-queue-discover.test.js index 042ce8ec..11b48459 100644 --- a/test/acceptance/batch/batch-queue-discover.test.js +++ b/test/acceptance/batch/batch-queue-discover.test.js @@ -23,13 +23,6 @@ describe('batch startup', function() { var jobCanceller = new JobCanceller(userDatabaseMetadataService); var jobService = new JobService(jobBackend, jobCanceller); - var batch = batchFactory(metadataBackend, pool); - - after(function (done) { - batch.stop(); - redisUtils.clean('batch:*', done); - }); - function createJob(sql, done) { var data = { user: username, @@ -52,10 +45,15 @@ describe('batch startup', function() { return done(err); } + var batch = batchFactory(metadataBackend, pool); batch.start(); - batch.on('ready', function () { - var onDiscoveredQueue = function () {}; + var queuesDiscovered = 0; + + var onDiscoveredQueue = function () { + queuesDiscovered += 1; + }; + queueDiscover(pool, onDiscoveredQueue, function (err, client, queues) { if (err) { done(err); @@ -63,7 +61,11 @@ describe('batch startup', function() { assert.equal(queues.length, 1); assert.equal(queues[0], 'vizzuality'); - done(); + assert.equal(queuesDiscovered, 1); + + batch.stop(function () { + redisUtils.clean('batch:*', done); + }); }); }); }); From 3a0bbc1eadc972d12e9e5e47cb87f5bb334de8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 14:40:23 +0200 Subject: [PATCH 12/16] Avoid use sleep function --- test/acceptance/batch/batch-queue-discover.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/acceptance/batch/batch-queue-discover.test.js b/test/acceptance/batch/batch-queue-discover.test.js index 11b48459..987aec36 100644 --- a/test/acceptance/batch/batch-queue-discover.test.js +++ b/test/acceptance/batch/batch-queue-discover.test.js @@ -40,7 +40,7 @@ describe('batch startup', function() { } it('should feed queue index at startup', function (done) { - createJob('select pg_sleep(3)', function (err) { + createJob('select 1', function (err) { if (err) { return done(err); } From 36d5dc17195681b6643d1422571ea22b99810c71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 19:26:21 +0200 Subject: [PATCH 13/16] Queue seeker refactor: - Extract from job subcriber everything related to queue dicovering - Job Queue is responsible of queue dicovering and exposes functions to scan user queues. - Batch creates an interval to consume queues - Now Batch.start() does: * Finds existing queues from metadata, adding to queue index the ones that are not indexed yet * Binds callback to process jobs for every queue found. * Inits queue consumer interval * Subscribes to PUB/SUB channel --- batch/batch.js | 83 ++++++--- batch/index.js | 4 +- batch/job_queue.js | 89 ++++++++++ batch/pubsub/job-subscriber.js | 30 +--- batch/pubsub/queue-discover.js | 82 --------- .../batch/batch-queue-discover.test.js | 74 -------- test/integration/batch/job-queue.test.js | 163 ++++++++++++++++++ test/integration/batch/queue-discover.js | 78 --------- 8 files changed, 317 insertions(+), 286 deletions(-) delete mode 100644 batch/pubsub/queue-discover.js delete mode 100644 test/acceptance/batch/batch-queue-discover.test.js create mode 100644 test/integration/batch/job-queue.test.js delete mode 100644 test/integration/batch/queue-discover.js diff --git a/batch/batch.js b/batch/batch.js index e73ba2c0..3a9cb9f5 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -5,21 +5,22 @@ var EventEmitter = require('events').EventEmitter; var debug = require('./util/debug')('batch'); var queue = require('queue-async'); var HostScheduler = require('./scheduler/host-scheduler'); -var startupQueueDiscover = require('./pubsub/queue-discover').startupQueueDiscover; var EMPTY_QUEUE = true; -function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublisher, redisPool, logger) { +var MINUTE = 60 * 1000; +var CONSUME_QUEUE_INTERVAL = 1 * MINUTE; + +function Batch(name, userDatabaseMetadataService, jobSubscriber, jobQueue, jobRunner, jobService, redisPool, logger) { EventEmitter.call(this); this.name = name || 'batch'; + this.userDatabaseMetadataService = userDatabaseMetadataService; this.jobSubscriber = jobSubscriber; this.jobQueue = jobQueue; this.jobRunner = jobRunner; this.jobService = jobService; - this.jobPublisher = jobPublisher; this.logger = logger; this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool); - this.pool = redisPool; // map: user => jobId. Will be used for draining jobs. this.workInProgressJobs = {}; @@ -30,37 +31,70 @@ module.exports = Batch; Batch.prototype.start = function () { var self = this; + var onJobHandler = createJobHandler(self.name, self.userDatabaseMetadataService, self.hostScheduler); - this.jobSubscriber.subscribe( - function onJobHandler(user, host) { - debug('[%s] onJobHandler(%s, %s)', self.name, user, host); - self.hostScheduler.add(host, user, function(err) { - if (err) { - return debug( - 'Could not schedule host=%s user=%s from %s. Reason: %s', - host, self.name, user, err.message - ); - } - }); - }, - function onJobSubscriberReady(err) { + self.jobQueue.scanQueues(function (err, queues) { + if (err) { + return self.emit('error', err); + } + + queues.forEach(onJobHandler); + self._startQueueConsumerInterval(onJobHandler); + + self.jobSubscriber.subscribe(onJobHandler, function (err) { if (err) { return self.emit('error', err); } - startupQueueDiscover(self.pool, function (err) { - if (err) { - return self.emit('error', err); - } + self.emit('ready'); + }); + }); +}; - self.emit('ready'); +function createJobHandler (name, userDatabaseMetadataService, hostScheduler) { + return function onJobHandler(user) { + userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) { + if (err) { + return debug('Could not get host user=%s from %s. Reason: %s', user, name, err.message); + } + + var host = userDatabaseMetadata.host; + + debug('[%s] onJobHandler(%s, %s)', name, user, host); + hostScheduler.add(host, user, function(err) { + if (err) { + return debug( + 'Could not schedule host=%s user=%s from %s. Reason: %s', host, user, name, err.message + ); + } }); - } - ); + }); + }; +} + +Batch.prototype._startQueueConsumerInterval = function (onJobHandler) { + var self = this; + + self.consumeQueueInterval = setInterval(function () { + self.jobQueue.getQueues(function (err, queues) { + if (err) { + return debug('Could not get queues from %s. Reason: %s', self.name, err.message); + } + + queues.forEach(onJobHandler); + }); + }, CONSUME_QUEUE_INTERVAL); +}; + +Batch.prototype._stopQueueConsumerInterval = function () { + if (this.consumeQueueInterval) { + clearInterval(this.consumeQueueInterval); + } }; Batch.prototype.processJob = function (user, callback) { var self = this; + self.jobQueue.dequeue(user, function (err, jobId) { if (err) { return callback(new Error('Could not get job from "' + user + '". Reason: ' + err.message), !EMPTY_QUEUE); @@ -157,6 +191,7 @@ Batch.prototype._drainJob = function (user, callback) { Batch.prototype.stop = function (callback) { this.removeAllListeners(); + this._stopQueueConsumerInterval(); this.jobSubscriber.unsubscribe(callback); }; diff --git a/batch/index.js b/batch/index.js index 8e662917..2c0e820b 100644 --- a/batch/index.js +++ b/batch/index.js @@ -15,7 +15,7 @@ var Batch = require('./batch'); module.exports = function batchFactory (metadataBackend, redisPool, name, statsdClient, loggerPath) { var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - var jobSubscriber = new JobSubscriber(redisPool, userDatabaseMetadataService); + var jobSubscriber = new JobSubscriber(redisPool); var jobPublisher = new JobPublisher(redisPool); var jobQueue = new JobQueue(metadataBackend, jobPublisher); @@ -28,11 +28,11 @@ module.exports = function batchFactory (metadataBackend, redisPool, name, statsd return new Batch( name, + userDatabaseMetadataService, jobSubscriber, jobQueue, jobRunner, jobService, - jobPublisher, redisPool, logger ); diff --git a/batch/job_queue.js b/batch/job_queue.js index 62bad8f2..2e6135e8 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -1,6 +1,7 @@ 'use strict'; var debug = require('./util/debug')('queue'); +var queueAsync = require('queue-async'); function JobQueue(metadataBackend, jobPublisher) { this.metadataBackend = metadataBackend; @@ -64,3 +65,91 @@ JobQueue.prototype.enqueueFirst = function (user, jobId, callback) { debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId); this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + user, jobId ], callback); }; + + +JobQueue.prototype.getQueues = function (callback) { + this.metadataBackend.redisCmd(QUEUE.DB, 'SMEMBERS', [ QUEUE.INDEX ], function (err, queues) { + if (err) { + return callback(err); + } + + callback(null, queues); + }); +}; + +JobQueue.prototype.scanQueues = function (callback) { + var self = this; + + self.scan(function (err, queues) { + if (err) { + return callback(err); + } + + self.addToQueueIndex(queues, function (err) { + if (err) { + return callback(err); + } + + callback(null, queues); + }); + }); +}; + +JobQueue.prototype.scan = function (callback) { + var self = this; + var initialCursor = ['0']; + var users = {}; + + self._scan(initialCursor, users, function(err, users) { + if (err) { + return callback(err); + } + + callback(null, Object.keys(users)); + }); +}; + +JobQueue.prototype._scan = function (cursor, users, callback) { + var self = this; + var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; + + self.metadataBackend.redisCmd(QUEUE.DB, 'SCAN', redisParams, function (err, currentCursor) { + if (err) { + return callback(null, users); + } + + var queues = currentCursor[1]; + if (queues) { + queues.forEach(function (queue) { + var user = queue.substr(QUEUE.PREFIX.length); + users[user] = true; + }); + } + + var hasMore = currentCursor[0] !== '0'; + if (!hasMore) { + return callback(null, users); + } + + self._scan(currentCursor, users, callback); + }); +}; + +JobQueue.prototype.addToQueueIndex = function (users, callback) { + var self = this; + var usersQueues = queueAsync(users.length); + + users.forEach(function (user) { + usersQueues.defer(function (user, callback) { + self.metadataBackend.redisCmd(QUEUE.DB, 'SADD', [ QUEUE.INDEX, user], callback); + }, user); + }); + + usersQueues.awaitAll(function (err) { + if (err) { + return callback(err); + } + + callback(null); + }); +}; diff --git a/batch/pubsub/job-subscriber.js b/batch/pubsub/job-subscriber.js index 5f109c24..9c3d6865 100644 --- a/batch/pubsub/job-subscriber.js +++ b/batch/pubsub/job-subscriber.js @@ -1,16 +1,11 @@ 'use strict'; var Channel = require('./channel'); -var queueDiscover = require('./queue-discover'); var debug = require('./../util/debug')('pubsub:subscriber'); var error = require('./../util/debug')('pubsub:subscriber:error'); -var MINUTE = 60 * 1000; -var SUBSCRIBE_INTERVAL = 5 * MINUTE; - -function JobSubscriber(pool, userDatabaseMetadataService) { +function JobSubscriber(pool) { this.pool = pool; - this.userDatabaseMetadataService = userDatabaseMetadataService; } module.exports = JobSubscriber; @@ -18,30 +13,14 @@ module.exports = JobSubscriber; JobSubscriber.prototype.subscribe = function (onJobHandler, callback) { var self = this; - function wrappedJobHandlerListener(user) { - self.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) { - if (err) { - if (callback) { - callback(err); - } - return error('Error getting user\'s host: ' + err.message); - } - return onJobHandler(user, userDatabaseMetadata.host); - }); - } - - queueDiscover(self.pool, wrappedJobHandlerListener, function (err, client) { + self.pool.acquire(Channel.DB, function(err, client) { if (err) { if (callback) { callback(err); } - - return error('Error discovering user\'s queues: ' + err.message); + return error('Error adquiring redis client: ' + err.message); } - // do not start any pooling until first seek has finished - self.discoverInterval = setInterval(queueDiscover, SUBSCRIBE_INTERVAL, self.pool, wrappedJobHandlerListener); - self.client = client; client.removeAllListeners('message'); client.unsubscribe(Channel.NAME); @@ -49,7 +28,7 @@ JobSubscriber.prototype.subscribe = function (onJobHandler, callback) { client.on('message', function (channel, user) { debug('message received in channel=%s from user=%s', channel, user); - wrappedJobHandlerListener(user); + onJobHandler(user); }); client.on('error', function () { @@ -65,7 +44,6 @@ JobSubscriber.prototype.subscribe = function (onJobHandler, callback) { }; JobSubscriber.prototype.unsubscribe = function (callback) { - clearInterval(this.discoverInterval); if (this.client && this.client.connected) { this.client.unsubscribe(Channel.NAME, callback); } else { diff --git a/batch/pubsub/queue-discover.js b/batch/pubsub/queue-discover.js deleted file mode 100644 index 09f20eaf..00000000 --- a/batch/pubsub/queue-discover.js +++ /dev/null @@ -1,82 +0,0 @@ -'use strict'; - -var error = require('./../util/debug')('pubsub:queue-discover:error'); -var QUEUE = require('../job_queue').QUEUE; -var queueAsync = require('queue-async'); - -module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) { - pool.acquire(QUEUE.DB, function (err, client) { - if (err) { - if (callback) { - callback(err); - } - return error('Error adquiring redis client: ' + err.message); - } - - client.smembers(QUEUE.INDEX, function (err, queues) { - if (err) { - return error('Error getting queues from index: ' + err.message); - } - - queues.forEach(wrappedJobHandlerListener); - - if (callback) { - return callback(null, client, queues); - } - }); - }); -}; - -module.exports.startupQueueDiscover = function startupQueueDiscover (pool, callback) { - var initialCursor = ['0']; - var users = {}; - - pool.acquire(QUEUE.DB, function(err, client) { - if (err) { - return callback(err); - } - - scanQueues(client, initialCursor, users, function(err, users) { - var usernames = Object.keys(users); - var usersQueues = queueAsync(usernames.length); - - usernames.forEach(function (username) { - usersQueues.defer(client.sadd.bind(client), QUEUE.INDEX, username); - }); - - usersQueues.awaitAll(function (err) { - if (err) { - return callback(err); - } - - pool.release(QUEUE.DB, client); - callback(null); - }); - }); - }); -}; - -function scanQueues (client, cursor, users, callback) { - var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; - - client.scan(redisParams, function(err, currentCursor) { - if (err) { - return callback(null, users); - } - - var queues = currentCursor[1]; - if (queues) { - queues.forEach(function (queue) { - var user = queue.substr(QUEUE.PREFIX.length); - users[user] = true; - }); - } - - var hasMore = currentCursor[0] !== '0'; - if (!hasMore) { - return callback(null, users); - } - - scanQueues(client, currentCursor, users, callback); - }); -} diff --git a/test/acceptance/batch/batch-queue-discover.test.js b/test/acceptance/batch/batch-queue-discover.test.js deleted file mode 100644 index 987aec36..00000000 --- a/test/acceptance/batch/batch-queue-discover.test.js +++ /dev/null @@ -1,74 +0,0 @@ -require('../../helper'); -var assert = require('../../support/assert'); -var redisUtils = require('../../support/redis_utils'); -var batchFactory = require('../../../batch/index'); - -var JobPublisher = require('../../../batch/pubsub/job-publisher'); -var JobQueue = require('../../../batch/job_queue'); -var JobBackend = require('../../../batch/job_backend'); -var JobService = require('../../../batch/job_service'); -var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service'); -var JobCanceller = require('../../../batch/job_canceller'); -var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); -var queueDiscover = require('../../../batch/pubsub/queue-discover'); - -describe('batch startup', function() { - var dbInstance = 'localhost'; - var username = 'vizzuality'; - var pool = redisUtils.getPool(); - var jobPublisher = new JobPublisher(pool); - var jobQueue = new JobQueue(metadataBackend, jobPublisher); - var jobBackend = new JobBackend(metadataBackend, jobQueue); - var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - var jobCanceller = new JobCanceller(userDatabaseMetadataService); - var jobService = new JobService(jobBackend, jobCanceller); - - function createJob(sql, done) { - var data = { - user: username, - query: sql, - host: dbInstance - }; - - jobService.create(data, function (err, job) { - if (err) { - return done(err); - } - - done(null, job.serialize()); - }); - } - - it('should feed queue index at startup', function (done) { - createJob('select 1', function (err) { - if (err) { - return done(err); - } - - var batch = batchFactory(metadataBackend, pool); - batch.start(); - batch.on('ready', function () { - var queuesDiscovered = 0; - - var onDiscoveredQueue = function () { - queuesDiscovered += 1; - }; - - queueDiscover(pool, onDiscoveredQueue, function (err, client, queues) { - if (err) { - done(err); - } - - assert.equal(queues.length, 1); - assert.equal(queues[0], 'vizzuality'); - assert.equal(queuesDiscovered, 1); - - batch.stop(function () { - redisUtils.clean('batch:*', done); - }); - }); - }); - }); - }); - -}); diff --git a/test/integration/batch/job-queue.test.js b/test/integration/batch/job-queue.test.js new file mode 100644 index 00000000..63fa1780 --- /dev/null +++ b/test/integration/batch/job-queue.test.js @@ -0,0 +1,163 @@ +'use strict'; + +require('../../helper'); +var assert = require('../../support/assert'); +var redisUtils = require('../../support/redis_utils'); + +var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); +var JobPublisher = require('../../../batch/pubsub/job-publisher'); +var JobQueue = require('../../../batch/job_queue'); + +var JobBackend = require('../../../batch/job_backend'); +var JobService = require('../../../batch/job_service'); +var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service'); +var JobCanceller = require('../../../batch/job_canceller'); +var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); + +describe('job queue', function () { + var pool = redisUtils.getPool(); + var jobPublisher = new JobPublisher(pool); + var jobQueue = new JobQueue(metadataBackend, jobPublisher); + var jobBackend = new JobBackend(metadataBackend, jobQueue); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); + + var userA = 'userA'; + var userB = 'userB'; + + beforeEach(function () { + this.jobQueue = new JobQueue(metadataBackend, jobPublisher); + }); + + afterEach(function (done) { + redisUtils.clean('batch:*', done); + }); + + it('should find queues for one user', function (done) { + var self = this; + + this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) { + if (err) { + return done(err); + } + + self.jobQueue.scanQueues(function (err, queues) { + assert.ifError(err); + assert.equal(queues.length, 1); + assert.equal(queues[0], userA); + return done(); + }); + }); + }); + + it('should find queues for more than one user', function (done) { + var self = this; + + this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) { + if (err) { + return done(err); + } + self.jobQueue.enqueue(userB, 'wadus-wadus-wadus-wadus', function(err) { + if (err) { + return done(err); + } + + self.jobQueue.scanQueues(function (err, queues) { + assert.ifError(err); + assert.equal(queues.length, 2); + assert.ok(queues[0] === userA || queues[0] === userB); + assert.ok(queues[1] === userA || queues[1] === userB); + + return done(); + }); + }); + }); + }); + + it('.scanQueues() should feed queue index', function (done) { + var self = this; + + var data = { + user: 'vizzuality', + query: 'select 1 as cartodb_id', + host: 'localhost' + }; + + jobService.create(data, function (err) { + if (err) { + return done(err); + } + + self.jobQueue.scanQueues(function (err, queuesFromScan) { + if (err) { + return done(err); + } + + assert.equal(queuesFromScan.length, 1); + assert.equal(queuesFromScan[0], 'vizzuality'); + + self.jobQueue.getQueues(function (err, queuesFromIndex) { + if (err) { + done(err); + } + + assert.equal(queuesFromIndex.length, 1); + assert.equal(queuesFromIndex[0], 'vizzuality'); + assert.deepEqual(queuesFromIndex, queuesFromScan); + + redisUtils.clean('batch:*', done); + }); + + }); + }); + }); + + it('.scanQueues() should feed queue index', function (done) { + var self = this; + + var jobVizzuality = { + user: 'vizzuality', + query: 'select 1 as cartodb_id', + host: 'localhost' + }; + + var jobWadus = { + user: 'wadus', + query: 'select 1 as cartodb_id', + host: 'localhost' + }; + + jobService.create(jobVizzuality, function (err) { + if (err) { + return done(err); + } + + jobService.create(jobWadus, function (err) { + if (err) { + return done(err); + } + + self.jobQueue.scanQueues(function (err, queuesFromScan) { + if (err) { + return done(err); + } + + assert.equal(queuesFromScan.length, 2); + + self.jobQueue.getQueues(function (err, queuesFromIndex) { + if (err) { + done(err); + } + + assert.equal(queuesFromIndex.length, 2); + assert.deepEqual(queuesFromIndex, queuesFromScan); + + redisUtils.clean('batch:*', done); + }); + + }); + }); + }); + }); +}); diff --git a/test/integration/batch/queue-discover.js b/test/integration/batch/queue-discover.js deleted file mode 100644 index b82e85c2..00000000 --- a/test/integration/batch/queue-discover.js +++ /dev/null @@ -1,78 +0,0 @@ -'use strict'; - -require('../../helper'); -var assert = require('../../support/assert'); -var redisUtils = require('../../support/redis_utils'); - -var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); -var JobPublisher = require('../../../batch/pubsub/job-publisher'); -var JobQueue = require('../../../batch/job_queue'); - -var jobPublisher = new JobPublisher(redisUtils.getPool()); -var queueDiscover = require('../../../batch/pubsub/queue-discover'); - -describe('queue discover', function () { - var userA = 'userA'; - var userB = 'userB'; - - beforeEach(function () { - this.jobQueue = new JobQueue(metadataBackend, jobPublisher); - }); - - afterEach(function (done) { - redisUtils.clean('batch:*', done); - }); - - it('should find queues for one user', function (done) { - this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) { - if (err) { - return done(err); - } - - var onQueueDiscoveredCalledNumber = 0; - - function onQueueDiscovered () { - onQueueDiscoveredCalledNumber += 1; - } - - queueDiscover(redisUtils.getPool(), onQueueDiscovered, function (err, client, queues) { - assert.ifError(err); - assert.equal(queues.length, 1); - assert.equal(onQueueDiscoveredCalledNumber, queues.length); - assert.equal(queues[0], userA); - - return done(); - }); - }); - }); - - it('should find queues for more than one user', function (done) { - var self = this; - this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) { - if (err) { - return done(err); - } - self.jobQueue.enqueue(userB, 'wadus-wadus-wadus-wadus', function(err) { - if (err) { - return done(err); - } - - var onQueueDiscoveredCalledNumber = 0; - - function onQueueDiscovered () { - onQueueDiscoveredCalledNumber += 1; - } - - queueDiscover(redisUtils.getPool(), onQueueDiscovered, function (err, client, queues) { - assert.ifError(err); - assert.equal(queues.length, 2); - assert.equal(onQueueDiscoveredCalledNumber, queues.length); - assert.ok(queues[0] === userA || queues[0] === userB); - assert.ok(queues[1] === userA || queues[1] === userB); - - return done(); - }); - }); - }); - }); -}); From 25a291bb38d353c690e211588d577adadc713739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 4 Apr 2017 09:50:45 +0200 Subject: [PATCH 14/16] Improve waek assertion --- test/integration/batch/job-queue.test.js | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/integration/batch/job-queue.test.js b/test/integration/batch/job-queue.test.js index 63fa1780..e23c35ec 100644 --- a/test/integration/batch/job-queue.test.js +++ b/test/integration/batch/job-queue.test.js @@ -95,7 +95,7 @@ describe('job queue', function () { } assert.equal(queuesFromScan.length, 1); - assert.equal(queuesFromScan[0], 'vizzuality'); + assert.ok(queuesFromScan.indexOf(data.user) >= 0); self.jobQueue.getQueues(function (err, queuesFromIndex) { if (err) { @@ -103,8 +103,7 @@ describe('job queue', function () { } assert.equal(queuesFromIndex.length, 1); - assert.equal(queuesFromIndex[0], 'vizzuality'); - assert.deepEqual(queuesFromIndex, queuesFromScan); + assert.ok(queuesFromIndex.indexOf(data.user) >= 0); redisUtils.clean('batch:*', done); }); @@ -113,7 +112,7 @@ describe('job queue', function () { }); }); - it('.scanQueues() should feed queue index', function (done) { + it('.scanQueues() should feed queue index with two users', function (done) { var self = this; var jobVizzuality = { @@ -144,6 +143,8 @@ describe('job queue', function () { } assert.equal(queuesFromScan.length, 2); + assert.ok(queuesFromScan.indexOf(jobVizzuality.user) >= 0); + assert.ok(queuesFromScan.indexOf(jobWadus.user) >= 0); self.jobQueue.getQueues(function (err, queuesFromIndex) { if (err) { @@ -151,7 +152,8 @@ describe('job queue', function () { } assert.equal(queuesFromIndex.length, 2); - assert.deepEqual(queuesFromIndex, queuesFromScan); + assert.ok(queuesFromIndex.indexOf(jobVizzuality.user) >= 0); + assert.ok(queuesFromIndex.indexOf(jobWadus.user) >= 0); redisUtils.clean('batch:*', done); }); From c1dff1e3953f04e3c6a10ba8df140c0bc130f94a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 4 Apr 2017 10:08:08 +0200 Subject: [PATCH 15/16] Improve naming --- batch/batch.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index 3a9cb9f5..a7a5e72f 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -9,7 +9,7 @@ var HostScheduler = require('./scheduler/host-scheduler'); var EMPTY_QUEUE = true; var MINUTE = 60 * 1000; -var CONSUME_QUEUE_INTERVAL = 1 * MINUTE; +var SCHEDULE_INTERVAL = 1 * MINUTE; function Batch(name, userDatabaseMetadataService, jobSubscriber, jobQueue, jobRunner, jobService, redisPool, logger) { EventEmitter.call(this); @@ -39,7 +39,7 @@ Batch.prototype.start = function () { } queues.forEach(onJobHandler); - self._startQueueConsumerInterval(onJobHandler); + self._startScheduleInterval(onJobHandler); self.jobSubscriber.subscribe(onJobHandler, function (err) { if (err) { @@ -72,10 +72,10 @@ function createJobHandler (name, userDatabaseMetadataService, hostScheduler) { }; } -Batch.prototype._startQueueConsumerInterval = function (onJobHandler) { +Batch.prototype._startScheduleInterval = function (onJobHandler) { var self = this; - self.consumeQueueInterval = setInterval(function () { + self.scheduleInterval = setInterval(function () { self.jobQueue.getQueues(function (err, queues) { if (err) { return debug('Could not get queues from %s. Reason: %s', self.name, err.message); @@ -83,12 +83,12 @@ Batch.prototype._startQueueConsumerInterval = function (onJobHandler) { queues.forEach(onJobHandler); }); - }, CONSUME_QUEUE_INTERVAL); + }, SCHEDULE_INTERVAL); }; -Batch.prototype._stopQueueConsumerInterval = function () { - if (this.consumeQueueInterval) { - clearInterval(this.consumeQueueInterval); +Batch.prototype._stopScheduleInterval = function () { + if (this.scheduleInterval) { + clearInterval(this.scheduleInterval); } }; @@ -191,7 +191,7 @@ Batch.prototype._drainJob = function (user, callback) { Batch.prototype.stop = function (callback) { this.removeAllListeners(); - this._stopQueueConsumerInterval(); + this._stopScheduleInterval(); this.jobSubscriber.unsubscribe(callback); }; From ea9454d87ce9dc5441a404b27b12e3e692345216 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 4 Apr 2017 15:42:34 +0200 Subject: [PATCH 16/16] Add test to validate old jobs will get discovered --- test/integration/batch/job-queue.test.js | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/integration/batch/job-queue.test.js b/test/integration/batch/job-queue.test.js index e23c35ec..e6b074ae 100644 --- a/test/integration/batch/job-queue.test.js +++ b/test/integration/batch/job-queue.test.js @@ -75,6 +75,22 @@ describe('job queue', function () { }); }); + it('should find queues from jobs not using new Redis SETs for users', function(done) { + var self = this; + var redisArgs = [JobQueue.QUEUE.PREFIX + userA, 'wadus-id']; + metadataBackend.redisCmd(JobQueue.QUEUE.DB, 'LPUSH', redisArgs, function (err) { + assert.ok(!err, err); + self.jobQueue.scanQueues(function (err, queues) { + assert.ok(!err, err); + + assert.equal(queues.length, 1); + assert.equal(queues[0], userA); + + return done(); + }); + }); + }); + it('.scanQueues() should feed queue index', function (done) { var self = this;