Improved namespace in redis for batch's stuff

This commit is contained in:
Daniel García Aubert 2015-12-29 15:46:04 +01:00
parent 851c393f75
commit 8c42019641
10 changed files with 50 additions and 38 deletions

View File

@ -23,10 +23,10 @@ Batch.prototype.start = function () {
if (!queue) {
queue = self.jobQueuePool.add(host);
run(queue);
consume(queue);
}
function run(queue) {
function consume(queue) {
queue.dequeue(host, function (err, job_id) {
if (err) {
self.jobQueuePool.remove(host);
@ -42,12 +42,12 @@ Batch.prototype.start = function () {
.on('done', function (job) {
console.log('Job %s done in %s', job_id, host);
self.emit('job:done', job_id);
run(queue);
consume(queue); // recursive call
})
.on('failed', function (job) {
console.log('Job %s done in %s', job_id, host);
self.emit('job:failed', job_id);
run(queue);
consume(queue); // recursive call
})
.on('error', function (err) {
self.emit('job:failed', job_id);

View File

@ -12,6 +12,7 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer
this.jobPublisher = jobPublisher;
this.userIndexer = userIndexer;
this.db = 5;
this.redisPrefix = 'batch:jobs:';
}
util.inherits(JobBackend, EventEmitter);
@ -20,7 +21,7 @@ JobBackend.prototype.create = function (username, sql, host, callback) {
var job_id = uuid.v4();
var now = new Date().toISOString();
var redisParams = [
'job:' + job_id,
this.redisPrefix + job_id,
'user', username,
'status', 'pending',
'query', sql,
@ -76,7 +77,7 @@ JobBackend.prototype.list = function (username, callback) {
JobBackend.prototype.get = function (job_id, callback) {
var redisParams = [
'job:' + job_id,
this.redisPrefix + job_id,
'user',
'status',
'query',
@ -113,7 +114,7 @@ JobBackend.prototype.get = function (job_id, callback) {
JobBackend.prototype.setRunning = function (job) {
var self = this;
var redisParams = [
'job:' + job.job_id,
this.redisPrefix + job.job_id,
'status', 'running',
'updated_at', new Date().toISOString()
];
@ -130,7 +131,7 @@ JobBackend.prototype.setRunning = function (job) {
JobBackend.prototype.setDone = function (job) {
var self = this;
var redisParams = [
'job:' + job.job_id,
this.redisPrefix + job.job_id,
'status', 'done',
'updated_at', new Date().toISOString()
];
@ -147,7 +148,7 @@ JobBackend.prototype.setDone = function (job) {
JobBackend.prototype.setFailed = function (job, err) {
var self = this;
var redisParams = [
'job:' + job.job_id,
this.redisPrefix + job.job_id,
'status', 'failed',
'failed_reason', err.message,
'updated_at', new Date().toISOString()

View File

@ -3,7 +3,7 @@
var redis = require('redis');
function JobPublisher() {
this.channel = 'host:job';
this.channel = 'batch:hosts';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
}

View File

@ -3,20 +3,15 @@
function JobQueue(metadataBackend) {
this.metadataBackend = metadataBackend;
this.db = 5;
this.prefixRedis = 'batch:queues:';
}
JobQueue.prototype.enqueue = function (job_id, host, callback) {
var db = this.db;
var queue = 'queue:' + host;
this.metadataBackend.redisCmd(db, 'LPUSH', [queue, job_id], callback);
this.metadataBackend.redisCmd(this.db, 'LPUSH', [ this.prefixRedis + host, job_id ], callback);
};
JobQueue.prototype.dequeue = function (host, callback) {
var db = this.db;
var queue = 'queue:' + host;
this.metadataBackend.redisCmd(this.db, 'RPOP', [ queue ], callback);
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.prefixRedis + host ], callback);
};
module.exports = JobQueue;

View File

@ -11,9 +11,6 @@ JobQueuePool.prototype.get = function (host) {
return this.queues[host];
};
JobQueuePool.prototype.tap = function (host) {
};
JobQueuePool.prototype.list = function () {
return Object.keys(this.queues);
};

View File

@ -3,7 +3,7 @@
var redis = require('redis');
function JobSubscriber() {
this.channel = 'host:job';
this.channel = 'batch:hosts';
this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host);
}

View File

@ -3,24 +3,15 @@
function UserIndexer(metadataBackend) {
this.metadataBackend = metadataBackend;
this.db = 5;
this.prefixRedis = 'batch:users:';
}
UserIndexer.prototype.add = function (username, job_id, callback) {
this.metadataBackend.redisCmd(this.db, 'SADD', [username, job_id] , function (err) {
if (err) {
return callback(err);
}
callback();
});
this.metadataBackend.redisCmd(this.db, 'SADD', [ this.prefixRedis + username, job_id ] , callback);
};
UserIndexer.prototype.list = function (username, callback) {
this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [username] , function (err, job_ids) {
if (err) {
return callback(err);
}
callback(null, job_ids);
});
this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [ this.prefixRedis + username ] , callback);
};
module.exports = UserIndexer;

View File

@ -13,7 +13,7 @@ var metadataBackend = require('cartodb-redis')({
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
describe('batch', function() {
describe('batch module', function() {
var dbInstance = 'localhost';
var username = 'vizzuality';
var jobQueue = new JobQueue(metadataBackend);

View File

@ -18,7 +18,7 @@ var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert');
var querystring = require('querystring');
describe('job', function() {
describe('job module', function() {
var job = {};
it('POST /api/v2/job', function (done){
@ -34,6 +34,7 @@ describe('job', function() {
}, function(res) {
job = JSON.parse(res.body);
assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8');
assert.ok(job.job_id);
assert.equal(job.query, "SELECT * FROM untitle_table_4");
assert.equal(job.user, "vizzuality");
done();
@ -56,4 +57,22 @@ describe('job', function() {
});
});
it('GET /api/v2/job/', function (done){
assert.response(app, {
url: '/api/v2/job?api_key=1234',
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'GET'
}, {
status: 200
}, function(res) {
var jobs = JSON.parse(res.body);
assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8');
assert.ok(jobs instanceof Array);
assert.ok(jobs.length > 0);
assert.ok(jobs[0].job_id);
assert.ok(jobs[0].status);
assert.ok(jobs[0].query);
done();
});
});
});

View File

@ -135,8 +135,17 @@ EOF
# delete previous jobs
cat <<EOF | redis-cli -p ${REDIS_PORT} -n 5
EVAL "return redis.call('del', unpack(redis.call('keys', ARGV[1])))" 0 job:*
DEL queue:localhost
EVAL "return redis.call('del', unpack(redis.call('keys', ARGV[1])))" 0 batch:jobs:*
EOF
# delete job queue
cat <<EOF | redis-cli -p ${REDIS_PORT} -n 5
DEL batch:queues:localhost
EOF
# delete user index
cat <<EOF | redis-cli -p ${REDIS_PORT} -n 5
DEL batch:users:vizzuality
EOF
fi