integrate new pool into existing codebase

This commit is contained in:
bmc 2013-02-20 16:08:48 -06:00
parent bb448fe61a
commit cc84799c7a
8 changed files with 129 additions and 204 deletions

View File

@ -2,26 +2,26 @@ var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Client = require(__dirname+'/client');
var defaults = require(__dirname + '/defaults');
//external genericPool module
var genericPool = require('generic-pool');
//cache of existing client pools
var pools = {};
var pool = require(__dirname + '/pool');
var types = require(__dirname + '/types');
var Connection = require(__dirname + '/connection');
var PG = function(clientConstructor) {
EventEmitter.call(this);
this.Client = clientConstructor;
this.Connection = require(__dirname + '/connection');
this.Query = clientConstructor.Query;
this.defaults = defaults;
this.Client = pool.Client = clientConstructor;
this.Query = this.Client.Query;
this.pools = pool;
this.types = types;
this.Connection = Connection;
};
util.inherits(PG, EventEmitter);
PG.prototype.end = function() {
Object.keys(pools).forEach(function(name) {
var pool = pools[name];
var self = this;
Object.keys(self.pools.all).forEach(function(key) {
var pool = self.pools.all[key];
pool.drain(function() {
pool.destroyAllNow();
});
@ -29,51 +29,16 @@ PG.prototype.end = function() {
};
PG.prototype.connect = function(config, callback) {
var self = this;
var c = config;
var cb = callback;
//allow for no config to be passed
if(typeof c === 'function') {
cb = c;
c = defaults;
if(typeof config == "function") {
callback = config;
config = null;
}
var pool = this.pools.getOrCreate(config);
pool.connect(callback);
if(!pool.listeners('error').length) {
//propagate errors up to pg object
pool.on('error', this.emit.bind(this, 'error'));
}
//get unique pool name even if object was used as config
var poolName = typeof(c) === 'string' ? c : c.user+c.host+c.port+c.database;
var pool = pools[poolName];
if(pool) { return pool.acquire(cb); }
pool = pools[poolName] = genericPool.Pool({
name: poolName,
create: function(callback) {
var client = new self.Client(c);
client.connect(function(err) {
if(err) { return callback(err); }
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
self.emit('error', e, client);
pool.destroy(client);
});
callback(null, client);
});
client.on('drain', function() {
pool.release(client);
});
},
destroy: function(client) {
client.end();
},
max: defaults.poolSize,
idleTimeoutMillis: defaults.poolIdleTimeout,
reapIntervalMillis: defaults.reapIntervalMillis,
log: defaults.poolLog
});
return pool.acquire(cb);
};
// cancel the query runned by the given client
@ -96,4 +61,3 @@ module.exports.__defineGetter__("native", function() {
return module.exports.native;
});
module.exports.types = require('./types');

View File

@ -3,56 +3,61 @@ var EventEmitter = require('events').EventEmitter;
var defaults = require(__dirname + '/defaults');
var genericPool = require('generic-pool');
//takes the same config structure as client
var createPool = function(clientConfig) {
clientConfig = clientConfig || {};
var name = JSON.stringify(clientConfig);
var pool = createPool.all[name];
if(pool) {
var pools = {
//dictionary of all key:pool pairs
all: {},
//reference to the client constructor - can override in tests or for require('pg').native
Client: require(__dirname + '/client'),
getOrCreate: function(clientConfig) {
clientConfig = clientConfig || {};
var name = JSON.stringify(clientConfig);
var pool = pools.all[name];
if(pool) {
return pool;
}
pool = genericPool.Pool({
name: name,
max: defaults.poolSize,
idleTimeoutMillis: defaults.poolIdleTimeout,
reapIntervalMillis: defaults.reapIntervalMillis,
log: defaults.poolLog,
create: function(cb) {
var client = new pools.Client(clientConfig);
client.connect(function(err) {
if(err) return cb(err, null);
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
pool.emit('error', e, client);
pool.destroy(client);
});
return cb(null, client);
});
},
destroy: function(client) {
client.end();
}
});
pools.all[name] = pool;
//mixin EventEmitter to pool
EventEmitter.call(pool);
for(var key in EventEmitter.prototype) {
if(EventEmitter.prototype.hasOwnProperty(key)) {
pool[key] = EventEmitter.prototype[key];
}
}
//monkey-patch with connect method
pool.connect = function(cb) {
pool.acquire(function(err, client) {
if(err) return cb(err, null, function() {/*NOOP*/});
//support both 2 (old) and 3 arguments
(cb.length > 2 ? newConnect : oldConnect)(pool, client, cb);
});
};
return pool;
}
pool = genericPool.Pool({
name: name,
max: defaults.poolSize,
idleTimeoutMillis: defaults.poolIdleTimeout,
reapIntervalMillis: defaults.reapIntervalMillis,
log: defaults.poolLog,
create: function(cb) {
var client = new createPool.Client(clientConfig);
client.connect(function(err) {
if(err) return cb(err, null);
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
pool.emit('error', e, client);
pool.destroy(client);
});
return cb(null, client);
});
},
destroy: function(client) {
client.end();
}
});
createPool.all[name] = pool;
//mixin EventEmitter to pool
EventEmitter.call(pool);
for(var key in EventEmitter.prototype) {
if(EventEmitter.prototype.hasOwnProperty(key)) {
pool[key] = EventEmitter.prototype[key];
}
}
//monkey-patch with connect method
pool.connect = function(cb) {
pool.acquire(function(err, client) {
if(err) return cb(err, null, function() {/*NOOP*/});
//support both 2 (old) and 3 arguments
(cb.length > 2 ? newConnect : oldConnect)(pool, client, cb);
});
};
return pool;
};
//the old connect method of the pool
@ -62,12 +67,15 @@ var createPool = function(clientConfig) {
//a bunch of problems, but for backwards compatibility
//we're leaving it in
var alarmDuration = 5000;
var errorMessage = ['A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.',
'You might have a leak!',
'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {',
' //do something',
' done(); //call done() to signal you are finished with the client',
'}'].join(require('os').EOL);
var errorMessage = [
'A client has been checked out from the pool for longer than ' + alarmDuration + ' ms.',
'You might have a leak!',
'You should use the following new way to check out clients','pg.connect(function(err, client, done)) {',
' //do something',
' done(); //call done() to signal you are finished with the client',
'}'
].join(require('os').EOL);
var oldConnect = function(pool, client, cb) {
var tid = setTimeout(function() {
console.error(errorMessage);
@ -90,10 +98,4 @@ var newConnect = function(pool, client, cb) {
});
};
//list of all created pools
createPool.all = {};
//reference to client constructor
createPool.Client = require(__dirname + '/client');
module.exports = createPool;
module.exports = pools;

View File

@ -17,6 +17,7 @@ helper.pg.connect(helper.config, assert.success(function(client) {
assert.ok(error);
assert.ok(brokenClient);
assert.equal(client.id, brokenClient.id);
client.emit('drain');
helper.pg.end();
});
//kill the connection from client

View File

@ -10,5 +10,11 @@ helper.pg.defaults.poolSize = 1;
helper.pg.connect(assert.calls(function(err, client) {
assert.isNull(err);
client.end();
client.query('SELECT NOW()');
client.once('drain', function() {
setTimeout(function() {
helper.pg.end();
}, 10);
});
}));

View File

@ -9,7 +9,7 @@ helper.testPoolSize = function(max) {
for(var i = 0; i < max; i++) {
helper.pg.poolSize = 10;
test("connection #" + i + " executes", function() {
helper.pg.connect(helper.config, function(err, client) {
helper.pg.connect(helper.config, function(err, client, done) {
assert.isNull(err);
client.query("select * from person", function(err, result) {
assert.lengthIs(result.rows, 26)
@ -19,7 +19,8 @@ helper.testPoolSize = function(max) {
})
var query = client.query("SELECT * FROM NOW()")
query.on('end',function() {
sink.add()
sink.add();
done();
})
})
})

View File

@ -1,63 +0,0 @@
var helper = require(__dirname + '/test-helper');
helper.pg.defaults.poolSize = 1;
helper.pg.defaults.user = helper.args.user;
helper.pg.defaults.password = helper.args.password;
helper.pg.defaults.database = helper.args.database;
helper.pg.defaults.port = helper.args.port;
helper.pg.defaults.host = helper.args.host;
helper.pg.defaults.binary = helper.args.binary;
helper.pg.defaults.poolIdleTimeout = 100;
var moreArgs = {};
for (c in helper.config) {
moreArgs[c] = helper.config[c];
}
moreArgs.zomg = true;
var badArgs = {};
for (c in helper.config) {
badArgs[c] = helper.config[c];
}
badArgs.user = badArgs.user + 'laksdjfl';
badArgs.password = badArgs.password + 'asldkfjlas';
badArgs.zomg = true;
test('connecting with complete config', function() {
helper.pg.connect(helper.config, assert.calls(function(err, client) {
assert.isNull(err);
client.iGotAccessed = true;
client.query("SELECT NOW()")
}));
});
test('connecting with different config object', function() {
helper.pg.connect(moreArgs, assert.calls(function(err, client) {
assert.isNull(err);
assert.ok(client.iGotAccessed === true)
client.query("SELECT NOW()");
}))
});
test('connecting with all defaults', function() {
helper.pg.connect(assert.calls(function(err, client) {
assert.isNull(err);
assert.ok(client.iGotAccessed === true);
client.end();
}));
});
test('connecting with invalid config', function() {
helper.pg.connect(badArgs, assert.calls(function(err, client) {
assert.ok(err != null, "Expected connection error using invalid connection credentials");
}));
});

View File

@ -3,7 +3,7 @@ var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var defaults = require(libDir + '/defaults');
var pool = require(libDir + '/pool');
var pools = require(libDir + '/pool');
var poolId = 0;
require(__dirname + '/../../test-helper');
@ -41,26 +41,26 @@ HangingClient.prototype.end = function() {
clearInterval(this.intervalId);
}
pool.Client = FakeClient;
pools.Client = FakeClient;
test('no pools exist', function() {
assert.empty(Object.keys(pool.all));
assert.empty(Object.keys(pools.all));
});
test('pool creates pool on miss', function() {
var p = pool();
var p = pools.getOrCreate();
assert.ok(p);
assert.equal(Object.keys(pool.all).length, 1);
var p2 = pool();
assert.equal(Object.keys(pools.all).length, 1);
var p2 = pools.getOrCreate();
assert.equal(p, p2);
assert.equal(Object.keys(pool.all).length, 1);
var p3 = pool("pg://postgres:password@localhost:5432/postgres");
assert.equal(Object.keys(pools.all).length, 1);
var p3 = pools.getOrCreate("pg://postgres:password@localhost:5432/postgres");
assert.notEqual(p, p3);
assert.equal(Object.keys(pool.all).length, 2);
assert.equal(Object.keys(pools.all).length, 2);
});
test('pool follows defaults', function() {
var p = pool(poolId++);
var p = pools.getOrCreate(poolId++);
for(var i = 0; i < 100; i++) {
p.acquire(function(err, client) {
});
@ -69,7 +69,7 @@ test('pool follows defaults', function() {
});
test('pool#connect with 2 parameters (legacy, for backwards compat)', function() {
var p = pool(poolId++);
var p = pools.getOrCreate(poolId++);
p.connect(assert.success(function(client) {
assert.ok(client);
assert.equal(p.availableObjectsCount(), 0);
@ -82,7 +82,7 @@ test('pool#connect with 2 parameters (legacy, for backwards compat)', function()
});
test('pool#connect with 3 parameters', function() {
var p = pool(poolId++);
var p = pools.getOrCreate(poolId++);
var tid = setTimeout(function() {
throw new Error("Connection callback was never called");
}, 100);
@ -103,7 +103,7 @@ test('pool#connect with 3 parameters', function() {
});
test('on client error, client is removed from pool', function() {
var p = pool(poolId++);
var p = pools.getOrCreate(poolId++);
p.connect(assert.success(function(client) {
assert.ok(client);
client.emit('drain');
@ -128,7 +128,7 @@ test('on client error, client is removed from pool', function() {
});
test('pool with connection error on connection', function() {
pool.Client = function() {
pools.Client = function() {
return {
connect: function(cb) {
process.nextTick(function() {
@ -138,7 +138,7 @@ test('pool with connection error on connection', function() {
};
}
test('two parameters', function() {
var p = pool(poolId++);
var p = pools.getOrCreate(poolId++);
p.connect(assert.calls(function(err, client) {
assert.ok(err);
assert.equal(client, null);
@ -148,7 +148,7 @@ test('pool with connection error on connection', function() {
}));
});
test('three parameters', function() {
var p = pool(poolId++);
var p = pools.getOrCreate(poolId++);
var tid = setTimeout(function() {
assert.fail('Did not call connect callback');
}, 100);
@ -166,8 +166,8 @@ test('pool with connection error on connection', function() {
});
test('returnning an error to done()', function() {
var p = pool(poolId++);
pool.Client = FakeClient;
var p = pools.getOrCreate(poolId++);
pools.Client = FakeClient;
p.connect(function(err, client, done) {
assert.equal(err, null);
assert(client);
@ -176,3 +176,17 @@ test('returnning an error to done()', function() {
assert.equal(p.getPoolSize(), 0);
});
});
test('fetching pool by object', function() {
var p = pools.getOrCreate({
user: 'brian',
host: 'localhost',
password: 'password'
});
var p2 = pools.getOrCreate({
user: 'brian',
host: 'localhost',
password: 'password'
});
assert.equal(p, p2);
});

View File

@ -3,7 +3,7 @@ var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var defaults = require(libDir + '/defaults');
var pool = require(libDir + '/pool');
var pools = require(libDir + '/pool');
var poolId = 0;
require(__dirname + '/../../test-helper');
@ -26,8 +26,8 @@ defaults.poolIdleTimeout = 10;
defaults.reapIntervalMillis = 10;
test('client times out from idle', function() {
pool.Client = FakeClient;
var p = pool(poolId++);
pools.Client = FakeClient;
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done) {
done();
});