diff --git a/copy-to.js b/copy-to.js index 891aea3..4fd02ce 100644 --- a/copy-to.js +++ b/copy-to.js @@ -8,18 +8,23 @@ var util = require('util') var CopyStreamQuery = function(text) { Transform.call(this) this.text = text - this._listeners = null + this._listeners = {} this._copyOutResponse = null this.rowCount = 0 } util.inherits(CopyStreamQuery, Transform) +var eventTypes = ['close', 'data', 'end'] + CopyStreamQuery.prototype.submit = function(connection) { connection.query(this.text) this.connection = connection - this._listeners = connection.stream.listeners('data') - connection.stream.removeAllListeners('data') + var self = this + eventTypes.forEach(function(type) { + self._listeners[type] = connection.stream.listeners(type) + connection.stream.removeAllListeners(type) + }) connection.stream.pipe(this) } @@ -32,10 +37,12 @@ var code = { CopyStreamQuery.prototype._detach = function() { this.connection.stream.unpipe() - this.connection.stream.removeAllListeners('data') var self = this - this._listeners.forEach(function(listener) { - self.connection.stream.on('data', listener) + eventTypes.forEach(function(type) { + self.connection.stream.removeAllListeners(type) + self._listeners[type].forEach(function(listener) { + self.connection.stream.on(type, listener) + }) }) } diff --git a/package.json b/package.json index e596a02..a04d140 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "concat-stream": "~1.1.0", "gonna": "0.0.0", "lodash": "~2.2.1", - "heroku-env": "~0.1.1" + "heroku-env": "~0.1.1", + "async": "~0.2.10" } } diff --git a/test/copy-to.js b/test/copy-to.js index 2d99f0b..adae358 100644 --- a/test/copy-to.js +++ b/test/copy-to.js @@ -1,20 +1,21 @@ var assert = require('assert') var gonna = require('gonna') -var concat = require('concat-stream') var _ = require('lodash') +var async = require('async') +var concat = require('concat-stream') var pg = require('pg.js') +var copy = require('../').to + +var client = function() { + var client = new pg.Client() + client.connect() + return client +} + var testRange = function(top) { - var client = function() { - var client = new pg.Client() - client.connect() - return client - } - var fromClient = client() - var copy = require('../').to - var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT' var stream = fromClient.query(copy(txt)) @@ -32,3 +33,27 @@ var testRange = function(top) { } testRange(10000) + +var testLeak = function(rounds) { + var fromClient = client() + var txt = 'COPY (SELECT 10) TO STDOUT' + + var runStream = function(num, callback) { + var stream = fromClient.query(copy(txt)) + stream.on('data', function(data) { + // Just throw away the data. + }) + stream.on('end', callback) + stream.on('error', callback) + } + + async.timesSeries(rounds, runStream, function(err) { + assert.equal(err, null) + assert.equal(fromClient.connection.stream.listeners('data').length, 1) + assert.equal(fromClient.connection.stream.listeners('end').length, 2) + assert.equal(fromClient.connection.stream.listeners('close').length, 0) + fromClient.end() + }) +} + +testLeak(5)