From 24ff1cf8087fcb1145a86f87097ccf6f1f944577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 3 Apr 2017 12:34:30 +0200 Subject: [PATCH] Discover and add to index queues of users with jobs on batch service startup --- batch/batch.js | 10 ++++++- batch/pubsub/queue-discover.js | 55 ++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/batch/batch.js b/batch/batch.js index ad4ec064..e73ba2c0 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -5,6 +5,7 @@ var EventEmitter = require('events').EventEmitter; var debug = require('./util/debug')('batch'); var queue = require('queue-async'); var HostScheduler = require('./scheduler/host-scheduler'); +var startupQueueDiscover = require('./pubsub/queue-discover').startupQueueDiscover; var EMPTY_QUEUE = true; @@ -18,6 +19,7 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.jobPublisher = jobPublisher; this.logger = logger; this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool); + this.pool = redisPool; // map: user => jobId. Will be used for draining jobs. this.workInProgressJobs = {}; @@ -46,7 +48,13 @@ Batch.prototype.start = function () { return self.emit('error', err); } - self.emit('ready'); + startupQueueDiscover(self.pool, function (err) { + if (err) { + return self.emit('error', err); + } + + self.emit('ready'); + }); } ); }; diff --git a/batch/pubsub/queue-discover.js b/batch/pubsub/queue-discover.js index 7a0474a8..09f20eaf 100644 --- a/batch/pubsub/queue-discover.js +++ b/batch/pubsub/queue-discover.js @@ -2,6 +2,7 @@ var error = require('./../util/debug')('pubsub:queue-discover:error'); var QUEUE = require('../job_queue').QUEUE; +var queueAsync = require('queue-async'); module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) { pool.acquire(QUEUE.DB, function (err, client) { @@ -25,3 +26,57 @@ module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callba }); }); }; + +module.exports.startupQueueDiscover = function startupQueueDiscover (pool, callback) { + var initialCursor = ['0']; + var users = {}; + + pool.acquire(QUEUE.DB, function(err, client) { + if (err) { + return callback(err); + } + + scanQueues(client, initialCursor, users, function(err, users) { + var usernames = Object.keys(users); + var usersQueues = queueAsync(usernames.length); + + usernames.forEach(function (username) { + usersQueues.defer(client.sadd.bind(client), QUEUE.INDEX, username); + }); + + usersQueues.awaitAll(function (err) { + if (err) { + return callback(err); + } + + pool.release(QUEUE.DB, client); + callback(null); + }); + }); + }); +}; + +function scanQueues (client, cursor, users, callback) { + var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*']; + + client.scan(redisParams, function(err, currentCursor) { + if (err) { + return callback(null, users); + } + + var queues = currentCursor[1]; + if (queues) { + queues.forEach(function (queue) { + var user = queue.substr(QUEUE.PREFIX.length); + users[user] = true; + }); + } + + var hasMore = currentCursor[0] !== '0'; + if (!hasMore) { + return callback(null, users); + } + + scanQueues(client, currentCursor, users, callback); + }); +}