Scheduler uses a red–black tree to decide on next job candidate

remotes/origin/fix-broken-test-in-travis
Raul Ochoa 8 years ago
parent 71d32e003b
commit 1ee0878631

@ -1,7 +1,13 @@
'use strict';
// Inspiration from:
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
// - https://www.kernel.org/doc/Documentation/rbtree.txt
// - http://www.ibm.com/developerworks/linux/library/l-completely-fair-scheduler/
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var RBTree = require('bintrees').RBTree;
var debug = require('../util/debug')('scheduler');
@ -13,6 +19,30 @@ function Scheduler(capacity, taskRunner) {
this.capacity = capacity;
this.tasks = [];
this.users = {};
this.tasksTree = new RBTree(function(taskEntityA, taskEntityB) {
// if the user is the same it's the same entity
if (taskEntityA.user === taskEntityB.user) {
return 0;
}
// priority for entity with less executed jobs
if (taskEntityA.jobs !== taskEntityB.jobs) {
return taskEntityA.jobs - taskEntityB.jobs;
}
// priority for entity with oldest executed job
if (taskEntityA.runAt !== taskEntityB.runAt) {
return taskEntityA.runAt - taskEntityB.runAt;
}
// priority for oldest job
if (taskEntityA.createdAt !== taskEntityB.createdAt) {
return taskEntityA.createdAt - taskEntityB.createdAt;
}
// we don't care if we arrive here
return -1;
});
}
util.inherits(Scheduler, EventEmitter);
@ -20,17 +50,18 @@ module.exports = Scheduler;
Scheduler.prototype.add = function(user) {
debug('add(%s)', user);
var task = this.users[user];
if (task) {
if (task.status === STATUS.DONE) {
task.status = STATUS.PENDING;
var taskEntity = this.users[user];
if (taskEntity) {
if (taskEntity.status === STATUS.DONE) {
taskEntity.status = STATUS.PENDING;
}
return true;
} else {
task = new TaskEntity(user);
this.tasks.push(task);
this.users[user] = task;
taskEntity = new TaskEntity(user, this.tasks.length);
this.tasks.push(taskEntity);
this.users[user] = taskEntity;
this.tasksTree.insert(taskEntity);
return false;
}
@ -45,7 +76,7 @@ Scheduler.prototype.schedule = function() {
var self = this;
forever(
function (next) {
debug('Trying to acquire user');
debug('Waiting for task');
self.acquire(function(err, taskEntity) {
debug('Acquired user=%j', taskEntity);
@ -53,23 +84,26 @@ Scheduler.prototype.schedule = function() {
return next(new Error('all users finished'));
}
taskEntity.status = STATUS.RUNNING;
// try to acquire next user
// will block until capacity slow is available
next();
self.tasksTree.remove(taskEntity);
taskEntity.running();
debug('Running task for user=%s', taskEntity.user);
self.taskRunner.run(taskEntity.user, function(err, userQueueIsEmpty) {
taskEntity.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING;
debug('Run task=%j, done=%s', taskEntity, userQueueIsEmpty);
taskEntity.ran(userQueueIsEmpty);
self.release(err, taskEntity);
});
// try to acquire next user
// will block until capacity slot is available
next();
});
},
function (err) {
debug('done: %s', err.message);
self.running = false;
self.emit('done');
self.removeAllListeners();
}
);
@ -80,28 +114,29 @@ Scheduler.prototype.acquire = function(callback) {
if (this.tasks.every(is(STATUS.DONE))) {
return callback(null, null);
}
var self = this;
this.capacity.getCapacity(function(err, capacity) {
if (err) {
return callback(err);
}
debug('Trying to acquire task');
var allRunning = self.tasks.every(is(STATUS.RUNNING));
var running = self.tasks.filter(is(STATUS.RUNNING));
debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks);
debug('Trying to acquire users=%j, running=%d, capacity=%d', self.tasks, running.length, capacity);
var allUsersRunning = self.tasks.every(is(STATUS.RUNNING));
if (running.length >= capacity || allUsersRunning) {
debug(
'Waiting for slot. capacity=%s, running=%s, all_running=%s',
capacity, running.length, allUsersRunning
);
return self.once('release', function() {
var isRunningAny = self.tasks.some(is(STATUS.RUNNING));
if (isRunningAny || running.length >= capacity) {
debug('Waiting for slot');
return self.once('release', function releaseListener() {
debug('Slot was released');
self.acquire(callback);
});
}
var candidate = self.tasks.filter(is(STATUS.PENDING))[0];
var candidate = self.tasksTree.min();
return callback(null, candidate);
});
@ -109,7 +144,9 @@ Scheduler.prototype.acquire = function(callback) {
Scheduler.prototype.release = function(err, taskEntity) {
debug('Released %j', taskEntity);
// decide what to do based on status/jobs
if (taskEntity.is(STATUS.PENDING)) {
this.tasksTree.insert(taskEntity);
}
this.emit('release');
};
@ -122,16 +159,28 @@ var STATUS = {
DONE: 'done'
};
function TaskEntity(user) {
function TaskEntity(user, createdAt) {
this.user = user;
this.createdAt = createdAt;
this.status = STATUS.PENDING;
this.jobs = 0;
this.runAt = 0;
}
TaskEntity.prototype.is = function(status) {
return this.status === status;
};
TaskEntity.prototype.running = function() {
this.status = STATUS.RUNNING;
this.runAt = Date.now();
};
TaskEntity.prototype.ran = function(userQueueIsEmpty) {
this.jobs++;
this.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING;
};
function is(status) {
return function(taskEntity) {
return taskEntity.is(status);

@ -2,6 +2,11 @@
"name": "cartodb_sql_api",
"version": "1.39.2",
"dependencies": {
"bintrees": {
"version": "1.0.1",
"from": "bintrees@>=1.0.1 <2.0.0",
"resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz"
},
"bunyan": {
"version": "1.8.1",
"from": "bunyan@1.8.1",
@ -52,9 +57,9 @@
"resolved": "https://registry.npmjs.org/glob/-/glob-6.0.4.tgz",
"dependencies": {
"inflight": {
"version": "1.0.5",
"version": "1.0.6",
"from": "inflight@>=1.0.4 <2.0.0",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.5.tgz",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz",
"dependencies": {
"wrappy": {
"version": "1.0.2",

@ -17,6 +17,7 @@
"Sandro Santilli <strk@vizzuality.com>"
],
"dependencies": {
"bintrees": "1.0.1",
"bunyan": "1.8.1",
"cartodb-psql": "~0.6.0",
"cartodb-query-tables": "0.2.0",

@ -0,0 +1,106 @@
'use strict';
require('../../helper');
var assert = require('../../support/assert');
var Scheduler = require('../../../batch/scheduler/scheduler');
var OneCapacity = require('../../../batch/scheduler/capacity/one');
var InfinityCapacity = require('../../../batch/scheduler/capacity/infinity');
describe('scheduler', function() {
function TaskRunner(userTasks) {
this.results = [];
this.userTasks = userTasks;
}
TaskRunner.prototype.run = function(user, callback) {
this.results.push(user);
this.userTasks[user]--;
return callback(null, this.userTasks[user] === 0);
};
// simulate one by one or infinity capacity
var capacities = [new OneCapacity(), new InfinityCapacity()];
capacities.forEach(function(capacity) {
it('should run tasks', function (done) {
var taskRunner = new TaskRunner({
userA: 1
});
var scheduler = new Scheduler(capacity, taskRunner);
scheduler.add('userA');
scheduler.on('done', function() {
var results = taskRunner.results;
assert.equal(results.length, 1);
assert.equal(results[0], 'userA');
return done();
});
scheduler.schedule();
});
it('should run tasks for different users', function (done) {
var taskRunner = new TaskRunner({
userA: 1,
userB: 1,
userC: 1
});
var scheduler = new Scheduler(capacity, taskRunner);
scheduler.add('userA');
scheduler.add('userB');
scheduler.add('userC');
scheduler.on('done', function() {
var results = taskRunner.results;
assert.equal(results.length, 3);
assert.equal(results[0], 'userA');
assert.equal(results[1], 'userB');
assert.equal(results[2], 'userC');
return done();
});
scheduler.schedule();
});
it('should be fair when scheduling tasks', function (done) {
var taskRunner = new TaskRunner({
userA: 3,
userB: 2,
userC: 1
});
var scheduler = new Scheduler(capacity, taskRunner);
scheduler.add('userA');
scheduler.add('userA');
scheduler.add('userA');
scheduler.add('userB');
scheduler.add('userB');
scheduler.add('userC');
scheduler.on('done', function() {
var results = taskRunner.results;
assert.equal(results.length, 6);
assert.equal(results[0], 'userA');
assert.equal(results[1], 'userB');
assert.equal(results[2], 'userC');
assert.equal(results[3], 'userA');
assert.equal(results[4], 'userB');
assert.equal(results[5], 'userA');
return done();
});
scheduler.schedule();
});
});
});
Loading…
Cancel
Save