Merge pull request #680 from CartoDB/feature/ch139802/phl-inconsistent-errors-when-downloading
[ch139802] Avoid concurrent requests from getting stuck when exporting file formats handled by OGR
This commit is contained in:
commit
63e3f64fdf
@ -1,15 +1,12 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
var crypto = require('crypto');
|
var crypto = require('crypto');
|
||||||
|
var uuid = require('uuid');
|
||||||
var step = require('step');
|
var step = require('step');
|
||||||
var fs = require('fs');
|
var fs = require('fs');
|
||||||
var _ = require('underscore');
|
|
||||||
var PSQL = require('cartodb-psql');
|
var PSQL = require('cartodb-psql');
|
||||||
var spawn = require('child_process').spawn;
|
var spawn = require('child_process').spawn;
|
||||||
|
|
||||||
// Keeps track of what's waiting baking for export
|
|
||||||
var bakingExports = {};
|
|
||||||
|
|
||||||
function OgrFormat (id) {
|
function OgrFormat (id) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
}
|
}
|
||||||
@ -32,15 +29,6 @@ OgrFormat.prototype = {
|
|||||||
|
|
||||||
getFileExtension: function () { return this._fileExtension; },
|
getFileExtension: function () { return this._fileExtension; },
|
||||||
|
|
||||||
getKey: function (options) {
|
|
||||||
return [this.id,
|
|
||||||
options.dbopts.dbname,
|
|
||||||
options.dbopts.user,
|
|
||||||
options.gn,
|
|
||||||
this.generateMD5(options.filename),
|
|
||||||
this.generateMD5(options.sql)].concat(options.skipfields).join(':');
|
|
||||||
},
|
|
||||||
|
|
||||||
generateMD5: function (data) {
|
generateMD5: function (data) {
|
||||||
var hash = crypto.createHash('md5');
|
var hash = crypto.createHash('md5');
|
||||||
hash.update(data);
|
hash.update(data);
|
||||||
@ -241,7 +229,8 @@ OgrFormat.prototype.toOGR_SingleFile = function (options, fmt, callback) {
|
|||||||
userId,
|
userId,
|
||||||
gcol,
|
gcol,
|
||||||
this.generateMD5(layername),
|
this.generateMD5(layername),
|
||||||
this.generateMD5(sql)
|
this.generateMD5(sql),
|
||||||
|
uuid.v4()
|
||||||
].concat(skipfields).join(':'));
|
].concat(skipfields).join(':'));
|
||||||
var outdirpath = tmpdir + '/sqlapi-' + process.pid + '-' + reqKey;
|
var outdirpath = tmpdir + '/sqlapi-' + process.pid + '-' + reqKey;
|
||||||
var dumpfile = outdirpath + ':cartodb-query.' + ext;
|
var dumpfile = outdirpath + ':cartodb-query.' + ext;
|
||||||
@ -252,51 +241,31 @@ OgrFormat.prototype.toOGR_SingleFile = function (options, fmt, callback) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
OgrFormat.prototype.sendResponse = function (opts, callback) {
|
OgrFormat.prototype.sendResponse = function (opts, callback) {
|
||||||
// var next = callback;
|
var exportRequest = new ExportRequest(opts.sink, callback, opts.beforeSink);
|
||||||
var reqKey = this.getKey(opts);
|
|
||||||
var qElem = new ExportRequest(opts.sink, callback, opts.beforeSink);
|
|
||||||
var baking = bakingExports[reqKey];
|
|
||||||
if (baking) {
|
|
||||||
baking.req.push(qElem);
|
|
||||||
} else {
|
|
||||||
baking = bakingExports[reqKey] = { req: [qElem] };
|
|
||||||
this.generate(opts, function (err, dumpfile) {
|
|
||||||
if (opts.profiler) {
|
|
||||||
opts.profiler.done('generate');
|
|
||||||
}
|
|
||||||
step(
|
|
||||||
function sendResults () {
|
|
||||||
var nextPipe = function (finish) {
|
|
||||||
var r = baking.req.shift();
|
|
||||||
if (!r) { finish(null); return; }
|
|
||||||
r.sendFile(err, dumpfile, function () {
|
|
||||||
nextPipe(finish);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
if (!err) {
|
this.generate(opts, function (err, dumpfile) {
|
||||||
nextPipe(this);
|
if (opts.profiler) {
|
||||||
} else {
|
opts.profiler.done('generate');
|
||||||
_.each(baking.req, function (r) {
|
}
|
||||||
r.cb(err);
|
step(
|
||||||
});
|
function sendResult () {
|
||||||
return true;
|
if (err) {
|
||||||
}
|
exportRequest.cb(err);
|
||||||
},
|
return true;
|
||||||
function cleanup (/* err */) {
|
}
|
||||||
delete bakingExports[reqKey];
|
exportRequest.sendFile(dumpfile, this);
|
||||||
|
},
|
||||||
// unlink dump file (sync to avoid race condition)
|
function cleanup () {
|
||||||
console.log('removing', dumpfile);
|
// unlink dump file (sync to avoid race condition)
|
||||||
try { fs.unlinkSync(dumpfile); } catch (e) {
|
console.log('removing', dumpfile);
|
||||||
if (e.code !== 'ENOENT') {
|
try { fs.unlinkSync(dumpfile); } catch (e) {
|
||||||
console.log('Could not unlink dumpfile ' + dumpfile + ': ' + e);
|
if (e.code !== 'ENOENT') {
|
||||||
}
|
console.log('Could not unlink dumpfile ' + dumpfile + ': ' + e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
}
|
||||||
});
|
);
|
||||||
}
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
function ExportRequest (ostream, callback, beforeSink) {
|
function ExportRequest (ostream, callback, beforeSink) {
|
||||||
@ -305,6 +274,7 @@ function ExportRequest (ostream, callback, beforeSink) {
|
|||||||
this.ostream = ostream;
|
this.ostream = ostream;
|
||||||
this.istream = null;
|
this.istream = null;
|
||||||
this.canceled = false;
|
this.canceled = false;
|
||||||
|
this.read = false;
|
||||||
|
|
||||||
var that = this;
|
var that = this;
|
||||||
|
|
||||||
@ -316,42 +286,46 @@ function ExportRequest (ostream, callback, beforeSink) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
ExportRequest.prototype.sendFile = function (err, filename, callback) {
|
ExportRequest.prototype.sendFile = function (filename, callback) {
|
||||||
if (err) {
|
if (this.canceled) {
|
||||||
return callback(err);
|
return callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
var that = this;
|
var that = this;
|
||||||
if (!this.canceled) {
|
this.istream = fs.createReadStream(filename)
|
||||||
this.istream = fs.createReadStream(filename)
|
.on('open', function () {
|
||||||
.on('open', function (/* fd */) {
|
if (that.beforeSink) {
|
||||||
if (that.beforeSink) {
|
that.beforeSink();
|
||||||
that.beforeSink();
|
}
|
||||||
}
|
|
||||||
that.istream
|
that.istream
|
||||||
.pipe(that.ostream)
|
.pipe(that.ostream)
|
||||||
.on('end', () => {
|
.on('end', () => {
|
||||||
callback();
|
that.cb();
|
||||||
that.cb();
|
callback();
|
||||||
})
|
})
|
||||||
.on('error', (err) => {
|
.on('error', (err) => {
|
||||||
callback();
|
that.cb(err);
|
||||||
that.cb(err);
|
callback();
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
.on('error', function (e) {
|
.on('error', function (e) {
|
||||||
console.log("Can't send response: " + e);
|
console.log("Can't send response: " + e);
|
||||||
that.ostream.end();
|
that.read = true;
|
||||||
that.cb(e);
|
that.ostream.end();
|
||||||
|
that.cb(e);
|
||||||
|
callback();
|
||||||
|
})
|
||||||
|
.on('end', () => {
|
||||||
|
that.read = true;
|
||||||
|
that.cb();
|
||||||
|
callback();
|
||||||
|
})
|
||||||
|
.on('close', () => {
|
||||||
|
if (!that.read) { // NOTE: Covering 304 responses, when the 'ostream' is closed after 'pipe'
|
||||||
callback();
|
callback();
|
||||||
})
|
}
|
||||||
.on('end', () => {
|
});
|
||||||
that.cb();
|
|
||||||
callback();
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
callback();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = OgrFormat;
|
module.exports = OgrFormat;
|
||||||
|
Loading…
Reference in New Issue
Block a user