From 002a36bcfcad68244d29af5e01cf0fecdf05868e Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Tue, 4 Jun 2019 12:21:46 +0200 Subject: [PATCH 1/5] Fix for interspersed messages The way the messages were buffered caused interspersed messages to be inserted in the middle of CopyData messages disrupting the normal COPY TO flow. This fixes it by consuming (adjusting offsets as appropriate) and just logging them to console, effectively discarding them from the COPY flow. --- copy-to.js | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/copy-to.js b/copy-to.js index 5b843d3..c7d268f 100644 --- a/copy-to.js +++ b/copy-to.js @@ -95,13 +95,20 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { length = chunk.readUInt32BE(offset+Byte1Len) if(chunk.length >= (offset + Byte1Len + length)) { offset += Byte1Len + Int32Len - if (needPush) { - var row = chunk.slice(offset, offset + length - Int32Len) - this.rowCount++ - row.copy(buffer, buffer_offset); - buffer_offset += row.length; + var message = chunk.slice(offset, offset + length - Int32Len) + switch(messageCode) { + case code.CopyData: + this.rowCount++; + message.copy(buffer, buffer_offset); + buffer_offset += message.length; + break; + case code.ParameterStatus: + case code.NoticeResponse: + case code.NotificationResponse: + console.log("Got an interspersed message: " + message); + break; } - offset += (length - Int32Len) + offset += (length - Int32Len); } else { // we need more chunks for a complete message break; From 5fec3a5cc13c323f60cb10d1a4f38b81d278e107 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Tue, 4 Jun 2019 14:37:04 +0200 Subject: [PATCH 2/5] Replace console.log with an event --- copy-to.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/copy-to.js b/copy-to.js index c7d268f..1fcd828 100644 --- a/copy-to.js +++ b/copy-to.js @@ -105,7 +105,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { case code.ParameterStatus: case code.NoticeResponse: case code.NotificationResponse: - console.log("Got an interspersed message: " + message); + this.emit('warning', 'Got an interspersed message: ' + message); break; } offset += (length - Int32Len); From 0c787920a4e3ca9e15bf6153e5f47420599627c4 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Tue, 4 Jun 2019 15:18:46 +0200 Subject: [PATCH 3/5] Test to assert interspersed messages do not break COPY TO flow --- test/copy-to.js | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/copy-to.js b/test/copy-to.js index d49eea9..bca3d95 100644 --- a/test/copy-to.js +++ b/test/copy-to.js @@ -117,6 +117,40 @@ var testNoticeResponse = function() { } testNoticeResponse(); +var warnAndReturnOne = ` +CREATE OR REPLACE FUNCTION pg_temp.test_warn_return_one() +RETURNS INTEGER +AS $$ +BEGIN + RAISE WARNING 'hey, this is returning one'; + RETURN 1; +END; +$$ LANGUAGE plpgsql`; + +var testInterspersedMessageDoesNotBreakCopyFlow = function() { + var toClient = client(); + toClient.query(warnAndReturnOne, (err, res) => { + var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)"; + var stream = toClient.query(copy(q)); + var done = gonna('got expected COPY TO payload', 1000, function() { + toClient.end(); + }); + + stream.pipe(concat(function(buf) { + res = buf.toString('utf8') + })); + + stream.on('end', function() { + var expected = "test_warn_return_one\n1\n"; + assert.equal(res, expected); + // note the header counts as a row + assert.equal(stream.rowCount, 2, 'should have rowCount = 2 but got ' + stream.rowCount); + done(); + }); + }); +}; +testInterspersedMessageDoesNotBreakCopyFlow(); + var testClientReuse = function() { var c = client(); var limit = 100000; From 645616c2e0811c72e8ee8b468ab9aa83da8f942f Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Tue, 4 Jun 2019 15:41:59 +0200 Subject: [PATCH 4/5] Test interspersed messages emit a warning event --- test/copy-to.js | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/copy-to.js b/test/copy-to.js index bca3d95..e885c27 100644 --- a/test/copy-to.js +++ b/test/copy-to.js @@ -151,6 +151,24 @@ var testInterspersedMessageDoesNotBreakCopyFlow = function() { }; testInterspersedMessageDoesNotBreakCopyFlow(); +var testInterspersedMessageEmitsWarnign = function() { + var toClient = client(); + toClient.query(warnAndReturnOne, (err, res) => { + var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)"; + var stream = toClient.query(copy(q)); + var done = gonna('got expected warning event', 1000, function() { + toClient.end(); + }); + + stream.on('warning', function(msg) { + assert(msg.match(/Got an interspersed message:.*WARNING.*hey, this is returning one/), + 'did not get expected warning for interspersed message in COPY TO'); + done(); + }) + }); +}; +testInterspersedMessageEmitsWarnign(); + var testClientReuse = function() { var c = client(); var limit = 100000; From 0acb07290646d7c606c2310639269c2e86db2cee Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Tue, 4 Jun 2019 16:42:45 +0200 Subject: [PATCH 5/5] Update NEW.carto.md with fix for interspersed messages --- NEWS.carto.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/NEWS.carto.md b/NEWS.carto.md index d7d0fe0..80f2e3e 100644 --- a/NEWS.carto.md +++ b/NEWS.carto.md @@ -1,10 +1,11 @@ # CARTO's Changelog ## v2.2.0-carto.1 -Released 2018-mm-dd +Released 2019-mm-dd Bug fixes: * Copy to: ensure stream is detached when finished + * Copy to: deal with interspersed messages properly. See [#9](https://github.com/CartoDB/node-pg-copy-streams/pull/9) ## v1.2.0-carto.3 Released 2018-11-21