Stop indexing jobs per user
Removes .list() from job backend
This commit is contained in:
parent
05ada98124
commit
d33fe5ac21
@ -12,7 +12,6 @@ var QueueSeeker = require('./queue_seeker');
|
||||
var UserDatabaseMetadataService = require('./user_database_metadata_service');
|
||||
var JobPublisher = require('./job_publisher');
|
||||
var JobQueue = require('./job_queue');
|
||||
var UserIndexer = require('./user_indexer');
|
||||
var JobBackend = require('./job_backend');
|
||||
var JobService = require('./job_service');
|
||||
var Batch = require('./batch');
|
||||
@ -25,8 +24,7 @@ module.exports = function batchFactory (metadataBackend, redisConfig, statsdClie
|
||||
var jobPublisher = new JobPublisher(redisPoolPublisher);
|
||||
var jobQueuePool = new JobQueuePool(metadataBackend, jobPublisher);
|
||||
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
||||
var userIndexer = new UserIndexer(metadataBackend);
|
||||
var jobBackend = new JobBackend(metadataBackend, jobQueue, userIndexer);
|
||||
var jobBackend = new JobBackend(metadataBackend, jobQueue);
|
||||
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
|
||||
var queryRunner = new QueryRunner(userDatabaseMetadataService);
|
||||
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
|
||||
|
@ -1,8 +1,5 @@
|
||||
'use strict';
|
||||
|
||||
|
||||
var queue = require('queue-async');
|
||||
var debug = require('./util/debug')('job-backend');
|
||||
var REDIS_PREFIX = 'batch:jobs:';
|
||||
var REDIS_DB = 5;
|
||||
var FINISHED_JOBS_TTL_IN_SECONDS = global.settings.finished_jobs_ttl_in_seconds || 2 * 3600; // 2 hours
|
||||
@ -14,10 +11,9 @@ var finalStatus = [
|
||||
jobStatus.UNKNOWN
|
||||
];
|
||||
|
||||
function JobBackend(metadataBackend, jobQueueProducer, userIndexer) {
|
||||
function JobBackend(metadataBackend, jobQueueProducer) {
|
||||
this.metadataBackend = metadataBackend;
|
||||
this.jobQueueProducer = jobQueueProducer;
|
||||
this.userIndexer = userIndexer;
|
||||
}
|
||||
|
||||
function toRedisParams(job) {
|
||||
@ -116,13 +112,7 @@ JobBackend.prototype.create = function (job, callback) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
self.userIndexer.add(job.user, job.job_id, function (err) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
callback(null, jobSaved);
|
||||
});
|
||||
return callback(null, jobSaved);
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -181,69 +171,4 @@ JobBackend.prototype.setTTL = function (job, callback) {
|
||||
self.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', [ redisKey, FINISHED_JOBS_TTL_IN_SECONDS ], callback);
|
||||
};
|
||||
|
||||
JobBackend.prototype.list = function (user, callback) {
|
||||
var self = this;
|
||||
|
||||
this.userIndexer.list(user, function (err, job_ids) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
var initialLength = job_ids.length;
|
||||
|
||||
self._getCleanedList(user, job_ids, function (err, jobs) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (jobs.length < initialLength) {
|
||||
return self.list(user, callback);
|
||||
}
|
||||
|
||||
callback(null, jobs);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
JobBackend.prototype._getCleanedList = function (user, job_ids, callback) {
|
||||
var self = this;
|
||||
|
||||
var jobsQueue = queue(job_ids.length);
|
||||
|
||||
job_ids.forEach(function(job_id) {
|
||||
jobsQueue.defer(self._getIndexedJob.bind(self), job_id, user);
|
||||
});
|
||||
|
||||
jobsQueue.awaitAll(function (err, jobs) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
callback(null, jobs.filter(function (job) {
|
||||
return job ? true : false;
|
||||
}));
|
||||
});
|
||||
};
|
||||
|
||||
JobBackend.prototype._getIndexedJob = function (job_id, user, callback) {
|
||||
var self = this;
|
||||
|
||||
this.get(job_id, function (err, job) {
|
||||
if (err && err.name === 'NotFoundError') {
|
||||
return self.userIndexer.remove(user, job_id, function (err) {
|
||||
if (err) {
|
||||
debug('Error removing key %s in user set', job_id, err);
|
||||
}
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
callback(null, job);
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = JobBackend;
|
||||
|
@ -110,38 +110,4 @@ describe('job backend', function() {
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('.list() should return a list of user\'s jobs', function (done) {
|
||||
var job = createWadusJob();
|
||||
|
||||
jobBackend.create(job.data, function (err, jobCreated) {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
|
||||
jobBackend.list(USER, function (err, jobs) {
|
||||
var found = false;
|
||||
|
||||
assert.ok(!err, err);
|
||||
assert.ok(jobs.length);
|
||||
|
||||
jobs.forEach(function (job) {
|
||||
if (job.job_id === jobCreated.job_id) {
|
||||
found = true;
|
||||
}
|
||||
});
|
||||
|
||||
assert.ok(found, 'Job expeted to be listed not found');
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('.list() should return a empty list for nonexitent user', function (done) {
|
||||
jobBackend.list('wadus_user', function (err, jobs) {
|
||||
assert.ok(!err, err);
|
||||
assert.ok(!jobs.length);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user