// File: CartoDB-SQL-API/lib/batch/job-queue.js
// (Listing metadata from the web viewer this file was copied from: 168 lines, 4.4 KiB, JavaScript.)

'use strict';
var queueAsync = require('queue-async');
/**
 * Per-user job queue stored in Redis.
 *
 * @param {Object} metadataBackend - Redis access object; the methods of this
 *     class call its `redisCmd` and `redisMultiCmd` functions.
 * @param {Object} jobPublisher - notifier whose `publish(user)` is called
 *     after a job id has been queued for that user.
 * @param {Object} logger - logger; the methods of this class call `debug`.
 * @constructor
 */
function JobQueue(metadataBackend, jobPublisher, logger) {
    this.metadataBackend = metadataBackend;
    this.jobPublisher = jobPublisher;
    this.logger = logger;
}
// The module's main export is the JobQueue constructor.
module.exports = JobQueue;
/**
 * Redis locations used by the job queue.
 *
 * - DB: first argument handed to `redisCmd` / `redisMultiCmd`
 *       (presumably the Redis database number - confirm against the backend).
 * - PREFIX: key prefix of each user's job list; the user name is appended.
 * - INDEX: key of the set holding the users that have a queue.
 */
var QUEUE = {
    DB: 5,
    PREFIX: 'batch:queue:',
    INDEX: 'batch:indexes:queue'
};
// Expose the Redis key layout alongside the constructor.
module.exports.QUEUE = QUEUE;
/**
 * Appends a job id to the user's queue and registers the user in the queue
 * index, then notifies the publisher.
 *
 * The job id is LPUSHed; `dequeue` RPOPs, so jobs added here are served in
 * insertion order.
 *
 * @param {String} user - user owning the queue
 * @param {String} jobId - id of the job to queue
 * @param {Function} callback - called with the Redis error on failure, or
 *     with no arguments once the job is queued and published
 */
JobQueue.prototype.enqueue = function (user, jobId, callback) {
    this.logger.debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId);

    this.metadataBackend.redisMultiCmd(QUEUE.DB, [
        [ 'LPUSH', QUEUE.PREFIX + user, jobId ],
        [ 'SADD', QUEUE.INDEX, user ]
    ], function (err) {
        if (err) {
            return callback(err);
        }

        // Only announce the user once the job is actually stored.
        this.jobPublisher.publish(user);
        callback();
    }.bind(this));
};
/**
 * Asks Redis (LLEN) for the number of job ids in the user's queue.
 *
 * @param {String} user - user owning the queue
 * @param {Function} callback - handed straight to the backend's `redisCmd`
 */
JobQueue.prototype.size = function (user, callback) {
    var queueKey = QUEUE.PREFIX + user;

    this.metadataBackend.redisCmd(QUEUE.DB, 'LLEN', [ queueKey ], callback);
};
/**
 * Takes the next job id out of the user's queue.
 *
 * The pop and the index cleanup are sent as a single Lua script (EVAL): the
 * script RPOPs one job id and, when the list is empty afterwards, removes the
 * user from the queue index.
 *
 * @param {String} user - user owning the queue
 * @param {Function} callback - called as `callback(err, jobId)` with whatever
 *     the script returned
 */
JobQueue.prototype.dequeue = function (user, callback) {
    var self = this;

    var dequeueScript = [
        'local job_id = redis.call("RPOP", KEYS[1])',
        'if redis.call("LLEN", KEYS[1]) == 0 then',
        ' redis.call("SREM", KEYS[2], ARGV[1])',
        'end',
        'return job_id'
    ].join('\n');

    var redisParams = [
        dequeueScript, //lua source code
        2, // Two "keys" to pass
        QUEUE.PREFIX + user, //KEYS[1], the key of the queue
        QUEUE.INDEX, //KEYS[2], the key of the index
        user // ARGV[1] - value of the element to remove from the index
    ];

    this.metadataBackend.redisCmd(QUEUE.DB, 'EVAL', redisParams, function (err, jobId) {
        // Logged on both success and failure, before the result is passed on.
        self.logger.debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId);

        return callback(err, jobId);
    });
};
/**
 * Queues a job id at the end of the list that `dequeue` pops from (RPUSH),
 * registers the user in the queue index and notifies the publisher.
 *
 * @param {String} user - user owning the queue
 * @param {String} jobId - id of the job to queue
 * @param {Function} callback - called with the Redis error on failure, or
 *     with no arguments once the job is queued and published
 */
