Implemented batch service

This commit is contained in:
Daniel García Aubert 2015-12-09 20:17:45 +01:00
parent c7680722ca
commit 6cc48bf9dd
15 changed files with 275 additions and 255 deletions

View File

@ -6,7 +6,7 @@ var assert = require('assert');
var PSQL = require('cartodb-psql');
var UserDatabaseService = require('../services/user_database_service');
var UserDatabaseQueue = require('../../batch/user_database_queue');
var UsernameQueue = require('../../batch/username_queue');
var CdbRequest = require('../models/cartodb_request');
var handleException = require('../utils/error_handler');
@ -17,7 +17,7 @@ function JobController(metadataBackend, tableCache, statsd_client) {
this.metadataBackend = metadataBackend;
this.tableCache = tableCache;
this.statsd_client = statsd_client;
this.userDatabaseQueue = new UserDatabaseQueue(metadataBackend);
this.userDatabaseQueue = new UsernameQueue(metadataBackend);
}
JobController.prototype.route = function (app) {

View File

@ -1,33 +1,52 @@
'use strict';
function BatchManager(jobDequeuer, queryRunner, jobCounter) {
this.jobDequeuer = jobDequeuer;
this.queryRunner = queryRunner;
this.jobCounter = jobCounter;
function BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService) {
this.usernameQueue = usernameQueue;
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.jobService = jobService;
this.jobCounterService = jobCounterService;
}
BatchManager.prototype.run = function () {
BatchManager.prototype.run = function (callback) {
var self = this;
this.jobDequeuer.dequeue(function (err, pg, job, host) {
this.usernameQueue.dequeue(function (err, username) {
if (err) {
return console.error(err);
return callback(err);
}
if (!pg || !job || !host) {
return console.info('No job launched');
if (!username) {
return callback(new Error('No jobs scheduled'));
}
self.queryRunner.run(pg, job, host, function (err) {
self.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) {
if (err) {
return console.error(err);
return callback(err);
}
if (!this.jobCounter.decrement(host)) {
return console.warn('Job counter for instance %s is out of range', host);
}
self.jobCounterService.increment(userDatabaseMetadata.host, function (err) {
if (err) {
return callback(err);
}
console.info('Job %s done successfully', job.job_id);
self.jobService.run(userDatabaseMetadata, function (err) {
if (err) {
callback(err);
self.usernameQueue.enqueue(username, function (err) {
if (err) {
callback(err);
}
});
}
self.jobCounterService.decrement(userDatabaseMetadata.host, function (err) {
if (err) {
return callback(err);
}
callback();
});
});
});
});
});
};

View File

@ -0,0 +1,26 @@
'use strict';
var cartoDBRedis = require('cartodb-redis');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var UsernameQueue = require('./username_queue');
var JobService = require('./job_service');
var JobCounterService = require('./job_counter_service');
var BatchManager = require('./batch_manager');
module.exports = function (maxJobsPerHost) {
var metadataBackend = cartoDBRedis({
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
var usernameQueue = new UsernameQueue(metadataBackend);
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobService = new JobService(metadataBackend);
var jobCounterService = new JobCounterService(maxJobsPerHost, metadataBackend);
var batchManager = new BatchManager(usernameQueue, userDatabaseMetadataService, jobService, jobCounterService);
return batchManager;
};

View File

@ -1,45 +0,0 @@
'use strict';
function DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter) {
this.userDatabaseQueue = userDatabaseQueue;
this.metadataBackend = metadataBackend;
this.jobCounter = jobCounter;
}
DatabaseDequeuer.prototype.dequeue = function (callback) {
var self = this;
this.userDatabaseQueue.dequeue(function (err, userDatabaseName) {
if (err) {
return callback(err);
}
if (!userDatabaseName) {
return callback();
}
console.log('>>>>>>>>>>>>>>>>>>>>>>>> 1');
self.metadataBackend.getAllUserDBParams(userDatabaseName, function (err, userDatabase) {
console.log('>>>>>>>>>>>>>>>>>>>>>>>> 2');
if (err) {
return callback(err);
}
if (this.jobCounter.increment(userDatabase.dbHost)) {
return callback(null, userDatabase);
}
// host is busy, enqueue job again!
this.userDatabaseQueue.enqueue(userDatabaseName, function (err) {
if (err) {
return callback(err);
}
callback();
});
});
});
};
module.exports = DatabaseDequeuer;

View File

@ -1,30 +1,10 @@
'use strict';
var BatchLauncher = require('./batch_launcher');
var BatchManager = require('./batch_manager');
var JobDequeuer = require('./job_dequeuer');
var QueryRunner = require('./query_runner');
var DatabaseDequeuer = require('./database_dequeuer');
var UserDatabaseQueue = require('./user_database_queue');
var cartoDBRedis = require('cartodb-redis');
var JobCounter = require('./job_counter');
var batchManagerFactory = require('./batch_manager_factory');
module.exports = function (interval, maxJobsPerHost) {
var jobCounter = new JobCounter(maxJobsPerHost);
var metadataBackend = cartoDBRedis({
host: global.settings.redis_host,
port: global.settings.redis_port,
max: global.settings.redisPool,
idleTimeoutMillis: global.settings.redisIdleTimeoutMillis,
reapIntervalMillis: global.settings.redisReapIntervalMillis
});
var userDatabaseQueue = new UserDatabaseQueue(metadataBackend);
var databaseDequeuer = new DatabaseDequeuer(userDatabaseQueue, metadataBackend, jobCounter);
var queryRunner = new QueryRunner();
var jobDequeuer = new JobDequeuer(databaseDequeuer);
var batchManager = new BatchManager(jobDequeuer, queryRunner);
var batchManager = batchManagerFactory(maxJobsPerHost);
var batchLauncher = new BatchLauncher(batchManager);
// here we go!

View File

@ -1,25 +0,0 @@
'use strict';
function JobsCounter(maxJobsPerIntance, metadataBackend) {
this.metadataBackend = metadataBackend;
this.maxJobsPerIntance = maxJobsPerIntance || global.settings.max_jobs_per_instance;
this.hosts = {};
}
JobsCounter.prototype.increment = function (host) {
if (this[host] < this.maxJobsPerHost) {
this[host] += 1;
return true;
}
return false;
};
JobsCounter.prototype.decrement = function (host) {
if (this[host] > 0) {
this[host] -= 1;
return true;
}
return false;
};
module.exports = JobsCounter;

View File

@ -0,0 +1,53 @@
'use strict';
function JobCounterService(maxJobsPerHost, metadataBackend) {
this.metadataBackend = metadataBackend;
this.maxJobsPerHost = maxJobsPerHost || global.settings.max_jobs_per_instance;
this.db = 5;
}
JobCounterService.prototype.increment = function (host, callback) {
var self = this;
var db = this.db;
this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) {
if (err) {
return callback(err);
}
if (hostCounter >= self.maxJobsPerHost) {
return callback(new Error('Limit max job per host is reached: %s jobs', hostCounter));
}
self.metadataBackend.redisCmd(db, 'INCR', [host], function (err /*, hostCounter */) {
if (err) {
return callback(err);
}
callback();
});
});
};
JobCounterService.prototype.decrement = function (host, callback) {
var self = this;
var db = this.db;
this.metadataBackend.redisCmd(db, 'GET', [host], function (err, hostCounter) {
if (err) {
return callback(err);
}
if (hostCounter < 0) {
return callback(new Error('Limit max job per host is reached'));
}
self.metadataBackend.redisCmd(db, 'DECR', [host], function (err /*, hostCounter */) {
if (err) {
return callback(err);
}
callback();
});
});
};
module.exports = JobCounterService;

View File

@ -1,36 +0,0 @@
'use strict';
var PSQL = require('cartodb-psql');
function JobDequeuer(databaseDequeuer) {
this.databaseDequeuer = databaseDequeuer;
}
JobDequeuer.prototype.dequeue = function (callback) {
this.databaseDequeuer.dequeue(function (err, userDatabase) {
if (err) {
return callback(err);
}
if (!userDatabase) {
return callback();
}
var pg = new PSQL(userDatabase, {}, { destroyOnError: true });
var nextQuery = "select * from cdb_jobs where status='pending' order by updated_at asc limit 1";
pg.query(nextQuery, function (err, job) {
if (err) {
return callback(err);
}
callback(null, pg, job, userDatabase.host);
});
});
};
module.exports = JobDequeuer;

124
batch/job_service.js Normal file
View File

@ -0,0 +1,124 @@
'use strict';
var PSQL = require('cartodb-psql');
function JobService() {
}
JobService.prototype.run = function (userDatabaseMetada, callback) {
var self = this;
var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true });
this.getJob(pg, function (err, job) {
if (err) {
return callback(err);
}
self.setJobRunning(pg, job, function (err) {
if (err) {
return callback(err);
}
self.runJob(pg, job.query, function (err, jobResult) {
if (err) {
self.setJobFailed(pg, job, err.message, function (err) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
} else {
self.setJobDone(pg, job, function (err) {
if (err) {
return callback(err);
}
console.info('Job %s done successfully', job.job_id);
callback(null, jobResult);
});
}
});
});
});
};
JobService.prototype.runJob = function (pg, jobQuery, callback) {
// TODO: wrap select query with select into
pg.query(jobQuery, function (err, jobResult) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
};
JobService.prototype.setJobRunning = function (pg, job, callback) {
var runningJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'running\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(runningJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
JobService.prototype.setJobDone = function (pg, job, callback) {
var doneJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'done\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(doneJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
JobService.prototype.setJobFailed = function (pg, job, message, callback) {
var failedJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'failed\', ',
'failed_reason = \'' + message + '\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(failedJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
JobService.prototype.getJob = function (pg, callback) {
var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1";
pg.query(getNextJob, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result.rows[0]);
});
};
module.exports = JobService;

View File

@ -1,100 +0,0 @@
'use strict';
function QueryRunner() {
}
QueryRunner.prototype.run = function (pg, job, callback) {
var self = this;
console.log('QueryRunner.run');
this.setJobRunning(pg, job, function (err) {
if (err) {
return callback(err);
}
self.job(pg, job.query, function (err, jobResult) {
if (err) {
self.setJobFailed(err, pg, job, function (err) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
} else {
self.setJobDone(pg, job, function (err) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
}
});
});
};
QueryRunner.prototype.job = function (pg, jobQuery, callback) {
// TODO: wrap select query with select into
pg(jobQuery, function (err, jobResult) {
if (err) {
return callback(err);
}
callback(null, jobResult);
});
};
QueryRunner.prototype.setJobRunning = function (pg, job, callback) {
var runningJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'running\'',
'updated_at = ' + Date.now(),
' WHERE ',
'job_id = \'' + job.job_id + '\', ',
') RETURNING job_id;'
].join('\n');
pg(runningJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
QueryRunner.prototype.setJobDone = function (pg, job, callback) {
var doneJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'done\'',
'updated_at = ' + Date.now(),
' WHERE ',
'job_id = \'' + job.job_id + '\', ',
') RETURNING job_id;'
].join('\n');
pg(doneJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
QueryRunner.prototype.setJobFailed = function (err, pg, job, callback) {
var failedJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'failed\'',
'failed_reason = \'' + err.message + '\'',
'updated_at = ' + Date.now(),
' WHERE ',
'job_id = \'' + job.job_id + '\', ',
') RETURNING job_id;'
].join('\n');
pg(failedJobQuery, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result);
});
};
module.exports = QueryRunner;

View File

@ -0,0 +1,17 @@
'use strict';
function UserDatabaseMetadataService(metadataBackend) {
this.metadataBackend = metadataBackend;
}
UserDatabaseMetadataService.prototype.getUserMetadata = function (username, callback) {
this.metadataBackend.getAllUserDBParams(username, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
callback(null, userDatabaseMetadata);
});
};
module.exports = UserDatabaseMetadataService;

View File

@ -1,13 +1,13 @@
'use strict';
function UsernameBatchQueue(metadataBackend) {
function UsernameQueue(metadataBackend) {
this.metadataBackend = metadataBackend;
this.db = 5;
this.queueName = 'usernameBatchQueue';
}
UsernameBatchQueue.prototype.enqueue = function (cdbUsername, callback) {
UsernameQueue.prototype.enqueue = function (cdbUsername, callback) {
var db = this.db;
var queue = this.queueName;
@ -20,7 +20,7 @@ UsernameBatchQueue.prototype.enqueue = function (cdbUsername, callback) {
});
};
UsernameBatchQueue.prototype.dequeue = function (callback) {
UsernameQueue.prototype.dequeue = function (callback) {
var db = this.db;
var queue = this.queueName;
@ -33,4 +33,4 @@ UsernameBatchQueue.prototype.dequeue = function (callback) {
});
};
module.exports = UsernameBatchQueue;
module.exports = UsernameQueue;

View File

@ -1,8 +1,10 @@
var batch = require('../../batch');
var batchManagerFactory = require('../../batch/batch_manager_factory');
describe('batch service', function() {
it.skip('run', function() {
batch(1, 1);
describe('batch manager', function() {
it('run', function (done) {
batchManagerFactory().run(function (err) {
done(err);
});
});
});

View File

@ -133,8 +133,11 @@ HMSET rails:oauth_access_tokens:l0lPbtP68ao8NfStCiA3V3neqfM03JKhToxhUQTR \
time sometime
EOF
# insert in username queue for testin jobs
cat <<EOF | redis-cli -p ${REDIS_PORT} -n 5
LPUSH usernameBatchQueue vizzuality
EOF
fi
echo "ok, you can run test now"

View File

@ -35,6 +35,8 @@ CREATE TABLE cdb_jobs (
ALTER TABLE ONLY cdb_jobs ADD CONSTRAINT cdb_jobs_pkey PRIMARY KEY (job_id);
CREATE INDEX cdb_jobs_idx ON cdb_jobs (created_at, status);
INSERT INTO cdb_jobs (user_id, query) VALUES ('vizzuality', 'select * from private_table') RETURNING job_id;
-- first table
DROP TABLE IF EXISTS untitle_table_4;
CREATE TABLE untitle_table_4 (