Simplify throttling logic
This introduces a couple of changes in the parameter interpretation: copy_from_minimum_input_speed is now the total number of bytes in the interval (so we could multiply the currently configured values by the interval to preserve min. throughput. the minimum speed is now inclusive (only smaller values will cause an exception).
This commit is contained in:
parent
936b7804de
commit
f026121ad5
@ -9,33 +9,18 @@ module.exports = class Throttler extends Transform {
|
||||
this.pgstream = pgstream;
|
||||
|
||||
this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15;
|
||||
this.minimunBytesPerSecondThershold = global.settings.copy_from_minimum_input_speed || 0;
|
||||
this.minimunBytesPerSecondThreshold = global.settings.copy_from_minimum_input_speed || 0;
|
||||
this.byteCount = 0;
|
||||
this.bytesPerSecondHistory = [];
|
||||
|
||||
this._interval = setInterval(this._updateMetrics.bind(this), 1000);
|
||||
this._interval = setInterval(this._updateMetrics.bind(this), this.sampleLength*1000);
|
||||
}
|
||||
|
||||
_updateMetrics () {
|
||||
this.bytesPerSecondHistory.push(this.byteCount);
|
||||
this.byteCount = 0;
|
||||
|
||||
if (this.bytesPerSecondHistory.length > this.sampleLength) {
|
||||
this.bytesPerSecondHistory.shift();
|
||||
}
|
||||
|
||||
let doesNotReachThreshold = 0;
|
||||
|
||||
for (const bytesPerSecond of this.bytesPerSecondHistory) {
|
||||
if (bytesPerSecond <= this.minimunBytesPerSecondThershold) {
|
||||
doesNotReachThreshold += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (doesNotReachThreshold >= this.sampleLength) {
|
||||
if (this.byteCount < this.minimunBytesPerSecondThreshold) {
|
||||
clearInterval(this._interval);
|
||||
this.pgstream.emit('error', new Error('Connection closed by server: input data too slow'));
|
||||
}
|
||||
this.byteCount = 0;
|
||||
}
|
||||
|
||||
_transform (chunk, encoding, callback) {
|
||||
|
@ -8,15 +8,19 @@ const request = require('request');
|
||||
const assert = require('assert');
|
||||
const { Readable } = require('stream');
|
||||
|
||||
const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::integer AS counter)`;
|
||||
const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::text AS counter)`;
|
||||
const dropTableQuery = `DROP TABLE copy_from_throttle`;
|
||||
|
||||
function * counterGenerator (timeout) {
|
||||
function * counterGenerator (initialTimeout, timeout, max_count) {
|
||||
let counter = 0;
|
||||
let t = initialTimeout + timeout;
|
||||
|
||||
while (true) {
|
||||
yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line
|
||||
while (!max_count || counter <= max_count) {
|
||||
yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), t)); // jshint ignore:line
|
||||
t = timeout;
|
||||
}
|
||||
// generate also a delayed final marker (null) to simplify handling into a stream.
|
||||
yield new Promise(resolve => setTimeout(() => resolve(null), timeout));
|
||||
}
|
||||
|
||||
class CounterStream extends Readable {
|
||||
@ -27,14 +31,16 @@ class CounterStream extends Readable {
|
||||
|
||||
_read () {
|
||||
const res = this.generator.next();
|
||||
res.value.then(value => res.done ? this.push(null) : this.push(value));
|
||||
if (!res.done) {
|
||||
res.value.then(value => this.push(value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
describe('COPY FROM throttle', function () {
|
||||
before(function() {
|
||||
this.copy_from_minimum_input_speed = global.settings.copy_from_minimum_input_speed;
|
||||
global.settings.copy_from_minimum_input_speed = 1;
|
||||
global.settings.copy_from_minimum_input_speed = 2;
|
||||
|
||||
this.copy_from_maximum_slow_input_speed_interval = global.settings.copy_from_maximum_slow_input_speed_interval;
|
||||
global.settings.copy_from_maximum_slow_input_speed_interval = 1;
|
||||
@ -107,7 +113,7 @@ describe('COPY FROM throttle', function () {
|
||||
this.listener.close(done);
|
||||
});
|
||||
|
||||
it('hang while sendind data', function (done) {
|
||||
it('hangs while sending data', function (done) {
|
||||
const { host, port } = this;
|
||||
|
||||
const copy = querystring.stringify({
|
||||
@ -118,7 +124,7 @@ describe('COPY FROM throttle', function () {
|
||||
const options = {
|
||||
url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`,
|
||||
headers: { host: 'vizzuality.cartodb.com' },
|
||||
body: new CounterStream(counterGenerator(1000)),
|
||||
body: new CounterStream(counterGenerator(600)),
|
||||
method: 'POST'
|
||||
};
|
||||
|
||||
@ -134,4 +140,29 @@ describe('COPY FROM throttle', function () {
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('does not hang while sending data', function (done) {
|
||||
const { host, port } = this;
|
||||
|
||||
const copy = querystring.stringify({
|
||||
q: "COPY copy_from_throttle (counter) FROM STDIN WITH (FORMAT CSV, DELIMITER ',')",
|
||||
api_key: 1234
|
||||
});
|
||||
|
||||
const options = {
|
||||
url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`,
|
||||
headers: { host: 'vizzuality.cartodb.com' },
|
||||
body: new CounterStream(counterGenerator(400, 7)),
|
||||
method: 'POST'
|
||||
};
|
||||
|
||||
request(options, (err, res, body) => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
assert.equal(res.statusCode, 200);
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user