Merge pull request #146 from rochoa/CDB-2081

Support for order_by through HTTP params
This commit is contained in:
Raul Ochoa 2014-04-14 16:24:54 +02:00
commit 41e3747998
6 changed files with 260 additions and 113 deletions

View File

@ -37,6 +37,7 @@ var express = require('express')
// global.settings.app_root + '/app/models/metadata') // global.settings.app_root + '/app/models/metadata')
, oAuth = require(global.settings.app_root + '/app/models/oauth') , oAuth = require(global.settings.app_root + '/app/models/oauth')
, PSQL = require(global.settings.app_root + '/app/models/psql') , PSQL = require(global.settings.app_root + '/app/models/psql')
, PSQLWrapper = require(global.settings.app_root + '/app/sql/psql_wrapper')
, CdbRequest = require(global.settings.app_root + '/app/models/cartodb_request') , CdbRequest = require(global.settings.app_root + '/app/models/cartodb_request')
, ApiKeyAuth = require(global.settings.app_root + '/app/models/apikey_auth') , ApiKeyAuth = require(global.settings.app_root + '/app/models/apikey_auth')
, _ = require('underscore') , _ = require('underscore')
@ -195,6 +196,8 @@ function handleQuery(req, res) {
var database = params.database; // TODO: Deprecate var database = params.database; // TODO: Deprecate
var limit = parseInt(params.rows_per_page); var limit = parseInt(params.rows_per_page);
var offset = parseInt(params.page); var offset = parseInt(params.page);
var orderBy = params.order_by;
var sortOrder = params.sort_order;
var requestedFormat = params.format; var requestedFormat = params.format;
var format = _.isArray(requestedFormat) ? _.last(requestedFormat) : requestedFormat; var format = _.isArray(requestedFormat) ? _.last(requestedFormat) : requestedFormat;
var requestedFilename = params.filename; var requestedFilename = params.filename;
@ -447,7 +450,7 @@ function handleQuery(req, res) {
checkAborted('generateFormat'); checkAborted('generateFormat');
// TODO: drop this, fix UI! // TODO: drop this, fix UI!
sql = PSQL.window_sql(sql,limit,offset); sql = new PSQLWrapper(sql).orderBy(orderBy, sortOrder).window(limit, offset).query();
var opts = { var opts = {
dbopts: dbopts, dbopts: dbopts,

View File

@ -1,6 +1,7 @@
var _ = require('underscore') var _ = require('underscore'),
, Step = require('step') PSQLWrapper = require('../sql/psql_wrapper'),
, pg = require('pg');//.native; // disabled for now due to: https://github.com/brianc/node-postgres/issues/48 Step = require('step'),
pg = require('pg');//.native; // disabled for now due to: https://github.com/brianc/node-postgres/issues/48
_.mixin(require('underscore.string')); _.mixin(require('underscore.string'));
// Max database connections in the pool // Max database connections in the pool
@ -261,81 +262,19 @@ var PSQL = function(dbopts) {
return me; return me;
}; };
// little hack for UI
// /**
// TODO:drop, fix in the UI (it's not documented in doc/API) * Little hack for UI
// * TODO: drop, fix in the UI (it's not documented in doc/API)
*
* @param {string} sql
* @param {number} limit
* @param {number} offset
* @returns {string} The wrapped SQL query with the limit window
*/
PSQL.window_sql = function(sql, limit, offset) { PSQL.window_sql = function(sql, limit, offset) {
// only window select functions (NOTE: "values" will be broken, "with" will be broken) // keeping it here for backwards compatibility
if (!_.isNumber(limit) || !_.isNumber(offset) ) return sql; return new PSQLWrapper(sql).window(limit, offset).query();
};
// Strip comments
sql = sql.replace(/(^|\n)\s*--.*\n/g, '');
var cte = '';
if ( sql.match(/^\s*WITH\s/i) ) {
var rem = sql; // analyzed portion of sql
var q; // quote char
var n = 0; // nested parens level
var s = 0; // 0:outQuote, 1:inQuote
var l;
while (1) {
l = rem.search(/[("')]/);
//console.log("REM Is " + rem);
if ( l < 0 ) {
console.log("Malformed SQL");
return sql;
}
var f = rem.charAt(l);
//console.log("n:" + n + " s:" + s + " l:" + l + " charAt(l):" + f + " charAt(l+1):" + rem.charAt(l+1));
if ( s == 0 ) {
if ( f == '(' ) ++n;
else if ( f == ')' ) {
if ( ! --n ) { // end of CTE
cte += rem.substr(0, l+1);
rem = rem.substr(l+1);
//console.log("Out of cte, rem is " + rem);
if ( rem.search(/^s*,/) < 0 ) break;
else continue; // cte and rem already updated
}
}
else { // enter quoting
s = 1; q = f;
}
}
else if ( f == q ) {
if ( rem.charAt(l+1) == f ) ++l; // escaped
else s = 0; // exit quoting
}
cte += rem.substr(0, l+1);
rem = rem.substr(l+1);
}
/*
console.log("cte: " + cte);
console.log("rem: " + rem);
*/
sql = rem; //sql.substr(l+1);
}
var re_SELECT = RegExp(/^\s*SELECT\s/i);
var re_INTO = RegExp(/\sINTO\s+([^\s]+|"([^"]|"")*")\s*$/i);
//console.log("SQL " + sql);
//console.log(" does " + ( sql.match(re_SELECT) ? '' : 'not ' ) + "match re_SELECT " + re_SELECT);
//console.log(" does " + ( sql.match(re_INTO) ? '' : 'not ' ) + "match re_INTO " + re_INTO);
if (
sql.match(re_SELECT) &&
! sql.match(re_INTO)
)
{
return cte + "SELECT * FROM (" + sql + ") AS cdbq_1 LIMIT " + limit + " OFFSET " + offset;
}
return cte + sql;
}
module.exports = PSQL; module.exports = PSQL;

132
app/sql/psql_wrapper.js Normal file
View File

@ -0,0 +1,132 @@
'use strict';
var _ = require('underscore'),
util = require('util'),
SORT_ORDER_OPTIONS = {ASC: 1, DESC: 1},
REGEX_SELECT = /^\s*SELECT\s/i,
REGEX_INTO = /\sINTO\s+([^\s]+|"([^"]|"")*")\s*$/i;
function PSQLWrapper(sql) {
this.sqlQuery = sql;
this.sqlClauses = {
orderBy: '',
limit: ''
};
}
/**
* Only window select functions (NOTE: "values" will be broken, "with" will be broken)
*
* @param {number} limit
* @param {number} offset
* @returns {PSQLWrapper}
*/
PSQLWrapper.prototype.window = function (limit, offset) {
if (!_.isNumber(limit) || !_.isNumber(offset)) {
return this;
}
this.sqlClauses.limit = util.format(' LIMIT %d OFFSET %d', limit, offset);
return this;
};
/**
*
* @param {string} column The name of the column to sort by
* @param {string} sortOrder Whether it's ASC or DESC ordering
* @returns {PSQLWrapper}
*/
PSQLWrapper.prototype.orderBy = function (column, sortOrder) {
if (!_.isString(column) || _.isEmpty(column)) {
return this;
}
this.sqlClauses.orderBy = util.format(' ORDER BY "%s"', column);
if (!_.isUndefined(sortOrder)) {
sortOrder = sortOrder.toUpperCase();
if (SORT_ORDER_OPTIONS[sortOrder]) {
this.sqlClauses.orderBy += util.format(' %s', sortOrder);
}
}
return this;
};
/**
* Builds an SQL query with extra clauses based on the builder calls.
*
* @returns {string} The SQL query with the extra clauses
*/
PSQLWrapper.prototype.query = function () {
if (_.isEmpty(this.sqlClauses.orderBy) && _.isEmpty(this.sqlClauses.limit)) {
return this.sqlQuery;
}
// Strip comments
this.sqlQuery = this.sqlQuery.replace(/(^|\n)\s*--.*\n/g, '');
var cte = '';
if (this.sqlQuery.match(/^\s*WITH\s/i)) {
var rem = this.sqlQuery, // analyzed portion of sql
q, // quote char
n = 0, // nested parens level
s = 0, // 0:outQuote, 1:inQuote
l;
while (1) {
l = rem.search(/[("')]/);
// console.log("REM Is " + rem);
if (l < 0) {
// console.log("Malformed SQL");
return this.sqlQuery;
}
var f = rem.charAt(l);
// console.log("n:" + n + " s:" + s + " l:" + l + " charAt(l):" + f + " charAt(l+1):" + rem.charAt(l+1));
if (s == 0) {
if (f == '(') {
++n;
} else if (f == ')') {
if (!--n) { // end of CTE
cte += rem.substr(0, l + 1);
rem = rem.substr(l + 1);
//console.log("Out of cte, rem is " + rem);
if (rem.search(/^s*,/) < 0) {
break;
} else {
continue; // cte and rem already updated
}
}
} else { // enter quoting
s = 1;
q = f;
}
} else if (f == q) {
if (rem.charAt(l + 1) == f) {
++l; // escaped
} else {
s = 0; // exit quoting
}
}
cte += rem.substr(0, l + 1);
rem = rem.substr(l + 1);
}
/*
console.log("cte: " + cte);
console.log("rem: " + rem);
*/
this.sqlQuery = rem; //sql.substr(l+1);
}
//console.log("SQL " + sql);
//console.log(" does " + ( sql.match(REGEX_SELECT) ? '' : 'not ' ) + "match REGEX_SELECT " + REGEX_SELECT);
//console.log(" does " + ( sql.match(REGEX_INTO) ? '' : 'not ' ) + "match REGEX_INTO " + REGEX_INTO);
if (this.sqlQuery.match(REGEX_SELECT) && !this.sqlQuery.match(REGEX_INTO)) {
return util.format(
'%sSELECT * FROM (%s) AS cdbq_1%s%s',
cte, this.sqlQuery, this.sqlClauses.orderBy, this.sqlClauses.limit
);
}
return cte + this.sqlQuery;
};
module.exports = PSQLWrapper;

View File

@ -43,6 +43,16 @@ Supported query string parameters:
used to specify which page to start returning rows from. used to specify which page to start returning rows from.
First page has index 0. First page has index 0.
'order_by':
Causes the result rows to be sorted according to the specified
case sensitive column name. See also 'sort_order'.
'sort_order':
Optional param combined with 'order_by' one. Values are limited
to ASC (ascending) and DESC (descending), case insensitive. If
not specified or wrongly specified, ASC is assumed by default.
Response formats Response formats
---------------- ----------------

View File

@ -86,40 +86,6 @@ test('test public user cannot execute INSERT on db', function(done){
}); });
}); });
test('Windowed SQL with simple select', function(){
// NOTE: intentionally mixed-case and space-padded
var sql = "\n \tSEleCT * from table1";
var out = PSQL.window_sql(sql, 1, 0);
assert.equal(out, "SELECT * FROM (" + sql + ") AS cdbq_1 LIMIT 1 OFFSET 0");
});
test('Windowed SQL with CTE select', function(){
// NOTE: intentionally mixed-case and space-padded
var cte = "\n \twiTh x as( update test set x=x+1)";
var select = "\n \tSEleCT * from x";
var sql = cte + select;
var out = PSQL.window_sql(sql, 1, 0);
assert.equal(out, cte + "SELECT * FROM (" + select + ") AS cdbq_1 LIMIT 1 OFFSET 0");
});
test('Windowed SQL with CTE update', function(){
// NOTE: intentionally mixed-case and space-padded
var cte = "\n \twiTh a as( update test set x=x+1)";
var upd = "\n \tupdate tost set y=x from x";
var sql = cte + upd;
var out = PSQL.window_sql(sql, 1, 0);
assert.equal(out, sql);
});
test('Windowed SQL with complex CTE and insane quoting', function(){
// NOTE: intentionally mixed-case and space-padded
var cte = "\n \twiTh \"('a\" as( update \"\"\"test)\" set x='x'+1), \")b(\" as ( select ')))\"' from z )";
var sel = "\n \tselect '\"' from x";
var sql = cte + sel;
var out = PSQL.window_sql(sql, 1, 0);
assert.equal(out, cte + "SELECT * FROM (" + sel + ") AS cdbq_1 LIMIT 1 OFFSET 0");
});
test('dbkey depends on dbopts', function(){ test('dbkey depends on dbopts', function(){
var opt1 = _.clone(dbopts_anon); var opt1 = _.clone(dbopts_anon);
opt1.dbname = 'dbname1'; opt1.dbname = 'dbname1';

View File

@ -0,0 +1,97 @@
var _ = require('underscore'),
PSQLWrapper = require('../../../app/sql/psql_wrapper'),
assert = require('assert');
// NOTE: intentionally mixed-case and space-padded
var simpleSql = "\n \tSEleCT * from table1";
suite('psql_wrapper', function() {
test('Windowed SQL with simple select', function(){
var out = new PSQLWrapper(simpleSql).window(1, 0).query();
assert.equal(out, "SELECT * FROM (" + simpleSql + ") AS cdbq_1 LIMIT 1 OFFSET 0");
});
test('Windowed SQL with CTE select', function(){
// NOTE: intentionally mixed-case and space-padded
var cte = "\n \twiTh x as( update test set x=x+1)";
var select = "\n \tSEleCT * from x";
var sql = cte + select;
var out = new PSQLWrapper(sql).window(1, 0).query();
assert.equal(out, cte + "SELECT * FROM (" + select + ") AS cdbq_1 LIMIT 1 OFFSET 0");
});
test('Windowed SQL with CTE update', function(){
// NOTE: intentionally mixed-case and space-padded
var cte = "\n \twiTh a as( update test set x=x+1)";
var upd = "\n \tupdate tost set y=x from x";
var sql = cte + upd;
var out = new PSQLWrapper(sql).window(1, 0).query();
assert.equal(out, sql);
});
test('Windowed SQL with complex CTE and insane quoting', function(){
// NOTE: intentionally mixed-case and space-padded
var cte = "\n \twiTh \"('a\" as( update \"\"\"test)\" set x='x'+1), \")b(\" as ( select ')))\"' from z )";
var sel = "\n \tselect '\"' from x";
var sql = cte + sel;
var out = new PSQLWrapper(sql).window(1, 0).query();
assert.equal(out, cte + "SELECT * FROM (" + sel + ") AS cdbq_1 LIMIT 1 OFFSET 0");
});
test('Different instances return different queries', function() {
var aWrapper = new PSQLWrapper('select 1');
var bWrapper = new PSQLWrapper('select * from databaseB');
assert.notEqual(aWrapper, bWrapper);
assert.notEqual(aWrapper.query(), bWrapper.query(), 'queries should be different');
});
test('Order by SQL with simple select and empty column name returns original query', function() {
var expectedSql = simpleSql;
var outputSql = new PSQLWrapper(simpleSql).orderBy('').query();
assert.equal(outputSql, expectedSql);
});
test('Order by SQL with simple select and no sort order', function() {
var expectedSql = 'SELECT * FROM (' + simpleSql + ') AS cdbq_1 ORDER BY "foo"';
var outputSql = new PSQLWrapper(simpleSql).orderBy('foo').query();
assert.equal(outputSql, expectedSql);
});
test('Order by SQL with simple select and invalid sort order use no sort order', function() {
var expectedSql = 'SELECT * FROM (' + simpleSql + ') AS cdbq_1 ORDER BY "foo"';
var outputSql = new PSQLWrapper(simpleSql).orderBy('foo', "BAD_SORT_ORDER").query();
assert.equal(outputSql, expectedSql);
});
test('Order by SQL with simple select and asc order', function() {
var expectedSql = 'SELECT * FROM (' + simpleSql + ') AS cdbq_1 ORDER BY "foo" ASC';
var outputSql = new PSQLWrapper(simpleSql).orderBy('foo', "asc").query();
assert.equal(outputSql, expectedSql);
});
test('Order by SQL with simple select and DESC order', function() {
var expectedSql = 'SELECT * FROM (' + simpleSql + ') AS cdbq_1 ORDER BY "foo" DESC';
var outputSql = new PSQLWrapper(simpleSql).orderBy('foo', "DESC").query();
assert.equal(outputSql, expectedSql);
});
});