From 1ee08786319df145dff0cef23ce57d781efc0c56 Mon Sep 17 00:00:00 2001 From: Raul Ochoa Date: Wed, 19 Oct 2016 12:39:33 +0200 Subject: [PATCH] =?UTF-8?q?Scheduler=20uses=20a=20red=E2=80=93black=20tree?= =?UTF-8?q?=20to=20decide=20on=20next=20job=20candidate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- batch/scheduler/scheduler.js | 101 +++++++++++++++++++------- npm-shrinkwrap.json | 9 ++- package.json | 1 + test/integration/batch/scheduler.js | 106 ++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 28 deletions(-) create mode 100644 test/integration/batch/scheduler.js diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index f706e1c2..f044c324 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -1,7 +1,13 @@ 'use strict'; +// Inspiration from: +// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt +// - https://www.kernel.org/doc/Documentation/rbtree.txt +// - http://www.ibm.com/developerworks/linux/library/l-completely-fair-scheduler/ + var util = require('util'); var EventEmitter = require('events').EventEmitter; +var RBTree = require('bintrees').RBTree; var debug = require('../util/debug')('scheduler'); @@ -13,6 +19,30 @@ function Scheduler(capacity, taskRunner) { this.capacity = capacity; this.tasks = []; this.users = {}; + this.tasksTree = new RBTree(function(taskEntityA, taskEntityB) { + // if the user is the same it's the same entity + if (taskEntityA.user === taskEntityB.user) { + return 0; + } + + // priority for entity with less executed jobs + if (taskEntityA.jobs !== taskEntityB.jobs) { + return taskEntityA.jobs - taskEntityB.jobs; + } + + // priority for entity with oldest executed job + if (taskEntityA.runAt !== taskEntityB.runAt) { + return taskEntityA.runAt - taskEntityB.runAt; + } + + // priority for oldest job + if (taskEntityA.createdAt !== taskEntityB.createdAt) { + return taskEntityA.createdAt - taskEntityB.createdAt; + } + + // we don't care if we arrive here + return -1; + }); } util.inherits(Scheduler, EventEmitter); @@ -20,17 +50,18 @@ module.exports = Scheduler; Scheduler.prototype.add = function(user) { debug('add(%s)', user); - var task = this.users[user]; - if (task) { - if (task.status === STATUS.DONE) { - task.status = STATUS.PENDING; + var taskEntity = this.users[user]; + if (taskEntity) { + if (taskEntity.status === STATUS.DONE) { + taskEntity.status = STATUS.PENDING; } return true; } else { - task = new TaskEntity(user); - this.tasks.push(task); - this.users[user] = task; + taskEntity = new TaskEntity(user, this.tasks.length); + this.tasks.push(taskEntity); + this.users[user] = taskEntity; + this.tasksTree.insert(taskEntity); return false; } @@ -45,7 +76,7 @@ Scheduler.prototype.schedule = function() { var self = this; forever( function (next) { - debug('Trying to acquire user'); + debug('Waiting for task'); self.acquire(function(err, taskEntity) { debug('Acquired user=%j', taskEntity); @@ -53,23 +84,26 @@ Scheduler.prototype.schedule = function() { return next(new Error('all users finished')); } - taskEntity.status = STATUS.RUNNING; - // try to acquire next user - // will block until capacity slow is available - next(); + self.tasksTree.remove(taskEntity); + taskEntity.running(); debug('Running task for user=%s', taskEntity.user); self.taskRunner.run(taskEntity.user, function(err, userQueueIsEmpty) { - taskEntity.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING; - + debug('Run task=%j, done=%s', taskEntity, userQueueIsEmpty); + taskEntity.ran(userQueueIsEmpty); self.release(err, taskEntity); }); + + // try to acquire next user + // will block until capacity slot is available + next(); }); }, function (err) { debug('done: %s', err.message); self.running = false; self.emit('done'); + self.removeAllListeners(); } ); @@ -80,28 +114,29 @@ Scheduler.prototype.acquire = function(callback) { if (this.tasks.every(is(STATUS.DONE))) { return callback(null, null); } + var self = this; this.capacity.getCapacity(function(err, capacity) { if (err) { return callback(err); } - var running = self.tasks.filter(is(STATUS.RUNNING)); + debug('Trying to acquire task'); - debug('Trying to acquire users=%j, running=%d, capacity=%d', self.tasks, running.length, capacity); - var allUsersRunning = self.tasks.every(is(STATUS.RUNNING)); - if (running.length >= capacity || allUsersRunning) { - debug( - 'Waiting for slot. capacity=%s, running=%s, all_running=%s', - capacity, running.length, allUsersRunning - ); - return self.once('release', function() { + var allRunning = self.tasks.every(is(STATUS.RUNNING)); + var running = self.tasks.filter(is(STATUS.RUNNING)); + debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks); + + var isRunningAny = self.tasks.some(is(STATUS.RUNNING)); + if (isRunningAny || running.length >= capacity) { + debug('Waiting for slot'); + return self.once('release', function releaseListener() { debug('Slot was released'); self.acquire(callback); }); } - var candidate = self.tasks.filter(is(STATUS.PENDING))[0]; + var candidate = self.tasksTree.min(); return callback(null, candidate); }); @@ -109,7 +144,9 @@ Scheduler.prototype.acquire = function(callback) { Scheduler.prototype.release = function(err, taskEntity) { debug('Released %j', taskEntity); - // decide what to do based on status/jobs + if (taskEntity.is(STATUS.PENDING)) { + this.tasksTree.insert(taskEntity); + } this.emit('release'); }; @@ -122,16 +159,28 @@ var STATUS = { DONE: 'done' }; -function TaskEntity(user) { +function TaskEntity(user, createdAt) { this.user = user; + this.createdAt = createdAt; this.status = STATUS.PENDING; this.jobs = 0; + this.runAt = 0; } TaskEntity.prototype.is = function(status) { return this.status === status; }; +TaskEntity.prototype.running = function() { + this.status = STATUS.RUNNING; + this.runAt = Date.now(); +}; + +TaskEntity.prototype.ran = function(userQueueIsEmpty) { + this.jobs++; + this.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING; +}; + function is(status) { return function(taskEntity) { return taskEntity.is(status); diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 7fb0db50..395250ac 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -2,6 +2,11 @@ "name": "cartodb_sql_api", "version": "1.39.2", "dependencies": { + "bintrees": { + "version": "1.0.1", + "from": "bintrees@>=1.0.1 <2.0.0", + "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz" + }, "bunyan": { "version": "1.8.1", "from": "bunyan@1.8.1", @@ -52,9 +57,9 @@ "resolved": "https://registry.npmjs.org/glob/-/glob-6.0.4.tgz", "dependencies": { "inflight": { - "version": "1.0.5", + "version": "1.0.6", "from": "inflight@>=1.0.4 <2.0.0", - "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.5.tgz", + "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", "dependencies": { "wrappy": { "version": "1.0.2", diff --git a/package.json b/package.json index 68fec31e..c353b3e1 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "Sandro Santilli " ], "dependencies": { + "bintrees": "1.0.1", "bunyan": "1.8.1", "cartodb-psql": "~0.6.0", "cartodb-query-tables": "0.2.0", diff --git a/test/integration/batch/scheduler.js b/test/integration/batch/scheduler.js new file mode 100644 index 00000000..7a2a51c0 --- /dev/null +++ b/test/integration/batch/scheduler.js @@ -0,0 +1,106 @@ +'use strict'; + +require('../../helper'); +var assert = require('../../support/assert'); +var Scheduler = require('../../../batch/scheduler/scheduler'); +var OneCapacity = require('../../../batch/scheduler/capacity/one'); +var InfinityCapacity = require('../../../batch/scheduler/capacity/infinity'); + +describe('scheduler', function() { + + function TaskRunner(userTasks) { + this.results = []; + this.userTasks = userTasks; + } + + TaskRunner.prototype.run = function(user, callback) { + this.results.push(user); + this.userTasks[user]--; + return callback(null, this.userTasks[user] === 0); + }; + + // simulate one by one or infinity capacity + var capacities = [new OneCapacity(), new InfinityCapacity()]; + + capacities.forEach(function(capacity) { + it('should run tasks', function (done) { + var taskRunner = new TaskRunner({ + userA: 1 + }); + var scheduler = new Scheduler(capacity, taskRunner); + scheduler.add('userA'); + + scheduler.on('done', function() { + var results = taskRunner.results; + + assert.equal(results.length, 1); + + assert.equal(results[0], 'userA'); + + return done(); + }); + + scheduler.schedule(); + }); + + + it('should run tasks for different users', function (done) { + var taskRunner = new TaskRunner({ + userA: 1, + userB: 1, + userC: 1 + }); + var scheduler = new Scheduler(capacity, taskRunner); + scheduler.add('userA'); + scheduler.add('userB'); + scheduler.add('userC'); + + scheduler.on('done', function() { + var results = taskRunner.results; + + assert.equal(results.length, 3); + + assert.equal(results[0], 'userA'); + assert.equal(results[1], 'userB'); + assert.equal(results[2], 'userC'); + + return done(); + }); + + scheduler.schedule(); + }); + + it('should be fair when scheduling tasks', function (done) { + var taskRunner = new TaskRunner({ + userA: 3, + userB: 2, + userC: 1 + }); + + var scheduler = new Scheduler(capacity, taskRunner); + scheduler.add('userA'); + scheduler.add('userA'); + scheduler.add('userA'); + scheduler.add('userB'); + scheduler.add('userB'); + scheduler.add('userC'); + + scheduler.on('done', function() { + var results = taskRunner.results; + + assert.equal(results.length, 6); + + assert.equal(results[0], 'userA'); + assert.equal(results[1], 'userB'); + assert.equal(results[2], 'userC'); + assert.equal(results[3], 'userA'); + assert.equal(results[4], 'userB'); + assert.equal(results[5], 'userA'); + + return done(); + }); + + scheduler.schedule(); + }); + }); +});