Compare commits

...

13 Commits

Author SHA1 Message Date
Rafa de la Torre
c9157cd1ab
Merge pull request #9 from CartoDB/v2.x-carto-fix-interspersed-messages
Fix for interspersed messages
2019-06-04 16:52:35 +02:00
Rafa de la Torre
0acb072906 Update NEW.carto.md with fix for interspersed messages 2019-06-04 16:42:45 +02:00
Rafa de la Torre
645616c2e0 Test interspersed messages emit a warning event 2019-06-04 15:41:59 +02:00
Rafa de la Torre
0c787920a4 Test to assert interspersed messages do not break COPY TO flow 2019-06-04 15:32:10 +02:00
Rafa de la Torre
5fec3a5cc1 Replace console.log with an event 2019-06-04 14:37:04 +02:00
Rafa de la Torre
002a36bcfc Fix for interspersed messages
The way the messages were buffered caused interspersed messages to be
inserted in the middle of CopyData messages disrupting the normal COPY
TO flow.

This fixes it by consuming (adjusting offsets as appropriate) and just
logging them to console, effectively discarding them from the COPY
flow.
2019-06-04 12:21:46 +02:00
Daniel G. Aubert
be101502d9
Merge pull request #8 from CartoDB/copyto-detach-connection-after-done
Do not detach streams before finishing
2019-05-24 17:20:37 +02:00
Daniel García Aubert
c865db412b Update NEWS 2019-05-24 17:11:26 +02:00
Daniel García Aubert
edc0462470 Do not detach streams before finishing 2019-05-24 17:06:47 +02:00
Daniel G. Aubert
d26449cc69
Merge pull request #7 from CartoDB/add-carto-readme
Do not set postgres version
2019-05-24 16:55:55 +02:00
Daniel García Aubert
6b12cbc876 Do not set postgres version 2019-05-24 16:47:10 +02:00
Daniel G. Aubert
27ff2465ce
Merge pull request #5 from CartoDB/add-carto-readme
Add NEWS Carto
2019-05-24 16:41:51 +02:00
Daniel García Aubert
69b5b60e9f Add NEWS Carto 2019-05-24 16:34:57 +02:00
4 changed files with 100 additions and 9 deletions

View File

@ -4,8 +4,6 @@ node_js:
- "8"
- "10"
- "11"
addons:
postgresql: "9.2"
services:
- postgresql

34
NEWS.carto.md Normal file
View File

@ -0,0 +1,34 @@
# CARTO's Changelog
## v2.2.0-carto.1
Released 2019-mm-dd
Bug fixes:
* Copy to: ensure stream is detached when finished
* Copy to: deal with interspersed messages properly. See [#9](https://github.com/CartoDB/node-pg-copy-streams/pull/9)
## v1.2.0-carto.3
Released 2018-11-21
Features:
* Drop support for Node.js 0.12, 4 and, 5.
* Add support for Node.js 8 and 10.
* Add package-lock.json
* Do not use deprecated Buffer constructors.
## v1.2.0-carto.2
Released 2018-10-26
Bug fixes:
* Make all modules to use strict mode semantics.
## v1.2.0-carto.1
Released 2018-06-11
Bug fixes:
* Improves performance of COPY TO by sending bigger chunks through low level `push()`. See https://github.com/CartoDB/node-pg-copy-streams/pull/1
## v1.2.0
Released 2016-08-22
Vanilla version v1.2.0 from upstream repository. See https://github.com/CartoDB/node-pg-copy-streams/releases/tag/v1.2.0

View File

@ -23,6 +23,7 @@ CopyStreamQuery.prototype.submit = function(connection) {
connection.query(this.text)
this.connection = connection
this.connection.removeAllListeners('copyData')
this.on('end', () => this._detach())
connection.stream.pipe(this)
}
@ -84,7 +85,6 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
case code.ErrorResponse:
case code.CopyDone:
pushBufferIfneeded();
this._detach()
this.push(null)
return cb();
break;
@ -95,13 +95,20 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
length = chunk.readUInt32BE(offset+Byte1Len)
if(chunk.length >= (offset + Byte1Len + length)) {
offset += Byte1Len + Int32Len
if (needPush) {
var row = chunk.slice(offset, offset + length - Int32Len)
this.rowCount++
row.copy(buffer, buffer_offset);
buffer_offset += row.length;
var message = chunk.slice(offset, offset + length - Int32Len)
switch(messageCode) {
case code.CopyData:
this.rowCount++;
message.copy(buffer, buffer_offset);
buffer_offset += message.length;
break;
case code.ParameterStatus:
case code.NoticeResponse:
case code.NotificationResponse:
this.emit('warning', 'Got an interspersed message: ' + message);
break;
}
offset += (length - Int32Len)
offset += (length - Int32Len);
} else {
// we need more chunks for a complete message
break;

View File

@ -117,6 +117,58 @@ var testNoticeResponse = function() {
}
testNoticeResponse();
var warnAndReturnOne = `
CREATE OR REPLACE FUNCTION pg_temp.test_warn_return_one()
RETURNS INTEGER
AS $$
BEGIN
RAISE WARNING 'hey, this is returning one';
RETURN 1;
END;
$$ LANGUAGE plpgsql`;
var testInterspersedMessageDoesNotBreakCopyFlow = function() {
var toClient = client();
toClient.query(warnAndReturnOne, (err, res) => {
var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)";
var stream = toClient.query(copy(q));
var done = gonna('got expected COPY TO payload', 1000, function() {
toClient.end();
});
stream.pipe(concat(function(buf) {
res = buf.toString('utf8')
}));
stream.on('end', function() {
var expected = "test_warn_return_one\n1\n";
assert.equal(res, expected);
// note the header counts as a row
assert.equal(stream.rowCount, 2, 'should have rowCount = 2 but got ' + stream.rowCount);
done();
});
});
};
testInterspersedMessageDoesNotBreakCopyFlow();
var testInterspersedMessageEmitsWarnign = function() {
var toClient = client();
toClient.query(warnAndReturnOne, (err, res) => {
var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)";
var stream = toClient.query(copy(q));
var done = gonna('got expected warning event', 1000, function() {
toClient.end();
});
stream.on('warning', function(msg) {
assert(msg.match(/Got an interspersed message:.*WARNING.*hey, this is returning one/),
'did not get expected warning for interspersed message in COPY TO');
done();
})
});
};
testInterspersedMessageEmitsWarnign();
var testClientReuse = function() {
var c = client();
var limit = 100000;