CartoDB-SQL-API/batch/job_backend.js

264 lines
7.5 KiB
JavaScript
Raw Normal View History

'use strict';
2016-05-18 01:27:05 +08:00
// Prefix of the Redis key under which a job hash is stored; the job id is appended to it.
var REDIS_PREFIX = 'batch:jobs:';
// Redis database index passed to the metadata backend for every job-hash command.
var REDIS_DB = 5;
2016-10-10 18:09:13 +08:00
var JobStatus = require('./job_status');
2016-10-28 02:42:49 +08:00
var queue = require('queue-async');
var debug = require('./util/debug')('job-backend');
/**
 * Persistence layer for batch jobs, backed by Redis through `metadataBackend`.
 *
 * Limits and identity are read once, at construction time, from `global.settings`;
 * each one falls back to a default when the setting is missing or falsy.
 *
 * @param {Object} metadataBackend - object whose `redisCmd` / `redisMultiCmd` are called by the prototype methods
 * @param {Object} jobQueue - object whose `size(user, cb)` and `enqueue(user, jobId, cb)` are called by `create`
 */
function JobBackend(metadataBackend, jobQueue) {
    // Tolerate an absent settings object so the defaults below still apply.
    var settings = global.settings || {};

    this.metadataBackend = metadataBackend;
    this.jobQueue = jobQueue;
    this.maxNumberOfQueuedJobs = settings.batch_max_queued_jobs || 64;
    this.inSecondsJobTTLAfterFinished = settings.finished_jobs_ttl_in_seconds || 2 * 3600; // 2 hours
    this.hostname = settings.api_hostname || 'batch';
}
2016-05-18 01:55:33 +08:00
/**
 * Flattens a job into the argument list of an HMSET command:
 * `[redisKey, field1, value1, field2, value2, ...]`.
 *
 * The redis key is `REDIS_PREFIX + job.job_id`; `job_id` itself is not written as a field.
 * A `query` that is not already a string is stored as its JSON text.
 *
 * @param {Object} job - job data; must carry `job_id`
 * @returns {Array} HMSET parameters
 */
function toRedisParams(job) {
    var redisParams = [REDIS_PREFIX + job.job_id];
    // JSON round trip: works on a detached copy (so the caller's job keeps its job_id)
    // and drops anything JSON cannot represent, such as undefined values.
    var fields = JSON.parse(JSON.stringify(job));

    delete fields.job_id;

    // Object.keys only yields own properties, and unlike calling fields.hasOwnProperty()
    // it keeps working when the job has a field that is itself named "hasOwnProperty".
    Object.keys(fields).forEach(function (property) {
        var value = fields[property];

        redisParams.push(property);

        if (property === 'query' && typeof value !== 'string') {
            redisParams.push(JSON.stringify(value));
        } else {
            redisParams.push(value);
        }
    });

    return redisParams;
}
2016-05-18 01:27:05 +08:00
/**
 * Rebuilds a job object from the parameters sent with HMGET and the values it returned.
 *
 * `redisParams` is expected to be `[redisKey, field1, field2, ...]`, optionally followed
 * by a function appended by the metadata backend; `redisValues[i]` is the value of the
 * i-th field. Falsy values are skipped, except for `query`, which is always set.
 *
 * @param {String} job_id - id stored back on the result as `job_id`
 * @param {Array} redisParams - HMGET parameters (not modified)
 * @param {Array} redisValues - HMGET reply
 * @returns {Object} job data
 */
function toObject(job_id, redisParams, redisValues) {
    var obj = {};
    // Copy without the leading redis key so the caller's array is left intact.
    var fields = redisParams.slice(1);

    // WARN: metadataBackend pushes a function at the end of the params; drop it only
    // when it is really there so that a genuine field is never discarded.
    if (typeof fields[fields.length - 1] === 'function') {
        fields.pop();
    }

    for (var i = 0; i < fields.length; i++) {
        var field = fields[i];
        var value = redisValues[i];

        // TODO: this should be moved to job model
        if (field === 'query') {
            try {
                obj[field] = JSON.parse(value);
            } catch (e) {
                // not JSON text: keep the raw value
                obj[field] = value;
            }
        } else if (value) {
            obj[field] = value;
        }
    }

    obj.job_id = job_id; // adds redisKey as object property

    return obj;
}
2016-05-14 00:50:55 +08:00
/**
 * Tells whether an HMGET reply describes an existing job: the first five values must
 * all be truthy (in `get` these are user, status, query, created_at and updated_at).
 *
 * @param {Array} redisValues - HMGET reply
 * @returns {Boolean} true when the five leading values are present
 */
