'use strict';

// Prefix prepended to a job id to build the Redis key of that job's hash.
var REDIS_PREFIX = 'batch:jobs:';
// Redis database index passed to the job hash commands (HMGET / HMSET / EXPIRE) below.
var REDIS_DB = 5;
var JobStatus = require('./job-status');
var queue = require('queue-async');

/**
 * Redis-backed storage for batch jobs.
 *
 * Tunables are read from `global.settings` at construction time; a missing
 * (or otherwise falsy) setting falls back to the default shown below.
 *
 * @param {Object} metadataBackend - Redis access object; this module calls its
 *     `redisCmd(db, command, params, callback)` and `redisMultiCmd(db, commands, callback)`.
 * @param {Object} jobQueue - per-user queue; this module calls its
 *     `size(user, callback)` and `enqueue(user, jobId, callback)`.
 * @param {Object} logger - logger; this module calls its `debug(...)`.
 */
function JobBackend (metadataBackend, jobQueue, logger) {
    this.metadataBackend = metadataBackend;
    this.jobQueue = jobQueue;
    // Upper bound checked by `create` against the user's current queue size.
    this.maxNumberOfQueuedJobs = global.settings.batch_max_queued_jobs || 64;
    // Expiration applied by `setTTL` to jobs in a final status.
    this.inSecondsJobTTLAfterFinished = global.settings.finished_jobs_ttl_in_seconds || 2 * 3600; // 2 hours
    this.hostname = global.settings.api_hostname || 'batch';
    this.logger = logger;
}

/**
 * Flattens a job into the argument list of an HMSET call:
 * `[redisKey, field1, value1, field2, value2, ...]`.
 *
 * The job is copied through a JSON round-trip, so the caller's object is not
 * modified and only JSON-serialisable members are written. `job_id` is not
 * stored as a field because it is already part of the key.
 *
 * @param {Object} job - job to serialise; must carry `job_id`.
 * @returns {Array} Redis key followed by alternating field names and values.
 */
function toRedisParams (job) {
    var redisParams = [REDIS_PREFIX + job.job_id];
    var fields = JSON.parse(JSON.stringify(job));

    delete fields.job_id;

    Object.keys(fields).forEach(function (field) {
        var value = fields[field];

        redisParams.push(field);
        // A non-string query (array/object form) is stored as JSON text;
        // every other value is pushed as it is.
        if (field === 'query' && typeof value !== 'string') {
            redisParams.push(JSON.stringify(value));
        } else {
            redisParams.push(value);
        }
    });

    return redisParams;
}

/**
 * Rebuilds a job object from an HMGET request/response pair.
 *
 * `redisParams` is the argument list that was sent (`[redisKey, field1, field2, ...]`)
 * and `redisValues` the reply, where `redisValues[i]` belongs to the i-th field.
 * The argument list is read from a copy, so the caller's array is left intact.
 *
 * @param {string} jobId - id stored on the result as `job_id`.
 * @param {Array} redisParams - HMGET arguments: key first, then field names.
 * @param {Array} redisValues - HMGET reply, aligned with the field names.
 * @returns {Object} job with every field that had a truthy value, plus `job_id`.
 */
function toObject (jobId, redisParams, redisValues) {
    // Drop the leading Redis key; what remains are the requested field names.
    var fields = redisParams.slice(1);

    // WARN: metadataBackend may append its callback to the params array.
    // Only strip it when it is really there, so a genuine field name is never lost.
    if (typeof fields[fields.length - 1] === 'function') {
        fields.pop();
    }

    var obj = {};

    fields.forEach(function (field, index) {
        var value = redisValues[index];

        // TODO: this should be moved to job model
        if (field === 'query') {
            // Queries saved in structured form are JSON text; plain SQL strings
            // fail to parse and are kept as they are.
            try {
                obj[field] = JSON.parse(value);
            } catch (e) {
                obj[field] = value;
            }
        } else if (value) {
            obj[field] = value;
        }
    });

    obj.job_id = jobId; // adds redisKey as object property

    return obj;
}

