Compare commits
5 Commits
v2.x-carto
...
PushSpring
Author | SHA1 | Date | |
---|---|---|---|
|
be7af371d8 | ||
|
17697e98d7 | ||
|
bd4a87d3a0 | ||
|
9d197a91e1 | ||
|
e1ce9a3948 |
13
.travis.yml
13
.travis.yml
@ -1,7 +1,16 @@
|
|||||||
language: node_js
|
language: node_js
|
||||||
node_js:
|
node_js:
|
||||||
- "0.10"
|
- "0.12"
|
||||||
- "0.11"
|
- "4"
|
||||||
|
- "5"
|
||||||
|
- "6"
|
||||||
|
|
||||||
|
addons:
|
||||||
|
postgresql: "9.2"
|
||||||
|
|
||||||
|
services:
|
||||||
|
- postgresql
|
||||||
|
|
||||||
before_install:
|
before_install:
|
||||||
- npm install npm --global
|
- npm install npm --global
|
||||||
env:
|
env:
|
||||||
|
28
copy-to.js
28
copy-to.js
@ -8,7 +8,6 @@ var util = require('util')
|
|||||||
var CopyStreamQuery = function(text, options) {
|
var CopyStreamQuery = function(text, options) {
|
||||||
Transform.call(this, options)
|
Transform.call(this, options)
|
||||||
this.text = text
|
this.text = text
|
||||||
this._listeners = {}
|
|
||||||
this._copyOutResponse = null
|
this._copyOutResponse = null
|
||||||
this.rowCount = 0
|
this.rowCount = 0
|
||||||
}
|
}
|
||||||
@ -20,11 +19,7 @@ var eventTypes = ['close', 'data', 'end', 'error']
|
|||||||
CopyStreamQuery.prototype.submit = function(connection) {
|
CopyStreamQuery.prototype.submit = function(connection) {
|
||||||
connection.query(this.text)
|
connection.query(this.text)
|
||||||
this.connection = connection
|
this.connection = connection
|
||||||
var self = this
|
this.connection.removeAllListeners('copyData')
|
||||||
eventTypes.forEach(function(type) {
|
|
||||||
self._listeners[type] = connection.stream.listeners(type)
|
|
||||||
connection.stream.removeAllListeners(type)
|
|
||||||
})
|
|
||||||
connection.stream.pipe(this)
|
connection.stream.pipe(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,16 +31,12 @@ var code = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype._detach = function() {
|
CopyStreamQuery.prototype._detach = function() {
|
||||||
this.connection.stream.unpipe()
|
this.connection.stream.unpipe(this)
|
||||||
var self = this
|
// Unpipe can drop us out of flowing mode
|
||||||
eventTypes.forEach(function(type) {
|
this.connection.stream.resume()
|
||||||
self.connection.stream.removeAllListeners(type)
|
|
||||||
self._listeners[type].forEach(function(listener) {
|
|
||||||
self.connection.stream.on(type, listener)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||||
var offset = 0
|
var offset = 0
|
||||||
if(this._remainder && chunk) {
|
if(this._remainder && chunk) {
|
||||||
@ -55,7 +46,6 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
this._copyOutResponse = true
|
this._copyOutResponse = true
|
||||||
if(chunk[0] == code.E) {
|
if(chunk[0] == code.E) {
|
||||||
this._detach()
|
this._detach()
|
||||||
this.connection.stream.unshift(chunk)
|
|
||||||
this.push(null)
|
this.push(null)
|
||||||
return cb();
|
return cb();
|
||||||
}
|
}
|
||||||
@ -71,11 +61,6 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
//complete or error
|
//complete or error
|
||||||
if(messageCode == code.c || messageCode == code.E) {
|
if(messageCode == code.c || messageCode == code.E) {
|
||||||
this._detach()
|
this._detach()
|
||||||
if (messageCode == code.c) {
|
|
||||||
this.connection.stream.unshift(chunk.slice(offset + 5))
|
|
||||||
} else {
|
|
||||||
this.connection.stream.unshift(chunk.slice(offset))
|
|
||||||
}
|
|
||||||
this.push(null)
|
this.push(null)
|
||||||
return cb();
|
return cb();
|
||||||
}
|
}
|
||||||
@ -108,6 +93,9 @@ CopyStreamQuery.prototype.handleError = function(e) {
|
|||||||
this.emit('error', e)
|
this.emit('error', e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CopyStreamQuery.prototype.handleCopyData = function(chunk) {
|
||||||
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype.handleCommandComplete = function() {
|
CopyStreamQuery.prototype.handleCommandComplete = function() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
1
index.js
1
index.js
@ -1,5 +1,4 @@
|
|||||||
var CopyToQueryStream = require('./copy-to')
|
var CopyToQueryStream = require('./copy-to')
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
to: function(txt, options) {
|
to: function(txt, options) {
|
||||||
return new CopyToQueryStream(txt, options)
|
return new CopyToQueryStream(txt, options)
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
"url": "https://github.com/brianc/node-pg-copy-streams/issues"
|
"url": "https://github.com/brianc/node-pg-copy-streams/issues"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"pg.js": "~2.8.1",
|
"pg": "~4.4.3",
|
||||||
"concat-stream": "~1.1.0",
|
"concat-stream": "~1.1.0",
|
||||||
"gonna": "0.0.0",
|
"gonna": "0.0.0",
|
||||||
"lodash": "~2.2.1",
|
"lodash": "~2.2.1",
|
||||||
|
@ -4,7 +4,7 @@ var gonna = require('gonna')
|
|||||||
var async = require('async')
|
var async = require('async')
|
||||||
var concat = require('concat-stream')
|
var concat = require('concat-stream')
|
||||||
var _ = require('lodash')
|
var _ = require('lodash')
|
||||||
var pg = require('pg.js')
|
var pg = require('pg')
|
||||||
|
|
||||||
var from = require('../').from
|
var from = require('../').from
|
||||||
var to = require('../').to
|
var to = require('../').to
|
||||||
|
@ -3,7 +3,7 @@ var gonna = require('gonna')
|
|||||||
|
|
||||||
var concat = require('concat-stream')
|
var concat = require('concat-stream')
|
||||||
var _ = require('lodash')
|
var _ = require('lodash')
|
||||||
var pg = require('pg.js')
|
var pg = require('pg')
|
||||||
|
|
||||||
var copy = require('../').from
|
var copy = require('../').from
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ var gonna = require('gonna')
|
|||||||
var _ = require('lodash')
|
var _ = require('lodash')
|
||||||
var async = require('async')
|
var async = require('async')
|
||||||
var concat = require('concat-stream')
|
var concat = require('concat-stream')
|
||||||
var pg = require('pg.js')
|
var pg = require('pg')
|
||||||
|
|
||||||
var copy = require('../').to
|
var copy = require('../').to
|
||||||
|
|
||||||
@ -25,6 +25,8 @@ testConstruction()
|
|||||||
var testRange = function(top) {
|
var testRange = function(top) {
|
||||||
var fromClient = client()
|
var fromClient = client()
|
||||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||||
|
var res;
|
||||||
|
|
||||||
|
|
||||||
var stream = fromClient.query(copy(txt))
|
var stream = fromClient.query(copy(txt))
|
||||||
var done = gonna('finish piping out', 1000, function() {
|
var done = gonna('finish piping out', 1000, function() {
|
||||||
@ -32,41 +34,19 @@ var testRange = function(top) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
stream.pipe(concat(function(buf) {
|
stream.pipe(concat(function(buf) {
|
||||||
var res = buf.toString('utf8')
|
res = buf.toString('utf8')
|
||||||
|
}))
|
||||||
|
|
||||||
|
stream.on('end', function() {
|
||||||
var expected = _.range(0, top).join('\n') + '\n'
|
var expected = _.range(0, top).join('\n') + '\n'
|
||||||
assert.equal(res, expected)
|
assert.equal(res, expected)
|
||||||
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' but got ' + stream.rowCount)
|
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' but got ' + stream.rowCount)
|
||||||
done()
|
done()
|
||||||
}))
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
testRange(10000)
|
testRange(10000)
|
||||||
|
|
||||||
var testLeak = function(rounds) {
|
|
||||||
var fromClient = client()
|
|
||||||
var txt = 'COPY (SELECT 10) TO STDOUT'
|
|
||||||
|
|
||||||
var runStream = function(num, callback) {
|
|
||||||
var stream = fromClient.query(copy(txt))
|
|
||||||
stream.on('data', function(data) {
|
|
||||||
// Just throw away the data.
|
|
||||||
})
|
|
||||||
stream.on('end', callback)
|
|
||||||
stream.on('error', callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
async.timesSeries(rounds, runStream, function(err) {
|
|
||||||
assert.equal(err, null)
|
|
||||||
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
|
|
||||||
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
|
|
||||||
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
|
|
||||||
assert.equal(fromClient.connection.stream.listeners('error').length, 1)
|
|
||||||
fromClient.end()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
testLeak(5)
|
|
||||||
|
|
||||||
var testInternalPostgresError = function() {
|
var testInternalPostgresError = function() {
|
||||||
var cancelClient = client()
|
var cancelClient = client()
|
||||||
var queryClient = client()
|
var queryClient = client()
|
||||||
|
Loading…
Reference in New Issue
Block a user