'use strict'; var Channel = require('./channel'); var debug = require('./../util/debug')('pubsub:subscriber'); var error = require('./../util/debug')('pubsub:subscriber:error'); var SUBSCRIBE_INTERVAL_IN_MILLISECONDS = 10 * 60 * 1000; // 10 minutes function _subscribe(client, channel, queueSeeker, onMessage, callback) { client.removeAllListeners('message'); client.unsubscribe(channel); client.subscribe(channel); client.on('message', function (channel, host) { debug('message received from: ' + channel + ':' + host); onMessage(channel, host); }); queueSeeker.seek(onMessage, function (err) { if (err) { error(err); if (callback) { callback(err); } } else { debug('queues found successfully'); if (callback) { callback(); } } }); } function JobSubscriber(pool, queueSeeker) { this.pool = pool; this.queueSeeker = queueSeeker; } module.exports = JobSubscriber; JobSubscriber.prototype.subscribe = function (onMessage, callback) { var self = this; this.pool.acquire(Channel.DB, function (err, client) { if (err) { return error('Error adquiring redis client: ' + err.message); } self.client = client; self.seekerInterval = setInterval( _subscribe, SUBSCRIBE_INTERVAL_IN_MILLISECONDS, self.client, Channel.NAME, self.queueSeeker, onMessage ); _subscribe(self.client, Channel.NAME, self.queueSeeker, onMessage, callback); }); }; JobSubscriber.prototype.unsubscribe = function () { clearInterval(this.seekerInterval); if (this.client && this.client.connected) { this.client.unsubscribe(Channel.NAME); } };