Implemented tests for batch API

This commit is contained in:
Daniel García Aubert 2015-12-22 23:13:33 +01:00
parent 3c23bf12e7
commit b16c0983c6
3 changed files with 54 additions and 10 deletions

View File

@ -1,12 +1,12 @@
'use strict';
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var JobRunner = require('./job_runner');
var JobQueuePool = require('./job_queue_pool');
var JobQueueConsumer = require('./job_queue_consumer');
var JobSubscriber = require('./job_subscriber');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
function Batch(metadataBackend) {
EventEmitter.call(this);
@ -19,20 +19,22 @@ util.inherits(Batch, EventEmitter);
Batch.prototype.start = function () {
var self = this;
this.jobQueuePool = new JobQueuePool();
var jobRunner = this.jobRunner;
var metadataBackend = this.metadataBackend;
var jobQueuePool = new JobQueuePool();
// subscribe to message exchange broker in order to know what queues are available
this.jobSubscriber.subscribe(function onMessage(channel, host) {
var jobQueueConsumer = self.jobQueuePool.get(host);
var jobQueueConsumer = jobQueuePool.get(host);
// if queue consumer is not registered yet
if (!jobQueueConsumer) {
// creates new one
jobQueueConsumer = new JobQueueConsumer(self.metadataBackend, host);
jobQueueConsumer = new JobQueueConsumer(metadataBackend, host);
// register it in batch service
self.jobQueuePool.add(host, jobQueueConsumer);
jobQueuePool.add(host, jobQueueConsumer);
// while read from queue then perform job
jobQueueConsumer.on('data', function (jobId) {
@ -40,7 +42,7 @@ Batch.prototype.start = function () {
// limit one job at the same time per queue (queue <1:1> db intance)
jobQueueConsumer.pause();
var job = self.jobRunner.run(jobId);
var job = jobRunner.run(jobId);
job.on('done', function () {
// next job
@ -57,10 +59,10 @@ Batch.prototype.start = function () {
})
.on('error', function (err) {
console.error(err.stack || err);
self.jobQueuePool.remove(host);
jobQueuePool.remove(host);
})
.on('end', function () {
self.jobQueuePool.remove(host);
jobQueuePool.remove(host);
});
}
});

View File

@ -29,7 +29,8 @@
"step": "~0.0.5",
"step-profiler": "~0.1.0",
"topojson": "0.0.8",
"underscore": "~1.6.0"
"underscore": "~1.6.0",
"queue-async": "^1.0.7"
},
"devDependencies": {
"redis": "0.7.1",

View File

@ -1,3 +1,5 @@
var _ = require('underscore');
var queue = require('queue-async');
var Batch = require('../../batch/batch');
var JobPublisher = require('../../batch/job_publisher');
var JobQueueProducer = require('../../batch/job_queue_producer');
@ -85,4 +87,43 @@ describe('batch', function() {
});
});
it('should perform all job enqueued', function (done) {
var jobs = [
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table',
'select * from private_table'
];
var jobsQueue = queue(jobs.length);
jobs.forEach(function(job) {
jobsQueue.defer(createJob, job);
});
jobsQueue.awaitAll(function (err, jobsCreated) {
if (err) {
return done(err);
}
var jobsDone = 0;
batch.on('job:done', function (jobId) {
_.find(jobsCreated, function(job) {
if (jobId === job.jobId) {
jobsDone += 1;
}
if (jobsDone === jobs.length) {
done();
}
});
});
});
});
});