Merge pull request #345 from CartoDB/fix-batch-api-test

Fix batch api test
This commit is contained in:
Raul Ochoa 2016-07-22 18:16:37 +02:00 committed by GitHub
commit 01d1866e11
27 changed files with 1033 additions and 102 deletions

View File

@ -13,8 +13,9 @@ jshint:
@echo "***jshint***" @echo "***jshint***"
@./node_modules/.bin/jshint app/ batch/ test/ app.js @./node_modules/.bin/jshint app/ batch/ test/ app.js
TEST_SUITE := $(shell find test/{acceptance,unit} -name "*.js") TEST_SUITE := $(shell find test/{acceptance,unit,integration} -name "*.js")
TEST_SUITE_UNIT := $(shell find test/unit -name "*.js") TEST_SUITE_UNIT := $(shell find test/unit -name "*.js")
TEST_SUITE_INTEGRATION := $(shell find test/integration -name "*.js")
TEST_SUITE_ACCEPTANCE := $(shell find test/acceptance -name "*.js") TEST_SUITE_ACCEPTANCE := $(shell find test/acceptance -name "*.js")
test: test:
@ -25,6 +26,10 @@ test-unit:
@echo "***unit tests***" @echo "***unit tests***"
@$(SHELL) test/run_tests.sh ${RUNTESTFLAGS} $(TEST_SUITE_UNIT) @$(SHELL) test/run_tests.sh ${RUNTESTFLAGS} $(TEST_SUITE_UNIT)
test-integration:
@echo "***integration tests***"
@$(SHELL) test/run_tests.sh ${RUNTESTFLAGS} $(TEST_SUITE_INTEGRATION)
test-acceptance: test-acceptance:
@echo "***acceptance tests***" @echo "***acceptance tests***"
@$(SHELL) test/run_tests.sh ${RUNTESTFLAGS} $(TEST_SUITE_ACCEPTANCE) @$(SHELL) test/run_tests.sh ${RUNTESTFLAGS} $(TEST_SUITE_ACCEPTANCE)

View File

@ -25,7 +25,7 @@ Batch.prototype.start = function () {
Batch.prototype._subscribe = function () { Batch.prototype._subscribe = function () {
var self = this; var self = this;
this.jobSubscriber.subscribe(function (channel, host) { this.jobSubscriber.subscribe(function onMessage(channel, host) {
var queue = self.jobQueuePool.getQueue(host); var queue = self.jobQueuePool.getQueue(host);
// there is nothing to do. It is already running jobs // there is nothing to do. It is already running jobs
@ -46,6 +46,12 @@ Batch.prototype._subscribe = function () {
debug(err); debug(err);
}); });
}, function (err) {
if (err) {
return self.emit('error', err);
}
self.emit('ready');
}); });
}; };

View File

