From 0eab03a7e76ff4e02f8d77d5e0647e2dc8e61748 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Thu, 31 May 2018 16:41:22 +0200 Subject: [PATCH 01/10] Add a more informative message --- app/services/stream_copy.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index b7387a0e..273a1a33 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -62,7 +62,7 @@ module.exports = { .on('close', () => { if (!requestEnded) { const connection = client.connection; - connection.sendCopyFail(); + connection.sendCopyFail('CARTO SQL API: Connection closed by client'); req.unpipe(pgstream); return cb(new Error('Connection closed by client')); } From 332f7096d380dea5e54ef7d6287cc74546a1f1ca Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Thu, 31 May 2018 17:06:17 +0200 Subject: [PATCH 02/10] Listen to response events (on behalf of @oleurud) --- app/services/stream_copy.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 273a1a33..0427bfb3 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -14,6 +14,21 @@ module.exports = { return cb(err); } + let responseEnded = false; + + res + .on('error', err => { + pgstream.unpipe(res); + return cb(err); + }) + .on('close', () => { + if (!responseEnded) { + + return cb(new Error('Connection closed by client')); + } + }) + .on('end', () => responseEnded = true); + const copyToStream = copyTo(sql); const pgstream = client.query(copyToStream); pgstream From b59ae1d057fb349a4b921fc365eb6ce2f5fac900 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Thu, 31 May 2018 17:27:35 +0200 Subject: [PATCH 03/10] Make sure the COPY TO query is cancelled Issue a CancelRequest upon client disconnection, to make sure the COPY TO query is cancelled and the connection/session is put back to the pg pool. --- app/services/stream_copy.js | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 0427bfb3..b5548bae 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -3,6 +3,7 @@ const PSQL = require('cartodb-psql'); const copyTo = require('pg-copy-streams').to; const copyFrom = require('pg-copy-streams').from; const StreamCopyMetrics = require('./stream_copy_metrics'); +const { Client } = require('pg'); module.exports = { to (res, sql, userDbParams, logger, cb) { @@ -23,6 +24,15 @@ module.exports = { }) .on('close', () => { if (!responseEnded) { + // Cancel the running COPY TO query. + // See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY + const runningClient = client; + const cancelingClient = new Client(runningClient.connectionParameters); + const connection = cancelingClient.connection; + connection.connect(runningClient.port, runningClient.host); + connection.on('connect', () => { + connection.cancel(runningClient.processID, runningClient.secretKey); + }); return cb(new Error('Connection closed by client')); } From 994e8a702bd6fda13e42453821bdf621fe4a4e1e Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Thu, 31 May 2018 18:59:17 +0200 Subject: [PATCH 04/10] Add callbacks to pg.connect And call them to return connections to the pool. --- app/services/stream_copy.js | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index b5548bae..3c3cc143 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -10,7 +10,7 @@ module.exports = { let metrics = new StreamCopyMetrics(logger, 'copyto', sql); const pg = new PSQL(userDbParams); - pg.connect(function (err, client) { + pg.connect(function (err, client, done) { if (err) { return cb(err); } @@ -20,6 +20,7 @@ module.exports = { res .on('error', err => { pgstream.unpipe(res); + done(); return cb(err); }) .on('close', () => { @@ -29,11 +30,11 @@ module.exports = { const runningClient = client; const cancelingClient = new Client(runningClient.connectionParameters); const connection = cancelingClient.connection; - connection.connect(runningClient.port, runningClient.host); connection.on('connect', () => { connection.cancel(runningClient.processID, runningClient.secretKey); + done(); }); - + connection.connect(runningClient.port, runningClient.host); return cb(new Error('Connection closed by client')); } }) @@ -44,11 +45,13 @@ module.exports = { pgstream .on('error', err => { pgstream.unpipe(res); + done(); return cb(err); }) .on('data', data => metrics.addSize(data.length)) .on('end', () => { metrics.end(copyToStream.rowCount); + done(); return cb(null, metrics); }) .pipe(res); @@ -59,7 +62,7 @@ module.exports = { let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, gzip); const pg = new PSQL(userDbParams); - pg.connect(function (err, client) { + pg.connect(function (err, client, done) { if (err) { return cb(err); } @@ -69,10 +72,12 @@ module.exports = { pgstream .on('error', err => { req.unpipe(pgstream); + done(); return cb(err); }) .on('end', function () { metrics.end(copyFromStream.rowCount); + done(); return cb(null, metrics); }); @@ -82,6 +87,7 @@ module.exports = { .on('error', err => { req.unpipe(pgstream); pgstream.end(); + done(); return cb(err); }) .on('close', () => { @@ -89,6 +95,7 @@ module.exports = { const connection = client.connection; connection.sendCopyFail('CARTO SQL API: Connection closed by client'); req.unpipe(pgstream); + done(); return cb(new Error('Connection closed by client')); } }) From b05ded92aa02ce80d983314d6ef6038e514ec493 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Mart=C3=ADn?= Date: Fri, 1 Jun 2018 11:26:28 +0200 Subject: [PATCH 05/10] db connections usage test --- test/acceptance/copy-endpoints.js | 62 ++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/test/acceptance/copy-endpoints.js b/test/acceptance/copy-endpoints.js index f750986d..45b11ba0 100644 --- a/test/acceptance/copy-endpoints.js +++ b/test/acceptance/copy-endpoints.js @@ -178,7 +178,7 @@ describe('copy-endpoints', function() { }); -describe('copy-endpoints timeout', function() { +describe('copy-endpoints timeout', function() { it('should fail with copyfrom and timeout', function(done){ assert.response(server, { url: '/api/v1/sql?q=set statement_timeout = 10', @@ -255,3 +255,63 @@ describe('copy-endpoints timeout', function() { }); }); }); + + +describe('copy-endpoints db connections', function() { + before(function() { + this.db_pool_size = global.settings.db_pool_size; + global.settings.db_pool_size = 1; + }); + + after(function() { + global.settings.db_pool_size = this.db_pool_size; + }); + + it('copyfrom', function(done) { + const query = "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"; + function doCopyFrom() { + return new Promise(resolve => { + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: query + }), + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + const response = JSON.parse(res.body); + assert.ok(response.time); + resolve(); + }); + }); + } + + Promise.all([doCopyFrom(), doCopyFrom(), doCopyFrom()]).then(function() { + done(); + }); + }); + + it('copyto', function(done) { + function doCopyTo() { + return new Promise(resolve => { + assert.response(server, { + url: "/api/v1/sql/copyto?" + querystring.stringify({ + q: 'COPY copy_endpoints_test TO STDOUT', + filename: '/tmp/output.dmp' + }), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + },{}, function(err, res) { + assert.ifError(err); + assert.ok(res.body); + resolve(); + }); + }); + } + + Promise.all([doCopyTo(), doCopyTo(), doCopyTo()]).then(function() { + done(); + }); + }); +}); From 4022fb2967ea61b67ad426da76b52d0f77a0d4fd Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Mon, 4 Jun 2018 15:46:39 +0200 Subject: [PATCH 06/10] Clean up before executing the copy suite So that it can be executed saving a bit of setup/teardown time: test/run_tests.sh --nodrop --nocreate test/acceptance/copy-endpoints.js --- test/acceptance/copy-endpoints.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/acceptance/copy-endpoints.js b/test/acceptance/copy-endpoints.js index 45b11ba0..4a8ef9cc 100644 --- a/test/acceptance/copy-endpoints.js +++ b/test/acceptance/copy-endpoints.js @@ -4,6 +4,7 @@ const fs = require('fs'); const querystring = require('querystring'); const assert = require('../support/assert'); const os = require('os'); +const { Client } = require('pg'); const StatsClient = require('../../app/stats/client'); if (global.settings.statsd) { @@ -18,6 +19,20 @@ const server = require('../../app/server')(statsClient); describe('copy-endpoints', function() { + before(function(done) { + const client = new Client({ + user: 'postgres', + host: 'localhost', + database: 'cartodb_test_user_1_db', + port: 5432, + }); + client.connect(); + client.query('TRUNCATE copy_endpoints_test', (err/*, res */) => { + client.end(); + done(err); + }); + }); + it('should work with copyfrom endpoint', function(done){ assert.response(server, { url: "/api/v1/sql/copyfrom?" + querystring.stringify({ From 014ea8142b572042adb0772022afdb62b676a2a7 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Mon, 4 Jun 2018 16:34:37 +0200 Subject: [PATCH 07/10] A cleaner approach to the cancel command --- app/services/stream_copy.js | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 3c3cc143..1b627ec9 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -16,6 +16,9 @@ module.exports = { } let responseEnded = false; + let connectionClosedByClient = false; + const copyToStream = copyTo(sql); + const pgstream = client.query(copyToStream); res .on('error', err => { @@ -25,28 +28,26 @@ module.exports = { }) .on('close', () => { if (!responseEnded) { - // Cancel the running COPY TO query. + connectionClosedByClient = true; + // Cancel the running COPY TO query // See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY const runningClient = client; const cancelingClient = new Client(runningClient.connectionParameters); - const connection = cancelingClient.connection; - connection.on('connect', () => { - connection.cancel(runningClient.processID, runningClient.secretKey); - done(); - }); - connection.connect(runningClient.port, runningClient.host); - return cb(new Error('Connection closed by client')); + cancelingClient.cancel(runningClient, pgstream); + pgstream.unpipe(res); + done(new Error('Connection closed by client')); + return cb(err); } }) .on('end', () => responseEnded = true); - const copyToStream = copyTo(sql); - const pgstream = client.query(copyToStream); pgstream .on('error', err => { - pgstream.unpipe(res); - done(); - return cb(err); + if (!connectionClosedByClient) { + pgstream.unpipe(res); + done(new Error('Connection closed by client')); + return cb(err); + } }) .on('data', data => metrics.addSize(data.length)) .on('end', () => { From 8d486ef9673245f36cb34ff800d14ea51c4f64de Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Mon, 4 Jun 2018 19:07:03 +0200 Subject: [PATCH 08/10] Ignore log and yarn.lock --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index ed26bf95..147f2348 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ test/test.log test/acceptance/oauth/venv/* coverage/ npm-debug.log +log/*.log +yarn.lock \ No newline at end of file From 7b6056b79947e528b6d3ffed26eaf1017e2acd74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Mart=C3=ADn?= Date: Mon, 4 Jun 2018 20:36:16 +0200 Subject: [PATCH 09/10] using the correct errors in done --- app/services/stream_copy.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 1b627ec9..2c8050b9 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -34,8 +34,11 @@ module.exports = { const runningClient = client; const cancelingClient = new Client(runningClient.connectionParameters); cancelingClient.cancel(runningClient, pgstream); + + const err = new Error('Connection closed by client'); pgstream.unpipe(res); - done(new Error('Connection closed by client')); + // see https://node-postgres.com/api/pool#release-err-error- + done(err); return cb(err); } }) @@ -45,7 +48,7 @@ module.exports = { .on('error', err => { if (!connectionClosedByClient) { pgstream.unpipe(res); - done(new Error('Connection closed by client')); + done(); return cb(err); } }) From 66f7ab45fea36427948c90349b86107cd558a2ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Mart=C3=ADn?= Date: Mon, 4 Jun 2018 20:51:21 +0200 Subject: [PATCH 10/10] release connection with error --- app/services/stream_copy.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index 2c8050b9..15d388b7 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -37,7 +37,7 @@ module.exports = { const err = new Error('Connection closed by client'); pgstream.unpipe(res); - // see https://node-postgres.com/api/pool#release-err-error- + // see https://node-postgres.com/api/pool#releasecallback done(err); return cb(err); } @@ -48,7 +48,7 @@ module.exports = { .on('error', err => { if (!connectionClosedByClient) { pgstream.unpipe(res); - done(); + done(err); return cb(err); } })