diff --git a/lib/index.js b/lib/index.js index c8b9f53..e7df427 100644 --- a/lib/index.js +++ b/lib/index.js @@ -8,9 +8,9 @@ var Connection = require('./connection'); var PG = function(clientConstructor) { EventEmitter.call(this); this.defaults = defaults; - this.Client = pool.Client = clientConstructor; + this.Client = clientConstructor; this.Query = this.Client.Query; - this.pools = pool; + this.pools = pool(clientConstructor); this.Connection = Connection; this.types = require('pg-types'); }; diff --git a/lib/pool.js b/lib/pool.js index 885a351..358cb2c 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -3,93 +3,95 @@ var EventEmitter = require('events').EventEmitter; var defaults = require('./defaults'); var genericPool = require('generic-pool'); -var pools = { - //dictionary of all key:pool pairs - all: {}, - //reference to the client constructor - can override in tests or for require('pg').native - Client: require('./client'), - getOrCreate: function(clientConfig) { - clientConfig = clientConfig || {}; - var name = JSON.stringify(clientConfig); - var pool = pools.all[name]; - if(pool) { + +module.exports = function(Client) { + var pools = { + //dictionary of all key:pool pairs + all: {}, + //reference to the client constructor - can override in tests or for require('pg').native + getOrCreate: function(clientConfig) { + clientConfig = clientConfig || {}; + var name = JSON.stringify(clientConfig); + var pool = pools.all[name]; + if(pool) { + return pool; + } + pool = genericPool.Pool({ + name: name, + max: clientConfig.poolSize || defaults.poolSize, + idleTimeoutMillis: clientConfig.poolIdleTimeout || defaults.poolIdleTimeout, + reapIntervalMillis: clientConfig.reapIntervalMillis || defaults.reapIntervalMillis, + log: clientConfig.poolLog || defaults.poolLog, + create: function(cb) { + var client = new Client(clientConfig); + // Ignore errors on pooled clients until they are connected. + client.on('error', Function.prototype); + client.connect(function(err) { + if(err) return cb(err, null); + + // Remove the noop error handler after a connection has been established. + client.removeListener('error', Function.prototype); + + //handle connected client background errors by emitting event + //via the pg object and then removing errored client from the pool + client.on('error', function(e) { + pool.emit('error', e, client); + + // If the client is already being destroyed, the error + // occurred during stream ending. Do not attempt to destroy + // the client again. + if (!client._destroying) { + pool.destroy(client); + } + }); + + // Remove connection from pool on disconnect + client.on('end', function(e) { + // Do not enter infinite loop between pool.destroy + // and client 'end' event... + if ( ! client._destroying ) { + pool.destroy(client); + } + }); + client.poolCount = 0; + return cb(null, client); + }); + }, + destroy: function(client) { + client._destroying = true; + client.poolCount = undefined; + client.end(); + } + }); + pools.all[name] = pool; + //mixin EventEmitter to pool + EventEmitter.call(pool); + for(var key in EventEmitter.prototype) { + if(EventEmitter.prototype.hasOwnProperty(key)) { + pool[key] = EventEmitter.prototype[key]; + } + } + //monkey-patch with connect method + pool.connect = function(cb) { + var domain = process.domain; + pool.acquire(function(err, client) { + if(domain) { + cb = domain.bind(cb); + } + if(err) return cb(err, null, function() {/*NOOP*/}); + client.poolCount++; + cb(null, client, function(err) { + if(err) { + pool.destroy(client); + } else { + pool.release(client); + } + }); + }); + }; return pool; } - pool = genericPool.Pool({ - name: name, - max: clientConfig.poolSize || defaults.poolSize, - idleTimeoutMillis: clientConfig.poolIdleTimeout || defaults.poolIdleTimeout, - reapIntervalMillis: clientConfig.reapIntervalMillis || defaults.reapIntervalMillis, - log: clientConfig.poolLog || defaults.poolLog, - create: function(cb) { - var client = new pools.Client(clientConfig); - // Ignore errors on pooled clients until they are connected. - client.on('error', Function.prototype); - client.connect(function(err) { - if(err) return cb(err, null); + }; - // Remove the noop error handler after a connection has been established. - client.removeListener('error', Function.prototype); - - //handle connected client background errors by emitting event - //via the pg object and then removing errored client from the pool - client.on('error', function(e) { - pool.emit('error', e, client); - - // If the client is already being destroyed, the error - // occurred during stream ending. Do not attempt to destroy - // the client again. - if (!client._destroying) { - pool.destroy(client); - } - }); - - // Remove connection from pool on disconnect - client.on('end', function(e) { - // Do not enter infinite loop between pool.destroy - // and client 'end' event... - if ( ! client._destroying ) { - pool.destroy(client); - } - }); - client.poolCount = 0; - return cb(null, client); - }); - }, - destroy: function(client) { - client._destroying = true; - client.poolCount = undefined; - client.end(); - } - }); - pools.all[name] = pool; - //mixin EventEmitter to pool - EventEmitter.call(pool); - for(var key in EventEmitter.prototype) { - if(EventEmitter.prototype.hasOwnProperty(key)) { - pool[key] = EventEmitter.prototype[key]; - } - } - //monkey-patch with connect method - pool.connect = function(cb) { - var domain = process.domain; - pool.acquire(function(err, client) { - if(domain) { - cb = domain.bind(cb); - } - if(err) return cb(err, null, function() {/*NOOP*/}); - client.poolCount++; - cb(null, client, function(err) { - if(err) { - pool.destroy(client); - } else { - pool.release(client); - } - }); - }); - }; - return pool; - } + return pools; }; - -module.exports = pools; diff --git a/test/integration/gh-issues/981-tests.js b/test/integration/gh-issues/981-tests.js new file mode 100644 index 0000000..ed3c312 --- /dev/null +++ b/test/integration/gh-issues/981-tests.js @@ -0,0 +1,27 @@ +var helper = require(__dirname + '/../test-helper'); + +//native bindings are only installed for native tests +if(!helper.args.native) { + return; +} + +var assert = require('assert') +var pg = require('../../../lib') +var native = require('../../../lib').native + +var JsClient = require('../../../lib/client') +var NativeClient = require('../../../lib/native') + +assert(pg.Client === JsClient); +assert(native.Client === NativeClient); + +pg.connect(function(err, client, done) { + assert(client instanceof JsClient); + client.end(); + + native.connect(function(err, client, done) { + assert(client instanceof NativeClient); + client.end(); + }); +}); + diff --git a/test/unit/pool/basic-tests.js b/test/unit/pool/basic-tests.js index 2bf458a..68748d2 100644 --- a/test/unit/pool/basic-tests.js +++ b/test/unit/pool/basic-tests.js @@ -2,8 +2,8 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var libDir = __dirname + '/../../../lib'; +var poolsFactory = require(libDir + '/pool') var defaults = require(libDir + '/defaults'); -var pools = require(libDir + '/pool'); var poolId = 0; require(__dirname + '/../../test-helper'); @@ -21,6 +21,7 @@ FakeClient.prototype.connect = function(cb) { FakeClient.prototype.end = function() { this.endCalled = true; } +var pools = poolsFactory(FakeClient); //Hangs the event loop until 'end' is called on client var HangingClient = function(config) { @@ -41,8 +42,6 @@ HangingClient.prototype.end = function() { clearInterval(this.intervalId); } -pools.Client = FakeClient; - test('no pools exist', function() { assert.empty(Object.keys(pools.all)); }); @@ -115,7 +114,7 @@ test('on client error, client is removed from pool', function() { }); test('pool with connection error on connection', function() { - pools.Client = function() { + var errorPools = poolsFactory(function() { return { connect: function(cb) { process.nextTick(function() { @@ -124,9 +123,10 @@ test('pool with connection error on connection', function() { }, on: Function.prototype }; - }; + }) + test('two parameters', function() { - var p = pools.getOrCreate(poolId++); + var p = errorPools.getOrCreate(poolId++); p.connect(assert.calls(function(err, client) { assert.ok(err); assert.equal(client, null); @@ -136,7 +136,7 @@ test('pool with connection error on connection', function() { })); }); test('three parameters', function() { - var p = pools.getOrCreate(poolId++); + var p = errorPools.getOrCreate(poolId++); var tid = setTimeout(function() { assert.fail('Did not call connect callback'); }, 100); @@ -155,7 +155,6 @@ test('pool with connection error on connection', function() { test('returnning an error to done()', function() { var p = pools.getOrCreate(poolId++); - pools.Client = FakeClient; p.connect(function(err, client, done) { assert.equal(err, null); assert(client); diff --git a/test/unit/pool/timeout-tests.js b/test/unit/pool/timeout-tests.js index 0fc96b2..f7facd1 100644 --- a/test/unit/pool/timeout-tests.js +++ b/test/unit/pool/timeout-tests.js @@ -3,7 +3,7 @@ var EventEmitter = require('events').EventEmitter; var libDir = __dirname + '/../../../lib'; var defaults = require(libDir + '/defaults'); -var pools = require(libDir + '/pool'); +var poolsFactory = require(libDir + '/pool'); var poolId = 0; require(__dirname + '/../../test-helper'); @@ -25,8 +25,9 @@ FakeClient.prototype.end = function() { defaults.poolIdleTimeout = 10; defaults.reapIntervalMillis = 10; +var pools = poolsFactory(FakeClient) + test('client times out from idle', function() { - pools.Client = FakeClient; var p = pools.getOrCreate(poolId++); p.connect(function(err, client, done) { done();