@ -64,7 +64,7 @@ function toObject(job_id, redisParams, redisValues) {
} }
function isJobFound(redisValues) { function isJobFound(redisValues) {
return redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4]; return !!(redisValues[0] && redisValues[1] && redisValues[2] && redisValues[3] && redisValues[4]);
} }
JobBackend.prototype.get = function (job_id, callback) { JobBackend.prototype.get = function (job_id, callback) {
@ -132,6 +132,7 @@ JobBackend.prototype.update = function (job, callback) {
var self = this; var self = this;
self.get(job.job_id, function (err) { self.get(job.job_id, function (err) {
if (err) { if (err) {
return callback(err); return callback(err);
} }

View File

@ -6,12 +6,7 @@ var error = require('./util/debug')('pubsub:subscriber:error');
var DB = 0; var DB = 0;
var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes
function _subscribe(client, channel, queueSeeker, onMessage) { function _subscribe(client, channel, queueSeeker, onMessage, callback) {
queueSeeker.seek(onMessage, function (err) {
if (err) {
error(err);
}
client.removeAllListeners('message'); client.removeAllListeners('message');
client.unsubscribe(channel); client.unsubscribe(channel);
@ -21,6 +16,21 @@ function _subscribe(client, channel, queueSeeker, onMessage) {
debug('message received from: ' + channel + ':' + host); debug('message received from: ' + channel + ':' + host);
onMessage(channel, host); onMessage(channel, host);
}); });
queueSeeker.seek(onMessage, function (err) {
if (err) {
error(err);
if (callback) {
callback(err);
}
} else {
debug('queues found successfully');
if (callback) {
callback();
}
}
}); });
} }
@ -32,7 +42,7 @@ function JobSubscriber(pool, queueSeeker) {
module.exports = JobSubscriber; module.exports = JobSubscriber;
JobSubscriber.prototype.subscribe = function (onMessage) { JobSubscriber.prototype.subscribe = function (onMessage, callback) {
var self = this; var self = this;
this.pool.acquire(DB, function (err, client) { this.pool.acquire(DB, function (err, client) {
@ -42,8 +52,6 @@ JobSubscriber.prototype.subscribe = function (onMessage) {
self.client = client; self.client = client;
_subscribe(self.client, self.channel, self.queueSeeker, onMessage);
self.seekerInterval = setInterval( self.seekerInterval = setInterval(
_subscribe, _subscribe,
SUBSCRIBE_INTERVAL_IN_MILLISECONDS, SUBSCRIBE_INTERVAL_IN_MILLISECONDS,
@ -52,6 +60,8 @@ JobSubscriber.prototype.subscribe = function (onMessage) {
self.queueSeeker, self.queueSeeker,
onMessage onMessage
); );
_subscribe(self.client, self.channel, self.queueSeeker, onMessage, callback);
}); });
}; };

View File

@ -1,4 +1,5 @@
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var _ = require('underscore'); var _ = require('underscore');
var RedisPool = require('redis-mpool'); var RedisPool = require('redis-mpool');
var queue = require('queue-async'); var queue = require('queue-async');
@ -34,15 +35,14 @@ describe('batch module', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
function createJob(sql, done) { function createJob(sql, done) {

View File

@ -1,6 +1,7 @@
require('../helper'); require('../helper');
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var querystring = require('qs'); var querystring = require('qs');
var redisConfig = { var redisConfig = {
@ -96,15 +97,14 @@ describe('Batch API callback templates', function () {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
describe.skip('should use templates for error_message and job_id onerror callback', function () { describe.skip('should use templates for error_message and job_id onerror callback', function () {

View File

@ -1,6 +1,7 @@
require('../helper'); require('../helper');
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var querystring = require('qs'); var querystring = require('qs');
var redisConfig = { var redisConfig = {
@ -37,15 +38,14 @@ describe('Batch API fallback job', function () {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
describe('"onsuccess" on first query should be triggered', function () { describe('"onsuccess" on first query should be triggered', function () {

View File

@ -14,17 +14,10 @@
*/ */
require('../helper'); require('../helper');
var JobController = require('../../app/controllers/job_controller'); var JobController = require('../../app/controllers/job_controller');
var redisUtils = require('../support/redis_utils');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var querystring = require('qs'); var querystring = require('qs');
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
function payload(query) { function payload(query) {
return JSON.stringify({query: query}); return JSON.stringify({query: query});
@ -44,9 +37,7 @@ describe('job query limit', function() {
} }
after(function (done) { after(function (done) {
// batch services is not activate, so we need empty the queue to avoid unexpected redisUtils.clean('batch:*', done);
// behaviour in further tests
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
}); });
it('POST /api/v2/sql/job with a invalid query size should respond with 400 query too long', function (done){ it('POST /api/v2/sql/job with a invalid query size should respond with 400 query too long', function (done){

View File

@ -16,22 +16,14 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
describe('job module', function() { describe('job module', function() {
var job = {}; var job = {};
after(function (done) { after(function (done) {
// batch services is not activate, so we need empty the queue to avoid unexpected redisUtils.clean('batch:*', done);
// behaviour in further tests
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
}); });
it('POST /api/v2/sql/job should respond with 200 and the created job', function (done){ it('POST /api/v2/sql/job should respond with 200 and the created job', function (done){

View File

@ -1,6 +1,7 @@
require('../helper'); require('../helper');
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var querystring = require('qs'); var querystring = require('qs');
var redisConfig = { var redisConfig = {
@ -73,15 +74,14 @@ describe('Batch API query timing', function () {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
describe('should report start and end time for each query with fallback queries', function () { describe('should report start and end time for each query with fallback queries', function () {

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 1: cancel and modify a done job', function () { describe('Use case 1: cancel and modify a done job', function () {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var doneJob = {}; var doneJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 10: cancel and modify a done multiquery job', function () { describe('Use case 10: cancel and modify a done multiquery job', function () {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var doneJob = {}; var doneJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 2: cancel a running job', function() { describe('Use case 2: cancel a running job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 3: cancel a pending job', function() { describe('Use case 3: cancel a pending job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 4: modify a pending job', function() { describe('Use case 4: modify a pending job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -30,15 +31,14 @@ var batchFactory = require('../../batch');
describe('Use case 5: modify a running job', function() { describe('Use case 5: modify a running job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 6: modify a done job', function() { describe('Use case 6: modify a done job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var doneJob = {}; var doneJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 7: cancel a job with quotes', function() { describe('Use case 7: cancel a job with quotes', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 8: cancel a running multiquery job', function() { describe('Use case 8: cancel a running multiquery job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -16,6 +16,7 @@ require('../helper');
var app = require(global.settings.app_root + '/app/app')(); var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert'); var assert = require('../support/assert');
var redisUtils = require('../support/redis_utils');
var querystring = require('querystring'); var querystring = require('querystring');
var redisConfig = { var redisConfig = {
host: global.settings.redis_host, host: global.settings.redis_host,
@ -28,18 +29,16 @@ var metadataBackend = require('cartodb-redis')(redisConfig);
var batchFactory = require('../../batch'); var batchFactory = require('../../batch');
describe('Use case 9: modify a pending multiquery job', function() { describe('Use case 9: modify a pending multiquery job', function() {
var batch = batchFactory(metadataBackend, redisConfig); var batch = batchFactory(metadataBackend, redisConfig);
before(function () { before(function (done) {
batch.start(); batch.start();
batch.on('ready', done);
}); });
after(function (done) { after(function (done) {
batch.stop(); batch.stop();
batch.drain(function () { redisUtils.clean('batch:*', done);
metadataBackend.redisCmd(5, 'DEL', [ 'batch:queues:localhost' ], done);
});
}); });
var runningJob = {}; var runningJob = {};

View File

@ -0,0 +1,224 @@
'use strict';
require('../../helper');
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var queue = require('queue-async');
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var metadataBackend = require('cartodb-redis')(redisConfig);
var StatsD = require('node-statsd').StatsD;
var statsdClient = new StatsD(global.settings.statsd);
var BATCH_SOURCE = '../../../batch/';
var batchFactory = require(BATCH_SOURCE + 'index');
var _ = require('underscore');
var RedisPool = require('redis-mpool');
var jobStatus = require(BATCH_SOURCE + 'job_status');
var JobPublisher = require(BATCH_SOURCE + 'job_publisher');
var JobQueue = require(BATCH_SOURCE + 'job_queue');
var UserIndexer = require(BATCH_SOURCE + 'user_indexer');
var JobBackend = require(BATCH_SOURCE + 'job_backend');
var JobFactory = require(BATCH_SOURCE + 'models/job_factory');
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var jobPublisher = new JobPublisher(redisPoolPublisher);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, userIndexer);
var USER = 'vizzuality';
var HOST = 'localhost';
function createJob(job) {
jobBackend.create(job, function () {});
}
function getJob(job_id, callback) {
jobBackend.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
callback(null, job);
});
}
function assertJob(job, expectedStatus, done) {
return function (job_id) {
if (job.job_id === job_id) {
getJob(job_id, function (err, jobDone) {
if (err) {
return done(err);
}
assert.equal(jobDone.status, expectedStatus);
done();
});
}
};
}
describe('batch multiquery', function() {
var batch = batchFactory(metadataBackend, redisConfig, statsdClient);
before(function (done) {
batch.start();
batch.on('ready', done);
});
after(function (done) {
batch.removeAllListeners();
batch.stop();
redisUtils.clean('batch:*', done);
});
it('should perform one multiquery job with two queries', function (done) {
var queries = [
'select pg_sleep(0)',
'select pg_sleep(0)'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.DONE, done);
batch.on('job:done', assertCallback);
createJob(job.data);
});
it('should perform one multiquery job with two queries and fail on last one', function (done) {
var queries = [
'select pg_sleep(0)',
'select shouldFail()'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.FAILED, done);
batch.on('job:failed', assertCallback);
createJob(job.data);
});
it('should perform one multiquery job with three queries and fail on last one', function (done) {
var queries = [
'select pg_sleep(0)',
'select pg_sleep(0)',
'select shouldFail()'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.FAILED, done);
batch.on('job:failed', assertCallback);
createJob(job.data);
});
it('should perform one multiquery job with three queries and fail on second one', function (done) {
var queries = [
'select pg_sleep(0)',
'select shouldFail()',
'select pg_sleep(0)'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.FAILED, done);
batch.on('job:failed', assertCallback);
createJob(job.data);
});
it('should perform two multiquery job with two queries for each one', function (done) {
var jobs = [];
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
var jobsQueue = queue(jobs.length);
jobs.forEach(function (job) {
jobsQueue.defer(function (callback) {
batch.on('job:done', assertJob(job.data, jobStatus.DONE, callback));
createJob(job.data);
});
});
jobsQueue.awaitAll(done);
});
it('should perform two multiquery job with two queries for each one and fail the first one', function (done) {
var jobs = [];
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select shouldFail()'
]}));
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
var jobsQueue = queue(jobs.length);
jobsQueue.defer(function (callback) {
batch.on('job:failed', assertJob(jobs[0].data, jobStatus.FAILED, callback));
createJob(jobs[0].data);
});
jobsQueue.defer(function (callback) {
batch.on('job:done', assertJob(jobs[1].data, jobStatus.DONE, callback));
createJob(jobs[1].data);
});
jobsQueue.awaitAll(done);
});
it('should perform two multiquery job with two queries for each one and fail the second one', function (done) {
var jobs = [];
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select shouldFail()'
]}));
var jobsQueue = queue(jobs.length);
jobsQueue.defer(function (callback) {
batch.on('job:done', assertJob(jobs[0].data, jobStatus.DONE, callback));
createJob(jobs[0].data);
});
jobsQueue.defer(function (callback) {
batch.on('job:failed', assertJob(jobs[1].data, jobStatus.FAILED, callback));
createJob(jobs[1].data);
});
jobsQueue.awaitAll(done);
});
});

View File

@ -0,0 +1,147 @@
'use strict';
require('../../helper');
var BATCH_SOURCE = '../../../batch/';
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var _ = require('underscore');
var RedisPool = require('redis-mpool');
var UserIndexer = require(BATCH_SOURCE + 'user_indexer');
var JobQueue = require(BATCH_SOURCE + 'job_queue');
var JobBackend = require(BATCH_SOURCE + 'job_backend');
var JobPublisher = require(BATCH_SOURCE + 'job_publisher');
var JobFactory = require(BATCH_SOURCE + 'models/job_factory');
var jobStatus = require(BATCH_SOURCE + 'job_status');
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var metadataBackend = require('cartodb-redis')(redisConfig);
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var jobPublisher = new JobPublisher(redisPoolPublisher);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var userIndexer = new UserIndexer(metadataBackend);
var USER = 'vizzuality';
var QUERY = 'select pg_sleep(0)';
var HOST = 'localhost';
var JOB = {
user: USER,
query: QUERY,
host: HOST
};
function createWadusJob() {
return JobFactory.create(JSON.parse(JSON.stringify(JOB)));
}
describe('job backend', function() {
var jobBackend = new JobBackend(metadataBackend, jobQueue, userIndexer);
after(function (done) {
redisUtils.clean('batch:*', done);
});
it('.create() should persist a job', function (done) {
var job = createWadusJob();
jobBackend.create(job.data, function (err, jobCreated) {
if (err) {
return done(err);
}
assert.ok(jobCreated.job_id);
assert.equal(jobCreated.status, jobStatus.PENDING);
done();
});
});
it('.create() should return error', function (done) {
var job = createWadusJob();
delete job.data.job_id;
jobBackend.create(job, function (err) {
assert.ok(err);
assert.equal(err.name, 'NotFoundError');
assert.equal(err.message, 'Job with id undefined not found');
done();
});
});
it('.update() should update an existent job', function (done) {
var job = createWadusJob();
jobBackend.create(job.data, function (err, jobCreated) {
if (err) {
return done(err);
}
jobCreated.query = 'select pg_sleep(1)';
var job = JobFactory.create(jobCreated);
jobBackend.update(job.data, function (err, jobUpdated) {
if (err) {
return done(err);
}
assert.equal(jobUpdated.query, 'select pg_sleep(1)');
done();
});
});
});
it('.update() should return error when updates a nonexistent job', function (done) {
var job = createWadusJob();
jobBackend.update(job.data, function (err) {
assert.ok(err, err);
assert.equal(err.name, 'NotFoundError');
assert.equal(err.message, 'Job with id ' + job.data.job_id + ' not found');
done();
});
});
it('.list() should return a list of user\'s jobs', function (done) {
var job = createWadusJob();
jobBackend.create(job.data, function (err, jobCreated) {
if (err) {
return done(err);
}
jobBackend.list(USER, function (err, jobs) {
var found = false;
assert.ok(!err, err);
assert.ok(jobs.length);
jobs.forEach(function (job) {
if (job.job_id === jobCreated.job_id) {
found = true;
}
});
assert.ok(found, 'Job expeted to be listed not found');
done();
});
});
});
it('.list() should return a empty list for nonexitent user', function (done) {
jobBackend.list('wadus_user', function (err, jobs) {
assert.ok(!err, err);
assert.ok(!jobs.length);
done();
});
});
});

View File

@ -0,0 +1,129 @@
'use strict';
require('../../helper');
var BATCH_SOURCE = '../../../batch/';
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var _ = require('underscore');
var RedisPool = require('redis-mpool');
var UserIndexer = require(BATCH_SOURCE + 'user_indexer');
var JobQueue = require(BATCH_SOURCE + 'job_queue');
var JobBackend = require(BATCH_SOURCE + 'job_backend');
var JobPublisher = require(BATCH_SOURCE + 'job_publisher');
var jobStatus = require(BATCH_SOURCE + 'job_status');
var UserDatabaseMetadataService = require(BATCH_SOURCE + 'user_database_metadata_service');
var JobCanceller = require(BATCH_SOURCE + 'job_canceller');
var PSQL = require('cartodb-psql');
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var metadataBackend = require('cartodb-redis')(redisConfig);
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var jobPublisher = new JobPublisher(redisPoolPublisher);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var JobFactory = require(BATCH_SOURCE + 'models/job_factory');
var USER = 'vizzuality';
var QUERY = 'select pg_sleep(0)';
var HOST = 'localhost';
// sets job to running, run its query and returns inmediatly (don't wait for query finishes)
// in order to test query cancelation/draining
function runQueryHelper(job, callback) {
var job_id = job.job_id;
var user = job.user;
var sql = job.query;
job.status = jobStatus.RUNNING;
jobBackend.update(job, function (err) {
if (err) {
return callback(err);
}
userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
sql = '/* ' + job_id + ' */ ' + sql;
pg.eventedQuery(sql, function (err, query) {
if (err) {
return callback(err);
}
callback(null, query);
});
});
});
}
function createWadusJob(query) {
query = query || QUERY;
return JobFactory.create(JSON.parse(JSON.stringify({
user: USER,
query: query,
host: HOST
})));
}
describe('job canceller', function() {
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
after(function (done) {
redisUtils.clean('batch:*', done);
});
it('.cancel() should cancel a job', function (done) {
var job = createWadusJob('select pg_sleep(1)');
jobBackend.create(job.data, function (err, jobCreated) {
if (err) {
return done(err);
}
assert.equal(job.data.job_id, jobCreated.job_id);
runQueryHelper(job.data, function (err) {
if (err) {
return done(err);
}
jobCanceller.cancel(job, function (err) {
if (err) {
return done(err);
}
done();
});
});
});
});
it('.cancel() a non running job should not return an error', function (done) {
var job = createWadusJob();
jobCanceller.cancel(job, function (err) {
if (err) {
return done(err);
}
done();
});
});
});

View File

@ -0,0 +1,50 @@
'use strict';
require('../../helper');
var BATCH_SOURCE = '../../../batch/';
var assert = require('../../support/assert');
var _ = require('underscore');
var RedisPool = require('redis-mpool');
var JobPublisher = require(BATCH_SOURCE + 'job_publisher');
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var redisPoolSubscriber = new RedisPool(_.extend(redisConfig, { name: 'batch-subscriber'}));
var HOST = 'wadus';
var CHANNEL = 'batch:hosts';
var DB = 0;
describe('job publisher', function() {
var jobPublisher = new JobPublisher(redisPoolPublisher);
it('.publish() should publish in job channel', function (done) {
redisPoolSubscriber.acquire(DB, function (err, client) {
if (err) {
return done(err);
}
client.subscribe(CHANNEL);
client.on('message', function (channel, host) {
assert.equal(host, HOST);
assert.equal(channel, CHANNEL) ;
done();
});
jobPublisher.publish(HOST);
});
});
});

View File

@ -0,0 +1,86 @@
'use strict';
require('../../helper');
var BATCH_SOURCE = '../../../batch/';
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var _ = require('underscore');
var RedisPool = require('redis-mpool');
var UserIndexer = require(BATCH_SOURCE + 'user_indexer');
var JobQueue = require(BATCH_SOURCE + 'job_queue');
var JobBackend = require(BATCH_SOURCE + 'job_backend');
var JobPublisher = require(BATCH_SOURCE + 'job_publisher');
var jobStatus = require(BATCH_SOURCE + 'job_status');
var UserDatabaseMetadataService = require(BATCH_SOURCE + 'user_database_metadata_service');
var JobCanceller = require(BATCH_SOURCE + 'job_canceller');
var JobService = require(BATCH_SOURCE + 'job_service');
var JobRunner = require(BATCH_SOURCE + 'job_runner');
var QueryRunner = require(BATCH_SOURCE + 'query_runner');
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var metadataBackend = require('cartodb-redis')(redisConfig);
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var jobPublisher = new JobPublisher(redisPoolPublisher);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var queryRunner = new QueryRunner(userDatabaseMetadataService);
var StatsD = require('node-statsd').StatsD;
var statsdClient = new StatsD(global.settings.statsd);
var USER = 'vizzuality';
var QUERY = 'select pg_sleep(0)';
var HOST = 'localhost';
var JOB = {
user: USER,
query: QUERY,
host: HOST
};
describe('job runner', function() {
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, statsdClient);
after(function (done) {
redisUtils.clean('batch:*', done);
});
it('.run() should run a job', function (done) {
jobService.create(JOB, function (err, job) {
if (err) {
return done(err);
}
jobRunner.run(job.data.job_id, function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.data.status, jobStatus.DONE);
done();
});
});
});
it('.run() should return a job not found error', function (done) {
jobRunner.run('wadus_job_id', function (err) {
assert.ok(err, err);
assert.equal(err.name, 'NotFoundError');
assert.equal(err.message, 'Job with id wadus_job_id not found');
done();
});
});
});

View File

@ -0,0 +1,279 @@
'use strict';
require('../../helper');
var BATCH_SOURCE = '../../../batch/';
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var _ = require('underscore');
var RedisPool = require('redis-mpool');
var UserIndexer = require(BATCH_SOURCE + 'user_indexer');
var JobQueue = require(BATCH_SOURCE + 'job_queue');
var JobBackend = require(BATCH_SOURCE + 'job_backend');
var JobPublisher = require(BATCH_SOURCE + 'job_publisher');
var jobStatus = require(BATCH_SOURCE + 'job_status');
var UserDatabaseMetadataService = require(BATCH_SOURCE + 'user_database_metadata_service');
var JobCanceller = require(BATCH_SOURCE + 'job_canceller');
var JobService = require(BATCH_SOURCE + 'job_service');
var PSQL = require('cartodb-psql');
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var metadataBackend = require('cartodb-redis')(redisConfig);
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var jobPublisher = new JobPublisher(redisPoolPublisher);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var userIndexer = new UserIndexer(metadataBackend);
var jobBackend = new JobBackend(metadataBackend, jobQueue, userIndexer);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var USER = 'vizzuality';
var QUERY = 'select pg_sleep(0)';
var HOST = 'localhost';
var JOB = {
user: USER,
query: QUERY,
host: HOST
};
function createWadusDataJob() {
return JSON.parse(JSON.stringify(JOB));
}
// sets job to running, run its query and returns inmediatly (don't wait for query finishes)
// in order to test query cancelation/draining
function runQueryHelper(job, callback) {
var job_id = job.job_id;
var user = job.user;
var sql = job.query;
job.status = jobStatus.RUNNING;
jobBackend.update(job, function (err) {
if (err) {
return callback(err);
}
userDatabaseMetadataService.getUserMetadata(user, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
sql = '/* ' + job_id + ' */ ' + sql;
pg.eventedQuery(sql, function (err, query) {
if (err) {
return callback(err);
}
callback(null, query);
});
});
});
}
describe('job service', function() {
var jobService = new JobService(jobBackend, jobCanceller);
after(function (done) {
redisUtils.clean('batch:*', done);
});
it('.get() should return a job', function (done) {
jobService.create(createWadusDataJob(), function (err, jobCreated) {
if (err) {
return done(err);
}
jobService.get(jobCreated.data.job_id, function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.data.job_id, jobCreated.data.job_id);
done();
});
});
});
it('.get() should return a not found error', function (done) {
jobService.get('wadus_job_id', function (err) {
assert.ok(err);
assert.equal(err.message, 'Job with id wadus_job_id not found');
done();
});
});
it('.create() should persist a job', function (done) {
jobService.create(createWadusDataJob(), function (err, jobCreated) {
if (err) {
return done(err);
}
assert.ok(jobCreated.data.job_id);
assert.equal(jobCreated.data.status, jobStatus.PENDING);
done();
});
});
it('.create() should return error with invalid job data', function (done) {
var job = createWadusDataJob();
delete job.query;
jobService.create(job, function (err) {
assert.ok(err);
assert.equal(err.message, 'You must indicate a valid SQL');
done();
});
});
it('.list() should return a list of user\'s jobs', function (done) {
jobService.create(createWadusDataJob(), function (err, jobCreated) {
if (err) {
return done(err);
}
jobService.list(USER, function (err, jobs) {
var found = false;
assert.ok(!err, err);
assert.ok(jobs.length);
jobs.forEach(function (job) {
if (job.data.job_id === jobCreated.data.job_id) {
found = true;
}
});
assert.ok(found, 'Job expeted to be listed not found');
done();
});
});
});
it('.list() should return a empty list for nonexitent user', function (done) {
jobService.list('wadus_user', function (err, jobs) {
assert.ok(!err, err);
assert.ok(!jobs.length);
done();
});
});
it('.update() should update a job', function (done) {
jobService.create(createWadusDataJob(), function (err, jobCreated) {
if (err) {
return done(err);
}
jobCreated.data.query = 'select pg_sleep(1)';
jobService.update(jobCreated.data, function (err, jobUpdated) {
if (err) {
return done(err);
}
assert.equal(jobUpdated.data.job_id, jobCreated.data.job_id);
assert.equal(jobUpdated.data.query, 'select pg_sleep(1)');
done();
});
});
});
it('.update() should return error when updates a nonexistent job', function (done) {
var job = createWadusDataJob();
job.job_id = 'wadus_job_id';
jobService.update(job, function (err) {
assert.ok(err, err);
assert.equal(err.name, 'NotFoundError');
assert.equal(err.message, 'Job with id ' + job.job_id + ' not found');
done();
});
});
it('.cancel() should cancel a running job', function (done) {
var job = createWadusDataJob();
job.query = 'select pg_sleep(3)';
jobService.create(job, function (err, job) {
if (err) {
return done(err);
}
runQueryHelper(job.data, function (err) {
if (err) {
return done(err);
}
jobService.cancel(job.data.job_id, function (err, jobCancelled) {
if (err) {
return done(err);
}
assert.equal(jobCancelled.data.job_id, job.data.job_id);
assert.equal(jobCancelled.data.status, jobStatus.CANCELLED);
done();
});
});
});
});
it('.cancel() should return a job not found error', function (done) {
jobService.cancel('wadus_job_id', function (err) {
assert.ok(err, err);
assert.equal(err.name, 'NotFoundError');
assert.equal(err.message, 'Job with id wadus_job_id not found');
done();
});
});
it('.drain() should draing a running job', function (done) {
var job = createWadusDataJob();
job.query = 'select pg_sleep(3)';
jobService.create(job, function (err, job) {
if (err) {
return done(err);
}
runQueryHelper(job.data, function (err) {
if (err) {
return done(err);
}
jobService.drain(job.data.job_id, function (err, jobDrained) {
if (err) {
return done(err);
}
assert.equal(jobDrained.job_id, job.data.job_id);
assert.equal(jobDrained.status, jobStatus.PENDING);
done();
});
});
});
});
it('.drain() should return a job not found error', function (done) {
jobService.drain('wadus_job_id', function (err) {
assert.ok(err, err);
assert.equal(err.name, 'NotFoundError');
assert.equal(err.message, 'Job with id wadus_job_id not found');
done();
});
});
});

View File

@ -0,0 +1,20 @@
'use strict';
var redisConfig = {
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
};
var metadataBackend = require('cartodb-redis')(redisConfig);
module.exports.clean = function clean(pattern, callback) {
metadataBackend.redisCmd(5, 'KEYS', [ pattern ], function (err, keys) {
if (err) {
return callback(err);
}
metadataBackend.redisCmd(5, 'DEL', keys, callback);
});
};