Refactor format objects to expose a common stream-able interface
See app/models/formats/README for info
parent aff77399b1
commit 76705a3fd7
@@ -45,9 +45,6 @@ var tableCache = LRU({
maxAge: global.settings.tableCacheMaxAge || 1000*60*10
});

// Keeps track of what's waiting baking for export
var bakingExports = {};

app.use(express.bodyParser());
app.enable('jsonp callback');

@@ -139,9 +136,6 @@ function handleQuery(req, res) {
skipfields = [];
}

// setup step run
var start = new Date().getTime();

if ( -1 === supportedFormats.indexOf(format) )
throw new Error("Invalid format: " + format);

@@ -155,11 +149,13 @@ function handleQuery(req, res) {

var authenticated;

var formatter;

// 1. Get database from redis via the username stored in the host header subdomain
// 2. Run the request through OAuth to get R/W user id if signed
// 3. Get the list of tables affected by the query
// 4. Run query with r/w or public user
// 5. package results and send back
// 4. Setup headers
// 5. Send formatted results back
Step(
function getDatabaseName() {
if (_.isNull(database)) {
@@ -205,7 +201,7 @@ function handleQuery(req, res) {
pg.query("SELECT CDB_QueryTables($quotesql$" + sql + "$quotesql$)", this);
}
},
function queryResult(err, result){
function setHeaders(err, result){
if (err) throw err;

// store explain result in local Cache
@@ -239,28 +235,15 @@ function handleQuery(req, res) {
}


var f = formats[format]
if(f && !f.is_file) {
sql = formats[format].getQuery(sql, {
gn: gn,
dp: dp,
skipfields: skipfields
})
} else {
// These format are implemented via OGR2OGR, so we don't
// need to run a query ourselves
return null;
}
if ( ! formats.hasOwnProperty(format) ) throw new Error("Unknown format " + format);
var fClass = formats[format]
formatter = new fClass();

pg.query(window_sql(sql,limit,offset), this);
},
function setHeaders(err, result){
if (err) throw err;

// configure headers for given format
var use_inline = !requestedFormat && !requestedFilename;
res.header("Content-Disposition", getContentDisposition(format, filename, use_inline));
res.header("Content-Type", getContentType(format));
res.header("Content-Type", formatter.getContentType());

// allow cross site post
setCrossDomain(res);
@@ -277,38 +260,16 @@ function handleQuery(req, res) {
res.header('Cache-Control', 'no-cache,max-age='+ttl+',must-revalidate,public');
}


return result;
},
function packageResults(err, result){
function generateFormat(err, result){
if (err) throw err;

if ( result && skipfields.length ){
for ( var i=0; i<result.rows.length; ++i ) {
for ( var j=0; j<skipfields.length; ++j ) {
delete result.rows[i][skipfields[j]];
}
}
}
// TODO: drop this, fix UI!
sql = window_sql(sql,limit,offset);

var end = new Date().getTime();
var total_time = (end - start)/1000;

var f = formats[format];
if(!f.is_file) {
f.transform(result, {
gn: gn,
dp: dp,
skipfields: skipfields,
total_time: total_time,
database: database,
user_id: user_id,
sql: sql,
filename: filename
}, this)
return;
} else {
var opts = {
sink: res,
gn: gn,
dp: dp,
skipfields: skipfields,
@@ -317,65 +278,11 @@ function handleQuery(req, res) {
sql: sql,
filename: filename
}
var next = this;
var reqKey = f.getKey(opts);
var qElem = new ExportRequest(res, this);
var baking = bakingExports[reqKey];
if ( baking ) {
baking.req.push( qElem );
} else {
baking = bakingExports[reqKey] = { req: [ qElem ] };
f.generate(opts, function(err, dumpfile) {
if(err) {
next(err);
return;
}
Step (
function sendResults(err) {
var nextPipe = function(finish) {
var r = baking.req.shift();
if ( ! r ) { finish(null); return; }
r.sendFile(err, dumpfile, function() {
nextPipe(finish);
});
}

if ( ! err ) nextPipe(this);
else {
_.each(baking.req, function(r) {
r.cb(err);
});
return true;
}
formatter.sendResponse(opts, this);
},
function cleanup(err) {
delete bakingExports[reqKey];

// unlink dump file (sync to avoid race condition)
console.log("removing", dumpfile);
try { fs.unlinkSync(dumpfile); }
catch (e) {
if ( e.code != 'ENOENT' ) {
console.log("Could not unlink dumpfile " + dumpfile + ": " + e);
}
}
}
);
})
}
return;
}

throw new Error("Unexpected format in packageResults: " + format);
},
function sendResults(err, out){
if (err) throw err;

// return to browser
if ( out ) res.send(out);
},
function errorHandle(err, result){
handleException(err, res);
function errorHandle(err){
if ( err ) handleException(err, res);
}
);
} catch (err) {
@@ -391,44 +298,8 @@ function handleCacheStatus(req, res){
res.send({explain: {pid: process.pid, hits: totalExplainHits, keys : totalExplainKeys }});
}

function ExportRequest(ostream, callback) {
this.cb = callback;
this.ostream = ostream;
this.istream = null;
this.canceled = false;

var that = this;

this.ostream.on('close', function() {
//console.log("Request close event, qElem.stream is " + qElem.stream);
that.canceled = true;
if ( that.istream ) {
that.istream.destroy();
}
});
}

ExportRequest.prototype.sendFile = function (err, filename, callback) {
var that = this;
if ( ! this.canceled ) {
//console.log("Creating readable stream out of dumpfile");
this.istream = fs.createReadStream(filename)
.on('open', function(fd) {
that.istream.pipe(that.ostream);
callback();
})
.on('error', function(e) {
console.log("Can't send response: " + e);
that.ostream.end();
callback();
});
} else {
//console.log("Response was canceled, not streaming the file");
callback();
}
this.cb();
}

// TODO: delegate to formats
function getContentDisposition(format, filename, inline) {
var ext = 'json';
if (format === 'geojson'){
@@ -453,13 +324,6 @@ function getContentDisposition(format, filename, inline) {
return ( inline ? 'inline' : 'attachment' ) +'; filename=' + filename + '.' + ext + '; modification-date="' + time + '";';
}

function getContentType(format){
var type = "application/json; charset=utf-8";
var f = formats[format]
type = f.getContentType();
return type;
}

function setCrossDomain(res){
res.header("Access-Control-Allow-Origin", "*");
res.header("Access-Control-Allow-Headers", "X-Requested-With, X-Prototype-Version, X-CSRF-Token");
app/models/formats/README (new file, 18 lines)
@@ -0,0 +1,18 @@
Format classes are required to expose a constructor with no arguments
and a sendResponse(opts, callback) method.

The ``opts`` parameter contains:

  sink        Output stream to send the response to
  sql         SQL query requested by the user
  skipfields  Comma-separated list of fields to skip from output;
              really only needed with "SELECT *" queries
  gn          Name of the geometry column (for formats requiring one)
  dp          Number of decimal points of precision for geometries (if used)
  database    Name of the database to connect to
  user_id     Identifier of the user
  filename    Name to use for attachment disposition

The ``callback`` parameter is a function that is invoked when the
format object has finished sending the result to the sink.
If an error occurs, the callback is invoked with an Error argument.
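As an illustration of this contract, here is a minimal sketch of a format class that satisfies it. The txt name, content type and fixed payload are hypothetical and not part of this commit; in this codebase the sink is the Express response object, which is why send() is used.

// Hypothetical example only: a format object exposing the interface
// described above (no-argument constructor + sendResponse(opts, callback)).
function txt() {}

txt.prototype.getContentType = function() {
  return "text/plain; charset=utf-8";
};

txt.prototype.sendResponse = function(opts, callback) {
  // A real format would run opts.sql against opts.database here;
  // this sketch just writes a fixed payload to the sink.
  try {
    opts.sink.send("results for: " + opts.sql);
    callback(null);
  } catch (err) {
    // Errors are reported through the callback, as described above.
    callback(err);
  }
};

module.exports = txt;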
app/models/formats/csv.js
@@ -1,59 +1,17 @@
var ogr = require('./ogr');

function csv() {}

var shp = require('./shp');
var toOGR = shp.toOGR;
var generateMD5 = shp.generateMD5;
csv.prototype = new ogr('csv');

function csv() {
}
var p = csv.prototype;

csv.prototype = {
p._contentType = "text/csv; charset=utf-8; header=present";
p._fileExtension = "csv";

id: "csv",

is_file: true,

getQuery: function(sql, options) {
return null; // dont execute the query
},

getContentType: function(){
return "text/csv; charset=utf-8; header=present";
},

getFileExtension: function() {
return "csv"
},

transform: function(result, options, callback) {
throw "should not be called for file formats"
},

getKey: function(options) {
return [this.id,
options.dbname,
options.user_id,
options.gn,
generateMD5(options.sql)].concat(options.skipfields).join(':');
},

generate: function(options, callback) {
p.generate = function(options, callback) {
var o = options;
toOGR_SingleFile(o.database, o.user_id, o.gn, o.sql, o.skipfields, 'CSV', 'csv', callback);
}

this.toOGR_SingleFile(o.database, o.user_id, o.gn, o.sql, o.skipfields, 'CSV', 'csv', callback);
};

function toOGR_SingleFile(dbname, user_id, gcol, sql, skipfields, fmt, ext, callback) {
var tmpdir = global.settings.tmpDir || '/tmp';
var reqKey = [ fmt, dbname, user_id, gcol, generateMD5(sql) ].concat(skipfields).join(':');
var outdirpath = tmpdir + '/sqlapi-' + reqKey;
var dumpfile = outdirpath + ':cartodb-query.' + ext;

// TODO: following tests:
// - fetch query with no "the_geom" column
toOGR(dbname, user_id, gcol, sql, skipfields, fmt, dumpfile, callback);
}

module.exports = new csv();
module.exports.toOGR_SingleFile = toOGR_SingleFile;
module.exports = csv;
app/models/formats/geojson.js
@@ -1,31 +1,23 @@

var _ = require('underscore')
var pg = require('./pg');

function geojson() {
}
function geojson() {}

geojson.prototype = {
geojson.prototype = new pg('geojson');

id: "geojson",
var p = geojson.prototype;

getQuery: function(sql, options) {
p._contentType = "application/json; charset=utf-8";

p.getQuery = function(sql, options) {
var gn = options.gn;
var dp = options.dp;
return 'SELECT *, ST_AsGeoJSON(' + gn + ',' + dp + ') as the_geom FROM (' + sql + ') as foo';
},
};

getContentType: function(){
return "application/json; charset=utf-8";
},

getFileExtension: function() {
return this.id;
},

transform: function(result, options, callback) {
p.transform = function(result, options, callback) {
_toGeoJSON(result, options.gn, callback);
}

};

function _toGeoJSON(data, gn, callback){
@@ -55,5 +47,5 @@ function _toGeoJSON(data, gn, callback){
}
}

module.exports = new geojson();
module.exports = geojson;
module.exports.toGeoJSON = _toGeoJSON
app/models/formats/index.js
@@ -8,9 +8,9 @@ var path = require('path');
var folder = __dirname + "/"
//"./app/models/formats/"
require("fs").readdirSync(folder).forEach(function(file) {
if (path.extname(file) === '.js' && file !== 'index.js') {
if (path.extname(file) === '.js' && file !== 'index.js' && file !== 'ogr.js' && file !== 'pg.js' ) {
var format = require(folder + file);
formats[format.id] = format;
formats[format.prototype.id] = format;
}
});
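Given this registry, a caller resolves a format class by name, instantiates it, and hands it the output stream, which is what the handleQuery() change earlier in this commit does. A condensed, illustrative sketch of that flow (the streamQuery wrapper and its arguments are hypothetical):

// Illustrative only: how a registered format class gets used.
var formats = require('./app/models/formats');

function streamQuery(format, opts, res, callback) {
  if ( ! formats.hasOwnProperty(format) ) return callback(new Error("Unknown format " + format));
  var fClass = formats[format];
  var formatter = new fClass();
  res.header("Content-Type", formatter.getContentType());
  opts.sink = res; // the formatter streams straight into the HTTP response
  formatter.sendResponse(opts, callback);
}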
app/models/formats/json.js
@@ -1,32 +1,20 @@
var pg = require('./pg');

function json() {
}
function json() {}

json.prototype = {
json.prototype = new pg('json');

id: "json",
var p = json.prototype;

getQuery: function(sql, options) {
return sql;
},
p._contentType = "application/json; charset=utf-8";

getContentType: function(){
return "application/json; charset=utf-8";
},

getFileExtension: function() {
return this.id;
},

transform: function(result, options, callback) {
p.transform = function(result, options, callback) {
var j = {
time: options.total_time,
total_rows: result.rowCount,
rows: result.rows
}
callback(null, j);
}

};

module.exports = new json();
module.exports = json;
app/models/formats/kml.js
@@ -1,44 +1,17 @@
var toOGR_SingleFile = require('./csv').toOGR_SingleFile
var generateMD5 = require('./shp').generateMD5;
var ogr = require('./ogr');

function kml() {
}
function kml() {}

kml.prototype = {
kml.prototype = new ogr('kml');

id: "kml",
var p = kml.prototype;

is_file: true,
p._contentType = "application/kml; charset=utf-8";
p._fileExtension = "kml";

getQuery: function(sql, options) {
return null; // dont execute the query
},

getContentType: function(){
return "application/kml; charset=utf-8";
},

getFileExtension: function() {
return "kml"
},

transform: function(result, options, callback) {
throw "should not be called for file formats"
},

getKey: function(options) {
return [this.id,
options.dbname,
options.user_id,
options.gn,
generateMD5(options.sql)].concat(options.skipfields).join(':');
},

generate: function(options, callback) {
p.generate = function(options, callback) {
var o = options;
toOGR_SingleFile(o.database, o.user_id, o.gn, o.sql, o.skipfields, 'KML', 'kml', callback);
}

this.toOGR_SingleFile(o.database, o.user_id, o.gn, o.sql, o.skipfields, 'KML', 'kml', callback);
};

module.exports = new kml();
module.exports = kml;
app/models/formats/ogr.js (new file, 263 lines)
@@ -0,0 +1,263 @@

var crypto = require('crypto')
var Step = require('step')
var fs = require('fs')
var _ = require('underscore')
var PSQL = require(global.settings.app_root + '/app/models/psql')
var spawn = require('child_process').spawn

// Keeps track of what's waiting baking for export
var bakingExports = {};

// Return database username from user_id
// NOTE: a "null" user_id is a request to use the public user
function userid_to_dbuser(user_id) {
if ( _.isString(user_id) )
return _.template(global.settings.db_user, {user_id: user_id});
return "publicuser" // FIXME: make configurable
};

function ogr(id) {
this.id = id;
}

ogr.prototype = {

id: "ogr",

is_file: true,

getQuery: function(sql, options) {
return null; // dont execute the query
},

transform: function(result, options, callback) {
throw "should not be called for file formats"
},

getContentType: function(){ return this._contentType; },

getFileExtension: function(){ return this._fileExtension; },

getKey: function(options) {
return [this.id,
options.dbname,
options.user_id,
options.gn,
this.generateMD5(options.sql)].concat(options.skipfields).join(':');
},

generateMD5: function (data){
var hash = crypto.createHash('md5');
hash.update(data);
return hash.digest('hex');
}

};

// Internal function usable by all OGR-driven outputs
ogr.prototype.toOGR = function(dbname, user_id, gcol, sql, skipfields, out_format, out_filename, callback) {
var ogr2ogr = 'ogr2ogr'; // FIXME: make configurable
var dbhost = global.settings.db_host;
var dbport = global.settings.db_port;
var dbuser = userid_to_dbuser(user_id);
var dbpass = ''; // turn into a parameter..

var columns = [];

// Drop ending semicolon (ogr doens't like it)
sql = sql.replace(/;\s*$/, '');

Step (

function fetchColumns() {
var colsql = 'SELECT * FROM (' + sql + ') as _cartodbsqlapi LIMIT 1';
var pg = new PSQL(user_id, dbname, 1, 0);
pg.query(colsql, this);
},
function spawnDumper(err, result) {
if (err) throw err;

//if ( ! result.rows.length ) throw new Error("Query returns no rows");

// Skip system columns
if ( result.rows.length ) {
for (var k in result.rows[0]) {
if ( skipfields.indexOf(k) != -1 ) continue;
if ( out_format != 'CSV' && k == "the_geom_webmercator" ) continue; // TODO: drop ?
if ( out_format == 'CSV' ) columns.push('"' + k + '"::text');
else columns.push('"' + k + '"');
}
} else columns.push('*');
//console.log(columns.join(','));

var next = this;

sql = 'SELECT ' + columns.join(',')
+ ' FROM (' + sql + ') as _cartodbsqlapi';

var child = spawn(ogr2ogr, [
'-f', out_format,
'-lco', 'ENCODING=UTF-8',
'-lco', 'LINEFORMAT=CRLF',
out_filename,
"PG:host=" + dbhost
+ " user=" + dbuser
+ " dbname=" + dbname
+ " password=" + dbpass
+ " tables=fake" // trick to skip query to geometry_columns
+ "",
'-sql', sql
]);

/*
console.log(['ogr2ogr',
'-f', '"'+out_format+'"',
out_filename,
"'PG:host=" + dbhost
+ " user=" + dbuser
+ " dbname=" + dbname
+ " password=" + dbpass
+ " tables=fake" // trick to skip query to geometry_columns
+ "'",
"-sql '", sql, "'"].join(' '));
*/

var stdout = '';
child.stdout.on('data', function(data) {
stdout += data;
//console.log('stdout: ' + data);
});

var stderr;
var logErrPat = new RegExp(/^ERROR/);
child.stderr.on('data', function(data) {
data = data.toString(); // know of a faster way ?
// Store only the first ERROR line
if ( ! stderr && data.match(logErrPat) ) stderr = data;
console.log('ogr2ogr stderr: ' + data);
});

child.on('exit', function(code) {
if ( code ) {
var emsg = stderr.split('\n')[0];
// TODO: add more info about this error ?
//if ( RegExp(/attempt to write non-.*geometry.*to.*type shapefile/i).exec(emsg) )
next(new Error(emsg));
} else {
next(null);
}
});
},
function finish(err) {
callback(err, out_filename);
}
);
};

ogr.prototype.toOGR_SingleFile = function(dbname, user_id, gcol, sql, skipfields, fmt, ext, callback) {
var tmpdir = global.settings.tmpDir || '/tmp';
var reqKey = [ fmt, dbname, user_id, gcol, this.generateMD5(sql) ].concat(skipfields).join(':');
var outdirpath = tmpdir + '/sqlapi-' + reqKey;
var dumpfile = outdirpath + ':cartodb-query.' + ext;

// TODO: following tests:
// - fetch query with no "the_geom" column
this.toOGR(dbname, user_id, gcol, sql, skipfields, fmt, dumpfile, callback);
};

ogr.prototype.sendResponse = function(opts, callback) {
var next = callback;
var reqKey = this.getKey(opts);
var qElem = new ExportRequest(opts.sink, callback);
var baking = bakingExports[reqKey];
if ( baking ) {
baking.req.push( qElem );
} else {
baking = bakingExports[reqKey] = { req: [ qElem ] };
this.generate(opts, function(err, dumpfile) {
if(err) {
next(err);
return;
}
Step (
function sendResults(err) {
var nextPipe = function(finish) {
var r = baking.req.shift();
if ( ! r ) { finish(null); return; }
r.sendFile(err, dumpfile, function() {
nextPipe(finish);
});
}

if ( ! err ) nextPipe(this);
else {
_.each(baking.req, function(r) {
r.cb(err);
});
return true;
}
},
function cleanup(err) {
delete bakingExports[reqKey];

// unlink dump file (sync to avoid race condition)
console.log("removing", dumpfile);
try { fs.unlinkSync(dumpfile); }
catch (e) {
if ( e.code != 'ENOENT' ) {
console.log("Could not unlink dumpfile " + dumpfile + ": " + e);
}
}
}
);
})
}
return;
};

// TODO: put in an ExportRequest.js ----- {

function ExportRequest(ostream, callback) {
this.cb = callback;
this.ostream = ostream;
this.istream = null;
this.canceled = false;

var that = this;

this.ostream.on('close', function() {
//console.log("Request close event, qElem.stream is " + qElem.stream);
that.canceled = true;
if ( that.istream ) {
that.istream.destroy();
}
});
}

ExportRequest.prototype.sendFile = function (err, filename, callback) {
var that = this;
if ( ! this.canceled ) {
//console.log("Creating readable stream out of dumpfile");
this.istream = fs.createReadStream(filename)
.on('open', function(fd) {
that.istream.pipe(that.ostream);
callback();
})
.on('error', function(e) {
console.log("Can't send response: " + e);
that.ostream.end();
callback();
});
} else {
//console.log("Response was canceled, not streaming the file");
callback();
}
this.cb();
}

//------ }

module.exports = ogr;
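The csv and kml diffs above show how thin an OGR-driven format becomes after this refactor. A hypothetical new one would follow the same pattern; the gpx name, content type and 'GPX' driver string are assumptions for illustration, not part of this commit:

// Hypothetical sketch of another OGR-backed format, mirroring csv.js/kml.js.
var ogr = require('./ogr');

function gpx() {}

gpx.prototype = new ogr('gpx');

var p = gpx.prototype;

p._contentType = "application/gpx+xml; charset=utf-8";
p._fileExtension = "gpx";

p.generate = function(options, callback) {
  var o = options;
  // 'GPX' here is the ogr2ogr driver name this sketch assumes.
  this.toOGR_SingleFile(o.database, o.user_id, o.gn, o.sql, o.skipfields, 'GPX', 'gpx', callback);
};

module.exports = gpx;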
app/models/formats/pg.js (new file, 67 lines)
@@ -0,0 +1,67 @@
var Step = require('step')
var PSQL = require(global.settings.app_root + '/app/models/psql')

function pg(id) { this.id = id; }

pg.prototype = {

getQuery: function(sql, options) {
return sql;
},

getContentType: function(){
return this._contentType;
},

getFileExtension: function() {
return this.id;
},

};

pg.prototype.sendResponse = function(opts, callback) {
var sql = this.getQuery(opts.sql, {
gn: opts.gn,
dp: opts.dp,
skipfields: opts.skipfields
});

var that = this;

var start = Date.now();

Step (
function sendQuery() {
var client = new PSQL(opts.user_id, opts.database);
client.query(sql, this);
},
function packageResults(err, result) {
if (err) throw err;

if ( result && opts.skipfields.length ){
for ( var i=0; i<result.rows.length; ++i ) {
for ( var j=0; j<opts.skipfields.length; ++j ) {
delete result.rows[i][opts.skipfields[j]];
}
}
}

var end = Date.now();
opts.total_time = (end - start)/1000;

that.transform(result, opts, this);
},
function sendResults(err, out){

if (err) throw err;

// return to browser
if ( out ) opts.sink.send(out);
},
function errorHandle(err){
callback(err);
}
);
};

module.exports = pg;
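For comparison with the OGR side, a query-driven format built on this base class only needs a content type, optionally a getQuery() rewrite, and a transform() step, as the json, geojson and svg diffs in this commit show. A minimal hypothetical variant (the rowcount name and payload shape are illustrative only):

// Hypothetical sketch of a pg-backed format; sendResponse() is inherited.
var pg = require('./pg');

function rowcount() {}

rowcount.prototype = new pg('rowcount');

var p = rowcount.prototype;

p._contentType = "application/json; charset=utf-8";

p.transform = function(result, options, callback) {
  // The inherited sendResponse() runs the query, then calls transform()
  // and writes whatever is passed to the callback into opts.sink.
  callback(null, { total_rows: result.rowCount, time: options.total_time });
};

module.exports = rowcount;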
app/models/formats/shp.js
@@ -1,173 +1,30 @@

var crypto = require('crypto')
var Step = require('step')
var fs = require('fs')
var _ = require('underscore')
var PSQL = require(global.settings.app_root + '/app/models/psql')
var spawn = require('child_process').spawn
var crypto = require('crypto');
var Step = require('step');
var fs = require('fs');
var spawn = require('child_process').spawn;
var ogr = require('./ogr');

function shp() {
}

shp.prototype = {
shp.prototype = new ogr('shp');

id: "shp",
var p = shp.prototype;

is_file: true,
p._contentType = "application/zip; charset=utf-8";
p._fileExtension = "zip";

getQuery: function(sql, options) {
return null; // dont execute the query
},

getContentType: function(){
return "application/zip; charset=utf-8";
},

getFileExtension: function() {
return "zip"
},

transform: function(result, options, callback) {
throw "should not be called for file formats"
},

getKey: function(options) {
return [this.id,
options.dbname,
options.user_id,
options.gn,
generateMD5(options.sql)].concat(options.skipfields).join(':');
},

generate: function(options, callback) {
p.generate = function(options, callback) {
var o = options;
toSHP(o.database, o.user_id, o.gn, o.sql, o.skipfields, o.filename, callback);
}

this.toSHP(o.database, o.user_id, o.gn, o.sql, o.skipfields, o.filename, callback);
};

function generateMD5(data){
var hash = crypto.createHash('md5');
hash.update(data);
return hash.digest('hex');
}

// Return database username from user_id
// NOTE: a "null" user_id is a request to use the public user
function userid_to_dbuser(user_id) {
if ( _.isString(user_id) )
return _.template(global.settings.db_user, {user_id: user_id});
return "publicuser" // FIXME: make configurable
};

// Internal function usable by all OGR-driven outputs
function toOGR(dbname, user_id, gcol, sql, skipfields, out_format, out_filename, callback) {
var ogr2ogr = 'ogr2ogr'; // FIXME: make configurable
var dbhost = global.settings.db_host;
var dbport = global.settings.db_port;
var dbuser = userid_to_dbuser(user_id);
var dbpass = ''; // turn into a parameter..

var columns = [];

// Drop ending semicolon (ogr doens't like it)
sql = sql.replace(/;\s*$/, '');

Step (

function fetchColumns() {
var colsql = 'SELECT * FROM (' + sql + ') as _cartodbsqlapi LIMIT 1';
var pg = new PSQL(user_id, dbname, 1, 0);
pg.query(colsql, this);
},
function spawnDumper(err, result) {
if (err) throw err;

//if ( ! result.rows.length ) throw new Error("Query returns no rows");

// Skip system columns
if ( result.rows.length ) {
for (var k in result.rows[0]) {
if ( skipfields.indexOf(k) != -1 ) continue;
if ( out_format != 'CSV' && k == "the_geom_webmercator" ) continue; // TODO: drop ?
if ( out_format == 'CSV' ) columns.push('"' + k + '"::text');
else columns.push('"' + k + '"');
}
} else columns.push('*');
//console.log(columns.join(','));

var next = this;

sql = 'SELECT ' + columns.join(',')
+ ' FROM (' + sql + ') as _cartodbsqlapi';

var child = spawn(ogr2ogr, [
'-f', out_format,
'-lco', 'ENCODING=UTF-8',
'-lco', 'LINEFORMAT=CRLF',
out_filename,
"PG:host=" + dbhost
+ " user=" + dbuser
+ " dbname=" + dbname
+ " password=" + dbpass
+ " tables=fake" // trick to skip query to geometry_columns
+ "",
'-sql', sql
]);

/*
console.log(['ogr2ogr',
'-f', '"'+out_format+'"',
out_filename,
"'PG:host=" + dbhost
+ " user=" + dbuser
+ " dbname=" + dbname
+ " password=" + dbpass
+ " tables=fake" // trick to skip query to geometry_columns
+ "'",
"-sql '", sql, "'"].join(' '));
*/

var stdout = '';
child.stdout.on('data', function(data) {
stdout += data;
//console.log('stdout: ' + data);
});

var stderr;
var logErrPat = new RegExp(/^ERROR/);
child.stderr.on('data', function(data) {
data = data.toString(); // know of a faster way ?
// Store only the first ERROR line
if ( ! stderr && data.match(logErrPat) ) stderr = data;
console.log('ogr2ogr stderr: ' + data);
});

child.on('exit', function(code) {
if ( code ) {
var emsg = stderr.split('\n')[0];
// TODO: add more info about this error ?
//if ( RegExp(/attempt to write non-.*geometry.*to.*type shapefile/i).exec(emsg) )
next(new Error(emsg));
} else {
next(null);
}
});
},
function finish(err) {
callback(err, out_filename);
}
);
}

function toSHP(dbname, user_id, gcol, sql, skipfields, filename, callback) {
p.toSHP = function (dbname, user_id, gcol, sql, skipfields, filename, callback) {
var fmtObj = this;
var zip = 'zip'; // FIXME: make configurable
var tmpdir = global.settings.tmpDir || '/tmp';
var reqKey = [ 'shp', dbname, user_id, gcol, generateMD5(sql) ].concat(skipfields).join(':');
var reqKey = [ 'shp', dbname, user_id, gcol, this.generateMD5(sql) ].concat(skipfields).join(':');
var outdirpath = tmpdir + '/sqlapi-' + reqKey;
var zipfile = outdirpath + '.zip';
var shapefile = outdirpath + '/' + filename + '.shp';
@@ -181,7 +38,7 @@ function toSHP(dbname, user_id, gcol, sql, skipfields, filename, callback) {
},
function spawnDumper(err) {
if ( err ) throw err;
toOGR(dbname, user_id, gcol, sql, skipfields, 'ESRI Shapefile', shapefile, this);
fmtObj.toOGR(dbname, user_id, gcol, sql, skipfields, 'ESRI Shapefile', shapefile, this);
},
function doZip(err) {
if ( err ) throw err;
@@ -237,10 +94,8 @@ function toSHP(dbname, user_id, gcol, sql, skipfields, filename, callback) {
});
}
);
}
};

module.exports = new shp();
module.exports.toOGR = toOGR;
module.exports.generateMD5 = generateMD5
module.exports = shp;
app/models/formats/svg.js
@@ -1,19 +1,17 @@

var pg = require('./pg');
var _ = require('underscore')

function svg() {
}

var svg_width = 1024.0;
var svg_height = 768.0;
var svg_ratio = svg_width/svg_height;

svg.prototype = {
function svg() {}

id: "svg",
svg.prototype = new pg('svg');

var p = svg.prototype;

getQuery: function(sql, options) {
p.getQuery = function(sql, options) {
var gn = options.gn;
var dp = options.dp;
return 'WITH source AS ( ' + sql + '), extent AS ( '
@@ -35,20 +33,12 @@ svg.prototype = {
+ '-x0, -y0, s, s), 0, ' + dp + ') as ' + gn
//+ ', ex0, ey0, ew, eh, s ' // DEBUG ONLY
+ ' FROM trans, extent_info, source';
},
};

getContentType: function(){
return "image/svg+xml; charset=utf-8";
},
p._contentType = "image/svg+xml; charset=utf-8";

getFileExtension: function() {
return this.id;
},

transform: function(result, options, callback) {
p.transform = function(result, options, callback) {
toSVG(result.rows, options.gn, callback);
}

};

@@ -147,4 +137,4 @@ function toSVG(rows, gn, callback) {
callback(null, out.join("\n"));
}

module.exports = new svg();
module.exports = svg;
app/models/formats/topojson.js
@@ -1,26 +1,23 @@

var pg = require('./pg');
var _ = require('underscore')
var geojson = require('./geojson');
var TopoJSON = require('topojson');

function topojson() {
}
function topojson() { }

topojson.prototype = {};
topojson.prototype = new pg('topojson');

_.extend(topojson.prototype, geojson, {
id: "topojson",
var p = topojson.prototype;

getQuery: function(sql, options) {
var sql = geojson.getQuery(sql, options);
p.getQuery = function(sql, options) {
var sql = geojson.prototype.getQuery(sql, options);
return sql + ' where ' + options.gn + ' is not null';
},
};

transform: function(result, options, callback) {
p.transform = function(result, options, callback) {
toTopoJSON(result, options.gn, options.skipfields, callback);
}

})
};

function toTopoJSON(data, gn, skipfields, callback){
geojson.toGeoJSON(data, gn, function(err, geojson) {
@@ -47,4 +44,4 @@ function toTopoJSON(data, gn, skipfields, callback){
}


module.exports = new topojson();
module.exports = topojson;