Stream JSON responses

Reduces memory usage for big datasets.
JIRA CDB-2600 #resolve
This commit is contained in:
Sandro Santilli 2014-04-03 15:43:34 +02:00
parent 4606a44917
commit 49ef1bc0c7
2 changed files with 87 additions and 9 deletions

View File

@ -1,6 +1,10 @@
1.10.0 - 2014-MM-DD
-------------------
Enhancements:
* Stream JSON responses
1.9.1 - 2014-03-27
------------------

View File

@ -1,4 +1,5 @@
var pg = require('./pg');
var util = require('util');
function json() {}
@ -42,23 +43,96 @@ p.formatResultFields = function(flds) {
return nfields;
}
p.transform = function(result, options, callback) {
var j = {
time: options.total_time,
fields: this.formatResultFields(result.fields),
total_rows: result.rowCount,
rows: result.rows
p.startStreaming = function() {
if ( this.opts.profiler ) this.opts.profiler.done('startStreaming');
this.total_rows = 0;
if ( this.opts.beforeSink ) {
this.opts.beforeSink();
delete this.opts.beforeSink;
}
var out = '{"rows":[';
this.opts.sink.write(out);
this._streamingStarted = true;
};
p.handleQueryRow = function(row) {
if ( ! this._streamingStarted ) {
this.startStreaming();
}
var sf = this.opts.skipfields;
if ( sf.length ){
for ( var j=0; j<sf.length; ++j ) {
delete row[sf[j]];
}
}
var out = ( this.total_rows ? ',' : '' ) + JSON.stringify(row);
this.opts.sink.write(out);
this.total_rows++;
}
p.handleQueryEnd = function(result) {
if ( this.error ) {
this.callback(this.error);
return;
}
if ( this.opts.profiler ) this.opts.profiler.done('gotRows');
//console.log("Got query end, result is "); console.dir(result);
if ( ! this._streamingStarted ) {
this.startStreaming();
}
var end = Date.now();
this.opts.total_time = (end - this.start_time)/1000;
// Drop field description for skipped fields
var newfields = [];
var sf = this.opts.skipfields;
if ( sf.length ){
for ( var j=0; j<result.fields.length; ++j ) {
var f = result.fields[j];
if ( sf.indexOf(f.name) == -1 ) newfields.push(f);
}
result.fields = newfields;
}
var end = Date.now();
var total_time = (end - this.start_time)/1000;
var out = [
'],', // end of "rows" array
'"time":', JSON.stringify(total_time),
',"fields":', JSON.stringify(this.formatResultFields(result.fields)),
',"total_rows":', JSON.stringify(result.rowCount)
];
if ( result.notices ) {
var j = {};
for (var i=0; i<result.notices.length; ++i) {
var m = result.notices[i];
var l = m.severity.toLowerCase() + 's';
if ( ! j[l] ) j[l] = [];
j[l].push(m.message);
}
for (var s in j) {
out.push(',"' + s + '":');
out.push( JSON.stringify(j[s]) );
}
delete j;
}
callback(null, j);
};
out.push('}');
while (out.length) {
var cmp = out.shift();
this.opts.sink.write(cmp);
}
this.opts.sink.end();
if ( this.opts.profiler ) this.opts.profiler.done('endStreaming');
this.callback();
}
module.exports = json;