From cc84799c7ad7a26426f30ffbefa281d54c83abbd Mon Sep 17 00:00:00 2001 From: bmc Date: Wed, 20 Feb 2013 16:08:48 -0600 Subject: [PATCH] integrate new pool into existing codebase --- lib/index.js | 76 +++-------- lib/pool.js | 124 +++++++++--------- .../connection-pool/error-tests.js | 1 + .../connection-pool/optional-config-tests.js | 8 +- .../connection-pool/test-helper.js | 5 +- .../connection-pool/unique-name-tests.js | 63 --------- test/unit/pool/basic-tests.js | 50 ++++--- test/unit/pool/timeout-tests.js | 6 +- 8 files changed, 129 insertions(+), 204 deletions(-) delete mode 100644 test/integration/connection-pool/unique-name-tests.js diff --git a/lib/index.js b/lib/index.js index f05ec15..6dab133 100644 --- a/lib/index.js +++ b/lib/index.js @@ -2,26 +2,26 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); var Client = require(__dirname+'/client'); var defaults = require(__dirname + '/defaults'); - -//external genericPool module -var genericPool = require('generic-pool'); - -//cache of existing client pools -var pools = {}; +var pool = require(__dirname + '/pool'); +var types = require(__dirname + '/types'); +var Connection = require(__dirname + '/connection'); var PG = function(clientConstructor) { EventEmitter.call(this); - this.Client = clientConstructor; - this.Connection = require(__dirname + '/connection'); - this.Query = clientConstructor.Query; this.defaults = defaults; + this.Client = pool.Client = clientConstructor; + this.Query = this.Client.Query; + this.pools = pool; + this.types = types; + this.Connection = Connection; }; util.inherits(PG, EventEmitter); PG.prototype.end = function() { - Object.keys(pools).forEach(function(name) { - var pool = pools[name]; + var self = this; + Object.keys(self.pools.all).forEach(function(key) { + var pool = self.pools.all[key]; pool.drain(function() { pool.destroyAllNow(); }); @@ -29,51 +29,16 @@ PG.prototype.end = function() { }; PG.prototype.connect = function(config, callback) { - var self = this; - var c = config; - var cb = callback; - //allow for no config to be passed - if(typeof c === 'function') { - cb = c; - c = defaults; + if(typeof config == "function") { + callback = config; + config = null; + } + var pool = this.pools.getOrCreate(config); + pool.connect(callback); + if(!pool.listeners('error').length) { + //propagate errors up to pg object + pool.on('error', this.emit.bind(this, 'error')); } - - //get unique pool name even if object was used as config - var poolName = typeof(c) === 'string' ? c : c.user+c.host+c.port+c.database; - var pool = pools[poolName]; - - if(pool) { return pool.acquire(cb); } - - pool = pools[poolName] = genericPool.Pool({ - name: poolName, - create: function(callback) { - var client = new self.Client(c); - client.connect(function(err) { - if(err) { return callback(err); } - - //handle connected client background errors by emitting event - //via the pg object and then removing errored client from the pool - client.on('error', function(e) { - self.emit('error', e, client); - pool.destroy(client); - }); - - callback(null, client); - }); - - client.on('drain', function() { - pool.release(client); - }); - }, - destroy: function(client) { - client.end(); - }, - max: defaults.poolSize, - idleTimeoutMillis: defaults.poolIdleTimeout, - reapIntervalMillis: defaults.reapIntervalMillis, - log: defaults.poolLog - }); - return pool.acquire(cb); }; // cancel the query runned by the given client @@ -96,4 +61,3 @@ module.exports.__defineGetter__("native", function() { return module.exports.native; }); -module.exports.types = require('./types'); diff --git a/lib/pool.js b/lib/pool.js index c044592..bb7a918 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -3,56 +3,61 @@ var EventEmitter = require('events').EventEmitter; var defaults = require(__dirname + '/defaults'); var genericPool = require('generic-pool'); -//takes the same config structure as client -var createPool = function(clientConfig) { - clientConfig = clientConfig || {}; - var name = JSON.stringify(clientConfig); - var pool = createPool.all[name]; - if(pool) { +var pools = { + //dictionary of all key:pool pairs + all: {}, + //reference to the client constructor - can override in tests or for require('pg').native + Client: require(__dirname + '/client'), + getOrCreate: function(clientConfig) { + clientConfig = clientConfig || {}; + var name = JSON.stringify(clientConfig); + var pool = pools.all[name]; + if(pool) { + return pool; + } + pool = genericPool.Pool({ + name: name, + max: defaults.poolSize, + idleTimeoutMillis: defaults.poolIdleTimeout, + reapIntervalMillis: defaults.reapIntervalMillis, + log: defaults.poolLog, + create: function(cb) { + var client = new pools.Client(clientConfig); + client.connect(function(err) { + if(err) return cb(err, null); + + //handle connected client background errors by emitting event + //via the pg object and then removing errored client from the pool + client.on('error', function(e) { + pool.emit('error', e, client); + pool.destroy(client); + }); + + return cb(null, client); + }); + }, + destroy: function(client) { + client.end(); + } + }); + pools.all[name] = pool; + //mixin EventEmitter to pool + EventEmitter.call(pool); + for(var key in EventEmitter.prototype) { + if(EventEmitter.prototype.hasOwnProperty(key)) { + pool[key] = EventEmitter.prototype[key]; + } + } + //monkey-patch with connect method + pool.connect = function(cb) { + pool.acquire(function(err, client) { + if(err) return cb(err, null, function() {/*NOOP*/}); + //support both 2 (old) and 3 arguments + (cb.length > 2 ? newConnect : oldConnect)(pool, client, cb); + }); + }; return pool; } - pool = genericPool.Pool({ - name: name, - max: defaults.poolSize, - idleTimeoutMillis: defaults.poolIdleTimeout, - reapIntervalMillis: defaults.reapIntervalMillis, - log: defaults.poolLog, - create: function(cb) { - var client = new createPool.Client(clientConfig); - client.connect(function(err) { - if(err) return cb(err, null); - - //handle connected client background errors by emitting event - //via the pg object and then removing errored client from the pool - client.on('error', function(e) { - pool.emit('error', e, client); - pool.destroy(client); - }); - - return cb(null, client); - }); - }, - destroy: function(client) { - client.end(); - } - }); - createPool.all[name] = pool; - //mixin EventEmitter to pool - EventEmitter.call(pool); - for(var key in EventEmitter.prototype) { - if(EventEmitter.prototype.hasOwnProperty(key)) { - pool[key] = EventEmitter.prototype[key]; - } - } - //monkey-patch with connect method - pool.connect = function(cb) { - pool.acquire(function(err, client) { - if(err) return cb(err, null, function() {/*NOOP*/}); - //support both 2 (old) and 3 arguments - (cb.length > 2 ? newConnect : oldConnect)(pool, client, cb); - }); - }; - return pool; }; //the old connect method of the pool @@ -62,12 +67,15 @@ var createPool = function(clientConfig) { //a bunch of problems, but for backwards compatibility //we're leaving it in var alarmDuration = 5000; -var errorMessage = ['A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.', -'You might have a leak!', -'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {', -' //do something', -' done(); //call done() to signal you are finished with the client', -'}'].join(require('os').EOL); +var errorMessage = [ + 'A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.', + 'You might have a leak!', + 'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {', + ' //do something', + ' done(); //call done() to signal you are finished with the client', + '}' +].join(require('os').EOL); + var oldConnect = function(pool, client, cb) { var tid = setTimeout(function() { console.error(errorMessage); @@ -90,10 +98,4 @@ var newConnect = function(pool, client, cb) { }); }; -//list of all created pools -createPool.all = {}; - -//reference to client constructor -createPool.Client = require(__dirname + '/client'); - -module.exports = createPool; +module.exports = pools; diff --git a/test/integration/connection-pool/error-tests.js b/test/integration/connection-pool/error-tests.js index 11badf0..b540979 100644 --- a/test/integration/connection-pool/error-tests.js +++ b/test/integration/connection-pool/error-tests.js @@ -17,6 +17,7 @@ helper.pg.connect(helper.config, assert.success(function(client) { assert.ok(error); assert.ok(brokenClient); assert.equal(client.id, brokenClient.id); + client.emit('drain'); helper.pg.end(); }); //kill the connection from client diff --git a/test/integration/connection-pool/optional-config-tests.js b/test/integration/connection-pool/optional-config-tests.js index d3ddc50..690be7f 100644 --- a/test/integration/connection-pool/optional-config-tests.js +++ b/test/integration/connection-pool/optional-config-tests.js @@ -10,5 +10,11 @@ helper.pg.defaults.poolSize = 1; helper.pg.connect(assert.calls(function(err, client) { assert.isNull(err); - client.end(); + client.query('SELECT NOW()'); + client.once('drain', function() { + setTimeout(function() { + helper.pg.end(); + + }, 10); + }); })); diff --git a/test/integration/connection-pool/test-helper.js b/test/integration/connection-pool/test-helper.js index cc86677..199407c 100644 --- a/test/integration/connection-pool/test-helper.js +++ b/test/integration/connection-pool/test-helper.js @@ -9,7 +9,7 @@ helper.testPoolSize = function(max) { for(var i = 0; i < max; i++) { helper.pg.poolSize = 10; test("connection #" + i + " executes", function() { - helper.pg.connect(helper.config, function(err, client) { + helper.pg.connect(helper.config, function(err, client, done) { assert.isNull(err); client.query("select * from person", function(err, result) { assert.lengthIs(result.rows, 26) @@ -19,7 +19,8 @@ helper.testPoolSize = function(max) { }) var query = client.query("SELECT * FROM NOW()") query.on('end',function() { - sink.add() + sink.add(); + done(); }) }) }) diff --git a/test/integration/connection-pool/unique-name-tests.js b/test/integration/connection-pool/unique-name-tests.js deleted file mode 100644 index a92a004..0000000 --- a/test/integration/connection-pool/unique-name-tests.js +++ /dev/null @@ -1,63 +0,0 @@ -var helper = require(__dirname + '/test-helper'); - -helper.pg.defaults.poolSize = 1; -helper.pg.defaults.user = helper.args.user; -helper.pg.defaults.password = helper.args.password; -helper.pg.defaults.database = helper.args.database; -helper.pg.defaults.port = helper.args.port; -helper.pg.defaults.host = helper.args.host; -helper.pg.defaults.binary = helper.args.binary; -helper.pg.defaults.poolIdleTimeout = 100; - -var moreArgs = {}; -for (c in helper.config) { - moreArgs[c] = helper.config[c]; -} -moreArgs.zomg = true; - -var badArgs = {}; -for (c in helper.config) { - badArgs[c] = helper.config[c]; -} - -badArgs.user = badArgs.user + 'laksdjfl'; -badArgs.password = badArgs.password + 'asldkfjlas'; -badArgs.zomg = true; - -test('connecting with complete config', function() { - - helper.pg.connect(helper.config, assert.calls(function(err, client) { - assert.isNull(err); - client.iGotAccessed = true; - client.query("SELECT NOW()") - })); - -}); - -test('connecting with different config object', function() { - - helper.pg.connect(moreArgs, assert.calls(function(err, client) { - assert.isNull(err); - assert.ok(client.iGotAccessed === true) - client.query("SELECT NOW()"); - })) - -}); - -test('connecting with all defaults', function() { - - helper.pg.connect(assert.calls(function(err, client) { - assert.isNull(err); - assert.ok(client.iGotAccessed === true); - client.end(); - })); - -}); - -test('connecting with invalid config', function() { - - helper.pg.connect(badArgs, assert.calls(function(err, client) { - assert.ok(err != null, "Expected connection error using invalid connection credentials"); - })); - -}); diff --git a/test/unit/pool/basic-tests.js b/test/unit/pool/basic-tests.js index a19b9dd..b96937e 100644 --- a/test/unit/pool/basic-tests.js +++ b/test/unit/pool/basic-tests.js @@ -3,7 +3,7 @@ var EventEmitter = require('events').EventEmitter; var libDir = __dirname + '/../../../lib'; var defaults = require(libDir + '/defaults'); -var pool = require(libDir + '/pool'); +var pools = require(libDir + '/pool'); var poolId = 0; require(__dirname + '/../../test-helper'); @@ -41,26 +41,26 @@ HangingClient.prototype.end = function() { clearInterval(this.intervalId); } -pool.Client = FakeClient; +pools.Client = FakeClient; test('no pools exist', function() { - assert.empty(Object.keys(pool.all)); + assert.empty(Object.keys(pools.all)); }); test('pool creates pool on miss', function() { - var p = pool(); + var p = pools.getOrCreate(); assert.ok(p); - assert.equal(Object.keys(pool.all).length, 1); - var p2 = pool(); + assert.equal(Object.keys(pools.all).length, 1); + var p2 = pools.getOrCreate(); assert.equal(p, p2); - assert.equal(Object.keys(pool.all).length, 1); - var p3 = pool("pg://postgres:password@localhost:5432/postgres"); + assert.equal(Object.keys(pools.all).length, 1); + var p3 = pools.getOrCreate("pg://postgres:password@localhost:5432/postgres"); assert.notEqual(p, p3); - assert.equal(Object.keys(pool.all).length, 2); + assert.equal(Object.keys(pools.all).length, 2); }); test('pool follows defaults', function() { - var p = pool(poolId++); + var p = pools.getOrCreate(poolId++); for(var i = 0; i < 100; i++) { p.acquire(function(err, client) { }); @@ -69,7 +69,7 @@ test('pool follows defaults', function() { }); test('pool#connect with 2 parameters (legacy, for backwards compat)', function() { - var p = pool(poolId++); + var p = pools.getOrCreate(poolId++); p.connect(assert.success(function(client) { assert.ok(client); assert.equal(p.availableObjectsCount(), 0); @@ -82,7 +82,7 @@ test('pool#connect with 2 parameters (legacy, for backwards compat)', function() }); test('pool#connect with 3 parameters', function() { - var p = pool(poolId++); + var p = pools.getOrCreate(poolId++); var tid = setTimeout(function() { throw new Error("Connection callback was never called"); }, 100); @@ -103,7 +103,7 @@ test('pool#connect with 3 parameters', function() { }); test('on client error, client is removed from pool', function() { - var p = pool(poolId++); + var p = pools.getOrCreate(poolId++); p.connect(assert.success(function(client) { assert.ok(client); client.emit('drain'); @@ -128,7 +128,7 @@ test('on client error, client is removed from pool', function() { }); test('pool with connection error on connection', function() { - pool.Client = function() { + pools.Client = function() { return { connect: function(cb) { process.nextTick(function() { @@ -138,7 +138,7 @@ test('pool with connection error on connection', function() { }; } test('two parameters', function() { - var p = pool(poolId++); + var p = pools.getOrCreate(poolId++); p.connect(assert.calls(function(err, client) { assert.ok(err); assert.equal(client, null); @@ -148,7 +148,7 @@ test('pool with connection error on connection', function() { })); }); test('three parameters', function() { - var p = pool(poolId++); + var p = pools.getOrCreate(poolId++); var tid = setTimeout(function() { assert.fail('Did not call connect callback'); }, 100); @@ -166,8 +166,8 @@ test('pool with connection error on connection', function() { }); test('returnning an error to done()', function() { - var p = pool(poolId++); - pool.Client = FakeClient; + var p = pools.getOrCreate(poolId++); + pools.Client = FakeClient; p.connect(function(err, client, done) { assert.equal(err, null); assert(client); @@ -176,3 +176,17 @@ test('returnning an error to done()', function() { assert.equal(p.getPoolSize(), 0); }); }); + +test('fetching pool by object', function() { + var p = pools.getOrCreate({ + user: 'brian', + host: 'localhost', + password: 'password' + }); + var p2 = pools.getOrCreate({ + user: 'brian', + host: 'localhost', + password: 'password' + }); + assert.equal(p, p2); +}); diff --git a/test/unit/pool/timeout-tests.js b/test/unit/pool/timeout-tests.js index a79e1d3..0fc96b2 100644 --- a/test/unit/pool/timeout-tests.js +++ b/test/unit/pool/timeout-tests.js @@ -3,7 +3,7 @@ var EventEmitter = require('events').EventEmitter; var libDir = __dirname + '/../../../lib'; var defaults = require(libDir + '/defaults'); -var pool = require(libDir + '/pool'); +var pools = require(libDir + '/pool'); var poolId = 0; require(__dirname + '/../../test-helper'); @@ -26,8 +26,8 @@ defaults.poolIdleTimeout = 10; defaults.reapIntervalMillis = 10; test('client times out from idle', function() { - pool.Client = FakeClient; - var p = pool(poolId++); + pools.Client = FakeClient; + var p = pools.getOrCreate(poolId++); p.connect(function(err, client, done) { done(); });