diff --git a/batch/batch.js b/batch/batch.js index 703e0ae3..6178e915 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -18,7 +18,7 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe this.jobService = jobService; this.jobPublisher = jobPublisher; this.logger = logger; - this.hostScheduler = new HostScheduler({ run: this.processJob.bind(this) }, redisPool); + this.hostScheduler = new HostScheduler(name, { run: this.processJob.bind(this) }, redisPool); this.hostUserQueueMover = new HostUserQueueMover(jobQueue, jobService, this.locker, redisPool); // map: user => jobId. Will be used for draining jobs. @@ -39,7 +39,7 @@ Batch.prototype.subscribe = function () { this.jobSubscriber.subscribe( function onJobHandler(user, host) { - debug('onJobHandler(%s, %s)', user, host); + debug('[%s] onJobHandler(%s, %s)', self.name, user, host); self.hostScheduler.add(host, user, function(err) { if (err) { return debug( @@ -83,7 +83,10 @@ Batch.prototype.processJob = function (user, callback) { return callback(err); } - debug('Job=%s status=%s user=%s (failed_reason=%s)', jobId, job.data.status, user, job.failed_reason); + debug( + '[%s] Job=%s status=%s user=%s (failed_reason=%s)', + self.name, jobId, job.data.status, user, job.failed_reason + ); self.logger.log(job); diff --git a/batch/scheduler/host-scheduler.js b/batch/scheduler/host-scheduler.js index 62dc4ead..a28be120 100644 --- a/batch/scheduler/host-scheduler.js +++ b/batch/scheduler/host-scheduler.js @@ -6,11 +6,12 @@ var Locker = require('../leader/locker'); var InfinityCapacity = require('./capacity/infinity'); //var OneCapacity = require('./capacity/one'); -function HostScheduler(taskRunner, redisPool) { +function HostScheduler(name, taskRunner, redisPool) { + this.name = name || 'scheduler'; this.taskRunner = taskRunner; this.locker = Locker.create('redis-distlock', { pool: redisPool }); this.locker.on('error', function(err, host) { - debug('Locker.error %s', err.message); + debug('[%s] Locker.error %s', this.name, err.message); this.unlock(host); }.bind(this)); // host => Scheduler @@ -22,21 +23,22 @@ module.exports = HostScheduler; HostScheduler.prototype.add = function(host, user, callback) { this.lock(host, function(err, scheduler) { if (err) { + debug('[%s] Could not lock host=%s', this.name, host); return callback(err); } scheduler.add(user); var wasRunning = scheduler.schedule(); - debug('Scheduler host=%s was running = %s', host, wasRunning); + debug('[%s] Scheduler host=%s was running=%s', this.name, host, wasRunning); return callback(err, wasRunning); - }); + }.bind(this)); }; HostScheduler.prototype.lock = function(host, callback) { - debug('lock(%s)', host); + debug('[%s] lock(%s)', this.name, host); var self = this; this.locker.lock(host, function(err) { if (err) { - debug('Could not lock host=%s. Reason: %s', host, err.message); + debug('[%s] Could not lock host=%s. Reason: %s', self.name, host, err.message); return callback(err); } @@ -46,13 +48,13 @@ HostScheduler.prototype.lock = function(host, callback) { self.schedulers[host] = scheduler; } - debug('Locked host=%s', host); + debug('[%s] Locked host=%s', self.name, host); return callback(null, self.schedulers[host]); }); }; HostScheduler.prototype.unlock = function(host) { - debug('unlock(%s)', host); + debug('[%s] unlock(%s)', this.name, host); if (this.schedulers.hasOwnProperty(host)) { // TODO stop scheduler? delete this.schedulers[host]; diff --git a/batch/scheduler/scheduler.js b/batch/scheduler/scheduler.js index f044c324..f47bfd4d 100644 --- a/batch/scheduler/scheduler.js +++ b/batch/scheduler/scheduler.js @@ -15,6 +15,7 @@ var forever = require('../util/forever'); function Scheduler(capacity, taskRunner) { EventEmitter.call(this); + debug('new Scheduler'); this.taskRunner = taskRunner; this.capacity = capacity; this.tasks = [];