diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index 16cc7ba7..c0f11b05 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -10,6 +10,13 @@ const rateLimitsMiddleware = require('../middlewares/rate-limit'); const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware; const errorHandlerFactory = require('../services/error_handler_factory'); const StreamCopy = require('../services/stream_copy'); +const StreamCopyMetrics = require('../services/stream_copy_metrics'); +const Logger = require('../services/logger'); +const { Client } = require('pg'); +const zlib = require('zlib'); +const PSQL = require('cartodb-psql'); +const copyTo = require('pg-copy-streams').to; +const copyFrom = require('pg-copy-streams').from; function CopyController(metadataBackend, userDatabaseService, userLimitsService, statsClient) { this.metadataBackend = metadataBackend; @@ -60,52 +67,155 @@ CopyController.prototype.route = function (app) { function handleCopyTo (streamCopy) { return function handleCopyToMiddleware (req, res, next) { + const sql = req.query.q; + const { userDbParams, user } = res.locals; const filename = req.query.filename || 'carto-sql-copyto.dmp'; + const logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion'); + let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user); + res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`); res.header("Content-Type", "application/octet-stream"); - streamCopy.to( - res, - req.query.q, - res.locals.userDbParams, - res.locals.user, - function(err) { - if (err) { - return next(err); - } - - // this is a especial endpoint - // the data from postgres is streamed to response directly + const pg = new PSQL(userDbParams); + pg.connect(function (err, client, done) { + if (err) { + return next(err); } - ); + + let responseEnded = false; + let connectionClosedByClient = false; + + const copyToStream = copyTo(sql); + const pgstream = client.query(copyToStream); + + res + .on('error', err => { + metrics.end(null, err); + pgstream.unpipe(res); + done(); + return next(err); + }) + .on('close', () => { + if (!responseEnded) { + connectionClosedByClient = true; + // Cancel the running COPY TO query + // See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY + const runningClient = client; + const cancelingClient = new Client(runningClient.connectionParameters); + cancelingClient.cancel(runningClient, pgstream); + + const err = new Error('Connection closed by client'); + metrics.end(null, err); + pgstream.unpipe(res); + // see https://node-postgres.com/api/pool#releasecallback + done(err); + return next(err); + } + }) + .on('end', () => responseEnded = true); + + pgstream + .on('error', err => { + if (!connectionClosedByClient) { + metrics.end(null, err); + pgstream.unpipe(res); + done(err); + return next(err); + } + }) + .on('data', data => metrics.addSize(data.length)) + .on('end', () => { + metrics.end(copyToStream.rowCount); + done(); + return next(null, metrics); + }) + .pipe(res); + }); }; } function handleCopyFrom (streamCopy) { return function handleCopyFromMiddleware (req, res, next) { - streamCopy.from( - req, - req.query.q, - res.locals.userDbParams, - res.locals.user, - req.get('content-encoding') === 'gzip', - function(err, metrics) { - if (err) { - return next(err); - } + const sql = req.query.q; + const { userDbParams, user } = res.locals; + const gzip = req.get('content-encoding') === 'gzip'; - const { time, rows } = metrics; - if (!time || !rows) { - return next(new Error("No rows copied")); - } - - res.send({ - time, - total_rows: rows - }); + const logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion'); + let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip); + + + const pg = new PSQL(userDbParams); + pg.connect(function (err, client, done) { + if (err) { + next(err); } - ); + + let copyFromStream = copyFrom(sql); + const pgstream = client.query(copyFromStream); + pgstream + .on('error', err => { + metrics.end(null, err); + req.unpipe(pgstream); + done(); + next(err); + }) + .on('end', function () { + metrics.end(copyFromStream.rowCount); + + done(); + + const { time, rows } = metrics; + if (!time || !rows) { + return next(new Error("No rows copied")); + } + + res.send({ + time, + total_rows: rows + }); + }); + + let requestEnded = false; + + req + .on('error', err => { + metrics.end(null, err); + req.unpipe(pgstream); + pgstream.end(); + done(); + next(err); + }) + .on('close', () => { + if (!requestEnded) { + const err = new Error('Connection closed by client'); + metrics.end(null, err); + const connection = client.connection; + connection.sendCopyFail('CARTO SQL API: Connection closed by client'); + req.unpipe(pgstream); + done(); + next(err); + } + }) + .on('data', data => { + if (gzip) { + metrics.addGzipSize(data.length); + } else { + metrics.addSize(data.length); + } + }) + .on('end', () => requestEnded = true); + + + if (gzip) { + req + .pipe(zlib.createGunzip()) + .on('data', data => metrics.addSize(data.length)) + .pipe(pgstream); + } else { + req.pipe(pgstream); + } + }); }; } @@ -114,12 +224,12 @@ function validateCopyQuery () { const sql = req.query.q; if (!sql) { - next(new Error("SQL is missing")); + return next(new Error("SQL is missing")); } // Only accept SQL that starts with 'COPY' if (!sql.toUpperCase().startsWith("COPY ")) { - next(new Error("SQL must start with COPY")); + return next(new Error("SQL must start with COPY")); } next(); diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 7e86d81c..41242314 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -12,126 +12,10 @@ module.exports = class StreamCopy { } to(res, sql, userDbParams, user, cb) { - let metrics = new StreamCopyMetrics(this.logger, 'copyto', sql, user); - - const pg = new PSQL(userDbParams); - pg.connect(function (err, client, done) { - if (err) { - return cb(err); - } - - let responseEnded = false; - let connectionClosedByClient = false; - const copyToStream = copyTo(sql); - const pgstream = client.query(copyToStream); - - res - .on('error', err => { - metrics.end(null, err); - pgstream.unpipe(res); - done(); - return cb(err); - }) - .on('close', () => { - if (!responseEnded) { - connectionClosedByClient = true; - // Cancel the running COPY TO query - // See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY - const runningClient = client; - const cancelingClient = new Client(runningClient.connectionParameters); - cancelingClient.cancel(runningClient, pgstream); - - const err = new Error('Connection closed by client'); - metrics.end(null, err); - pgstream.unpipe(res); - // see https://node-postgres.com/api/pool#releasecallback - done(err); - return cb(err); - } - }) - .on('end', () => responseEnded = true); - - pgstream - .on('error', err => { - if (!connectionClosedByClient) { - metrics.end(null, err); - pgstream.unpipe(res); - done(err); - return cb(err); - } - }) - .on('data', data => metrics.addSize(data.length)) - .on('end', () => { - metrics.end(copyToStream.rowCount); - done(); - return cb(null, metrics); - }) - .pipe(res); - }); + } from(req, sql, userDbParams, user, gzip, cb) { - let metrics = new StreamCopyMetrics(this.logger, 'copyfrom', sql, user, gzip); - - const pg = new PSQL(userDbParams); - pg.connect(function (err, client, done) { - if (err) { - return cb(err); - } - - let copyFromStream = copyFrom(sql); - const pgstream = client.query(copyFromStream); - pgstream - .on('error', err => { - metrics.end(null, err); - req.unpipe(pgstream); - done(); - return cb(err); - }) - .on('end', function () { - metrics.end(copyFromStream.rowCount); - done(); - return cb(null, metrics); - }); - - let requestEnded = false; - - req - .on('error', err => { - metrics.end(null, err); - req.unpipe(pgstream); - pgstream.end(); - done(); - return cb(err); - }) - .on('close', () => { - if (!requestEnded) { - const err = new Error('Connection closed by client'); - metrics.end(null, err); - const connection = client.connection; - connection.sendCopyFail('CARTO SQL API: Connection closed by client'); - req.unpipe(pgstream); - done(); - return cb(err); - } - }) - .on('data', data => { - if (gzip) { - metrics.addGzipSize(data.length); - } else { - metrics.addSize(data.length); - } - }) - .on('end', () => requestEnded = true); - - if (gzip) { - req - .pipe(zlib.createGunzip()) - .on('data', data => metrics.addSize(data.length)) - .pipe(pgstream); - } else { - req.pipe(pgstream); - } - }); + } };