From 409273bafea5fd23d084f050ff61faaf0c197318 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Mart=C3=ADn?= Date: Tue, 12 Jun 2018 18:39:50 +0200 Subject: [PATCH] removing 2nd calback using events --- app/controllers/copy_controller.js | 23 +++++++++++++++++++---- app/services/stream_copy.js | 17 ++++++++++------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index 44920d3d..0e2c91d7 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -45,7 +45,7 @@ CopyController.prototype.route = function (app) { const copyToMiddlewares = endpointGroup => { return [ initializeProfilerMiddleware('copyto'), - userMiddleware(this.metadataBackend), + userMiddleware(this.metadataBackend), rateLimitsMiddleware(this.userLimitsService, endpointGroup), authorizationMiddleware(this.metadataBackend), connectionParamsMiddleware(this.userDatabaseService), @@ -74,6 +74,10 @@ function handleCopyTo (logger) { res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`); res.header("Content-Type", "application/octet-stream"); + streamCopy.on('copy-to-end', rows => { + metrics.end(rows); + }); + streamCopy.to( function (err, pgstream, client, done) { if (err) { @@ -117,9 +121,6 @@ function handleCopyTo (logger) { pgstream .on('data', data => metrics.addSize(data.length)) .pipe(res); - }, - function (err, rows) { - metrics.end(rows); } ); }; @@ -134,6 +135,20 @@ function handleCopyFrom (logger) { const streamCopy = new StreamCopy(sql, userDbParams); const metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, isGzip); + streamCopy.on('copy-from-end', rows => { + metrics.end(rows); + + const { time } = metrics; + if (!time || !rows) { + return next(new Error("No rows copied")); + } + + res.send({ + time, + total_rows: rows + }); + }); + streamCopy.from( function (err, pgstream, client, done) { if (err) { diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index f6075226..9b101a1f 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -1,15 +1,18 @@ +const EventEmitter = require('events'); const PSQL = require('cartodb-psql'); const copyTo = require('pg-copy-streams').to; const copyFrom = require('pg-copy-streams').from; -module.exports = class StreamCopy { +module.exports = class StreamCopy extends EventEmitter { constructor (sql, userDbParams) { + super(); + this.pg = new PSQL(userDbParams); this.sql = sql; this.connectionClosedByClient = false; } - to(cb, next) { + to(cb) { this.pg.connect((err, client, done) => { if (err) { cb(err); @@ -17,7 +20,7 @@ module.exports = class StreamCopy { const copyToStream = copyTo(this.sql); const pgstream = client.query(copyToStream); - + pgstream .on('error', err => { if (!this.connectionClosedByClient) { @@ -27,14 +30,14 @@ module.exports = class StreamCopy { }) .on('end', () => { done(); - return next(null, copyToStream.rowCount); + this.emit('copy-to-end', copyToStream.rowCount); }); cb(null, pgstream, client, done); }); } - from(cb, next) { + from(cb) { this.pg.connect((err, client, done) => { if (err) { cb(err); @@ -49,9 +52,9 @@ module.exports = class StreamCopy { cb(err, pgstream); }) - .on('end', function () { + .on('end', () => { done(); - next(null, copyFromStream.rowCount); + this.emit('copy-from-end', copyFromStream.rowCount); }); cb(null, pgstream, client, done);