Merge pull request #359 from CartoDB/query-runner-improvements

Query runner improvements
This commit is contained in:
Raul Ochoa 2016-01-13 18:56:26 +01:00
commit 9e74e8633a
6 changed files with 166 additions and 70 deletions

View File

@ -13,26 +13,22 @@ module.exports = QueryTablesApi;
QueryTablesApi.prototype.getAffectedTablesInQuery = function (username, sql, callback) {
var query = 'SELECT CDB_QueryTablesText($windshaft$' + prepareSql(sql) + '$windshaft$)';
this.pgQueryRunner.run(username, query, handleAffectedTablesInQueryRows, callback);
this.pgQueryRunner.run(username, query, function handleAffectedTablesInQueryRows (err, rows) {
if (err){
var msg = err.message ? err.message : err;
callback(new Error('could not fetch source tables: ' + msg));
return;
}
// This is an Array, so no need to split into parts
var tableNames = rows[0].cdb_querytablestext;
return callback(null, tableNames);
});
};
function handleAffectedTablesInQueryRows(err, rows, callback) {
if (err){
var msg = err.message ? err.message : err;
callback(new Error('could not fetch source tables: ' + msg));
return;
}
// This is an Array, so no need to split into parts
var tableNames = rows[0].cdb_querytablestext;
callback(null, tableNames);
}
QueryTablesApi.prototype.getAffectedTablesAndLastUpdatedTime = function (username, sql, callback) {
var query = [
'WITH querytables AS (',
'SELECT * FROM CDB_QueryTablesText($windshaft$' + prepareSql(sql) + '$windshaft$) as tablenames',
@ -42,28 +38,26 @@ QueryTablesApi.prototype.getAffectedTablesAndLastUpdatedTime = function (usernam
'WHERE m.tabname = any ((SELECT tablenames from querytables)::regclass[])'
].join(' ');
this.pgQueryRunner.run(username, query, handleAffectedTablesAndLastUpdatedTimeRows, callback);
};
this.pgQueryRunner.run(username, query, function handleAffectedTablesAndLastUpdatedTimeRows (err, rows) {
if (err || rows.length === 0) {
var msg = err.message ? err.message : err;
callback(new Error('could not fetch affected tables or last updated time: ' + msg));
return;
}
function handleAffectedTablesAndLastUpdatedTimeRows(err, rows, callback) {
if (err || rows.length === 0) {
var msg = err.message ? err.message : err;
callback(new Error('could not fetch affected tables or last updated time: ' + msg));
return;
}
var result = rows[0];
var result = rows[0];
// This is an Array, so no need to split into parts
var tableNames = result.tablenames;
// This is an Array, so no need to split into parts
var tableNames = result.tablenames;
var lastUpdatedTime = result.max || 0;
var lastUpdatedTime = result.max || 0;
callback(null, {
affectedTables: tableNames,
lastUpdatedTime: lastUpdatedTime * 1000
callback(null, {
affectedTables: tableNames,
lastUpdatedTime: lastUpdatedTime * 1000
});
});
}
};
QueryTablesApi.prototype.getLastUpdatedTime = function (username, tableNames, callback) {
if (!Array.isArray(tableNames) || tableNames.length === 0) {
@ -77,23 +71,21 @@ QueryTablesApi.prototype.getLastUpdatedTime = function (username, tableNames, ca
'])'
].join(' ');
this.pgQueryRunner.run(username, query, handleLastUpdatedTimeRows, callback);
this.pgQueryRunner.run(username, query, function handleLastUpdatedTimeRows (err, rows) {
if (err) {
var msg = err.message ? err.message : err;
return callback(new Error('could not fetch affected tables or last updated time: ' + msg));
}
// when the table has not updated_at means it hasn't been changed so a default last_updated is set
var lastUpdated = 0;
if (rows.length !== 0) {
lastUpdated = rows[0].max || 0;
}
return callback(null, lastUpdated*1000);
});
};
function handleLastUpdatedTimeRows(err, rows, callback) {
if (err) {
var msg = err.message ? err.message : err;
return callback(new Error('could not fetch affected tables or last updated time: ' + msg));
}
// when the table has not updated_at means it hasn't been changed so a default last_updated is set
var lastUpdated = 0;
if (rows.length !== 0) {
lastUpdated = rows[0].max || 0;
}
return callback(null, lastUpdated*1000);
}
function prepareSql(sql) {
return sql
.replace(affectedTableRegexCache.bbox, 'ST_MakeEnvelope(0,0,0,0)')

View File

@ -35,19 +35,17 @@ TablesExtentApi.prototype.getBounds = function (username, tableNames, callback)
"FROM ext"
].join(' ');
this.pgQueryRunner.run(username, query, handleBoundsResult, callback);
this.pgQueryRunner.run(username, query, function handleBoundsResult (err, rows) {
if (err) {
var msg = err.message ? err.message : err;
return callback(new Error('could not fetch source tables: ' + msg));
}
var result = null;
if (rows.length > 0) {
result = {
bounds: rows[0]
};
}
callback(null, result);
});
};
function handleBoundsResult(err, rows, callback) {
if (err) {
var msg = err.message ? err.message : err;
return callback(new Error('could not fetch source tables: ' + msg));
}
var result = null;
if (rows.length > 0) {
result = {
bounds: rows[0]
};
}
callback(null, result);
}

View File

@ -8,8 +8,14 @@ function PgQueryRunner(pgConnection) {
module.exports = PgQueryRunner;
PgQueryRunner.prototype.run = function(username, query, queryHandler, callback) {
/**
* Runs `query` with `username`'s PostgreSQL role, callback receives error and rows array.
*
* @param {String} username
* @param {String} query
* @param {Function} callback function({Error}, {Array}) second argument is guaranteed to be an array
*/
PgQueryRunner.prototype.run = function(username, query, callback) {
var self = this;
var params = {};
@ -33,8 +39,7 @@ PgQueryRunner.prototype.run = function(username, query, queryHandler, callback)
});
psql.query(query, function(err, resultSet) {
resultSet = resultSet || {};
var rows = resultSet.rows || [];
queryHandler(err, rows, callback);
return callback(err, resultSet.rows || []);
});
}
);

View File

@ -310,8 +310,8 @@ describe('tests from old api translated to multilayer', function() {
it("creates layergroup fails when postgresql queries fail to figure affected tables in query", function(done) {
var runQueryFn = PgQueryRunner.prototype.run;
PgQueryRunner.prototype.run = function(username, query, queryHandler, callback) {
return queryHandler(new Error('fake error message'), [], callback);
PgQueryRunner.prototype.run = function(username, query, callback) {
return callback(new Error('fake error message'), []);
};
var layergroup = singleLayergroupConfig('select * from gadm4', '#gadm4 { marker-fill: red; }');
@ -365,8 +365,8 @@ describe('tests from old api translated to multilayer', function() {
keysToDelete['user:localhost:mapviews:global'] = 5;
var runQueryFn = PgQueryRunner.prototype.run;
PgQueryRunner.prototype.run = function(username, query, queryHandler, callback) {
return queryHandler(new Error('failed to query database for affected tables'), [], callback);
PgQueryRunner.prototype.run = function(username, query, callback) {
return callback(new Error('failed to query database for affected tables'), []);
};
// reset internal cacheChannel cache

View File

@ -0,0 +1,46 @@
require('../support/test_helper');
var assert = require('assert');
var RedisPool = require('redis-mpool');
var cartodbRedis = require('cartodb-redis');
var PgConnection = require('../../lib/cartodb/backends/pg_connection');
var PgQueryRunner = require('../../lib/cartodb/backends/pg_query_runner');
describe('PgQueryRunner', function() {
var queryRunner;
before(function() {
var redisPool = new RedisPool(global.environment.redis);
var metadataBackend = cartodbRedis({pool: redisPool});
var pgConnection = new PgConnection(metadataBackend);
queryRunner = new PgQueryRunner(pgConnection);
});
it('should work for happy case', function(done) {
var query = 'select cartodb_id from test_table limit 3';
queryRunner.run('localhost', query, function(err, result) {
assert.ok(!err, err);
assert.ok(Array.isArray(result));
assert.equal(result.length, 3);
done();
});
});
it('should receive rows array even on error', function(done) {
var query = 'select __error___ from test_table';
queryRunner.run('localhost', query, function(err, result) {
assert.ok(err);
assert.ok(Array.isArray(result));
assert.equal(result.length, 0);
done();
});
});
});

View File

@ -0,0 +1,55 @@
require('../support/test_helper');
var assert = require('assert');
var RedisPool = require('redis-mpool');
var cartodbRedis = require('cartodb-redis');
var PgConnection = require('../../lib/cartodb/backends/pg_connection');
var PgQueryRunner = require('../../lib/cartodb/backends/pg_query_runner');
var QueryTablesApi = require('../../lib/cartodb/api/query_tables_api');
describe('QueryTablesApi', function() {
var queryTablesApi;
before(function() {
var redisPool = new RedisPool(global.environment.redis);
var metadataBackend = cartodbRedis({pool: redisPool});
var pgConnection = new PgConnection(metadataBackend);
var pgQueryRunner = new PgQueryRunner(pgConnection);
queryTablesApi = new QueryTablesApi(pgQueryRunner);
});
// Check test/support/sql/windshaft.test.sql to understand where the values come from.
it('should return an object with affected tables array and last updated time', function(done) {
var query = 'select * from test_table';
queryTablesApi.getAffectedTablesAndLastUpdatedTime('localhost', query, function(err, result) {
assert.ok(!err, err);
assert.deepEqual(result, {
affectedTables: [ 'public.test_table' ],
lastUpdatedTime: 1234567890123
});
done();
});
});
it('should work with private tables', function(done) {
var query = 'select * from test_table_private_1';
queryTablesApi.getAffectedTablesAndLastUpdatedTime('localhost', query, function(err, result) {
assert.ok(!err, err);
assert.deepEqual(result, {
affectedTables: [ 'public.test_table_private_1' ],
lastUpdatedTime: 1234567890123
});
done();
});
});
});