From c6e906d3efbae9c2418e0d7af3e0a6f4004650dc Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Mon, 17 Oct 2016 17:48:28 +0200 Subject: [PATCH 01/35] Use same debug group --- batch/leader/provider/redis-distlock.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/leader/provider/redis-distlock.js b/batch/leader/provider/redis-distlock.js index 448a127b..01e944f7 100644 --- a/batch/leader/provider/redis-distlock.js +++ b/batch/leader/provider/redis-distlock.js @@ -6,7 +6,7 @@ var REDIS_DISTLOCK = { }; var Redlock = require('redlock'); -var debug = require('../../util/debug')('redis-distlock'); +var debug = require('../../util/debug')('leader:redis-distlock'); function RedisDistlockLocker(redisPool) { this.pool = redisPool; From a8e03f01c9b67d6b39c627d94f1f95ee706f7284 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Mon, 17 Oct 2016 18:16:52 +0200 Subject: [PATCH 02/35] Add debug information in Jobs Queue --- batch/job_queue.js | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/batch/job_queue.js b/batch/job_queue.js index 2bab17dd..7ed1cc59 100644 --- a/batch/job_queue.js +++ b/batch/job_queue.js @@ -1,5 +1,7 @@ 'use strict'; +var debug = require('./util/debug')('queue'); + function JobQueue(metadataBackend, jobPublisher) { this.metadataBackend = metadataBackend; this.jobPublisher = jobPublisher; @@ -14,16 +16,15 @@ var QUEUE = { module.exports.QUEUE = QUEUE; JobQueue.prototype.enqueue = function (user, jobId, callback) { - var self = this; - + debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId); this.metadataBackend.redisCmd(QUEUE.DB, 'LPUSH', [ QUEUE.PREFIX + user, jobId ], function (err) { if (err) { return callback(err); } - self.jobPublisher.publish(user); + this.jobPublisher.publish(user); callback(); - }); + }.bind(this)); }; JobQueue.prototype.size = function (user, callback) { @@ -31,9 +32,13 @@ JobQueue.prototype.size = function (user, callback) { }; JobQueue.prototype.dequeue = function (user, callback) { - this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], callback); + this.metadataBackend.redisCmd(QUEUE.DB, 'RPOP', [ QUEUE.PREFIX + user ], function(err, jobId) { + debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId); + return callback(err, jobId); + }); }; JobQueue.prototype.enqueueFirst = function (user, jobId, callback) { + debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId); this.metadataBackend.redisCmd(QUEUE.DB, 'RPUSH', [ QUEUE.PREFIX + user, jobId ], callback); }; From 761fbe52056823ce30d48675317f1b18683740a2 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Mon, 17 Oct 2016 18:42:29 +0200 Subject: [PATCH 03/35] Separate job draining from processing --- batch/batch.js | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index 6cf93c5e..41c3ba63 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -20,8 +20,10 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.locker = Locker.create('redis-distlock', { pool: redisPool }); this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool); - // map: host => jobId this.workingQueues = {}; + + // map: user => jobId. Will be used for draining jobs. + this.workInProgressJobs = {}; } util.inherits(Batch, EventEmitter); @@ -97,9 +99,11 @@ Batch.prototype.processNextJob = function (user, callback) { return callback(emptyQueueError); } + self.setWorkInProgressJob(user, jobId); self.setProcessingJobId(user, jobId); self.jobRunner.run(jobId, function (err, job) { + self.clearWorkInProgressJob(user); self.setProcessingJobId(user, null); if (err) { @@ -123,7 +127,7 @@ Batch.prototype.processNextJob = function (user, callback) { Batch.prototype.drain = function (callback) { var self = this; - var workingUsers = this.getWorkingUsers(); + var workingUsers = this.getWorkInProgressUsers(); var batchQueues = queue(workingUsers.length); workingUsers.forEach(function (user) { @@ -143,7 +147,7 @@ Batch.prototype.drain = function (callback) { Batch.prototype._drainJob = function (user, callback) { var self = this; - var job_id = this.getProcessingJobId(user); + var job_id = this.getWorkInProgressJob(user); if (!job_id) { return process.nextTick(function () { @@ -173,18 +177,29 @@ Batch.prototype.isProcessingUser = function(user) { return this.workingQueues.hasOwnProperty(user); }; -Batch.prototype.getWorkingUsers = function() { - return Object.keys(this.workingQueues); -}; - Batch.prototype.setProcessingJobId = function(user, jobId) { this.workingQueues[user] = jobId; }; -Batch.prototype.getProcessingJobId = function(user) { - return this.workingQueues[user]; -}; - Batch.prototype.finishedProcessingUser = function(user) { delete this.workingQueues[user]; }; + + +/* Work in progress jobs */ + +Batch.prototype.setWorkInProgressJob = function(user, jobId) { + this.workInProgressJobs[user] = jobId; +}; + +Batch.prototype.getWorkInProgressJob = function(user) { + return this.workInProgressJobs[user]; +}; + +Batch.prototype.clearWorkInProgressJob = function(user) { + delete this.workInProgressJobs[user]; +}; + +Batch.prototype.getWorkInProgressUsers = function() { + return Object.keys(this.workInProgressJobs); +}; From ac7bad43a5c570ebfc83a3e295f1f93c28e46ace Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Mon, 17 Oct 2016 19:03:55 +0200 Subject: [PATCH 04/35] Lock by host instead of host + user - Host lock only released if there are no pending jobs. - Will allow to schedule jobs by host. --- batch/batch.js | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index 41c3ba63..0a1816a1 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -20,7 +20,8 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.locker = Locker.create('redis-distlock', { pool: redisPool }); this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool); - this.workingQueues = {}; + // map: host => map{user}. Useful to determine pending queued users. + this.workingHosts = {}; // map: user => jobId. Will be used for draining jobs. this.workInProgressJobs = {}; @@ -40,16 +41,17 @@ Batch.prototype.subscribe = function () { this.jobSubscriber.subscribe( function onJobHandler(user, host) { - var resource = host + ':' + user; debug('onJobHandler(%s, %s)', user, host); - if (self.isProcessingUser(user)) { + if (self.isProcessing(host, user)) { return debug('%s is already processing user=%s', self.name, user); } + self.setProcessing(host, user); + // do forever, it does not throw a stack overflow forever( function (next) { - self.locker.lock(resource, function(err) { + self.locker.lock(host, function(err) { // we didn't get the lock for the host if (err) { debug( @@ -67,8 +69,10 @@ Batch.prototype.subscribe = function () { debug(err.name === 'EmptyQueue' ? err.message : err); } - self.finishedProcessingUser(user); - self.locker.unlock(resource, debug); + self.clearProcessing(host, user); + if (!self.hasPendingJobs(host)) { + self.locker.unlock(host, debug); + } } ); }, @@ -100,11 +104,8 @@ Batch.prototype.processNextJob = function (user, callback) { } self.setWorkInProgressJob(user, jobId); - self.setProcessingJobId(user, jobId); - self.jobRunner.run(jobId, function (err, job) { self.clearWorkInProgressJob(user); - self.setProcessingJobId(user, null); if (err) { debug(err); @@ -173,16 +174,31 @@ Batch.prototype.stop = function (callback) { this.jobSubscriber.unsubscribe(callback); }; -Batch.prototype.isProcessingUser = function(user) { - return this.workingQueues.hasOwnProperty(user); + +/* Processing hosts => users */ + +Batch.prototype.setProcessing = function(host, user) { + if (!this.workingHosts.hasOwnProperty(host)) { + this.workingHosts[host] = {}; + } + this.workingHosts[host][user] = true; }; -Batch.prototype.setProcessingJobId = function(user, jobId) { - this.workingQueues[user] = jobId; +Batch.prototype.clearProcessing = function(host, user) { + if (this.workingHosts.hasOwnProperty(host)) { + delete this.workingHosts[host][user]; + if (!this.hasPendingJobs(host)) { + delete this.workingHosts[host]; + } + } }; -Batch.prototype.finishedProcessingUser = function(user) { - delete this.workingQueues[user]; +Batch.prototype.isProcessing = function(host, user) { + return this.workingHosts.hasOwnProperty(host) && this.workingHosts[host].hasOwnProperty(user); +}; + +Batch.prototype.hasPendingJobs = function(host) { + return this.workingHosts.hasOwnProperty(host) && Object.keys(this.workingHosts[host]).length > 0; }; From ef6cd24bf38b5d110380b069b609059306ce5a16 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 11:18:11 +0200 Subject: [PATCH 05/35] Correct debug --- batch/batch.js | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index 0a1816a1..a8cfeefa 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -43,7 +43,7 @@ Batch.prototype.subscribe = function () { function onJobHandler(user, host) { debug('onJobHandler(%s, %s)', user, host); if (self.isProcessing(host, user)) { - return debug('%s is already processing user=%s', self.name, user); + return debug('%s is already processing host=%s user=%s', self.name, host, user); } self.setProcessing(host, user); @@ -54,13 +54,10 @@ Batch.prototype.subscribe = function () { self.locker.lock(host, function(err) { // we didn't get the lock for the host if (err) { - debug( - 'Could not lock host=%s for user=%s from %s. Reason: %s', - host, self.name, user, err.message - ); + debug('Could not lock host=%s from %s. Reason: %s', host, self.name, err.message); return next(err); } - debug('Locked host=%s for user=%s from %s', host, user, self.name); + debug('Locked host=%s from %s', host, user, self.name); self.processNextJob(user, next); }); }, From a1400e956d8b13fc950ed6214d7e9004e48afdf6 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 20:01:11 +0200 Subject: [PATCH 06/35] Don't rely on batch.on(job:status) --- test/acceptance/batch/batch-drain.test.js | 80 ++++++++ test/acceptance/batch/batch.test.js | 218 +++++++++------------- 2 files changed, 164 insertions(+), 134 deletions(-) create mode 100644 test/acceptance/batch/batch-drain.test.js diff --git a/test/acceptance/batch/batch-drain.test.js b/test/acceptance/batch/batch-drain.test.js new file mode 100644 index 00000000..9a1cf0d2 --- /dev/null +++ b/test/acceptance/batch/batch-drain.test.js @@ -0,0 +1,80 @@ +require('../../helper'); +var assert = require('../../support/assert'); +var redisUtils = require('../../support/redis_utils'); +var batchFactory = require('../../../batch/index'); + +var JobPublisher = require('../../../batch/pubsub/job-publisher'); +var JobQueue = require('../../../batch/job_queue'); +var JobBackend = require('../../../batch/job_backend'); +var JobService = require('../../../batch/job_service'); +var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service'); +var JobCanceller = require('../../../batch/job_canceller'); +var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); + +describe('batch module', function() { + var dbInstance = 'localhost'; + var username = 'vizzuality'; + var pool = redisUtils.getPool(); + var jobPublisher = new JobPublisher(pool); + var jobQueue = new JobQueue(metadataBackend, jobPublisher); + var jobBackend = new JobBackend(metadataBackend, jobQueue); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobCanceller = new JobCanceller(userDatabaseMetadataService); + var jobService = new JobService(jobBackend, jobCanceller); + + before(function (done) { + this.batch = batchFactory(metadataBackend, pool); + this.batch.start(); + this.batch.on('ready', done); + }); + + after(function (done) { + this.batch.stop(); + redisUtils.clean('batch:*', done); + }); + + function createJob(sql, done) { + var data = { + user: username, + query: sql, + host: dbInstance + }; + + jobService.create(data, function (err, job) { + if (err) { + return done(err); + } + + done(null, job.serialize()); + }); + } + + it('should drain the current job', function (done) { + var self = this; + createJob('select pg_sleep(3)', function (err, job) { + if (err) { + return done(err); + } + setTimeout(function () { + jobBackend.get(job.job_id, function (err, job) { + if (err) { + done(err); + } + + assert.equal(job.status, 'running'); + + self.batch.drain(function () { + jobBackend.get(job.job_id, function (err, job) { + if (err) { + done(err); + } + assert.equal(job.status, 'pending'); + done(); + }); + }); + }); + }, 50); + }); + }); + +}); diff --git a/test/acceptance/batch/batch.test.js b/test/acceptance/batch/batch.test.js index eedcddb9..cdf5835f 100644 --- a/test/acceptance/batch/batch.test.js +++ b/test/acceptance/batch/batch.test.js @@ -1,100 +1,77 @@ require('../../helper'); + var assert = require('../../support/assert'); -var redisUtils = require('../../support/redis_utils'); -var _ = require('underscore'); var queue = require('queue-async'); -var batchFactory = require('../../../batch/index'); +var BatchTestClient = require('../../support/batch-test-client'); +var JobStatus = require('../../../batch/job_status'); -var JobPublisher = require('../../../batch/pubsub/job-publisher'); -var JobQueue = require('../../../batch/job_queue'); -var JobBackend = require('../../../batch/job_backend'); -var JobService = require('../../../batch/job_service'); -var UserDatabaseMetadataService = require('../../../batch/user_database_metadata_service'); -var JobCanceller = require('../../../batch/job_canceller'); -var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); +describe('batch happy cases', function() { -describe('batch module', function() { - var dbInstance = 'localhost'; - var username = 'vizzuality'; - var pool = redisUtils.getPool(); - var jobPublisher = new JobPublisher(pool); - var jobQueue = new JobQueue(metadataBackend, jobPublisher); - var jobBackend = new JobBackend(metadataBackend, jobQueue); - var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); - var jobCanceller = new JobCanceller(userDatabaseMetadataService); - var jobService = new JobService(jobBackend, jobCanceller); - - var batch = batchFactory(metadataBackend, pool); - - before(function (done) { - batch.start(); - batch.on('ready', done); + before(function() { + this.batchTestClient = new BatchTestClient(); }); - after(function (done) { - batch.stop(); - redisUtils.clean('batch:*', done); + after(function(done) { + this.batchTestClient.drain(done); }); - function createJob(sql, done) { - var data = { - user: username, - query: sql, - host: dbInstance + function jobPayload(query) { + return { + query: query }; - - jobService.create(data, function (err, job) { - if (err) { - return done(err); - } - - done(null, job.serialize()); - }); } it('should perform job with select', function (done) { - createJob('select * from private_table', function (err, job) { + var payload = jobPayload('select * from private_table'); + this.batchTestClient.createJob(payload, function(err, jobResult) { if (err) { return done(err); } - - batch.on('job:done', function (job_id) { - if (job_id === job.job_id) { - done(); + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); } + assert.equal(job.status, JobStatus.DONE); + return done(); }); }); }); it('should perform job with select into', function (done) { - createJob('select * into batch_test_table from (select * from private_table) as job', function (err, job) { + var payload = jobPayload('select * into batch_test_table from (select * from private_table) as job'); + this.batchTestClient.createJob(payload, function(err, jobResult) { if (err) { return done(err); } - - batch.on('job:done', function (job_id) { - if (job_id === job.job_id) { - done(); + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); } + assert.equal(job.status, JobStatus.DONE); + return done(); }); }); }); - it('should perform job swith select from result table', function (done) { - createJob('select * from batch_test_table', function (err, job) { + it('should perform job with select from result table', function (done) { + var payload = jobPayload('select * from batch_test_table'); + this.batchTestClient.createJob(payload, function(err, jobResult) { if (err) { return done(err); } - - batch.on('job:done', function (job_id) { - if (job_id === job.job_id) { - done(); + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); } + assert.equal(job.status, JobStatus.DONE); + return done(); }); }); }); it('should perform all enqueued jobs', function (done) { + var self = this; + var jobs = [ 'select * from private_table', 'select * from private_table', @@ -108,10 +85,17 @@ describe('batch module', function() { 'select * from private_table' ]; - var jobsQueue = queue(jobs.length); + var jobsQueue = queue(4); jobs.forEach(function(job) { - jobsQueue.defer(createJob, job); + jobsQueue.defer(function(payload, done) { + self.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(done); + }); + }, jobPayload(job)); }); jobsQueue.awaitAll(function (err, jobsCreated) { @@ -119,22 +103,17 @@ describe('batch module', function() { return done(err); } - var jobsDone = 0; - - batch.on('job:done', function (job_id) { - _.find(jobsCreated, function(job) { - if (job_id === job.job_id) { - jobsDone += 1; - if (jobsDone === jobs.length) { - done(); - } - } - }); + jobsCreated.forEach(function(job) { + assert.equal(job.status, JobStatus.DONE); }); + + return done(); }); }); it('should set all job as failed', function (done) { + var self = this; + var jobs = [ 'select * from unexistent_table', 'select * from unexistent_table', @@ -148,10 +127,17 @@ describe('batch module', function() { 'select * from unexistent_table' ]; - var jobsQueue = queue(jobs.length); + var jobsQueue = queue(4); jobs.forEach(function(job) { - jobsQueue.defer(createJob, job); + jobsQueue.defer(function(payload, done) { + self.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(done); + }); + }, jobPayload(job)); }); jobsQueue.awaitAll(function (err, jobsCreated) { @@ -159,84 +145,46 @@ describe('batch module', function() { return done(err); } - var jobsFailed = 0; - - batch.on('job:failed', function (job_id) { - _.find(jobsCreated, function(job) { - if (job_id === job.job_id) { - jobsFailed += 1; - if (jobsFailed === jobs.length) { - done(); - } - } - }); + jobsCreated.forEach(function(job) { + assert.equal(job.status, JobStatus.FAILED); }); - }); - }); - it('should drain the current job', function (done) { - createJob('select pg_sleep(3)', function (err, job) { - if (err) { - return done(err); - } - setTimeout(function () { - jobBackend.get(job.job_id, function (err, job) { - if (err) { - done(err); - } - - assert.equal(job.status, 'running'); - - batch.drain(function () { - jobBackend.get(job.job_id, function (err, job) { - if (err) { - done(err); - } - assert.equal(job.status, 'pending'); - done(); - }); - }); - }); - }, 50); + return done(); }); }); it('should perform job with array of select', function (done) { var queries = ['select * from private_table limit 1', 'select * from private_table']; - createJob(queries, function (err, job) { + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { if (err) { return done(err); } - - var queriesDone = 0; - - var checkJobDone = function (job_id) { - if (job_id === job.job_id) { - queriesDone += 1; - if (queriesDone === queries.length) { - done(); - } + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); } - }; - - batch.on('job:done', checkJobDone); - batch.on('job:pending', checkJobDone); + assert.equal(job.status, JobStatus.DONE); + return done(); + }); }); }); it('should set job as failed if last query fails', function (done) { var queries = ['select * from private_table', 'select * from undefined_table']; - createJob(queries, function (err, job) { + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { if (err) { return done(err); } - - batch.on('job:failed', function (job_id) { - if (job_id === job.job_id) { - done(); + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); } + assert.equal(job.status, JobStatus.FAILED); + return done(); }); }); }); @@ -244,15 +192,17 @@ describe('batch module', function() { it('should set job as failed if first query fails', function (done) { var queries = ['select * from undefined_table', 'select * from private_table']; - createJob(queries, function (err, job) { + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { if (err) { return done(err); } - - batch.on('job:failed', function (job_id) { - if (job_id === job.job_id) { - done(); + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); } + assert.equal(job.status, JobStatus.FAILED); + return done(); }); }); }); From a29f847767ee06eca83f063e55986c4b2ca62d62 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 20:05:57 +0200 Subject: [PATCH 07/35] Don't rely on batch.on(job:status) --- test/acceptance/batch/queued-jobs-limit.test.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/acceptance/batch/queued-jobs-limit.test.js b/test/acceptance/batch/queued-jobs-limit.test.js index d94577f5..f62089b8 100644 --- a/test/acceptance/batch/queued-jobs-limit.test.js +++ b/test/acceptance/batch/queued-jobs-limit.test.js @@ -25,15 +25,17 @@ describe('max queued jobs', function() { var batch = batchFactory(metadataBackend, redisUtils.getPool()); batch.start(); batch.on('ready', function() { - batch.on('job:done', function() { - self.testClient.getResult('select count(*) from max_queued_jobs_inserts', function(err, rows) { - assert.ok(!err); - assert.equal(rows[0].count, 1); + // this is not ideal as the first job might not be committed yet + setTimeout(function() { + batch.stop(function() { + self.testClient.getResult('select count(*) from max_queued_jobs_inserts', function(err, rows) { + assert.ok(!err); + assert.equal(rows[0].count, 1); - batch.stop(); - redisUtils.clean('batch:*', done); + redisUtils.clean('batch:*', done); + }); }); - }); + }, 100); }); }); From 1e490be0a152b4323f00d94d53adfcca7ebf03c1 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 20:18:49 +0200 Subject: [PATCH 08/35] Don't rely on batch.on(job:status) --- .../acceptance/batch/batch.multiquery.test.js | 236 ++++++++++++++++++ .../batch/batch.multiquery.test.js | 209 ---------------- 2 files changed, 236 insertions(+), 209 deletions(-) create mode 100644 test/acceptance/batch/batch.multiquery.test.js delete mode 100644 test/integration/batch/batch.multiquery.test.js diff --git a/test/acceptance/batch/batch.multiquery.test.js b/test/acceptance/batch/batch.multiquery.test.js new file mode 100644 index 00000000..b512ed12 --- /dev/null +++ b/test/acceptance/batch/batch.multiquery.test.js @@ -0,0 +1,236 @@ +'use strict'; + +require('../../helper'); + +var BatchTestClient = require('../../support/batch-test-client'); +var JobStatus = require('../../../batch/job_status'); + +var assert = require('../../support/assert'); +var queue = require('queue-async'); + +describe('batch multiquery', function() { + function jobPayload(query) { + return { + query: query + }; + } + + before(function() { + this.batchTestClient = new BatchTestClient(); + }); + + after(function (done) { + this.batchTestClient.drain(done); + }); + + it('should perform one multiquery job with two queries', function (done) { + var queries = [ + 'select pg_sleep(0)', + 'select pg_sleep(0)' + ]; + + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); + } + assert.equal(job.status, JobStatus.DONE); + return done(); + }); + }); + }); + + it('should perform one multiquery job with two queries and fail on last one', function (done) { + var queries = [ + 'select pg_sleep(0)', + 'select shouldFail()' + ]; + + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); + } + assert.equal(job.status, JobStatus.FAILED); + return done(); + }); + }); + }); + + it('should perform one multiquery job with three queries and fail on last one', function (done) { + var queries = [ + 'select pg_sleep(0)', + 'select pg_sleep(0)', + 'select shouldFail()' + ]; + + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); + } + assert.equal(job.status, JobStatus.FAILED); + return done(); + }); + }); + }); + + + it('should perform one multiquery job with three queries and fail on second one', function (done) { + var queries = [ + 'select pg_sleep(0)', + 'select shouldFail()', + 'select pg_sleep(0)' + ]; + + var payload = jobPayload(queries); + this.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(function (err, job) { + if (err) { + return done(err); + } + assert.equal(job.status, JobStatus.FAILED); + return done(); + }); + }); + }); + + it('should perform two multiquery job with two queries for each one', function (done) { + var self = this; + + var jobs = [ + [ + 'select pg_sleep(0)', + 'select pg_sleep(0)' + ], + [ + 'select pg_sleep(0)', + 'select pg_sleep(0)' + ] + ]; + + var jobsQueue = queue(2); + + jobs.forEach(function(job) { + jobsQueue.defer(function(payload, done) { + self.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(done); + }); + }, jobPayload(job)); + }); + + jobsQueue.awaitAll(function (err, jobsCreated) { + if (err) { + return done(err); + } + + jobsCreated.forEach(function(job) { + assert.equal(job.status, JobStatus.DONE); + }); + + return done(); + }); + }); + + it('should perform two multiquery job with two queries for each one and fail the first one', function (done) { + var self = this; + + var jobs = [ + [ + 'select pg_sleep(0)', + 'select shouldFail()' + ], + [ + 'select pg_sleep(0)', + 'select pg_sleep(0)' + ] + ]; + + var expectedStatus = [JobStatus.FAILED, JobStatus.DONE]; + var jobsQueue = queue(2); + + jobs.forEach(function(job) { + jobsQueue.defer(function(payload, done) { + self.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(done); + }); + }, jobPayload(job)); + }); + + jobsQueue.awaitAll(function (err, jobsCreated) { + if (err) { + return done(err); + } + + var statuses = jobsCreated.map(function(job) { + return job.status; + }); + assert.deepEqual(statuses, expectedStatus); + + return done(); + }); + }); + + it('should perform two multiquery job with two queries for each one and fail the second one', function (done) { + var self = this; + + var jobs = [ + [ + 'select pg_sleep(0)', + 'select pg_sleep(0)' + ], + [ + 'select pg_sleep(0)', + 'select shouldFail()' + ] + ]; + + var expectedStatus = [JobStatus.DONE, JobStatus.FAILED]; + var jobsQueue = queue(2); + + jobs.forEach(function(job) { + jobsQueue.defer(function(payload, done) { + self.batchTestClient.createJob(payload, function(err, jobResult) { + if (err) { + return done(err); + } + jobResult.getStatus(done); + }); + }, jobPayload(job)); + }); + + jobsQueue.awaitAll(function (err, jobsCreated) { + if (err) { + return done(err); + } + + var statuses = jobsCreated.map(function(job) { + return job.status; + }); + assert.deepEqual(statuses, expectedStatus); + + return done(); + }); + }); +}); diff --git a/test/integration/batch/batch.multiquery.test.js b/test/integration/batch/batch.multiquery.test.js deleted file mode 100644 index 88acd796..00000000 --- a/test/integration/batch/batch.multiquery.test.js +++ /dev/null @@ -1,209 +0,0 @@ - 'use strict'; - -require('../../helper'); -var assert = require('../../support/assert'); -var redisUtils = require('../../support/redis_utils'); -var queue = require('queue-async'); - -var metadataBackend = require('cartodb-redis')({ pool: redisUtils.getPool() }); -var StatsD = require('node-statsd').StatsD; -var statsdClient = new StatsD(global.settings.statsd); - -var BATCH_SOURCE = '../../../batch/'; -var batchFactory = require(BATCH_SOURCE + 'index'); - -var jobStatus = require(BATCH_SOURCE + 'job_status'); -var JobPublisher = require(BATCH_SOURCE + 'pubsub/job-publisher'); -var JobQueue = require(BATCH_SOURCE + 'job_queue'); -var JobBackend = require(BATCH_SOURCE + 'job_backend'); -var JobFactory = require(BATCH_SOURCE + 'models/job_factory'); - -var jobPublisher = new JobPublisher(redisUtils.getPool()); -var jobQueue = new JobQueue(metadataBackend, jobPublisher); -var jobBackend = new JobBackend(metadataBackend, jobQueue); - -var USER = 'vizzuality'; -var HOST = 'localhost'; - -function createJob(job) { - jobBackend.create(job, function () {}); -} - -function getJob(job_id, callback) { - jobBackend.get(job_id, function (err, job) { - if (err) { - return callback(err); - } - - callback(null, job); - }); -} - -function assertJob(job, expectedStatus, done) { - return function (job_id) { - if (job.job_id === job_id) { - getJob(job_id, function (err, jobDone) { - if (err) { - return done(err); - } - - assert.equal(jobDone.status, expectedStatus); - done(); - }); - } - }; -} - -describe('batch multiquery', function() { - var batch = batchFactory(metadataBackend, redisUtils.getPool(), statsdClient); - - before(function (done) { - batch.start(); - batch.on('ready', done); - }); - - after(function (done) { - batch.stop(); - redisUtils.clean('batch:*', done); - }); - - it('should perform one multiquery job with two queries', function (done) { - var queries = [ - 'select pg_sleep(0)', - 'select pg_sleep(0)' - ]; - - var job = JobFactory.create({ user: USER, host: HOST, query: queries}); - var assertCallback = assertJob(job.data, jobStatus.DONE, done); - - batch.on('job:done', assertCallback); - - createJob(job.data); - }); - - it('should perform one multiquery job with two queries and fail on last one', function (done) { - var queries = [ - 'select pg_sleep(0)', - 'select shouldFail()' - ]; - - var job = JobFactory.create({ user: USER, host: HOST, query: queries}); - var assertCallback = assertJob(job.data, jobStatus.FAILED, done); - - batch.on('job:failed', assertCallback); - - createJob(job.data); - }); - - it('should perform one multiquery job with three queries and fail on last one', function (done) { - var queries = [ - 'select pg_sleep(0)', - 'select pg_sleep(0)', - 'select shouldFail()' - ]; - - var job = JobFactory.create({ user: USER, host: HOST, query: queries}); - var assertCallback = assertJob(job.data, jobStatus.FAILED, done); - - batch.on('job:failed', assertCallback); - - createJob(job.data); - }); - - - it('should perform one multiquery job with three queries and fail on second one', function (done) { - var queries = [ - 'select pg_sleep(0)', - 'select shouldFail()', - 'select pg_sleep(0)' - ]; - - var job = JobFactory.create({ user: USER, host: HOST, query: queries}); - var assertCallback = assertJob(job.data, jobStatus.FAILED, done); - - batch.on('job:failed', assertCallback); - - createJob(job.data); - }); - - it('should perform two multiquery job with two queries for each one', function (done) { - var jobs = []; - - jobs.push(JobFactory.create({ user: USER, host: HOST, query: [ - 'select pg_sleep(0)', - 'select pg_sleep(0)' - ]})); - - jobs.push(JobFactory.create({ user: USER, host: HOST, query: [ - 'select pg_sleep(0)', - 'select pg_sleep(0)' - ]})); - - var jobsQueue = queue(jobs.length); - - jobs.forEach(function (job) { - jobsQueue.defer(function (callback) { - batch.on('job:done', assertJob(job.data, jobStatus.DONE, callback)); - createJob(job.data); - }); - }); - - jobsQueue.awaitAll(done); - }); - - it('should perform two multiquery job with two queries for each one and fail the first one', function (done) { - var jobs = []; - - jobs.push(JobFactory.create({ user: USER, host: HOST, query: [ - 'select pg_sleep(0)', - 'select shouldFail()' - ]})); - - jobs.push(JobFactory.create({ user: USER, host: HOST, query: [ - 'select pg_sleep(0)', - 'select pg_sleep(0)' - ]})); - - var jobsQueue = queue(jobs.length); - - jobsQueue.defer(function (callback) { - batch.on('job:failed', assertJob(jobs[0].data, jobStatus.FAILED, callback)); - createJob(jobs[0].data); - }); - - jobsQueue.defer(function (callback) { - batch.on('job:done', assertJob(jobs[1].data, jobStatus.DONE, callback)); - createJob(jobs[1].data); - }); - - jobsQueue.awaitAll(done); - }); - - it('should perform two multiquery job with two queries for each one and fail the second one', function (done) { - var jobs = []; - - jobs.push(JobFactory.create({ user: USER, host: HOST, query: [ - 'select pg_sleep(0)', - 'select pg_sleep(0)' - ]})); - - jobs.push(JobFactory.create({ user: USER, host: HOST, query: [ - 'select pg_sleep(0)', - 'select shouldFail()' - ]})); - - var jobsQueue = queue(jobs.length); - - jobsQueue.defer(function (callback) { - batch.on('job:done', assertJob(jobs[0].data, jobStatus.DONE, callback)); - createJob(jobs[0].data); - }); - - jobsQueue.defer(function (callback) { - batch.on('job:failed', assertJob(jobs[1].data, jobStatus.FAILED, callback)); - createJob(jobs[1].data); - }); - - jobsQueue.awaitAll(done); - }); -}); From d1e3be2e2241adf3a32795dd24e9f58dc4c6754c Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 20:19:44 +0200 Subject: [PATCH 09/35] Do not emit job:status from batch --- batch/batch.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index a8cfeefa..452bb9e9 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -116,8 +116,6 @@ Batch.prototype.processNextJob = function (user, callback) { self.logger.log(job); - self.emit('job:' + job.data.status, jobId); - callback(); }); }); From dce051d52be7c2ce4bf9d0d3c4949dd0eaa6393b Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 20:34:22 +0200 Subject: [PATCH 10/35] Make leader locker to emit on renewal errors --- batch/leader/locker.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/batch/leader/locker.js b/batch/leader/locker.js index 938cab71..463e74ce 100644 --- a/batch/leader/locker.js +++ b/batch/leader/locker.js @@ -2,17 +2,21 @@ var RedisDistlockLocker = require('./provider/redis-distlock'); var debug = require('../util/debug')('leader-locker'); +var EventEmitter = require('events').EventEmitter; +var util = require('util'); var LOCK = { TTL: 5000 }; function Locker(locker, ttl) { + EventEmitter.call(this); this.locker = locker; this.ttl = (Number.isFinite(ttl) && ttl > 0) ? ttl : LOCK.TTL; this.renewInterval = this.ttl / 5; this.intervalIds = {}; } +util.inherits(Locker, EventEmitter); module.exports = Locker; @@ -43,6 +47,7 @@ Locker.prototype.startRenewal = function(resource) { debug('Trying to extend lock resource=%s', resource); self.locker.lock(resource, self.ttl, function(err, _lock) { if (err) { + self.emit('error', err, resource); return self.stopRenewal(resource); } if (_lock) { From 3a57331a5420a22bc6ceac793c2750dd86067054 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Tue, 18 Oct 2016 20:43:15 +0200 Subject: [PATCH 11/35] Delegate job scheduling There is a host scheduler managing the host locking. When it can acquire a lock over the host it will delegate all the tasks related to that host to the same scheduler. This scheduler will take care of how many jobs it will submit, and in which order. It's also responsible for guaranteeing the execution order per user. Capacity planner dictates how many jobs can be run at the same time in a given host. There are two simple strategies: 1. Infinity: it will attempt to run as many jobs as different users. 2. One: it will run just one job at the same time. Missing things: - Handle lock renewal failures. - Fair scheduling for pending/waiting users. - Capacity based on real resources. --- batch/batch.js | 90 +++------------ batch/scheduler/capacity/infinity.js | 11 ++ batch/scheduler/capacity/one.js | 11 ++ batch/scheduler/host-scheduler.js | 59 ++++++++++ batch/scheduler/scheduler.js | 163 +++++++++++++++++++++++++++ 5 files changed, 261 insertions(+), 73 deletions(-) create mode 100644 batch/scheduler/capacity/infinity.js create mode 100644 batch/scheduler/capacity/one.js create mode 100644 batch/scheduler/host-scheduler.js create mode 100644 batch/scheduler/scheduler.js diff --git a/batch/batch.js b/batch/batch.js index 452bb9e9..fbc7eb84 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -3,10 +3,11 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var debug = require('./util/debug')('batch'); -var forever = require('./util/forever'); var queue = require('queue-async'); -var Locker = require('./leader/locker'); var HostUserQueueMover = require('./maintenance/host-user-queue-mover'); +var HostScheduler = require('./scheduler/host-scheduler'); + +var EMPTY_QUEUE = true; function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublisher, redisPool, logger) { EventEmitter.call(this); @@ -17,12 +18,9 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.jobService = jobService; this.jobPublisher = jobPublisher; this.logger = logger; - this.locker = Locker.create('redis-distlock', { pool: redisPool }); + this.hostScheduler = new HostScheduler({ run: this.processJob.bind(this) }, redisPool); this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool); - // map: host => map{user}. Useful to determine pending queued users. - this.workingHosts = {}; - // map: user => jobId. Will be used for draining jobs. this.workInProgressJobs = {}; } @@ -42,36 +40,14 @@ Batch.prototype.subscribe = function () { this.jobSubscriber.subscribe( function onJobHandler(user, host) { debug('onJobHandler(%s, %s)', user, host); - if (self.isProcessing(host, user)) { - return debug('%s is already processing host=%s user=%s', self.name, host, user); - } - - self.setProcessing(host, user); - - // do forever, it does not throw a stack overflow - forever( - function (next) { - self.locker.lock(host, function(err) { - // we didn't get the lock for the host - if (err) { - debug('Could not lock host=%s from %s. Reason: %s', host, self.name, err.message); - return next(err); - } - debug('Locked host=%s from %s', host, user, self.name); - self.processNextJob(user, next); - }); - }, - function (err) { - if (err) { - debug(err.name === 'EmptyQueue' ? err.message : err); - } - - self.clearProcessing(host, user); - if (!self.hasPendingJobs(host)) { - self.locker.unlock(host, debug); - } + self.hostScheduler.schedule(host, user, function(err) { + if (err) { + return debug( + 'Could not schedule host=%s user=%s from %s. Reason: %s', + host, self.name, user, err.message + ); } - ); + }); }, function onJobSubscriberReady(err) { if (err) { @@ -83,21 +59,16 @@ Batch.prototype.subscribe = function () { ); }; -Batch.prototype.processNextJob = function (user, callback) { - // This is missing the logic for processing several users within the same host - // It requires to: - // - Take care of number of jobs running at the same time per host. - // - Execute user jobs in order. +Batch.prototype.processJob = function (user, callback) { var self = this; self.jobQueue.dequeue(user, function (err, jobId) { if (err) { - return callback(err); + return callback(new Error('Could not dequeue job from user "' + user + '". Reason: ' + err.message)); } if (!jobId) { - var emptyQueueError = new Error('Queue for user="' + user + '" is empty'); - emptyQueueError.name = 'EmptyQueue'; - return callback(emptyQueueError); + debug('Queue empty user=%s', user); + return callback(null, EMPTY_QUEUE); } self.setWorkInProgressJob(user, jobId); @@ -107,7 +78,7 @@ Batch.prototype.processNextJob = function (user, callback) { if (err) { debug(err); if (err.name === 'JobNotRunnable') { - return callback(); + return callback(null, !EMPTY_QUEUE); } return callback(err); } @@ -116,7 +87,7 @@ Batch.prototype.processNextJob = function (user, callback) { self.logger.log(job); - callback(); + return callback(null, !EMPTY_QUEUE); }); }); }; @@ -170,33 +141,6 @@ Batch.prototype.stop = function (callback) { }; -/* Processing hosts => users */ - -Batch.prototype.setProcessing = function(host, user) { - if (!this.workingHosts.hasOwnProperty(host)) { - this.workingHosts[host] = {}; - } - this.workingHosts[host][user] = true; -}; - -Batch.prototype.clearProcessing = function(host, user) { - if (this.workingHosts.hasOwnProperty(host)) { - delete this.workingHosts[host][user]; - if (!this.hasPendingJobs(host)) { - delete this.workingHosts[host]; - } - } -}; - -Batch.prototype.isProcessing = function(host, user) { - return this.workingHosts.hasOwnProperty(host) && this.workingHosts[host].hasOwnProperty(user); -}; - -Batch.prototype.hasPendingJobs = function(host) { - return this.workingHosts.hasOwnProperty(host) && Object.keys(this.workingHosts[host]).length > 0; -}; - - /* Work in progress jobs */ Batch.prototype.setWorkInProgressJob = function(user, jobId) { diff --git a/batch/scheduler/capacity/infinity.js b/batch/scheduler/capacity/infinity.js new file mode 100644 index 00000000..de3f5ab4 --- /dev/null +++ b/batch/scheduler/capacity/infinity.js @@ -0,0 +1,11 @@ +'use strict'; + +function InfinityCapacity() { + +} + +module.exports = InfinityCapacity; + +InfinityCapacity.prototype.getCapacity = function(callback) { + return callback(null, Infinity); +}; diff --git a/batch/scheduler/capacity/one.js b/batch/scheduler/capacity/one.js new file mode 100644 index 00000000..4d57f059 --- /dev/null +++ b/batch/scheduler/capacity/one.js @@ -0,0 +1,11 @@ +'use strict'; + +function OneCapacity() { + +} + +module.exports = OneCapacity; + +OneCapacity.prototype.getCapacity = function(callback) { + return callback(null, 1); +}; diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js new file mode 100644 index 00000000..43a088a9 --- /dev/null +++ b/batch/scheduler/host-scheduler.js @@ -0,0 +1,59 @@ +'use strict'; + +var debug = require('../util/debug')('host-scheduler'); +var Scheduler = require('./scheduler'); +var Locker = require('../leader/locker'); +var InfinityCapacity = require('./capacity/infinity'); +//var OneCapacity = require('./capacity/one'); + +function HostScheduler(taskRunner, redisPool) { + this.taskRunner = taskRunner; + this.locker = Locker.create('redis-distlock', { pool: redisPool }); + this.locker.on('error', function(err, host) { + debug('Locker.error %s', err.message); + this.unlock(host); + }.bind(this)); + // host => Scheduler + this.schedulers = {}; +} + +module.exports = HostScheduler; + +HostScheduler.prototype.schedule = function(host, user, callback) { + this.lock(host, function(err, scheduler) { + if (err) { + return callback(err); + } + var wasRunning = scheduler.add(user); + return callback(err, wasRunning); + }); +}; + +HostScheduler.prototype.lock = function(host, callback) { + debug('lock(%s)', host); + var self = this; + this.locker.lock(host, function(err) { + if (err) { + debug('Could not lock host=%s. Reason: %s', host, err.message); + return callback(err); + } + + if (!self.schedulers.hasOwnProperty(host)) { + var scheduler = new Scheduler(new InfinityCapacity(host), self.taskRunner); + scheduler.on('done', self.unlock.bind(self, host)); + self.schedulers[host] = scheduler; + } + + debug('Locked host=%s', host); + return callback(null, self.schedulers[host]); + }); +}; + +HostScheduler.prototype.unlock = function(host) { + debug('unlock(%s)', host); + if (this.schedulers.hasOwnProperty(host)) { + // TODO stop scheduler? + delete this.schedulers[host]; + } + this.locker.unlock(host, debug); +}; diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js new file mode 100644 index 00000000..096d3853 --- /dev/null +++ b/batch/scheduler/scheduler.js @@ -0,0 +1,163 @@ +'use strict'; + +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +var debug = require('../util/debug')('scheduler'); + +var forever = require('../util/forever'); + +/** + * TODO + * + * - It requires to: + * - Take care of number of jobs running at the same time per host. + * - Execute user jobs in order. + * + */ + +//var STATUS = { +// PENDING: 100, +// WAITING: 50, +// RUNNING: 0, +// DONE: -10 +//}; + +var STATUS = { + PENDING: 'pending', + WAITING: 'waiting', + RUNNING: 'running', + DONE: 'done' +}; + +function Scheduler(capacity, taskRunner) { + EventEmitter.call(this); + this.taskRunner = taskRunner; + this.capacity = capacity; + this.users = {}; +} +util.inherits(Scheduler, EventEmitter); + +module.exports = Scheduler; + +Scheduler.prototype.add = function(user) { + debug('add(%s)', user); + if (!this.users.hasOwnProperty(user) || this.users[user].status === STATUS.DONE) { + this.users[user] = { + name: user, + status: STATUS.PENDING + }; + } + return this.run(); +}; + +Scheduler.prototype.run = function() { + if (this.running) { + return true; + } + this.running = true; + + var self = this; + forever( + function (next) { + debug('Trying to acquire user'); + self.acquire(function(err, user) { + debug('Acquired user=%s', user); + + if (!user) { + return next(new Error('all users finished')); + } + + // try to acquire next user + // will block until capacity slow is available + next(); + + debug('Running task for user=%s', user); + self.taskRunner.run(user, function(err, userQueueIsEmpty, done) { + self.release(user, userQueueIsEmpty, done); + }); + }); + }, + function (err) { + debug('done: %s', err.message); + self.running = false; + self.emit('done'); + } + ); +}; + +function nextCandidate(users) { + var sortedCandidates = Object.keys(users) + .filter(function(user) { + return isCandidate(users[user]); + }); +// .sort(function(candidateNameA, candidateNameB) { +// return users[candidateNameA].status - users[candidateNameB].status; +// }); + return sortedCandidates[0]; +} + +function allRunning(users) { + return all(users, STATUS.RUNNING); +} + +function allDone(users) { + return all(users, STATUS.DONE); +} + +function all(users, status) { + return Object.keys(users).every(function(user) { + return users[user].status === status; + }); +} + +function isCandidate(candidate) { + return candidate.status === STATUS.PENDING || candidate.status === STATUS.WAITING; +} + +function isRunning(candidate) { + return candidate.status === STATUS.RUNNING; +} + +Scheduler.prototype.acquire = function(callback) { + if (allDone(this.users)) { + return callback(null, null); + } + var self = this; + this.capacity.getCapacity(function(err, capacity) { + if (err) { + return callback(err); + } + + var running = Object.keys(self.users).filter(function(user) { + return isRunning(self.users[user]); + }); + + debug('Trying to acquire users=%j, running=%d, capacity=%d', self.users, running.length, capacity); + var allUsersRunning = allRunning(self.users); + if (running.length >= capacity || allUsersRunning) { + debug( + 'Waiting for slot. capacity=%s, running=%s, all_running=%s', + capacity, running.length, allUsersRunning + ); + return self.once('release', function() { + debug('Slot was released'); + self.acquire(callback); + }); + } + + var candidate = nextCandidate(self.users); + if (candidate) { + self.users[candidate].status = STATUS.RUNNING; + } + return callback(null, candidate); + }); +}; + +Scheduler.prototype.release = function(user, isDone, done) { + debug('Released user=%s done=%s', user, isDone); + this.users[user].status = isDone ? STATUS.DONE : STATUS.WAITING; + this.emit('release'); + + return done && done(); +}; From ac65c1c39adfb0584b6ee251c319ca26ed9d1cf1 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 10:36:13 +0200 Subject: [PATCH 12/35] Rename --- batch/batch.js | 2 +- batch/scheduler/host-scheduler.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index fbc7eb84..703e0ae3 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -40,7 +40,7 @@ Batch.prototype.subscribe = function () { this.jobSubscriber.subscribe( function onJobHandler(user, host) { debug('onJobHandler(%s, %s)', user, host); - self.hostScheduler.schedule(host, user, function(err) { + self.hostScheduler.add(host, user, function(err) { if (err) { return debug( 'Could not schedule host=%s user=%s from %s. Reason: %s', diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index 43a088a9..0e143c82 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -19,7 +19,7 @@ function HostScheduler(taskRunner, redisPool) { module.exports = HostScheduler; -HostScheduler.prototype.schedule = function(host, user, callback) { +HostScheduler.prototype.add = function(host, user, callback) { this.lock(host, function(err, scheduler) { if (err) { return callback(err); From 2853c7b0a75311922b4e69e1d57daf22ab6cbbb7 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 10:36:27 +0200 Subject: [PATCH 13/35] Fix status check --- test/acceptance/batch/leader-multiple-users-query-order.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/acceptance/batch/leader-multiple-users-query-order.test.js b/test/acceptance/batch/leader-multiple-users-query-order.test.js index 78aea443..3b076545 100644 --- a/test/acceptance/batch/leader-multiple-users-query-order.test.js +++ b/test/acceptance/batch/leader-multiple-users-query-order.test.js @@ -77,7 +77,7 @@ describe('multiple batch clients and users, job query order', function() { } jobResultB1.getStatus(function(err, jobB1) { assert.equal(jobA1.status, JobStatus.DONE); - assert.equal(jobA1.status, JobStatus.DONE); + assert.equal(jobA2.status, JobStatus.DONE); assert.equal(jobB1.status, JobStatus.DONE); self.testClientA.getResult('select * from ordered_inserts', function(err, rows) { From 51ac1a3ab7f4bbb334bd4db1cd1300bf76651ac5 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 10:38:39 +0200 Subject: [PATCH 14/35] Remove TODO as it is already done --- batch/scheduler/scheduler.js | 9 --------- 1 file changed, 9 deletions(-) diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index 096d3853..fc528796 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -7,15 +7,6 @@ var debug = require('../util/debug')('scheduler'); var forever = require('../util/forever'); -/** - * TODO - * - * - It requires to: - * - Take care of number of jobs running at the same time per host. - * - Execute user jobs in order. - * - */ - //var STATUS = { // PENDING: 100, // WAITING: 50, From 6c232a1fd0d633ab62393ad562cea07be28af697 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 10:40:03 +0200 Subject: [PATCH 15/35] Discard numeric status --- batch/scheduler/scheduler.js | 7 ------- 1 file changed, 7 deletions(-) diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index fc528796..8487f2df 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -7,13 +7,6 @@ var debug = require('../util/debug')('scheduler'); var forever = require('../util/forever'); -//var STATUS = { -// PENDING: 100, -// WAITING: 50, -// RUNNING: 0, -// DONE: -10 -//}; - var STATUS = { PENDING: 'pending', WAITING: 'waiting', From ca3d71ea485e122f89790582633cbc7fb3f5a86d Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 10:43:24 +0200 Subject: [PATCH 16/35] Tasks with their own entity - Use a list of tasks and keep an index per user. - Removes WAITING status. TODO: improve candidate selection. --- batch/scheduler/scheduler.js | 110 +++++++++++++++-------------------- 1 file changed, 48 insertions(+), 62 deletions(-) diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index 8487f2df..65cb1fb2 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -9,7 +9,6 @@ var forever = require('../util/forever'); var STATUS = { PENDING: 'pending', - WAITING: 'waiting', RUNNING: 'running', DONE: 'done' }; @@ -18,6 +17,7 @@ function Scheduler(capacity, taskRunner) { EventEmitter.call(this); this.taskRunner = taskRunner; this.capacity = capacity; + this.tasks = []; this.users = {}; } util.inherits(Scheduler, EventEmitter); @@ -26,16 +26,20 @@ module.exports = Scheduler; Scheduler.prototype.add = function(user) { debug('add(%s)', user); - if (!this.users.hasOwnProperty(user) || this.users[user].status === STATUS.DONE) { - this.users[user] = { - name: user, - status: STATUS.PENDING - }; + var task = this.users[user]; + if (task) { + if (task.status === STATUS.DONE) { + task.status = STATUS.PENDING; + } + } else { + task = new TaskEntity(user); + this.tasks.push(task); + this.users[user] = task; } - return this.run(); + return this.schedule(); }; -Scheduler.prototype.run = function() { +Scheduler.prototype.schedule = function() { if (this.running) { return true; } @@ -45,20 +49,23 @@ Scheduler.prototype.run = function() { forever( function (next) { debug('Trying to acquire user'); - self.acquire(function(err, user) { - debug('Acquired user=%s', user); + self.acquire(function(err, taskEntity) { + debug('Acquired user=%s', taskEntity); - if (!user) { + if (!taskEntity) { return next(new Error('all users finished')); } + taskEntity.status = STATUS.RUNNING; // try to acquire next user // will block until capacity slow is available next(); - debug('Running task for user=%s', user); - self.taskRunner.run(user, function(err, userQueueIsEmpty, done) { - self.release(user, userQueueIsEmpty, done); + debug('Running task for user=%s', taskEntity.user); + self.taskRunner.run(taskEntity.user, function(err, userQueueIsEmpty) { + taskEntity.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING; + + self.release(err, taskEntity); }); }); }, @@ -68,43 +75,12 @@ Scheduler.prototype.run = function() { self.emit('done'); } ); + + return false; }; -function nextCandidate(users) { - var sortedCandidates = Object.keys(users) - .filter(function(user) { - return isCandidate(users[user]); - }); -// .sort(function(candidateNameA, candidateNameB) { -// return users[candidateNameA].status - users[candidateNameB].status; -// }); - return sortedCandidates[0]; -} - -function allRunning(users) { - return all(users, STATUS.RUNNING); -} - -function allDone(users) { - return all(users, STATUS.DONE); -} - -function all(users, status) { - return Object.keys(users).every(function(user) { - return users[user].status === status; - }); -} - -function isCandidate(candidate) { - return candidate.status === STATUS.PENDING || candidate.status === STATUS.WAITING; -} - -function isRunning(candidate) { - return candidate.status === STATUS.RUNNING; -} - Scheduler.prototype.acquire = function(callback) { - if (allDone(this.users)) { + if (this.tasks.every(is(STATUS.DONE))) { return callback(null, null); } var self = this; @@ -113,12 +89,10 @@ Scheduler.prototype.acquire = function(callback) { return callback(err); } - var running = Object.keys(self.users).filter(function(user) { - return isRunning(self.users[user]); - }); + var running = self.tasks.filter(is(STATUS.RUNNING)); - debug('Trying to acquire users=%j, running=%d, capacity=%d', self.users, running.length, capacity); - var allUsersRunning = allRunning(self.users); + debug('Trying to acquire users=%j, running=%d, capacity=%d', self.tasks, running.length, capacity); + var allUsersRunning = self.tasks.every(is(STATUS.RUNNING)); if (running.length >= capacity || allUsersRunning) { debug( 'Waiting for slot. capacity=%s, running=%s, all_running=%s', @@ -130,18 +104,30 @@ Scheduler.prototype.acquire = function(callback) { }); } - var candidate = nextCandidate(self.users); - if (candidate) { - self.users[candidate].status = STATUS.RUNNING; - } + var candidate = self.tasks.filter(is(STATUS.PENDING))[0]; + return callback(null, candidate); }); }; -Scheduler.prototype.release = function(user, isDone, done) { - debug('Released user=%s done=%s', user, isDone); - this.users[user].status = isDone ? STATUS.DONE : STATUS.WAITING; +Scheduler.prototype.release = function(err, taskEntity) { + debug('Released %j', taskEntity); + // decide what to do based on status/jobs this.emit('release'); - - return done && done(); }; + +function TaskEntity(user) { + this.user = user; + this.status = STATUS.PENDING; + this.jobs = 0; +} + +TaskEntity.prototype.is = function(status) { + return this.status === status; +}; + +function is(status) { + return function(taskEntity) { + return taskEntity.is(status); + }; +} From e26bed2e66c54e29c563eef8214451d801d1fdf3 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 10:45:37 +0200 Subject: [PATCH 17/35] Move status close to entity --- batch/scheduler/scheduler.js | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index 65cb1fb2..63e1d4c0 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -7,12 +7,6 @@ var debug = require('../util/debug')('scheduler'); var forever = require('../util/forever'); -var STATUS = { - PENDING: 'pending', - RUNNING: 'running', - DONE: 'done' -}; - function Scheduler(capacity, taskRunner) { EventEmitter.call(this); this.taskRunner = taskRunner; @@ -116,6 +110,15 @@ Scheduler.prototype.release = function(err, taskEntity) { this.emit('release'); }; + +/* Task entities */ + +var STATUS = { + PENDING: 'pending', + RUNNING: 'running', + DONE: 'done' +}; + function TaskEntity(user) { this.user = user; this.status = STATUS.PENDING; From 4daa39bd2c2cd12a14544d4db018168e0de6f423 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 11:45:48 +0200 Subject: [PATCH 18/35] Start scheduler from host-scheduler --- batch/scheduler/host-scheduler.js | 4 +++- batch/scheduler/scheduler.js | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index 0e143c82..62dc4ead 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -24,7 +24,9 @@ HostScheduler.prototype.add = function(host, user, callback) { if (err) { return callback(err); } - var wasRunning = scheduler.add(user); + scheduler.add(user); + var wasRunning = scheduler.schedule(); + debug('Scheduler host=%s was running = %s', host, wasRunning); return callback(err, wasRunning); }); }; diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index 63e1d4c0..db342d5a 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -25,12 +25,15 @@ Scheduler.prototype.add = function(user) { if (task.status === STATUS.DONE) { task.status = STATUS.PENDING; } + + return true; } else { task = new TaskEntity(user); this.tasks.push(task); this.users[user] = task; + + return false; } - return this.schedule(); }; Scheduler.prototype.schedule = function() { From 71d32e003be0b80bedbe8051dc6546f69568f57e Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 11:46:02 +0200 Subject: [PATCH 19/35] Better debug --- batch/scheduler/scheduler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index db342d5a..f706e1c2 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -47,7 +47,7 @@ Scheduler.prototype.schedule = function() { function (next) { debug('Trying to acquire user'); self.acquire(function(err, taskEntity) { - debug('Acquired user=%s', taskEntity); + debug('Acquired user=%j', taskEntity); if (!taskEntity) { return next(new Error('all users finished')); From 1ee08786319df145dff0cef23ce57d781efc0c56 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 12:39:33 +0200 Subject: [PATCH 20/35] =?UTF-8?q?Scheduler=20uses=20a=20red=E2=80=93black?= =?UTF-8?q?=20tree=20to=20decide=20on=20next=20job=20candidate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- batch/scheduler/scheduler.js | 101 +++++++++++++++++++------- npm-shrinkwrap.json | 9 ++- package.json | 1 + test/integration/batch/scheduler.js | 106 ++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 28 deletions(-) create mode 100644 test/integration/batch/scheduler.js diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index f706e1c2..f044c324 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -1,7 +1,13 @@ 'use strict'; +// Inspiration from: +// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt +// - https://www.kernel.org/doc/Documentation/rbtree.txt +// - http://www.ibm.com/developerworks/linux/library/l-completely-fair-scheduler/ + var util = require('util'); var EventEmitter = require('events').EventEmitter; +var RBTree = require('bintrees').RBTree; var debug = require('../util/debug')('scheduler'); @@ -13,6 +19,30 @@ function Scheduler(capacity, taskRunner) { this.capacity = capacity; this.tasks = []; this.users = {}; + this.tasksTree = new RBTree(function(taskEntityA, taskEntityB) { + // if the user is the same it's the same entity + if (taskEntityA.user === taskEntityB.user) { + return 0; + } + + // priority for entity with less executed jobs + if (taskEntityA.jobs !== taskEntityB.jobs) { + return taskEntityA.jobs - taskEntityB.jobs; + } + + // priority for entity with oldest executed job + if (taskEntityA.runAt !== taskEntityB.runAt) { + return taskEntityA.runAt - taskEntityB.runAt; + } + + // priority for oldest job + if (taskEntityA.createdAt !== taskEntityB.createdAt) { + return taskEntityA.createdAt - taskEntityB.createdAt; + } + + // we don't care if we arrive here + return -1; + }); } util.inherits(Scheduler, EventEmitter); @@ -20,17 +50,18 @@ module.exports = Scheduler; Scheduler.prototype.add = function(user) { debug('add(%s)', user); - var task = this.users[user]; - if (task) { - if (task.status === STATUS.DONE) { - task.status = STATUS.PENDING; + var taskEntity = this.users[user]; + if (taskEntity) { + if (taskEntity.status === STATUS.DONE) { + taskEntity.status = STATUS.PENDING; } return true; } else { - task = new TaskEntity(user); - this.tasks.push(task); - this.users[user] = task; + taskEntity = new TaskEntity(user, this.tasks.length); + this.tasks.push(taskEntity); + this.users[user] = taskEntity; + this.tasksTree.insert(taskEntity); return false; } @@ -45,7 +76,7 @@ Scheduler.prototype.schedule = function() { var self = this; forever( function (next) { - debug('Trying to acquire user'); + debug('Waiting for task'); self.acquire(function(err, taskEntity) { debug('Acquired user=%j', taskEntity); @@ -53,23 +84,26 @@ Scheduler.prototype.schedule = function() { return next(new Error('all users finished')); } - taskEntity.status = STATUS.RUNNING; - // try to acquire next user - // will block until capacity slow is available - next(); + self.tasksTree.remove(taskEntity); + taskEntity.running(); debug('Running task for user=%s', taskEntity.user); self.taskRunner.run(taskEntity.user, function(err, userQueueIsEmpty) { - taskEntity.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING; - + debug('Run task=%j, done=%s', taskEntity, userQueueIsEmpty); + taskEntity.ran(userQueueIsEmpty); self.release(err, taskEntity); }); + + // try to acquire next user + // will block until capacity slot is available + next(); }); }, function (err) { debug('done: %s', err.message); self.running = false; self.emit('done'); + self.removeAllListeners(); } ); @@ -80,28 +114,29 @@ Scheduler.prototype.acquire = function(callback) { if (this.tasks.every(is(STATUS.DONE))) { return callback(null, null); } + var self = this; this.capacity.getCapacity(function(err, capacity) { if (err) { return callback(err); } - var running = self.tasks.filter(is(STATUS.RUNNING)); + debug('Trying to acquire task'); - debug('Trying to acquire users=%j, running=%d, capacity=%d', self.tasks, running.length, capacity); - var allUsersRunning = self.tasks.every(is(STATUS.RUNNING)); - if (running.length >= capacity || allUsersRunning) { - debug( - 'Waiting for slot. capacity=%s, running=%s, all_running=%s', - capacity, running.length, allUsersRunning - ); - return self.once('release', function() { + var allRunning = self.tasks.every(is(STATUS.RUNNING)); + var running = self.tasks.filter(is(STATUS.RUNNING)); + debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks); + + var isRunningAny = self.tasks.some(is(STATUS.RUNNING)); + if (isRunningAny || running.length >= capacity) { + debug('Waiting for slot'); + return self.once('release', function releaseListener() { debug('Slot was released'); self.acquire(callback); }); } - var candidate = self.tasks.filter(is(STATUS.PENDING))[0]; + var candidate = self.tasksTree.min(); return callback(null, candidate); }); @@ -109,7 +144,9 @@ Scheduler.prototype.acquire = function(callback) { Scheduler.prototype.release = function(err, taskEntity) { debug('Released %j', taskEntity); - // decide what to do based on status/jobs + if (taskEntity.is(STATUS.PENDING)) { + this.tasksTree.insert(taskEntity); + } this.emit('release'); }; @@ -122,16 +159,28 @@ var STATUS = { DONE: 'done' }; -function TaskEntity(user) { +function TaskEntity(user, createdAt) { this.user = user; + this.createdAt = createdAt; this.status = STATUS.PENDING; this.jobs = 0; + this.runAt = 0; } TaskEntity.prototype.is = function(status) { return this.status === status; }; +TaskEntity.prototype.running = function() { + this.status = STATUS.RUNNING; + this.runAt = Date.now(); +}; + +TaskEntity.prototype.ran = function(userQueueIsEmpty) { + this.jobs++; + this.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING; +}; + function is(status) { return function(taskEntity) { return taskEntity.is(status); diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 7fb0db50..395250ac 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -2,6 +2,11 @@ "name": "cartodb_sql_api", "version": "1.39.2", "dependencies": { + "bintrees": { + "version": "1.0.1", + "from": "bintrees@>=1.0.1 <2.0.0", + "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz" + }, "bunyan": { "version": "1.8.1", "from": "bunyan@1.8.1", @@ -52,9 +57,9 @@ "resolved": "https://registry.npmjs.org/glob/-/glob-6.0.4.tgz", "dependencies": { "inflight": { - "version": "1.0.5", + "version": "1.0.6", "from": "inflight@>=1.0.4 <2.0.0", - "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.5.tgz", + "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", "dependencies": { "wrappy": { "version": "1.0.2", diff --git a/package.json b/package.json index 68fec31e..c353b3e1 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "Sandro Santilli " ], "dependencies": { + "bintrees": "1.0.1", "bunyan": "1.8.1", "cartodb-psql": "~0.6.0", "cartodb-query-tables": "0.2.0", diff --git a/test/integration/batch/scheduler.js b/test/integration/batch/scheduler.js new file mode 100644 index 00000000..7a2a51c0 --- /dev/null +++ b/test/integration/batch/scheduler.js @@ -0,0 +1,106 @@ +'use strict'; + +require('../../helper'); +var assert = require('../../support/assert'); +var Scheduler = require('../../../batch/scheduler/scheduler'); +var OneCapacity = require('../../../batch/scheduler/capacity/one'); +var InfinityCapacity = require('../../../batch/scheduler/capacity/infinity'); + +describe('scheduler', function() { + + function TaskRunner(userTasks) { + this.results = []; + this.userTasks = userTasks; + } + + TaskRunner.prototype.run = function(user, callback) { + this.results.push(user); + this.userTasks[user]--; + return callback(null, this.userTasks[user] === 0); + }; + + // simulate one by one or infinity capacity + var capacities = [new OneCapacity(), new InfinityCapacity()]; + + capacities.forEach(function(capacity) { + it('should run tasks', function (done) { + var taskRunner = new TaskRunner({ + userA: 1 + }); + var scheduler = new Scheduler(capacity, taskRunner); + scheduler.add('userA'); + + scheduler.on('done', function() { + var results = taskRunner.results; + + assert.equal(results.length, 1); + + assert.equal(results[0], 'userA'); + + return done(); + }); + + scheduler.schedule(); + }); + + + it('should run tasks for different users', function (done) { + var taskRunner = new TaskRunner({ + userA: 1, + userB: 1, + userC: 1 + }); + var scheduler = new Scheduler(capacity, taskRunner); + scheduler.add('userA'); + scheduler.add('userB'); + scheduler.add('userC'); + + scheduler.on('done', function() { + var results = taskRunner.results; + + assert.equal(results.length, 3); + + assert.equal(results[0], 'userA'); + assert.equal(results[1], 'userB'); + assert.equal(results[2], 'userC'); + + return done(); + }); + + scheduler.schedule(); + }); + + it('should be fair when scheduling tasks', function (done) { + var taskRunner = new TaskRunner({ + userA: 3, + userB: 2, + userC: 1 + }); + + var scheduler = new Scheduler(capacity, taskRunner); + scheduler.add('userA'); + scheduler.add('userA'); + scheduler.add('userA'); + scheduler.add('userB'); + scheduler.add('userB'); + scheduler.add('userC'); + + scheduler.on('done', function() { + var results = taskRunner.results; + + assert.equal(results.length, 6); + + assert.equal(results[0], 'userA'); + assert.equal(results[1], 'userB'); + assert.equal(results[2], 'userC'); + assert.equal(results[3], 'userA'); + assert.equal(results[4], 'userB'); + assert.equal(results[5], 'userA'); + + return done(); + }); + + scheduler.schedule(); + }); + }); +}); From 5030fddc9cbaafae55904ea4c3d3d25343e01917 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 16:56:43 +0200 Subject: [PATCH 21/35] Allow to override test client --- test/support/test-client.js | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/test/support/test-client.js b/test/support/test-client.js index 86b87b87..c1a74ae0 100644 --- a/test/support/test-client.js +++ b/test/support/test-client.js @@ -24,13 +24,17 @@ function TestClient(config) { module.exports = TestClient; -TestClient.prototype.getResult = function(query, callback) { +TestClient.prototype.getResult = function(query, override, callback) { + if (!callback) { + callback = override; + override = {}; + } assert.response( this.server, { - url: this.getUrl(), + url: this.getUrl(override), headers: { - host: this.getHost(), + host: this.getHost(override), 'Content-Type': 'application/json' }, method: 'POST', @@ -50,10 +54,10 @@ TestClient.prototype.getResult = function(query, callback) { ); }; -TestClient.prototype.getHost = function() { - return this.config.host || 'vizzuality.cartodb.com'; +TestClient.prototype.getHost = function(override) { + return override.host || this.config.host || 'vizzuality.cartodb.com'; }; -TestClient.prototype.getUrl = function() { - return '/api/v2/sql?api_key=' + (this.config.apiKey || '1234'); +TestClient.prototype.getUrl = function(override) { + return '/api/v2/sql?api_key=' + (override.apiKey || this.config.apiKey || '1234'); }; From 372c9f55110969756a53f7b7851b74f4007dc360 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 16:57:10 +0200 Subject: [PATCH 22/35] Basic test to test scheduler happy case --- test/acceptance/batch/scheduler-basic.test.js | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 test/acceptance/batch/scheduler-basic.test.js diff --git a/test/acceptance/batch/scheduler-basic.test.js b/test/acceptance/batch/scheduler-basic.test.js new file mode 100644 index 00000000..ba5decbd --- /dev/null +++ b/test/acceptance/batch/scheduler-basic.test.js @@ -0,0 +1,97 @@ +require('../../helper'); +var assert = require('../../support/assert'); + +var TestClient = require('../../support/test-client'); +var BatchTestClient = require('../../support/batch-test-client'); +var JobStatus = require('../../../batch/job_status'); + +describe('basic scheduling', function() { + + before(function(done) { + this.batchTestClientA = new BatchTestClient({ name: 'consumerA' }); + this.batchTestClientB = new BatchTestClient({ name: 'consumerB' }); + + this.testClient = new TestClient(); + this.testClient.getResult( + [ + 'drop table if exists ordered_inserts_a', + 'create table ordered_inserts_a (status numeric)' + ].join(';'), + done + ); + }); + + after(function (done) { + this.batchTestClientA.drain(function(err) { + if (err) { + return done(err); + } + + this.batchTestClientB.drain(done); + }.bind(this)); + }); + + function createJob(queries) { + return { + query: queries + }; + } + + it('should run job queries in order (multiple consumers)', function (done) { + var jobRequestA1 = createJob([ + "insert into ordered_inserts_a values(1)", + "select pg_sleep(0.25)", + "insert into ordered_inserts_a values(2)" + ]); + var jobRequestA2 = createJob([ + "insert into ordered_inserts_a values(3)" + ]); + + var self = this; + + this.batchTestClientA.createJob(jobRequestA1, function(err, jobResultA1) { + if (err) { + return done(err); + } + + // we don't care about the producer + self.batchTestClientB.createJob(jobRequestA2, function(err, jobResultA2) { + if (err) { + return done(err); + } + + jobResultA1.getStatus(function (err, jobA1) { + if (err) { + return done(err); + } + + jobResultA2.getStatus(function(err, jobA2) { + if (err) { + return done(err); + } + assert.equal(jobA1.status, JobStatus.DONE); + assert.equal(jobA2.status, JobStatus.DONE); + + assert.ok( + new Date(jobA1.updated_at).getTime() < new Date(jobA2.updated_at).getTime(), + 'A1 (' + jobA1.updated_at + ') ' + + 'should finish before A2 (' + jobA2.updated_at + ')' + ); + + function statusMapper (status) { return { status: status }; } + + self.testClient.getResult('select * from ordered_inserts_a', function(err, rows) { + assert.ok(!err); + + // cartodb250user and vizzuality test users share database + var expectedRows = [1, 2, 3].map(statusMapper); + assert.deepEqual(rows, expectedRows); + + return done(); + }); + }); + }); + }); + }); + }); +}); From b164ec8c8619bc86afa753902f4c083bbd1ca304 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 16:58:00 +0200 Subject: [PATCH 23/35] Better debugging --- batch/batch.js | 9 ++++++--- batch/scheduler/host-scheduler.js | 18 ++++++++++-------- batch/scheduler/scheduler.js | 1 + 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index 703e0ae3..6178e915 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -18,7 +18,7 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.jobService = jobService; this.jobPublisher = jobPublisher; this.logger = logger; - this.hostScheduler = new HostScheduler({ run: this.processJob.bind(this) }, redisPool); + this.hostScheduler = new HostScheduler(name, { run: this.processJob.bind(this) }, redisPool); this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool); // map: user => jobId. Will be used for draining jobs. @@ -39,7 +39,7 @@ Batch.prototype.subscribe = function () { this.jobSubscriber.subscribe( function onJobHandler(user, host) { - debug('onJobHandler(%s, %s)', user, host); + debug('[%s] onJobHandler(%s, %s)', self.name, user, host); self.hostScheduler.add(host, user, function(err) { if (err) { return debug( @@ -83,7 +83,10 @@ Batch.prototype.processJob = function (user, callback) { return callback(err); } - debug('Job=%s status=%s user=%s (failed_reason=%s)', jobId, job.data.status, user, job.failed_reason); + debug( + '[%s] Job=%s status=%s user=%s (failed_reason=%s)', + self.name, jobId, job.data.status, user, job.failed_reason + ); self.logger.log(job); diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index 62dc4ead..a28be120 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -6,11 +6,12 @@ var Locker = require('../leader/locker'); var InfinityCapacity = require('./capacity/infinity'); //var OneCapacity = require('./capacity/one'); -function HostScheduler(taskRunner, redisPool) { +function HostScheduler(name, taskRunner, redisPool) { + this.name = name || 'scheduler'; this.taskRunner = taskRunner; this.locker = Locker.create('redis-distlock', { pool: redisPool }); this.locker.on('error', function(err, host) { - debug('Locker.error %s', err.message); + debug('[%s] Locker.error %s', this.name, err.message); this.unlock(host); }.bind(this)); // host => Scheduler @@ -22,21 +23,22 @@ module.exports = HostScheduler; HostScheduler.prototype.add = function(host, user, callback) { this.lock(host, function(err, scheduler) { if (err) { + debug('[%s] Could not lock host=%s', this.name, host); return callback(err); } scheduler.add(user); var wasRunning = scheduler.schedule(); - debug('Scheduler host=%s was running = %s', host, wasRunning); + debug('[%s] Scheduler host=%s was running=%s', this.name, host, wasRunning); return callback(err, wasRunning); - }); + }.bind(this)); }; HostScheduler.prototype.lock = function(host, callback) { - debug('lock(%s)', host); + debug('[%s] lock(%s)', this.name, host); var self = this; this.locker.lock(host, function(err) { if (err) { - debug('Could not lock host=%s. Reason: %s', host, err.message); + debug('[%s] Could not lock host=%s. Reason: %s', self.name, host, err.message); return callback(err); } @@ -46,13 +48,13 @@ HostScheduler.prototype.lock = function(host, callback) { self.schedulers[host] = scheduler; } - debug('Locked host=%s', host); + debug('[%s] Locked host=%s', self.name, host); return callback(null, self.schedulers[host]); }); }; HostScheduler.prototype.unlock = function(host) { - debug('unlock(%s)', host); + debug('[%s] unlock(%s)', this.name, host); if (this.schedulers.hasOwnProperty(host)) { // TODO stop scheduler? delete this.schedulers[host]; diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index f044c324..f47bfd4d 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -15,6 +15,7 @@ var forever = require('../util/forever'); function Scheduler(capacity, taskRunner) { EventEmitter.call(this); + debug('new Scheduler'); this.taskRunner = taskRunner; this.capacity = capacity; this.tasks = []; From 95b3a8adf10976eac1a4dd06917fd7e3b0dd1098 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 16:58:31 +0200 Subject: [PATCH 24/35] Be explicit about queue status --- batch/batch.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/batch.js b/batch/batch.js index 6178e915..7bc985eb 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -63,7 +63,7 @@ Batch.prototype.processJob = function (user, callback) { var self = this; self.jobQueue.dequeue(user, function (err, jobId) { if (err) { - return callback(new Error('Could not dequeue job from user "' + user + '". Reason: ' + err.message)); + return callback(new Error('Could not get job from "' + user + '". Reason: ' + err.message), !EMPTY_QUEUE); } if (!jobId) { @@ -80,7 +80,7 @@ Batch.prototype.processJob = function (user, callback) { if (err.name === 'JobNotRunnable') { return callback(null, !EMPTY_QUEUE); } - return callback(err); + return callback(err, !EMPTY_QUEUE); } debug( From 604e28533ce36dbf7c3e5ddd299cbec5a717b82e Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 16:58:57 +0200 Subject: [PATCH 25/35] Fix and improve test --- .../leader-multiple-users-query-order.test.js | 65 ++++++++++++------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/test/acceptance/batch/leader-multiple-users-query-order.test.js b/test/acceptance/batch/leader-multiple-users-query-order.test.js index 3b076545..782dee19 100644 --- a/test/acceptance/batch/leader-multiple-users-query-order.test.js +++ b/test/acceptance/batch/leader-multiple-users-query-order.test.js @@ -11,9 +11,14 @@ describe('multiple batch clients and users, job query order', function() { this.batchTestClientA = new BatchTestClient({ name: 'consumerA' }); this.batchTestClientB = new BatchTestClient({ name: 'consumerB' }); - this.testClientA = new TestClient(); - this.testClientA.getResult( - 'drop table if exists ordered_inserts; create table ordered_inserts (status numeric)', + this.testClient = new TestClient(); + this.testClient.getResult( + [ + 'drop table if exists ordered_inserts_a', + 'drop table if exists ordered_inserts_bbbbb', + 'create table ordered_inserts_a (status numeric)', + 'create table ordered_inserts_bbbbb (status numeric)' + ].join(';'), done ); }); @@ -36,16 +41,16 @@ describe('multiple batch clients and users, job query order', function() { it('should run job queries in order (multiple consumers)', function (done) { var jobRequestA1 = createJob([ - "insert into ordered_inserts values(1)", + "insert into ordered_inserts_a values(1)", "select pg_sleep(0.25)", - "insert into ordered_inserts values(2)" + "insert into ordered_inserts_a values(2)" ]); var jobRequestA2 = createJob([ - "insert into ordered_inserts values(3)" + "insert into ordered_inserts_a values(3)" ]); var jobRequestB1 = createJob([ - "insert into ordered_inserts values(4)" + "insert into ordered_inserts_bbbbb values(1)" ]); var self = this; @@ -55,14 +60,14 @@ describe('multiple batch clients and users, job query order', function() { return done(err); } - // we don't care about the producer - self.batchTestClientB.createJob(jobRequestA2, function(err, jobResultA2) { + var override = { host: 'cartodb250user.cartodb.com' }; + self.batchTestClientB.createJob(jobRequestB1, override, function(err, jobResultB1) { if (err) { return done(err); } - var override = { host: 'cartodb250user.cartodb.com' }; - self.batchTestClientB.createJob(jobRequestB1, override, function(err, jobResultB1) { + // we don't care about the producer + self.batchTestClientB.createJob(jobRequestA2, function(err, jobResultA2) { if (err) { return done(err); } @@ -80,23 +85,35 @@ describe('multiple batch clients and users, job query order', function() { assert.equal(jobA2.status, JobStatus.DONE); assert.equal(jobB1.status, JobStatus.DONE); - self.testClientA.getResult('select * from ordered_inserts', function(err, rows) { + assert.ok( + new Date(jobA1.updated_at).getTime() < new Date(jobA2.updated_at).getTime(), + 'A1 (' + jobA1.updated_at + ') ' + + 'should finish before A2 (' + jobA2.updated_at + ')' + ); + assert.ok( + new Date(jobB1.updated_at).getTime() < new Date(jobA1.updated_at).getTime(), + 'B1 (' + jobA1.updated_at + ') ' + + 'should finish before A1 (' + jobA1.updated_at + ')' + ); + + function statusMapper (status) { return { status: status }; } + + self.testClient.getResult('select * from ordered_inserts_a', function(err, rows) { assert.ok(!err); // cartodb250user and vizzuality test users share database - var expectedRows = [1, 4, 2, 3].map(function(status) { return {status: status}; }); + var expectedRows = [1, 2, 3].map(statusMapper); assert.deepEqual(rows, expectedRows); - assert.ok( - new Date(jobA1.updated_at).getTime() < new Date(jobA2.updated_at).getTime(), - 'A1 (' + jobA1.updated_at + ') ' + - 'should finish before A2 (' + jobA2.updated_at + ')' - ); - assert.ok( - new Date(jobB1.updated_at).getTime() < new Date(jobA1.updated_at).getTime(), - 'B1 (' + jobA1.updated_at + ') ' + - 'should finish before A1 (' + jobA1.updated_at + ')' - ); - done(); + + var query = 'select * from ordered_inserts_bbbbb'; + self.testClient.getResult(query, override, function(err, rows) { + assert.ok(!err); + + var expectedRows = [1].map(statusMapper); + assert.deepEqual(rows, expectedRows); + + done(); + }); }); }); From 9596ac4730a8f7ab63d38b4b8cae41d547fd4502 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 16:59:27 +0200 Subject: [PATCH 26/35] Scheduler handles new tasks when there is free slots --- batch/scheduler/scheduler.js | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index f47bfd4d..a05ac1ca 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -64,6 +64,8 @@ Scheduler.prototype.add = function(user) { this.users[user] = taskEntity; this.tasksTree.insert(taskEntity); + this.emit('add'); + return false; } }; @@ -122,19 +124,33 @@ Scheduler.prototype.acquire = function(callback) { return callback(err); } + function addListener() { + self.removeListener('release', releaseListener); + debug('Got a new task'); + self.acquire(callback); + } + + function releaseListener() { + self.removeListener('add', addListener); + debug('Slot was released'); + self.acquire(callback); + } + debug('Trying to acquire task'); var allRunning = self.tasks.every(is(STATUS.RUNNING)); var running = self.tasks.filter(is(STATUS.RUNNING)); debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks); + if (allRunning && running.length < capacity) { + debug('Waiting for tasks'); + self.once('add', addListener); + } + var isRunningAny = self.tasks.some(is(STATUS.RUNNING)); if (isRunningAny || running.length >= capacity) { debug('Waiting for slot'); - return self.once('release', function releaseListener() { - debug('Slot was released'); - self.acquire(callback); - }); + return self.once('release', releaseListener); } var candidate = self.tasksTree.min(); From 0af5cf703a3f9746eddb9486665d2f0eed55dfdd Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 18:42:53 +0200 Subject: [PATCH 27/35] Allow to configure capacity strategy - HTTP strategy: mechanism to compute load from db host. - Fixed strategy: hardcoded number of queries to run at the same time, via configuration. --- batch/scheduler/capacity/fixed.js | 11 + batch/scheduler/capacity/http-simple.js | 42 +++ batch/scheduler/capacity/infinity.js | 11 - batch/scheduler/capacity/one.js | 11 - batch/scheduler/host-scheduler.js | 22 +- config/environments/development.js.example | 14 + config/environments/production.js.example | 14 + config/environments/staging.js.example | 14 + config/environments/test.js.example | 14 + npm-shrinkwrap.json | 379 ++++++++++++++++++++- package.json | 2 +- 11 files changed, 504 insertions(+), 30 deletions(-) create mode 100644 batch/scheduler/capacity/fixed.js create mode 100644 batch/scheduler/capacity/http-simple.js delete mode 100644 batch/scheduler/capacity/infinity.js delete mode 100644 batch/scheduler/capacity/one.js diff --git a/batch/scheduler/capacity/fixed.js b/batch/scheduler/capacity/fixed.js new file mode 100644 index 00000000..aef83be8 --- /dev/null +++ b/batch/scheduler/capacity/fixed.js @@ -0,0 +1,11 @@ +'use strict'; + +function FixedCapacity(capacity) { + this.capacity = Math.max(1, capacity); +} + +module.exports = FixedCapacity; + +FixedCapacity.prototype.getCapacity = function(callback) { + return callback(null, this.capacity); +}; diff --git a/batch/scheduler/capacity/http-simple.js b/batch/scheduler/capacity/http-simple.js new file mode 100644 index 00000000..da817392 --- /dev/null +++ b/batch/scheduler/capacity/http-simple.js @@ -0,0 +1,42 @@ +'use strict'; + +var request = require('request'); +var debug = require('../../util/debug')('capacity-http'); + +function HttpSimpleCapacity(host, capacityEndpoint) { + this.host = host; + this.capacityEndpoint = capacityEndpoint; +} + +module.exports = HttpSimpleCapacity; + +HttpSimpleCapacity.prototype.getCapacity = function(callback) { + var requestParams = { + method: 'POST', + url: this.capacityEndpoint, + json: true + }; + debug('getCapacity(%s)', this.host); + request.post(requestParams, function(err, res, jsonRes) { + var capacity = 1; + + if (!err && jsonRes) { + if (jsonRes.retcode === 0) { + var values = jsonRes.return_values; + + var cores = parseInt(values.cores, 10); + var relativeLoad = parseFloat(values.relative_load); + + capacity = Math.max( + Math.floor(((1 - relativeLoad) * cores) - 1), + 1 + ); + + debug('host=%s, capacity=%s', this.host, capacity); + } + } + + debug('host=%s, capacity=%s', this.host, capacity); + return callback(null, capacity); + }.bind(this)); +}; diff --git a/batch/scheduler/capacity/infinity.js b/batch/scheduler/capacity/infinity.js deleted file mode 100644 index de3f5ab4..00000000 --- a/batch/scheduler/capacity/infinity.js +++ /dev/null @@ -1,11 +0,0 @@ -'use strict'; - -function InfinityCapacity() { - -} - -module.exports = InfinityCapacity; - -InfinityCapacity.prototype.getCapacity = function(callback) { - return callback(null, Infinity); -}; diff --git a/batch/scheduler/capacity/one.js b/batch/scheduler/capacity/one.js deleted file mode 100644 index 4d57f059..00000000 --- a/batch/scheduler/capacity/one.js +++ /dev/null @@ -1,11 +0,0 @@ -'use strict'; - -function OneCapacity() { - -} - -module.exports = OneCapacity; - -OneCapacity.prototype.getCapacity = function(callback) { - return callback(null, 1); -}; diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index a28be120..259ec9b2 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -1,10 +1,11 @@ 'use strict'; +var _ = require('underscore'); var debug = require('../util/debug')('host-scheduler'); var Scheduler = require('./scheduler'); var Locker = require('../leader/locker'); -var InfinityCapacity = require('./capacity/infinity'); -//var OneCapacity = require('./capacity/one'); +var FixedCapacity = require('./capacity/fixed'); +var HttpSimpleCapacity = require('./capacity/http-simple'); function HostScheduler(name, taskRunner, redisPool) { this.name = name || 'scheduler'; @@ -33,6 +34,21 @@ HostScheduler.prototype.add = function(host, user, callback) { }.bind(this)); }; +HostScheduler.prototype.getCapacityProvider = function(host) { + var strategy = global.settings.batch_capacity_strategy || 'fixed'; + if (strategy === 'http') { + if (global.settings.batch_capacity_http_url_template) { + var endpoint = _.template(global.settings.batch_capacity_http_url_template, { dbhost: host }); + debug('Using strategy=%s capacity. Endpoint=%s', strategy, endpoint); + return new HttpSimpleCapacity(host, endpoint); + } + } + + var fixedCapacity = global.settings.batch_capacity_fixed_amount || 1; + debug('Using strategy=fixed capacity=%d', fixedCapacity); + return new FixedCapacity(fixedCapacity); +}; + HostScheduler.prototype.lock = function(host, callback) { debug('[%s] lock(%s)', this.name, host); var self = this; @@ -43,7 +59,7 @@ HostScheduler.prototype.lock = function(host, callback) { } if (!self.schedulers.hasOwnProperty(host)) { - var scheduler = new Scheduler(new InfinityCapacity(host), self.taskRunner); + var scheduler = new Scheduler(self.getCapacityProvider(host), self.taskRunner); scheduler.on('done', self.unlock.bind(self, host)); self.schedulers[host] = scheduler; } diff --git a/config/environments/development.js.example b/config/environments/development.js.example index 8bbe5fd4..f7fdb858 100644 --- a/config/environments/development.js.example +++ b/config/environments/development.js.example @@ -34,6 +34,20 @@ module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in millisecon module.exports.batch_log_filename = 'logs/batch-queries.log'; // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; +// Capacity strategy to use. +// It allows to tune how many queries run at a db host at the same time. +// Options: 'fixed', 'http' +module.exports.batch_capacity_strategy = 'fixed'; +// Applies when strategy='fixed'. +// Number of simultaneous users running queries in the same host. +// It will use 1 as min. +module.exports.batch_capacity_fixed_amount = 2; +// Applies when strategy='http'. +// HTTP endpoint to check db host load. +// Helps to decide the number of simultaneous users running queries in that host. +// It will use 1 as min. +// If no template is provided it will default to 'fixed' strategy. +module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; // Max database connections in the pool // Subsequent connections will wait for a free slot. // NOTE: not used by OGR-mediated accesses diff --git a/config/environments/production.js.example b/config/environments/production.js.example index b69c3b07..b4017f7d 100644 --- a/config/environments/production.js.example +++ b/config/environments/production.js.example @@ -35,6 +35,20 @@ module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in millisecon module.exports.batch_log_filename = 'logs/batch-queries.log'; // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; +// Capacity strategy to use. +// It allows to tune how many queries run at a db host at the same time. +// Options: 'fixed', 'http' +module.exports.batch_capacity_strategy = 'fixed'; +// Applies when strategy='fixed'. +// Number of simultaneous users running queries in the same host. +// It will use 1 as min. +module.exports.batch_capacity_fixed_amount = 2; +// Applies when strategy='http'. +// HTTP endpoint to check db host load. +// Helps to decide the number of simultaneous users running queries in that host. +// It will use 1 as min. +// If no template is provided it will default to 'fixed' strategy. +module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; // Max database connections in the pool // Subsequent connections will wait for a free slot.i // NOTE: not used by OGR-mediated accesses diff --git a/config/environments/staging.js.example b/config/environments/staging.js.example index af890e96..74c01aae 100644 --- a/config/environments/staging.js.example +++ b/config/environments/staging.js.example @@ -35,6 +35,20 @@ module.exports.batch_query_timeout = 12 * 3600 * 1000; // 12 hours in millisecon module.exports.batch_log_filename = 'logs/batch-queries.log'; // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; +// Capacity strategy to use. +// It allows to tune how many queries run at a db host at the same time. +// Options: 'fixed', 'http' +module.exports.batch_capacity_strategy = 'fixed'; +// Applies when strategy='fixed'. +// Number of simultaneous users running queries in the same host. +// It will use 1 as min. +module.exports.batch_capacity_fixed_amount = 2; +// Applies when strategy='http'. +// HTTP endpoint to check db host load. +// Helps to decide the number of simultaneous users running queries in that host. +// It will use 1 as min. +// If no template is provided it will default to 'fixed' strategy. +module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; // Max database connections in the pool // Subsequent connections will wait for a free slot. // NOTE: not used by OGR-mediated accesses diff --git a/config/environments/test.js.example b/config/environments/test.js.example index b5782c20..71111f0d 100644 --- a/config/environments/test.js.example +++ b/config/environments/test.js.example @@ -32,6 +32,20 @@ module.exports.batch_query_timeout = 5 * 1000; // 5 seconds in milliseconds module.exports.batch_log_filename = 'logs/batch-queries.log'; // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; +// Capacity strategy to use. +// It allows to tune how many queries run at a db host at the same time. +// Options: 'fixed', 'http' +module.exports.batch_capacity_strategy = 'fixed'; +// Applies when strategy='fixed'. +// Number of simultaneous users running queries in the same host. +// It will use 1 as min. +module.exports.batch_capacity_fixed_amount = 2; +// Applies when strategy='http'. +// HTTP endpoint to check db host load. +// Helps to decide the number of simultaneous users running queries in that host. +// It will use 1 as min. +// If no template is provided it will default to 'fixed' strategy. +module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; // Max database connections in the pool // Subsequent connections will wait for a free slot. // NOTE: not used by OGR-mediated accesses diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 395250ac..a00ce331 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -4,7 +4,7 @@ "dependencies": { "bintrees": { "version": "1.0.1", - "from": "bintrees@>=1.0.1 <2.0.0", + "from": "bintrees@1.0.1", "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz" }, "bunyan": { @@ -651,7 +651,7 @@ }, "mime-types": { "version": "2.1.12", - "from": "mime-types@>=2.1.11 <2.2.0", + "from": "mime-types@>=2.1.7 <2.2.0", "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.12.tgz", "dependencies": { "mime-db": { @@ -748,6 +748,377 @@ } } }, + "request": { + "version": "2.75.0", + "from": "request@>=2.75.0 <2.76.0", + "resolved": "https://registry.npmjs.org/request/-/request-2.75.0.tgz", + "dependencies": { + "aws-sign2": { + "version": "0.6.0", + "from": "aws-sign2@>=0.6.0 <0.7.0", + "resolved": "https://registry.npmjs.org/aws-sign2/-/aws-sign2-0.6.0.tgz" + }, + "aws4": { + "version": "1.5.0", + "from": "aws4@>=1.2.1 <2.0.0", + "resolved": "https://registry.npmjs.org/aws4/-/aws4-1.5.0.tgz" + }, + "bl": { + "version": "1.1.2", + "from": "bl@>=1.1.2 <1.2.0", + "resolved": "https://registry.npmjs.org/bl/-/bl-1.1.2.tgz", + "dependencies": { + "readable-stream": { + "version": "2.0.6", + "from": "readable-stream@>=2.0.5 <2.1.0", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.0.6.tgz", + "dependencies": { + "core-util-is": { + "version": "1.0.2", + "from": "core-util-is@>=1.0.0 <1.1.0", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz" + }, + "inherits": { + "version": "2.0.3", + "from": "inherits@>=2.0.1 <2.1.0", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.3.tgz" + }, + "isarray": { + "version": "1.0.0", + "from": "isarray@>=1.0.0 <1.1.0", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz" + }, + "process-nextick-args": { + "version": "1.0.7", + "from": "process-nextick-args@>=1.0.6 <1.1.0", + "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-1.0.7.tgz" + }, + "string_decoder": { + "version": "0.10.31", + "from": "string_decoder@>=0.10.0 <0.11.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz" + }, + "util-deprecate": { + "version": "1.0.2", + "from": "util-deprecate@>=1.0.1 <1.1.0", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz" + } + } + } + } + }, + "caseless": { + "version": "0.11.0", + "from": "caseless@>=0.11.0 <0.12.0", + "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.11.0.tgz" + }, + "combined-stream": { + "version": "1.0.5", + "from": "combined-stream@>=1.0.5 <1.1.0", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.5.tgz", + "dependencies": { + "delayed-stream": { + "version": "1.0.0", + "from": "delayed-stream@>=1.0.0 <1.1.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz" + } + } + }, + "extend": { + "version": "3.0.0", + "from": "extend@>=3.0.0 <3.1.0", + "resolved": "https://registry.npmjs.org/extend/-/extend-3.0.0.tgz" + }, + "forever-agent": { + "version": "0.6.1", + "from": "forever-agent@>=0.6.1 <0.7.0", + "resolved": "https://registry.npmjs.org/forever-agent/-/forever-agent-0.6.1.tgz" + }, + "form-data": { + "version": "2.0.0", + "from": "form-data@>=2.0.0 <2.1.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.0.0.tgz", + "dependencies": { + "asynckit": { + "version": "0.4.0", + "from": "asynckit@>=0.4.0 <0.5.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz" + } + } + }, + "har-validator": { + "version": "2.0.6", + "from": "har-validator@>=2.0.6 <2.1.0", + "resolved": "https://registry.npmjs.org/har-validator/-/har-validator-2.0.6.tgz", + "dependencies": { + "chalk": { + "version": "1.1.3", + "from": "chalk@>=1.1.1 <2.0.0", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-1.1.3.tgz", + "dependencies": { + "ansi-styles": { + "version": "2.2.1", + "from": "ansi-styles@>=2.2.1 <3.0.0", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-2.2.1.tgz" + }, + "escape-string-regexp": { + "version": "1.0.5", + "from": "escape-string-regexp@>=1.0.2 <2.0.0", + "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz" + }, + "has-ansi": { + "version": "2.0.0", + "from": "has-ansi@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/has-ansi/-/has-ansi-2.0.0.tgz", + "dependencies": { + "ansi-regex": { + "version": "2.0.0", + "from": "ansi-regex@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.0.0.tgz" + } + } + }, + "strip-ansi": { + "version": "3.0.1", + "from": "strip-ansi@>=3.0.0 <4.0.0", + "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", + "dependencies": { + "ansi-regex": { + "version": "2.0.0", + "from": "ansi-regex@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.0.0.tgz" + } + } + }, + "supports-color": { + "version": "2.0.0", + "from": "supports-color@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-2.0.0.tgz" + } + } + }, + "commander": { + "version": "2.9.0", + "from": "commander@>=2.9.0 <3.0.0", + "resolved": "https://registry.npmjs.org/commander/-/commander-2.9.0.tgz", + "dependencies": { + "graceful-readlink": { + "version": "1.0.1", + "from": "graceful-readlink@>=1.0.0", + "resolved": "https://registry.npmjs.org/graceful-readlink/-/graceful-readlink-1.0.1.tgz" + } + } + }, + "is-my-json-valid": { + "version": "2.15.0", + "from": "is-my-json-valid@>=2.12.4 <3.0.0", + "resolved": "https://registry.npmjs.org/is-my-json-valid/-/is-my-json-valid-2.15.0.tgz", + "dependencies": { + "generate-function": { + "version": "2.0.0", + "from": "generate-function@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/generate-function/-/generate-function-2.0.0.tgz" + }, + "generate-object-property": { + "version": "1.2.0", + "from": "generate-object-property@>=1.1.0 <2.0.0", + "resolved": "https://registry.npmjs.org/generate-object-property/-/generate-object-property-1.2.0.tgz", + "dependencies": { + "is-property": { + "version": "1.0.2", + "from": "is-property@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/is-property/-/is-property-1.0.2.tgz" + } + } + }, + "jsonpointer": { + "version": "4.0.0", + "from": "jsonpointer@>=4.0.0 <5.0.0", + "resolved": "https://registry.npmjs.org/jsonpointer/-/jsonpointer-4.0.0.tgz" + }, + "xtend": { + "version": "4.0.1", + "from": "xtend@>=4.0.0 <5.0.0", + "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.1.tgz" + } + } + }, + "pinkie-promise": { + "version": "2.0.1", + "from": "pinkie-promise@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/pinkie-promise/-/pinkie-promise-2.0.1.tgz", + "dependencies": { + "pinkie": { + "version": "2.0.4", + "from": "pinkie@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/pinkie/-/pinkie-2.0.4.tgz" + } + } + } + } + }, + "hawk": { + "version": "3.1.3", + "from": "hawk@>=3.1.3 <3.2.0", + "resolved": "https://registry.npmjs.org/hawk/-/hawk-3.1.3.tgz", + "dependencies": { + "hoek": { + "version": "2.16.3", + "from": "hoek@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/hoek/-/hoek-2.16.3.tgz" + }, + "boom": { + "version": "2.10.1", + "from": "boom@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/boom/-/boom-2.10.1.tgz" + }, + "cryptiles": { + "version": "2.0.5", + "from": "cryptiles@>=2.0.0 <3.0.0", + "resolved": "https://registry.npmjs.org/cryptiles/-/cryptiles-2.0.5.tgz" + }, + "sntp": { + "version": "1.0.9", + "from": "sntp@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/sntp/-/sntp-1.0.9.tgz" + } + } + }, + "http-signature": { + "version": "1.1.1", + "from": "http-signature@>=1.1.0 <1.2.0", + "resolved": "https://registry.npmjs.org/http-signature/-/http-signature-1.1.1.tgz", + "dependencies": { + "assert-plus": { + "version": "0.2.0", + "from": "assert-plus@>=0.2.0 <0.3.0", + "resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-0.2.0.tgz" + }, + "jsprim": { + "version": "1.3.1", + "from": "jsprim@>=1.2.2 <2.0.0", + "resolved": "https://registry.npmjs.org/jsprim/-/jsprim-1.3.1.tgz", + "dependencies": { + "extsprintf": { + "version": "1.0.2", + "from": "extsprintf@1.0.2", + "resolved": "https://registry.npmjs.org/extsprintf/-/extsprintf-1.0.2.tgz" + }, + "json-schema": { + "version": "0.2.3", + "from": "json-schema@0.2.3", + "resolved": "https://registry.npmjs.org/json-schema/-/json-schema-0.2.3.tgz" + }, + "verror": { + "version": "1.3.6", + "from": "verror@1.3.6", + "resolved": "https://registry.npmjs.org/verror/-/verror-1.3.6.tgz" + } + } + }, + "sshpk": { + "version": "1.10.1", + "from": "sshpk@>=1.7.0 <2.0.0", + "resolved": "https://registry.npmjs.org/sshpk/-/sshpk-1.10.1.tgz", + "dependencies": { + "asn1": { + "version": "0.2.3", + "from": "asn1@>=0.2.3 <0.3.0", + "resolved": "https://registry.npmjs.org/asn1/-/asn1-0.2.3.tgz" + }, + "assert-plus": { + "version": "1.0.0", + "from": "assert-plus@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-1.0.0.tgz" + }, + "dashdash": { + "version": "1.14.0", + "from": "dashdash@>=1.12.0 <2.0.0", + "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.0.tgz" + }, + "getpass": { + "version": "0.1.6", + "from": "getpass@>=0.1.1 <0.2.0", + "resolved": "https://registry.npmjs.org/getpass/-/getpass-0.1.6.tgz" + }, + "jsbn": { + "version": "0.1.0", + "from": "jsbn@>=0.1.0 <0.2.0", + "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-0.1.0.tgz" + }, + "tweetnacl": { + "version": "0.14.3", + "from": "tweetnacl@>=0.14.0 <0.15.0", + "resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-0.14.3.tgz" + }, + "jodid25519": { + "version": "1.0.2", + "from": "jodid25519@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/jodid25519/-/jodid25519-1.0.2.tgz" + }, + "ecc-jsbn": { + "version": "0.1.1", + "from": "ecc-jsbn@>=0.1.1 <0.2.0", + "resolved": "https://registry.npmjs.org/ecc-jsbn/-/ecc-jsbn-0.1.1.tgz" + }, + "bcrypt-pbkdf": { + "version": "1.0.0", + "from": "bcrypt-pbkdf@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/bcrypt-pbkdf/-/bcrypt-pbkdf-1.0.0.tgz" + } + } + } + } + }, + "is-typedarray": { + "version": "1.0.0", + "from": "is-typedarray@>=1.0.0 <1.1.0", + "resolved": "https://registry.npmjs.org/is-typedarray/-/is-typedarray-1.0.0.tgz" + }, + "isstream": { + "version": "0.1.2", + "from": "isstream@>=0.1.2 <0.2.0", + "resolved": "https://registry.npmjs.org/isstream/-/isstream-0.1.2.tgz" + }, + "json-stringify-safe": { + "version": "5.0.1", + "from": "json-stringify-safe@>=5.0.1 <5.1.0", + "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz" + }, + "mime-types": { + "version": "2.1.12", + "from": "mime-types@>=2.1.7 <2.2.0", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.12.tgz", + "dependencies": { + "mime-db": { + "version": "1.24.0", + "from": "mime-db@>=1.24.0 <1.25.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.24.0.tgz" + } + } + }, + "oauth-sign": { + "version": "0.8.2", + "from": "oauth-sign@>=0.8.1 <0.9.0", + "resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.8.2.tgz" + }, + "stringstream": { + "version": "0.0.5", + "from": "stringstream@>=0.0.4 <0.1.0", + "resolved": "https://registry.npmjs.org/stringstream/-/stringstream-0.0.5.tgz" + }, + "tough-cookie": { + "version": "2.3.1", + "from": "tough-cookie@>=2.3.0 <2.4.0", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-2.3.1.tgz" + }, + "tunnel-agent": { + "version": "0.4.3", + "from": "tunnel-agent@>=0.4.1 <0.5.0", + "resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.4.3.tgz" + } + } + }, "step": { "version": "0.0.6", "from": "step@>=0.0.5 <0.1.0", @@ -794,7 +1165,7 @@ "dependencies": { "strip-ansi": { "version": "3.0.1", - "from": "strip-ansi@>=3.0.1 <4.0.0", + "from": "strip-ansi@>=3.0.0 <4.0.0", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", "dependencies": { "ansi-regex": { @@ -1071,7 +1442,7 @@ }, "strip-ansi": { "version": "3.0.1", - "from": "strip-ansi@>=3.0.1 <4.0.0", + "from": "strip-ansi@>=3.0.0 <4.0.0", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", "dependencies": { "ansi-regex": { diff --git a/package.json b/package.json index c353b3e1..82722930 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "queue-async": "~1.0.7", "redis-mpool": "0.4.0", "redlock": "2.0.1", + "request": "~2.75.0", "step": "~0.0.5", "step-profiler": "~0.3.0", "topojson": "0.0.8", @@ -42,7 +43,6 @@ }, "devDependencies": { "istanbul": "~0.4.2", - "request": "~2.60.0", "shapefile": "0.3.0", "mocha": "~1.21.4", "jshint": "~2.6.0", From 80d2e190ad3b4aaf0067db6113bba6e5f2331212 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 18:47:55 +0200 Subject: [PATCH 28/35] Fix test, use fixed to replace one and infinity --- test/integration/batch/scheduler.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/integration/batch/scheduler.js b/test/integration/batch/scheduler.js index 7a2a51c0..4ec5a8f0 100644 --- a/test/integration/batch/scheduler.js +++ b/test/integration/batch/scheduler.js @@ -3,8 +3,7 @@ require('../../helper'); var assert = require('../../support/assert'); var Scheduler = require('../../../batch/scheduler/scheduler'); -var OneCapacity = require('../../../batch/scheduler/capacity/one'); -var InfinityCapacity = require('../../../batch/scheduler/capacity/infinity'); +var FixedCapacity = require('../../../batch/scheduler/capacity/fixed'); describe('scheduler', function() { @@ -20,7 +19,7 @@ describe('scheduler', function() { }; // simulate one by one or infinity capacity - var capacities = [new OneCapacity(), new InfinityCapacity()]; + var capacities = [new FixedCapacity(1), new FixedCapacity(Infinity)]; capacities.forEach(function(capacity) { it('should run tasks', function (done) { From 4a57d641c7c8fda0715647dbb7724124759277b2 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 10:20:51 +0200 Subject: [PATCH 29/35] Update news and bump version --- NEWS.md | 8 +++++++- npm-shrinkwrap.json | 2 +- package.json | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/NEWS.md b/NEWS.md index 3b419cab..2c2c14ec 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,12 @@ -1.39.2 - 2016-mm-dd +1.40.0 - 2016-mm-dd ------------------- +New features: + * Batch queries are handled per db host. + - There is an scheduler controlling how many queries and in what order they are run. + - Priority is based on: # of queries already run, last execution time, and oldest user in queue. + * Batch queries capacity: allow to configure how many jobs to run per db host. + 1.39.1 - 2016-10-17 ------------------- diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index a00ce331..d87d1ee1 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -1,6 +1,6 @@ { "name": "cartodb_sql_api", - "version": "1.39.2", + "version": "1.40.0", "dependencies": { "bintrees": { "version": "1.0.1", diff --git a/package.json b/package.json index 82722930..c4f14e6d 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "keywords": [ "cartodb" ], - "version": "1.39.2", + "version": "1.40.0", "repository": { "type": "git", "url": "git://github.com/CartoDB/CartoDB-SQL-API.git" From 66cc137d0488248dbcbbb3718366779574ada86c Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 11:12:08 +0200 Subject: [PATCH 30/35] Split http capacity between simple and load - Simple will use 'available_cores' from response. - Load will use 'cores' and 'relative_load'. --- batch/scheduler/capacity/http-load.js | 34 ++++++++++++++++ batch/scheduler/capacity/http-simple.js | 47 ++++++++++++---------- batch/scheduler/host-scheduler.js | 12 ++++-- config/environments/development.js.example | 6 ++- config/environments/production.js.example | 6 ++- config/environments/staging.js.example | 6 ++- config/environments/test.js.example | 6 ++- 7 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 batch/scheduler/capacity/http-load.js diff --git a/batch/scheduler/capacity/http-load.js b/batch/scheduler/capacity/http-load.js new file mode 100644 index 00000000..fcc65978 --- /dev/null +++ b/batch/scheduler/capacity/http-load.js @@ -0,0 +1,34 @@ +'use strict'; + +var util = require('util'); +var debug = require('../../util/debug')('capacity-http-load'); +var HttpSimpleCapacity = require('./http-simple'); + +function HttpLoadCapacity(host, capacityEndpoint) { + HttpSimpleCapacity.call(this); + this.host = host; + this.capacityEndpoint = capacityEndpoint; +} +util.inherits(HttpLoadCapacity, HttpSimpleCapacity); + +module.exports = HttpLoadCapacity; + +HttpLoadCapacity.prototype.getCapacity = function(callback) { + this.getResponse(function(err, values) { + var capacity = 1; + + if (err) { + return callback(null, capacity); + } + + var cores = parseInt(values.cores, 10); + var relativeLoad = parseFloat(values.relative_load); + + capacity = Math.max(1, Math.floor(((1 - relativeLoad) * cores) - 1)); + + capacity = Number.isFinite(capacity) ? capacity : 1; + + debug('host=%s, capacity=%s', this.host, capacity); + return callback(null, capacity); + }.bind(this)); +}; diff --git a/batch/scheduler/capacity/http-simple.js b/batch/scheduler/capacity/http-simple.js index da817392..26be67dd 100644 --- a/batch/scheduler/capacity/http-simple.js +++ b/batch/scheduler/capacity/http-simple.js @@ -1,7 +1,7 @@ 'use strict'; var request = require('request'); -var debug = require('../../util/debug')('capacity-http'); +var debug = require('../../util/debug')('capacity-http-simple'); function HttpSimpleCapacity(host, capacityEndpoint) { this.host = host; @@ -11,6 +11,24 @@ function HttpSimpleCapacity(host, capacityEndpoint) { module.exports = HttpSimpleCapacity; HttpSimpleCapacity.prototype.getCapacity = function(callback) { + this.getResponse(function(err, values) { + var capacity = 1; + + if (err) { + return callback(null, capacity); + } + + var availableCores = parseInt(values.available_cores, 10); + + capacity = Math.max(availableCores, 1); + capacity = Number.isFinite(capacity) ? capacity : 1; + + debug('host=%s, capacity=%s', this.host, capacity); + return callback(null, capacity); + }.bind(this)); +}; + +HttpSimpleCapacity.prototype.getResponse = function(callback) { var requestParams = { method: 'POST', url: this.capacityEndpoint, @@ -18,25 +36,12 @@ HttpSimpleCapacity.prototype.getCapacity = function(callback) { }; debug('getCapacity(%s)', this.host); request.post(requestParams, function(err, res, jsonRes) { - var capacity = 1; - - if (!err && jsonRes) { - if (jsonRes.retcode === 0) { - var values = jsonRes.return_values; - - var cores = parseInt(values.cores, 10); - var relativeLoad = parseFloat(values.relative_load); - - capacity = Math.max( - Math.floor(((1 - relativeLoad) * cores) - 1), - 1 - ); - - debug('host=%s, capacity=%s', this.host, capacity); - } + if (err) { + return callback(err); } - - debug('host=%s, capacity=%s', this.host, capacity); - return callback(null, capacity); - }.bind(this)); + if (jsonRes && jsonRes.retcode === 0) { + return callback(null, jsonRes.return_values || {}); + } + return callback(new Error('Could not retrieve information from endpoint')); + }); }; diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index 259ec9b2..f8f1f7f7 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -6,6 +6,7 @@ var Scheduler = require('./scheduler'); var Locker = require('../leader/locker'); var FixedCapacity = require('./capacity/fixed'); var HttpSimpleCapacity = require('./capacity/http-simple'); +var HttpLoadCapacity = require('./capacity/http-load'); function HostScheduler(name, taskRunner, redisPool) { this.name = name || 'scheduler'; @@ -35,12 +36,17 @@ HostScheduler.prototype.add = function(host, user, callback) { }; HostScheduler.prototype.getCapacityProvider = function(host) { - var strategy = global.settings.batch_capacity_strategy || 'fixed'; - if (strategy === 'http') { + var strategy = global.settings.batch_capacity_strategy; + + if (strategy === 'http-simple' || strategy === 'http-load') { if (global.settings.batch_capacity_http_url_template) { var endpoint = _.template(global.settings.batch_capacity_http_url_template, { dbhost: host }); debug('Using strategy=%s capacity. Endpoint=%s', strategy, endpoint); - return new HttpSimpleCapacity(host, endpoint); + + if (strategy === 'http-simple') { + return new HttpSimpleCapacity(host, endpoint); + } + return new HttpLoadCapacity(host, endpoint); } } diff --git a/config/environments/development.js.example b/config/environments/development.js.example index f7fdb858..21043d35 100644 --- a/config/environments/development.js.example +++ b/config/environments/development.js.example @@ -36,15 +36,17 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. // It allows to tune how many queries run at a db host at the same time. -// Options: 'fixed', 'http' +// Options: 'fixed', 'http-simple', 'http-load' module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. module.exports.batch_capacity_fixed_amount = 2; -// Applies when strategy='http'. +// Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. // Helps to decide the number of simultaneous users running queries in that host. +// 'http-simple' will use 'available_cores' to decide the number. +// 'http-load' will use 'cores' and 'relative_load' to decide the number. // It will use 1 as min. // If no template is provided it will default to 'fixed' strategy. module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; diff --git a/config/environments/production.js.example b/config/environments/production.js.example index b4017f7d..6ad7e3ab 100644 --- a/config/environments/production.js.example +++ b/config/environments/production.js.example @@ -37,15 +37,17 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. // It allows to tune how many queries run at a db host at the same time. -// Options: 'fixed', 'http' +// Options: 'fixed', 'http-simple', 'http-load' module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. module.exports.batch_capacity_fixed_amount = 2; -// Applies when strategy='http'. +// Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. // Helps to decide the number of simultaneous users running queries in that host. +// 'http-simple' will use 'available_cores' to decide the number. +// 'http-load' will use 'cores' and 'relative_load' to decide the number. // It will use 1 as min. // If no template is provided it will default to 'fixed' strategy. module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; diff --git a/config/environments/staging.js.example b/config/environments/staging.js.example index 74c01aae..926480a1 100644 --- a/config/environments/staging.js.example +++ b/config/environments/staging.js.example @@ -37,15 +37,17 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. // It allows to tune how many queries run at a db host at the same time. -// Options: 'fixed', 'http' +// Options: 'fixed', 'http-simple', 'http-load' module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. module.exports.batch_capacity_fixed_amount = 2; -// Applies when strategy='http'. +// Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. // Helps to decide the number of simultaneous users running queries in that host. +// 'http-simple' will use 'available_cores' to decide the number. +// 'http-load' will use 'cores' and 'relative_load' to decide the number. // It will use 1 as min. // If no template is provided it will default to 'fixed' strategy. module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; diff --git a/config/environments/test.js.example b/config/environments/test.js.example index 71111f0d..19320de9 100644 --- a/config/environments/test.js.example +++ b/config/environments/test.js.example @@ -34,15 +34,17 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. // It allows to tune how many queries run at a db host at the same time. -// Options: 'fixed', 'http' +// Options: 'fixed', 'http-simple', 'http-load' module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. module.exports.batch_capacity_fixed_amount = 2; -// Applies when strategy='http'. +// Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. // Helps to decide the number of simultaneous users running queries in that host. +// 'http-simple' will use 'available_cores' to decide the number. +// 'http-load' will use 'cores' and 'relative_load' to decide the number. // It will use 1 as min. // If no template is provided it will default to 'fixed' strategy. module.exports.batch_capacity_http_url_template = 'http://<%= dbhost %>:9999/load'; From 19def2f31e68509316b6343da79cee1f850b641e Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 11:12:27 +0200 Subject: [PATCH 31/35] Default to 2 jobs in fixed capacity. --- batch/scheduler/host-scheduler.js | 2 +- config/environments/development.js.example | 1 + config/environments/production.js.example | 1 + config/environments/staging.js.example | 1 + config/environments/test.js.example | 1 + 5 files changed, 5 insertions(+), 1 deletion(-) diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index f8f1f7f7..5cd238bd 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -50,7 +50,7 @@ HostScheduler.prototype.getCapacityProvider = function(host) { } } - var fixedCapacity = global.settings.batch_capacity_fixed_amount || 1; + var fixedCapacity = global.settings.batch_capacity_fixed_amount || 2; debug('Using strategy=fixed capacity=%d', fixedCapacity); return new FixedCapacity(fixedCapacity); }; diff --git a/config/environments/development.js.example b/config/environments/development.js.example index 21043d35..32bc65e8 100644 --- a/config/environments/development.js.example +++ b/config/environments/development.js.example @@ -41,6 +41,7 @@ module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. +// Default 2. module.exports.batch_capacity_fixed_amount = 2; // Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. diff --git a/config/environments/production.js.example b/config/environments/production.js.example index 6ad7e3ab..527d6d63 100644 --- a/config/environments/production.js.example +++ b/config/environments/production.js.example @@ -42,6 +42,7 @@ module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. +// Default 2. module.exports.batch_capacity_fixed_amount = 2; // Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. diff --git a/config/environments/staging.js.example b/config/environments/staging.js.example index 926480a1..82a52fc8 100644 --- a/config/environments/staging.js.example +++ b/config/environments/staging.js.example @@ -42,6 +42,7 @@ module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. +// Default 2. module.exports.batch_capacity_fixed_amount = 2; // Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. diff --git a/config/environments/test.js.example b/config/environments/test.js.example index 19320de9..17aa5e85 100644 --- a/config/environments/test.js.example +++ b/config/environments/test.js.example @@ -39,6 +39,7 @@ module.exports.batch_capacity_strategy = 'fixed'; // Applies when strategy='fixed'. // Number of simultaneous users running queries in the same host. // It will use 1 as min. +// Default 2. module.exports.batch_capacity_fixed_amount = 2; // Applies when strategy='http-simple' or strategy='http-load'. // HTTP endpoint to check db host load. From 75f1ddb049623dfe67dc0b0296507aa80ab12b8c Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 12:06:17 +0200 Subject: [PATCH 32/35] Timeout for http capacity requests --- batch/scheduler/capacity/http-simple.js | 1 + 1 file changed, 1 insertion(+) diff --git a/batch/scheduler/capacity/http-simple.js b/batch/scheduler/capacity/http-simple.js index 26be67dd..945c8868 100644 --- a/batch/scheduler/capacity/http-simple.js +++ b/batch/scheduler/capacity/http-simple.js @@ -32,6 +32,7 @@ HttpSimpleCapacity.prototype.getResponse = function(callback) { var requestParams = { method: 'POST', url: this.capacityEndpoint, + timeout: 2000, json: true }; debug('getCapacity(%s)', this.host); From d3f3d5ca36f220a406c237afe3a74698f3ef929a Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 12:06:32 +0200 Subject: [PATCH 33/35] Call parent with params --- batch/scheduler/capacity/http-load.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/batch/scheduler/capacity/http-load.js b/batch/scheduler/capacity/http-load.js index fcc65978..4fd6c1ae 100644 --- a/batch/scheduler/capacity/http-load.js +++ b/batch/scheduler/capacity/http-load.js @@ -5,9 +5,7 @@ var debug = require('../../util/debug')('capacity-http-load'); var HttpSimpleCapacity = require('./http-simple'); function HttpLoadCapacity(host, capacityEndpoint) { - HttpSimpleCapacity.call(this); - this.host = host; - this.capacityEndpoint = capacityEndpoint; + HttpSimpleCapacity.call(this, host, capacityEndpoint); } util.inherits(HttpLoadCapacity, HttpSimpleCapacity); From 5185c1e225043ecf876787737204a2a726c828f1 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 12:06:51 +0200 Subject: [PATCH 34/35] Cache valid responses for 500 ms --- batch/scheduler/capacity/http-simple.js | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/batch/scheduler/capacity/http-simple.js b/batch/scheduler/capacity/http-simple.js index 945c8868..fe73a7a3 100644 --- a/batch/scheduler/capacity/http-simple.js +++ b/batch/scheduler/capacity/http-simple.js @@ -6,6 +6,9 @@ var debug = require('../../util/debug')('capacity-http-simple'); function HttpSimpleCapacity(host, capacityEndpoint) { this.host = host; this.capacityEndpoint = capacityEndpoint; + + this.lastResponse = null; + this.lastResponseTime = 0; } module.exports = HttpSimpleCapacity; @@ -36,13 +39,24 @@ HttpSimpleCapacity.prototype.getResponse = function(callback) { json: true }; debug('getCapacity(%s)', this.host); + + // throttle requests for 500 ms + var now = Date.now(); + if (this.lastResponse !== null && ((now - this.lastResponseTime) < 500)) { + return callback(null, this.lastResponse); + } + request.post(requestParams, function(err, res, jsonRes) { if (err) { return callback(err); } if (jsonRes && jsonRes.retcode === 0) { - return callback(null, jsonRes.return_values || {}); + this.lastResponse = jsonRes.return_values || {}; + // We could go more aggressive by updating lastResponseTime on failures. + this.lastResponseTime = now; + + return callback(null, this.lastResponse); } return callback(new Error('Could not retrieve information from endpoint')); - }); + }.bind(this)); }; From 4e3bff9a702c1bb1a7cd3fa2b4fd908da75d9fb5 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Thu, 20 Oct 2016 12:21:41 +0200 Subject: [PATCH 35/35] Simplify scheduler to only consider task creation and number of queries --- NEWS.md | 2 +- batch/scheduler/scheduler.js | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/NEWS.md b/NEWS.md index 2c2c14ec..665308b6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -4,7 +4,7 @@ New features: * Batch queries are handled per db host. - There is an scheduler controlling how many queries and in what order they are run. - - Priority is based on: # of queries already run, last execution time, and oldest user in queue. + - Priority is based on: number of queries already ran, and oldest user in queue. * Batch queries capacity: allow to configure how many jobs to run per db host. diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index a05ac1ca..7ee10560 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -31,11 +31,6 @@ function Scheduler(capacity, taskRunner) { return taskEntityA.jobs - taskEntityB.jobs; } - // priority for entity with oldest executed job - if (taskEntityA.runAt !== taskEntityB.runAt) { - return taskEntityA.runAt - taskEntityB.runAt; - } - // priority for oldest job if (taskEntityA.createdAt !== taskEntityB.createdAt) { return taskEntityA.createdAt - taskEntityB.createdAt;