Merge pull request #633 from CartoDB/2239-throttle-bug

Fix COPY Throttling bug
This commit is contained in:
Javier Goizueta 2019-11-20 13:23:42 +01:00 committed by GitHub
commit 8d55dc8e10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 27 deletions

View File

@ -8,34 +8,19 @@ module.exports = class Throttler extends Transform {
this.pgstream = pgstream; this.pgstream = pgstream;
this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15; this.sampleSeconds = global.settings.copy_from_maximum_slow_input_speed_interval || 15;
this.minimunBytesPerSecondThershold = global.settings.copy_from_minimum_input_speed || 0; this.minimunBytesPerSampleThreshold = global.settings.copy_from_minimum_input_speed || 0;
this.byteCount = 0; this.byteCount = 0;
this.bytesPerSecondHistory = [];
this._interval = setInterval(this._updateMetrics.bind(this), 1000); this._interval = setInterval(this._updateMetrics.bind(this), this.sampleSeconds*1000);
} }
_updateMetrics () { _updateMetrics () {
this.bytesPerSecondHistory.push(this.byteCount); if (this.byteCount < this.minimunBytesPerSampleThreshold) {
this.byteCount = 0;
if (this.bytesPerSecondHistory > this.sampleLength) {
this.bytesPerSecondHistory.shift();
}
const doesNotReachThreshold = [];
for (const bytesPerSecond of this.bytesPerSecondHistory) {
if (bytesPerSecond <= this.minimunBytesPerSecondThershold) {
doesNotReachThreshold.push(true);
}
}
if (doesNotReachThreshold.length >= this.sampleLength) {
clearInterval(this._interval); clearInterval(this._interval);
this.pgstream.emit('error', new Error('Connection closed by server: input data too slow')); this.pgstream.emit('error', new Error('Connection closed by server: input data too slow'));
} }
this.byteCount = 0;
} }
_transform (chunk, encoding, callback) { _transform (chunk, encoding, callback) {

View File

@ -8,15 +8,17 @@ const request = require('request');
const assert = require('assert'); const assert = require('assert');
const { Readable } = require('stream'); const { Readable } = require('stream');
const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::integer AS counter)`; const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::text AS counter)`;
const dropTableQuery = `DROP TABLE copy_from_throttle`; const dropTableQuery = `DROP TABLE copy_from_throttle`;
function * counterGenerator (timeout) { function * counterGenerator (timeout, max_count) {
let counter = 0; let counter = 0;
while (true) { while (!max_count || counter <= max_count) {
yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line
} }
// generate also a delayed final marker (null) to simplify handling into a stream.
yield new Promise(resolve => setTimeout(() => resolve(null), timeout));
} }
class CounterStream extends Readable { class CounterStream extends Readable {
@ -27,14 +29,16 @@ class CounterStream extends Readable {
_read () { _read () {
const res = this.generator.next(); const res = this.generator.next();
res.value.then(value => res.done ? this.push(null) : this.push(value)); if (!res.done) {
res.value.then(value => this.push(value));
}
} }
} }
describe('COPY FROM throttle', function () { describe('COPY FROM throttle', function () {
before(function() { before(function() {
this.copy_from_minimum_input_speed = global.settings.copy_from_minimum_input_speed; this.copy_from_minimum_input_speed = global.settings.copy_from_minimum_input_speed;
global.settings.copy_from_minimum_input_speed = 1; global.settings.copy_from_minimum_input_speed = 2;
this.copy_from_maximum_slow_input_speed_interval = global.settings.copy_from_maximum_slow_input_speed_interval; this.copy_from_maximum_slow_input_speed_interval = global.settings.copy_from_maximum_slow_input_speed_interval;
global.settings.copy_from_maximum_slow_input_speed_interval = 1; global.settings.copy_from_maximum_slow_input_speed_interval = 1;
@ -107,7 +111,7 @@ describe('COPY FROM throttle', function () {
this.listener.close(done); this.listener.close(done);
}); });
it('hang while sendind data', function (done) { it('hangs while sending data', function (done) {
const { host, port } = this; const { host, port } = this;
const copy = querystring.stringify({ const copy = querystring.stringify({
@ -118,7 +122,7 @@ describe('COPY FROM throttle', function () {
const options = { const options = {
url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`, url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`,
headers: { host: 'vizzuality.cartodb.com' }, headers: { host: 'vizzuality.cartodb.com' },
body: new CounterStream(counterGenerator(1000)), body: new CounterStream(counterGenerator(600)),
method: 'POST' method: 'POST'
}; };
@ -134,4 +138,29 @@ describe('COPY FROM throttle', function () {
done(); done();
}); });
}); });
it('does not hang while sending data', function (done) {
const { host, port } = this;
const copy = querystring.stringify({
q: "COPY copy_from_throttle (counter) FROM STDIN WITH (FORMAT CSV, DELIMITER ',')",
api_key: 1234
});
const options = {
url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`,
headers: { host: 'vizzuality.cartodb.com' },
body: new CounterStream(counterGenerator(400, 7)),
method: 'POST'
};
request(options, (err, res) => {
if (err) {
return done(err);
}
assert.equal(res.statusCode, 200);
done();
});
});
}); });