Merge pull request #376 from CartoDB/queue-work-scheduling

Queue work scheduling
This commit is contained in:
Raul Ochoa 2016-10-20 12:37:13 +02:00 committed by GitHub
commit 5ed8e655a8
25 changed files with 1570 additions and 464 deletions

View File

@ -1,6 +1,12 @@
1.39.2 - 2016-mm-dd
1.40.0 - 2016-mm-dd
-------------------
New features:
* Batch queries are handled per db host.
- There is a scheduler controlling how many queries and in what order they are run.
- Priority is based on: number of queries already run, and oldest user in queue.
* Batch queries capacity: allow to configure how many jobs to run per db host.
1.39.1 - 2016-10-17
-------------------

View File

@ -3,10 +3,11 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var debug = require('./util/debug')('batch');
var forever = require('./util/forever');
var queue = require('queue-async');
var Locker = require('./leader/locker');
var HostUserQueueMover = require('./maintenance/host-user-queue-mover');
var HostScheduler = require('./scheduler/host-scheduler');
var EMPTY_QUEUE = true;
function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublisher, redisPool, logger) {
EventEmitter.call(this);
@ -17,11 +18,11 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe
this.jobService = jobService;
this.jobPublisher = jobPublisher;
this.logger = logger;
this.locker = Locker.create('redis-distlock', { pool: redisPool });
this.hostScheduler = new HostScheduler(name, { run: this.processJob.bind(this) }, redisPool);
this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool);
// map: host => jobId
this.workingQueues = {};
// map: user => jobId. Will be used for draining jobs.
this.workInProgressJobs = {};
}
util.inherits(Batch, EventEmitter);
@ -38,37 +39,15 @@ Batch.prototype.subscribe = function () {
this.jobSubscriber.subscribe(
function onJobHandler(user, host) {
var resource = host + ':' + user;
debug('onJobHandler(%s, %s)', user, host);
if (self.isProcessingUser(user)) {
return debug('%s is already processing user=%s', self.name, user);
}
// do forever, it does not throw a stack overflow
forever(
function (next) {
self.locker.lock(resource, function(err) {
// we didn't get the lock for the host
if (err) {
debug(
'Could not lock host=%s for user=%s from %s. Reason: %s',
host, self.name, user, err.message
);
return next(err);
}
debug('Locked host=%s for user=%s from %s', host, user, self.name);
self.processNextJob(user, next);
});
},
function (err) {
if (err) {
debug(err.name === 'EmptyQueue' ? err.message : err);
}
self.finishedProcessingUser(user);
self.locker.unlock(resource, debug);
debug('[%s] onJobHandler(%s, %s)', self.name, user, host);
self.hostScheduler.add(host, user, function(err) {
if (err) {
return debug(
'Could not schedule host=%s user=%s from %s. Reason: %s',
host, self.name, user, err.message
);
}
);
});
},
function onJobSubscriberReady(err) {
if (err) {
@ -80,50 +59,45 @@ Batch.prototype.subscribe = function () {
);
};
Batch.prototype.processNextJob = function (user, callback) {
// This is missing the logic for processing several users within the same host
// It requires to:
// - Take care of number of jobs running at the same time per host.
// - Execute user jobs in order.
Batch.prototype.processJob = function (user, callback) {
var self = this;
self.jobQueue.dequeue(user, function (err, jobId) {
if (err) {
return callback(err);
return callback(new Error('Could not get job from "' + user + '". Reason: ' + err.message), !EMPTY_QUEUE);
}
if (!jobId) {
var emptyQueueError = new Error('Queue for user="' + user + '" is empty');
emptyQueueError.name = 'EmptyQueue';
return callback(emptyQueueError);
debug('Queue empty user=%s', user);
return callback(null, EMPTY_QUEUE);
}
self.setProcessingJobId(user, jobId);
self.setWorkInProgressJob(user, jobId);
self.jobRunner.run(jobId, function (err, job) {
self.setProcessingJobId(user, null);
self.clearWorkInProgressJob(user);
if (err) {
debug(err);
if (err.name === 'JobNotRunnable') {
return callback();
return callback(null, !EMPTY_QUEUE);
}
return callback(err);
return callback(err, !EMPTY_QUEUE);
}
debug('Job=%s status=%s user=%s (failed_reason=%s)', jobId, job.data.status, user, job.failed_reason);
debug(
'[%s] Job=%s status=%s user=%s (failed_reason=%s)',
self.name, jobId, job.data.status, user, job.failed_reason
);
self.logger.log(job);
self.emit('job:' + job.data.status, jobId);
callback();
return callback(null, !EMPTY_QUEUE);
});
});
};
Batch.prototype.drain = function (callback) {
var self = this;
var workingUsers = this.getWorkingUsers();
var workingUsers = this.getWorkInProgressUsers();
var batchQueues = queue(workingUsers.length);
workingUsers.forEach(function (user) {
@ -143,7 +117,7 @@ Batch.prototype.drain = function (callback) {
Batch.prototype._drainJob = function (user, callback) {
var self = this;
var job_id = this.getProcessingJobId(user);
var job_id = this.getWorkInProgressJob(user);
if (!job_id) {
return process.nextTick(function () {
@ -169,22 +143,21 @@ Batch.prototype.stop = function (callback) {
this.jobSubscriber.unsubscribe(callback);
};
Batch.prototype.isProcessingUser = function(user) {
return this.workingQueues.hasOwnProperty(user);
/* Work in progress jobs */
Batch.prototype.setWorkInProgressJob = function(user, jobId) {
this.workInProgressJobs[user] = jobId;
};
Batch.prototype.getWorkingUsers = function() {
return Object.keys(this.workingQueues);
Batch.prototype.getWorkInProgressJob = function(user) {
return this.workInProgressJobs[user];
};
Batch.prototype.setProcessingJobId = function(user, jobId) {
this.workingQueues[user] = jobId;
Batch.prototype.clearWorkInProgressJob = function(user) {
delete this.workInProgressJobs[user];
};
Batch.prototype.getProcessingJobId = function(user) {
return this.workingQueues[user];
};
Batch.prototype.finishedProcessingUser = function(user) {
delete this.workingQueues[user];
Batch.prototype.getWorkInProgressUsers = function() {
return Object.keys(this.workInProgressJobs);
};

View File

@ -1,5 +1,7 @@
'use strict';
var debug = require('./util/debug')('queue');
function JobQueue(metadataBackend, jobPublisher) {
this.metadataBackend = metadataBackend;
this.jobPublisher = jobPublisher;
@ -14,16 +16,15 @@ var QUEUE = {
module.exports.QUEUE = QUEUE;
JobQueue.prototype.enqueue = function (user, jobId, callback) {
var self = this;
debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId);
this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + user, jobId ], function (err) {
if (err) {
return callback(err);
}
self.jobPublisher.publish(user);
this.jobPublisher.publish(user);
callback();
});
}.bind(this));
};
JobQueue.prototype.size = function (user, callback) {
@ -31,9 +32,13 @@ JobQueue.prototype.size = function (user, callback) {
};
JobQueue.prototype.dequeue = function (user, callback) {
this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], callback);
this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], function(err, jobId) {
debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId);
return callback(err, jobId);
});
};
JobQueue.prototype.enqueueFirst = function (user, jobId, callback) {
debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId);
this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + user, jobId ], callback);
};

View File

@ -2,17 +2,21 @@
var RedisDistlockLocker = require('./provider/redis-distlock');
var debug = require('../util/debug')('leader-locker');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var LOCK = {
TTL: 5000
};
function Locker(locker, ttl) {
EventEmitter.call(this);
this.locker = locker;
this.ttl = (Number.isFinite(ttl) && ttl > 0) ? ttl : LOCK.TTL;
this.renewInterval = this.ttl / 5;
this.intervalIds = {};
}
util.inherits(Locker, EventEmitter);
module.exports = Locker;
@ -43,6 +47,7 @@ Locker.prototype.startRenewal = function(resource) {
debug('Trying to extend lock resource=%s', resource);
self.locker.lock(resource, self.ttl, function(err, _lock) {
if (err) {
self.emit('error', err, resource);
return self.stopRenewal(resource);
}
if (_lock) {

View File

@ -6,7 +6,7 @@ var REDIS_DISTLOCK = {
};
var Redlock = require('redlock');
var debug = require('../../util/debug')('redis-distlock');
var debug = require('../../util/debug')('leader:redis-distlock');
function RedisDistlockLocker(redisPool) {
this.pool = redisPool;

View File

@ -0,0 +1,11 @@
'use strict';
// Capacity provider with a constant concurrency limit.
// Guards against non-finite input (undefined/NaN would otherwise propagate
// NaN through Math.max) by falling back to the minimum capacity of 1,
// consistent with the Number.isFinite guards in the http-* providers.
function FixedCapacity(capacity) {
    this.capacity = Number.isFinite(capacity) ? Math.max(1, capacity) : 1;
}
module.exports = FixedCapacity;

// Fixed strategy: always report the configured constant capacity.
// Callback signature matches the other providers: callback(err, capacity).
FixedCapacity.prototype.getCapacity = function(callback) {
    var fixedCapacity = this.capacity;
    return callback(null, fixedCapacity);
};

View File

@ -0,0 +1,32 @@
'use strict';
var util = require('util');
var debug = require('../../util/debug')('capacity-http-load');
var HttpSimpleCapacity = require('./http-simple');
// Capacity provider that derives concurrency from the db host's reported
// load. Extends HttpSimpleCapacity, reusing its endpoint polling and
// response throttling (see getResponse there).
function HttpLoadCapacity(host, capacityEndpoint) {
    HttpSimpleCapacity.call(this, host, capacityEndpoint);
}
util.inherits(HttpLoadCapacity, HttpSimpleCapacity);

module.exports = HttpLoadCapacity;
// Compute capacity from the endpoint's 'cores' and 'relative_load' values:
// roughly "idle cores minus one", never below 1.
// Any failure degrades gracefully to the minimum capacity of 1.
HttpLoadCapacity.prototype.getCapacity = function(callback) {
    this.getResponse(function(err, values) {
        if (err) {
            return callback(null, 1);
        }

        var cores = parseInt(values.cores, 10);
        var relativeLoad = parseFloat(values.relative_load);

        // floor((1 - load) * cores - 1): leave headroom of one core.
        var estimated = Math.floor(((1 - relativeLoad) * cores) - 1);
        var capacity = Number.isFinite(estimated) ? Math.max(1, estimated) : 1;

        debug('host=%s, capacity=%s', this.host, capacity);

        return callback(null, capacity);
    }.bind(this));
};

View File

@ -0,0 +1,62 @@
'use strict';
var request = require('request');
var debug = require('../../util/debug')('capacity-http-simple');
// Capacity provider that asks an HTTP endpoint how many simultaneous
// queries a db host can take, based on the 'available_cores' value it
// returns.
function HttpSimpleCapacity(host, capacityEndpoint) {
    this.host = host;
    this.capacityEndpoint = capacityEndpoint;

    // Cache of the last successful endpoint response and when it arrived
    // (ms since epoch); used by getResponse to throttle requests.
    this.lastResponse = null;
    this.lastResponseTime = 0;
}

module.exports = HttpSimpleCapacity;
// Capacity = max(available_cores, 1). Any endpoint failure or unparsable
// value degrades gracefully to the minimum capacity of 1.
HttpSimpleCapacity.prototype.getCapacity = function(callback) {
    this.getResponse(function(err, values) {
        if (err) {
            return callback(null, 1);
        }

        var availableCores = parseInt(values.available_cores, 10);
        var capacity = Number.isFinite(availableCores) ? Math.max(availableCores, 1) : 1;

        debug('host=%s, capacity=%s', this.host, capacity);

        return callback(null, capacity);
    }.bind(this));
};
// POST to the capacity endpoint and hand back its 'return_values' object.
// Successful responses are cached and reused for 500 ms to avoid hammering
// the endpoint; failures fall through to the caller (err or generic Error).
HttpSimpleCapacity.prototype.getResponse = function(callback) {
    var requestParams = {
        method: 'POST',
        url: this.capacityEndpoint,
        timeout: 2000,
        json: true
    };
    debug('getCapacity(%s)', this.host);

    // throttle requests for 500 ms
    var now = Date.now();
    var hasFreshResponse = this.lastResponse !== null && ((now - this.lastResponseTime) < 500);
    if (hasFreshResponse) {
        return callback(null, this.lastResponse);
    }

    request.post(requestParams, function(err, res, jsonRes) {
        if (err) {
            return callback(err);
        }
        if (!jsonRes || jsonRes.retcode !== 0) {
            return callback(new Error('Could not retrieve information from endpoint'));
        }
        this.lastResponse = jsonRes.return_values || {};
        // We could go more aggressive by updating lastResponseTime on failures.
        this.lastResponseTime = now;
        return callback(null, this.lastResponse);
    }.bind(this));
};

View File

@ -0,0 +1,85 @@
'use strict';
var _ = require('underscore');
var debug = require('../util/debug')('host-scheduler');
var Scheduler = require('./scheduler');
var Locker = require('../leader/locker');
var FixedCapacity = require('./capacity/fixed');
var HttpSimpleCapacity = require('./capacity/http-simple');
var HttpLoadCapacity = require('./capacity/http-load');
// Schedules batch work per db host: holds one leader lock and one Scheduler
// per host so that only one process drains a given host's queues at a time.
function HostScheduler(name, taskRunner, redisPool) {
    this.name = name || 'scheduler';
    this.taskRunner = taskRunner;
    this.locker = Locker.create('redis-distlock', { pool: redisPool });
    // If the leader lock renewal fails, drop this host's scheduler so
    // another process can take over.
    this.locker.on('error', function(err, host) {
        debug('[%s] Locker.error %s', this.name, err.message);
        this.unlock(host);
    }.bind(this));
    // host => Scheduler
    this.schedulers = {};
}

module.exports = HostScheduler;
// Register a new piece of work for `user` on `host` and kick the host's
// scheduler. Calls back with (err) when the host lock could not be taken,
// otherwise (null, wasRunning) where wasRunning tells whether the
// scheduling loop was already active.
HostScheduler.prototype.add = function(host, user, callback) {
    this.lock(host, function(err, scheduler) {
        if (err) {
            debug('[%s] Could not lock host=%s', this.name, host);
            return callback(err);
        }

        scheduler.add(user);
        var wasRunning = scheduler.schedule();
        debug('[%s] Scheduler host=%s was running=%s', this.name, host, wasRunning);

        return callback(null, wasRunning);
    }.bind(this));
};
// Pick the capacity provider for a host from global settings.
// 'http-simple'/'http-load' require batch_capacity_http_url_template;
// anything else (or a missing template) falls back to the 'fixed' strategy.
HostScheduler.prototype.getCapacityProvider = function(host) {
    var strategy = global.settings.batch_capacity_strategy;
    var urlTemplate = global.settings.batch_capacity_http_url_template;

    var isHttpStrategy = strategy === 'http-simple' || strategy === 'http-load';
    if (isHttpStrategy && urlTemplate) {
        var endpoint = _.template(urlTemplate, { dbhost: host });
        debug('Using strategy=%s capacity. Endpoint=%s', strategy, endpoint);
        return (strategy === 'http-simple') ?
            new HttpSimpleCapacity(host, endpoint) :
            new HttpLoadCapacity(host, endpoint);
    }

    var fixedCapacity = global.settings.batch_capacity_fixed_amount || 2;
    debug('Using strategy=fixed capacity=%d', fixedCapacity);
    return new FixedCapacity(fixedCapacity);
};
// Acquire the leader lock for `host` and call back with that host's
// Scheduler, creating it lazily on first use. The scheduler's 'done' event
// releases the lock again (see unlock).
HostScheduler.prototype.lock = function(host, callback) {
    debug('[%s] lock(%s)', this.name, host);
    var self = this;

    this.locker.lock(host, function(err) {
        if (err) {
            debug('[%s] Could not lock host=%s. Reason: %s', self.name, host, err.message);
            return callback(err);
        }

        var isKnownHost = self.schedulers.hasOwnProperty(host);
        if (!isKnownHost) {
            var scheduler = new Scheduler(self.getCapacityProvider(host), self.taskRunner);
            scheduler.on('done', self.unlock.bind(self, host));
            self.schedulers[host] = scheduler;
        }

        debug('[%s] Locked host=%s', self.name, host);
        return callback(null, self.schedulers[host]);
    });
};
// Drop the host's scheduler (if any) and release the leader lock so another
// process may take over scheduling for that host.
HostScheduler.prototype.unlock = function(host) {
    debug('[%s] unlock(%s)', this.name, host);

    var hasScheduler = this.schedulers.hasOwnProperty(host);
    if (hasScheduler) {
        // TODO stop scheduler?
        delete this.schedulers[host];
    }

    this.locker.unlock(host, debug);
};

View File

@ -0,0 +1,200 @@
'use strict';
// Inspiration from:
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
// - https://www.kernel.org/doc/Documentation/rbtree.txt
// - http://www.ibm.com/developerworks/linux/library/l-completely-fair-scheduler/
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var RBTree = require('bintrees').RBTree;
var debug = require('../util/debug')('scheduler');
var forever = require('../util/forever');
// Fair scheduler for one db host: decides which user's jobs run next and
// how many run concurrently (bounded by the capacity provider).
// - capacity: provider exposing getCapacity(callback).
// - taskRunner: object exposing run(user, callback) to execute one job.
function Scheduler(capacity, taskRunner) {
    EventEmitter.call(this);
    debug('new Scheduler');
    this.taskRunner = taskRunner;
    this.capacity = capacity;
    // All task entities ever added, whatever their status.
    this.tasks = [];
    // map: user => task entity.
    this.users = {};
    // Red-black tree holding runnable (pending) entities ordered by
    // priority; tasksTree.min() is the next candidate to run.
    this.tasksTree = new RBTree(function(taskEntityA, taskEntityB) {
        // if the user is the same it's the same entity
        if (taskEntityA.user === taskEntityB.user) {
            return 0;
        }

        // priority for entity with less executed jobs
        if (taskEntityA.jobs !== taskEntityB.jobs) {
            return taskEntityA.jobs - taskEntityB.jobs;
        }

        // priority for oldest job
        if (taskEntityA.createdAt !== taskEntityB.createdAt) {
            return taskEntityA.createdAt - taskEntityB.createdAt;
        }

        // we don't care if we arrive here
        // (distinct users never compare equal, so insertion still succeeds)
        return -1;
    });
}
util.inherits(Scheduler, EventEmitter);

module.exports = Scheduler;
// Register (or re-activate) a user's task.
// Returns true when the user was already known to this scheduler,
// false when a brand-new task entity was created.
Scheduler.prototype.add = function(user) {
    debug('add(%s)', user);
    var taskEntity = this.users[user];
    if (taskEntity) {
        if (taskEntity.status === STATUS.DONE) {
            // The user had finished and got new jobs: make it runnable again.
            // FIX: it must also be reinserted into the priority tree — it was
            // removed when acquired and release() skips DONE entities — and
            // 'add' must fire so an acquire() waiting for tasks wakes up.
            // Otherwise tasksTree.min() can never return this user again.
            taskEntity.status = STATUS.PENDING;
            this.tasksTree.insert(taskEntity);
            this.emit('add');
        }
        return true;
    } else {
        // createdAt uses the arrival index, giving older users priority
        // when job counts tie (see the RBTree comparator).
        taskEntity = new TaskEntity(user, this.tasks.length);
        this.tasks.push(taskEntity);
        this.users[user] = taskEntity;
        this.tasksTree.insert(taskEntity);
        this.emit('add');
        return false;
    }
};
// Start the loop that drains this host's tasks.
// Returns true if a loop was already running (idempotent kick),
// false if this call actually started a new loop.
Scheduler.prototype.schedule = function() {
    if (this.running) {
        return true;
    }
    this.running = true;

    var self = this;
    forever(
        function (next) {
            debug('Waiting for task');
            // acquire() blocks (via events) until a candidate and a free
            // capacity slot are both available; yields null when all done.
            self.acquire(function(err, taskEntity) {
                debug('Acquired user=%j', taskEntity);

                if (!taskEntity) {
                    // No candidate left: stop the loop via forever's error path.
                    return next(new Error('all users finished'));
                }

                self.tasksTree.remove(taskEntity);
                taskEntity.running();

                debug('Running task for user=%s', taskEntity.user);
                self.taskRunner.run(taskEntity.user, function(err, userQueueIsEmpty) {
                    debug('Run task=%j, done=%s', taskEntity, userQueueIsEmpty);
                    taskEntity.ran(userQueueIsEmpty);
                    // release() reinserts PENDING entities and emits 'release'
                    // to wake any acquire() waiting for a slot.
                    self.release(err, taskEntity);
                });

                // try to acquire next user
                // will block until capacity slot is available
                next();
            });
        },
        function (err) {
            // Loop terminated ('all users finished' or a real error).
            debug('done: %s', err.message);
            self.running = false;
            self.emit('done');
            self.removeAllListeners();
        }
    );

    return false;
};
// Find the next task to run. Calls back with:
// - (null, null) when every user is DONE (nothing left to schedule);
// - (null, taskEntity) when a pending candidate exists and capacity allows;
// - otherwise parks on 'add'/'release' events and retries when woken.
Scheduler.prototype.acquire = function(callback) {
    if (this.tasks.every(is(STATUS.DONE))) {
        return callback(null, null);
    }

    var self = this;
    this.capacity.getCapacity(function(err, capacity) {
        if (err) {
            return callback(err);
        }

        // The two listeners are mutually exclusive: whichever fires first
        // removes the other before retrying the acquire.
        function addListener() {
            self.removeListener('release', releaseListener);
            debug('Got a new task');
            self.acquire(callback);
        }

        function releaseListener() {
            self.removeListener('add', addListener);
            debug('Slot was released');
            self.acquire(callback);
        }

        debug('Trying to acquire task');
        var allRunning = self.tasks.every(is(STATUS.RUNNING));
        var running = self.tasks.filter(is(STATUS.RUNNING));
        debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks);

        // No pending candidate but spare capacity: a newly added task could
        // run immediately, so also listen for 'add'.
        if (allRunning && running.length < capacity) {
            debug('Waiting for tasks');
            self.once('add', addListener);
        }

        // Wait for a slot when there is no pending candidate (all tasks
        // running) or capacity is exhausted.
        // FIX: this previously tested some(RUNNING), which blocked whenever
        // ANY task was running — serializing execution and making any
        // configured capacity > 1 ineffective.
        if (allRunning || running.length >= capacity) {
            debug('Waiting for slot');
            return self.once('release', releaseListener);
        }

        // Highest-priority pending entity: fewest jobs ran, then oldest.
        var candidate = self.tasksTree.min();
        return callback(null, candidate);
    });
};
// Hand back the capacity slot used by taskEntity after a run attempt.
// Entities with more queued work (PENDING) go back into the priority tree;
// the 'release' event wakes any acquire() waiting for a free slot.
Scheduler.prototype.release = function(err, taskEntity) {
    debug('Released %j', taskEntity);

    var stillPending = taskEntity.is(STATUS.PENDING);
    if (stillPending) {
        this.tasksTree.insert(taskEntity);
    }

    this.emit('release');
};
/* Task entities */

var STATUS = {
    PENDING: 'pending',
    RUNNING: 'running',
    DONE: 'done'
};

// One scheduling entry per user: tracks how many jobs have run and the
// arrival order, the two values driving the scheduler's priority comparator.
function TaskEntity(user, createdAt) {
    this.user = user;
    this.createdAt = createdAt;
    this.status = STATUS.PENDING;
    this.jobs = 0;
    this.runAt = 0;
}

TaskEntity.prototype.is = function(status) {
    return status === this.status;
};

// Mark the entity as picked up for execution.
TaskEntity.prototype.running = function() {
    this.runAt = Date.now();
    this.status = STATUS.RUNNING;
};

// Account a finished job; an empty user queue means the user is done for now.
TaskEntity.prototype.ran = function(userQueueIsEmpty) {
    this.jobs += 1;
    this.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING;
};

// Predicate factory: returns (taskEntity) => taskEntity.is(status).
function is(status) {
    return function(taskEntity) {
        return taskEntity.is(status);
    };
}

View File

@ -34,6 +34,23 @@ module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in millisecon
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max number of queued jobs a user can have at a given time
module.exports.batch_max_queued_jobs = 64;
// Capacity strategy to use.
// It allows to tune how many queries run at a db host at the same time.
// Options: 'fixed', 'http-simple', 'http-load'
module.exports.batch_capacity_strategy = 'fixed';
// Applies when strategy='fixed'.
// Number of simultaneous users running queries in the same host.
// It will use 1 as min.
// Default 2.
module.exports.batch_capacity_fixed_amount = 2;
// Applies when strategy='http-simple' or strategy='http-load'.
// HTTP endpoint to check db host load.
// Helps to decide the number of simultaneous users running queries in that host.
// 'http-simple' will use 'available_cores' to decide the number.
// 'http-load' will use 'cores' and 'relative_load' to decide the number.
// It will use 1 as min.
// If no template is provided it will default to 'fixed' strategy.
module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses

View File

@ -35,6 +35,23 @@ module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in millisecon
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max number of queued jobs a user can have at a given time
module.exports.batch_max_queued_jobs = 64;
// Capacity strategy to use.
// It allows to tune how many queries run at a db host at the same time.
// Options: 'fixed', 'http-simple', 'http-load'
module.exports.batch_capacity_strategy = 'fixed';
// Applies when strategy='fixed'.
// Number of simultaneous users running queries in the same host.
// It will use 1 as min.
// Default 2.
module.exports.batch_capacity_fixed_amount = 2;
// Applies when strategy='http-simple' or strategy='http-load'.
// HTTP endpoint to check db host load.
// Helps to decide the number of simultaneous users running queries in that host.
// 'http-simple' will use 'available_cores' to decide the number.
// 'http-load' will use 'cores' and 'relative_load' to decide the number.
// It will use 1 as min.
// If no template is provided it will default to 'fixed' strategy.
module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses

View File

@ -35,6 +35,23 @@ module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in millisecon
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max number of queued jobs a user can have at a given time
module.exports.batch_max_queued_jobs = 64;
// Capacity strategy to use.
// It allows to tune how many queries run at a db host at the same time.
// Options: 'fixed', 'http-simple', 'http-load'
module.exports.batch_capacity_strategy = 'fixed';
// Applies when strategy='fixed'.
// Number of simultaneous users running queries in the same host.
// It will use 1 as min.
// Default 2.
module.exports.batch_capacity_fixed_amount = 2;
// Applies when strategy='http-simple' or strategy='http-load'.
// HTTP endpoint to check db host load.
// Helps to decide the number of simultaneous users running queries in that host.
// 'http-simple' will use 'available_cores' to decide the number.
// 'http-load' will use 'cores' and 'relative_load' to decide the number.
// It will use 1 as min.
// If no template is provided it will default to 'fixed' strategy.
module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses

View File

@ -32,6 +32,23 @@ module.exports.batch_query_timeout = 5 * 1000; // 5 seconds in milliseconds
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max number of queued jobs a user can have at a given time
module.exports.batch_max_queued_jobs = 64;
// Capacity strategy to use.
// It allows to tune how many queries run at a db host at the same time.
// Options: 'fixed', 'http-simple', 'http-load'
module.exports.batch_capacity_strategy = 'fixed';
// Applies when strategy='fixed'.
// Number of simultaneous users running queries in the same host.
// It will use 1 as min.
// Default 2.
module.exports.batch_capacity_fixed_amount = 2;
// Applies when strategy='http-simple' or strategy='http-load'.
// HTTP endpoint to check db host load.
// Helps to decide the number of simultaneous users running queries in that host.
// 'http-simple' will use 'available_cores' to decide the number.
// 'http-load' will use 'cores' and 'relative_load' to decide the number.
// It will use 1 as min.
// If no template is provided it will default to 'fixed' strategy.
module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses

388
npm-shrinkwrap.json generated
View File

@ -1,7 +1,12 @@
{
"name": "cartodb_sql_api",
"version": "1.39.2",
"version": "1.40.0",
"dependencies": {
"bintrees": {
"version": "1.0.1",
"from": "bintrees@1.0.1",
"resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz"
},
"bunyan": {
"version": "1.8.1",
"from": "bunyan@1.8.1",
@ -52,9 +57,9 @@
"resolved": "https://registry.npmjs.org/glob/-/glob-6.0.4.tgz",
"dependencies": {
"inflight": {
"version": "1.0.5",
"version": "1.0.6",
"from": "inflight@>=1.0.4 <2.0.0",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.5.tgz",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz",
"dependencies": {
"wrappy": {
"version": "1.0.2",
@ -646,7 +651,7 @@
},
"mime-types": {
"version": "2.1.12",
"from": "mime-types@>=2.1.11 <2.2.0",
"from": "mime-types@>=2.1.7 <2.2.0",
"resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.12.tgz",
"dependencies": {
"mime-db": {
@ -743,6 +748,377 @@
}
}
},
"request": {
"version": "2.75.0",
"from": "request@>=2.75.0 <2.76.0",
"resolved": "https://registry.npmjs.org/request/-/request-2.75.0.tgz",
"dependencies": {
"aws-sign2": {
"version": "0.6.0",
"from": "aws-sign2@>=0.6.0 <0.7.0",
"resolved": "https://registry.npmjs.org/aws-sign2/-/aws-sign2-0.6.0.tgz"
},
"aws4": {
"version": "1.5.0",
"from": "aws4@>=1.2.1 <2.0.0",
"resolved": "https://registry.npmjs.org/aws4/-/aws4-1.5.0.tgz"
},
"bl": {
"version": "1.1.2",
"from": "bl@>=1.1.2 <1.2.0",
"resolved": "https://registry.npmjs.org/bl/-/bl-1.1.2.tgz",
"dependencies": {
"readable-stream": {
"version": "2.0.6",
"from": "readable-stream@>=2.0.5 <2.1.0",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.0.6.tgz",
"dependencies": {
"core-util-is": {
"version": "1.0.2",
"from": "core-util-is@>=1.0.0 <1.1.0",
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz"
},
"inherits": {
"version": "2.0.3",
"from": "inherits@>=2.0.1 <2.1.0",
"resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.3.tgz"
},
"isarray": {
"version": "1.0.0",
"from": "isarray@>=1.0.0 <1.1.0",
"resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz"
},
"process-nextick-args": {
"version": "1.0.7",
"from": "process-nextick-args@>=1.0.6 <1.1.0",
"resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-1.0.7.tgz"
},
"string_decoder": {
"version": "0.10.31",
"from": "string_decoder@>=0.10.0 <0.11.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz"
},
"util-deprecate": {
"version": "1.0.2",
"from": "util-deprecate@>=1.0.1 <1.1.0",
"resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz"
}
}
}
}
},
"caseless": {
"version": "0.11.0",
"from": "caseless@>=0.11.0 <0.12.0",
"resolved": "https://registry.npmjs.org/caseless/-/caseless-0.11.0.tgz"
},
"combined-stream": {
"version": "1.0.5",
"from": "combined-stream@>=1.0.5 <1.1.0",
"resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.5.tgz",
"dependencies": {
"delayed-stream": {
"version": "1.0.0",
"from": "delayed-stream@>=1.0.0 <1.1.0",
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz"
}
}
},
"extend": {
"version": "3.0.0",
"from": "extend@>=3.0.0 <3.1.0",
"resolved": "https://registry.npmjs.org/extend/-/extend-3.0.0.tgz"
},
"forever-agent": {
"version": "0.6.1",
"from": "forever-agent@>=0.6.1 <0.7.0",
"resolved": "https://registry.npmjs.org/forever-agent/-/forever-agent-0.6.1.tgz"
},
"form-data": {
"version": "2.0.0",
"from": "form-data@>=2.0.0 <2.1.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-2.0.0.tgz",
"dependencies": {
"asynckit": {
"version": "0.4.0",
"from": "asynckit@>=0.4.0 <0.5.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz"
}
}
},
"har-validator": {
"version": "2.0.6",
"from": "har-validator@>=2.0.6 <2.1.0",
"resolved": "https://registry.npmjs.org/har-validator/-/har-validator-2.0.6.tgz",
"dependencies": {
"chalk": {
"version": "1.1.3",
"from": "chalk@>=1.1.1 <2.0.0",
"resolved": "https://registry.npmjs.org/chalk/-/chalk-1.1.3.tgz",
"dependencies": {
"ansi-styles": {
"version": "2.2.1",
"from": "ansi-styles@>=2.2.1 <3.0.0",
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-2.2.1.tgz"
},
"escape-string-regexp": {
"version": "1.0.5",
"from": "escape-string-regexp@>=1.0.2 <2.0.0",
"resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz"
},
"has-ansi": {
"version": "2.0.0",
"from": "has-ansi@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/has-ansi/-/has-ansi-2.0.0.tgz",
"dependencies": {
"ansi-regex": {
"version": "2.0.0",
"from": "ansi-regex@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.0.0.tgz"
}
}
},
"strip-ansi": {
"version": "3.0.1",
"from": "strip-ansi@>=3.0.0 <4.0.0",
"resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz",
"dependencies": {
"ansi-regex": {
"version": "2.0.0",
"from": "ansi-regex@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.0.0.tgz"
}
}
},
"supports-color": {
"version": "2.0.0",
"from": "supports-color@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-2.0.0.tgz"
}
}
},
"commander": {
"version": "2.9.0",
"from": "commander@>=2.9.0 <3.0.0",
"resolved": "https://registry.npmjs.org/commander/-/commander-2.9.0.tgz",
"dependencies": {
"graceful-readlink": {
"version": "1.0.1",
"from": "graceful-readlink@>=1.0.0",
"resolved": "https://registry.npmjs.org/graceful-readlink/-/graceful-readlink-1.0.1.tgz"
}
}
},
"is-my-json-valid": {
"version": "2.15.0",
"from": "is-my-json-valid@>=2.12.4 <3.0.0",
"resolved": "https://registry.npmjs.org/is-my-json-valid/-/is-my-json-valid-2.15.0.tgz",
"dependencies": {
"generate-function": {
"version": "2.0.0",
"from": "generate-function@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.0.0.tgz"
},
"generate-object-property": {
"version": "1.2.0",
"from": "generate-object-property@>=1.1.0 <2.0.0",
"resolved": "https://registry.npmjs.org/generate-object-property/-/generate-object-property-1.2.0.tgz",
"dependencies": {
"is-property": {
"version": "1.0.2",
"from": "is-property@>=1.0.0 <2.0.0",
"resolved": "https://registry.npmjs.org/is-property/-/is-property-1.0.2.tgz"
}
}
},
"jsonpointer": {
"version": "4.0.0",
"from": "jsonpointer@>=4.0.0 <5.0.0",
"resolved": "https://registry.npmjs.org/jsonpointer/-/jsonpointer-4.0.0.tgz"
},
"xtend": {
"version": "4.0.1",
"from": "xtend@>=4.0.0 <5.0.0",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.1.tgz"
}
}
},
"pinkie-promise": {
"version": "2.0.1",
"from": "pinkie-promise@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/pinkie-promise/-/pinkie-promise-2.0.1.tgz",
"dependencies": {
"pinkie": {
"version": "2.0.4",
"from": "pinkie@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/pinkie/-/pinkie-2.0.4.tgz"
}
}
}
}
},
"hawk": {
"version": "3.1.3",
"from": "hawk@>=3.1.3 <3.2.0",
"resolved": "https://registry.npmjs.org/hawk/-/hawk-3.1.3.tgz",
"dependencies": {
"hoek": {
"version": "2.16.3",
"from": "hoek@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/hoek/-/hoek-2.16.3.tgz"
},
"boom": {
"version": "2.10.1",
"from": "boom@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/boom/-/boom-2.10.1.tgz"
},
"cryptiles": {
"version": "2.0.5",
"from": "cryptiles@>=2.0.0 <3.0.0",
"resolved": "https://registry.npmjs.org/cryptiles/-/cryptiles-2.0.5.tgz"
},
"sntp": {
"version": "1.0.9",
"from": "sntp@>=1.0.0 <2.0.0",
"resolved": "https://registry.npmjs.org/sntp/-/sntp-1.0.9.tgz"
}
}
},
"http-signature": {
"version": "1.1.1",
"from": "http-signature@>=1.1.0 <1.2.0",
"resolved": "https://registry.npmjs.org/http-signature/-/http-signature-1.1.1.tgz",
"dependencies": {
"assert-plus": {
"version": "0.2.0",
"from": "assert-plus@>=0.2.0 <0.3.0",
"resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-0.2.0.tgz"
},
"jsprim": {
"version": "1.3.1",
"from": "jsprim@>=1.2.2 <2.0.0",
"resolved": "https://registry.npmjs.org/jsprim/-/jsprim-1.3.1.tgz",
"dependencies": {
"extsprintf": {
"version": "1.0.2",
"from": "extsprintf@1.0.2",
"resolved": "https://registry.npmjs.org/extsprintf/-/extsprintf-1.0.2.tgz"
},
"json-schema": {
"version": "0.2.3",
"from": "json-schema@0.2.3",
"resolved": "https://registry.npmjs.org/json-schema/-/json-schema-0.2.3.tgz"
},
"verror": {
"version": "1.3.6",
"from": "verror@1.3.6",
"resolved": "https://registry.npmjs.org/verror/-/verror-1.3.6.tgz"
}
}
},
"sshpk": {
"version": "1.10.1",
"from": "sshpk@>=1.7.0 <2.0.0",
"resolved": "https://registry.npmjs.org/sshpk/-/sshpk-1.10.1.tgz",
"dependencies": {
"asn1": {
"version": "0.2.3",
"from": "asn1@>=0.2.3 <0.3.0",
"resolved": "https://registry.npmjs.org/asn1/-/asn1-0.2.3.tgz"
},
"assert-plus": {
"version": "1.0.0",
"from": "assert-plus@>=1.0.0 <2.0.0",
"resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-1.0.0.tgz"
},
"dashdash": {
"version": "1.14.0",
"from": "dashdash@>=1.12.0 <2.0.0",
"resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.0.tgz"
},
"getpass": {
"version": "0.1.6",
"from": "getpass@>=0.1.1 <0.2.0",
"resolved": "https://registry.npmjs.org/getpass/-/getpass-0.1.6.tgz"
},
"jsbn": {
"version": "0.1.0",
"from": "jsbn@>=0.1.0 <0.2.0",
"resolved": "https://registry.npmjs.org/jsbn/-/jsbn-0.1.0.tgz"
},
"tweetnacl": {
"version": "0.14.3",
"from": "tweetnacl@>=0.14.0 <0.15.0",
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-0.14.3.tgz"
},
"jodid25519": {
"version": "1.0.2",
"from": "jodid25519@>=1.0.0 <2.0.0",
"resolved": "https://registry.npmjs.org/jodid25519/-/jodid25519-1.0.2.tgz"
},
"ecc-jsbn": {
"version": "0.1.1",
"from": "ecc-jsbn@>=0.1.1 <0.2.0",
"resolved": "https://registry.npmjs.org/ecc-jsbn/-/ecc-jsbn-0.1.1.tgz"
},
"bcrypt-pbkdf": {
"version": "1.0.0",
"from": "bcrypt-pbkdf@>=1.0.0 <2.0.0",
"resolved": "https://registry.npmjs.org/bcrypt-pbkdf/-/bcrypt-pbkdf-1.0.0.tgz"
}
}
}
}
},
"is-typedarray": {
"version": "1.0.0",
"from": "is-typedarray@>=1.0.0 <1.1.0",
"resolved": "https://registry.npmjs.org/is-typedarray/-/is-typedarray-1.0.0.tgz"
},
"isstream": {
"version": "0.1.2",
"from": "isstream@>=0.1.2 <0.2.0",
"resolved": "https://registry.npmjs.org/isstream/-/isstream-0.1.2.tgz"
},
"json-stringify-safe": {
"version": "5.0.1",
"from": "json-stringify-safe@>=5.0.1 <5.1.0",
"resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz"
},
"mime-types": {
"version": "2.1.12",
"from": "mime-types@>=2.1.7 <2.2.0",
"resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.12.tgz",
"dependencies": {
"mime-db": {
"version": "1.24.0",
"from": "mime-db@>=1.24.0 <1.25.0",
"resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.24.0.tgz"
}
}
},
"oauth-sign": {
"version": "0.8.2",
"from": "oauth-sign@>=0.8.1 <0.9.0",
"resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.8.2.tgz"
},
"stringstream": {
"version": "0.0.5",
"from": "stringstream@>=0.0.4 <0.1.0",
"resolved": "https://registry.npmjs.org/stringstream/-/stringstream-0.0.5.tgz"
},
"tough-cookie": {
"version": "2.3.1",
"from": "tough-cookie@>=2.3.0 <2.4.0",
"resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-2.3.1.tgz"
},
"tunnel-agent": {
"version": "0.4.3",
"from": "tunnel-agent@>=0.4.1 <0.5.0",
"resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.4.3.tgz"
}
}
},
"step": {
"version": "0.0.6",
"from": "step@>=0.0.5 <0.1.0",
@ -789,7 +1165,7 @@
"dependencies": {
"strip-ansi": {
"version": "3.0.1",
"from": "strip-ansi@>=3.0.1 <4.0.0",
"from": "strip-ansi@>=3.0.0 <4.0.0",
"resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz",
"dependencies": {
"ansi-regex": {
@ -1066,7 +1442,7 @@
},
"strip-ansi": {
"version": "3.0.1",
"from": "strip-ansi@>=3.0.1 <4.0.0",
"from": "strip-ansi@>=3.0.0 <4.0.0",
"resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz",
"dependencies": {
"ansi-regex": {

View File

@ -5,7 +5,7 @@
"keywords": [
"cartodb"
],
"version": "1.39.2",
"version": "1.40.0",
"repository": {
"type": "git",
"url": "git://github.com/CartoDB/CartoDB-SQL-API.git"
@ -17,6 +17,7 @@
"Sandro Santilli <strk@vizzuality.com>"
],
"dependencies": {
"bintrees": "1.0.1",
"bunyan": "1.8.1",
"cartodb-psql": "~0.6.0",
"cartodb-query-tables": "0.2.0",
@ -33,6 +34,7 @@
"queue-async": "~1.0.7",
"redis-mpool": "0.4.0",
"redlock": "2.0.1",
"request": "~2.75.0",
"step": "~0.0.5",
"step-profiler": "~0.3.0",
"topojson": "0.0.8",
@ -41,7 +43,6 @@
},
"devDependencies": {
"istanbul": "~0.4.2",
"request": "~2.60.0",
"shapefile": "0.3.0",
"mocha": "~1.21.4",
"jshint": "~2.6.0",

View File

@ -0,0 +1,80 @@
require('../../helper');
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var batchFactory = require('../../../batch/index');
var JobPublisher = require('../../../batch/pubsub/job-publisher');
var JobQueue = require('../../../batch/job_queue');
var JobBackend = require('../../../batch/job_backend');
var JobService = require('../../../batch/job_service');
var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service');
var JobCanceller = require('../../../batch/job_canceller');
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
describe('batch module', function() {
    var dbInstance = 'localhost';
    var username = 'vizzuality';
    var pool = redisUtils.getPool();
    var jobPublisher = new JobPublisher(pool);
    var jobQueue = new JobQueue(metadataBackend, jobPublisher);
    var jobBackend = new JobBackend(metadataBackend, jobQueue);
    var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
    var jobCanceller = new JobCanceller(userDatabaseMetadataService);
    var jobService = new JobService(jobBackend, jobCanceller);

    before(function (done) {
        this.batch = batchFactory(metadataBackend, pool);
        this.batch.start();
        this.batch.on('ready', done);
    });

    after(function (done) {
        this.batch.stop();
        redisUtils.clean('batch:*', done);
    });

    // Enqueues `sql` as a job for the test user and yields its serialized form.
    function createJob(sql, done) {
        var data = {
            user: username,
            query: sql,
            host: dbInstance
        };
        jobService.create(data, function (err, job) {
            if (err) {
                return done(err);
            }
            done(null, job.serialize());
        });
    }

    it('should drain the current job', function (done) {
        var self = this;
        createJob('select pg_sleep(3)', function (err, job) {
            if (err) {
                return done(err);
            }
            // Give the batch service a moment to pick the job up before draining.
            setTimeout(function () {
                jobBackend.get(job.job_id, function (err, job) {
                    if (err) {
                        // Fixed: missing `return` let execution fall through after
                        // an error, asserting on an undefined job and firing
                        // `done` twice.
                        return done(err);
                    }
                    assert.equal(job.status, 'running');
                    self.batch.drain(function () {
                        jobBackend.get(job.job_id, function (err, job) {
                            if (err) {
                                return done(err);
                            }
                            // Draining requeues the in-flight job as pending.
                            assert.equal(job.status, 'pending');
                            done();
                        });
                    });
                });
            }, 50);
        });
    });
});

View File

@ -0,0 +1,236 @@
'use strict';
require('../../helper');
var BatchTestClient = require('../../support/batch-test-client');
var JobStatus = require('../../../batch/job_status');
var assert = require('../../support/assert');
var queue = require('queue-async');
describe('batch multiquery', function() {

    // Wraps a query (or array of queries) into the payload shape the batch API expects.
    function jobPayload(query) {
        return {
            query: query
        };
    }

    before(function() {
        this.batchTestClient = new BatchTestClient();
    });

    after(function (done) {
        this.batchTestClient.drain(done);
    });

    // Creates a single job from `queries`, waits for its final status,
    // and asserts it matches `expectedStatus`.
    function runJobAndAssert(client, queries, expectedStatus, callback) {
        client.createJob(jobPayload(queries), function(err, jobResult) {
            if (err) {
                return callback(err);
            }
            jobResult.getStatus(function (err, job) {
                if (err) {
                    return callback(err);
                }
                assert.equal(job.status, expectedStatus);
                return callback();
            });
        });
    }

    // Creates several jobs concurrently (two at a time) and asserts their
    // final statuses, in submission order, equal `expectedStatuses`.
    function runJobsAndAssert(client, jobs, expectedStatuses, callback) {
        var jobsQueue = queue(2);

        jobs.forEach(function(queries) {
            jobsQueue.defer(function(payload, next) {
                client.createJob(payload, function(err, jobResult) {
                    if (err) {
                        return next(err);
                    }
                    jobResult.getStatus(next);
                });
            }, jobPayload(queries));
        });

        jobsQueue.awaitAll(function (err, jobsCreated) {
            if (err) {
                return callback(err);
            }
            var statuses = jobsCreated.map(function(job) {
                return job.status;
            });
            assert.deepEqual(statuses, expectedStatuses);
            return callback();
        });
    }

    it('should perform one multiquery job with two queries', function (done) {
        runJobAndAssert(this.batchTestClient, [
            'select pg_sleep(0)',
            'select pg_sleep(0)'
        ], JobStatus.DONE, done);
    });

    it('should perform one multiquery job with two queries and fail on last one', function (done) {
        runJobAndAssert(this.batchTestClient, [
            'select pg_sleep(0)',
            'select shouldFail()'
        ], JobStatus.FAILED, done);
    });

    it('should perform one multiquery job with three queries and fail on last one', function (done) {
        runJobAndAssert(this.batchTestClient, [
            'select pg_sleep(0)',
            'select pg_sleep(0)',
            'select shouldFail()'
        ], JobStatus.FAILED, done);
    });

    it('should perform one multiquery job with three queries and fail on second one', function (done) {
        runJobAndAssert(this.batchTestClient, [
            'select pg_sleep(0)',
            'select shouldFail()',
            'select pg_sleep(0)'
        ], JobStatus.FAILED, done);
    });

    it('should perform two multiquery job with two queries for each one', function (done) {
        runJobsAndAssert(this.batchTestClient, [
            [
                'select pg_sleep(0)',
                'select pg_sleep(0)'
            ],
            [
                'select pg_sleep(0)',
                'select pg_sleep(0)'
            ]
        ], [JobStatus.DONE, JobStatus.DONE], done);
    });

    it('should perform two multiquery job with two queries for each one and fail the first one', function (done) {
        runJobsAndAssert(this.batchTestClient, [
            [
                'select pg_sleep(0)',
                'select shouldFail()'
            ],
            [
                'select pg_sleep(0)',
                'select pg_sleep(0)'
            ]
        ], [JobStatus.FAILED, JobStatus.DONE], done);
    });

    it('should perform two multiquery job with two queries for each one and fail the second one', function (done) {
        runJobsAndAssert(this.batchTestClient, [
            [
                'select pg_sleep(0)',
                'select pg_sleep(0)'
            ],
            [
                'select pg_sleep(0)',
                'select shouldFail()'
            ]
        ], [JobStatus.DONE, JobStatus.FAILED], done);
    });

});

View File

@ -1,100 +1,77 @@
require('../../helper');
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var _ = require('underscore');
var queue = require('queue-async');
var batchFactory = require('../../../batch/index');
var BatchTestClient = require('../../support/batch-test-client');
var JobStatus = require('../../../batch/job_status');
var JobPublisher = require('../../../batch/pubsub/job-publisher');
var JobQueue = require('../../../batch/job_queue');
var JobBackend = require('../../../batch/job_backend');
var JobService = require('../../../batch/job_service');
var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service');
var JobCanceller = require('../../../batch/job_canceller');
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
describe('batch happy cases', function() {
describe('batch module', function() {
var dbInstance = 'localhost';
var username = 'vizzuality';
var pool = redisUtils.getPool();
var jobPublisher = new JobPublisher(pool);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var jobBackend = new JobBackend(metadataBackend, jobQueue);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var batch = batchFactory(metadataBackend, pool);
before(function (done) {
batch.start();
batch.on('ready', done);
before(function() {
this.batchTestClient = new BatchTestClient();
});
after(function (done) {
batch.stop();
redisUtils.clean('batch:*', done);
after(function(done) {
this.batchTestClient.drain(done);
});
function createJob(sql, done) {
var data = {
user: username,
query: sql,
host: dbInstance
function jobPayload(query) {
return {
query: query
};
jobService.create(data, function (err, job) {
if (err) {
return done(err);
}
done(null, job.serialize());
});
}
it('should perform job with select', function (done) {
createJob('select * from private_table', function (err, job) {
var payload = jobPayload('select * from private_table');
this.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
batch.on('job:done', function (job_id) {
if (job_id === job.job_id) {
done();
jobResult.getStatus(function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.status, JobStatus.DONE);
return done();
});
});
});
it('should perform job with select into', function (done) {
createJob('select * into batch_test_table from (select * from private_table) as job', function (err, job) {
var payload = jobPayload('select * into batch_test_table from (select * from private_table) as job');
this.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
batch.on('job:done', function (job_id) {
if (job_id === job.job_id) {
done();
jobResult.getStatus(function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.status, JobStatus.DONE);
return done();
});
});
});
it('should perform job swith select from result table', function (done) {
createJob('select * from batch_test_table', function (err, job) {
it('should perform job with select from result table', function (done) {
var payload = jobPayload('select * from batch_test_table');
this.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
batch.on('job:done', function (job_id) {
if (job_id === job.job_id) {
done();
jobResult.getStatus(function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.status, JobStatus.DONE);
return done();
});
});
});
it('should perform all enqueued jobs', function (done) {
var self = this;
var jobs = [
'select * from private_table',
'select * from private_table',
@ -108,10 +85,17 @@ describe('batch module', function() {
'select * from private_table'
];
var jobsQueue = queue(jobs.length);
var jobsQueue = queue(4);
jobs.forEach(function(job) {
jobsQueue.defer(createJob, job);
jobsQueue.defer(function(payload, done) {
self.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
jobResult.getStatus(done);
});
}, jobPayload(job));
});
jobsQueue.awaitAll(function (err, jobsCreated) {
@ -119,22 +103,17 @@ describe('batch module', function() {
return done(err);
}
var jobsDone = 0;
batch.on('job:done', function (job_id) {
_.find(jobsCreated, function(job) {
if (job_id === job.job_id) {
jobsDone += 1;
if (jobsDone === jobs.length) {
done();
}
}
});
jobsCreated.forEach(function(job) {
assert.equal(job.status, JobStatus.DONE);
});
return done();
});
});
it('should set all job as failed', function (done) {
var self = this;
var jobs = [
'select * from unexistent_table',
'select * from unexistent_table',
@ -148,10 +127,17 @@ describe('batch module', function() {
'select * from unexistent_table'
];
var jobsQueue = queue(jobs.length);
var jobsQueue = queue(4);
jobs.forEach(function(job) {
jobsQueue.defer(createJob, job);
jobsQueue.defer(function(payload, done) {
self.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
jobResult.getStatus(done);
});
}, jobPayload(job));
});
jobsQueue.awaitAll(function (err, jobsCreated) {
@ -159,84 +145,46 @@ describe('batch module', function() {
return done(err);
}
var jobsFailed = 0;
batch.on('job:failed', function (job_id) {
_.find(jobsCreated, function(job) {
if (job_id === job.job_id) {
jobsFailed += 1;
if (jobsFailed === jobs.length) {
done();
}
}
});
jobsCreated.forEach(function(job) {
assert.equal(job.status, JobStatus.FAILED);
});
});
});
it('should drain the current job', function (done) {
createJob('select pg_sleep(3)', function (err, job) {
if (err) {
return done(err);
}
setTimeout(function () {
jobBackend.get(job.job_id, function (err, job) {
if (err) {
done(err);
}
assert.equal(job.status, 'running');
batch.drain(function () {
jobBackend.get(job.job_id, function (err, job) {
if (err) {
done(err);
}
assert.equal(job.status, 'pending');
done();
});
});
});
}, 50);
return done();
});
});
it('should perform job with array of select', function (done) {
var queries = ['select * from private_table limit 1', 'select * from private_table'];
createJob(queries, function (err, job) {
var payload = jobPayload(queries);
this.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
var queriesDone = 0;
var checkJobDone = function (job_id) {
if (job_id === job.job_id) {
queriesDone += 1;
if (queriesDone === queries.length) {
done();
}
jobResult.getStatus(function (err, job) {
if (err) {
return done(err);
}
};
batch.on('job:done', checkJobDone);
batch.on('job:pending', checkJobDone);
assert.equal(job.status, JobStatus.DONE);
return done();
});
});
});
it('should set job as failed if last query fails', function (done) {
var queries = ['select * from private_table', 'select * from undefined_table'];
createJob(queries, function (err, job) {
var payload = jobPayload(queries);
this.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
batch.on('job:failed', function (job_id) {
if (job_id === job.job_id) {
done();
jobResult.getStatus(function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.status, JobStatus.FAILED);
return done();
});
});
});
@ -244,15 +192,17 @@ describe('batch module', function() {
it('should set job as failed if first query fails', function (done) {
var queries = ['select * from undefined_table', 'select * from private_table'];
createJob(queries, function (err, job) {
var payload = jobPayload(queries);
this.batchTestClient.createJob(payload, function(err, jobResult) {
if (err) {
return done(err);
}
batch.on('job:failed', function (job_id) {
if (job_id === job.job_id) {
done();
jobResult.getStatus(function (err, job) {
if (err) {
return done(err);
}
assert.equal(job.status, JobStatus.FAILED);
return done();
});
});
});

View File

@ -11,9 +11,14 @@ describe('multiple batch clients and users, job query order', function() {
this.batchTestClientA = new BatchTestClient({ name: 'consumerA' });
this.batchTestClientB = new BatchTestClient({ name: 'consumerB' });
this.testClientA = new TestClient();
this.testClientA.getResult(
'drop table if exists ordered_inserts; create table ordered_inserts (status numeric)',
this.testClient = new TestClient();
this.testClient.getResult(
[
'drop table if exists ordered_inserts_a',
'drop table if exists ordered_inserts_bbbbb',
'create table ordered_inserts_a (status numeric)',
'create table ordered_inserts_bbbbb (status numeric)'
].join(';'),
done
);
});
@ -36,16 +41,16 @@ describe('multiple batch clients and users, job query order', function() {
it('should run job queries in order (multiple consumers)', function (done) {
var jobRequestA1 = createJob([
"insert into ordered_inserts values(1)",
"insert into ordered_inserts_a values(1)",
"select pg_sleep(0.25)",
"insert into ordered_inserts values(2)"
"insert into ordered_inserts_a values(2)"
]);
var jobRequestA2 = createJob([
"insert into ordered_inserts values(3)"
"insert into ordered_inserts_a values(3)"
]);
var jobRequestB1 = createJob([
"insert into ordered_inserts values(4)"
"insert into ordered_inserts_bbbbb values(1)"
]);
var self = this;
@ -55,14 +60,14 @@ describe('multiple batch clients and users, job query order', function() {
return done(err);
}
// we don't care about the producer
self.batchTestClientB.createJob(jobRequestA2, function(err, jobResultA2) {
var override = { host: 'cartodb250user.cartodb.com' };
self.batchTestClientB.createJob(jobRequestB1, override, function(err, jobResultB1) {
if (err) {
return done(err);
}
var override = { host: 'cartodb250user.cartodb.com' };
self.batchTestClientB.createJob(jobRequestB1, override, function(err, jobResultB1) {
// we don't care about the producer
self.batchTestClientB.createJob(jobRequestA2, function(err, jobResultA2) {
if (err) {
return done(err);
}
@ -77,26 +82,38 @@ describe('multiple batch clients and users, job query order', function() {
}
jobResultB1.getStatus(function(err, jobB1) {
assert.equal(jobA1.status, JobStatus.DONE);
assert.equal(jobA1.status, JobStatus.DONE);
assert.equal(jobA2.status, JobStatus.DONE);
assert.equal(jobB1.status, JobStatus.DONE);
self.testClientA.getResult('select * from ordered_inserts', function(err, rows) {
assert.ok(
new Date(jobA1.updated_at).getTime() < new Date(jobA2.updated_at).getTime(),
'A1 (' + jobA1.updated_at + ') ' +
'should finish before A2 (' + jobA2.updated_at + ')'
);
assert.ok(
new Date(jobB1.updated_at).getTime() < new Date(jobA1.updated_at).getTime(),
'B1 (' + jobA1.updated_at + ') ' +
'should finish before A1 (' + jobA1.updated_at + ')'
);
function statusMapper (status) { return { status: status }; }
self.testClient.getResult('select * from ordered_inserts_a', function(err, rows) {
assert.ok(!err);
// cartodb250user and vizzuality test users share database
var expectedRows = [1, 4, 2, 3].map(function(status) { return {status: status}; });
var expectedRows = [1, 2, 3].map(statusMapper);
assert.deepEqual(rows, expectedRows);
assert.ok(
new Date(jobA1.updated_at).getTime() < new Date(jobA2.updated_at).getTime(),
'A1 (' + jobA1.updated_at + ') ' +
'should finish before A2 (' + jobA2.updated_at + ')'
);
assert.ok(
new Date(jobB1.updated_at).getTime() < new Date(jobA1.updated_at).getTime(),
'B1 (' + jobA1.updated_at + ') ' +
'should finish before A1 (' + jobA1.updated_at + ')'
);
done();
var query = 'select * from ordered_inserts_bbbbb';
self.testClient.getResult(query, override, function(err, rows) {
assert.ok(!err);
var expectedRows = [1].map(statusMapper);
assert.deepEqual(rows, expectedRows);
done();
});
});
});

View File

@ -25,15 +25,17 @@ describe('max queued jobs', function() {
var batch = batchFactory(metadataBackend, redisUtils.getPool());
batch.start();
batch.on('ready', function() {
batch.on('job:done', function() {
self.testClient.getResult('select count(*) from max_queued_jobs_inserts', function(err, rows) {
assert.ok(!err);
assert.equal(rows[0].count, 1);
// this is not ideal as the first job might not be committed yet
setTimeout(function() {
batch.stop(function() {
self.testClient.getResult('select count(*) from max_queued_jobs_inserts', function(err, rows) {
assert.ok(!err);
assert.equal(rows[0].count, 1);
batch.stop();
redisUtils.clean('batch:*', done);
redisUtils.clean('batch:*', done);
});
});
});
}, 100);
});
});

View File

@ -0,0 +1,97 @@
require('../../helper');
var assert = require('../../support/assert');
var TestClient = require('../../support/test-client');
var BatchTestClient = require('../../support/batch-test-client');
var JobStatus = require('../../../batch/job_status');
// Verifies that when two independent batch consumers serve the same user's
// queue, that user's jobs still run strictly in submission order.
describe('basic scheduling', function() {
    before(function(done) {
        // Two separate consumers competing over the same work.
        this.batchTestClientA = new BatchTestClient({ name: 'consumerA' });
        this.batchTestClientB = new BatchTestClient({ name: 'consumerB' });

        this.testClient = new TestClient();
        // Fresh results table so ordering assertions start from a clean slate.
        this.testClient.getResult(
            [
                'drop table if exists ordered_inserts_a',
                'create table ordered_inserts_a (status numeric)'
            ].join(';'),
            done
        );
    });

    after(function (done) {
        // Drain sequentially: A first, then B, propagating any error.
        this.batchTestClientA.drain(function(err) {
            if (err) {
                return done(err);
            }
            this.batchTestClientB.drain(done);
        }.bind(this));
    });

    // Minimal batch-job payload for a list of queries.
    function createJob(queries) {
        return {
            query: queries
        };
    }

    it('should run job queries in order (multiple consumers)', function (done) {
        // A1 sleeps mid-job so A2 is enqueued while A1 is still running.
        var jobRequestA1 = createJob([
            "insert into ordered_inserts_a values(1)",
            "select pg_sleep(0.25)",
            "insert into ordered_inserts_a values(2)"
        ]);
        var jobRequestA2 = createJob([
            "insert into ordered_inserts_a values(3)"
        ]);

        var self = this;
        this.batchTestClientA.createJob(jobRequestA1, function(err, jobResultA1) {
            if (err) {
                return done(err);
            }
            // we don't care about the producer
            self.batchTestClientB.createJob(jobRequestA2, function(err, jobResultA2) {
                if (err) {
                    return done(err);
                }
                jobResultA1.getStatus(function (err, jobA1) {
                    if (err) {
                        return done(err);
                    }
                    jobResultA2.getStatus(function(err, jobA2) {
                        if (err) {
                            return done(err);
                        }
                        assert.equal(jobA1.status, JobStatus.DONE);
                        assert.equal(jobA2.status, JobStatus.DONE);
                        // A1 must have completed before A2 despite two consumers.
                        assert.ok(
                            new Date(jobA1.updated_at).getTime() < new Date(jobA2.updated_at).getTime(),
                            'A1 (' + jobA1.updated_at + ') ' +
                            'should finish before A2 (' + jobA2.updated_at + ')'
                        );

                        function statusMapper (status) { return { status: status }; }

                        // Row order in the table is the execution order of the inserts.
                        self.testClient.getResult('select * from ordered_inserts_a', function(err, rows) {
                            assert.ok(!err);
                            var expectedRows = [1, 2, 3].map(statusMapper);
                            assert.deepEqual(rows, expectedRows);
                            return done();
                        });
                    });
                });
            });
        });
    });
});

View File

@ -1,209 +0,0 @@
'use strict';
require('../../helper');
var assert = require('../../support/assert');
var redisUtils = require('../../support/redis_utils');
var queue = require('queue-async');
var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() });
var StatsD = require('node-statsd').StatsD;
var statsdClient = new StatsD(global.settings.statsd);
var BATCH_SOURCE = '../../../batch/';
var batchFactory = require(BATCH_SOURCE + 'index');
var jobStatus = require(BATCH_SOURCE + 'job_status');
var JobPublisher = require(BATCH_SOURCE + 'pubsub/job-publisher');
var JobQueue = require(BATCH_SOURCE + 'job_queue');
var JobBackend = require(BATCH_SOURCE + 'job_backend');
var JobFactory = require(BATCH_SOURCE + 'models/job_factory');
var jobPublisher = new JobPublisher(redisUtils.getPool());
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var jobBackend = new JobBackend(metadataBackend, jobQueue);
var USER = 'vizzuality';
var HOST = 'localhost';
function createJob(job) {
jobBackend.create(job, function () {});
}
function getJob(job_id, callback) {
jobBackend.get(job_id, function (err, job) {
if (err) {
return callback(err);
}
callback(null, job);
});
}
function assertJob(job, expectedStatus, done) {
return function (job_id) {
if (job.job_id === job_id) {
getJob(job_id, function (err, jobDone) {
if (err) {
return done(err);
}
assert.equal(jobDone.status, expectedStatus);
done();
});
}
};
}
describe('batch multiquery', function() {
var batch = batchFactory(metadataBackend, redisUtils.getPool(), statsdClient);
before(function (done) {
batch.start();
batch.on('ready', done);
});
after(function (done) {
batch.stop();
redisUtils.clean('batch:*', done);
});
it('should perform one multiquery job with two queries', function (done) {
var queries = [
'select pg_sleep(0)',
'select pg_sleep(0)'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.DONE, done);
batch.on('job:done', assertCallback);
createJob(job.data);
});
it('should perform one multiquery job with two queries and fail on last one', function (done) {
var queries = [
'select pg_sleep(0)',
'select shouldFail()'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.FAILED, done);
batch.on('job:failed', assertCallback);
createJob(job.data);
});
it('should perform one multiquery job with three queries and fail on last one', function (done) {
var queries = [
'select pg_sleep(0)',
'select pg_sleep(0)',
'select shouldFail()'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.FAILED, done);
batch.on('job:failed', assertCallback);
createJob(job.data);
});
it('should perform one multiquery job with three queries and fail on second one', function (done) {
var queries = [
'select pg_sleep(0)',
'select shouldFail()',
'select pg_sleep(0)'
];
var job = JobFactory.create({ user: USER, host: HOST, query: queries});
var assertCallback = assertJob(job.data, jobStatus.FAILED, done);
batch.on('job:failed', assertCallback);
createJob(job.data);
});
it('should perform two multiquery job with two queries for each one', function (done) {
var jobs = [];
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
var jobsQueue = queue(jobs.length);
jobs.forEach(function (job) {
jobsQueue.defer(function (callback) {
batch.on('job:done', assertJob(job.data, jobStatus.DONE, callback));
createJob(job.data);
});
});
jobsQueue.awaitAll(done);
});
it('should perform two multiquery job with two queries for each one and fail the first one', function (done) {
var jobs = [];
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select shouldFail()'
]}));
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
var jobsQueue = queue(jobs.length);
jobsQueue.defer(function (callback) {
batch.on('job:failed', assertJob(jobs[0].data, jobStatus.FAILED, callback));
createJob(jobs[0].data);
});
jobsQueue.defer(function (callback) {
batch.on('job:done', assertJob(jobs[1].data, jobStatus.DONE, callback));
createJob(jobs[1].data);
});
jobsQueue.awaitAll(done);
});
it('should perform two multiquery job with two queries for each one and fail the second one', function (done) {
var jobs = [];
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select pg_sleep(0)'
]}));
jobs.push(JobFactory.create({ user: USER, host: HOST, query: [
'select pg_sleep(0)',
'select shouldFail()'
]}));
var jobsQueue = queue(jobs.length);
jobsQueue.defer(function (callback) {
batch.on('job:done', assertJob(jobs[0].data, jobStatus.DONE, callback));
createJob(jobs[0].data);
});
jobsQueue.defer(function (callback) {
batch.on('job:failed', assertJob(jobs[1].data, jobStatus.FAILED, callback));
createJob(jobs[1].data);
});
jobsQueue.awaitAll(done);
});
});

View File

@ -0,0 +1,105 @@
'use strict';
require('../../helper');
var assert = require('../../support/assert');
var Scheduler = require('../../../batch/scheduler/scheduler');
var FixedCapacity = require('../../../batch/scheduler/capacity/fixed');
describe('scheduler', function() {

    // Fake task runner: records the order in which users are run and tracks
    // how many tasks each user still has pending.
    function TaskRunner(userTasks) {
        this.results = [];
        this.userTasks = userTasks;
    }

    // Runs one task for `user`; the callback's second argument signals
    // whether that user's queue is now empty.
    TaskRunner.prototype.run = function(user, callback) {
        this.results.push(user);
        this.userTasks[user]--;
        return callback(null, this.userTasks[user] === 0);
    };

    // simulate one by one or infinity capacity
    var capacities = [new FixedCapacity(1), new FixedCapacity(Infinity)];

    // The expected orderings below must hold regardless of capacity.
    capacities.forEach(function(capacity) {
        it('should run tasks', function (done) {
            var taskRunner = new TaskRunner({
                userA: 1
            });

            var scheduler = new Scheduler(capacity, taskRunner);
            scheduler.add('userA');

            scheduler.on('done', function() {
                var results = taskRunner.results;

                assert.equal(results.length, 1);
                assert.equal(results[0], 'userA');

                return done();
            });

            scheduler.schedule();
        });

        it('should run tasks for different users', function (done) {
            var taskRunner = new TaskRunner({
                userA: 1,
                userB: 1,
                userC: 1
            });

            var scheduler = new Scheduler(capacity, taskRunner);
            scheduler.add('userA');
            scheduler.add('userB');
            scheduler.add('userC');

            scheduler.on('done', function() {
                var results = taskRunner.results;

                assert.equal(results.length, 3);
                assert.equal(results[0], 'userA');
                assert.equal(results[1], 'userB');
                assert.equal(results[2], 'userC');

                return done();
            });

            scheduler.schedule();
        });

        it('should be fair when scheduling tasks', function (done) {
            var taskRunner = new TaskRunner({
                userA: 3,
                userB: 2,
                userC: 1
            });

            var scheduler = new Scheduler(capacity, taskRunner);
            scheduler.add('userA');
            scheduler.add('userA');
            scheduler.add('userA');
            scheduler.add('userB');
            scheduler.add('userB');
            scheduler.add('userC');

            // Fairness: round-robin across users rather than draining one
            // user's queue before starting the next.
            scheduler.on('done', function() {
                var results = taskRunner.results;

                assert.equal(results.length, 6);
                assert.equal(results[0], 'userA');
                assert.equal(results[1], 'userB');
                assert.equal(results[2], 'userC');
                assert.equal(results[3], 'userA');
                assert.equal(results[4], 'userB');
                assert.equal(results[5], 'userA');

                return done();
            });

            scheduler.schedule();
        });
    });
});

View File

@ -24,13 +24,17 @@ function TestClient(config) {
module.exports = TestClient;
TestClient.prototype.getResult = function(query, callback) {
TestClient.prototype.getResult = function(query, override, callback) {
if (!callback) {
callback = override;
override = {};
}
assert.response(
this.server,
{
url: this.getUrl(),
url: this.getUrl(override),
headers: {
host: this.getHost(),
host: this.getHost(override),
'Content-Type': 'application/json'
},
method: 'POST',
@ -50,10 +54,10 @@ TestClient.prototype.getResult = function(query, callback) {
);
};
TestClient.prototype.getHost = function() {
return this.config.host || 'vizzuality.cartodb.com';
TestClient.prototype.getHost = function(override) {
return override.host || this.config.host || 'vizzuality.cartodb.com';
};
TestClient.prototype.getUrl = function() {
return '/api/v2/sql?api_key=' + (this.config.apiKey || '1234');
TestClient.prototype.getUrl = function(override) {
return '/api/v2/sql?api_key=' + (override.apiKey || this.config.apiKey || '1234');
};