Merge pull request #634 from CartoDB/2239-throttle-refactor

Simplify throttling logic
This commit is contained in:
Javier Goizueta 2019-11-20 13:12:43 +01:00 committed by GitHub
commit 92a86e811c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 27 deletions

View File

@ -8,34 +8,19 @@ module.exports = class Throttler extends Transform {
this.pgstream = pgstream;
this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15;
this.minimunBytesPerSecondThershold = global.settings.copy_from_minimum_input_speed || 0;
this.sampleSeconds = global.settings.copy_from_maximum_slow_input_speed_interval || 15;
this.minimunBytesPerSampleThreshold = global.settings.copy_from_minimum_input_speed || 0;
this.byteCount = 0;
this.bytesPerSecondHistory = [];
this._interval = setInterval(this._updateMetrics.bind(this), 1000);
this._interval = setInterval(this._updateMetrics.bind(this), this.sampleSeconds*1000);
}
_updateMetrics () {
this.bytesPerSecondHistory.push(this.byteCount);
this.byteCount = 0;
if (this.bytesPerSecondHistory.length > this.sampleLength) {
this.bytesPerSecondHistory.shift();
}
let doesNotReachThresholdCount = 0;
for (const bytesPerSecond of this.bytesPerSecondHistory) {
if (bytesPerSecond <= this.minimunBytesPerSecondThershold) {
doesNotReachThresholdCount += 1;
}
}
if (doesNotReachThresholdCount >= this.sampleLength) {
if (this.byteCount < this.minimunBytesPerSampleThreshold) {
clearInterval(this._interval);
this.pgstream.emit('error', new Error('Connection closed by server: input data too slow'));
}
this.byteCount = 0;
}
_transform (chunk, encoding, callback) {

View File

@ -8,15 +8,17 @@ const request = require('request');
const assert = require('assert');
const { Readable } = require('stream');
const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::integer AS counter)`;
const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::text AS counter)`;
const dropTableQuery = `DROP TABLE copy_from_throttle`;
function * counterGenerator (timeout) {
function * counterGenerator (timeout, max_count) {
let counter = 0;
while (true) {
while (!max_count || counter <= max_count) {
yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line
}
// generate also a delayed final marker (null) to simplify handling into a stream.
yield new Promise(resolve => setTimeout(() => resolve(null), timeout));
}
class CounterStream extends Readable {
@ -27,14 +29,16 @@ class CounterStream extends Readable {
_read () {
const res = this.generator.next();
res.value.then(value => res.done ? this.push(null) : this.push(value));
if (!res.done) {
res.value.then(value => this.push(value));
}
}
}
describe('COPY FROM throttle', function () {
before(function() {
this.copy_from_minimum_input_speed = global.settings.copy_from_minimum_input_speed;
global.settings.copy_from_minimum_input_speed = 1;
global.settings.copy_from_minimum_input_speed = 2;
this.copy_from_maximum_slow_input_speed_interval = global.settings.copy_from_maximum_slow_input_speed_interval;
global.settings.copy_from_maximum_slow_input_speed_interval = 1;
@ -107,7 +111,7 @@ describe('COPY FROM throttle', function () {
this.listener.close(done);
});
it('hang while sendind data', function (done) {
it('hangs while sending data', function (done) {
const { host, port } = this;
const copy = querystring.stringify({
@ -118,7 +122,7 @@ describe('COPY FROM throttle', function () {
const options = {
url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`,
headers: { host: 'vizzuality.cartodb.com' },
body: new CounterStream(counterGenerator(1000)),
body: new CounterStream(counterGenerator(600)),
method: 'POST'
};
@ -134,4 +138,29 @@ describe('COPY FROM throttle', function () {
done();
});
});
it('does not hang while sending data', function (done) {
const { host, port } = this;
const copy = querystring.stringify({
q: "COPY copy_from_throttle (counter) FROM STDIN WITH (FORMAT CSV, DELIMITER ',')",
api_key: 1234
});
const options = {
url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`,
headers: { host: 'vizzuality.cartodb.com' },
body: new CounterStream(counterGenerator(400, 7)),
method: 'POST'
};
request(options, (err, res) => {
if (err) {
return done(err);
}
assert.equal(res.statusCode, 200);
done();
});
});
});