Changed redis data structure for users jobs

This commit is contained in:
Daniel García Aubert 2016-01-04 19:08:13 +01:00
parent 13ffaab0d5
commit 246655de94
5 changed files with 47 additions and 12 deletions

View File

@ -36,6 +36,7 @@ JobController.prototype.route = function (app) {
app.get(global.settings.base_url + '/job/:job_id', this.getJob.bind(this));
app.delete(global.settings.base_url + '/job/:job_id', this.cancelJob.bind(this));
app.put(global.settings.base_url + '/job/:job_id', this.updateJob.bind(this));
app.patch(global.settings.base_url + '/job/:job_id', this.updateJob.bind(this));
};
JobController.prototype.cancelJob = function (req, res) {
@ -104,9 +105,12 @@ JobController.prototype.cancelJob = function (req, res) {
var jobCanceller = new JobCanceller(self.metadataBackend, self.userDatabaseMetadataService);
jobCanceller.cancel(job_id)
.on('cancelled', function () {
.on('cancelled', function (job) {
// job is cancelled but surelly jobRunner has not deal whith it yet and it's not saved
job.status = 'cancelled';
next(null, {
cancelled: true,
job: job,
host: userDatabase.host
});
})
@ -133,7 +137,7 @@ JobController.prototype.cancelJob = function (req, res) {
}
res.send({
cancelled: result.cancelled
cancelled: result.job
});
}
);

View File

@ -94,18 +94,43 @@ JobBackend.prototype.list = function (username, callback) {
var jobsQueue = queue(job_ids.length);
job_ids.forEach(function(job_id) {
jobsQueue.defer(self.get.bind(self), job_id);
jobsQueue.defer(self._getForList.bind(self), job_id, username);
});
jobsQueue.awaitAll(function (err, jobs) {
if (err) {
return callback(err);
}
callback(null, jobs);
callback(null, jobs.filter(function (job) {
return job ? true : false;
}));
});
});
};
JobBackend.prototype._getForList = function (job_id, username, callback) {
var self = this;
this.get(job_id, function (err, job) {
if (err && err.name === 'NotFoundError') {
return self.userIndexer.remove(username, job_id, function (err) {
if (err) {
console.error('Error removing key %s in user set', job_id, err);
}
callback();
});
}
if (err) {
return callback(err);
}
callback(null, job);
});
};
JobBackend.prototype.get = function (job_id, callback) {
var redisParams = [
this.redisPrefix + job_id,
@ -127,7 +152,9 @@ JobBackend.prototype.get = function (job_id, callback) {
}
if (!isJobFound(jobValues)) {
return callback(new Error('Job with id ' + job_id + ' not found'));
var notFoundError = new Error('Job with id ' + job_id + ' not found');
notFoundError.name = 'NotFoundError';
return callback(notFoundError);
}
callback(null, {
@ -198,7 +225,7 @@ JobBackend.prototype.setFailed = function (job, err) {
return self.emit('error', err);
}
self.metadataBackend.redisCmd(this.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
if (err) {
return self.emit('error', err);
}
@ -222,7 +249,7 @@ JobBackend.prototype.setCancelled = function (job) {
return self.emit('error', err);
}
self.metadataBackend.redisCmd(this.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) {
if (err) {
return self.emit('error', err);
}

View File

@ -65,7 +65,7 @@ JobCanceller.prototype.cancel = function (job_id) {
return jobBackend.emit('error', new Error('Query has not been cancelled'));
}
jobBackend.emit('cancelled');
jobBackend.emit('cancelled', job);
});
});
});

View File

@ -5,7 +5,7 @@ var PSQL = require('cartodb-psql');
var JobPublisher = require('./job_publisher');
var JobQueue = require('./job_queue');
var UserIndexer = require('./user_indexer');
var QUERY_CANCELED = 57014;
var QUERY_CANCELED = '57014';
function JobRunner(metadataBackend, userDatabaseMetadataService) {
this.metadataBackend = metadataBackend;

View File

@ -7,11 +7,15 @@ function UserIndexer(metadataBackend) {
}
UserIndexer.prototype.add = function (username, job_id, callback) {
this.metadataBackend.redisCmd(this.db, 'SADD', [ this.redisPrefix + username, job_id ] , callback);
this.metadataBackend.redisCmd(this.db, 'RPUSH', [ this.redisPrefix + username, job_id ] , callback);
};
UserIndexer.prototype.list = function (username, callback) {
this.metadataBackend.redisCmd(this.db, 'SMEMBERS', [ this.redisPrefix + username ] , callback);
this.metadataBackend.redisCmd(this.db, 'LRANGE', [ this.redisPrefix + username, -100, -1 ] , callback);
};
UserIndexer.prototype.remove = function (username, job_id, callback) {
this.metadataBackend.redisCmd(this.db, 'LREM', [ this.redisPrefix + username, 0, job_id] , callback);
};
module.exports = UserIndexer;