diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index f47bfd4d..a05ac1ca 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -64,6 +64,8 @@ Scheduler.prototype.add = function(user) { this.users[user] = taskEntity; this.tasksTree.insert(taskEntity); + this.emit('add'); + return false; } }; @@ -122,19 +124,33 @@ Scheduler.prototype.acquire = function(callback) { return callback(err); } + function addListener() { + self.removeListener('release', releaseListener); + debug('Got a new task'); + self.acquire(callback); + } + + function releaseListener() { + self.removeListener('add', addListener); + debug('Slot was released'); + self.acquire(callback); + } + debug('Trying to acquire task'); var allRunning = self.tasks.every(is(STATUS.RUNNING)); var running = self.tasks.filter(is(STATUS.RUNNING)); debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks); + if (allRunning && running.length < capacity) { + debug('Waiting for tasks'); + self.once('add', addListener); + } + var isRunningAny = self.tasks.some(is(STATUS.RUNNING)); if (isRunningAny || running.length >= capacity) { debug('Waiting for slot'); - return self.once('release', function releaseListener() { - debug('Slot was released'); - self.acquire(callback); - }); + return self.once('release', releaseListener); } var candidate = self.tasksTree.min();