From f026121ad58a3b71161f87059f43fee0a79d087d Mon Sep 17 00:00:00 2001 From: Javier Goizueta Date: Wed, 20 Nov 2019 12:09:06 +0100 Subject: [PATCH 1/4] Simplify throttling logic This introduces a couple of changes in the parameter interpretation: copy_from_minimum_input_speed is now the total number of bytes in the interval (so we could multiply the currently configured values by the interval to preserve min. throughput. the minimum speed is now inclusive (only smaller values will cause an exception). --- lib/services/throttler-stream.js | 23 +++---------- test/acceptance/copy-throttle-test.js | 47 ++++++++++++++++++++++----- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/lib/services/throttler-stream.js b/lib/services/throttler-stream.js index 7e94f39b..75e4d41a 100644 --- a/lib/services/throttler-stream.js +++ b/lib/services/throttler-stream.js @@ -9,33 +9,18 @@ module.exports = class Throttler extends Transform { this.pgstream = pgstream; this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15; - this.minimunBytesPerSecondThershold = global.settings.copy_from_minimum_input_speed || 0; + this.minimunBytesPerSecondThreshold = global.settings.copy_from_minimum_input_speed || 0; this.byteCount = 0; - this.bytesPerSecondHistory = []; - this._interval = setInterval(this._updateMetrics.bind(this), 1000); + this._interval = setInterval(this._updateMetrics.bind(this), this.sampleLength*1000); } _updateMetrics () { - this.bytesPerSecondHistory.push(this.byteCount); - this.byteCount = 0; - - if (this.bytesPerSecondHistory.length > this.sampleLength) { - this.bytesPerSecondHistory.shift(); - } - - let doesNotReachThreshold = 0; - - for (const bytesPerSecond of this.bytesPerSecondHistory) { - if (bytesPerSecond <= this.minimunBytesPerSecondThershold) { - doesNotReachThreshold += 1; - } - } - - if (doesNotReachThreshold >= this.sampleLength) { + if (this.byteCount < this.minimunBytesPerSecondThreshold) { clearInterval(this._interval); this.pgstream.emit('error', new Error('Connection closed by server: input data too slow')); } + this.byteCount = 0; } _transform (chunk, encoding, callback) { diff --git a/test/acceptance/copy-throttle-test.js b/test/acceptance/copy-throttle-test.js index 8ff69fcd..92d76910 100644 --- a/test/acceptance/copy-throttle-test.js +++ b/test/acceptance/copy-throttle-test.js @@ -8,15 +8,19 @@ const request = require('request'); const assert = require('assert'); const { Readable } = require('stream'); -const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::integer AS counter)`; +const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::text AS counter)`; const dropTableQuery = `DROP TABLE copy_from_throttle`; -function * counterGenerator (timeout) { +function * counterGenerator (initialTimeout, timeout, max_count) { let counter = 0; + let t = initialTimeout + timeout; - while (true) { - yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line + while (!max_count || counter <= max_count) { + yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), t)); // jshint ignore:line + t = timeout; } + // generate also a delayed final marker (null) to simplify handling into a stream. + yield new Promise(resolve => setTimeout(() => resolve(null), timeout)); } class CounterStream extends Readable { @@ -27,14 +31,16 @@ class CounterStream extends Readable { _read () { const res = this.generator.next(); - res.value.then(value => res.done ? this.push(null) : this.push(value)); + if (!res.done) { + res.value.then(value => this.push(value)); + } } } describe('COPY FROM throttle', function () { before(function() { this.copy_from_minimum_input_speed = global.settings.copy_from_minimum_input_speed; - global.settings.copy_from_minimum_input_speed = 1; + global.settings.copy_from_minimum_input_speed = 2; this.copy_from_maximum_slow_input_speed_interval = global.settings.copy_from_maximum_slow_input_speed_interval; global.settings.copy_from_maximum_slow_input_speed_interval = 1; @@ -107,7 +113,7 @@ describe('COPY FROM throttle', function () { this.listener.close(done); }); - it('hang while sendind data', function (done) { + it('hangs while sending data', function (done) { const { host, port } = this; const copy = querystring.stringify({ @@ -118,7 +124,7 @@ describe('COPY FROM throttle', function () { const options = { url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`, headers: { host: 'vizzuality.cartodb.com' }, - body: new CounterStream(counterGenerator(1000)), + body: new CounterStream(counterGenerator(600)), method: 'POST' }; @@ -134,4 +140,29 @@ describe('COPY FROM throttle', function () { done(); }); }); + + it('does not hang while sending data', function (done) { + const { host, port } = this; + + const copy = querystring.stringify({ + q: "COPY copy_from_throttle (counter) FROM STDIN WITH (FORMAT CSV, DELIMITER ',')", + api_key: 1234 + }); + + const options = { + url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`, + headers: { host: 'vizzuality.cartodb.com' }, + body: new CounterStream(counterGenerator(400, 7)), + method: 'POST' + }; + + request(options, (err, res, body) => { + if (err) { + return done(err); + } + assert.equal(res.statusCode, 200); + + done(); + }); + }); }); From 1767384746d0226f2bd4285d99e57b3f65ec0f13 Mon Sep 17 00:00:00 2001 From: Javier Goizueta Date: Wed, 20 Nov 2019 12:57:18 +0100 Subject: [PATCH 2/4] Rename vars --- lib/services/throttler-stream.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/services/throttler-stream.js b/lib/services/throttler-stream.js index 75e4d41a..9f326ea7 100644 --- a/lib/services/throttler-stream.js +++ b/lib/services/throttler-stream.js @@ -8,15 +8,15 @@ module.exports = class Throttler extends Transform { this.pgstream = pgstream; - this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15; - this.minimunBytesPerSecondThreshold = global.settings.copy_from_minimum_input_speed || 0; + this.sampleSeconds = global.settings.copy_from_maximum_slow_input_speed_interval || 15; + this.minimunBytesPerSampleThreshold = global.settings.copy_from_minimum_input_speed || 0; this.byteCount = 0; - this._interval = setInterval(this._updateMetrics.bind(this), this.sampleLength*1000); + this._interval = setInterval(this._updateMetrics.bind(this), this.sampleSeconds*1000); } _updateMetrics () { - if (this.byteCount < this.minimunBytesPerSecondThreshold) { + if (this.byteCount < this.minimunBytesPerSampleThreshold) { clearInterval(this._interval); this.pgstream.emit('error', new Error('Connection closed by server: input data too slow')); } From 41ba98b8f4dff5d9856f74ee30a6429794903643 Mon Sep 17 00:00:00 2001 From: Javier Goizueta Date: Wed, 20 Nov 2019 13:00:03 +0100 Subject: [PATCH 3/4] Fix tests --- test/acceptance/copy-throttle-test.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/test/acceptance/copy-throttle-test.js b/test/acceptance/copy-throttle-test.js index 92d76910..0d81b2b6 100644 --- a/test/acceptance/copy-throttle-test.js +++ b/test/acceptance/copy-throttle-test.js @@ -11,13 +11,11 @@ const { Readable } = require('stream'); const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::text AS counter)`; const dropTableQuery = `DROP TABLE copy_from_throttle`; -function * counterGenerator (initialTimeout, timeout, max_count) { +function * counterGenerator (timeout, max_count) { let counter = 0; - let t = initialTimeout + timeout; while (!max_count || counter <= max_count) { - yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), t)); // jshint ignore:line - t = timeout; + yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line } // generate also a delayed final marker (null) to simplify handling into a stream. yield new Promise(resolve => setTimeout(() => resolve(null), timeout)); From 8db90dc3ab3150f6aeba135179f798e59fdd44fa Mon Sep 17 00:00:00 2001 From: Javier Goizueta Date: Wed, 20 Nov 2019 13:06:36 +0100 Subject: [PATCH 4/4] Lint fix --- test/acceptance/copy-throttle-test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/acceptance/copy-throttle-test.js b/test/acceptance/copy-throttle-test.js index 0d81b2b6..c6fd4dfa 100644 --- a/test/acceptance/copy-throttle-test.js +++ b/test/acceptance/copy-throttle-test.js @@ -154,7 +154,7 @@ describe('COPY FROM throttle', function () { method: 'POST' }; - request(options, (err, res, body) => { + request(options, (err, res) => { if (err) { return done(err); }