Merge pull request #161 from CartoDB/CDB-3714

Streaming responses for JSON and GeoJSON formats
This commit is contained in:
Raul Ochoa 2014-07-31 09:08:53 +02:00
commit 893bde5794
5 changed files with 211 additions and 69 deletions

View File

@ -55,6 +55,7 @@ var apiKeyAuth = new ApiKeyAuth(Meta, cdbReq);
// Set default configuration
global.settings.db_pubuser = global.settings.db_pubuser || "publicuser";
global.settings.bufferedRows = global.settings.bufferedRows || 1000;
var tableCache = LRU({
// store no more than these many items in the cache
@ -470,6 +471,8 @@ function handleQuery(req, res) {
skipfields: skipfields,
sql: sql,
filename: filename,
bufferedRows: global.settings.bufferedRows,
callback: params.callback,
abortChecker: checkAborted
};

View File

@ -2,22 +2,68 @@
var _ = require('underscore')
var pg = require('./pg');
function geojson() {}
function GeoJsonFormat() {
this.buffer = '';
}
geojson.prototype = new pg('geojson');
GeoJsonFormat.prototype = new pg('geojson');
var p = geojson.prototype;
GeoJsonFormat.prototype._contentType = "application/json; charset=utf-8";
p._contentType = "application/json; charset=utf-8";
p.getQuery = function(sql, options) {
GeoJsonFormat.prototype.getQuery = function(sql, options) {
var gn = options.gn;
var dp = options.dp;
return 'SELECT *, ST_AsGeoJSON(' + gn + ',' + dp + ') as the_geom FROM (' + sql + ') as foo';
};
p.transform = function(result, options, callback) {
_toGeoJSON(result, options.gn, callback);
GeoJsonFormat.prototype.startStreaming = function() {
this.total_rows = 0;
if (this.opts.beforeSink) {
this.opts.beforeSink();
}
if (this.opts.callback) {
this.buffer += this.opts.callback + '(';
}
this.buffer += '{"type": "FeatureCollection", "features": [';
this._streamingStarted = true;
};
GeoJsonFormat.prototype.handleQueryRow = function(row) {
if ( ! this._streamingStarted ) {
this.startStreaming();
}
var geojson = [
'{',
'"type":"Feature",',
'"geometry":' + row[this.opts.gn] + ',',
'"properties":'
];
delete row[this.opts.gn];
delete row['the_geom_webmercator'];
geojson.push(JSON.stringify(row));
geojson.push('}');
this.buffer += (this.total_rows++ ? ',' : '') + geojson.join('');
if (this.total_rows % (this.opts.bufferedRows || 1000)) {
this.opts.sink.write(this.buffer);
this.buffer = '';
}
};
GeoJsonFormat.prototype.handleQueryEnd = function(result) {
this.buffer += ']}'; // end of features
if (this.opts.callback) {
this.buffer += ')';
}
this.opts.sink.write(this.buffer);
this.opts.sink.end();
this.buffer = '';
this.callback();
};
function _toGeoJSON(data, gn, callback){
@ -47,5 +93,5 @@ function _toGeoJSON(data, gn, callback){
}
}
module.exports = geojson;
module.exports.toGeoJSON = _toGeoJSON
module.exports = GeoJsonFormat;
module.exports.toGeoJSON = _toGeoJSON;

View File

@ -1,14 +1,16 @@
var pg = require('./pg');
var pg = require('./pg'),
util = require('util'),
_ = require('underscore');
function json() {}
function JsonFormat() {
this.buffer = '';
}
json.prototype = new pg('json');
JsonFormat.prototype = new pg('json');
var p = json.prototype;
JsonFormat.prototype._contentType = "application/json; charset=utf-8";
p._contentType = "application/json; charset=utf-8";
p.formatResultFields = function(flds) {
JsonFormat.prototype.formatResultFields = function(flds) {
var nfields = {};
for (var i=0; i<flds.length; ++i) {
var f = flds[i];
@ -36,29 +38,104 @@ p.formatResultFields = function(flds) {
tname += '[]';
}
}
//console.log('cname:'+cname+' tname:'+tname);
nfields[f.name] = { type: tname };
}
return nfields;
}
p.transform = function(result, options, callback) {
var j = {
time: options.total_time,
fields: this.formatResultFields(result.fields),
total_rows: result.rowCount,
rows: result.rows
}
if ( result.notices ) {
for (var i=0; i<result.notices.length; ++i) {
var m = result.notices[i];
var l = m.severity.toLowerCase() + 's';
if ( ! j[l] ) j[l] = [];
j[l].push(m.message);
}
}
callback(null, j);
};
module.exports = json;
JsonFormat.prototype.startStreaming = function() {
this.total_rows = 0;
if (this.opts.beforeSink) {
this.opts.beforeSink();
}
if (this.opts.callback) {
this.buffer += this.opts.callback + '(';
}
this.buffer += '{"rows":[';
this._streamingStarted = true;
};
JsonFormat.prototype.handleQueryRow = function(row) {
if ( ! this._streamingStarted ) {
this.startStreaming();
}
this.buffer += (this.total_rows++ ? ',' : '') + JSON.stringify(row);
if (this.total_rows % (this.opts.bufferedRows || 1000)) {
this.opts.sink.write(this.buffer);
this.buffer = '';
}
};
JsonFormat.prototype.handleQueryEnd = function(result) {
if ( this.error ) {
this.callback(this.error);
return;
}
if ( this.opts.profiler ) this.opts.profiler.done('gotRows');
if ( ! this._streamingStarted ) {
this.startStreaming();
}
this.opts.total_time = (Date.now() - this.start_time)/1000;
// Drop field description for skipped fields
if (this.hasSkipFields) {
var newfields = [];
var sf = this.opts.skipfields;
for (var i = 0; i < result.fields.length; i++) {
var f = result.fields[i];
if ( sf.indexOf(f.name) == -1 ) newfields.push(f);
}
result.fields = newfields;
}
var total_time = (Date.now() - this.start_time)/1000;
var out = [
'],', // end of "rows" array
'"time":', JSON.stringify(total_time),
',"fields":', JSON.stringify(this.formatResultFields(result.fields)),
',"total_rows":', JSON.stringify(result.rowCount)
];
if ( result.notices && result.notices.length > 0 ) {
var notices = {},
severities = [];
_.each(result.notices, function(notice) {
var severity = notice.severity.toLowerCase() + 's';
if (!notices[severity]) {
severities.push(severity);
notices[severity] = [];
}
notices[severity].push(notice.message)
});
_.each(severities, function(severity) {
out.push(',');
out.push(JSON.stringify(severity));
out.push(':');
out.push(JSON.stringify(notices[severity]));
});
}
out.push('}');
this.buffer += out.join('');
if (this.opts.callback) {
this.buffer += ')';
}
this.opts.sink.write(this.buffer);
this.opts.sink.end();
this.buffer = '';
this.callback();
};
module.exports = JsonFormat;

View File

@ -1,9 +1,11 @@
var Step = require('step')
var PSQL = require(global.settings.app_root + '/app/models/psql')
var Step = require('step'),
PSQL = require(global.settings.app_root + '/app/models/psql');
function pg(id) { this.id = id; }
function PostgresFormat(id) {
this.id = id;
}
pg.prototype = {
PostgresFormat.prototype = {
getQuery: function(sql, options) {
return sql;
@ -19,27 +21,26 @@ pg.prototype = {
};
pg.prototype.handleQueryRow = function(row, result) {
//console.log("Got query row, row is "); console.dir(row);
//console.log("opts are: "); console.dir(this.opts);
var sf = this.opts.skipfields;
if ( sf.length ){
PostgresFormat.prototype.handleQueryRow = function(row, result) {
result.addRow(row);
};
PostgresFormat.prototype.handleQueryRowWithSkipFields = function(row, result) {
var sf = this.opts.skipfields;
for ( var j=0; j<sf.length; ++j ) {
delete row[sf[j]];
delete row[sf[j]];
}
}
result.addRow(row);
this.handleQueryRow(row, result);
};
pg.prototype.handleNotice = function(msg, result) {
if ( ! result.notices ) result.notices = [];
for (var i=0; i<msg.length; ++i) {
var m = msg[i];
result.notices.push(m);
}
PostgresFormat.prototype.handleNotice = function(msg, result) {
if ( ! result.notices ) result.notices = [];
for (var i=0; i<msg.length; i++) {
result.notices.push(msg[i]);
}
};
pg.prototype.handleQueryEnd = function(result) {
PostgresFormat.prototype.handleQueryEnd = function(result) {
this.queryCanceller = undefined;
if ( this.error ) {
@ -49,14 +50,11 @@ pg.prototype.handleQueryEnd = function(result) {
if ( this.opts.profiler ) this.opts.profiler.done('gotRows');
//console.log("Got query end, result is "); console.dir(result);
var end = Date.now();
this.opts.total_time = (end - this.start_time)/1000;
this.opts.total_time = (Date.now() - this.start_time)/1000;
// Drop field description for skipped fields
var sf = this.opts.skipfields;
if ( sf.length ){
if (this.hasSkipFields) {
var sf = this.opts.skipfields;
var newfields = [];
for ( var j=0; j<result.fields.length; ++j ) {
var f = result.fields[j];
@ -85,7 +83,7 @@ pg.prototype.handleQueryEnd = function(result) {
if ( that.opts.beforeSink ) that.opts.beforeSink();
that.opts.sink.send(out);
} else {
console.error("No output from transform, doing nothing ?!");
console.error("No output from transform, doing nothing ?!");
}
},
function errorHandle(err){
@ -94,7 +92,7 @@ console.error("No output from transform, doing nothing ?!");
);
};
pg.prototype.sendResponse = function(opts, callback) {
PostgresFormat.prototype.sendResponse = function(opts, callback) {
if ( this.callback ) {
callback(new Error("Invalid double call to .sendResponse on a pg formatter"));
return;
@ -102,6 +100,8 @@ pg.prototype.sendResponse = function(opts, callback) {
this.callback = callback;
this.opts = opts;
this.hasSkipFields = opts.skipfields.length;
var sql = this.getQuery(opts.sql, {
gn: opts.gn,
dp: opts.dp,
@ -121,19 +121,23 @@ pg.prototype.sendResponse = function(opts, callback) {
}
if ( that.opts.profiler ) that.opts.profiler.done('eventedQuery');
query.on('row', that.handleQueryRow.bind(that));
if (that.hasSkipFields) {
query.on('row', that.handleQueryRowWithSkipFields.bind(that));
} else {
query.on('row', that.handleQueryRow.bind(that));
}
query.on('end', that.handleQueryEnd.bind(that));
query.on('error', function(err) { that.error = err; });
query.on('notice', function(msg) {
that.handleNotice(msg, query._result);
that.handleNotice(msg, query._result);
});
});
};
pg.prototype.cancel = function() {
PostgresFormat.prototype.cancel = function() {
if (this.queryCanceller) {
this.queryCanceller.call();
}
};
module.exports = pg;
module.exports = PostgresFormat;

View File

@ -1409,4 +1409,16 @@ test('GET /api/v1/sql with SQL parameter on SELECT only should return CORS heade
});
});
test('GET with callback param returns wrapped result set with callback as jsonp', function(done) {
assert.response(app, {
url: '/api/v1/sql?q=SELECT%20*%20FROM%20untitle_table_4&callback=foo_jsonp',
headers: {host: 'vizzuality.cartodb.com'},
method: 'GET'
},{ }, function(res) {
assert.equal(res.statusCode, 200, res.body);
assert.ok(res.body.match(/foo\_jsonp\(.*\)/));
done();
});
});
});