// Listing metadata: 268 lines, 8.2 KiB, JavaScript
var assert = require('../support/assert');
|
|
var redisUtils = require('../support/redis_utils');
|
|
var _ = require('underscore');
|
|
var RedisPool = require('redis-mpool');
|
|
var queue = require('queue-async');
|
|
var batchFactory = require('../../batch');
|
|
|
|
var JobPublisher = require('../../batch/job_publisher');
|
|
var JobQueue = require('../../batch/job_queue');
|
|
var JobBackend = require('../../batch/job_backend');
|
|
var JobService = require('../../batch/job_service');
|
|
var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service');
|
|
var JobCanceller = require('../../batch/job_canceller');
|
|
// Redis options used by the metadata backend and the batch fixtures in this
// file; every value is read from the process-wide `global.settings` object.
var redisConfig = (function (settings) {
    return {
        host: settings.redis_host,
        port: settings.redis_port,
        max: settings.redisPool,
        idleTimeoutMillis: settings.redisIdleTimeoutMillis,
        reapIntervalMillis: settings.redisReapIntervalMillis
    };
}(global.settings));

// Metadata backend produced by the `cartodb-redis` factory from those options.
var metadataBackend = require('cartodb-redis')(redisConfig);
|
|
|
|
describe('batch module', function() {
|
|
// Identifiers used for every job created by this suite.
var dbInstance = 'localhost';
var username = 'vizzuality';

// Build the publisher pool from a *copy* of the shared options. Extending
// `redisConfig` in place would add the 'batch-publisher' name to the very
// object that is handed to `batchFactory` further down.
var redisPoolPublisher = new RedisPool(_.extend({}, redisConfig, { name: 'batch-publisher' }));

// Wire the job services by hand so the tests can enqueue jobs (jobService)
// and inspect their stored state (jobBackend) independently of `batch`.
var jobPublisher = new JobPublisher(redisPoolPublisher);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var jobBackend = new JobBackend(metadataBackend, jobQueue);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);

// The batch instance under test; it emits the 'ready', 'job:done',
// 'job:pending' and 'job:failed' events the tests listen for.
var batch = batchFactory(metadataBackend, redisConfig);
|
|
|
|
// Starts the batch instance once for the whole suite and waits until it
// reports that it is ready.
before(function (done) {
    // Subscribe first: a listener attached after `start()` would miss a
    // 'ready' event that is emitted synchronously, leaving the hook hanging.
    batch.on('ready', done);
    batch.start();
});
|
|
|
|
// Suite teardown: stops the batch instance, then asks the redis test helper
// to clean the keys matching the batch pattern before signalling completion.
after(function stopBatchAndClean(finished) {
    var batchKeysPattern = 'batch:*';

    batch.stop();
    redisUtils.clean(batchKeysPattern, finished);
});
|
|
|
|
/**
 * Creates a job for the suite's user and host through `jobService`.
 *
 * @param {*} sql - Query payload stored in the job's `query` field (the tests
 *   pass either a single SQL string or an array of SQL strings).
 * @param {Function} done - Node-style callback: receives the creation error,
 *   or `null` plus the result of calling `serialize()` on the created job.
 */
function createJob(sql, done) {
    jobService.create({ user: username, query: sql, host: dbInstance }, function onJobCreated(creationError, createdJob) {
        if (!creationError) {
            done(null, createdJob.serialize());
            return;
        }

        return done(creationError);
    });
}
|
|
|
|
// Creates one SELECT job and finishes when `batch` reports 'job:done' for it.
it('should perform job with select', function (done) {
    var sql = 'select * from private_table';

    createJob(sql, function onCreated(creationError, createdJob) {
        if (creationError) {
            return done(creationError);
        }

        batch.on('job:done', function onJobDone(finishedJobId) {
            // Ignore events reported for any other job id.
            if (finishedJobId !== createdJob.job_id) {
                return;
            }
            done();
        });
    });
});
|
|
|
|
// Creates a SELECT ... INTO job (writing `batch_test_table`) and finishes when
// `batch` reports 'job:done' for it.
it('should perform job with select into', function (done) {
    var sql = 'select * into batch_test_table from (select * from private_table) as job';

    createJob(sql, function onCreated(creationError, createdJob) {
        if (creationError) {
            return done(creationError);
        }

        batch.on('job:done', function onJobDone(finishedJobId) {
            // Ignore events reported for any other job id.
            if (finishedJobId !== createdJob.job_id) {
                return;
            }
            done();
        });
    });
});
|
|
|
|
// Creates a job that reads `batch_test_table` and finishes when `batch`
// reports 'job:done' for it. (Title typo "swith" corrected to "with".)
it('should perform job with select from result table', function (done) {
    createJob('select * from batch_test_table', function (err, job) {
        if (err) {
            return done(err);
        }

        batch.on('job:done', function (job_id) {
            // Only the job created by this test completes the test.
            if (job_id === job.job_id) {
                done();
            }
        });
    });
});
|
|
|
|
// Enqueues ten identical SELECT jobs and finishes once a 'job:done' event has
// been counted for as many jobs as were enqueued.
it('should perform all enqueued jobs', function (done) {
    var jobCount = 10;
    var jobs = _.times(jobCount, function () {
        return 'select * from private_table';
    });

    // Concurrency equals the number of jobs, so all creations are deferred at once.
    var jobsQueue = queue(jobs.length);

    jobs.forEach(function (sql) {
        jobsQueue.defer(createJob, sql);
    });

    jobsQueue.awaitAll(function (err, jobsCreated) {
        if (err) {
            return done(err);
        }

        var jobsDone = 0;

        batch.on('job:done', function (job_id) {
            // Explicit membership test instead of abusing `_.find` for its
            // side effects: count the event only if it belongs to this test.
            var isEnqueuedHere = _.some(jobsCreated, function (job) {
                return job.job_id === job_id;
            });

            if (!isEnqueuedHere) {
                return;
            }

            jobsDone += 1;
            if (jobsDone === jobs.length) {
                done();
            }
        });
    });
});
|
|
|
|
// Enqueues ten jobs that query `unexistent_table` and finishes once a
// 'job:failed' event has been counted for as many jobs as were enqueued.
it('should set all job as failed', function (done) {
    var jobCount = 10;
    var jobs = _.times(jobCount, function () {
        return 'select * from unexistent_table';
    });

    // Concurrency equals the number of jobs, so all creations are deferred at once.
    var jobsQueue = queue(jobs.length);

    jobs.forEach(function (sql) {
        jobsQueue.defer(createJob, sql);
    });

    jobsQueue.awaitAll(function (err, jobsCreated) {
        if (err) {
            return done(err);
        }

        var jobsFailed = 0;

        batch.on('job:failed', function (job_id) {
            // Explicit membership test instead of abusing `_.find` for its
            // side effects: count the event only if it belongs to this test.
            var isEnqueuedHere = _.some(jobsCreated, function (job) {
                return job.job_id === job_id;
            });

            if (!isEnqueuedHere) {
                return;
            }

            jobsFailed += 1;
            if (jobsFailed === jobs.length) {
                done();
            }
        });
    });
});
|
|
|
|
// Creates a slow job, checks that it is stored as 'running' shortly after
// creation, drains the batch and checks that the job is then stored as
// 'pending'.
it('should drain the current job', function (done) {
    createJob('select pg_sleep(3)', function (err, job) {
        if (err) {
            return done(err);
        }

        // Wait briefly before inspecting the job -- presumably so the batch
        // instance has had time to pick it up.
        setTimeout(function () {
            jobBackend.get(job.job_id, function (err, runningJob) {
                if (err) {
                    // Return here: falling through would dereference an
                    // undefined job and report completion a second time.
                    return done(err);
                }

                assert.equal(runningJob.status, 'running');

                batch.drain(function () {
                    jobBackend.get(runningJob.job_id, function (err, drainedJob) {
                        if (err) {
                            return done(err);
                        }

                        assert.equal(drainedJob.status, 'pending');
                        done();
                    });
                });
            });
        }, 50);
    });
});
|
|
|
|
// Creates one job holding two queries and finishes after as many
// 'job:done' / 'job:pending' events for that job as there are queries.
it('should perform job with array of select', function (done) {
    var queries = ['select * from private_table limit 1', 'select * from private_table'];

    createJob(queries, function onCreated(creationError, createdJob) {
        if (creationError) {
            return done(creationError);
        }

        // Counts down from the number of queries; the test ends at zero.
        var queriesLeft = queries.length;

        function onQueryFinished(reportedJobId) {
            if (reportedJobId !== createdJob.job_id) {
                return;
            }

            queriesLeft -= 1;
            if (queriesLeft === 0) {
                done();
            }
        }

        // The same handler serves both event types.
        batch.on('job:done', onQueryFinished);
        batch.on('job:pending', onQueryFinished);
    });
});
|
|
|
|
// Creates a two-query job whose second query targets `undefined_table` and
// finishes when `batch` reports 'job:failed' for it.
it('should set job as failed if last query fails', function (done) {
    var queries = ['select * from private_table', 'select * from undefined_table'];

    createJob(queries, function onCreated(creationError, createdJob) {
        if (creationError) {
            return done(creationError);
        }

        batch.on('job:failed', function onJobFailed(failedJobId) {
            // Ignore failures reported for any other job id.
            if (failedJobId !== createdJob.job_id) {
                return;
            }
            done();
        });
    });
});
|
|
|
|
// Creates a two-query job whose first query targets `undefined_table` and
// finishes when `batch` reports 'job:failed' for it.
it('should set job as failed if first query fails', function (done) {
    var queries = ['select * from undefined_table', 'select * from private_table'];

    createJob(queries, function onCreated(creationError, createdJob) {
        if (creationError) {
            return done(creationError);
        }

        batch.on('job:failed', function onJobFailed(failedJobId) {
            // Ignore failures reported for any other job id.
            if (failedJobId !== createdJob.job_id) {
                return;
            }
            done();
        });
    });
});
|
|
|
|
});
|