function isJobFound(redisValues) {
    var REQUIRED_VALUES = 5;

    // A missing reply means "not found" rather than a TypeError.
    if (!redisValues) {
        return false;
    }

    for (var i = 0; i < REQUIRED_VALUES; i++) {
        if (!redisValues[i]) {
            return false;
        }
    }

    return true;
}
/**
 * Reads a job hash with HMGET and hands the rebuilt job to `callback`.
 *
 * @param {String} job_id - id of the job to read
 * @param {Function} callback - called with `(err)` or `(null, jobData)`; when the reply
 *     does not pass `isJobFound`, `err.name` is 'NotFoundError'
 */
JobBackend.prototype.get = function (job_id, callback) {
    // First element is the redis key, the rest are the hash fields to fetch.
    var redisParams = [
        REDIS_PREFIX + job_id,
        'user',
        'status',
        'query',
        'created_at',
        'updated_at',
        'host',
        'failed_reason',
        'fallback_status'
    ];

    this.metadataBackend.redisCmd(REDIS_DB, 'HMGET', redisParams, function (err, redisValues) {
        if (err) {
            return callback(err);
        }

        if (!isJobFound(redisValues)) {
            var notFoundError = new Error('Job with id ' + job_id + ' not found');
            notFoundError.name = 'NotFoundError';
            return callback(notFoundError);
        }

        callback(null, toObject(job_id, redisParams, redisValues));
    });
};
2016-05-18 01:55:33 +08:00
/**
 * Stores a new job and enqueues it for its user.
 *
 * Steps, each aborting on failure:
 *  1. ask `jobQueue.size` for the user's queue size and refuse the job once it has
 *     reached `maxNumberOfQueuedJobs`;
 *  2. look the id up with `get` - only errors other than 'NotFoundError' abort, so a
 *     job that already exists under that id is saved over;
 *  3. `save` the job;
 *  4. `jobQueue.enqueue` the job id for the user.
 *
 * @param {Object} job - job data; `job.user` and `job.job_id` are read here
 * @param {Function} callback - called with `(err)` or `(null, jobSaved)`
 */
JobBackend.prototype.create = function (job, callback) {
    var self = this;

    self.jobQueue.size(job.user, function (sizeErr, size) {
        if (sizeErr) {
            return callback(new Error('Failed to create job, could not determine user queue size'));
        }

        if (size >= self.maxNumberOfQueuedJobs) {
            return callback(new Error(
                'Failed to create job. ' +
                'Max number of jobs (' + self.maxNumberOfQueuedJobs + ') queued reached'
            ));
        }

        self.get(job.job_id, function (getErr) {
            // "not found" is the expected outcome for a job that is being created
            if (getErr && getErr.name !== 'NotFoundError') {
                return callback(getErr);
            }

            self.save(job, function (saveErr, jobSaved) {
                if (saveErr) {
                    return callback(saveErr);
                }

                self.jobQueue.enqueue(job.user, job.job_id, function (enqueueErr) {
                    if (enqueueErr) {
                        return callback(enqueueErr);
                    }

                    return callback(null, jobSaved);
                });
            });
        });
    });
};
2016-05-18 01:55:33 +08:00
/**
 * Saves a job only if it can currently be read with `get`; any error from `get`
 * (including 'NotFoundError') is passed to `callback` and nothing is written.
 *
 * @param {Object} job - job data; `job.job_id` identifies the stored job
 * @param {Function} callback - called with `(err)` or with whatever `save` yields
 */
JobBackend.prototype.update = function (job, callback) {
    var self = this;

    self.get(job.job_id, function (err) {
        if (err) {
            return callback(err);
        }

        self.save(job, callback);
    });
};
2016-05-18 01:55:33 +08:00
/**
 * Writes the job hash with HMSET, applies the TTL rule of `setTTL`, then reads the job
 * back with `get` so the caller receives what is actually stored.
 *
 * @param {Object} job - job data to persist
 * @param {Function} callback - called with `(err)` or `(null, storedJob)`
 */
JobBackend.prototype.save = function (job, callback) {
    var self = this;
    var redisParams = toRedisParams(job);

    self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', redisParams, function (hmsetErr) {
        if (hmsetErr) {
            return callback(hmsetErr);
        }

        self.setTTL(job, function (ttlErr) {
            if (ttlErr) {
                return callback(ttlErr);
            }

            // get() already calls back with (err) or (null, job), so it can finish the chain
            self.get(job.job_id, callback);
        });
    });
};
// Redis layout of the work-in-progress lists: the database index they live in and the
// key prefixes of the per-user and per-host lists.
var WORK_IN_PROGRESS_JOB = {
    DB: 5,
    PREFIX_USER: 'batch:wip:user:',
    PREFIX_HOST: 'batch:wip:host:'
};
2016-10-27 23:46:43 +08:00
/**
 * Appends a job id (RPUSH) to the work-in-progress list of this host and to the one of
 * the given user, both in a single `redisMultiCmd` call.
 *
 * @param {String} user - user owning the job
 * @param {String} jobId - id of the job being worked on
 * @param {Function} callback - passed straight to `redisMultiCmd`
 */
