CartoDB-SQL-API/app/models/formats/pg.js

163 lines
3.8 KiB
JavaScript
Raw Normal View History

2015-05-13 19:00:01 +08:00
var step = require('step');
var PSQL = require('cartodb-psql');
var assert = require('assert');
2014-07-31 01:43:43 +08:00
function PostgresFormat(id) {
this.id = id;
}
2014-07-31 01:43:43 +08:00
PostgresFormat.prototype = {
2015-05-13 19:00:01 +08:00
getQuery: function(sql/*, options*/) {
return sql;
},
getContentType: function(){
return this._contentType;
},
getFileExtension: function() {
return this.id;
2014-06-03 00:57:45 +08:00
}
};
2014-07-31 01:43:43 +08:00
PostgresFormat.prototype.handleQueryRow = function(row, result) {
result.addRow(row);
};
PostgresFormat.prototype.handleQueryRowWithSkipFields = function(row, result) {
var sf = this.opts.skipfields;
for ( var j=0; j<sf.length; ++j ) {
delete row[sf[j]];
}
this.handleQueryRow(row, result);
};
2014-07-31 01:43:43 +08:00
PostgresFormat.prototype.handleNotice = function(msg, result) {
2015-05-13 19:00:01 +08:00
if ( ! result.notices ) {
result.notices = [];
}
for (var i=0; i<msg.length; i++) {
result.notices.push(msg[i]);
}
};
2014-07-31 01:43:43 +08:00
PostgresFormat.prototype.handleQueryEnd = function(result) {
this.queryCanceller = undefined;
if ( this.error ) {
this.callback(this.error);
return;
}
2015-05-13 19:00:01 +08:00
if ( this.opts.profiler ) {
this.opts.profiler.done('gotRows');
}
2014-03-20 01:34:21 +08:00
this.opts.total_time = (Date.now() - this.start_time)/1000;
// Drop field description for skipped fields
if (this.hasSkipFields) {
var sf = this.opts.skipfields;
var newfields = [];
for ( var j=0; j<result.fields.length; ++j ) {
var f = result.fields[j];
2015-05-13 19:00:01 +08:00
if ( sf.indexOf(f.name) === -1 ) {
newfields.push(f);
}
}
result.fields = newfields;
}
var that = this;
2015-05-13 19:00:01 +08:00
step (
function packageResult() {
if ( that.opts.abortChecker ) {
that.opts.abortChecker('packageResult');
}
that.transform(result, that.opts, this);
},
function sendResults(err, out){
2015-05-13 19:00:01 +08:00
assert.ifError(err);
// return to browser
if ( out ) {
2015-05-13 19:00:01 +08:00
if ( that.opts.beforeSink ) {
that.opts.beforeSink();
}
that.opts.sink.send(out);
} else {
console.error("No output from transform, doing nothing ?!");
}
},
function errorHandle(err){
that.callback(err);
}
);
};
2014-07-31 01:43:43 +08:00
PostgresFormat.prototype.sendResponse = function(opts, callback) {
if ( this.callback ) {
callback(new Error("Invalid double call to .sendResponse on a pg formatter"));
return;
}
this.callback = callback;
this.opts = opts;
this.hasSkipFields = opts.skipfields.length;
var sql = this.getQuery(opts.sql, {
gn: opts.gn,
dp: opts.dp,
skipfields: opts.skipfields
});
var that = this;
this.start_time = Date.now();
this.client = new PSQL(opts.dbopts, {}, { destroyOnError: true });
this.client.eventedQuery(sql, function(err, query, queryCanceller) {
that.queryCanceller = queryCanceller;
if (err) {
callback(err);
return;
}
2015-05-13 19:00:01 +08:00
if ( that.opts.profiler ) {
that.opts.profiler.done('eventedQuery');
}
if (that.hasSkipFields) {
query.on('row', that.handleQueryRowWithSkipFields.bind(that));
} else {
query.on('row', that.handleQueryRow.bind(that));
}
query.on('end', that.handleQueryEnd.bind(that));
2015-03-02 21:34:01 +08:00
query.on('error', function(err) {
that.error = err;
if (err.message && err.message.match(/row too large, was \d* bytes/i)) {
return console.error(JSON.stringify({
2015-03-02 21:34:01 +08:00
username: opts.username,
type: 'row_size_limit_exceeded',
error: err.message
}));
}
that.handleQueryEnd();
2015-03-02 21:34:01 +08:00
});
query.on('notice', function(msg) {
that.handleNotice(msg, query._result);
});
});
};
2014-07-31 01:43:43 +08:00
PostgresFormat.prototype.cancel = function() {
if (this.queryCanceller) {
this.queryCanceller.call();
}
};
2014-07-31 01:43:43 +08:00
module.exports = PostgresFormat;