Use streaming for json responses. Number of buffered rows can be specified by config.

This commit is contained in:
Raul Ochoa 2014-07-31 02:17:07 +02:00
parent eaba2e20d4
commit 73a195a7fa
3 changed files with 113 additions and 34 deletions

View File

@ -55,6 +55,7 @@ var apiKeyAuth = new ApiKeyAuth(Meta, cdbReq);
// Set default configuration
global.settings.db_pubuser = global.settings.db_pubuser || "publicuser";
global.settings.bufferedRows = global.settings.bufferedRows || 1000;
var tableCache = LRU({
// store no more than these many items in the cache
@ -470,6 +471,8 @@ function handleQuery(req, res) {
skipfields: skipfields,
sql: sql,
filename: filename,
bufferedRows: global.settings.bufferedRows,
callback: params.callback,
abortChecker: checkAborted
};

View File

@ -1,14 +1,16 @@
var pg = require('./pg');
var pg = require('./pg'),
util = require('util'),
_ = require('underscore');
function json() {}
function JsonFormat() {
this.buffer = '';
}
json.prototype = new pg('json');
JsonFormat.prototype = new pg('json');
var p = json.prototype;
JsonFormat.prototype._contentType = "application/json; charset=utf-8";
p._contentType = "application/json; charset=utf-8";
p.formatResultFields = function(flds) {
JsonFormat.prototype.formatResultFields = function(flds) {
var nfields = {};
for (var i=0; i<flds.length; ++i) {
var f = flds[i];
@ -36,29 +38,104 @@ p.formatResultFields = function(flds) {
tname += '[]';
}
}
//console.log('cname:'+cname+' tname:'+tname);
nfields[f.name] = { type: tname };
}
return nfields;
}
p.transform = function(result, options, callback) {
var j = {
time: options.total_time,
fields: this.formatResultFields(result.fields),
total_rows: result.rowCount,
rows: result.rows
}
if ( result.notices ) {
for (var i=0; i<result.notices.length; ++i) {
var m = result.notices[i];
var l = m.severity.toLowerCase() + 's';
if ( ! j[l] ) j[l] = [];
j[l].push(m.message);
}
}
callback(null, j);
};
module.exports = json;
JsonFormat.prototype.startStreaming = function() {
this.total_rows = 0;
if (this.opts.beforeSink) {
this.opts.beforeSink();
}
if (this.opts.callback) {
this.buffer += this.opts.callback + '(';
}
this.buffer += '{"rows":[';
this._streamingStarted = true;
};
JsonFormat.prototype.handleQueryRow = function(row) {
if ( ! this._streamingStarted ) {
this.startStreaming();
}
this.buffer += (this.total_rows++ ? ',' : '') + JSON.stringify(row);
if (this.total_rows % (this.opts.bufferedRows || 1000)) {
this.opts.sink.write(this.buffer);
this.buffer = '';
}
};
JsonFormat.prototype.handleQueryEnd = function(result) {
if ( this.error ) {
this.callback(this.error);
return;
}
if ( this.opts.profiler ) this.opts.profiler.done('gotRows');
if ( ! this._streamingStarted ) {
this.startStreaming();
}
this.opts.total_time = (Date.now() - this.start_time)/1000;
// Drop field description for skipped fields
if (this.hasSkipFields) {
var newfields = [];
var sf = this.opts.skipfields;
for (var i = 0; i < result.fields.length; i++) {
var f = result.fields[i];
if ( sf.indexOf(f.name) == -1 ) newfields.push(f);
}
result.fields = newfields;
}
var total_time = (Date.now() - this.start_time)/1000;
var out = [
'],', // end of "rows" array
'"time":', JSON.stringify(total_time),
',"fields":', JSON.stringify(this.formatResultFields(result.fields)),
',"total_rows":', JSON.stringify(result.rowCount)
];
if ( result.notices && result.notices.length > 0 ) {
var notices = {},
severities = [];
_.each(result.notices, function(notice) {
var severity = notice.severity.toLowerCase() + 's';
if (!notices[severity]) {
severities.push(severity);
notices[severity] = [];
}
notices[severity].push(notice.message)
});
_.each(severities, function(severity) {
out.push(',');
out.push(JSON.stringify(severity));
out.push(':');
out.push(JSON.stringify(notices[severity]));
});
}
out.push('}');
this.buffer += out.join('');
if (this.opts.callback) {
this.buffer += ')';
}
this.opts.sink.write(this.buffer);
this.opts.sink.end();
this.buffer = '';
this.callback();
};
module.exports = JsonFormat;

View File

@ -30,15 +30,14 @@ PostgresFormat.prototype.handleQueryRowWithSkipFields = function(row, result) {
for ( var j=0; j<sf.length; ++j ) {
delete row[sf[j]];
}
result.addRow(row);
this.handleQueryRow(row, result);
};
PostgresFormat.prototype.handleNotice = function(msg, result) {
if ( ! result.notices ) result.notices = [];
for (var i=0; i<msg.length; ++i) {
var m = msg[i];
result.notices.push(m);
}
if ( ! result.notices ) result.notices = [];
for (var i=0; i<msg.length; i++) {
result.notices.push(msg[i]);
}
};
PostgresFormat.prototype.handleQueryEnd = function(result) {