JobBackend.prototype.addWorkInProgressJob = function (user, jobId, callback) {
    var hostWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_HOST + this.hostname; // will be used for draining jobs.
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user; // will be used for listing users and their running jobs

    var commands = [
        ['RPUSH', hostWIPKey, jobId],
        ['RPUSH', userWIPKey, jobId]
    ];

    this.metadataBackend.redisMultiCmd(WORK_IN_PROGRESS_JOB.DB, commands, callback);
};
/**
 * Reads the whole work-in-progress list of a user (LRANGE 0 -1).
 *
 * @param {String} user - user whose list is read
 * @param {Function} callback - passed straight to `redisCmd`
 */
JobBackend.prototype.listWorkInProgressJobByUser = function (user, callback) {
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;
    var wholeList = [userWIPKey, 0, -1];

    this.metadataBackend.redisCmd(WORK_IN_PROGRESS_JOB.DB, 'LRANGE', wholeList, callback);
};
2016-10-28 02:42:49 +08:00
/**
 * Lists the work-in-progress jobs of every user that has a work-in-progress key.
 *
 * The users are discovered with `_getWIPByUserKeys`, then each user's list is read with
 * `listWorkInProgressJobByUser`; one task per user is deferred on a queue whose
 * parallelism is the number of users.
 *
 * @param {Function} callback - called with `(err)` or `(null, jobsByUser)`, where
 *     `jobsByUser[userName]` is the result obtained for that user
 */
JobBackend.prototype.listWorkInProgressJob = function (callback) {
    var self = this;
    var initialCursor = ['0'];

    self._getWIPByUserKeys(initialCursor, {}, function (err, users) {
        if (err) {
            return callback(err);
        }

        debug('found %j', users);

        var usersName = Object.keys(users);
        var usersQueue = queue(usersName.length);

        usersName.forEach(function (userName) {
            usersQueue.defer(self.listWorkInProgressJobByUser.bind(self), userName);
        });

        usersQueue.awaitAll(function (err, results) {
            if (err) {
                return callback(err);
            }

            // results[i] is paired with usersName[i], the order in which tasks were deferred
            var jobsByUser = usersName.reduce(function (acc, userName, index) {
                acc[userName] = results[index];
                return acc;
            }, {});

            callback(null, jobsByUser);
        });
    });
};
/**
 * Walks the keyspace with SCAN, one call per invocation, collecting every key that
 * matches the per-user work-in-progress prefix, and recurses until the cursor returned
 * by Redis is '0'.
 *
 * @param {Array} cursor - array whose first element is the SCAN cursor to resume from
 * @param {Object} users - accumulator, filled as `users[userName] = redisKey`
 * @param {Function} callback - called with `(err)` or `(null, users)`
 */
JobBackend.prototype._getWIPByUserKeys = function (cursor, users, callback) {
    var self = this;
    var prefix = WORK_IN_PROGRESS_JOB.PREFIX_USER;
    var scanArgs = [cursor[0], 'MATCH', prefix + '*'];

    self.metadataBackend.redisCmd(WORK_IN_PROGRESS_JOB.DB, 'SCAN', scanArgs, function (err, scanReply) {
        if (err) {
            return callback(err);
        }

        // scanReply is [nextCursor, matchedKeys]
        var matchedKeys = scanReply[1];

        if (matchedKeys) {
            for (var i = 0; i < matchedKeys.length; i++) {
                var redisKey = matchedKeys[i];
                // the user name is whatever follows the prefix
                users[redisKey.slice(prefix.length)] = redisKey;
            }
        }

        if (scanReply[0] === '0') {
            return callback(null, users);
        }

        self._getWIPByUserKeys(scanReply, users, callback);
    });
};
2016-05-18 01:55:33 +08:00
/**
 * Makes a finished job expire: when `JobStatus.isFinal(job.status)` holds, sets an
 * EXPIRE of `inSecondsJobTTLAfterFinished` seconds on the job key. For any other status
 * nothing is sent to Redis and `callback` is invoked without arguments.
 *
 * @param {Object} job - job data; `job.status` and `job.job_id` are read here
 * @param {Function} callback - called with no arguments, or passed to `redisCmd`
 */
JobBackend.prototype.setTTL = function (job, callback) {
    if (!JobStatus.isFinal(job.status)) {
        return callback();
    }

    var redisKey = REDIS_PREFIX + job.job_id;
    var expireParams = [redisKey, this.inSecondsJobTTLAfterFinished];

    this.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', expireParams, callback);
};
module.exports = JobBackend;