diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index 359dad16..1c47b20b 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -11,6 +11,7 @@ const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware; const errorHandlerFactory = require('../services/error_handler_factory'); const StreamCopy = require('../services/stream_copy'); const StreamCopyMetrics = require('../services/stream_copy_metrics'); +const Throttler = require('../services/throttler-stream'); const zlib = require('zlib'); const { PassThrough } = require('stream'); const handleQueryMiddleware = require('../middlewares/handle-query'); @@ -113,6 +114,8 @@ function handleCopyFrom (logger) { return next(err); } + const throttle = new Throttler(pgstream); + req .on('data', data => isGzip ? metrics.addGzipSize(data.length) : undefined) .on('error', err => { @@ -120,6 +123,7 @@ function handleCopyFrom (logger) { pgstream.emit('error', err); }) .on('close', () => pgstream.emit('error', new Error('Connection closed by client'))) + .pipe(throttle) .pipe(decompress) .on('data', data => { metrics.addSize(data.length); diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index ce3690d5..616397c5 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -32,7 +32,6 @@ module.exports = class StreamCopy { } getPGStream(action, callback) { - this.action = action; const pg = new PSQL(this.dbParams); pg.connect((err, client, done) => { @@ -53,14 +52,13 @@ module.exports = class StreamCopy { if (action === ACTION_TO) { pgstream.on('end', () => done()); + pgstream.on('error', () => this._cancel(client.processID, action)); } else if (action === ACTION_FROM) { pgstream.on('finish', () => done()); + pgstream.on('error', err => client.connection.sendCopyFail(err.message)); } - pgstream.on('error', err => { - this._cancel(client.processID, action); - done(err); - }); + pgstream.on('error', err => done(err)); 
callback(null, pgstream); }); diff --git a/app/services/throttler-stream.js b/app/services/throttler-stream.js new file mode 100644 index 00000000..936ebfcd --- /dev/null +++ b/app/services/throttler-stream.js @@ -0,0 +1,50 @@ +'use strict'; + +const { Transform } = require('stream'); + +module.exports = class Throttler extends Transform { + constructor (pgstream, ...args) { + super(...args); + + this.pgstream = pgstream; + + this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15; + this.minimumBytesPerSecondThreshold = global.settings.copy_from_minimum_input_speed || 0; + this.byteCount = 0; + this.bytesPerSecondHistory = []; + + this._interval = setInterval(this._updateMetrics.bind(this), 1000); + } + + _updateMetrics () { + this.bytesPerSecondHistory.push(this.byteCount); + this.byteCount = 0; + + if (this.bytesPerSecondHistory.length > this.sampleLength) { + this.bytesPerSecondHistory.shift(); + } + + const doesNotReachThreshold = []; + + for (const bytesPerSecond of this.bytesPerSecondHistory) { + if (bytesPerSecond <= this.minimumBytesPerSecondThreshold) { + doesNotReachThreshold.push(true); + } + } + + if (doesNotReachThreshold.length >= this.sampleLength) { + clearInterval(this._interval); + this.pgstream.emit('error', new Error('Connection closed by server: input data too slow')); + } + } + + _transform (chunk, encoding, callback) { + this.byteCount += chunk.length; + callback(null, chunk); + } + + _flush (callback) { + clearInterval(this._interval); + callback(); + } +}; diff --git a/config/environments/development.js.example b/config/environments/development.js.example index ba6259e0..19c4b9e2 100644 --- a/config/environments/development.js.example +++ b/config/environments/development.js.example @@ -38,6 +38,8 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.copy_timeout = "'5h'"; module.exports.copy_from_max_post_size = 2 * 1024 * 1024 * 1024 // 2 GB; module.exports.copy_from_max_post_size_pretty = 
'2 GB'; +module.exports.copy_from_minimum_input_speed = 0; // 1 byte per second +module.exports.copy_from_maximum_slow_input_speed_interval = 15 // 15 seconds // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. diff --git a/config/environments/production.js.example b/config/environments/production.js.example index c7581af8..2987d2fc 100644 --- a/config/environments/production.js.example +++ b/config/environments/production.js.example @@ -39,6 +39,8 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.copy_timeout = "'5h'"; module.exports.copy_from_max_post_size = 2 * 1024 * 1024 * 1024 // 2 GB; module.exports.copy_from_max_post_size_pretty = '2 GB'; +module.exports.copy_from_minimum_input_speed = 0; // 1 byte per second +module.exports.copy_from_maximum_slow_input_speed_interval = 15 // 15 seconds // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. diff --git a/config/environments/staging.js.example b/config/environments/staging.js.example index 94fd7b29..ee7ab0d2 100644 --- a/config/environments/staging.js.example +++ b/config/environments/staging.js.example @@ -39,6 +39,8 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.copy_timeout = "'5h'"; module.exports.copy_from_max_post_size = 2 * 1024 * 1024 * 1024 // 2 GB; module.exports.copy_from_max_post_size_pretty = '2 GB'; +module.exports.copy_from_minimum_input_speed = 0; // 1 byte per second +module.exports.copy_from_maximum_slow_input_speed_interval = 15 // 15 seconds // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. 
diff --git a/config/environments/test.js.example b/config/environments/test.js.example index cf435939..031493bb 100644 --- a/config/environments/test.js.example +++ b/config/environments/test.js.example @@ -36,6 +36,8 @@ module.exports.batch_log_filename = 'logs/batch-queries.log'; module.exports.copy_timeout = "'5h'"; module.exports.copy_from_max_post_size = 2 * 1024 * 1024 * 1024 // 2 GB; module.exports.copy_from_max_post_size_pretty = '2 GB'; +module.exports.copy_from_minimum_input_speed = 0; // 1 byte per second +module.exports.copy_from_maximum_slow_input_speed_interval = 15 // 15 seconds // Max number of queued jobs a user can have at a given time module.exports.batch_max_queued_jobs = 64; // Capacity strategy to use. diff --git a/test/acceptance/copy-throttle.js b/test/acceptance/copy-throttle.js new file mode 100644 index 00000000..0be5876e --- /dev/null +++ b/test/acceptance/copy-throttle.js @@ -0,0 +1,137 @@ +'use strict'; + +const querystring = require('querystring'); +const StatsClient = require('../../app/stats/client'); +const statsClient = StatsClient.getInstance(global.settings.statsd); +const server = require('../../app/server')(statsClient); +const request = require('request'); +const assert = require('assert'); +const { Readable } = require('stream'); + +const createTableQuery = `CREATE TABLE copy_from_throttle AS (SELECT 0::integer AS counter)`; +const dropTableQuery = `DROP TABLE copy_from_throttle`; + +function * counterGenerator (timeout) { + let counter = 0; + + while (true) { + yield new Promise(resolve => setTimeout(() => resolve(`${counter++}`), timeout)); // jshint ignore:line + } +} + +class CounterStream extends Readable { + constructor(generator, ...args) { + super(...args); + this.generator = generator; + } + + _read () { + const res = this.generator.next(); + res.value.then(value => res.done ? 
this.push(null) : this.push(value)); + } +} + +describe('COPY FROM throttle', function () { + before(function() { + this.copy_from_minimum_input_speed = global.settings.copy_from_minimum_input_speed; + global.settings.copy_from_minimum_input_speed = 1; + + this.copy_from_maximum_slow_input_speed_interval = global.settings.copy_from_maximum_slow_input_speed_interval; + global.settings.copy_from_maximum_slow_input_speed_interval = 1; + + }); + + after(function() { + global.settings.copy_from_minimum_input_speed = this.copy_from_minimum_input_speed; + global.settings.copy_from_maximum_slow_input_speed_interval = this.copy_from_maximum_slow_input_speed_interval; + }); + + beforeEach(function (done) { + this.listener = server.listen(0, '127.0.0.1'); + + this.listener.on('error', done); + + this.listener.on('listening', () => { + const { address, port } = this.listener.address(); + + this.host = address; + this.port = port; + + done(); + }); + }); + + beforeEach(function (done) { + const { host, port } = this; + + const createTable = querystring.stringify({ q: createTableQuery, api_key: 1234}); + + const createTableOptions = { + url: `http://${host}:${port}/api/v1/sql?${createTable}`, + headers: { host: 'vizzuality.cartodb.com' }, + method: 'GET' + }; + + request(createTableOptions, function (err, res) { + if (err) { + return done(err); + } + + assert.equal(res.statusCode, 200); + + done(); + }); + }); + + afterEach(function (done) { + const { host, port } = this; + + const dropTable = querystring.stringify({ q: dropTableQuery, api_key: 1234 }); + + const dropTableOptions = { + url: `http://${host}:${port}/api/v1/sql?${dropTable}`, + headers: { host: 'vizzuality.cartodb.com' }, + method: 'GET' + }; + + request(dropTableOptions, function (err) { + if (err) { + return done(err); + } + + done(); + }); + }); + + afterEach(function (done) { + this.listener.close(done); + }); + + it('hang while sending data', function (done) { + const { host, port } = this; + + const copy = 
querystring.stringify({ + q: "COPY copy_from_throttle (counter) FROM STDIN WITH (FORMAT CSV, DELIMITER ',')", + api_key: 1234 + }); + + const options = { + url: `http://${host}:${port}/api/v1/sql/copyfrom?${copy}`, + headers: { host: 'vizzuality.cartodb.com' }, + body: new CounterStream(counterGenerator(1000)), + method: 'POST' + }; + + request(options, (err, res, body) => { + if (err) { + return done(err); + } + + assert.equal(res.statusCode, 400); + body = JSON.parse(body); + assert.deepEqual(body, { error: ["Connection closed by server: input data too slow"] }); + + done(); + }); + }); +});