Merge pull request #415 from CartoDB/avoid-scan-to-discover-queues
Avoid scan command to discover queues
This commit is contained in:
commit
c4557f3c20
@ -8,14 +8,17 @@ var HostScheduler = require('./scheduler/host-scheduler');
|
|||||||
|
|
||||||
var EMPTY_QUEUE = true;
|
var EMPTY_QUEUE = true;
|
||||||
|
|
||||||
function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublisher, redisPool, logger) {
|
var MINUTE = 60 * 1000;
|
||||||
|
var SCHEDULE_INTERVAL = 1 * MINUTE;
|
||||||
|
|
||||||
|
function Batch(name, userDatabaseMetadataService, jobSubscriber, jobQueue, jobRunner, jobService, redisPool, logger) {
|
||||||
EventEmitter.call(this);
|
EventEmitter.call(this);
|
||||||
this.name = name || 'batch';
|
this.name = name || 'batch';
|
||||||
|
this.userDatabaseMetadataService = userDatabaseMetadataService;
|
||||||
this.jobSubscriber = jobSubscriber;
|
this.jobSubscriber = jobSubscriber;
|
||||||
this.jobQueue = jobQueue;
|
this.jobQueue = jobQueue;
|
||||||
this.jobRunner = jobRunner;
|
this.jobRunner = jobRunner;
|
||||||
this.jobService = jobService;
|
this.jobService = jobService;
|
||||||
this.jobPublisher = jobPublisher;
|
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool);
|
this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool);
|
||||||
|
|
||||||
@ -28,31 +31,70 @@ module.exports = Batch;
|
|||||||
|
|
||||||
Batch.prototype.start = function () {
|
Batch.prototype.start = function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
var onJobHandler = createJobHandler(self.name, self.userDatabaseMetadataService, self.hostScheduler);
|
||||||
|
|
||||||
this.jobSubscriber.subscribe(
|
self.jobQueue.scanQueues(function (err, queues) {
|
||||||
function onJobHandler(user, host) {
|
if (err) {
|
||||||
debug('[%s] onJobHandler(%s, %s)', self.name, user, host);
|
return self.emit('error', err);
|
||||||
self.hostScheduler.add(host, user, function(err) {
|
}
|
||||||
if (err) {
|
|
||||||
return debug(
|
queues.forEach(onJobHandler);
|
||||||
'Could not schedule host=%s user=%s from %s. Reason: %s',
|
self._startScheduleInterval(onJobHandler);
|
||||||
host, self.name, user, err.message
|
|
||||||
);
|
self.jobSubscriber.subscribe(onJobHandler, function (err) {
|
||||||
}
|
|
||||||
});
|
|
||||||
},
|
|
||||||
function onJobSubscriberReady(err) {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
return self.emit('error', err);
|
return self.emit('error', err);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.emit('ready');
|
self.emit('ready');
|
||||||
}
|
});
|
||||||
);
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
function createJobHandler (name, userDatabaseMetadataService, hostScheduler) {
|
||||||
|
return function onJobHandler(user) {
|
||||||
|
userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
|
||||||
|
if (err) {
|
||||||
|
return debug('Could not get host user=%s from %s. Reason: %s', user, name, err.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
var host = userDatabaseMetadata.host;
|
||||||
|
|
||||||
|
debug('[%s] onJobHandler(%s, %s)', name, user, host);
|
||||||
|
hostScheduler.add(host, user, function(err) {
|
||||||
|
if (err) {
|
||||||
|
return debug(
|
||||||
|
'Could not schedule host=%s user=%s from %s. Reason: %s', host, user, name, err.message
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Batch.prototype._startScheduleInterval = function (onJobHandler) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
self.scheduleInterval = setInterval(function () {
|
||||||
|
self.jobQueue.getQueues(function (err, queues) {
|
||||||
|
if (err) {
|
||||||
|
return debug('Could not get queues from %s. Reason: %s', self.name, err.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
queues.forEach(onJobHandler);
|
||||||
|
});
|
||||||
|
}, SCHEDULE_INTERVAL);
|
||||||
|
};
|
||||||
|
|
||||||
|
Batch.prototype._stopScheduleInterval = function () {
|
||||||
|
if (this.scheduleInterval) {
|
||||||
|
clearInterval(this.scheduleInterval);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Batch.prototype.processJob = function (user, callback) {
|
Batch.prototype.processJob = function (user, callback) {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
self.jobQueue.dequeue(user, function (err, jobId) {
|
self.jobQueue.dequeue(user, function (err, jobId) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return callback(new Error('Could not get job from "' + user + '". Reason: ' + err.message), !EMPTY_QUEUE);
|
return callback(new Error('Could not get job from "' + user + '". Reason: ' + err.message), !EMPTY_QUEUE);
|
||||||
@ -149,6 +191,7 @@ Batch.prototype._drainJob = function (user, callback) {
|
|||||||
|
|
||||||
Batch.prototype.stop = function (callback) {
|
Batch.prototype.stop = function (callback) {
|
||||||
this.removeAllListeners();
|
this.removeAllListeners();
|
||||||
|
this._stopScheduleInterval();
|
||||||
this.jobSubscriber.unsubscribe(callback);
|
this.jobSubscriber.unsubscribe(callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ var Batch = require('./batch');
|
|||||||
module.exports = function batchFactory (metadataBackend, redisPool, name, statsdClient, loggerPath) {
|
module.exports = function batchFactory (metadataBackend, redisPool, name, statsdClient, loggerPath) {
|
||||||
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
|
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
|
||||||
|
|
||||||
var jobSubscriber = new JobSubscriber(redisPool, userDatabaseMetadataService);
|
var jobSubscriber = new JobSubscriber(redisPool);
|
||||||
var jobPublisher = new JobPublisher(redisPool);
|
var jobPublisher = new JobPublisher(redisPool);
|
||||||
|
|
||||||
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
||||||
@ -28,11 +28,11 @@ module.exports = function batchFactory (metadataBackend, redisPool, name, statsd
|
|||||||
|
|
||||||
return new Batch(
|
return new Batch(
|
||||||
name,
|
name,
|
||||||
|
userDatabaseMetadataService,
|
||||||
jobSubscriber,
|
jobSubscriber,
|
||||||
jobQueue,
|
jobQueue,
|
||||||
jobRunner,
|
jobRunner,
|
||||||
jobService,
|
jobService,
|
||||||
jobPublisher,
|
|
||||||
redisPool,
|
redisPool,
|
||||||
logger
|
logger
|
||||||
);
|
);
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
var debug = require('./util/debug')('queue');
|
var debug = require('./util/debug')('queue');
|
||||||
|
var queueAsync = require('queue-async');
|
||||||
|
|
||||||
function JobQueue(metadataBackend, jobPublisher) {
|
function JobQueue(metadataBackend, jobPublisher) {
|
||||||
this.metadataBackend = metadataBackend;
|
this.metadataBackend = metadataBackend;
|
||||||
@ -11,13 +12,19 @@ module.exports = JobQueue;
|
|||||||
|
|
||||||
var QUEUE = {
|
var QUEUE = {
|
||||||
DB: 5,
|
DB: 5,
|
||||||
PREFIX: 'batch:queue:'
|
PREFIX: 'batch:queue:',
|
||||||
|
INDEX: 'batch:indexes:queue'
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports.QUEUE = QUEUE;
|
module.exports.QUEUE = QUEUE;
|
||||||
|
|
||||||
JobQueue.prototype.enqueue = function (user, jobId, callback) {
|
JobQueue.prototype.enqueue = function (user, jobId, callback) {
|
||||||
debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId);
|
debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId);
|
||||||
this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + user, jobId ], function (err) {
|
|
||||||
|
this.metadataBackend.redisMultiCmd(QUEUE.DB, [
|
||||||
|
[ 'LPUSH', QUEUE.PREFIX + user, jobId ],
|
||||||
|
[ 'SADD', QUEUE.INDEX, user ]
|
||||||
|
], function (err) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
@ -32,7 +39,23 @@ JobQueue.prototype.size = function (user, callback) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
JobQueue.prototype.dequeue = function (user, callback) {
|
JobQueue.prototype.dequeue = function (user, callback) {
|
||||||
this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], function(err, jobId) {
|
var dequeueScript = [
|
||||||
|
'local job_id = redis.call("RPOP", KEYS[1])',
|
||||||
|
'if redis.call("LLEN", KEYS[1]) == 0 then',
|
||||||
|
' redis.call("SREM", KEYS[2], ARGV[1])',
|
||||||
|
'end',
|
||||||
|
'return job_id'
|
||||||
|
].join('\n');
|
||||||
|
|
||||||
|
var redisParams = [
|
||||||
|
dequeueScript, //lua source code
|
||||||
|
2, // Two "keys" to pass
|
||||||
|
QUEUE.PREFIX + user, //KEYS[1], the key of the queue
|
||||||
|
QUEUE.INDEX, //KEYS[2], the key of the index
|
||||||
|
user // ARGV[1] - value of the element to remove from the index
|
||||||
|
];
|
||||||
|
|
||||||
|
this.metadataBackend.redisCmd(QUEUE.DB, 'EVAL', redisParams, function (err, jobId) {
|
||||||
debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId);
|
debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId);
|
||||||
return callback(err, jobId);
|
return callback(err, jobId);
|
||||||
});
|
});
|
||||||
@ -42,3 +65,91 @@ JobQueue.prototype.enqueueFirst = function (user, jobId, callback) {
|
|||||||
debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId);
|
debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId);
|
||||||
this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + user, jobId ], callback);
|
this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + user, jobId ], callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
JobQueue.prototype.getQueues = function (callback) {
|
||||||
|
this.metadataBackend.redisCmd(QUEUE.DB, 'SMEMBERS', [ QUEUE.INDEX ], function (err, queues) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(null, queues);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
JobQueue.prototype.scanQueues = function (callback) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
self.scan(function (err, queues) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.addToQueueIndex(queues, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(null, queues);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
JobQueue.prototype.scan = function (callback) {
|
||||||
|
var self = this;
|
||||||
|
var initialCursor = ['0'];
|
||||||
|
var users = {};
|
||||||
|
|
||||||
|
self._scan(initialCursor, users, function(err, users) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(null, Object.keys(users));
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
JobQueue.prototype._scan = function (cursor, users, callback) {
|
||||||
|
var self = this;
|
||||||
|
var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];
|
||||||
|
|
||||||
|
self.metadataBackend.redisCmd(QUEUE.DB, 'SCAN', redisParams, function (err, currentCursor) {
|
||||||
|
if (err) {
|
||||||
|
return callback(null, users);
|
||||||
|
}
|
||||||
|
|
||||||
|
var queues = currentCursor[1];
|
||||||
|
if (queues) {
|
||||||
|
queues.forEach(function (queue) {
|
||||||
|
var user = queue.substr(QUEUE.PREFIX.length);
|
||||||
|
users[user] = true;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
var hasMore = currentCursor[0] !== '0';
|
||||||
|
if (!hasMore) {
|
||||||
|
return callback(null, users);
|
||||||
|
}
|
||||||
|
|
||||||
|
self._scan(currentCursor, users, callback);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
JobQueue.prototype.addToQueueIndex = function (users, callback) {
|
||||||
|
var self = this;
|
||||||
|
var usersQueues = queueAsync(users.length);
|
||||||
|
|
||||||
|
users.forEach(function (user) {
|
||||||
|
usersQueues.defer(function (user, callback) {
|
||||||
|
self.metadataBackend.redisCmd(QUEUE.DB, 'SADD', [ QUEUE.INDEX, user], callback);
|
||||||
|
}, user);
|
||||||
|
});
|
||||||
|
|
||||||
|
usersQueues.awaitAll(function (err) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
callback(null);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
@ -1,84 +1,49 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
var Channel = require('./channel');
|
var Channel = require('./channel');
|
||||||
var QueueSeeker = require('./queue-seeker');
|
|
||||||
var debug = require('./../util/debug')('pubsub:subscriber');
|
var debug = require('./../util/debug')('pubsub:subscriber');
|
||||||
var error = require('./../util/debug')('pubsub:subscriber:error');
|
var error = require('./../util/debug')('pubsub:subscriber:error');
|
||||||
|
|
||||||
var MINUTE = 60 * 1000;
|
function JobSubscriber(pool) {
|
||||||
var SUBSCRIBE_INTERVAL = 5 * MINUTE;
|
|
||||||
|
|
||||||
function JobSubscriber(pool, userDatabaseMetadataService) {
|
|
||||||
this.pool = pool;
|
this.pool = pool;
|
||||||
this.userDatabaseMetadataService = userDatabaseMetadataService;
|
|
||||||
this.queueSeeker = new QueueSeeker(pool);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = JobSubscriber;
|
module.exports = JobSubscriber;
|
||||||
|
|
||||||
function seeker(queueSeeker, onJobHandler, callback) {
|
JobSubscriber.prototype.subscribe = function (onJobHandler, callback) {
|
||||||
queueSeeker.seek(function (err, users) {
|
var self = this;
|
||||||
|
|
||||||
|
self.pool.acquire(Channel.DB, function(err, client) {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback(err);
|
callback(err);
|
||||||
}
|
}
|
||||||
return error(err);
|
return error('Error adquiring redis client: ' + err.message);
|
||||||
}
|
}
|
||||||
debug('queues found successfully');
|
|
||||||
users.forEach(onJobHandler);
|
self.client = client;
|
||||||
|
client.removeAllListeners('message');
|
||||||
|
client.unsubscribe(Channel.NAME);
|
||||||
|
client.subscribe(Channel.NAME);
|
||||||
|
|
||||||
|
client.on('message', function (channel, user) {
|
||||||
|
debug('message received in channel=%s from user=%s', channel, user);
|
||||||
|
onJobHandler(user);
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('error', function () {
|
||||||
|
self.unsubscribe();
|
||||||
|
self.pool.release(Channel.DB, client);
|
||||||
|
self.subscribe(onJobHandler);
|
||||||
|
});
|
||||||
|
|
||||||
if (callback) {
|
if (callback) {
|
||||||
return callback(null);
|
callback();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
JobSubscriber.prototype.subscribe = function (onJobHandler, callback) {
|
|
||||||
var self = this;
|
|
||||||
|
|
||||||
function wrappedJobHandlerListener(user) {
|
|
||||||
self.userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
|
|
||||||
if (err) {
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
return onJobHandler(user, userDatabaseMetadata.host);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
seeker(this.queueSeeker, wrappedJobHandlerListener, function(err) {
|
|
||||||
if (callback) {
|
|
||||||
callback(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not start any pooling until first seek has finished
|
|
||||||
self.seekerInterval = setInterval(seeker, SUBSCRIBE_INTERVAL, self.queueSeeker, wrappedJobHandlerListener);
|
|
||||||
|
|
||||||
self.pool.acquire(Channel.DB, function (err, client) {
|
|
||||||
if (err) {
|
|
||||||
return error('Error adquiring redis client: ' + err.message);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.client = client;
|
|
||||||
client.removeAllListeners('message');
|
|
||||||
client.unsubscribe(Channel.NAME);
|
|
||||||
client.subscribe(Channel.NAME);
|
|
||||||
|
|
||||||
client.on('message', function (channel, user) {
|
|
||||||
debug('message received in channel=%s from user=%s', channel, user);
|
|
||||||
wrappedJobHandlerListener(user);
|
|
||||||
});
|
|
||||||
|
|
||||||
client.on('error', function () {
|
|
||||||
self.unsubscribe();
|
|
||||||
self.pool.release(Channel.DB, client);
|
|
||||||
self.subscribe(onJobHandler);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
JobSubscriber.prototype.unsubscribe = function (callback) {
|
JobSubscriber.prototype.unsubscribe = function (callback) {
|
||||||
clearInterval(this.seekerInterval);
|
|
||||||
if (this.client && this.client.connected) {
|
if (this.client && this.client.connected) {
|
||||||
this.client.unsubscribe(Channel.NAME, callback);
|
this.client.unsubscribe(Channel.NAME, callback);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1,51 +0,0 @@
|
|||||||
'use strict';
|
|
||||||
|
|
||||||
var QUEUE = require('../job_queue').QUEUE;
|
|
||||||
|
|
||||||
function QueueSeeker(pool) {
|
|
||||||
this.pool = pool;
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = QueueSeeker;
|
|
||||||
|
|
||||||
QueueSeeker.prototype.seek = function (callback) {
|
|
||||||
var initialCursor = ['0'];
|
|
||||||
var users = {};
|
|
||||||
var self = this;
|
|
||||||
|
|
||||||
this.pool.acquire(QUEUE.DB, function(err, client) {
|
|
||||||
if (err) {
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
self._seek(client, initialCursor, users, function(err, users) {
|
|
||||||
self.pool.release(QUEUE.DB, client);
|
|
||||||
return callback(err, Object.keys(users));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
QueueSeeker.prototype._seek = function (client, cursor, users, callback) {
|
|
||||||
var self = this;
|
|
||||||
var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];
|
|
||||||
|
|
||||||
client.scan(redisParams, function(err, currentCursor) {
|
|
||||||
if (err) {
|
|
||||||
return callback(null, users);
|
|
||||||
}
|
|
||||||
|
|
||||||
var queues = currentCursor[1];
|
|
||||||
if (queues) {
|
|
||||||
queues.forEach(function (queue) {
|
|
||||||
var user = queue.substr(QUEUE.PREFIX.length);
|
|
||||||
users[user] = true;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
var hasMore = currentCursor[0] !== '0';
|
|
||||||
if (!hasMore) {
|
|
||||||
return callback(null, users);
|
|
||||||
}
|
|
||||||
|
|
||||||
self._seek(client, currentCursor, users, callback);
|
|
||||||
});
|
|
||||||
};
|
|
181
test/integration/batch/job-queue.test.js
Normal file
181
test/integration/batch/job-queue.test.js
Normal file
@ -0,0 +1,181 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
require('../../helper');
|
||||||
|
var assert = require('../../support/assert');
|
||||||
|
var redisUtils = require('../../support/redis_utils');
|
||||||
|
|
||||||
|
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
|
||||||
|
var JobPublisher = require('../../../batch/pubsub/job-publisher');
|
||||||
|
var JobQueue = require('../../../batch/job_queue');
|
||||||
|
|
||||||
|
var JobBackend = require('../../../batch/job_backend');
|
||||||
|
var JobService = require('../../../batch/job_service');
|
||||||
|
var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service');
|
||||||
|
var JobCanceller = require('../../../batch/job_canceller');
|
||||||
|
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
|
||||||
|
|
||||||
|
describe('job queue', function () {
|
||||||
|
var pool = redisUtils.getPool();
|
||||||
|
var jobPublisher = new JobPublisher(pool);
|
||||||
|
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
||||||
|
var jobBackend = new JobBackend(metadataBackend, jobQueue);
|
||||||
|
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
|
||||||
|
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
|
||||||
|
var jobService = new JobService(jobBackend, jobCanceller);
|
||||||
|
|
||||||
|
var userA = 'userA';
|
||||||
|
var userB = 'userB';
|
||||||
|
|
||||||
|
beforeEach(function () {
|
||||||
|
this.jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function (done) {
|
||||||
|
redisUtils.clean('batch:*', done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should find queues for one user', function (done) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.jobQueue.scanQueues(function (err, queues) {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.equal(queues.length, 1);
|
||||||
|
assert.equal(queues[0], userA);
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should find queues for more than one user', function (done) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
self.jobQueue.enqueue(userB, 'wadus-wadus-wadus-wadus', function(err) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.jobQueue.scanQueues(function (err, queues) {
|
||||||
|
assert.ifError(err);
|
||||||
|
assert.equal(queues.length, 2);
|
||||||
|
assert.ok(queues[0] === userA || queues[0] === userB);
|
||||||
|
assert.ok(queues[1] === userA || queues[1] === userB);
|
||||||
|
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should find queues from jobs not using new Redis SETs for users', function(done) {
|
||||||
|
var self = this;
|
||||||
|
var redisArgs = [JobQueue.QUEUE.PREFIX + userA, 'wadus-id'];
|
||||||
|
metadataBackend.redisCmd(JobQueue.QUEUE.DB, 'LPUSH', redisArgs, function (err) {
|
||||||
|
assert.ok(!err, err);
|
||||||
|
self.jobQueue.scanQueues(function (err, queues) {
|
||||||
|
assert.ok(!err, err);
|
||||||
|
|
||||||
|
assert.equal(queues.length, 1);
|
||||||
|
assert.equal(queues[0], userA);
|
||||||
|
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('.scanQueues() should feed queue index', function (done) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
var data = {
|
||||||
|
user: 'vizzuality',
|
||||||
|
query: 'select 1 as cartodb_id',
|
||||||
|
host: 'localhost'
|
||||||
|
};
|
||||||
|
|
||||||
|
jobService.create(data, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.jobQueue.scanQueues(function (err, queuesFromScan) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(queuesFromScan.length, 1);
|
||||||
|
assert.ok(queuesFromScan.indexOf(data.user) >= 0);
|
||||||
|
|
||||||
|
self.jobQueue.getQueues(function (err, queuesFromIndex) {
|
||||||
|
if (err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(queuesFromIndex.length, 1);
|
||||||
|
assert.ok(queuesFromIndex.indexOf(data.user) >= 0);
|
||||||
|
|
||||||
|
redisUtils.clean('batch:*', done);
|
||||||
|
});
|
||||||
|
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('.scanQueues() should feed queue index with two users', function (done) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
var jobVizzuality = {
|
||||||
|
user: 'vizzuality',
|
||||||
|
query: 'select 1 as cartodb_id',
|
||||||
|
host: 'localhost'
|
||||||
|
};
|
||||||
|
|
||||||
|
var jobWadus = {
|
||||||
|
user: 'wadus',
|
||||||
|
query: 'select 1 as cartodb_id',
|
||||||
|
host: 'localhost'
|
||||||
|
};
|
||||||
|
|
||||||
|
jobService.create(jobVizzuality, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
jobService.create(jobWadus, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.jobQueue.scanQueues(function (err, queuesFromScan) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(queuesFromScan.length, 2);
|
||||||
|
assert.ok(queuesFromScan.indexOf(jobVizzuality.user) >= 0);
|
||||||
|
assert.ok(queuesFromScan.indexOf(jobWadus.user) >= 0);
|
||||||
|
|
||||||
|
self.jobQueue.getQueues(function (err, queuesFromIndex) {
|
||||||
|
if (err) {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(queuesFromIndex.length, 2);
|
||||||
|
assert.ok(queuesFromIndex.indexOf(jobVizzuality.user) >= 0);
|
||||||
|
assert.ok(queuesFromIndex.indexOf(jobWadus.user) >= 0);
|
||||||
|
|
||||||
|
redisUtils.clean('batch:*', done);
|
||||||
|
});
|
||||||
|
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
@ -1,65 +0,0 @@
|
|||||||
'use strict';
|
|
||||||
|
|
||||||
require('../../helper');
|
|
||||||
var assert = require('../../support/assert');
|
|
||||||
var redisUtils = require('../../support/redis_utils');
|
|
||||||
|
|
||||||
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
|
|
||||||
var JobPublisher = require('../../../batch/pubsub/job-publisher');
|
|
||||||
var QueueSeeker = require('../../../batch/pubsub/queue-seeker');
|
|
||||||
var JobQueue = require('../../../batch/job_queue');
|
|
||||||
|
|
||||||
var jobPublisher = new JobPublisher(redisUtils.getPool());
|
|
||||||
|
|
||||||
|
|
||||||
describe('queue seeker', function() {
|
|
||||||
var userA = 'userA';
|
|
||||||
var userB = 'userB';
|
|
||||||
|
|
||||||
beforeEach(function () {
|
|
||||||
this.jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(function (done) {
|
|
||||||
redisUtils.clean('batch:*', done);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should find queues for one user', function (done) {
|
|
||||||
var seeker = new QueueSeeker(redisUtils.getPool());
|
|
||||||
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
|
||||||
if (err) {
|
|
||||||
return done(err);
|
|
||||||
}
|
|
||||||
seeker.seek(function(err, users) {
|
|
||||||
assert.ok(!err);
|
|
||||||
assert.equal(users.length, 1);
|
|
||||||
assert.equal(users[0], userA);
|
|
||||||
|
|
||||||
return done();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should find queues for more than one user', function (done) {
|
|
||||||
var self = this;
|
|
||||||
var seeker = new QueueSeeker(redisUtils.getPool());
|
|
||||||
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
|
||||||
if (err) {
|
|
||||||
return done(err);
|
|
||||||
}
|
|
||||||
self.jobQueue.enqueue(userB, 'wadus-wadus-wadus-wadus', function(err) {
|
|
||||||
if (err) {
|
|
||||||
return done(err);
|
|
||||||
}
|
|
||||||
seeker.seek(function(err, users) {
|
|
||||||
assert.ok(!err);
|
|
||||||
assert.equal(users.length, 2);
|
|
||||||
assert.ok(users[0] === userA || users[0] === userB);
|
|
||||||
assert.ok(users[1] === userA || users[1] === userB);
|
|
||||||
|
|
||||||
return done();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
@ -9,6 +9,12 @@ describe('batch API job queue', function () {
|
|||||||
process.nextTick(function () {
|
process.nextTick(function () {
|
||||||
callback(null, 'irrelevantJob');
|
callback(null, 'irrelevantJob');
|
||||||
});
|
});
|
||||||
|
},
|
||||||
|
redisMultiCmd: function () {
|
||||||
|
var callback = arguments[arguments.length -1];
|
||||||
|
process.nextTick(function () {
|
||||||
|
callback(null, 'irrelevantJob');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
this.jobPublisher = {
|
this.jobPublisher = {
|
||||||
|
@ -30,7 +30,10 @@ describe('batch API job subscriber', function () {
|
|||||||
removeAllListeners: function () {
|
removeAllListeners: function () {
|
||||||
return this;
|
return this;
|
||||||
},
|
},
|
||||||
connected: true
|
smembers: function (key, callback) {
|
||||||
|
callback(null, []);
|
||||||
|
},
|
||||||
|
connected: true,
|
||||||
};
|
};
|
||||||
this.pool = {
|
this.pool = {
|
||||||
acquire: function (db, cb) {
|
acquire: function (db, cb) {
|
||||||
|
Loading…
Reference in New Issue
Block a user