From 5a116c69e9a60e49f66ff574c69b34951d6059d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Mon, 27 May 2019 19:32:43 +0200 Subject: [PATCH] Draft for copy from throttle --- app/controllers/copy_controller.js | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index 359dad16..7641811c 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -12,7 +12,7 @@ const errorHandlerFactory = require('../services/error_handler_factory'); const StreamCopy = require('../services/stream_copy'); const StreamCopyMetrics = require('../services/stream_copy_metrics'); const zlib = require('zlib'); -const { PassThrough } = require('stream'); +const { PassThrough, Transform } = require('stream'); const handleQueryMiddleware = require('../middlewares/handle-query'); function CopyController(metadataBackend, userDatabaseService, userLimitsService, logger) { @@ -97,6 +97,26 @@ function handleCopyTo (logger) { }; } +class Throttle extends Transform { + constructor (pgstream, ...args) { + super(...args); + + this.pgstream = pgstream; + + this.timeout = setTimeout(() => { + pgstream.emit('error', new Error('Connection closed by server: no data received')); + }, 5000); + } + + _transform (chunk, encoding, callback) { + if (!this.timeout._destroyed) { + clearTimeout(this.timeout); + } + + callback(null, chunk); + } +} + function handleCopyFrom (logger) { return function handleCopyFromMiddleware (req, res, next) { const { sql, userDbParams, user, dbRemainingQuota } = res.locals; @@ -113,6 +133,8 @@ function handleCopyFrom (logger) { return next(err); } + const throttle = new Throttle(pgstream); + req .on('data', data => isGzip ? metrics.addGzipSize(data.length) : undefined) .on('error', err => { @@ -120,6 +142,7 @@ function handleCopyFrom (logger) { pgstream.emit('error', err); }) .on('close', () => pgstream.emit('error', new Error('Connection closed by client'))) + .pipe(throttle) .pipe(decompress) .on('data', data => { metrics.addSize(data.length);