/**
 * Tells whether an HMGET reply describes an existing job.
 *
 * A job counts as found only when the first five values are all truthy; in the
 * field order requested by `JobBackend.prototype.get` those are `user`,
 * `status`, `query`, `created_at` and `updated_at`.
 *
 * @param {Array} redisValues - HMGET reply; a missing reply counts as not found.
 * @returns {boolean}
 */
function isJobFound (redisValues) {
    var REQUIRED_VALUES = 5;

    if (!redisValues) {
        return false;
    }

    for (var i = 0; i < REQUIRED_VALUES; i++) {
        if (!redisValues[i]) {
            return false;
        }
    }

    return true;
}

/**
 * Builds the error reported when a job does not exist.
 *
 * The error's `name` is `'NotFoundError'`, which `JobBackend.prototype.create`
 * checks to tell a missing job apart from a real failure.
 *
 * @param {string} jobId - id that was looked up.
 * @returns {Error}
 */
function getNotFoundError (jobId) {
    var notFoundError = new Error('Job with id ' + jobId + ' not found');
    notFoundError.name = 'NotFoundError';

    return notFoundError;
}

/**
 * Loads a job from its Redis hash.
 *
 * @param {string} jobId - id of the job; a falsy id is reported as not found
 *     without querying Redis.
 * @param {Function} callback - `callback(err)` on a Redis error or when the
 *     job does not exist (`err.name === 'NotFoundError'`), otherwise
 *     `callback(null, job)`.
 */
JobBackend.prototype.get = function (jobId, callback) {
    if (!jobId) {
        return callback(getNotFoundError(jobId));
    }

    // HMGET arguments: the key followed by the fields to read. The first five
    // fields are the ones `isJobFound` requires. Each field is requested once.
    var redisParams = [
        REDIS_PREFIX + jobId,
        'user',
        'status',
        'query',
        'created_at',
        'updated_at',
        'host',
        'failed_reason',
        'fallback_status',
        'port',
        'pass',
        'dbname',
        'dbuser'
    ];

    this.metadataBackend.redisCmd(REDIS_DB, 'HMGET', redisParams, function (err, redisValues) {
        if (err) {
            return callback(err);
        }

        if (!isJobFound(redisValues)) {
            return callback(getNotFoundError(jobId));
        }

        callback(null, toObject(jobId, redisParams, redisValues));
    });
};

/**
 * Stores a new job and enqueues it for its user.
 *
 * Steps, each aborting with `callback(err)` on failure:
 *  1. read the user's queue size and reject when it has reached
 *     `maxNumberOfQueuedJobs`;
 *  2. look the job up; any lookup error other than "not found" aborts;
 *  3. save the job;
 *  4. enqueue the job id in the user's queue.
 *
 * @param {Object} job - job to create; `job.user` and `job.job_id` are used.
 * @param {Function} callback - `callback(err)` or `callback(null, savedJob)`.
 */
JobBackend.prototype.create = function (job, callback) {
    var self = this;

    self.jobQueue.size(job.user, function (err, size) {
        if (err) {
            return callback(new Error('Failed to create job, could not determine user queue size'));
        }

        if (size >= self.maxNumberOfQueuedJobs) {
            return callback(new Error(
                'Failed to create job. ' +
                'Max number of jobs (' + self.maxNumberOfQueuedJobs + ') queued reached'
            ));
        }

        self.get(job.job_id, function (err) {
            // "Not found" is the expected outcome for a new job; anything else is a real failure.
            if (err && err.name !== 'NotFoundError') {
                return callback(err);
            }

            self.save(job, function (err, jobSaved) {
                if (err) {
                    return callback(err);
                }

                self.jobQueue.enqueue(job.user, job.job_id, function (err) {
                    if (err) {
                        return callback(err);
                    }

                    return callback(null, jobSaved);
                });
            });
        });
    });
};

