From f7454228c63d3c9caf170a7b4bf342aba00663bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Mart=C3=ADn?= Date: Fri, 8 Jun 2018 16:50:12 +0200 Subject: [PATCH] refactor of copy from PoC --- app/controllers/copy_controller.js | 60 +++++++++++++----------------- app/services/stream_copy.js | 34 ++++++++++++----- 2 files changed, 50 insertions(+), 44 deletions(-) diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index c0f11b05..33a5b032 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -9,7 +9,7 @@ const { initializeProfilerMiddleware } = require('../middlewares/profiler'); const rateLimitsMiddleware = require('../middlewares/rate-limit'); const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware; const errorHandlerFactory = require('../services/error_handler_factory'); -const StreamCopy = require('../services/stream_copy'); +const streamCopy = require('../services/stream_copy'); const StreamCopyMetrics = require('../services/stream_copy_metrics'); const Logger = require('../services/logger'); const { Client } = require('pg'); @@ -23,8 +23,6 @@ function CopyController(metadataBackend, userDatabaseService, userLimitsService, this.userDatabaseService = userDatabaseService; this.userLimitsService = userLimitsService; this.statsClient = statsClient; - - this.streamCopy = new StreamCopy(); } CopyController.prototype.route = function (app) { @@ -39,7 +37,7 @@ CopyController.prototype.route = function (app) { connectionParamsMiddleware(this.userDatabaseService), timeoutLimitsMiddleware(this.metadataBackend), validateCopyQuery(), - handleCopyFrom(this.streamCopy), + handleCopyFrom(), errorHandler(), errorMiddleware() ]; @@ -54,7 +52,7 @@ CopyController.prototype.route = function (app) { connectionParamsMiddleware(this.userDatabaseService), timeoutLimitsMiddleware(this.metadataBackend), validateCopyQuery(), - handleCopyTo(this.streamCopy), + handleCopyTo(), errorHandler(), errorMiddleware() ]; @@ -65,7 +63,7 @@ CopyController.prototype.route = function (app) { }; -function handleCopyTo (streamCopy) { +function handleCopyTo () { return function handleCopyToMiddleware (req, res, next) { const sql = req.query.q; const { userDbParams, user } = res.locals; @@ -135,7 +133,7 @@ function handleCopyTo (streamCopy) { }; } -function handleCopyFrom (streamCopy) { +function handleCopyFrom () { return function handleCopyFromMiddleware (req, res, next) { const sql = req.query.q; const { userDbParams, user } = res.locals; @@ -145,45 +143,24 @@ function handleCopyFrom (streamCopy) { let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip); - const pg = new PSQL(userDbParams); - pg.connect(function (err, client, done) { + streamCopy.from(sql, userDbParams, function (err, pgstream, client, done) { if (err) { + if (pgstream) { + req.unpipe(pgstream); + } + + metrics.end(null, err); next(err); } - let copyFromStream = copyFrom(sql); - const pgstream = client.query(copyFromStream); - pgstream - .on('error', err => { - metrics.end(null, err); - req.unpipe(pgstream); - done(); - next(err); - }) - .on('end', function () { - metrics.end(copyFromStream.rowCount); - - done(); - - const { time, rows } = metrics; - if (!time || !rows) { - return next(new Error("No rows copied")); - } - - res.send({ - time, - total_rows: rows - }); - }); - let requestEnded = false; - req .on('error', err => { metrics.end(null, err); req.unpipe(pgstream); pgstream.end(); done(); + next(err); }) .on('close', () => { @@ -215,6 +192,19 @@ function handleCopyFrom (streamCopy) { } else { req.pipe(pgstream); } + + }, function(err, rows) { + metrics.end(rows); + + const { time } = metrics; + if (!time || !rows) { + return next(new Error("No rows copied")); + } + + res.send({ + time, + total_rows: rows + }); }); }; } diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 41242314..9e96332b 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -6,16 +6,32 @@ const StreamCopyMetrics = require('./stream_copy_metrics'); const { Client } = require('pg'); const Logger = require('./logger'); -module.exports = class StreamCopy { - constructor() { - this.logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion'); - } - - to(res, sql, userDbParams, user, cb) { +module.exports = { + to() { - } + }, - from(req, sql, userDbParams, user, gzip, cb) { - + from(sql, userDbParams, cb, next) { + const pg = new PSQL(userDbParams); + pg.connect(function (err, client, done) { + if (err) { + cb(err); + } + + let copyFromStream = copyFrom(sql); + const pgstream = client.query(copyFromStream); + + pgstream + .on('error', err => { + done(); + cb(err, pgstream); + }) + .on('end', function () { + done(); + next(null, copyFromStream.rowCount); + }); + + cb(null, pgstream, client, done) + }); } };