diff --git a/batch/leader/locker.js b/batch/leader/locker.js index 559880c9..87ecdca6 100644 --- a/batch/leader/locker.js +++ b/batch/leader/locker.js @@ -7,18 +7,51 @@ var debug = require('../util/debug')('leader-locker'); function Locker(locker) { this.locker = locker; + this.intervalIds = {}; } module.exports = Locker; Locker.prototype.lock = function(host, ttl, callback) { + var self = this; debug('Locker.lock(%s, %d)', host, ttl); - this.locker.lock(host, ttl, callback); + this.locker.lock(host, ttl, function (err, lock) { + self.startRenewal(host); + return callback(err, lock); + }); }; Locker.prototype.unlock = function(host, callback) { + var self = this; debug('Locker.unlock(%s)', host); - this.locker.unlock(host, callback); + this.locker.unlock(host, function(err) { + self.stopRenewal(host); + return callback(err); + }); +}; + +Locker.prototype.startRenewal = function(host) { + var self = this; + if (!this.intervalIds.hasOwnProperty(host)) { + this.intervalIds[host] = setInterval(function() { + debug('Trying to extend lock host=%s', host); + self.locker.lock(host, 5000, function(err, _lock) { + if (err) { + return self.stopRenewal(host); + } + if (_lock) { + debug('Extended lock host=%s', host); + } + }); + }, 1000); + } +}; + +Locker.prototype.stopRenewal = function(host) { + if (this.intervalIds.hasOwnProperty(host)) { + clearInterval(this.intervalIds[host]); + delete this.intervalIds[host]; + } }; module.exports.create = function createLocker(type, config) {