Switch to using evented query model for postgresql

This enables formats for processing rows as they arrive from the
database, thus possibly reducing memory use. For a start the skip
fields are immediately removed from the result, rather than only
at the end.
This commit is contained in:
Sandro Santilli 2013-05-29 14:36:31 +02:00
parent 1f26d31ac4
commit 510ac0533f
2 changed files with 89 additions and 31 deletions

View File

@ -19,7 +19,60 @@ pg.prototype = {
};
pg.prototype.handleQueryRow = function(row, result) {
//console.log("Got query row, row is "); console.dir(row);
//console.log("opts are: "); console.dir(this.opts);
var sf = this.opts.skipfields;
if ( sf.length ){
for ( var j=0; j<sf.length; ++j ) {
delete row[sf[j]];
}
}
result.addRow(row);
};
pg.prototype.handleQueryEnd = function(result) {
if ( this.error ) {
this.callback(this.error);
return;
}
//console.log("Got query end, result is "); console.dir(result);
var end = Date.now();
this.opts.total_time = (end - this.start_time)/1000;
var that = this;
Step (
function packageResult() {
that.transform(result, that.opts, this);
},
function sendResults(err, out){
if (err) throw err;
// return to browser
if ( out ) {
that.opts.sink.send(out);
} else {
console.error("No output from transform, doing nothing ?!");
}
},
function errorHandle(err){
that.callback(err);
}
);
};
pg.prototype.sendResponse = function(opts, callback) {
if ( this.callback ) {
callback(new Error("Invalid double call to .sendResponse on a pg formatter"));
return;
}
this.callback = callback;
this.opts = opts;
var sql = this.getQuery(opts.sql, {
gn: opts.gn,
dp: opts.dp,
@ -28,40 +81,19 @@ pg.prototype.sendResponse = function(opts, callback) {
var that = this;
var start = Date.now();
this.start_time = Date.now();
Step (
function sendQuery() {
var client = new PSQL(opts.user_id, opts.database);
client.query(sql, this);
},
function packageResults(err, result) {
if (err) throw err;
if ( result && opts.skipfields.length ){
for ( var i=0; i<result.rows.length; ++i ) {
for ( var j=0; j<opts.skipfields.length; ++j ) {
delete result.rows[i][opts.skipfields[j]];
}
}
var client = new PSQL(opts.user_id, opts.database);
client.eventedQuery(sql, function(err, query) {
if (err) {
callback(err);
return;
}
var end = Date.now();
opts.total_time = (end - start)/1000;
that.transform(result, opts, this);
},
function sendResults(err, out){
if (err) throw err;
// return to browser
if ( out ) opts.sink.send(out);
},
function errorHandle(err){
callback(err);
}
);
query.on('row', that.handleQueryRow.bind(that));
query.on('end', that.handleQueryEnd.bind(that));
query.on('error', function(err) { that.error = err; });
});
};
module.exports = pg;

View File

@ -58,6 +58,32 @@ var PSQL = function(user_id, db) {
global.settings.db_port + "/" +
me.database();
me.eventedQuery = function(sql, callback){
var that = this;
Step(
function(){
that.sanitize(sql, this);
},
function(err, clean){
if (err) throw err;
pg.connect(that.conString, this);
},
function(err, client, done){
if (err) throw err;
var query = client.query(sql);
// NOTE: for some obscure reason passing "done" directly
// as the listener works but can be slower
// (by x2 factor!)
query.on('end', function() { done(); });
return query;
},
function(err, query){
callback(err, query)
}
);
},
me.query = function(sql, callback){
var that = this;
var finish;