diff --git a/lib/models/formats/ogr.js b/lib/models/formats/ogr.js index 3daa2cd0..b911c0f7 100644 --- a/lib/models/formats/ogr.js +++ b/lib/models/formats/ogr.js @@ -3,15 +3,13 @@ var serverOptions = require('./../../server-options'); var { logger } = serverOptions(); var crypto = require('crypto'); +var uuid = require('uuid'); var step = require('step'); var fs = require('fs'); var _ = require('underscore'); var PSQL = require('cartodb-psql'); var spawn = require('child_process').spawn; -// Keeps track of what's waiting baking for export -var bakingExports = {}; - function OgrFormat (id) { this.id = id; } @@ -34,15 +32,6 @@ OgrFormat.prototype = { getFileExtension: function () { return this._fileExtension; }, - getKey: function (options) { - return [this.id, - options.dbopts.dbname, - options.dbopts.user, - options.gn, - this.generateMD5(options.filename), - this.generateMD5(options.sql)].concat(options.skipfields).join(':'); - }, - generateMD5: function (data) { var hash = crypto.createHash('md5'); hash.update(data); @@ -243,7 +232,8 @@ OgrFormat.prototype.toOGR_SingleFile = function (options, fmt, callback) { userId, gcol, this.generateMD5(layername), - this.generateMD5(sql) + this.generateMD5(sql), + uuid.v4() ].concat(skipfields).join(':')); var outdirpath = tmpdir + '/sqlapi-' + process.pid + '-' + reqKey; var dumpfile = outdirpath + ':cartodb-query.' + ext; @@ -254,69 +244,38 @@ OgrFormat.prototype.toOGR_SingleFile = function (options, fmt, callback) { }; OgrFormat.prototype.sendResponse = function (opts, callback) { - // var next = callback; - var reqKey = this.getKey(opts); - var qElem = new ExportRequest(opts.sink, callback, opts.beforeSink); - var baking = bakingExports[reqKey]; + var exportRequest = new ExportRequest(opts.sink, callback, opts.beforeSink); - if (baking) { - logger.info({ custom: true, reqKey: reqKey }, 'Baking!'); - baking.req.push(qElem); - } else { - logger.info({ custom: true, reqKey: reqKey }, 'Not baking!'); + logger.info({ custom: true }, 'sendResponse'); - baking = bakingExports[reqKey] = { req: [qElem] }; - this.generate(opts, function (err, dumpfile) { - if (opts.profiler) { - opts.profiler.done('generate'); - } - step( - function sendResults () { - var nextPipe = function (finish) { - logger.info({ custom: true, size: baking.req.length }, 'Sending responses'); - var r = baking.req.shift(); - logger.info({ custom: true, size: baking.req.length }, 'Request shifted'); - if (!r) { - logger.info({ custom: true, size: baking.req.length }, 'There is no request'); - finish(null); - return; - } - logger.info({ custom: true, size: baking.req.length }, 'Sending file'); - r.sendFile(err, dumpfile, function () { - logger.info({ custom: true, size: baking.req.length }, 'Sending file callback'); - nextPipe(finish); - }); - }; - - if (!err) { - logger.info({ custom: true }, 'Next pipe'); - nextPipe(this); - } else { - _.each(baking.req, function (r) { - r.cb(err); - }); - return true; - } - }, - function cleanup (/* err */) { - logger.info({ custom: true, reqKey: reqKey, size: bakingExports[reqKey].req.length }, 'Deleting key'); - delete bakingExports[reqKey]; - - // unlink dump file (sync to avoid race condition) - console.log('removing', dumpfile); - try { - logger.info({ custom: true }, 'Unlink dumpfile'); - fs.unlinkSync(dumpfile); - } catch (e) { - if (e.code !== 'ENOENT') { - logger.info({ custom: true }, 'Error removing dumpfile'); - console.log('Could not unlink dumpfile ' + dumpfile + ': ' + e); - } + this.generate(opts, function (err, dumpfile) { + if (opts.profiler) { + opts.profiler.done('generate'); + } + step( + function sendResult () { + logger.info({ custom: true }, 'sendResult'); + if (err) { + logger.info({ custom: true }, 'sendResult: error'); + exportRequest.cb(err); + return true; + } + exportRequest.sendFile(dumpfile, this); + }, + function cleanup () { + logger.info({ custom: true }, 'cleanup'); + // unlink dump file (sync to avoid race condition) + console.log('removing', dumpfile); + try { + fs.unlinkSync(dumpfile); + } catch (e) { + if (e.code !== 'ENOENT') { + console.log('Could not unlink dumpfile ' + dumpfile + ': ' + e); } } - ); - }); - } + } + ); + }); }; function ExportRequest (ostream, callback, beforeSink) { @@ -325,76 +284,55 @@ function ExportRequest (ostream, callback, beforeSink) { this.ostream = ostream; this.istream = null; this.canceled = false; - this.ended = false; var that = this; - logger.info({ custom: true }, 'New export request'); this.ostream.on('close', function () { - logger.info({ custom: true }, 'Stream closed'); + logger.info({ custom: true }, 'close ExportRequest'); that.canceled = true; if (that.istream) { that.istream.destroy(); } - }).on('pipe', function(src) { - logger.info({ custom: true, readable: src.readable }, 'Something is piping into the ostream'); }); } -ExportRequest.prototype.sendFile = function (err, filename, callback) { - if (err) { - logger.info({ custom: true }, 'There is an error sending file'); - return callback(err); +ExportRequest.prototype.sendFile = function (filename, callback) { + if (this.canceled) { + logger.info({ custom: true }, 'sendFile canceled'); + return callback(); } - logger.info({ custom: true, destroyed: this.ostream.connection.destroyed }, 'sendFile'); + var that = this; - if (!this.canceled && !this.ostream.connection.destroyed) { - logger.info({ custom: true }, 'Not cancelled (sending file)'); - this.istream = fs.createReadStream(filename) - .on('open', function (/* fd */) { - if (that.beforeSink) { - logger.info({ custom: true }, 'Before Sink'); - that.beforeSink(); - } + this.istream = fs.createReadStream(filename) + .on('open', function () { + if (that.beforeSink) { + that.beforeSink(); + } - logger.info({ custom: true }, 'Sending file: on open'); + logger.info({ custom: true }, 'sendFile: open'); - that.istream - .pipe(that.ostream) - .on('end', () => { - logger.info({ custom: true }, 'Sending file: on open end'); - callback(); - that.cb(); - }) - .on('error', (err) => { - logger.info({ custom: true }, 'Sending file: on open error'); - callback(); - that.cb(err); - }); - }) - .on('error', function (e) { - logger.info({ custom: true }, 'Sending file: on error'); - console.log("Can't send response: " + e); - that.ended = true; - that.ostream.end(); - that.cb(e); - callback(); - }) - .on('end', () => { - logger.info({ custom: true }, 'Sending file: on end'); - that.ended = true; - that.cb(); - callback(); - }) - .on('close', function() { - logger.info({ custom: true }, 'Sending file: testing on close event'); - if (!that.ended) { + that.istream + .pipe(that.ostream) + .on('end', () => { + that.cb(); callback(); - } - }); - } else { - logger.info({ custom: true }, 'It was cancelled (sending file)'); - callback(); - } + }) + .on('error', (err) => { + that.cb(err); + callback(); + }); + }) + .on('error', function (e) { + logger.info({ custom: true }, 'sendFile: error'); + console.log("Can't send response: " + e); + that.ostream.end(); + that.cb(e); + callback(); + }) + .on('end', () => { + logger.info({ custom: true }, 'sendFile: end'); + that.cb(); + callback(); + }); }; module.exports = OgrFormat; diff --git a/lib/models/formats/ogr/csv.js b/lib/models/formats/ogr/csv.js index cbefb20b..c8000400 100644 --- a/lib/models/formats/ogr/csv.js +++ b/lib/models/formats/ogr/csv.js @@ -2,7 +2,6 @@ var Ogr = require('./../ogr'); var serverOptions = require('./../../../server-options'); -var { logger } = serverOptions(); function CsvFormat () {} @@ -12,8 +11,6 @@ CsvFormat.prototype._contentType = 'text/csv; charset=utf-8; header=present'; CsvFormat.prototype._fileExtension = 'csv'; CsvFormat.prototype.generate = function (options, callback) { - logger.info({ custom: true }, 'Generating CSV format'); - this.toOGR_SingleFile(options, 'CSV', callback); };