CartoDB-SQL-API/app/models/psql.js
Sandro Santilli 5e379012a8 Add support for CTE in sql windowing, add unit tests
This is still an undocumented feature, but as long as it's present
and used (by cartodb UI) better tested than broken...

NOTE: more tests are needed for CTE and RETURNING queries
2013-06-17 16:36:36 +02:00

199 lines
5.9 KiB
JavaScript

var _ = require('underscore')
, Step = require('step')
, pg = require('pg');//.native; // disabled for now due to: https://github.com/brianc/node-postgres/issues/48
_.mixin(require('underscore.string'));
// Max database connections in the pool
// Subsequent connections will block waiting for a free slot
pg.defaults.poolSize = global.settings.db_pool_size || 16;
// Milliseconds of idle time before removing connection from pool
// TODO: make config setting ?
pg.defaults.poolIdleTimeout = global.settings.db_pool_idleTimeout || 30000;
// Frequency to check for idle clients within the pool, ms
// TODO: make config setting ?
pg.defaults.reapIntervalMillis = global.settings.db_pool_reapInterval || 1000;
pg.on('error', function(err, client) {
console.log("PostgreSQL connection error: " + err);
});
// PSQL
//
// A simple postgres wrapper with logic about username and database to connect
//
// * intended for use with pg_bouncer
// * defaults to connecting with a "READ ONLY" user to given DB if not passed a specific user_id
var PSQL = function(user_id, db) {
var error_text = "Incorrect access parameters. If you are accessing via OAuth, please check your tokens are correct. For public users, please ensure your table is published."
if (!_.isString(user_id) && !_.isString(db)) throw new Error(error_text);
var me = {
public_user: "publicuser"
, user_id: user_id
, db: db
};
me.username = function(){
var username = this.public_user;
if (_.isString(this.user_id))
username = _.template(global.settings.db_user, {user_id: this.user_id});
return username;
};
me.database = function(){
var database = db;
if (_.isString(this.user_id))
database = _.template(global.settings.db_base_name, {user_id: this.user_id});
return database;
};
me.conString = "tcp://" + me.username() + "@" +
global.settings.db_host + ":" +
global.settings.db_port + "/" +
me.database();
me.eventedQuery = function(sql, callback){
var that = this;
Step(
function(){
that.sanitize(sql, this);
},
function(err, clean){
if (err) throw err;
pg.connect(that.conString, this);
},
function(err, client, done){
if (err) throw err;
var query = client.query(sql);
// NOTE: for some obscure reason passing "done" directly
// as the listener works but can be slower
// (by x2 factor!)
query.on('end', function() { done(); });
return query;
},
function(err, query){
callback(err, query)
}
);
},
me.query = function(sql, callback){
var that = this;
var finish;
Step(
function(){
that.sanitize(sql, this);
},
function(err, clean){
if (err) throw err;
pg.connect(that.conString, this);
},
function(err, client, done){
if (err) throw err;
finish = done;
client.query(sql, this);
},
function(err, res){
// Release client to the pool
// should this be postponed to after the callback ?
// NOTE: if we pass a true value to finish() the client
// will be removed from the pool.
// We don't want this. Not now.
if ( finish ) finish();
callback(err, res)
}
);
};
// throw exception if illegal operations are detected
// NOTE: this check is weak hack, better database
// permissions should be used instead.
me.sanitize = function(sql, callback){
// NOTE: illegal table access is checked in main app
if (sql.match(/^\s+set\s+/i)){
var error = new SyntaxError("SET command is forbidden");
error.http_status = 403;
callback(error);
return;
}
callback(null,true);
};
return me;
};
// little hack for UI
//
// TODO:drop, fix in the UI (it's not documented in doc/API)
//
PSQL.window_sql = function(sql, limit, offset) {
// only window select functions (NOTE: "values" will be broken, "with" will be broken)
if (!_.isNumber(limit) || !_.isNumber(offset) ) return sql;
var cte = '';
if ( sql.match(/^\s*WITH\s/i) ) {
var rem = sql; // analyzed portion of sql
var q; // quote char
var n = 0; // nested parens level
var s = 0; // 0:outQuote, 1:inQuote
var l;
while (1) {
l = rem.search(/[("')]/);
//console.log("REM Is " + rem);
if ( l < 0 ) {
console.log("Malformed SQL");
return sql;
}
var f = rem.charAt(l);
//console.log("n:" + n + " s:" + s + " l:" + l + " charAt(l):" + f + " charAt(l+1):" + rem.charAt(l+1));
if ( s == 0 ) {
if ( f == '(' ) ++n;
else if ( f == ')' ) {
if ( ! --n ) { // end of CTE
cte += rem.substr(0, l+1);
rem = rem.substr(l+1);
//console.log("Out of cte, rem is " + rem);
if ( rem.search(/^s*,/) < 0 ) break;
else continue; // cte and rem already updated
}
}
else { // enter quoting
s = 1; q = f;
}
}
else if ( f == q ) {
if ( rem.charAt(l+1) == f ) ++l; // escaped
else s = 0; // exit quoting
}
cte += rem.substr(0, l+1);
rem = rem.substr(l+1);
}
/*
console.log("cte: " + cte);
console.log("rem: " + rem);
*/
sql = rem; //sql.substr(l+1);
}
if ( sql.match(/^\s*SELECT\s/i) ) {
return cte + "SELECT * FROM (" + sql + ") AS cdbq_1 LIMIT " + limit + " OFFSET " + offset;
}
return cte + sql;
}
module.exports = PSQL;