StreamCopy to class

This commit is contained in:
Simon Martín 2018-06-12 16:56:18 +02:00
parent d67cf2f7d5
commit ba2f85421d
2 changed files with 17 additions and 15 deletions

View File

@ -9,7 +9,7 @@ const { initializeProfilerMiddleware } = require('../middlewares/profiler');
const rateLimitsMiddleware = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
const errorHandlerFactory = require('../services/error_handler_factory');
const streamCopy = require('../services/stream_copy');
const StreamCopy = require('../services/stream_copy');
const StreamCopyMetrics = require('../services/stream_copy_metrics');
const Logger = require('../services/logger');
const { Client } = require('pg');
@ -68,14 +68,13 @@ function handleCopyTo (logger) {
const { userDbParams, user } = res.locals;
const filename = req.query.filename || 'carto-sql-copyto.dmp';
const streamCopy = new StreamCopy(sql, userDbParams);
const metrics = new StreamCopyMetrics(logger, 'copyto', sql, user);
res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`);
res.header("Content-Type", "application/octet-stream");
streamCopy.to(
sql,
userDbParams,
function (err, pgstream, client, done) {
if (err) {
return next(err);
@ -135,11 +134,10 @@ function handleCopyFrom (logger) {
const { userDbParams, user } = res.locals;
const isGzip = req.get('content-encoding') === 'gzip';
const streamCopy = new StreamCopy(sql, userDbParams);
const metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, isGzip);
streamCopy.from(
sql,
userDbParams,
function (err, pgstream, client, done) {
if (err) {
if (pgstream) {

View File

@ -2,15 +2,19 @@ const PSQL = require('cartodb-psql');
const copyTo = require('pg-copy-streams').to;
const copyFrom = require('pg-copy-streams').from;
module.exports = {
to(sql, userDbParams, cb, next) {
const pg = new PSQL(userDbParams);
pg.connect(function (err, client, done) {
module.exports = class StreamCopy {
constructor (sql, userDbParams) {
this.pg = new PSQL(userDbParams);
this.sql = sql;
}
to(cb, next) {
this.pg.connect((err, client, done) => {
if (err) {
cb(err);
}
const copyToStream = copyTo(sql);
const copyToStream = copyTo(this.sql);
const pgstream = client.query(copyToStream);
pgstream
@ -21,22 +25,22 @@ module.exports = {
cb(null, pgstream, client, done);
});
},
}
from(sql, userDbParams, cb, next) {
const pg = new PSQL(userDbParams);
pg.connect(function (err, client, done) {
from(cb, next) {
this.pg.connect((err, client, done) => {
if (err) {
cb(err);
}
let copyFromStream = copyFrom(sql);
const copyFromStream = copyFrom(this.sql);
const pgstream = client.query(copyFromStream);
pgstream
.on('error', err => {
done();
cb(err, pgstream);
})
.on('end', function () {
done();