diff --git a/NEWS.carto.md b/NEWS.carto.md index d7d0fe0..80f2e3e 100644 --- a/NEWS.carto.md +++ b/NEWS.carto.md @@ -1,10 +1,11 @@ # CARTO's Changelog ## v2.2.0-carto.1 -Released 2018-mm-dd +Released 2019-mm-dd Bug fixes: * Copy to: ensure stream is detached when finished + * Copy to: deal with interspersed messages properly. See [#9](https://github.com/CartoDB/node-pg-copy-streams/pull/9) ## v1.2.0-carto.3 Released 2018-11-21 diff --git a/copy-to.js b/copy-to.js index 5b843d3..1fcd828 100644 --- a/copy-to.js +++ b/copy-to.js @@ -95,13 +95,20 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { length = chunk.readUInt32BE(offset+Byte1Len) if(chunk.length >= (offset + Byte1Len + length)) { offset += Byte1Len + Int32Len - if (needPush) { - var row = chunk.slice(offset, offset + length - Int32Len) - this.rowCount++ - row.copy(buffer, buffer_offset); - buffer_offset += row.length; + var message = chunk.slice(offset, offset + length - Int32Len) + switch(messageCode) { + case code.CopyData: + this.rowCount++; + message.copy(buffer, buffer_offset); + buffer_offset += message.length; + break; + case code.ParameterStatus: + case code.NoticeResponse: + case code.NotificationResponse: + this.emit('warning', 'Got an interspersed message: ' + message); + break; } - offset += (length - Int32Len) + offset += (length - Int32Len); } else { // we need more chunks for a complete message break; diff --git a/test/copy-to.js b/test/copy-to.js index d49eea9..e885c27 100644 --- a/test/copy-to.js +++ b/test/copy-to.js @@ -117,6 +117,58 @@ var testNoticeResponse = function() { } testNoticeResponse(); +var warnAndReturnOne = ` +CREATE OR REPLACE FUNCTION pg_temp.test_warn_return_one() +RETURNS INTEGER +AS $$ +BEGIN + RAISE WARNING 'hey, this is returning one'; + RETURN 1; +END; +$$ LANGUAGE plpgsql`; + +var testInterspersedMessageDoesNotBreakCopyFlow = function() { + var toClient = client(); + toClient.query(warnAndReturnOne, (err, res) => { + var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)"; + var stream = toClient.query(copy(q)); + var done = gonna('got expected COPY TO payload', 1000, function() { + toClient.end(); + }); + + stream.pipe(concat(function(buf) { + res = buf.toString('utf8') + })); + + stream.on('end', function() { + var expected = "test_warn_return_one\n1\n"; + assert.equal(res, expected); + // note the header counts as a row + assert.equal(stream.rowCount, 2, 'should have rowCount = 2 but got ' + stream.rowCount); + done(); + }); + }); +}; +testInterspersedMessageDoesNotBreakCopyFlow(); + +var testInterspersedMessageEmitsWarnign = function() { + var toClient = client(); + toClient.query(warnAndReturnOne, (err, res) => { + var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)"; + var stream = toClient.query(copy(q)); + var done = gonna('got expected warning event', 1000, function() { + toClient.end(); + }); + + stream.on('warning', function(msg) { + assert(msg.match(/Got an interspersed message:.*WARNING.*hey, this is returning one/), + 'did not get expected warning for interspersed message in COPY TO'); + done(); + }) + }); +}; +testInterspersedMessageEmitsWarnign(); + var testClientReuse = function() { var c = client(); var limit = 100000;