2017-04-04 01:26:21 +08:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
require('../../helper');
|
|
|
|
var assert = require('../../support/assert');
|
|
|
|
var redisUtils = require('../../support/redis_utils');
|
|
|
|
|
|
|
|
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
|
2019-04-04 20:31:41 +08:00
|
|
|
var BatchLogger = require('../../../batch/batch-logger');
|
2017-04-04 01:26:21 +08:00
|
|
|
var JobPublisher = require('../../../batch/pubsub/job-publisher');
|
|
|
|
var JobQueue = require('../../../batch/job_queue');
|
|
|
|
|
|
|
|
var JobBackend = require('../../../batch/job_backend');
|
|
|
|
var JobService = require('../../../batch/job_service');
|
|
|
|
var JobCanceller = require('../../../batch/job_canceller');
|
|
|
|
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
|
|
|
|
|
|
|
|
describe('job queue', function () {
|
|
|
|
var pool = redisUtils.getPool();
|
2019-04-04 20:31:41 +08:00
|
|
|
var logger = new BatchLogger(null, 'batch-queries');
|
2017-04-04 01:26:21 +08:00
|
|
|
var jobPublisher = new JobPublisher(pool);
|
2019-04-04 20:31:41 +08:00
|
|
|
var jobQueue = new JobQueue(metadataBackend, jobPublisher, logger);
|
|
|
|
var jobBackend = new JobBackend(metadataBackend, jobQueue, logger);
|
2018-05-30 23:36:55 +08:00
|
|
|
var jobCanceller = new JobCanceller();
|
2019-04-04 20:31:41 +08:00
|
|
|
var jobService = new JobService(jobBackend, jobCanceller, logger);
|
2017-04-04 01:26:21 +08:00
|
|
|
|
|
|
|
var userA = 'userA';
|
|
|
|
var userB = 'userB';
|
|
|
|
|
|
|
|
beforeEach(function () {
|
2019-04-04 20:31:41 +08:00
|
|
|
this.jobQueue = new JobQueue(metadataBackend, jobPublisher, logger);
|
2017-04-04 01:26:21 +08:00
|
|
|
});
|
|
|
|
|
|
|
|
afterEach(function (done) {
|
|
|
|
redisUtils.clean('batch:*', done);
|
|
|
|
});
|
|
|
|
|
|
|
|
it('should find queues for one user', function (done) {
|
|
|
|
var self = this;
|
|
|
|
|
|
|
|
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
self.jobQueue.scanQueues(function (err, queues) {
|
|
|
|
assert.ifError(err);
|
|
|
|
assert.equal(queues.length, 1);
|
|
|
|
assert.equal(queues[0], userA);
|
|
|
|
return done();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
it('should find queues for more than one user', function (done) {
|
|
|
|
var self = this;
|
|
|
|
|
|
|
|
this.jobQueue.enqueue(userA, 'wadus-wadus-wadus-wadus', function(err) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
self.jobQueue.enqueue(userB, 'wadus-wadus-wadus-wadus', function(err) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
self.jobQueue.scanQueues(function (err, queues) {
|
|
|
|
assert.ifError(err);
|
|
|
|
assert.equal(queues.length, 2);
|
|
|
|
assert.ok(queues[0] === userA || queues[0] === userB);
|
|
|
|
assert.ok(queues[1] === userA || queues[1] === userB);
|
|
|
|
|
|
|
|
return done();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
2017-04-04 21:42:34 +08:00
|
|
|
it('should find queues from jobs not using new Redis SETs for users', function(done) {
|
|
|
|
var self = this;
|
|
|
|
var redisArgs = [JobQueue.QUEUE.PREFIX + userA, 'wadus-id'];
|
|
|
|
metadataBackend.redisCmd(JobQueue.QUEUE.DB, 'LPUSH', redisArgs, function (err) {
|
|
|
|
assert.ok(!err, err);
|
|
|
|
self.jobQueue.scanQueues(function (err, queues) {
|
|
|
|
assert.ok(!err, err);
|
|
|
|
|
|
|
|
assert.equal(queues.length, 1);
|
|
|
|
assert.equal(queues[0], userA);
|
|
|
|
|
|
|
|
return done();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
2017-04-04 01:26:21 +08:00
|
|
|
it('.scanQueues() should feed queue index', function (done) {
|
|
|
|
var self = this;
|
|
|
|
|
|
|
|
var data = {
|
|
|
|
user: 'vizzuality',
|
|
|
|
query: 'select 1 as cartodb_id',
|
|
|
|
host: 'localhost'
|
|
|
|
};
|
|
|
|
|
|
|
|
jobService.create(data, function (err) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
self.jobQueue.scanQueues(function (err, queuesFromScan) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
assert.equal(queuesFromScan.length, 1);
|
2017-04-04 15:50:45 +08:00
|
|
|
assert.ok(queuesFromScan.indexOf(data.user) >= 0);
|
2017-04-04 01:26:21 +08:00
|
|
|
|
|
|
|
self.jobQueue.getQueues(function (err, queuesFromIndex) {
|
|
|
|
if (err) {
|
|
|
|
done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
assert.equal(queuesFromIndex.length, 1);
|
2017-04-04 15:50:45 +08:00
|
|
|
assert.ok(queuesFromIndex.indexOf(data.user) >= 0);
|
2017-04-04 01:26:21 +08:00
|
|
|
|
|
|
|
redisUtils.clean('batch:*', done);
|
|
|
|
});
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
2017-04-04 15:50:45 +08:00
|
|
|
it('.scanQueues() should feed queue index with two users', function (done) {
|
2017-04-04 01:26:21 +08:00
|
|
|
var self = this;
|
|
|
|
|
|
|
|
var jobVizzuality = {
|
|
|
|
user: 'vizzuality',
|
|
|
|
query: 'select 1 as cartodb_id',
|
|
|
|
host: 'localhost'
|
|
|
|
};
|
|
|
|
|
|
|
|
var jobWadus = {
|
|
|
|
user: 'wadus',
|
|
|
|
query: 'select 1 as cartodb_id',
|
|
|
|
host: 'localhost'
|
|
|
|
};
|
|
|
|
|
|
|
|
jobService.create(jobVizzuality, function (err) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
jobService.create(jobWadus, function (err) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
self.jobQueue.scanQueues(function (err, queuesFromScan) {
|
|
|
|
if (err) {
|
|
|
|
return done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
assert.equal(queuesFromScan.length, 2);
|
2017-04-04 15:50:45 +08:00
|
|
|
assert.ok(queuesFromScan.indexOf(jobVizzuality.user) >= 0);
|
|
|
|
assert.ok(queuesFromScan.indexOf(jobWadus.user) >= 0);
|
2017-04-04 01:26:21 +08:00
|
|
|
|
|
|
|
self.jobQueue.getQueues(function (err, queuesFromIndex) {
|
|
|
|
if (err) {
|
|
|
|
done(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
assert.equal(queuesFromIndex.length, 2);
|
2017-04-04 15:50:45 +08:00
|
|
|
assert.ok(queuesFromIndex.indexOf(jobVizzuality.user) >= 0);
|
|
|
|
assert.ok(queuesFromIndex.indexOf(jobWadus.user) >= 0);
|
2017-04-04 01:26:21 +08:00
|
|
|
|
|
|
|
redisUtils.clean('batch:*', done);
|
|
|
|
});
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|