/**
 * Saves changes to an existing job.
 *
 * The job is looked up first; when the lookup fails (including "not found")
 * nothing is written and the error is passed to the callback.
 *
 * @param {Object} job - job to store; `job.job_id` identifies it.
 * @param {Function} callback - `callback(err)` or `callback(null, savedJob)`.
 */
JobBackend.prototype.update = function (job, callback) {
    var self = this;

    self.get(job.job_id, function (err) {
        if (err) {
            return callback(err);
        }

        self.save(job, callback);
    });
};

/**
 * Writes a job to its Redis hash and returns the stored version.
 *
 * Sequence: HMSET with the flattened job, then `setTTL` (which only sets an
 * expiration for jobs in a final status), then a fresh read of the job so the
 * caller receives what is actually stored. The first failing step aborts the
 * sequence with `callback(err)`.
 *
 * @param {Object} job - job to store.
 * @param {Function} callback - `callback(err)` or `callback(null, storedJob)`.
 */
JobBackend.prototype.save = function (job, callback) {
    var self = this;
    var redisParams = toRedisParams(job);

    self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', redisParams, function (err) {
        if (err) {
            return callback(err);
        }

        self.setTTL(job, function (err) {
            if (err) {
                return callback(err);
            }

            self.get(job.job_id, function (err, storedJob) {
                if (err) {
                    return callback(err);
                }

                callback(null, storedJob);
            });
        });
    });
};

// Redis layout used to track work-in-progress jobs.
var WORK_IN_PROGRESS_JOB = {
    // Redis database index used for all work-in-progress commands.
    DB: 5,
    // Key prefix of the per-user list of work-in-progress job ids (prefix + user).
    PREFIX_USER: 'batch:wip:user:',
    // Key of the set holding the users indexed as having work in progress.
    USER_INDEX_KEY: 'batch:wip:users'
};

/**
 * Records a job as work in progress for a user.
 *
 * Issues both commands through a single `redisMultiCmd` call: the user is
 * added to the work-in-progress user index (SADD) and the job id is appended
 * to the user's work-in-progress list (RPUSH).
 *
 * @param {string} user - owner of the job.
 * @param {string} jobId - id of the job.
 * @param {Function} callback - handed to `redisMultiCmd` unchanged.
 */
JobBackend.prototype.addWorkInProgressJob = function (user, jobId, callback) {
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;
    var commands = [
        ['SADD', WORK_IN_PROGRESS_JOB.USER_INDEX_KEY, user],
        ['RPUSH', userWIPKey, jobId]
    ];

    this.logger.debug('add job %s to user %s (%s)', jobId, user, userWIPKey);
    this.metadataBackend.redisMultiCmd(WORK_IN_PROGRESS_JOB.DB, commands, callback);
};

/**
 * Removes a job from a user's work-in-progress list and, when that was the
 * user's last work-in-progress job, removes the user from the index set.
 *
 * Sequence: LREM the job id from the user's list, LRANGE the list to see what
 * is left, and SREM the user from the index only when the list is empty. A
 * user that still has jobs in the list stays in the index.
 *
 * @param {string} user - owner of the job.
 * @param {string} jobId - id of the job to clear.
 * @param {Function} callback - `callback(err)` on a Redis error, otherwise `callback()`.
 */
JobBackend.prototype.clearWorkInProgressJob = function (user, jobId, callback) {
    var self = this;
    var DB = WORK_IN_PROGRESS_JOB.DB;
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;

    // LREM with count 0 removes every occurrence of jobId from the list.
    self.metadataBackend.redisCmd(DB, 'LREM', [userWIPKey, 0, jobId], function (err) {
        if (err) {
            return callback(err);
        }

        // LRANGE 0 -1 returns the whole list, i.e. the jobs still in progress.
        self.metadataBackend.redisCmd(DB, 'LRANGE', [userWIPKey, 0, -1], function (err, workInProgressJobs) {
            if (err) {
                return callback(err);
            }

            self.logger.debug('user %s has work in progress jobs %j', user, workInProgressJobs);

            // Other jobs are still in progress: the user must stay in the index.
            if (workInProgressJobs.length > 0) {
                return callback();
            }

            self.logger.debug('delete user %s from index', user);

            var indexParams = [WORK_IN_PROGRESS_JOB.USER_INDEX_KEY, user];
            self.metadataBackend.redisCmd(DB, 'SREM', indexParams, function (err) {
                if (err) {
                    return callback(err);
                }

                return callback();
            });
        });
    });
};

