2015-12-22 02:57:10 +08:00
|
|
|
'use strict';
|
|
|
|
|
2015-12-25 00:42:49 +08:00
|
|
|
var queue = require('queue-async');
|
2016-05-18 01:27:05 +08:00
|
|
|
// Prefix of the Redis key under which each job hash is stored; the job id is appended to it.
var REDIS_PREFIX = 'batch:jobs:';
|
2016-01-21 22:33:42 +08:00
|
|
|
// Expiration, in seconds, that setTTL applies (via EXPIRE) to jobs in a final status.
// Read from global.settings.jobs_ttl_in_seconds; any falsy setting (including 0) falls back
// to the default below. global.settings itself must already be defined when this module loads.
var JOBS_TTL_IN_SECONDS = global.settings.jobs_ttl_in_seconds || 48 * 3600; // 48 hours
|
2016-04-06 00:50:04 +08:00
|
|
|
var jobStatus = require('./job_status');
|
2016-05-14 00:50:55 +08:00
|
|
|
// Statuses that isFinalStatus() treats as final; setTTL() only sets an expiration on jobs
// whose status is one of these.
var finalStatus = [jobStatus.CANCELLED, jobStatus.DONE, jobStatus.FAILED, jobStatus.UNKNOWN];
|
2016-04-01 17:33:27 +08:00
|
|
|
|
2015-12-25 00:42:49 +08:00
|
|
|
/**
 * Storage layer for batch jobs, persisted as Redis hashes.
 *
 * @param {Object} metadataBackend - provides redisCmd(db, command, params, callback)
 * @param {Object} jobQueueProducer - provides enqueue(job_id, host, callback)
 * @param {Object} jobPublisher - provides publish(host)
 * @param {Object} userIndexer - provides add/list/remove for the per-user job index
 */
function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) {
    var backend = this;

    // Redis database index passed as first argument to every redisCmd call
    backend.db = 5;

    // Collaborators, stored as-is
    backend.metadataBackend = metadataBackend;
    backend.jobQueueProducer = jobQueueProducer;
    backend.jobPublisher = jobPublisher;
    backend.userIndexer = userIndexer;
}
|
|
|
|
|
2016-05-18 01:27:05 +08:00
|
|
|
/**
 * Flattens a job into the argument list used for the Redis HMSET command:
 * [key, field1, value1, field2, value2, ...].
 *
 * The key is REDIS_PREFIX + data.job_id; job_id itself is not emitted as a field.
 * `data` is never modified: the fields are taken from a JSON round-tripped copy, which
 * also drops properties whose value is undefined.
 *
 * @param {Object} data - job data; must carry job_id
 * @returns {Array} the key followed by alternating field names and values. The values of
 *     'query' and 'status' are JSON-encoded unless they are already strings.
 */
function toRedisParams(data) {
    var redisParams = [REDIS_PREFIX + data.job_id];

    var obj = JSON.parse(JSON.stringify(data));
    delete obj.job_id;

    // Object.keys yields only own enumerable properties and, unlike calling
    // obj.hasOwnProperty(), keeps working when the job data carries a field that is
    // itself named "hasOwnProperty".
    Object.keys(obj).forEach(function (property) {
        var value = obj[property];

        redisParams.push(property);

        // TODO: this should be moved to job model
        if ((property === 'query' || property === 'status') && typeof value !== 'string') {
            redisParams.push(JSON.stringify(value));
        } else {
            redisParams.push(value);
        }
    });

    return redisParams;
}
|
2016-01-07 19:06:01 +08:00
|
|
|
|
2016-05-18 01:27:05 +08:00
|
|
|
/**
 * Rebuilds a job object from the parameters and reply of a Redis HMGET call.
 *
 * @param {String} job_id - id of the job; added to the result as `job_id`
 * @param {Array} redisParams - the HMGET parameters: the Redis key followed by the field
 *     names, optionally followed by the function that metadataBackend appends to the array.
 *     The array is NOT modified.
 * @param {Array} redisValues - HMGET reply; redisValues[i] is the value of the i-th field name
 * @returns {Object} job data. 'query' and 'status' are always set (JSON-decoded when
 *     possible, raw value otherwise); any other field is set only when its value is truthy.
 */
