removing 2nd calback using events
This commit is contained in:
parent
19aae3c40f
commit
409273bafe
@ -45,7 +45,7 @@ CopyController.prototype.route = function (app) {
|
||||
const copyToMiddlewares = endpointGroup => {
|
||||
return [
|
||||
initializeProfilerMiddleware('copyto'),
|
||||
userMiddleware(this.metadataBackend),
|
||||
userMiddleware(this.metadataBackend),
|
||||
rateLimitsMiddleware(this.userLimitsService, endpointGroup),
|
||||
authorizationMiddleware(this.metadataBackend),
|
||||
connectionParamsMiddleware(this.userDatabaseService),
|
||||
@ -74,6 +74,10 @@ function handleCopyTo (logger) {
|
||||
res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`);
|
||||
res.header("Content-Type", "application/octet-stream");
|
||||
|
||||
streamCopy.on('copy-to-end', rows => {
|
||||
metrics.end(rows);
|
||||
});
|
||||
|
||||
streamCopy.to(
|
||||
function (err, pgstream, client, done) {
|
||||
if (err) {
|
||||
@ -117,9 +121,6 @@ function handleCopyTo (logger) {
|
||||
pgstream
|
||||
.on('data', data => metrics.addSize(data.length))
|
||||
.pipe(res);
|
||||
},
|
||||
function (err, rows) {
|
||||
metrics.end(rows);
|
||||
}
|
||||
);
|
||||
};
|
||||
@ -134,6 +135,20 @@ function handleCopyFrom (logger) {
|
||||
const streamCopy = new StreamCopy(sql, userDbParams);
|
||||
const metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, isGzip);
|
||||
|
||||
streamCopy.on('copy-from-end', rows => {
|
||||
metrics.end(rows);
|
||||
|
||||
const { time } = metrics;
|
||||
if (!time || !rows) {
|
||||
return next(new Error("No rows copied"));
|
||||
}
|
||||
|
||||
res.send({
|
||||
time,
|
||||
total_rows: rows
|
||||
});
|
||||
});
|
||||
|
||||
streamCopy.from(
|
||||
function (err, pgstream, client, done) {
|
||||
if (err) {
|
||||
|
@ -1,15 +1,18 @@
|
||||
const EventEmitter = require('events');
|
||||
const PSQL = require('cartodb-psql');
|
||||
const copyTo = require('pg-copy-streams').to;
|
||||
const copyFrom = require('pg-copy-streams').from;
|
||||
|
||||
module.exports = class StreamCopy {
|
||||
module.exports = class StreamCopy extends EventEmitter {
|
||||
constructor (sql, userDbParams) {
|
||||
super();
|
||||
|
||||
this.pg = new PSQL(userDbParams);
|
||||
this.sql = sql;
|
||||
this.connectionClosedByClient = false;
|
||||
}
|
||||
|
||||
to(cb, next) {
|
||||
to(cb) {
|
||||
this.pg.connect((err, client, done) => {
|
||||
if (err) {
|
||||
cb(err);
|
||||
@ -17,7 +20,7 @@ module.exports = class StreamCopy {
|
||||
|
||||
const copyToStream = copyTo(this.sql);
|
||||
const pgstream = client.query(copyToStream);
|
||||
|
||||
|
||||
pgstream
|
||||
.on('error', err => {
|
||||
if (!this.connectionClosedByClient) {
|
||||
@ -27,14 +30,14 @@ module.exports = class StreamCopy {
|
||||
})
|
||||
.on('end', () => {
|
||||
done();
|
||||
return next(null, copyToStream.rowCount);
|
||||
this.emit('copy-to-end', copyToStream.rowCount);
|
||||
});
|
||||
|
||||
cb(null, pgstream, client, done);
|
||||
});
|
||||
}
|
||||
|
||||
from(cb, next) {
|
||||
from(cb) {
|
||||
this.pg.connect((err, client, done) => {
|
||||
if (err) {
|
||||
cb(err);
|
||||
@ -49,9 +52,9 @@ module.exports = class StreamCopy {
|
||||
cb(err, pgstream);
|
||||
|
||||
})
|
||||
.on('end', function () {
|
||||
.on('end', () => {
|
||||
done();
|
||||
next(null, copyFromStream.rowCount);
|
||||
this.emit('copy-from-end', copyFromStream.rowCount);
|
||||
});
|
||||
|
||||
cb(null, pgstream, client, done);
|
||||
|
Loading…
Reference in New Issue
Block a user