JobQueue.prototype.enqueueFirst = function (user, jobId, callback) {
    var self = this;

    self.logger.debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId);

    var commands = [
        [ 'RPUSH', QUEUE.PREFIX + user, jobId ],
        [ 'SADD', QUEUE.INDEX, user ]
    ];

    self.metadataBackend.redisMultiCmd(QUEUE.DB, commands, function onQueued(err) {
        if (err) {
            return callback(err);
        }

        self.jobPublisher.publish(user);
        callback();
    });
};
/**
 * Reads the members of the queue index set (SMEMBERS).
 *
 * @param {Function} callback - `callback(err)` on failure,
 *     `callback(null, queues)` with the set members otherwise
 */
JobQueue.prototype.getQueues = function (callback) {
    var indexParams = [ QUEUE.INDEX ];

    function onMembers(err, members) {
        if (err) {
            return callback(err);
        }

        callback(null, members);
    }

    this.metadataBackend.redisCmd(QUEUE.DB, 'SMEMBERS', indexParams, onMembers);
};
/**
 * Discovers users by scanning the queue keys (`scan`) and adds every user
 * found to the queue index (`addToQueueIndex`).
 *
 * @param {Function} callback - `callback(err)` if the scan or the indexing
 *     fails, `callback(null, queues)` with the discovered user names otherwise
 */
JobQueue.prototype.scanQueues = function (callback) {
    var self = this;

    self.scan(function (err, queues) {
        if (err) {
            return callback(err);
        }

        self.logger.debug('JobQueue.scanQueues found queues=%s', queues.join(', '));

        self.addToQueueIndex(queues, function (err) {
            if (err) {
                return callback(err);
            }

            callback(null, queues);
        });
    });
};
/**
 * Runs `_scan` from cursor '0' with an empty accumulator and reports the
 * names of the users it collected.
 *
 * @param {Function} callback - `callback(err)` on failure,
 *     `callback(null, users)` with an array of user names otherwise
 */
JobQueue.prototype.scan = function (callback) {
    var startCursor = ['0'];
    var collected = {};

    this._scan(startCursor, collected, function onScanned(err, foundUsers) {
        if (err) {
            return callback(err);
        }

        var names = Object.keys(foundUsers);
        callback(null, names);
    });
};
/**
 * Walks the Redis keyspace with SCAN, one page per call, collecting the user
 * part of every key that matches the queue prefix. Recurses with the cursor
 * returned by Redis until that cursor is '0' again.
 *
 * A SCAN failure does not fail the whole walk: the users collected so far are
 * returned, and the error is written to the debug log so it is not lost.
 *
 * @param {Array} cursor - SCAN reply-shaped array; only `cursor[0]` (the
 *     cursor string) is read
 * @param {Object} users - accumulator, mutated in place: `users[name] = true`
 * @param {Function} callback - always called as `callback(null, users)`
 */
JobQueue.prototype._scan = function (cursor, users, callback) {
    var self = this;
    var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];

    self.metadataBackend.redisCmd(QUEUE.DB, 'SCAN', redisParams, function (err, currentCursor) {
        if (err) {
            // Best effort: keep the partial result, but leave a trace of the failure.
            self.logger.debug(
                'JobQueue._scan failed at cursor=%s, returning partial result: %s',
                cursor[0],
                err.message || err
            );
            return callback(null, users);
        }

        // SCAN replies with [nextCursor, keys].
        var queues = currentCursor[1];
        if (queues) {
            queues.forEach(function (queue) {
                // Strip the key prefix to get the user name.
                var user = queue.slice(QUEUE.PREFIX.length);
                users[user] = true;
            });
        }

        var hasMore = currentCursor[0] !== '0';
        if (!hasMore) {
            return callback(null, users);
        }

        self._scan(currentCursor, users, callback);
    });
};
/**
 * Adds every given user to the queue index set, issuing one SADD per user
 * through a queue-async queue sized to the number of users.
 *
 * @param {Array} users - user names to add to the index
 * @param {Function} callback - `callback(err)` if any SADD fails,
 *     `callback(null)` once all of them have completed
 */
JobQueue.prototype.addToQueueIndex = function (users, callback) {
    var self = this;

    function addUserToIndex(user, done) {
        self.metadataBackend.redisCmd(QUEUE.DB, 'SADD', [ QUEUE.INDEX, user ], done);
    }

    var pending = queueAsync(users.length);

    users.forEach(function (user) {
        pending.defer(addUserToIndex, user);
    });

    pending.awaitAll(function onAllAdded(err) {
        if (err) {
            return callback(err);
        }

        callback(null);
    });
};