function toObject(job_id, redisParams, redisValues) {
    var obj = {};

    // Work on a copy without the leading Redis key, so the caller's array stays intact
    var fields = redisParams.slice(1);

    // WARN: weird function pushed by metadataBackend. Strip it only when it is really there,
    // otherwise the last real field would be lost.
    if (typeof fields[fields.length - 1] === 'function') {
        fields.pop();
    }

    for (var i = 0; i < fields.length; i++) {
        var field = fields[i];
        var value = redisValues[i];

        // TODO: this should be moved to job model
        if (field === 'query' || field === 'status') {
            try {
                obj[field] = JSON.parse(value);
            } catch (e) {
                // not JSON (e.g. a plain status string): keep the raw value
                obj[field] = value;
            }
        } else if (value) {
            obj[field] = value;
        }
    }

    obj.job_id = job_id; // adds redisKey as object property

    return obj;
}
|
2016-01-05 02:08:13 +08:00
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
// TODO: is it really necessary??
/**
 * Tells whether an HMGET reply describes an existing job: the first five values must all be
 * truthy. With the field order used by JobBackend.prototype.get these are user, status,
 * query, created_at and updated_at.
 *
 * @param {Array} redisValues - HMGET reply; a missing (non-array) reply counts as not found
 * @returns {Boolean} always a strict boolean
 */
function isJobFound(redisValues) {
    var requiredValues = 5;

    if (!Array.isArray(redisValues)) {
        return false;
    }

    for (var i = 0; i < requiredValues; i++) {
        if (!redisValues[i]) {
            return false;
        }
    }

    return true;
}
|
2016-01-22 19:43:41 +08:00
|
|
|
|
2015-12-24 00:29:11 +08:00
|
|
|
/**
 * Reads one job from Redis with HMGET.
 *
 * @param {String} job_id - id of the job to read
 * @param {Function} callback - called with (err) on a Redis error, with an Error named
 *     'NotFoundError' when isJobFound() rejects the reply, or with (null, job) otherwise
 */
JobBackend.prototype.get = function (job_id, callback) {
    var self = this;

    // Redis key first, then the hash fields to read
    var hmgetParams = [
        REDIS_PREFIX + job_id,
        'user',
        'status',
        'query',
        'created_at',
        'updated_at',
        'host',
        'failed_reason'
    ];

    function onReply(err, redisValues) {
        if (err) {
            return callback(err);
        }

        if (!isJobFound(redisValues)) {
            var notFoundError = new Error('Job with id ' + job_id + ' not found');
            notFoundError.name = 'NotFoundError';
            return callback(notFoundError);
        }

        // The same params array is handed over so field names and values can be paired up
        callback(null, toObject(job_id, hmgetParams, redisValues));
    }

    self.metadataBackend.redisCmd(self.db, 'HMGET', hmgetParams, onReply);
};
|
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Stores a new job, enqueues it for its host, notifies consumers and indexes it by user.
 *
 * The initial lookup only aborts on errors other than 'NotFoundError'; a job that already
 * exists under the same id is not rejected and is simply saved again.
 *
 * @param {Object} data - job data; job_id, host and user are read here
 * @param {Function} callback - called with (err) as soon as any step fails, or with
 *     (null, job) where job is what save() yielded
 */
JobBackend.prototype.create = function (data, callback) {
    var self = this;

    // Step 3: queue the job, broadcast, then add it to the user's index
    function onSaved(err, job) {
        if (err) {
            return callback(err);
        }

        self.jobQueueProducer.enqueue(data.job_id, data.host, function (err) {
            if (err) {
                return callback(err);
            }

            // broadcast to consumers
            self.jobPublisher.publish(data.host);

            self.userIndexer.add(data.user, data.job_id, function (err) {
                if (err) {
                    return callback(err);
                }

                callback(null, job);
            });
        });
    }

    // Step 2: persist unless the lookup failed for a reason other than "not found"
    function onLookup(err) {
        if (err && err.name !== 'NotFoundError') {
            return callback(err);
        }

        self.save(data, onSaved);
    }

    // Step 1: look the job up
    self.get(data.job_id, onLookup);
};
|
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Saves new data for a job that must already exist.
 *
 * @param {Object} data - job data; data.job_id selects the job
 * @param {Function} callback - called with the lookup error (including 'NotFoundError')
 *     when get() fails; otherwise handed over to save()
 */
JobBackend.prototype.update = function (data, callback) {
    var backend = this;

    function saveIfFound(lookupError) {
        if (lookupError) {
            return callback(lookupError);
        }

        backend.save(data, callback);
    }

    backend.get(data.job_id, saveIfFound);
};
|
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Writes the job hash with HMSET, applies the TTL rule (see setTTL) and reads the job back.
 *
 * @param {Object} data - job data, serialized through toRedisParams()
 * @param {Function} callback - called with (err) when any of the three steps fails, or with
 *     (null, job) where job is the result of re-reading the job through get()
 */
