Now jobs are stored in redis instead of user's database

This commit is contained in:
Daniel García Aubert 2015-12-21 19:57:10 +01:00
parent fb8feeb964
commit 3762ad7a39
12 changed files with 224 additions and 301 deletions

View File

@ -3,37 +3,40 @@
var _ = require('underscore');
var step = require('step');
var assert = require('assert');
var PSQL = require('cartodb-psql');
var UserDatabaseService = require('../services/user_database_service');
var JobPublisher = require('../../batch/job_publisher');
var JobQueueProducer = require('../../batch/job_queue_producer');
var Job = require('../../batch/job');
var JobBackend = require('../../batch/job_backend');
var CdbRequest = require('../models/cartodb_request');
var handleException = require('../utils/error_handler');
var cdbReq = new CdbRequest();
var userDatabaseService = new UserDatabaseService();
var jobPublisher = new JobPublisher();
var job = new Job();
function JobController(metadataBackend, tableCache, statsd_client) {
this.metadataBackend = metadataBackend;
this.tableCache = tableCache;
this.statsd_client = statsd_client;
this.jobQueueProducer = new JobQueueProducer(metadataBackend);
this.jobBackend = new JobBackend(metadataBackend);
}
JobController.prototype.route = function (app) {
app.all(global.settings.base_url + '/job', this.handleJob.bind(this));
app.post(global.settings.base_url + '/job', this.createJob.bind(this));
// app.get(global.settings.base_url + '/job:jobId', this.getJob.bind(this));
};
// JobController.prototype.getJob = function (req, res) {
//
// };
// jshint maxcomplexity:21
JobController.prototype.handleJob = function (req, res) {
JobController.prototype.createJob = function (req, res) {
var self = this;
var body = (req.body) ? req.body : {};
var params = _.extend({}, req.query, body); // clone so don't modify req.params or req.body so oauth is not broken
var sql = (params.q === "" || _.isUndefined(params.q)) ? null : params.q;
var sql = (params.query === "" || _.isUndefined(params.query)) ? null : params.query;
var cdbUsername = cdbReq.userByReq(req);
if (!_.isString(sql)) {
@ -62,8 +65,6 @@ JobController.prototype.handleJob = function (req, res) {
}
}
var pg;
if ( req.profiler ) {
req.profiler.done('init');
}
@ -90,9 +91,7 @@ JobController.prototype.handleJob = function (req, res) {
req.profiler.done('setDBAuth');
}
pg = new PSQL(userDatabase, {}, { destroyOnError: true });
job.createJob(pg, cdbUsername, sql, function (err, result) {
self.jobBackend.create(cdbUsername, sql, function (err, result) {
if (err) {
return next(err);
}
@ -103,12 +102,12 @@ JobController.prototype.handleJob = function (req, res) {
});
});
},
function enqueueUserDatabase(err, result) {
function enqueueJob(err, result) {
assert.ifError(err);
var next = this;
self.jobQueueProducer.enqueue(cdbUsername, result.userDatabase.host, function (err) {
self.jobQueueProducer.enqueue(result.job.jobId, result.userDatabase.host, function (err) {
if (err) {
return next(err);
}

View File

@ -1,23 +1,26 @@
'use strict';
var Job = require('./job');
var JobRunner = require('./job_runner');
var JobQueuePool = require('./job_queue_pool');
var JobQueueConsumer = require('./job_queue_consumer');
var JobSubscriber = require('./job_subscriber');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var JobService = require('./job_service');
var EventEmitter = require('events').EventEmitter;
module.exports = function batch(metadataBackend) {
var jobQueuePool = new JobQueuePool();
var jobSubscriber = new JobSubscriber();
var job = new Job();
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobService = new JobService(userDatabaseMetadataService, job);
var jobRunner = new JobRunner(metadataBackend, userDatabaseMetadataService);
var eventEmitter = global.settings.environment === 'test' ? new EventEmitter() : {
emit: function () {}
};
// subscribe to message exchange broker in order to know what queues are available
jobSubscriber.subscribe(function onMessage(channel, host) {
var jobQueueConsumer = jobQueuePool.get(host);
// if queue consumer is not registered in batch service
// if queue consumer is not registered yet
if (!jobQueueConsumer) {
// creates new one
@ -27,19 +30,25 @@ module.exports = function batch(metadataBackend) {
jobQueuePool.add(host, jobQueueConsumer);
// while read from queue then perform job
jobQueueConsumer.on('data', function (username) {
jobQueueConsumer.on('data', function (jobId) {
// limit one job at the same time per queue (queue <1:1> db intance)
jobQueueConsumer.pause();
jobService.run(username, function (err) {
if (err) {
console.error(err.stack);
}
var job = jobRunner.run(jobId);
job.on('done', function () {
// next job
eventEmitter.emit('job:done', jobId);
jobQueueConsumer.resume();
});
job.on('error', function (err) {
console.error(err.stack || err);
eventEmitter.emit('job:failed', jobId);
jobQueueConsumer.resume();
});
})
.on('error', function (err) {
console.error(err.stack || err);
@ -47,5 +56,5 @@ module.exports = function batch(metadataBackend) {
}
});
return job;
return eventEmitter;
};

View File

@ -1,121 +0,0 @@
'use strict';
var PSQL = require('cartodb-psql');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function Job() {
EventEmitter.call(this);
}
util.inherits(Job, EventEmitter);
Job.prototype.run = function (userDatabaseMetada, callback) {
var self = this;
var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true });
this.getJob(pg, function (err, job) {
if (err) {
return callback(err);
}
if (!job) {
return callback();
}
self.setJobRunning(pg, job, function (err) {
if (err) {
return callback(err);
}
self.emit('job:running', job);
self.runJob(pg, job, function (err, jobResult) {
if (err) {
return self.setJobFailed(pg, job, err.message, function () {
self.emit('job:failed', { job: job.job_id, error: err });
callback(err);
});
}
self.setJobDone(pg, job, function () {
self.emit('job:done', job);
callback(null, jobResult);
});
});
});
});
};
Job.prototype.createJob = function (pg, username, sql, callback) {
var persistJobQuery = [
'INSERT INTO cdb_jobs (',
'user_id, query',
') VALUES (',
'\'' + username + '\', ',
'\'' + sql + '\' ',
') RETURNING job_id;'
].join('\n');
pg.query(persistJobQuery, callback);
};
Job.prototype.getJob = function (pg, callback) {
var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1";
pg.query(getNextJob, function (err, result) {
if (err) {
return callback(err);
}
callback(null, result.rows[0]);
});
};
Job.prototype.runJob = function (pg, job, callback) {
var query = job.query;
pg.query(query, callback);
};
Job.prototype.setJobRunning = function (pg, job, callback) {
var runningJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'running\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(runningJobQuery, callback);
};
Job.prototype.setJobDone = function (pg, job, callback) {
var doneJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'done\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(doneJobQuery, callback);
};
Job.prototype.setJobFailed = function (pg, job, message, callback) {
var failedJobQuery = [
'UPDATE cdb_jobs SET ',
'status = \'failed\', ',
'failed_reason = \'' + message + '\', ',
'updated_at = now() ',
' WHERE ',
'job_id = \'' + job.job_id + '\' ',
' RETURNING job_id;'
].join('\n');
pg.query(failedJobQuery, callback);
};
module.exports = Job;

117
batch/job_backend.js Normal file
View File

@ -0,0 +1,117 @@
'use strict';
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var uuid = require('node-uuid');
function JobBackend(metadataBackend) {
EventEmitter.call(this);
this.metadataBackend = metadataBackend;
this.db = 5;
}
util.inherits(JobBackend, EventEmitter);
JobBackend.prototype.create = function (username, sql, callback) {
var self = this;
var jobId = uuid.v4();
var redisParams = [
jobId,
'user', username,
'status', 'pending',
'query', sql,
'created_at', Date.now(),
'updated_at', Date.now()
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return callback(err);
}
self.get(jobId, callback);
});
};
JobBackend.prototype.get = function (jobId, callback) {
var redisParams = [
jobId,
'user',
'status',
'query',
'created_at',
'updated_at'
];
this.metadataBackend.redisCmd(this.db, 'HMGET', redisParams , function (err, jobValues) {
if (err) {
return callback(err);
}
if (!jobValues) {
return callback(new Error('Job not found'));
}
callback(null, {
jobId: jobId,
user: jobValues[0],
status: jobValues[1],
query: jobValues[2],
created_at: jobValues[3],
updated_at: jobValues[4]
});
});
};
JobBackend.prototype.setRunning = function (job) {
var self = this;
var redisParams = [
job.jobId,
'status', 'running',
'updated_at', Date.now()
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) {
if (err) {
return self.emit('error', err);
}
self.emit('running', job);
});
};
JobBackend.prototype.setDone = function (job) {
var self = this;
var redisParams = [
job.jobId,
'status', 'done',
'updated_at', Date.now()
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return self.emit('error', err);
}
self.emit('done', job);
});
};
JobBackend.prototype.setFailed = function (job, err) {
var self = this;
var redisParams = [
job.jobId,
'status', 'failed',
'failed_reason', err.message,
'updated_at', Date.now()
];
this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) {
if (err) {
return self.emit('error', err);
}
self.emit('failed', job);
});
};
module.exports = JobBackend;

View File

@ -16,12 +16,12 @@ util.inherits(JobQueueConsumer, Readable);
JobQueueConsumer.prototype._read = function () {
var self = this;
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.queueName ], function (err, username) {
this.metadataBackend.redisCmd(this.db, 'RPOP', [ this.queueName ], function (err, jobId) {
if (err) {
return self.emit('error', err);
}
self.push(username);
self.push(jobId);
});
};

View File

@ -5,11 +5,11 @@ function JobQueueProducer(metadataBackend) {
this.db = 5;
}
JobQueueProducer.prototype.enqueue = function (cdbUsername, host, callback) {
JobQueueProducer.prototype.enqueue = function (jobId, host, callback) {
var db = this.db;
var queue = 'queue:' + host;
this.metadataBackend.redisCmd(db, 'LPUSH', [queue, cdbUsername], callback);
this.metadataBackend.redisCmd(db, 'LPUSH', [queue, jobId], callback);
};
module.exports = JobQueueProducer;

45
batch/job_runner.js Normal file
View File

@ -0,0 +1,45 @@
'use strict';
var JobBackend = require('./job_backend');
var PSQL = require('cartodb-psql');
function JobRunner(metadataBackend, userDatabaseMetadataService) {
this.metadataBackend = metadataBackend;
this.userDatabaseMetadataService = userDatabaseMetadataService;
}
JobRunner.prototype.run = function (jobId) {
var self = this;
var jobBackend = new JobBackend(this.metadataBackend);
jobBackend.get(jobId, function (err, job) {
if (err) {
return jobBackend.emit('error', err);
}
self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) {
if (err) {
return jobBackend.emit('error', err);
}
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
jobBackend.setRunning(job);
pg.eventedQuery(job.query, function (err, query) {
query.on('error', function (err) {
jobBackend.setFailed(job, err);
});
query.on('end', function () {
jobBackend.setDone(job);
});
});
});
});
return jobBackend;
};
module.exports = JobRunner;

View File

@ -1,20 +0,0 @@
'use strict';
function JobService(userDatabaseMetadataService, job) {
this.userDatabaseMetadataService = userDatabaseMetadataService;
this.job = job;
}
JobService.prototype.run = function (username, callback) {
var self = this;
this.userDatabaseMetadataService.getUserMetadata(username, function (err, userDatabaseMetadata) {
if (err) {
return callback(err);
}
self.job.run(userDatabaseMetadata, callback);
});
};
module.exports = JobService;

View File

@ -22,6 +22,7 @@
"log4js": "https://github.com/CartoDB/log4js-node/tarball/cdb",
"lru-cache": "~2.5.0",
"node-statsd": "~0.0.7",
"node-uuid": "^1.4.7",
"oauth-client": "0.3.0",
"redis": "^2.4.2",
"rollbar": "~0.3.2",

View File

@ -1,10 +1,9 @@
var assert = require('assert');
var batchFactory = require('../../batch/');
var batch = require('../../batch/');
var JobPublisher = require('../../batch/job_publisher');
var JobQueueProducer = require('../../batch/job_queue_producer');
var Job = require('../../batch/job');
var JobBackend = require('../../batch/job_backend');
var UserDatabaseMetadataService = require('../../batch/user_database_metadata_service');
var PSQL = require('cartodb-psql');
var metadataBackend = require('cartodb-redis')({
host: global.settings.redis_host,
port: global.settings.redis_port,
@ -18,9 +17,9 @@ describe('batch', function() {
it('should perform one job', function (done) {
var jobQueueProducer = new JobQueueProducer(metadataBackend);
var jobPublisher = new JobPublisher();
var job = new Job();
var jobBackend = new JobBackend(metadataBackend);
var username = 'vizzuality';
var jobId = '';
var _jobId = '';
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
@ -29,17 +28,17 @@ describe('batch', function() {
return done(err);
}
var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true });
var sql = "select * from private_table limit 1";
job.createJob(pg, username, sql, function (err, result) {
// create job in redis
jobBackend.create(username, sql, function (err, job) {
if (err) {
return done(err);
}
jobId = result.rows[0].job_id;
_jobId = job.jobId;
jobQueueProducer.enqueue(username, userDatabaseMetadata.host, function (err) {
jobQueueProducer.enqueue(job.jobId, userDatabaseMetadata.host, function (err) {
if (err) {
return done(err);
}
@ -49,11 +48,10 @@ describe('batch', function() {
});
});
var batch = batchFactory(metadataBackend);
batch.on('job:done', function (job) {
assert.equal(jobId, job.job_id);
done();
});
batch(metadataBackend)
.on('job:done', function (jobId) {
assert.equal(_jobId, jobId);
done();
});
});
});

View File

@ -18,112 +18,26 @@ var app = require(global.settings.app_root + '/app/app')();
var assert = require('../support/assert');
var querystring = require('querystring');
describe('job.test', function() {
describe('job', function() {
it('GET /api/v2/job', function (done){
it('POST /api/v2/job', function (done){
assert.response(app, {
url: '/api/v2/job',
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
headers: { 'host': 'vizzuality.cartodb.com', 'Content-Type': 'application/x-www-form-urlencoded' },
method: 'POST',
data: querystring.stringify({
query: "SELECT * FROM untitle_table_4"
})
}, {
status: 400
status: 200
}, function(res) {
var job = JSON.parse(res.body);
assert.deepEqual(res.headers['content-type'], 'application/json; charset=utf-8');
assert.deepEqual(res.headers['content-disposition'], 'inline');
assert.deepEqual(JSON.parse(res.body), {"error":["You must indicate a sql query"]});
assert.equal(job.query, "SELECT * FROM untitle_table_4");
assert.equal(job.status, "pending");
assert.equal(job.user, "vizzuality");
done();
});
});
it('GET /api/v2/job with SQL parameter on SELECT no database param,just id using headers', function(done){
assert.response(app, {
url: '/api/v2/job?' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
}, {
}, function (res) {
assert.equal(res.statusCode, 200, res.body);
done();
});
});
it('GET job status via /api/v2/sql with SQL parameter. no database param, just id using headers. Authenticated.',
function(done){
assert.response(app, {
url: '/api/v2/job?' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
}, {
}, function (res) {
var job_id = JSON.parse(res.body).rows[0].job_id;
assert.equal(res.statusCode, 200, res.body);
assert.response(app, {
url: '/api/v2/sql?&api_key=1234&' + querystring.stringify({
q: 'SELECT status FROM cdb_jobs WHERE job_id=\'' + job_id + '\''
}),
headers: {host: 'vizzuality.cartodb.com'},
method: 'GET'
},{ }, function(res) {
assert.equal(res.statusCode, 200, res.body);
assert.equal(JSON.parse(res.body).rows[0].status, 'pending');
done();
});
});
});
it('UPDATE job query via /api/v2/sql with SQL parameter. no database param, just id using headers. Authenticated.',
function (done) {
assert.response(app, {
url: '/api/v2/job?' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
}, {
}, function (res) {
assert.equal(res.statusCode, 200, res.body);
var job_id = JSON.parse(res.body).rows[0].job_id;
var updated_query = 'SELECT cartodb_id FROM untitle_table_4';
assert.response(app, {
url: '/api/v2/sql??api_key=1234&' + querystring.stringify({
q: "UPDATE cdb_jobs SET query='" + updated_query + "' WHERE job_id = '" + job_id +
"' RETURNING job_id, query"
}),
headers: {host: 'vizzuality.cartodb.com'},
method: 'GET'
},{ }, function(res) {
assert.equal(res.statusCode, 200, res.body);
assert.equal(updated_query, JSON.parse(res.body).rows[0].query);
done();
});
});
});
it('DELETE job query via /api/v2/sql with SQL parameter. no database param, just id using headers. Authenticated.',
function (done) {
assert.response(app, {
url: '/api/v2/job?' + querystring.stringify({ q: "SELECT * FROM untitle_table_4" }),
headers: { host: 'vizzuality.cartodb.com' },
method: 'GET'
}, {
}, function (res) {
assert.equal(res.statusCode, 200, res.body);
var job_id = JSON.parse(res.body).rows[0].job_id;
var deleted_query = 'SELECT * FROM untitle_table_4';
assert.response(app, {
url: '/api/v2/sql??api_key=1234&' + querystring.stringify({
q: "DELETE FROM cdb_jobs WHERE job_id = '" + job_id +
"' RETURNING job_id, query"
}),
headers: {host: 'vizzuality.cartodb.com'},
method: 'GET'
},{ }, function(res) {
assert.equal(res.statusCode, 200, res.body);
assert.equal(deleted_query, JSON.parse(res.body).rows[0].query);
assert.equal(job_id, JSON.parse(res.body).rows[0].job_id);
done();
});
});
});
});

View File

@ -20,23 +20,6 @@ SET search_path = public, pg_catalog;
SET default_tablespace = '';
SET default_with_oids = false;
-- jobs table
DROP TABLE IF EXISTS cdb_jobs;
CREATE TABLE cdb_jobs (
job_id uuid DEFAULT uuid_generate_v4(),
user_id character varying,
status character varying DEFAULT 'pending',
query character varying,
updated_at timestamp without time zone DEFAULT now(),
created_at timestamp without time zone DEFAULT now(),
failed_reason character varying
);
ALTER TABLE ONLY cdb_jobs ADD CONSTRAINT cdb_jobs_pkey PRIMARY KEY (job_id);
CREATE INDEX cdb_jobs_idx ON cdb_jobs (created_at, status);
-- INSERT INTO cdb_jobs (user_id, query) VALUES ('vizzuality', 'select * from private_table') RETURNING job_id;
-- first table
DROP TABLE IF EXISTS untitle_table_4;
CREATE TABLE untitle_table_4 (
@ -136,8 +119,6 @@ ALTER ROLE :PUBLICUSER SET statement_timeout = 2000;
DROP USER IF EXISTS :TESTUSER;
CREATE USER :TESTUSER WITH PASSWORD ':TESTPASS';
GRANT ALL ON TABLE cdb_jobs TO :TESTUSER;
GRANT ALL ON TABLE cdb_jobs TO :PUBLICUSER;
GRANT ALL ON TABLE untitle_table_4 TO :TESTUSER;
GRANT SELECT ON TABLE untitle_table_4 TO :PUBLICUSER;
GRANT ALL ON TABLE private_table TO :TESTUSER;