Discover and add to index queues of users with jobs on batch service startup
This commit is contained in:
parent
8f35e1d1af
commit
24ff1cf808
@ -5,6 +5,7 @@ var EventEmitter = require('events').EventEmitter;
|
|||||||
var debug = require('./util/debug')('batch');
|
var debug = require('./util/debug')('batch');
|
||||||
var queue = require('queue-async');
|
var queue = require('queue-async');
|
||||||
var HostScheduler = require('./scheduler/host-scheduler');
|
var HostScheduler = require('./scheduler/host-scheduler');
|
||||||
|
var startupQueueDiscover = require('./pubsub/queue-discover').startupQueueDiscover;
|
||||||
|
|
||||||
var EMPTY_QUEUE = true;
|
var EMPTY_QUEUE = true;
|
||||||
|
|
||||||
@ -18,6 +19,7 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe
|
|||||||
this.jobPublisher = jobPublisher;
|
this.jobPublisher = jobPublisher;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool);
|
this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool);
|
||||||
|
this.pool = redisPool;
|
||||||
|
|
||||||
// map: user => jobId. Will be used for draining jobs.
|
// map: user => jobId. Will be used for draining jobs.
|
||||||
this.workInProgressJobs = {};
|
this.workInProgressJobs = {};
|
||||||
@ -46,7 +48,13 @@ Batch.prototype.start = function () {
|
|||||||
return self.emit('error', err);
|
return self.emit('error', err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startupQueueDiscover(self.pool, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return self.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
self.emit('ready');
|
self.emit('ready');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
var error = require('./../util/debug')('pubsub:queue-discover:error');
|
var error = require('./../util/debug')('pubsub:queue-discover:error');
|
||||||
var QUEUE = require('../job_queue').QUEUE;
|
var QUEUE = require('../job_queue').QUEUE;
|
||||||
|
var queueAsync = require('queue-async');
|
||||||
|
|
||||||
module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) {
|
module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) {
|
||||||
pool.acquire(QUEUE.DB, function (err, client) {
|
pool.acquire(QUEUE.DB, function (err, client) {
|
||||||
@ -25,3 +26,57 @@ module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callba
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
module.exports.startupQueueDiscover = function startupQueueDiscover (pool, callback) {
|
||||||
|
var initialCursor = ['0'];
|
||||||
|
var users = {};
|
||||||
|
|
||||||
|
pool.acquire(QUEUE.DB, function(err, client) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
scanQueues(client, initialCursor, users, function(err, users) {
|
||||||
|
var usernames = Object.keys(users);
|
||||||
|
var usersQueues = queueAsync(usernames.length);
|
||||||
|
|
||||||
|
usernames.forEach(function (username) {
|
||||||
|
usersQueues.defer(client.sadd.bind(client), QUEUE.INDEX, username);
|
||||||
|
});
|
||||||
|
|
||||||
|
usersQueues.awaitAll(function (err) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.release(QUEUE.DB, client);
|
||||||
|
callback(null);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
function scanQueues (client, cursor, users, callback) {
|
||||||
|
var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];
|
||||||
|
|
||||||
|
client.scan(redisParams, function(err, currentCursor) {
|
||||||
|
if (err) {
|
||||||
|
return callback(null, users);
|
||||||
|
}
|
||||||
|
|
||||||
|
var queues = currentCursor[1];
|
||||||
|
if (queues) {
|
||||||
|
queues.forEach(function (queue) {
|
||||||
|
var user = queue.substr(QUEUE.PREFIX.length);
|
||||||
|
users[user] = true;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
var hasMore = currentCursor[0] !== '0';
|
||||||
|
if (!hasMore) {
|
||||||
|
return callback(null, users);
|
||||||
|
}
|
||||||
|
|
||||||
|
scanQueues(client, currentCursor, users, callback);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user