2016-10-12 01:01:39 +08:00
|
|
|
'use strict';
|
|
|
|
|
2016-10-12 23:53:03 +08:00
|
|
|
var QUEUE = require('../job_queue').QUEUE;
|
|
|
|
|
2016-10-12 01:01:39 +08:00
|
|
|
/**
 * Finds the hosts that currently have a job queue by scanning the queue
 * database for keys that start with QUEUE.PREFIX.
 *
 * @constructor
 * @param {Object} pool - connection pool; the methods of this type call
 *     `pool.acquire(db, callback)` and `pool.release(db, client)` on it.
 */
function QueueSeeker(pool) {
    this.pool = pool;
}
|
|
|
|
|
|
|
|
module.exports = QueueSeeker;
|
|
|
|
|
2016-10-12 01:45:26 +08:00
|
|
|
/**
 * Starts a full scan from the beginning and reports every host found.
 *
 * @param {Function} callback - forwarded untouched to `_seek`, which invokes
 *     it as `callback(err, hosts)`.
 */
QueueSeeker.prototype.seek = function (callback) {
    // '0' is the cursor value that starts a new SCAN iteration.
    var initialCursor = ['0'];

    // Used as a set of host names. It has no prototype so that names such as
    // '__proto__' or 'constructor' are stored like any other key instead of
    // colliding with inherited properties.
    var hosts = Object.create(null);

    this._seek(initialCursor, hosts, callback);
};
|
|
|
|
|
2016-10-12 01:45:26 +08:00
|
|
|
/**
 * Iterates SCAN over the queue database, starting at `cursor`, and records
 * the host part of every key that matches QUEUE.PREFIX + '*'.
 *
 * One client is acquired from the pool for the whole iteration and released
 * exactly once, whether the scan completes or fails.
 *
 * @param {Array} cursor - only `cursor[0]` is read: the SCAN cursor to start
 *     from (`'0'` begins a new iteration).
 * @param {Object} hosts - accumulator used as a set; mutated in place with one
 *     `host: true` entry per matching key.
 * @param {Function} callback - `callback(err)` when acquiring the client or
 *     scanning fails, otherwise `callback(null, Object.keys(hosts))`.
 */
QueueSeeker.prototype._seek = function (cursor, hosts, callback) {
    var self = this;
    var startCursor = cursor[0];
    var pattern = QUEUE.PREFIX + '*';

    this.pool.acquire(QUEUE.DB, function (err, client) {
        if (err) {
            return callback(err);
        }

        // Single exit point: the client always goes back to the pool before
        // the caller is notified.
        function finish(err) {
            self.pool.release(QUEUE.DB, client);

            if (err) {
                return callback(err);
            }

            return callback(null, Object.keys(hosts));
        }

        function scanFrom(position) {
            client.scan([position, 'MATCH', pattern], function (err, reply) {
                if (err) {
                    return finish(err);
                }

                if (!Array.isArray(reply)) {
                    return finish(new Error('Unexpected SCAN reply'));
                }

                // The reply that carries the terminating cursor may still
                // contain keys, so collect them before checking for the end.
                var queues = reply[1] || [];

                queues.forEach(function (queue) {
                    var host = queue.substr(QUEUE.PREFIX.length);
                    hosts[host] = true;
                });

                // A returned cursor of 0 means the iteration has ended.
                if (String(reply[0]) === '0') {
                    return finish(null);
                }

                scanFrom(reply[0]);
            });
        }

        scanFrom(startCursor);
    });
};
|