refactor of copy from PoC

This commit is contained in:
Simon Martín 2018-06-08 16:50:12 +02:00
parent e259a51623
commit f7454228c6
2 changed files with 50 additions and 44 deletions

View File

@ -9,7 +9,7 @@ const { initializeProfilerMiddleware } = require('../middlewares/profiler');
const rateLimitsMiddleware = require('../middlewares/rate-limit'); const rateLimitsMiddleware = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware; const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
const errorHandlerFactory = require('../services/error_handler_factory'); const errorHandlerFactory = require('../services/error_handler_factory');
const StreamCopy = require('../services/stream_copy'); const streamCopy = require('../services/stream_copy');
const StreamCopyMetrics = require('../services/stream_copy_metrics'); const StreamCopyMetrics = require('../services/stream_copy_metrics');
const Logger = require('../services/logger'); const Logger = require('../services/logger');
const { Client } = require('pg'); const { Client } = require('pg');
@ -23,8 +23,6 @@ function CopyController(metadataBackend, userDatabaseService, userLimitsService,
this.userDatabaseService = userDatabaseService; this.userDatabaseService = userDatabaseService;
this.userLimitsService = userLimitsService; this.userLimitsService = userLimitsService;
this.statsClient = statsClient; this.statsClient = statsClient;
this.streamCopy = new StreamCopy();
} }
CopyController.prototype.route = function (app) { CopyController.prototype.route = function (app) {
@ -39,7 +37,7 @@ CopyController.prototype.route = function (app) {
connectionParamsMiddleware(this.userDatabaseService), connectionParamsMiddleware(this.userDatabaseService),
timeoutLimitsMiddleware(this.metadataBackend), timeoutLimitsMiddleware(this.metadataBackend),
validateCopyQuery(), validateCopyQuery(),
handleCopyFrom(this.streamCopy), handleCopyFrom(),
errorHandler(), errorHandler(),
errorMiddleware() errorMiddleware()
]; ];
@ -54,7 +52,7 @@ CopyController.prototype.route = function (app) {
connectionParamsMiddleware(this.userDatabaseService), connectionParamsMiddleware(this.userDatabaseService),
timeoutLimitsMiddleware(this.metadataBackend), timeoutLimitsMiddleware(this.metadataBackend),
validateCopyQuery(), validateCopyQuery(),
handleCopyTo(this.streamCopy), handleCopyTo(),
errorHandler(), errorHandler(),
errorMiddleware() errorMiddleware()
]; ];
@ -65,7 +63,7 @@ CopyController.prototype.route = function (app) {
}; };
function handleCopyTo (streamCopy) { function handleCopyTo () {
return function handleCopyToMiddleware (req, res, next) { return function handleCopyToMiddleware (req, res, next) {
const sql = req.query.q; const sql = req.query.q;
const { userDbParams, user } = res.locals; const { userDbParams, user } = res.locals;
@ -135,7 +133,7 @@ function handleCopyTo (streamCopy) {
}; };
} }
function handleCopyFrom (streamCopy) { function handleCopyFrom () {
return function handleCopyFromMiddleware (req, res, next) { return function handleCopyFromMiddleware (req, res, next) {
const sql = req.query.q; const sql = req.query.q;
const { userDbParams, user } = res.locals; const { userDbParams, user } = res.locals;
@ -145,45 +143,24 @@ function handleCopyFrom (streamCopy) {
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip); let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip);
const pg = new PSQL(userDbParams); streamCopy.from(sql, userDbParams, function (err, pgstream, client, done) {
pg.connect(function (err, client, done) {
if (err) { if (err) {
if (pgstream) {
req.unpipe(pgstream);
}
metrics.end(null, err);
next(err); next(err);
} }
let copyFromStream = copyFrom(sql);
const pgstream = client.query(copyFromStream);
pgstream
.on('error', err => {
metrics.end(null, err);
req.unpipe(pgstream);
done();
next(err);
})
.on('end', function () {
metrics.end(copyFromStream.rowCount);
done();
const { time, rows } = metrics;
if (!time || !rows) {
return next(new Error("No rows copied"));
}
res.send({
time,
total_rows: rows
});
});
let requestEnded = false; let requestEnded = false;
req req
.on('error', err => { .on('error', err => {
metrics.end(null, err); metrics.end(null, err);
req.unpipe(pgstream); req.unpipe(pgstream);
pgstream.end(); pgstream.end();
done(); done();
next(err); next(err);
}) })
.on('close', () => { .on('close', () => {
@ -215,6 +192,19 @@ function handleCopyFrom (streamCopy) {
} else { } else {
req.pipe(pgstream); req.pipe(pgstream);
} }
}, function(err, rows) {
metrics.end(rows);
const { time } = metrics;
if (!time || !rows) {
return next(new Error("No rows copied"));
}
res.send({
time,
total_rows: rows
});
}); });
}; };
} }

View File

@ -6,16 +6,32 @@ const StreamCopyMetrics = require('./stream_copy_metrics');
const { Client } = require('pg'); const { Client } = require('pg');
const Logger = require('./logger'); const Logger = require('./logger');
module.exports = class StreamCopy { module.exports = {
constructor() { to() {
this.logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion');
}
to(res, sql, userDbParams, user, cb) {
} },
from(req, sql, userDbParams, user, gzip, cb) { from(sql, userDbParams, cb, next) {
const pg = new PSQL(userDbParams);
pg.connect(function (err, client, done) {
if (err) {
cb(err);
}
let copyFromStream = copyFrom(sql);
const pgstream = client.query(copyFromStream);
pgstream
.on('error', err => {
done();
cb(err, pgstream);
})
.on('end', function () {
done();
next(null, copyFromStream.rowCount);
});
cb(null, pgstream, client, done)
});
} }
}; };