Merge pull request #8 from drob/handler-leak

Fixes an event handler leak.
This commit is contained in:
Brian C 2014-03-30 00:06:42 -04:00
commit b8946265c3
3 changed files with 49 additions and 16 deletions

View File

@ -8,18 +8,23 @@ var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
this.text = text
this._listeners = null
this._listeners = {}
this._copyOutResponse = null
this.rowCount = 0
}
util.inherits(CopyStreamQuery, Transform)
var eventTypes = ['close', 'data', 'end']
CopyStreamQuery.prototype.submit = function(connection) {
connection.query(this.text)
this.connection = connection
this._listeners = connection.stream.listeners('data')
connection.stream.removeAllListeners('data')
var self = this
eventTypes.forEach(function(type) {
self._listeners[type] = connection.stream.listeners(type)
connection.stream.removeAllListeners(type)
})
connection.stream.pipe(this)
}
@ -32,10 +37,12 @@ var code = {
CopyStreamQuery.prototype._detach = function() {
this.connection.stream.unpipe()
this.connection.stream.removeAllListeners('data')
var self = this
this._listeners.forEach(function(listener) {
self.connection.stream.on('data', listener)
eventTypes.forEach(function(type) {
self.connection.stream.removeAllListeners(type)
self._listeners[type].forEach(function(listener) {
self.connection.stream.on(type, listener)
})
})
}

View File

@ -27,6 +27,7 @@
"concat-stream": "~1.1.0",
"gonna": "0.0.0",
"lodash": "~2.2.1",
"heroku-env": "~0.1.1"
"heroku-env": "~0.1.1",
"async": "~0.2.10"
}
}

View File

@ -1,20 +1,21 @@
var assert = require('assert')
var gonna = require('gonna')
var concat = require('concat-stream')
var _ = require('lodash')
var async = require('async')
var concat = require('concat-stream')
var pg = require('pg.js')
var copy = require('../').to
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var testRange = function(top) {
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var fromClient = client()
var copy = require('../').to
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
var stream = fromClient.query(copy(txt))
@ -32,3 +33,27 @@ var testRange = function(top) {
}
testRange(10000)
var testLeak = function(rounds) {
var fromClient = client()
var txt = 'COPY (SELECT 10) TO STDOUT'
var runStream = function(num, callback) {
var stream = fromClient.query(copy(txt))
stream.on('data', function(data) {
// Just throw away the data.
})
stream.on('end', callback)
stream.on('error', callback)
}
async.timesSeries(rounds, runStream, function(err) {
assert.equal(err, null)
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
fromClient.end()
})
}
testLeak(5)