Merge pull request #497 from CartoDB/cancel-copy-to-client-disconnect

Cancel copy to upon client disconnect
This commit is contained in:
Rafa de la Torre 2018-06-05 09:59:49 +02:00 committed by GitHub
commit d2c0e68a78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 5 deletions

2
.gitignore vendored
View File

@ -12,3 +12,5 @@ test/test.log
test/acceptance/oauth/venv/* test/acceptance/oauth/venv/*
coverage/ coverage/
npm-debug.log npm-debug.log
log/*.log
yarn.lock

View File

@ -3,27 +3,59 @@ const PSQL = require('cartodb-psql');
const copyTo = require('pg-copy-streams').to; const copyTo = require('pg-copy-streams').to;
const copyFrom = require('pg-copy-streams').from; const copyFrom = require('pg-copy-streams').from;
const StreamCopyMetrics = require('./stream_copy_metrics'); const StreamCopyMetrics = require('./stream_copy_metrics');
const { Client } = require('pg');
module.exports = { module.exports = {
to (res, sql, userDbParams, user, logger, cb) { to (res, sql, userDbParams, user, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user); let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user);
const pg = new PSQL(userDbParams); const pg = new PSQL(userDbParams);
pg.connect(function (err, client) { pg.connect(function (err, client, done) {
if (err) { if (err) {
return cb(err); return cb(err);
} }
let responseEnded = false;
let connectionClosedByClient = false;
const copyToStream = copyTo(sql); const copyToStream = copyTo(sql);
const pgstream = client.query(copyToStream); const pgstream = client.query(copyToStream);
pgstream
res
.on('error', err => { .on('error', err => {
pgstream.unpipe(res); pgstream.unpipe(res);
done();
return cb(err); return cb(err);
}) })
.on('close', () => {
if (!responseEnded) {
connectionClosedByClient = true;
// Cancel the running COPY TO query
// See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY
const runningClient = client;
const cancelingClient = new Client(runningClient.connectionParameters);
cancelingClient.cancel(runningClient, pgstream);
const err = new Error('Connection closed by client');
pgstream.unpipe(res);
// see https://node-postgres.com/api/pool#releasecallback
done(err);
return cb(err);
}
})
.on('end', () => responseEnded = true);
pgstream
.on('error', err => {
if (!connectionClosedByClient) {
pgstream.unpipe(res);
done(err);
return cb(err);
}
})
.on('data', data => metrics.addSize(data.length)) .on('data', data => metrics.addSize(data.length))
.on('end', () => { .on('end', () => {
metrics.end(copyToStream.rowCount); metrics.end(copyToStream.rowCount);
done();
return cb(null, metrics); return cb(null, metrics);
}) })
.pipe(res); .pipe(res);
@ -34,7 +66,7 @@ module.exports = {
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip); let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip);
const pg = new PSQL(userDbParams); const pg = new PSQL(userDbParams);
pg.connect(function (err, client) { pg.connect(function (err, client, done) {
if (err) { if (err) {
return cb(err); return cb(err);
} }
@ -44,10 +76,12 @@ module.exports = {
pgstream pgstream
.on('error', err => { .on('error', err => {
req.unpipe(pgstream); req.unpipe(pgstream);
done();
return cb(err); return cb(err);
}) })
.on('end', function () { .on('end', function () {
metrics.end(copyFromStream.rowCount); metrics.end(copyFromStream.rowCount);
done();
return cb(null, metrics); return cb(null, metrics);
}); });
@ -57,13 +91,15 @@ module.exports = {
.on('error', err => { .on('error', err => {
req.unpipe(pgstream); req.unpipe(pgstream);
pgstream.end(); pgstream.end();
done();
return cb(err); return cb(err);
}) })
.on('close', () => { .on('close', () => {
if (!requestEnded) { if (!requestEnded) {
const connection = client.connection; const connection = client.connection;
connection.sendCopyFail(); connection.sendCopyFail('CARTO SQL API: Connection closed by client');
req.unpipe(pgstream); req.unpipe(pgstream);
done();
return cb(new Error('Connection closed by client')); return cb(new Error('Connection closed by client'));
} }
}) })

View File

@ -4,6 +4,7 @@ const fs = require('fs');
const querystring = require('querystring'); const querystring = require('querystring');
const assert = require('../support/assert'); const assert = require('../support/assert');
const os = require('os'); const os = require('os');
const { Client } = require('pg');
const StatsClient = require('../../app/stats/client'); const StatsClient = require('../../app/stats/client');
if (global.settings.statsd) { if (global.settings.statsd) {
@ -18,6 +19,20 @@ const server = require('../../app/server')(statsClient);
describe('copy-endpoints', function() { describe('copy-endpoints', function() {
before(function(done) {
const client = new Client({
user: 'postgres',
host: 'localhost',
database: 'cartodb_test_user_1_db',
port: 5432,
});
client.connect();
client.query('TRUNCATE copy_endpoints_test', (err/*, res */) => {
client.end();
done(err);
});
});
it('should work with copyfrom endpoint', function(done){ it('should work with copyfrom endpoint', function(done){
assert.response(server, { assert.response(server, {
url: "/api/v1/sql/copyfrom?" + querystring.stringify({ url: "/api/v1/sql/copyfrom?" + querystring.stringify({
@ -178,7 +193,7 @@ describe('copy-endpoints', function() {
}); });
describe('copy-endpoints timeout', function() { describe('copy-endpoints timeout', function() {
it('should fail with copyfrom and timeout', function(done){ it('should fail with copyfrom and timeout', function(done){
assert.response(server, { assert.response(server, {
url: '/api/v1/sql?q=set statement_timeout = 10', url: '/api/v1/sql?q=set statement_timeout = 10',
@ -255,3 +270,63 @@ describe('copy-endpoints timeout', function() {
}); });
}); });
}); });
describe('copy-endpoints db connections', function() {
before(function() {
this.db_pool_size = global.settings.db_pool_size;
global.settings.db_pool_size = 1;
});
after(function() {
global.settings.db_pool_size = this.db_pool_size;
});
it('copyfrom', function(done) {
const query = "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)";
function doCopyFrom() {
return new Promise(resolve => {
assert.response(server, {
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
q: query
}),
data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
headers: {host: 'vizzuality.cartodb.com'},
method: 'POST'
},{}, function(err, res) {
assert.ifError(err);
const response = JSON.parse(res.body);
assert.ok(response.time);
resolve();
});
});
}
Promise.all([doCopyFrom(), doCopyFrom(), doCopyFrom()]).then(function() {
done();
});
});
it('copyto', function(done) {
function doCopyTo() {
return new Promise(resolve => {
assert.response(server, {
url: "/api/v1/sql/copyto?" + querystring.stringify({
q: 'COPY copy_endpoints_test TO STDOUT',
filename: '/tmp/output.dmp'
}),
headers: {host: 'vizzuality.cartodb.com'},
method: 'GET'
},{}, function(err, res) {
assert.ifError(err);
assert.ok(res.body);
resolve();
});
});
}
Promise.all([doCopyTo(), doCopyTo(), doCopyTo()]).then(function() {
done();
});
});
});