Remove 'baking' logic
This commit is contained in:
parent
8c35f8912c
commit
cf74ef1d17
@ -3,15 +3,13 @@
|
||||
var serverOptions = require('./../../server-options');
|
||||
var { logger } = serverOptions();
|
||||
var crypto = require('crypto');
|
||||
var uuid = require('uuid');
|
||||
var step = require('step');
|
||||
var fs = require('fs');
|
||||
var _ = require('underscore');
|
||||
var PSQL = require('cartodb-psql');
|
||||
var spawn = require('child_process').spawn;
|
||||
|
||||
// Keeps track of what's waiting baking for export
|
||||
var bakingExports = {};
|
||||
|
||||
function OgrFormat (id) {
|
||||
this.id = id;
|
||||
}
|
||||
@ -34,15 +32,6 @@ OgrFormat.prototype = {
|
||||
|
||||
getFileExtension: function () { return this._fileExtension; },
|
||||
|
||||
getKey: function (options) {
|
||||
return [this.id,
|
||||
options.dbopts.dbname,
|
||||
options.dbopts.user,
|
||||
options.gn,
|
||||
this.generateMD5(options.filename),
|
||||
this.generateMD5(options.sql)].concat(options.skipfields).join(':');
|
||||
},
|
||||
|
||||
generateMD5: function (data) {
|
||||
var hash = crypto.createHash('md5');
|
||||
hash.update(data);
|
||||
@ -243,7 +232,8 @@ OgrFormat.prototype.toOGR_SingleFile = function (options, fmt, callback) {
|
||||
userId,
|
||||
gcol,
|
||||
this.generateMD5(layername),
|
||||
this.generateMD5(sql)
|
||||
this.generateMD5(sql),
|
||||
uuid.v4()
|
||||
].concat(skipfields).join(':'));
|
||||
var outdirpath = tmpdir + '/sqlapi-' + process.pid + '-' + reqKey;
|
||||
var dumpfile = outdirpath + ':cartodb-query.' + ext;
|
||||
@ -254,69 +244,38 @@ OgrFormat.prototype.toOGR_SingleFile = function (options, fmt, callback) {
|
||||
};
|
||||
|
||||
OgrFormat.prototype.sendResponse = function (opts, callback) {
|
||||
// var next = callback;
|
||||
var reqKey = this.getKey(opts);
|
||||
var qElem = new ExportRequest(opts.sink, callback, opts.beforeSink);
|
||||
var baking = bakingExports[reqKey];
|
||||
var exportRequest = new ExportRequest(opts.sink, callback, opts.beforeSink);
|
||||
|
||||
if (baking) {
|
||||
logger.info({ custom: true, reqKey: reqKey }, 'Baking!');
|
||||
baking.req.push(qElem);
|
||||
} else {
|
||||
logger.info({ custom: true, reqKey: reqKey }, 'Not baking!');
|
||||
logger.info({ custom: true }, 'sendResponse');
|
||||
|
||||
baking = bakingExports[reqKey] = { req: [qElem] };
|
||||
this.generate(opts, function (err, dumpfile) {
|
||||
if (opts.profiler) {
|
||||
opts.profiler.done('generate');
|
||||
}
|
||||
step(
|
||||
function sendResults () {
|
||||
var nextPipe = function (finish) {
|
||||
logger.info({ custom: true, size: baking.req.length }, 'Sending responses');
|
||||
var r = baking.req.shift();
|
||||
logger.info({ custom: true, size: baking.req.length }, 'Request shifted');
|
||||
if (!r) {
|
||||
logger.info({ custom: true, size: baking.req.length }, 'There is no request');
|
||||
finish(null);
|
||||
return;
|
||||
}
|
||||
logger.info({ custom: true, size: baking.req.length }, 'Sending file');
|
||||
r.sendFile(err, dumpfile, function () {
|
||||
logger.info({ custom: true, size: baking.req.length }, 'Sending file callback');
|
||||
nextPipe(finish);
|
||||
});
|
||||
};
|
||||
|
||||
if (!err) {
|
||||
logger.info({ custom: true }, 'Next pipe');
|
||||
nextPipe(this);
|
||||
} else {
|
||||
_.each(baking.req, function (r) {
|
||||
r.cb(err);
|
||||
});
|
||||
return true;
|
||||
}
|
||||
},
|
||||
function cleanup (/* err */) {
|
||||
logger.info({ custom: true, reqKey: reqKey, size: bakingExports[reqKey].req.length }, 'Deleting key');
|
||||
delete bakingExports[reqKey];
|
||||
|
||||
// unlink dump file (sync to avoid race condition)
|
||||
console.log('removing', dumpfile);
|
||||
try {
|
||||
logger.info({ custom: true }, 'Unlink dumpfile');
|
||||
fs.unlinkSync(dumpfile);
|
||||
} catch (e) {
|
||||
if (e.code !== 'ENOENT') {
|
||||
logger.info({ custom: true }, 'Error removing dumpfile');
|
||||
console.log('Could not unlink dumpfile ' + dumpfile + ': ' + e);
|
||||
}
|
||||
this.generate(opts, function (err, dumpfile) {
|
||||
if (opts.profiler) {
|
||||
opts.profiler.done('generate');
|
||||
}
|
||||
step(
|
||||
function sendResult () {
|
||||
logger.info({ custom: true }, 'sendResult');
|
||||
if (err) {
|
||||
logger.info({ custom: true }, 'sendResult: error');
|
||||
exportRequest.cb(err);
|
||||
return true;
|
||||
}
|
||||
exportRequest.sendFile(dumpfile, this);
|
||||
},
|
||||
function cleanup () {
|
||||
logger.info({ custom: true }, 'cleanup');
|
||||
// unlink dump file (sync to avoid race condition)
|
||||
console.log('removing', dumpfile);
|
||||
try {
|
||||
fs.unlinkSync(dumpfile);
|
||||
} catch (e) {
|
||||
if (e.code !== 'ENOENT') {
|
||||
console.log('Could not unlink dumpfile ' + dumpfile + ': ' + e);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
function ExportRequest (ostream, callback, beforeSink) {
|
||||
@ -325,76 +284,55 @@ function ExportRequest (ostream, callback, beforeSink) {
|
||||
this.ostream = ostream;
|
||||
this.istream = null;
|
||||
this.canceled = false;
|
||||
this.ended = false;
|
||||
|
||||
var that = this;
|
||||
logger.info({ custom: true }, 'New export request');
|
||||
this.ostream.on('close', function () {
|
||||
logger.info({ custom: true }, 'Stream closed');
|
||||
logger.info({ custom: true }, 'close ExportRequest');
|
||||
that.canceled = true;
|
||||
if (that.istream) {
|
||||
that.istream.destroy();
|
||||
}
|
||||
}).on('pipe', function(src) {
|
||||
logger.info({ custom: true, readable: src.readable }, 'Something is piping into the ostream');
|
||||
});
|
||||
}
|
||||
|
||||
ExportRequest.prototype.sendFile = function (err, filename, callback) {
|
||||
if (err) {
|
||||
logger.info({ custom: true }, 'There is an error sending file');
|
||||
return callback(err);
|
||||
ExportRequest.prototype.sendFile = function (filename, callback) {
|
||||
if (this.canceled) {
|
||||
logger.info({ custom: true }, 'sendFile canceled');
|
||||
return callback();
|
||||
}
|
||||
logger.info({ custom: true, destroyed: this.ostream.connection.destroyed }, 'sendFile');
|
||||
|
||||
var that = this;
|
||||
if (!this.canceled && !this.ostream.connection.destroyed) {
|
||||
logger.info({ custom: true }, 'Not cancelled (sending file)');
|
||||
this.istream = fs.createReadStream(filename)
|
||||
.on('open', function (/* fd */) {
|
||||
if (that.beforeSink) {
|
||||
logger.info({ custom: true }, 'Before Sink');
|
||||
that.beforeSink();
|
||||
}
|
||||
this.istream = fs.createReadStream(filename)
|
||||
.on('open', function () {
|
||||
if (that.beforeSink) {
|
||||
that.beforeSink();
|
||||
}
|
||||
|
||||
logger.info({ custom: true }, 'Sending file: on open');
|
||||
logger.info({ custom: true }, 'sendFile: open');
|
||||
|
||||
that.istream
|
||||
.pipe(that.ostream)
|
||||
.on('end', () => {
|
||||
logger.info({ custom: true }, 'Sending file: on open end');
|
||||
callback();
|
||||
that.cb();
|
||||
})
|
||||
.on('error', (err) => {
|
||||
logger.info({ custom: true }, 'Sending file: on open error');
|
||||
callback();
|
||||
that.cb(err);
|
||||
});
|
||||
})
|
||||
.on('error', function (e) {
|
||||
logger.info({ custom: true }, 'Sending file: on error');
|
||||
console.log("Can't send response: " + e);
|
||||
that.ended = true;
|
||||
that.ostream.end();
|
||||
that.cb(e);
|
||||
callback();
|
||||
})
|
||||
.on('end', () => {
|
||||
logger.info({ custom: true }, 'Sending file: on end');
|
||||
that.ended = true;
|
||||
that.cb();
|
||||
callback();
|
||||
})
|
||||
.on('close', function() {
|
||||
logger.info({ custom: true }, 'Sending file: testing on close event');
|
||||
if (!that.ended) {
|
||||
that.istream
|
||||
.pipe(that.ostream)
|
||||
.on('end', () => {
|
||||
that.cb();
|
||||
callback();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.info({ custom: true }, 'It was cancelled (sending file)');
|
||||
callback();
|
||||
}
|
||||
})
|
||||
.on('error', (err) => {
|
||||
that.cb(err);
|
||||
callback();
|
||||
});
|
||||
})
|
||||
.on('error', function (e) {
|
||||
logger.info({ custom: true }, 'sendFile: error');
|
||||
console.log("Can't send response: " + e);
|
||||
that.ostream.end();
|
||||
that.cb(e);
|
||||
callback();
|
||||
})
|
||||
.on('end', () => {
|
||||
logger.info({ custom: true }, 'sendFile: end');
|
||||
that.cb();
|
||||
callback();
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = OgrFormat;
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
var Ogr = require('./../ogr');
|
||||
var serverOptions = require('./../../../server-options');
|
||||
var { logger } = serverOptions();
|
||||
|
||||
function CsvFormat () {}
|
||||
|
||||
@ -12,8 +11,6 @@ CsvFormat.prototype._contentType = 'text/csv; charset=utf-8; header=present';
|
||||
CsvFormat.prototype._fileExtension = 'csv';
|
||||
|
||||
CsvFormat.prototype.generate = function (options, callback) {
|
||||
logger.info({ custom: true }, 'Generating CSV format');
|
||||
|
||||
this.toOGR_SingleFile(options, 'CSV', callback);
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user