diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index fd31976c..fa7b7460 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -71,31 +71,30 @@ function handleCopyTo (logger) { res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`); res.header("Content-Type", "application/octet-stream"); - streamCopy.getPGStream(StreamCopy.ACTION_TO, function (err, pgstream) { - if (err) { - return next(err); - } - - pgstream - .on('data', data => metrics.addSize(data.length)) - .on('error', (err) => { - metrics.end(null, err); - pgstream.unpipe(res); - - return next(err); - }) - .on('end', () => metrics.end( streamCopy.getRowCount(StreamCopy.ACTION_TO) )) - .pipe(res) - .on('close', () => { - const err = new Error('Connection closed by client'); - pgstream.emit('cancelQuery', err); - pgstream.emit('error', err); - }) - .on('error', err => { - pgstream.emit('error', err); - }); + streamCopy.getPGStream(StreamCopy.ACTION_TO, (err, pgstream) => { + if (err) { + return next(err); } - ); + + pgstream + .on('data', data => metrics.addSize(data.length)) + .on('error', err => { + metrics.end(null, err); + pgstream.unpipe(res); + + return next(err); + }) + .on('end', () => metrics.end( streamCopy.getRowCount(StreamCopy.ACTION_TO) )) + .pipe(res) + .on('close', () => { + const err = new Error('Connection closed by client'); + pgstream.emit('cancelQuery', err); + pgstream.emit('error', err); + }) + .on('error', err => { + pgstream.emit('error', err); + }); + }); }; } @@ -108,46 +107,45 @@ function handleCopyFrom (logger) { const streamCopy = new StreamCopy(sql, userDbParams); const metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, isGzip); - streamCopy.getPGStream(StreamCopy.ACTION_FROM, function (err, pgstream) { - if (err) { - return next(err); - } - - req - .on('data', data => isGzip ? metrics.addGzipSize(data.length) : undefined) - .on('error', err => { - metrics.end(null, err); - pgstream.emit('error', err); - }) - .on('close', () => { - const err = new Error('Connection closed by client'); - pgstream.emit('cancelQuery', err); - pgstream.emit('error', err); - }) - .pipe(isGzip ? zlib.createGunzip() : new PassThrough()) - .on('data', data => metrics.addSize(data.length)) - .pipe(pgstream) - .on('error', (err) => { - metrics.end(null, err); - req.unpipe(pgstream); - return next(err); - }) - .on('end', () => { - metrics.end( streamCopy.getRowCount(StreamCopy.ACTION_FROM) ); - - const { time, rows } = metrics; - - if (!rows) { - return next(new Error("No rows copied")); - } - - res.send({ - time, - total_rows: rows - }); - }); + streamCopy.getPGStream(StreamCopy.ACTION_FROM, (err, pgstream) => { + if (err) { + return next(err); } - ); + + req + .on('data', data => isGzip ? metrics.addGzipSize(data.length) : undefined) + .on('error', err => { + metrics.end(null, err); + pgstream.emit('error', err); + }) + .on('close', () => { + const err = new Error('Connection closed by client'); + pgstream.emit('cancelQuery', err); + pgstream.emit('error', err); + }) + .pipe(isGzip ? zlib.createGunzip() : new PassThrough()) + .on('data', data => metrics.addSize(data.length)) + .pipe(pgstream) + .on('error', err => { + metrics.end(null, err); + req.unpipe(pgstream); + return next(err); + }) + .on('end', () => { + metrics.end( streamCopy.getRowCount(StreamCopy.ACTION_FROM) ); + + const { time, rows } = metrics; + + if (!rows) { + return next(new Error("No rows copied")); + } + + res.send({ + time, + total_rows: rows + }); + }); + }); }; }