Added callback to job subscriber to allow to batch service emit ready event
This commit is contained in:
parent
438d61fb70
commit
0586f45413
@ -25,7 +25,7 @@ Batch.prototype.start = function () {
|
|||||||
Batch.prototype._subscribe = function () {
|
Batch.prototype._subscribe = function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
this.jobSubscriber.subscribe(function (channel, host) {
|
this.jobSubscriber.subscribe(function onMessage(channel, host) {
|
||||||
var queue = self.jobQueuePool.getQueue(host);
|
var queue = self.jobQueuePool.getQueue(host);
|
||||||
|
|
||||||
// there is nothing to do. It is already running jobs
|
// there is nothing to do. It is already running jobs
|
||||||
@ -46,6 +46,12 @@ Batch.prototype._subscribe = function () {
|
|||||||
|
|
||||||
debug(err);
|
debug(err);
|
||||||
});
|
});
|
||||||
|
}, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return self.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.emit('ready');
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -6,12 +6,7 @@ var error = require('./util/debug')('pubsub:subscriber:error');
|
|||||||
var DB = 0;
|
var DB = 0;
|
||||||
var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes
|
var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes
|
||||||
|
|
||||||
function _subscribe(client, channel, queueSeeker, onMessage) {
|
function _subscribe(client, channel, queueSeeker, onMessage, callback) {
|
||||||
|
|
||||||
queueSeeker.seek(onMessage, function (err) {
|
|
||||||
if (err) {
|
|
||||||
error(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
client.removeAllListeners('message');
|
client.removeAllListeners('message');
|
||||||
client.unsubscribe(channel);
|
client.unsubscribe(channel);
|
||||||
@ -21,6 +16,21 @@ function _subscribe(client, channel, queueSeeker, onMessage) {
|
|||||||
debug('message received from: ' + channel + ':' + host);
|
debug('message received from: ' + channel + ':' + host);
|
||||||
onMessage(channel, host);
|
onMessage(channel, host);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
queueSeeker.seek(onMessage, function (err) {
|
||||||
|
if (err) {
|
||||||
|
error(err);
|
||||||
|
|
||||||
|
if (callback) {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug('queues found successfully');
|
||||||
|
|
||||||
|
if (callback) {
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,7 +42,7 @@ function JobSubscriber(pool, queueSeeker) {
|
|||||||
|
|
||||||
module.exports = JobSubscriber;
|
module.exports = JobSubscriber;
|
||||||
|
|
||||||
JobSubscriber.prototype.subscribe = function (onMessage) {
|
JobSubscriber.prototype.subscribe = function (onMessage, callback) {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
this.pool.acquire(DB, function (err, client) {
|
this.pool.acquire(DB, function (err, client) {
|
||||||
@ -42,8 +52,6 @@ JobSubscriber.prototype.subscribe = function (onMessage) {
|
|||||||
|
|
||||||
self.client = client;
|
self.client = client;
|
||||||
|
|
||||||
_subscribe(self.client, self.channel, self.queueSeeker, onMessage);
|
|
||||||
|
|
||||||
self.seekerInterval = setInterval(
|
self.seekerInterval = setInterval(
|
||||||
_subscribe,
|
_subscribe,
|
||||||
SUBSCRIBE_INTERVAL_IN_MILLISECONDS,
|
SUBSCRIBE_INTERVAL_IN_MILLISECONDS,
|
||||||
@ -52,6 +60,8 @@ JobSubscriber.prototype.subscribe = function (onMessage) {
|
|||||||
self.queueSeeker,
|
self.queueSeeker,
|
||||||
onMessage
|
onMessage
|
||||||
);
|
);
|
||||||
|
|
||||||
|
_subscribe(self.client, self.channel, self.queueSeeker, onMessage, callback);
|
||||||
});
|
});
|
||||||
|
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user