JobBackend.prototype.save = function (data, callback) {
    var backend = this;
    var hmsetParams = toRedisParams(data);

    // Last step: return the job as it is now stored
    function readBack(err) {
        if (err) {
            return callback(err);
        }

        backend.get(data.job_id, function (err, job) {
            if (err) {
                return callback(err);
            }

            callback(null, job);
        });
    }

    // Second step: once the hash is written, let setTTL decide about expiration
    function applyTTL(err) {
        if (err) {
            return callback(err);
        }

        backend.setTTL(data, readBack);
    }

    backend.metadataBackend.redisCmd(backend.db, 'HMSET', hmsetParams, applyTTL);
};
|
|
|
|
|
2016-05-16 17:56:44 +08:00
|
|
|
/**
 * Checks whether a status is one of the entries of `finalStatus` (strict equality).
 *
 * @param {*} status - status to look up
 * @returns {Boolean} true when the status is listed in `finalStatus`
 */
function isFinalStatus(status) {
    for (var i = 0; i < finalStatus.length; i++) {
        if (finalStatus[i] === status) {
            return true;
        }
    }

    return false;
}
|
2016-03-31 18:39:03 +08:00
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Sets an expiration of JOBS_TTL_IN_SECONDS on the job's Redis key, but only when the job
 * is in a final status.
 *
 * @param {Object} data - job data; job_id and status are read
 * @param {Function} callback - called without arguments when the status is not final,
 *     otherwise passed to the EXPIRE command
 */
JobBackend.prototype.setTTL = function (data, callback) {
    var backend = this;
    var expireParams = [ REDIS_PREFIX + data.job_id, JOBS_TTL_IN_SECONDS ];

    // Jobs that are not in a final status keep no expiration
    if (!isFinalStatus(data.status)) {
        return callback();
    }

    backend.metadataBackend.redisCmd(backend.db, 'EXPIRE', expireParams, callback);
};
|
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Lists the jobs of a user.
 *
 * The job ids come from the user index; _getCleanedList() then loads every job and leaves
 * out ids whose job no longer exists (asking the index to forget them).
 *
 * @param {String} user - user whose jobs are listed
 * @param {Function} callback - called with (err) or (null, jobs)
 */
JobBackend.prototype.list = function (user, callback) {
    var self = this;

    self.userIndexer.list(user, function (err, job_ids) {
        if (err) {
            return callback(err);
        }

        // The cleaned list is already the final answer. It used to trigger a full re-listing
        // whenever it was shorter than the index, which never terminated when a stale id
        // could not be removed from the index (that failure is only logged).
        self._getCleanedList(user, job_ids, function (err, jobs) {
            if (err) {
                return callback(err);
            }

            callback(null, jobs);
        });
    });
};
|
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Loads every job of `job_ids` through _getIndexedJob() and returns the ones that exist.
 *
 * @param {String} user - owner of the index the ids came from
 * @param {Array} job_ids - ids to load; all of them are requested in parallel
 * @param {Function} callback - called with (err) when any lookup fails, or with (null, jobs)
 *     where entries without a job (ids that were not found) have been filtered out
 */
JobBackend.prototype._getCleanedList = function (user, job_ids, callback) {
    var self = this;

    // Nothing to load. This also avoids creating a queue with a parallelism of 0.
    // NOTE(review): some queue-async releases appear to reject a parallelism below 1;
    // confirm against the installed version.
    if (job_ids.length === 0) {
        return callback(null, []);
    }

    var jobsQueue = queue(job_ids.length);

    job_ids.forEach(function (job_id) {
        jobsQueue.defer(self._getIndexedJob.bind(self), job_id, user);
    });

    jobsQueue.awaitAll(function (err, jobs) {
        if (err) {
            return callback(err);
        }

        // _getIndexedJob yields no job for ids that were not found: drop those holes
        callback(null, jobs.filter(Boolean));
    });
};
|
|
|
|
|
2016-05-14 00:50:55 +08:00
|
|
|
/**
 * Loads one job listed in a user's index.
 *
 * When the job is not found, the id is removed from the user's index and the callback is
 * invoked with no arguments; a failure of that removal is only logged.
 *
 * @param {String} job_id - id of the job to load
 * @param {String} user - owner of the index the id came from
 * @param {Function} callback - called with (null, job), with no arguments when the job was
 *     not found, or with (err) for any other lookup error
 */
JobBackend.prototype._getIndexedJob = function (job_id, user, callback) {
    var self = this;

    // Forget an id whose job is gone; never fails the listing
    function dropFromIndex() {
        return self.userIndexer.remove(user, job_id, function (err) {
            if (err) {
                console.error('Error removing key %s in user set', job_id, err);
            }
            callback();
        });
    }

    self.get(job_id, function (err, job) {
        if (!err) {
            callback(null, job);
            return;
        }

        if (err.name === 'NotFoundError') {
            return dropFromIndex();
        }

        return callback(err);
    });
};
|
|
|
|
|
2015-12-22 02:57:10 +08:00
|
|
|
// The JobBackend constructor is this module's only export.
module.exports = JobBackend;
|