Extract class

This commit is contained in:
Daniel García Aubert 2019-05-29 11:20:18 +02:00
parent 2b86abbeba
commit e6b9325b77
2 changed files with 52 additions and 48 deletions

View File

@ -11,8 +11,9 @@ const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
const errorHandlerFactory = require('../services/error_handler_factory');
const StreamCopy = require('../services/stream_copy');
const StreamCopyMetrics = require('../services/stream_copy_metrics');
const Throttler = require('../services/throttler-stream');
const zlib = require('zlib');
const { PassThrough, Transform } = require('stream');
const { PassThrough } = require('stream');
const handleQueryMiddleware = require('../middlewares/handle-query');
function CopyController(metadataBackend, userDatabaseService, userLimitsService, logger) {
@ -97,53 +98,6 @@ function handleCopyTo (logger) {
};
}
class Throttler extends Transform {
constructor (pgstream, ...args) {
super(...args);
this.pgstream = pgstream;
this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15;
this.minimunBytesPerSecondThershold = global.settings.copy_from_minimum_input_speed || 1;
this.byteCount = 0;
this.bytesPerSecondHistory = [];
this._interval = setInterval(this._updateMetrics.bind(this), 1000);
}
_updateMetrics () {
this.bytesPerSecondHistory.push(this.byteCount);
this.byteCount = 0;
if (this.bytesPerSecondHistory > this.sampleLength) {
this.bytesPerSecondHistory.shift();
}
const doesNotReachThreshold = [];
for (const bytesPerSecond of this.bytesPerSecondHistory) {
if (bytesPerSecond <= this.minimunBytesPerSecondThershold) {
doesNotReachThreshold.push(true);
}
}
if (doesNotReachThreshold.length >= this.sampleLength) {
clearInterval(this._interval);
this.pgstream.emit('error', new Error('Connection closed by server: input data too slow'));
}
}
_transform (chunk, encoding, callback) {
this.byteCount += chunk.length;
callback(null, chunk);
}
_flush (callback) {
clearInterval(this._interval);
callback();
}
}
function handleCopyFrom (logger) {
return function handleCopyFromMiddleware (req, res, next) {
const { sql, userDbParams, user, dbRemainingQuota } = res.locals;

View File

@ -0,0 +1,50 @@
'use strict';
const { Transform } = require('stream');
module.exports = class Throttler extends Transform {
constructor (pgstream, ...args) {
super(...args);
this.pgstream = pgstream;
this.sampleLength = global.settings.copy_from_maximum_slow_input_speed_interval || 15;
this.minimunBytesPerSecondThershold = global.settings.copy_from_minimum_input_speed || 0;
this.byteCount = 0;
this.bytesPerSecondHistory = [];
this._interval = setInterval(this._updateMetrics.bind(this), 1000);
}
_updateMetrics () {
this.bytesPerSecondHistory.push(this.byteCount);
this.byteCount = 0;
if (this.bytesPerSecondHistory > this.sampleLength) {
this.bytesPerSecondHistory.shift();
}
const doesNotReachThreshold = [];
for (const bytesPerSecond of this.bytesPerSecondHistory) {
if (bytesPerSecond <= this.minimunBytesPerSecondThershold) {
doesNotReachThreshold.push(true);
}
}
if (doesNotReachThreshold.length >= this.sampleLength) {
clearInterval(this._interval);
this.pgstream.emit('error', new Error('Connection closed by server: input data too slow'));
}
}
_transform (chunk, encoding, callback) {
this.byteCount += chunk.length;
callback(null, chunk);
}
_flush (callback) {
clearInterval(this._interval);
callback();
}
}