Avoid scan behaviour to discover active queues of user's jobs, now keeps an index to know which queues are available
This commit is contained in:
parent
6592435e87
commit
8012fe26aa
@ -2,22 +2,29 @@
|
||||
|
||||
var debug = require('./util/debug')('queue');
|
||||
|
||||
function JobQueue(metadataBackend, jobPublisher) {
|
||||
function JobQueue(metadataBackend, jobPublisher, queueIndex) {
|
||||
this.metadataBackend = metadataBackend;
|
||||
this.jobPublisher = jobPublisher;
|
||||
this.queueIndex = queueIndex;
|
||||
}
|
||||
|
||||
module.exports = JobQueue;
|
||||
|
||||
var QUEUE = {
|
||||
DB: 5,
|
||||
PREFIX: 'batch:queue:'
|
||||
PREFIX: 'batch:queue:',
|
||||
INDEX: 'batch:indexes:queue'
|
||||
};
|
||||
|
||||
module.exports.QUEUE = QUEUE;
|
||||
|
||||
JobQueue.prototype.enqueue = function (user, jobId, callback) {
|
||||
debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId);
|
||||
this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + user, jobId ], function (err) {
|
||||
|
||||
this.metadataBackend.redisMultiCmd(QUEUE.DB, [
|
||||
[ 'LPUSH', QUEUE.PREFIX + user, jobId ],
|
||||
[ 'SADD', QUEUE.INDEX, user ]
|
||||
], function (err) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
@ -32,7 +39,23 @@ JobQueue.prototype.size = function (user, callback) {
|
||||
};
|
||||
|
||||
JobQueue.prototype.dequeue = function (user, callback) {
|
||||
this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], function(err, jobId) {
|
||||
var dequeueScript = [
|
||||
'local job_id = redis.call("RPOP", KEYS[1])',
|
||||
'if redis.call("LLEN", KEYS[1]) == 0 then',
|
||||
' redis.call("SREM", KEYS[2], ARGV[1])',
|
||||
'end',
|
||||
'return job_id'
|
||||
].join('\n');
|
||||
|
||||
var redisParams = [
|
||||
dequeueScript, //lua source code
|
||||
2, // Two "keys" to pass
|
||||
QUEUE.PREFIX + user, //KEYS[1], the key of the queue
|
||||
QUEUE.INDEX, //KEYS[2], the key of the index
|
||||
user // ARGV[1] - value of the element to remove form the index
|
||||
];
|
||||
|
||||
this.metadataBackend.redisCmd(QUEUE.DB, 'EVAL', redisParams, function (err, jobId) {
|
||||
debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId);
|
||||
return callback(err, jobId);
|
||||
});
|
||||
|
@ -1,7 +1,7 @@
|
||||
'use strict';
|
||||
|
||||
var Channel = require('./channel');
|
||||
var QueueSeeker = require('./queue-seeker');
|
||||
var queueDiscover = require('./queue-discover');
|
||||
var debug = require('./../util/debug')('pubsub:subscriber');
|
||||
var error = require('./../util/debug')('pubsub:subscriber:error');
|
||||
|
||||
@ -11,74 +11,61 @@ var SUBSCRIBE_INTERVAL = 5 * MINUTE;
|
||||
function JobSubscriber(pool, userDatabaseMetadataService) {
|
||||
this.pool = pool;
|
||||
this.userDatabaseMetadataService = userDatabaseMetadataService;
|
||||
this.queueSeeker = new QueueSeeker(pool);
|
||||
}
|
||||
|
||||
module.exports = JobSubscriber;
|
||||
|
||||
function seeker(queueSeeker, onJobHandler, callback) {
|
||||
queueSeeker.seek(function (err, users) {
|
||||
if (err) {
|
||||
if (callback) {
|
||||
callback(err);
|
||||
}
|
||||
return error(err);
|
||||
}
|
||||
debug('queues found successfully');
|
||||
users.forEach(onJobHandler);
|
||||
|
||||
if (callback) {
|
||||
return callback(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
JobSubscriber.prototype.subscribe = function (onJobHandler, callback) {
|
||||
var self = this;
|
||||
|
||||
function wrappedJobHandlerListener(user) {
|
||||
self.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
if (callback) {
|
||||
callback(err);
|
||||
}
|
||||
return error('Error getting user\'s host: ' + err.message);
|
||||
}
|
||||
return onJobHandler(user, userDatabaseMetadata.host);
|
||||
});
|
||||
}
|
||||
|
||||
seeker(this.queueSeeker, wrappedJobHandlerListener, function(err) {
|
||||
if (callback) {
|
||||
callback(err);
|
||||
queueDiscover(self.pool, wrappedJobHandlerListener, function (err, client) {
|
||||
if (err) {
|
||||
if (callback) {
|
||||
callback(err);
|
||||
}
|
||||
|
||||
return error('Error discovering user\'s queues: ' + err.message);
|
||||
}
|
||||
|
||||
// do not start any pooling until first seek has finished
|
||||
self.seekerInterval = setInterval(seeker, SUBSCRIBE_INTERVAL, self.queueSeeker, wrappedJobHandlerListener);
|
||||
self.discoverInterval = setInterval(queueDiscover, SUBSCRIBE_INTERVAL, self.pool, wrappedJobHandlerListener);
|
||||
|
||||
self.pool.acquire(Channel.DB, function (err, client) {
|
||||
if (err) {
|
||||
return error('Error adquiring redis client: ' + err.message);
|
||||
}
|
||||
self.client = client;
|
||||
client.removeAllListeners('message');
|
||||
client.unsubscribe(Channel.NAME);
|
||||
client.subscribe(Channel.NAME);
|
||||
|
||||
self.client = client;
|
||||
client.removeAllListeners('message');
|
||||
client.unsubscribe(Channel.NAME);
|
||||
client.subscribe(Channel.NAME);
|
||||
|
||||
client.on('message', function (channel, user) {
|
||||
debug('message received in channel=%s from user=%s', channel, user);
|
||||
wrappedJobHandlerListener(user);
|
||||
});
|
||||
|
||||
client.on('error', function () {
|
||||
self.unsubscribe();
|
||||
self.pool.release(Channel.DB, client);
|
||||
self.subscribe(onJobHandler);
|
||||
});
|
||||
client.on('message', function (channel, user) {
|
||||
debug('message received in channel=%s from user=%s', channel, user);
|
||||
wrappedJobHandlerListener(user);
|
||||
});
|
||||
|
||||
client.on('error', function () {
|
||||
self.unsubscribe();
|
||||
self.pool.release(Channel.DB, client);
|
||||
self.subscribe(onJobHandler);
|
||||
});
|
||||
|
||||
if (callback) {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
JobSubscriber.prototype.unsubscribe = function (callback) {
|
||||
clearInterval(this.seekerInterval);
|
||||
clearInterval(this.discoverInterval);
|
||||
if (this.client && this.client.connected) {
|
||||
this.client.unsubscribe(Channel.NAME, callback);
|
||||
} else {
|
||||
|
27
batch/pubsub/queue-discover.js
Normal file
27
batch/pubsub/queue-discover.js
Normal file
@ -0,0 +1,27 @@
|
||||
'use strict';
|
||||
|
||||
var error = require('./../util/debug')('pubsub:queue-discover:error');
|
||||
var QUEUE = require('../job_queue').QUEUE;
|
||||
|
||||
module.exports = function queueDiscover (pool, wrappedJobHandlerListener, callback) {
|
||||
pool.acquire(QUEUE.DB, function (err, client) {
|
||||
if (err) {
|
||||
if (callback) {
|
||||
callback(err);
|
||||
}
|
||||
return error('Error adquiring redis client: ' + err.message);
|
||||
}
|
||||
|
||||
client.smembers(QUEUE.INDEX, function (err, queues) {
|
||||
if (err) {
|
||||
return error('Error getting queues from index: ' + err.message);
|
||||
}
|
||||
|
||||
queues.forEach(wrappedJobHandlerListener);
|
||||
|
||||
if (callback) {
|
||||
return callback(null, client, queues);
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
@ -1,57 +0,0 @@
|
||||
'use strict';
|
||||
|
||||
var QUEUE = require('../job_queue').QUEUE;
|
||||
var MAX_SCAN_ATTEMPTS = 50;
|
||||
var SCAN_COUNT_VALUE = 50;
|
||||
|
||||
function QueueSeeker(pool) {
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
module.exports = QueueSeeker;
|
||||
|
||||
QueueSeeker.prototype.seek = function (callback) {
|
||||
var initialCursor = ['0'];
|
||||
var attemps = 0;
|
||||
var users = {};
|
||||
var self = this;
|
||||
|
||||
this.pool.acquire(QUEUE.DB, function(err, client) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
self._seek(client, initialCursor, users, attemps, function(err, users) {
|
||||
self.pool.release(QUEUE.DB, client);
|
||||
return callback(err, Object.keys(users));
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
QueueSeeker.prototype._seek = function (client, cursor, users, attemps, callback) {
|
||||
var self = this;
|
||||
var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*', 'COUNT', SCAN_COUNT_VALUE];
|
||||
|
||||
client.scan(redisParams, function(err, currentCursor) {
|
||||
if (err) {
|
||||
return callback(null, users);
|
||||
}
|
||||
|
||||
var queues = currentCursor[1];
|
||||
if (Array.isArray(queues)) {
|
||||
for (var i = 0; i < queues.length; i++) {
|
||||
var user = queues[i].substr(QUEUE.PREFIX.length);
|
||||
users[user] = true;
|
||||
}
|
||||
}
|
||||
|
||||
var hasMore = (parseInt(currentCursor[0], 10) > 0) && (attemps < MAX_SCAN_ATTEMPTS);
|
||||
|
||||
if (!hasMore) {
|
||||
return callback(null, users);
|
||||
}
|
||||
|
||||
attemps += 1;
|
||||
|
||||
self._seek(client, currentCursor, users, attemps, callback);
|
||||
});
|
||||
};
|
@ -6,13 +6,12 @@ var redisUtils = require('../../support/redis_utils');
|
||||
|
||||
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
|
||||
var JobPublisher = require('../../../batch/pubsub/job-publisher');
|
||||
var QueueSeeker = require('../../../batch/pubsub/queue-seeker');
|
||||
var JobQueue = require('../../../batch/job_queue');
|
||||
|
||||
var jobPublisher = new JobPublisher(redisUtils.getPool());
|
||||
var queueDiscover = require('../../../batch/pubsub/queue-discover');
|
||||
|
||||
|
||||
describe('queue seeker', function() {
|
||||
describe('queue discover', function () {
|
||||
var userA = 'userA';
|
||||
var userB = 'userB';
|
||||
|
||||
@ -25,15 +24,22 @@ describe('queue seeker', function() {
|
||||
});
|
||||
|
||||
it('should find queues for one user', function (done) {
|
||||
var seeker = new QueueSeeker(redisUtils.getPool());
|
||||
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
seeker.seek(function(err, users) {
|
||||
assert.ok(!err);
|
||||
assert.equal(users.length, 1);
|
||||
assert.equal(users[0], userA);
|
||||
|
||||
var onQueueDiscoveredCalledNumber = 0;
|
||||
|
||||
function onQueueDiscovered () {
|
||||
onQueueDiscoveredCalledNumber += 1;
|
||||
}
|
||||
|
||||
queueDiscover(redisUtils.getPool(), onQueueDiscovered, function (err, client, queues) {
|
||||
assert.ifError(err);
|
||||
assert.equal(queues.length, 1);
|
||||
assert.equal(onQueueDiscoveredCalledNumber, queues.length);
|
||||
assert.equal(queues[0], userA);
|
||||
|
||||
return done();
|
||||
});
|
||||
@ -42,7 +48,6 @@ describe('queue seeker', function() {
|
||||
|
||||
it('should find queues for more than one user', function (done) {
|
||||
var self = this;
|
||||
var seeker = new QueueSeeker(redisUtils.getPool());
|
||||
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
||||
if (err) {
|
||||
return done(err);
|
||||
@ -51,11 +56,19 @@ describe('queue seeker', function() {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
seeker.seek(function(err, users) {
|
||||
assert.ok(!err);
|
||||
assert.equal(users.length, 2);
|
||||
assert.ok(users[0] === userA || users[0] === userB);
|
||||
assert.ok(users[1] === userA || users[1] === userB);
|
||||
|
||||
var onQueueDiscoveredCalledNumber = 0;
|
||||
|
||||
function onQueueDiscovered () {
|
||||
onQueueDiscoveredCalledNumber += 1;
|
||||
}
|
||||
|
||||
queueDiscover(redisUtils.getPool(), onQueueDiscovered, function (err, client, queues) {
|
||||
assert.ifError(err);
|
||||
assert.equal(queues.length, 2);
|
||||
assert.equal(onQueueDiscoveredCalledNumber, queues.length);
|
||||
assert.ok(queues[0] === userA || queues[0] === userB);
|
||||
assert.ok(queues[1] === userA || queues[1] === userB);
|
||||
|
||||
return done();
|
||||
});
|
@ -9,6 +9,12 @@ describe('batch API job queue', function () {
|
||||
process.nextTick(function () {
|
||||
callback(null, 'irrelevantJob');
|
||||
});
|
||||
},
|
||||
redisMultiCmd: function () {
|
||||
var callback = arguments[arguments.length -1];
|
||||
process.nextTick(function () {
|
||||
callback(null, 'irrelevantJob');
|
||||
});
|
||||
}
|
||||
};
|
||||
this.jobPublisher = {
|
||||
|
@ -30,7 +30,10 @@ describe('batch API job subscriber', function () {
|
||||
removeAllListeners: function () {
|
||||
return this;
|
||||
},
|
||||
connected: true
|
||||
smembers: function (key, callback) {
|
||||
callback(null, []);
|
||||
},
|
||||
connected: true,
|
||||
};
|
||||
this.pool = {
|
||||
acquire: function (db, cb) {
|
||||
|
Loading…
Reference in New Issue
Block a user