133 lines
4.7 KiB
JavaScript
133 lines
4.7 KiB
JavaScript
const zlib = require('zlib');
|
|
const PSQL = require('cartodb-psql');
|
|
const copyTo = require('pg-copy-streams').to;
|
|
const copyFrom = require('pg-copy-streams').from;
|
|
const StreamCopyMetrics = require('./stream_copy_metrics');
|
|
const { Client } = require('pg');
|
|
|
|
module.exports = {
|
|
to (res, sql, userDbParams, user, logger, cb) {
|
|
let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user);
|
|
|
|
const pg = new PSQL(userDbParams);
|
|
pg.connect(function (err, client, done) {
|
|
if (err) {
|
|
return cb(err);
|
|
}
|
|
|
|
let responseEnded = false;
|
|
let connectionClosedByClient = false;
|
|
const copyToStream = copyTo(sql);
|
|
const pgstream = client.query(copyToStream);
|
|
|
|
res
|
|
.on('error', err => {
|
|
metrics.end(null, err);
|
|
pgstream.unpipe(res);
|
|
done();
|
|
return cb(err);
|
|
})
|
|
.on('close', () => {
|
|
if (!responseEnded) {
|
|
connectionClosedByClient = true;
|
|
// Cancel the running COPY TO query
|
|
// See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY
|
|
const runningClient = client;
|
|
const cancelingClient = new Client(runningClient.connectionParameters);
|
|
cancelingClient.cancel(runningClient, pgstream);
|
|
|
|
const err = new Error('Connection closed by client');
|
|
metrics.end(null, err);
|
|
pgstream.unpipe(res);
|
|
// see https://node-postgres.com/api/pool#releasecallback
|
|
done(err);
|
|
return cb(err);
|
|
}
|
|
})
|
|
.on('end', () => responseEnded = true);
|
|
|
|
pgstream
|
|
.on('error', err => {
|
|
if (!connectionClosedByClient) {
|
|
metrics.end(null, err);
|
|
pgstream.unpipe(res);
|
|
done(err);
|
|
return cb(err);
|
|
}
|
|
})
|
|
.on('data', data => metrics.addSize(data.length))
|
|
.on('end', () => {
|
|
metrics.end(copyToStream.rowCount);
|
|
done();
|
|
return cb(null, metrics);
|
|
})
|
|
.pipe(res);
|
|
});
|
|
},
|
|
|
|
from (req, sql, userDbParams, user, gzip, logger, cb) {
|
|
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip);
|
|
|
|
const pg = new PSQL(userDbParams);
|
|
pg.connect(function (err, client, done) {
|
|
if (err) {
|
|
return cb(err);
|
|
}
|
|
|
|
let copyFromStream = copyFrom(sql);
|
|
const pgstream = client.query(copyFromStream);
|
|
pgstream
|
|
.on('error', err => {
|
|
metrics.end(null, err);
|
|
req.unpipe(pgstream);
|
|
done();
|
|
return cb(err);
|
|
})
|
|
.on('end', function () {
|
|
metrics.end(copyFromStream.rowCount);
|
|
done();
|
|
return cb(null, metrics);
|
|
});
|
|
|
|
let requestEnded = false;
|
|
|
|
req
|
|
.on('error', err => {
|
|
metrics.end(null, err);
|
|
req.unpipe(pgstream);
|
|
pgstream.end();
|
|
done();
|
|
return cb(err);
|
|
})
|
|
.on('close', () => {
|
|
if (!requestEnded) {
|
|
const err = new Error('Connection closed by client');
|
|
metrics.end(null, err);
|
|
const connection = client.connection;
|
|
connection.sendCopyFail('CARTO SQL API: Connection closed by client');
|
|
req.unpipe(pgstream);
|
|
done();
|
|
return cb(err);
|
|
}
|
|
})
|
|
.on('data', data => {
|
|
if (gzip) {
|
|
metrics.addGzipSize(data.length)
|
|
} else {
|
|
metrics.addSize(data.length)
|
|
}
|
|
})
|
|
.on('end', () => requestEnded = true);
|
|
|
|
if (gzip) {
|
|
req
|
|
.pipe(zlib.createGunzip())
|
|
.on('data', data => metrics.addSize(data.length))
|
|
.pipe(pgstream);
|
|
} else {
|
|
req.pipe(pgstream);
|
|
}
|
|
});
|
|
}
|
|
};
|