CartoDB-SQL-API/app/services/stream_copy.js

124 lines
4.4 KiB
JavaScript
Raw Normal View History

2018-05-25 23:42:30 +08:00
const zlib = require('zlib');
const PSQL = require('cartodb-psql');
const copyTo = require('pg-copy-streams').to;
const copyFrom = require('pg-copy-streams').from;
2018-05-26 00:47:41 +08:00
const StreamCopyMetrics = require('./stream_copy_metrics');
const { Client } = require('pg');
2018-05-25 23:42:30 +08:00
module.exports = {
2018-06-05 00:08:34 +08:00
to (res, sql, userDbParams, user, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user);
2018-05-25 23:42:30 +08:00
const pg = new PSQL(userDbParams);
pg.connect(function (err, client, done) {
2018-05-25 23:42:30 +08:00
if (err) {
return cb(err);
}
let responseEnded = false;
let connectionClosedByClient = false;
2018-05-25 23:42:30 +08:00
const copyToStream = copyTo(sql);
const pgstream = client.query(copyToStream);
res
2018-05-25 23:42:30 +08:00
.on('error', err => {
2018-06-05 18:35:52 +08:00
metrics.end(null, err);
2018-05-25 23:42:30 +08:00
pgstream.unpipe(res);
done();
2018-05-25 23:42:30 +08:00
return cb(err);
})
.on('close', () => {
if (!responseEnded) {
connectionClosedByClient = true;
// Cancel the running COPY TO query
// See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY
const runningClient = client;
const cancelingClient = new Client(runningClient.connectionParameters);
cancelingClient.cancel(runningClient, pgstream);
2018-06-05 02:36:16 +08:00
const err = new Error('Connection closed by client');
2018-06-05 18:35:52 +08:00
metrics.end(null, err);
pgstream.unpipe(res);
2018-06-05 02:51:21 +08:00
// see https://node-postgres.com/api/pool#releasecallback
2018-06-05 02:36:16 +08:00
done(err);
return cb(err);
}
})
.on('end', () => responseEnded = true);
2018-05-25 23:42:30 +08:00
pgstream
.on('error', err => {
if (!connectionClosedByClient) {
2018-06-05 18:35:52 +08:00
metrics.end(null, err);
pgstream.unpipe(res);
2018-06-05 02:51:21 +08:00
done(err);
return cb(err);
}
2018-05-25 23:42:30 +08:00
})
2018-05-26 00:47:41 +08:00
.on('data', data => metrics.addSize(data.length))
2018-05-25 23:42:30 +08:00
.on('end', () => {
2018-05-26 00:47:41 +08:00
metrics.end(copyToStream.rowCount);
done();
2018-05-25 23:42:30 +08:00
return cb(null, metrics);
})
.pipe(res);
});
},
2018-06-05 00:08:34 +08:00
from (req, sql, userDbParams, user, gzip, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip);
2018-05-25 23:42:30 +08:00
const pg = new PSQL(userDbParams);
pg.connect(function (err, client, done) {
2018-05-25 23:42:30 +08:00
if (err) {
return cb(err);
}
let copyFromStream = copyFrom(sql);
const pgstream = client.query(copyFromStream);
pgstream
.on('error', err => {
2018-06-05 18:35:52 +08:00
metrics.end(null, err);
2018-05-25 23:42:30 +08:00
req.unpipe(pgstream);
done();
2018-05-29 22:19:53 +08:00
return cb(err);
2018-05-25 23:42:30 +08:00
})
.on('end', function () {
2018-05-26 00:47:41 +08:00
metrics.end(copyFromStream.rowCount);
done();
2018-05-26 00:47:41 +08:00
return cb(null, metrics);
2018-05-25 23:42:30 +08:00
});
let requestEnded = false;
req
.on('error', err => {
2018-06-05 18:35:52 +08:00
metrics.end(null, err);
2018-05-25 23:42:30 +08:00
req.unpipe(pgstream);
pgstream.end();
done();
2018-05-25 23:42:30 +08:00
return cb(err);
})
.on('close', () => {
if (!requestEnded) {
2018-06-05 18:35:52 +08:00
const err = new Error('Connection closed by client');
metrics.end(null, err);
const connection = client.connection;
2018-05-31 22:41:22 +08:00
connection.sendCopyFail('CARTO SQL API: Connection closed by client');
2018-05-25 23:42:30 +08:00
req.unpipe(pgstream);
done();
2018-06-05 18:35:52 +08:00
return cb(err);
2018-05-25 23:42:30 +08:00
}
})
.on('data', data => metrics.size += data.length)
2018-05-29 22:19:53 +08:00
.on('end', () => requestEnded = true);
if (gzip) {
req.pipe(zlib.createGunzip()).pipe(pgstream);
} else {
req.pipe(pgstream);
}
2018-05-25 23:42:30 +08:00
});
}
2018-05-26 00:50:56 +08:00
};