/**
 * Lists the work-in-progress job ids of one user.
 *
 * @param {string} user - user whose list is read.
 * @param {Function} callback - handed to `redisCmd` unchanged; receives the
 *     result of `LRANGE <user list> 0 -1`, i.e. the whole list.
 */
JobBackend.prototype.listWorkInProgressJobByUser = function (user, callback) {
    var wholeList = [WORK_IN_PROGRESS_JOB.PREFIX_USER + user, 0, -1];

    this.metadataBackend.redisCmd(WORK_IN_PROGRESS_JOB.DB, 'LRANGE', wholeList, callback);
};

/**
 * Lists the work-in-progress jobs of every indexed user.
 *
 * Reads the user index (SMEMBERS) and then fetches each user's list through
 * `listWorkInProgressJobByUser`, scheduling the lookups on a queue created
 * with `queue(4)`.
 *
 * @param {Function} callback - `callback(err)` on failure, otherwise
 *     `callback(null, jobsByUser)` where `jobsByUser` maps each user to that
 *     user's job ids; an empty object when the index holds no users.
 */
JobBackend.prototype.listWorkInProgressJobs = function (callback) {
    var self = this;
    var DB = WORK_IN_PROGRESS_JOB.DB;
    var indexParams = [WORK_IN_PROGRESS_JOB.USER_INDEX_KEY];

    self.metadataBackend.redisCmd(DB, 'SMEMBERS', indexParams, function (err, workInProgressUsers) {
        if (err) {
            return callback(err);
        }

        // Check the number of users explicitly: comparing the array itself with
        // a number coerces it, which misreads a lone user such as '0' as "none".
        if (!workInProgressUsers || workInProgressUsers.length < 1) {
            return callback(null, {});
        }

        self.logger.debug('found %j work in progress users', workInProgressUsers);

        var usersQueue = queue(4);

        workInProgressUsers.forEach(function (user) {
            usersQueue.defer(self.listWorkInProgressJobByUser.bind(self), user);
        });

        usersQueue.awaitAll(function (err, userWorkInProgressJobs) {
            if (err) {
                return callback(err);
            }

            // Results are paired with users by position in the index reply.
            var workInProgressJobs = workInProgressUsers.reduce(function (users, user, index) {
                users[user] = userWorkInProgressJobs[index];
                self.logger.debug('found %j work in progress jobs for user %s', userWorkInProgressJobs[index], user);
                return users;
            }, {});

            callback(null, workInProgressJobs);
        });
    });
};

/**
 * Schedules the expiration of a finished job.
 *
 * Jobs whose status is not final (per `JobStatus.isFinal`) are left untouched
 * and the callback is invoked without arguments. Otherwise the job's hash
 * gets an EXPIRE of `inSecondsJobTTLAfterFinished` seconds.
 *
 * @param {Object} job - job to expire; `job.job_id` and `job.status` are used.
 * @param {Function} callback - called directly for non-final jobs, otherwise
 *     handed to `redisCmd` unchanged.
 */
JobBackend.prototype.setTTL = function (job, callback) {
    var self = this;

    if (!JobStatus.isFinal(job.status)) {
        return callback();
    }

    var expireParams = [REDIS_PREFIX + job.job_id, self.inSecondsJobTTLAfterFinished];
    self.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', expireParams, callback);
};

// The constructor is the module's only export.
module.exports = JobBackend;