Compare commits
13 Commits
master
...
v2.x-carto
Author | SHA1 | Date | |
---|---|---|---|
|
c9157cd1ab | ||
|
0acb072906 | ||
|
645616c2e0 | ||
|
0c787920a4 | ||
|
5fec3a5cc1 | ||
|
002a36bcfc | ||
|
be101502d9 | ||
|
c865db412b | ||
|
edc0462470 | ||
|
d26449cc69 | ||
|
6b12cbc876 | ||
|
27ff2465ce | ||
|
69b5b60e9f |
@ -4,8 +4,6 @@ node_js:
|
||||
- "8"
|
||||
- "10"
|
||||
- "11"
|
||||
addons:
|
||||
postgresql: "9.2"
|
||||
|
||||
services:
|
||||
- postgresql
|
||||
|
34
NEWS.carto.md
Normal file
34
NEWS.carto.md
Normal file
@ -0,0 +1,34 @@
|
||||
# CARTO's Changelog
|
||||
|
||||
## v2.2.0-carto.1
|
||||
Released 2019-mm-dd
|
||||
|
||||
Bug fixes:
|
||||
* Copy to: ensure stream is detached when finished
|
||||
* Copy to: deal with interspersed messages properly. See [#9](https://github.com/CartoDB/node-pg-copy-streams/pull/9)
|
||||
|
||||
## v1.2.0-carto.3
|
||||
Released 2018-11-21
|
||||
|
||||
Features:
|
||||
* Drop support for Node.js 0.12, 4 and, 5.
|
||||
* Add support for Node.js 8 and 10.
|
||||
* Add package-lock.json
|
||||
* Do not use deprecated Buffer constructors.
|
||||
|
||||
## v1.2.0-carto.2
|
||||
Released 2018-10-26
|
||||
|
||||
Bug fixes:
|
||||
* Make all modules to use strict mode semantics.
|
||||
|
||||
## v1.2.0-carto.1
|
||||
Released 2018-06-11
|
||||
|
||||
Bug fixes:
|
||||
* Improves performance of COPY TO by sending bigger chunks through low level `push()`. See https://github.com/CartoDB/node-pg-copy-streams/pull/1
|
||||
|
||||
## v1.2.0
|
||||
Released 2016-08-22
|
||||
|
||||
Vanilla version v1.2.0 from upstream repository. See https://github.com/CartoDB/node-pg-copy-streams/releases/tag/v1.2.0
|
21
copy-to.js
21
copy-to.js
@ -23,6 +23,7 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.query(this.text)
|
||||
this.connection = connection
|
||||
this.connection.removeAllListeners('copyData')
|
||||
this.on('end', () => this._detach())
|
||||
connection.stream.pipe(this)
|
||||
}
|
||||
|
||||
@ -84,7 +85,6 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
case code.ErrorResponse:
|
||||
case code.CopyDone:
|
||||
pushBufferIfneeded();
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
break;
|
||||
@ -95,13 +95,20 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
length = chunk.readUInt32BE(offset+Byte1Len)
|
||||
if(chunk.length >= (offset + Byte1Len + length)) {
|
||||
offset += Byte1Len + Int32Len
|
||||
if (needPush) {
|
||||
var row = chunk.slice(offset, offset + length - Int32Len)
|
||||
this.rowCount++
|
||||
row.copy(buffer, buffer_offset);
|
||||
buffer_offset += row.length;
|
||||
var message = chunk.slice(offset, offset + length - Int32Len)
|
||||
switch(messageCode) {
|
||||
case code.CopyData:
|
||||
this.rowCount++;
|
||||
message.copy(buffer, buffer_offset);
|
||||
buffer_offset += message.length;
|
||||
break;
|
||||
case code.ParameterStatus:
|
||||
case code.NoticeResponse:
|
||||
case code.NotificationResponse:
|
||||
this.emit('warning', 'Got an interspersed message: ' + message);
|
||||
break;
|
||||
}
|
||||
offset += (length - Int32Len)
|
||||
offset += (length - Int32Len);
|
||||
} else {
|
||||
// we need more chunks for a complete message
|
||||
break;
|
||||
|
@ -117,6 +117,58 @@ var testNoticeResponse = function() {
|
||||
}
|
||||
testNoticeResponse();
|
||||
|
||||
var warnAndReturnOne = `
|
||||
CREATE OR REPLACE FUNCTION pg_temp.test_warn_return_one()
|
||||
RETURNS INTEGER
|
||||
AS $$
|
||||
BEGIN
|
||||
RAISE WARNING 'hey, this is returning one';
|
||||
RETURN 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql`;
|
||||
|
||||
var testInterspersedMessageDoesNotBreakCopyFlow = function() {
|
||||
var toClient = client();
|
||||
toClient.query(warnAndReturnOne, (err, res) => {
|
||||
var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)";
|
||||
var stream = toClient.query(copy(q));
|
||||
var done = gonna('got expected COPY TO payload', 1000, function() {
|
||||
toClient.end();
|
||||
});
|
||||
|
||||
stream.pipe(concat(function(buf) {
|
||||
res = buf.toString('utf8')
|
||||
}));
|
||||
|
||||
stream.on('end', function() {
|
||||
var expected = "test_warn_return_one\n1\n";
|
||||
assert.equal(res, expected);
|
||||
// note the header counts as a row
|
||||
assert.equal(stream.rowCount, 2, 'should have rowCount = 2 but got ' + stream.rowCount);
|
||||
done();
|
||||
});
|
||||
});
|
||||
};
|
||||
testInterspersedMessageDoesNotBreakCopyFlow();
|
||||
|
||||
var testInterspersedMessageEmitsWarnign = function() {
|
||||
var toClient = client();
|
||||
toClient.query(warnAndReturnOne, (err, res) => {
|
||||
var q = "COPY (SELECT * FROM pg_temp.test_warn_return_one()) TO STDOUT WITH (FORMAT 'csv', HEADER true)";
|
||||
var stream = toClient.query(copy(q));
|
||||
var done = gonna('got expected warning event', 1000, function() {
|
||||
toClient.end();
|
||||
});
|
||||
|
||||
stream.on('warning', function(msg) {
|
||||
assert(msg.match(/Got an interspersed message:.*WARNING.*hey, this is returning one/),
|
||||
'did not get expected warning for interspersed message in COPY TO');
|
||||
done();
|
||||
})
|
||||
});
|
||||
};
|
||||
testInterspersedMessageEmitsWarnign();
|
||||
|
||||
var testClientReuse = function() {
|
||||
var c = client();
|
||||
var limit = 100000;
|
||||
|
Loading…
Reference in New Issue
Block a user