From 8c42019641ca98f8460941b3b2de38e8e559cc62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Tue, 29 Dec 2015 15:46:04 +0100 Subject: [PATCH] Improved namespace in redis for batch's stuff --- batch/batch.js | 8 ++++---- batch/job_backend.js | 11 ++++++----- batch/job_publisher.js | 2 +- batch/job_queue.js | 11 +++-------- batch/job_queue_pool.js | 3 --- batch/job_subscriber.js | 2 +- batch/user_indexer.js | 15 +++------------ test/acceptance/batch.test.js | 2 +- test/acceptance/job.test.js | 21 ++++++++++++++++++++- test/prepare_db.sh | 13 +++++++++++-- 10 files changed, 50 insertions(+), 38 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index a9179f16..6453881c 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -23,10 +23,10 @@ Batch.prototype.start = function () { if (!queue) { queue = self.jobQueuePool.add(host); - run(queue); + consume(queue); } - function run(queue) { + function consume(queue) { queue.dequeue(host, function (err, job_id) { if (err) { self.jobQueuePool.remove(host); @@ -42,12 +42,12 @@ Batch.prototype.start = function () { .on('done', function (job) { console.log('Job %s done in %s', job_id, host); self.emit('job:done', job_id); - run(queue); + consume(queue); // recursive call }) .on('failed', function (job) { console.log('Job %s done in %s', job_id, host); self.emit('job:failed', job_id); - run(queue); + consume(queue); // recursive call }) .on('error', function (err) { self.emit('job:failed', job_id); diff --git a/batch/job_backend.js b/batch/job_backend.js index 3907097a..f81be039 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -12,6 +12,7 @@ function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer this.jobPublisher = jobPublisher; this.userIndexer = userIndexer; this.db = 5; + this.redisPrefix = 'batch:jobs:'; } util.inherits(JobBackend, EventEmitter); @@ -20,7 +21,7 @@ JobBackend.prototype.create = function (username, sql, host, callback) { var job_id = uuid.v4(); var now = new Date().toISOString(); var redisParams = [ - 'job:' + job_id, + this.redisPrefix + job_id, 'user', username, 'status', 'pending', 'query', sql, @@ -76,7 +77,7 @@ JobBackend.prototype.list = function (username, callback) { JobBackend.prototype.get = function (job_id, callback) { var redisParams = [ - 'job:' + job_id, + this.redisPrefix + job_id, 'user', 'status', 'query', @@ -113,7 +114,7 @@ JobBackend.prototype.get = function (job_id, callback) { JobBackend.prototype.setRunning = function (job) { var self = this; var redisParams = [ - 'job:' + job.job_id, + this.redisPrefix + job.job_id, 'status', 'running', 'updated_at', new Date().toISOString() ]; @@ -130,7 +131,7 @@ JobBackend.prototype.setRunning = function (job) { JobBackend.prototype.setDone = function (job) { var self = this; var redisParams = [ - 'job:' + job.job_id, + this.redisPrefix + job.job_id, 'status', 'done', 'updated_at', new Date().toISOString() ]; @@ -147,7 +148,7 @@ JobBackend.prototype.setDone = function (job) { JobBackend.prototype.setFailed = function (job, err) { var self = this; var redisParams = [ - 'job:' + job.job_id, + this.redisPrefix + job.job_id, 'status', 'failed', 'failed_reason', err.message, 'updated_at', new Date().toISOString() diff --git a/batch/job_publisher.js b/batch/job_publisher.js index e6372ab9..b55f6d72 100644 --- a/batch/job_publisher.js +++ b/batch/job_publisher.js @@ -3,7 +3,7 @@ var redis = require('redis'); function JobPublisher() { - this.channel = 'host:job'; + this.channel = 'batch:hosts'; this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host); } diff --git a/batch/job_queue.js b/batch/job_queue.js index 3c84eb3b..2647b07a 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -3,20 +3,15 @@ function JobQueue(metadataBackend) { this.metadataBackend = metadataBackend; this.db = 5; + this.prefixRedis = 'batch:queues:'; } JobQueue.prototype.enqueue = function (job_id, host, callback) { - var db = this.db; - var queue = 'queue:' + host; - - this.metadataBackend.redisCmd(db, 'LPUSH', [queue, job_id], callback); + this.metadataBackend.redisCmd(this.db, 'LPUSH', [ this.prefixRedis + host, job_id ], callback); }; JobQueue.prototype.dequeue = function (host, callback) { - var db = this.db; - var queue = 'queue:' + host; - - this.metadataBackend.redisCmd(this.db, 'RPOP', [ queue ], callback); + this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.prefixRedis + host ], callback); }; module.exports = JobQueue; diff --git a/batch/job_queue_pool.js b/batch/job_queue_pool.js index 502f849a..dcd2d487 100644 --- a/batch/job_queue_pool.js +++ b/batch/job_queue_pool.js @@ -11,9 +11,6 @@ JobQueuePool.prototype.get = function (host) { return this.queues[host]; }; -JobQueuePool.prototype.tap = function (host) { -}; - JobQueuePool.prototype.list = function () { return Object.keys(this.queues); }; diff --git a/batch/job_subscriber.js b/batch/job_subscriber.js index 7d502c17..1c3d5aaf 100644 --- a/batch/job_subscriber.js +++ b/batch/job_subscriber.js @@ -3,7 +3,7 @@ var redis = require('redis'); function JobSubscriber() { - this.channel = 'host:job'; + this.channel = 'batch:hosts'; this.client = redis.createClient(global.settings.redis_port, global.settings.redis_host); } diff --git a/batch/user_indexer.js b/batch/user_indexer.js index 1cfd7ebb..b2c04ec8 100644 --- a/batch/user_indexer.js +++ b/batch/user_indexer.js @@ -3,24 +3,15 @@ function UserIndexer(metadataBackend) { this.metadataBackend = metadataBackend; this.db = 5; + this.prefixRedis = 'batch:users:'; } UserIndexer.prototype.add = function (username, job_id, callback) { - this.metadataBackend.redisCmd(this.db, 'SADD', [username, job_id] , function (err) { - if (err) { - return callback(err); - } - callback(); - }); + this.metadataBackend.redisCmd(this.db, 'SADD', [ this.prefixRedis + username, job_id ] , callback); }; UserIndexer.prototype.list = function (username, callback) { - this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [username] , function (err, job_ids) { - if (err) { - return callback(err); - } - callback(null, job_ids); - }); + this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [ this.prefixRedis + username ] , callback); }; module.exports = UserIndexer; diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index 98268fea..d69b8cec 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -13,7 +13,7 @@ var metadataBackend = require('cartodb-redis')({ reapIntervalMillis: global.settings.redisReapIntervalMillis }); -describe('batch', function() { +describe('batch module', function() { var dbInstance = 'localhost'; var username = 'vizzuality'; var jobQueue = new JobQueue(metadataBackend); diff --git a/test/acceptance/job.test.js b/test/acceptance/job.test.js index 59c20441..0531510f 100644 --- a/test/acceptance/job.test.js +++ b/test/acceptance/job.test.js @@ -18,7 +18,7 @@ var app = require(global.settings.app_root + '/app/app')(); var assert = require('../support/assert'); var querystring = require('querystring'); -describe('job', function() { +describe('job module', function() { var job = {}; it('POST /api/v2/job', function (done){ @@ -34,6 +34,7 @@ describe('job', function() { }, function(res) { job = JSON.parse(res.body); assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8'); + assert.ok(job.job_id); assert.equal(job.query, "SELECT * FROM untitle_table_4"); assert.equal(job.user, "vizzuality"); done(); @@ -56,4 +57,22 @@ describe('job', function() { }); }); + it('GET /api/v2/job/', function (done){ + assert.response(app, { + url: '/api/v2/job?api_key=1234', + headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' }, + method: 'GET' + }, { + status: 200 + }, function(res) { + var jobs = JSON.parse(res.body); + assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8'); + assert.ok(jobs instanceof Array); + assert.ok(jobs.length > 0); + assert.ok(jobs[0].job_id); + assert.ok(jobs[0].status); + assert.ok(jobs[0].query); + done(); + }); + }); }); diff --git a/test/prepare_db.sh b/test/prepare_db.sh index 4b68f944..92a17fa0 100755 --- a/test/prepare_db.sh +++ b/test/prepare_db.sh @@ -135,8 +135,17 @@ EOF # delete previous jobs cat <