Remove internal pool (#1049)

* Initial work on removing internal pool

* Port backwards-compabible properties

* Cleanup test execution & makefile cruft

* Attempt to fix flakey error test
remotes/origin/cdb-6.1
Brian C 8 years ago committed by GitHub
parent 1596a933eb
commit 796a44f54f

@ -13,7 +13,6 @@ all:
npm install npm install
help: help:
@echo "make prepare-test-db [connectionString=postgres://<your connection string>]"
@echo "make test-all [connectionString=postgres://<your connection string>]" @echo "make test-all [connectionString=postgres://<your connection string>]"
test: test-unit test: test-unit
@ -32,11 +31,7 @@ test-unit:
test-connection: test-connection:
@echo "***Testing connection***" @echo "***Testing connection***"
@node script/test-connection.js $(params) @node script/create-test-tables.js $(params)
test-connection-binary:
@echo "***Testing binary connection***"
@node script/test-connection.js $(params) binary
test-missing-native: test-missing-native:
@echo "***Testing optional native install***" @echo "***Testing optional native install***"
@ -47,7 +42,7 @@ test-missing-native:
node_modules/pg-native/index.js: node_modules/pg-native/index.js:
@npm i pg-native @npm i pg-native
test-native: node_modules/pg-native/index.js test-native: node_modules/pg-native/index.js test-connection
@echo "***Testing native bindings***" @echo "***Testing native bindings***"
@find test/native -name "*-tests.js" | $(node-command) @find test/native -name "*-tests.js" | $(node-command)
@find test/integration -name "*-tests.js" | $(node-command) native @find test/integration -name "*-tests.js" | $(node-command) native
@ -56,13 +51,12 @@ test-integration: test-connection
@echo "***Testing Pure Javascript***" @echo "***Testing Pure Javascript***"
@find test/integration -name "*-tests.js" | $(node-command) @find test/integration -name "*-tests.js" | $(node-command)
test-binary: test-connection-binary test-binary: test-connection
@echo "***Testing Pure Javascript (binary)***" @echo "***Testing Pure Javascript (binary)***"
@find test/integration -name "*-tests.js" | $(node-command) binary @find test/integration -name "*-tests.js" | $(node-command) binary
prepare-test-db: test-pool:
@echo "***Preparing the database for tests***" @find test/integration/connection-pool -name "*.js" | $(node-command) binary
@find script/create-test-tables.js | $(node-command)
jshint: jshint:
@echo "***Starting jshint***" @echo "***Starting jshint***"

@ -19,13 +19,21 @@ $ npm install pg
Generally you will access the PostgreSQL server through a pool of clients. A client takes a non-trivial amount of time to establish a new connection. A client also consumes a non-trivial amount of resources on the PostgreSQL server - not something you want to do on every http request. Good news: node-postgres ships with built in client pooling. Generally you will access the PostgreSQL server through a pool of clients. A client takes a non-trivial amount of time to establish a new connection. A client also consumes a non-trivial amount of resources on the PostgreSQL server - not something you want to do on every http request. Good news: node-postgres ships with built in client pooling.
```javascript ```javascript
var pg = require('pg'); var Pool = require('pg').Pool;
var conString = "postgres://username:password@localhost/database";
var config = {
user: 'foo', //env var: PGUSER
database: 'my_db', //env var: PGDATABASE
password: 'secret', //env var: PGPASSWORD
port: 5432 //env var: PGPORT
};
var pool = new Pool(config);
//this initializes a connection pool //this initializes a connection pool
//it will keep idle connections open for a (configurable) 30 seconds //it will keep idle connections open for a (configurable) 30 seconds
//and set a limit of 10 (also configurable) //and set a limit of 10 (also configurable)
pg.connect(conString, function(err, client, done) { pool.connect(function(err, client, done) {
if(err) { if(err) {
return console.error('error fetching client from pool', err); return console.error('error fetching client from pool', err);
} }
@ -42,6 +50,8 @@ pg.connect(conString, function(err, client, done) {
}); });
``` ```
node-postgres uses [pg-pool](https://github.com/brianc/node-pg-pool.git) to manage pooling and only provides a very thin layer on top. It's highly recommend you read the documentation for [pg-pool](https://github.com/brianc/node-pg-pool.git)
[Check this out for the get up and running quickly example](https://github.com/brianc/node-postgres/wiki/Example) [Check this out for the get up and running quickly example](https://github.com/brianc/node-postgres/wiki/Example)
### Client instance ### Client instance
@ -85,7 +95,7 @@ node-postgres contains a pure JavaScript protocol implementation which is quite
To use the native bindings, first install [pg-native](https://github.com/brianc/node-pg-native.git). Once pg-native is installed, simply replace `require('pg')` with `require('pg').native`. To use the native bindings, first install [pg-native](https://github.com/brianc/node-pg-native.git). Once pg-native is installed, simply replace `require('pg')` with `require('pg').native`.
node-postgres abstracts over the pg-native module to provide exactly the same interface as the pure JavaScript version. __No other code changes are required__. If you find yourself having to change code other than the require statement when switching from `require('pg')` to `require('pg').native` please report an issue. node-postgres abstracts over the pg-native module to provide exactly the same interface as the pure JavaScript version. Care has been taken to keep the number of api differences between the two modules to a minimum; however, it is recommend you use either the pure JavaScript or native bindings in both development and production and don't mix & match them in the same process - it can get confusing!
## Features ## Features

@ -2,15 +2,17 @@ var EventEmitter = require('events').EventEmitter;
var util = require('util'); var util = require('util');
var Client = require('./client'); var Client = require('./client');
var defaults = require('./defaults'); var defaults = require('./defaults');
var pool = require('./pool');
var Connection = require('./connection'); var Connection = require('./connection');
var ConnectionParameters = require('./connection-parameters');
var Pool = require('pg-pool');
var PG = function(clientConstructor) { var PG = function(clientConstructor) {
EventEmitter.call(this); EventEmitter.call(this);
this.defaults = defaults; this.defaults = defaults;
this.Client = clientConstructor; this.Client = clientConstructor;
this.Query = this.Client.Query; this.Query = this.Client.Query;
this.pools = pool(clientConstructor); this.Pool = Pool;
this.pools = [];
this.Connection = Connection; this.Connection = Connection;
this.types = require('pg-types'); this.types = require('pg-types');
}; };
@ -19,16 +21,16 @@ util.inherits(PG, EventEmitter);
PG.prototype.end = function() { PG.prototype.end = function() {
var self = this; var self = this;
var keys = Object.keys(self.pools.all); var keys = Object.keys(this.pools);
var count = keys.length; var count = keys.length;
if(count === 0) { if(count === 0) {
self.emit('end'); self.emit('end');
} else { } else {
keys.forEach(function(key) { keys.forEach(function(key) {
var pool = self.pools.all[key]; var pool = self.pools[key];
delete self.pools.all[key]; delete self.pools[key];
pool.drain(function() { pool.pool.drain(function() {
pool.destroyAllNow(function() { pool.pool.destroyAllNow(function() {
count--; count--;
if(count === 0) { if(count === 0) {
self.emit('end'); self.emit('end');
@ -39,17 +41,31 @@ PG.prototype.end = function() {
} }
}; };
PG.prototype.connect = function(config, callback) { PG.prototype.connect = function(config, callback) {
if(typeof config == "function") { if(typeof config == "function") {
callback = config; callback = config;
config = null; config = null;
} }
var pool = this.pools.getOrCreate(config); var poolName = JSON.stringify(config || {});
if (typeof config == 'string') {
config = new ConnectionParameters(config);
}
config = config || {};
//for backwards compatibility
config.max = config.max || config.poolSize || defaults.poolSize;
config.idleTimeoutMillis = config.idleTimeoutMillis || config.poolIdleTimeout || defaults.poolIdleTimeout;
config.log = config.log || config.poolLog || defaults.poolLog;
this.pools[poolName] = this.pools[poolName] || new Pool(config, this.Client);
var pool = this.pools[poolName];
pool.connect(callback); pool.connect(callback);
if(!pool.listeners('error').length) { if(!pool.listeners('error').length) {
//propagate errors up to pg object //propagate errors up to pg object
pool.on('error', this.emit.bind(this, 'error')); pool.on('error', function(e) {
this.emit('error', e, e.client);
}.bind(this));
} }
}; };

@ -1,99 +0,0 @@
var EventEmitter = require('events').EventEmitter;
var defaults = require('./defaults');
var genericPool = require('generic-pool');
module.exports = function(Client) {
var pools = {
Client: Client,
//dictionary of all key:pool pairs
all: {},
//reference to the client constructor - can override in tests or for require('pg').native
getOrCreate: function(clientConfig) {
clientConfig = clientConfig || {};
var name = JSON.stringify(clientConfig);
var pool = pools.all[name];
if(pool) {
return pool;
}
pool = genericPool.Pool({
name: name,
max: clientConfig.poolSize || defaults.poolSize,
idleTimeoutMillis: clientConfig.poolIdleTimeout || defaults.poolIdleTimeout,
reapIntervalMillis: clientConfig.reapIntervalMillis || defaults.reapIntervalMillis,
returnToHead: clientConfig.returnToHead || defaults.returnToHead,
log: clientConfig.poolLog || defaults.poolLog,
create: function(cb) {
var client = new pools.Client(clientConfig);
// Ignore errors on pooled clients until they are connected.
client.on('error', Function.prototype);
client.connect(function(err) {
if(err) return cb(err, null);
// Remove the noop error handler after a connection has been established.
client.removeListener('error', Function.prototype);
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client.on('error', function(e) {
pool.emit('error', e, client);
// If the client is already being destroyed, the error
// occurred during stream ending. Do not attempt to destroy
// the client again.
if (!client._destroying) {
pool.destroy(client);
}
});
// Remove connection from pool on disconnect
client.on('end', function(e) {
// Do not enter infinite loop between pool.destroy
// and client 'end' event...
if ( ! client._destroying ) {
pool.destroy(client);
}
});
client.poolCount = 0;
return cb(null, client);
});
},
destroy: function(client) {
client._destroying = true;
client.poolCount = undefined;
client.end();
}
});
pools.all[name] = pool;
//mixin EventEmitter to pool
EventEmitter.call(pool);
for(var key in EventEmitter.prototype) {
if(EventEmitter.prototype.hasOwnProperty(key)) {
pool[key] = EventEmitter.prototype[key];
}
}
//monkey-patch with connect method
pool.connect = function(cb) {
var domain = process.domain;
pool.acquire(function(err, client) {
if(domain) {
cb = domain.bind(cb);
}
if(err) return cb(err, null, function() {/*NOOP*/});
client.poolCount++;
cb(null, client, function(err) {
if(err) {
pool.destroy(client);
} else {
pool.release(client);
}
});
});
};
return pool;
}
};
return pools;
};

@ -19,9 +19,9 @@
"main": "./lib", "main": "./lib",
"dependencies": { "dependencies": {
"buffer-writer": "1.0.1", "buffer-writer": "1.0.1",
"generic-pool": "2.4.2",
"packet-reader": "0.2.0", "packet-reader": "0.2.0",
"pg-connection-string": "0.1.3", "pg-connection-string": "0.1.3",
"pg-pool": "1.*",
"pg-types": "1.*", "pg-types": "1.*",
"pgpass": "0.0.6", "pgpass": "0.0.6",
"semver": "4.3.2" "semver": "4.3.2"
@ -29,7 +29,9 @@
"devDependencies": { "devDependencies": {
"async": "0.9.0", "async": "0.9.0",
"jshint": "2.5.2", "jshint": "2.5.2",
"pg-copy-streams": "0.3.0" "lodash": "4.13.1",
"pg-copy-streams": "0.3.0",
"promise-polyfill": "5.2.1"
}, },
"minNativeVersion": "1.7.0", "minNativeVersion": "1.7.0",
"scripts": { "scripts": {

@ -38,23 +38,17 @@ var con = new pg.Client({
database: args.database database: args.database
}); });
con.connect(); con.connect();
if(args.down) { var query = con.query("drop table if exists person");
console.log("Dropping table 'person'") query.on('end', function() {
var query = con.query("drop table if exists person"); console.log("Dropped table 'person'")
query.on('end', function() { });
console.log("Dropped!"); con.query("create table person(id serial, name varchar(10), age integer)").on('end', function(){
con.end(); console.log("Created table person");
}); console.log("Filling it with people");
} else { });
console.log("Creating table 'person'"); people.map(function(person) {
con.query("create table person(id serial, name varchar(10), age integer)").on('end', function(){ return con.query("insert into person(name, age) values('"+person.name + "', '" + person.age + "')");
console.log("Created!"); }).pop().on('end', function(){
console.log("Filling it with people"); console.log("Inserted 26 people");
});; con.end();
people.map(function(person) { });
return con.query("insert into person(name, age) values('"+person.name + "', '" + person.age + "')");
}).pop().on('end', function(){
console.log("Inserted 26 people");
con.end();
});
}

@ -1,24 +0,0 @@
var helper = require(__dirname + '/../test/test-helper');
console.log();
console.log("testing ability to connect to '%j'", helper.config);
var pg = require(__dirname + '/../lib');
pg.connect(helper.config, function(err, client, done) {
if(err !== null) {
console.error("Recieved connection error when attempting to contact PostgreSQL:");
console.error(err);
process.exit(255);
}
console.log("Checking for existance of required test table 'person'")
client.query("SELECT COUNT(name) FROM person", function(err, callback) {
if(err != null) {
console.error("Recieved error when executing query 'SELECT COUNT(name) FROM person'")
console.error("It is possible you have not yet run the table create script under script/create-test-tables")
console.error("Consult the postgres-node wiki under the 'Testing' section for more information")
console.error(err);
process.exit(255);
}
done();
pg.end();
})
})

@ -1,34 +0,0 @@
return console.log('query-callback-error-tests: DEPRECATED - if you want safety in your callback, you can try/catch your own functions');
var helper = require(__dirname + '/test-helper');
var util = require('util');
var withQuery = function(text, resultLength, cb) {
test('error during query execution', function() {
var client = new Client(helper.args);
process.removeAllListeners('uncaughtException');
assert.emits(process, 'uncaughtException', function() {
assert.equal(client.activeQuery, null, 'should remove active query even if error happens in callback');
client.query('SELECT * FROM blah', assert.success(function(result) {
assert.equal(result.rows.length, resultLength);
client.end();
cb();
}));
});
client.connect(assert.success(function() {
client.query('CREATE TEMP TABLE "blah"(data text)', assert.success(function() {
var q = client.query(text, ['yo'], assert.calls(function() {
assert.emits(client, 'drain');
throw new Error('WHOOOAAAHH!!');
}));
}));
}));
});
}
//test with good query so our callback is called
//as a successful callback
withQuery('INSERT INTO blah(data) VALUES($1)', 1, function() {
//test with an error query so our callback is called with an error
withQuery('INSERT INTO asldkfjlaskfj eoooeoriiri', 0, function() {
});
});

@ -22,16 +22,18 @@ test('error during query execution', function() {
query1.on('end', function() { query1.on('end', function() {
assert.fail('Query with an error should not emit "end" event') assert.fail('Query with an error should not emit "end" event')
}) })
var client2 = new Client(helper.args); setTimeout(function() {
client2.connect(assert.success(function() { var client2 = new Client(helper.args);
var killIdleQuery = "SELECT " + pidColName + ", (SELECT pg_terminate_backend(" + pidColName + ")) AS killed FROM pg_stat_activity WHERE " + queryColName + " = $1"; client2.connect(assert.success(function() {
client2.query(killIdleQuery, [sleepQuery], assert.calls(function(err, res) { var killIdleQuery = "SELECT " + pidColName + ", (SELECT pg_terminate_backend(" + pidColName + ")) AS killed FROM pg_stat_activity WHERE " + queryColName + " = $1";
assert.ifError(err); client2.query(killIdleQuery, [sleepQuery], assert.calls(function(err, res) {
assert.equal(res.rows.length, 1); assert.ifError(err);
client2.end(); assert.equal(res.rows.length, 1);
assert.emits(client2, 'end'); client2.end();
assert.emits(client2, 'end');
}));
})); }));
})); }, 100)
})); }));
})); }));
}); });

@ -1,12 +1,13 @@
var helper = require(__dirname + '/test-helper'); var helper = require(__dirname + '/test-helper');
var _ = require('lodash')
helper.pg.defaults.poolIdleTimeout = 200; const config = _.extend({ }, helper.config, { idleTimeoutMillis: 50 })
test('idle timeout', function() { test('idle timeout', function() {
helper.pg.connect(helper.config, assert.calls(function(err, client, done) { helper.pg.connect(config, assert.calls(function(err, client, done) {
assert.isNull(err); assert.isNull(err);
client.query('SELECT NOW()'); client.query('SELECT NOW()');
//just let this one time out //just let this one time out
//test will hang if pool doesn't timeout //test will hang if pool doesn't timeout
done(); done();
})); }));

@ -15,7 +15,6 @@ helper.pg.connect(assert.calls(function(err, client, done) {
setTimeout(function() { setTimeout(function() {
helper.pg.end(); helper.pg.end();
done(); done();
}, 10); }, 10);
}); });
})); }));

@ -1,6 +1,11 @@
//make assert a global... //make assert a global...
assert = require('assert'); assert = require('assert');
//support for node@0.10.x
if (typeof Promise == 'undefined') {
global.Promise = require('promise-polyfill')
}
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var sys = require('util'); var sys = require('util');
var BufferList = require(__dirname+'/buffer-list') var BufferList = require(__dirname+'/buffer-list')

@ -1,214 +0,0 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var poolsFactory = require(libDir + '/pool')
var defaults = require(libDir + '/defaults');
var poolId = 0;
require(__dirname + '/../../test-helper');
var FakeClient = function() {
EventEmitter.call(this);
}
util.inherits(FakeClient, EventEmitter);
FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
FakeClient.prototype.end = function() {
this.endCalled = true;
}
var pools = poolsFactory(FakeClient);
//Hangs the event loop until 'end' is called on client
var HangingClient = function(config) {
EventEmitter.call(this);
this.config = config;
}
util.inherits(HangingClient, EventEmitter);
HangingClient.prototype.connect = function(cb) {
this.intervalId = setInterval(function() {
console.log('hung client...');
}, 1000);
process.nextTick(cb);
}
HangingClient.prototype.end = function() {
clearInterval(this.intervalId);
}
test('no pools exist', function() {
assert.empty(Object.keys(pools.all));
});
test('pool creates pool on miss', function() {
var p = pools.getOrCreate();
assert.ok(p);
assert.equal(Object.keys(pools.all).length, 1);
var p2 = pools.getOrCreate();
assert.equal(p, p2);
assert.equal(Object.keys(pools.all).length, 1);
var p3 = pools.getOrCreate("postgres://postgres:password@localhost:5432/postgres");
assert.notEqual(p, p3);
assert.equal(Object.keys(pools.all).length, 2);
});
test('pool follows defaults', function() {
var p = pools.getOrCreate(poolId++);
for(var i = 0; i < 100; i++) {
p.acquire(function(err, client) {
});
}
assert.equal(p.getPoolSize(), defaults.poolSize);
});
test('pool#connect with 3 parameters', function() {
var p = pools.getOrCreate(poolId++);
var tid = setTimeout(function() {
throw new Error("Connection callback was never called");
}, 100);
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.ifError(err, null);
assert.ok(client);
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 1);
client.emit('drain');
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 1);
done();
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
p.destroyAllNow();
});
});
test('on client error, client is removed from pool', function() {
var p = pools.getOrCreate(poolId++);
p.connect(assert.success(function(client, done) {
assert.ok(client);
done();
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
//error event fires on pool BEFORE pool.destroy is called with client
assert.emits(p, 'error', function(err) {
assert.equal(err.message, 'test error');
assert.ok(!client.endCalled);
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
//after we're done in our callback, pool.destroy is called
process.nextTick(function() {
assert.ok(client.endCalled);
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
p.destroyAllNow();
});
});
client.emit('error', new Error('test error'));
}));
});
test('pool with connection error on connection', function() {
var errorPools = poolsFactory(function() {
return {
connect: function(cb) {
process.nextTick(function() {
cb(new Error('Could not connect'));
});
},
on: Function.prototype
};
})
test('two parameters', function() {
var p = errorPools.getOrCreate(poolId++);
p.connect(assert.calls(function(err, client) {
assert.ok(err);
assert.equal(client, null);
//client automatically removed
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
}));
});
test('three parameters', function() {
var p = errorPools.getOrCreate(poolId++);
var tid = setTimeout(function() {
assert.fail('Did not call connect callback');
}, 100);
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.ok(err);
assert.equal(client, null);
//done does nothing
done(new Error('OH NOOOO'));
done();
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
});
});
});
test('returnning an error to done()', function() {
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done) {
assert.equal(err, null);
assert(client);
done(new Error("BROKEN"));
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
});
});
test('fetching pool by object', function() {
var p = pools.getOrCreate({
user: 'brian',
host: 'localhost',
password: 'password'
});
var p2 = pools.getOrCreate({
user: 'brian',
host: 'localhost',
password: 'password'
});
assert.equal(p, p2);
});
test('pool#connect client.poolCount', function() {
var p = pools.getOrCreate(poolId++);
var tid;
setConnectTimeout = function() {
tid = setTimeout(function() {
throw new Error("Connection callback was never called");
}, 100);
}
setConnectTimeout();
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.equal(client.poolCount, 1,
'after connect, poolCount should be 1');
done();
assert.equal(client.poolCount, 1,
'after returning client to pool, poolCount should still be 1');
setConnectTimeout();
p.connect(function(err, client, done) {
clearTimeout(tid);
assert.equal(client.poolCount, 2,
'after second connect, poolCount should be 2');
done();
setConnectTimeout();
p.destroyAllNow(function() {
clearTimeout(tid);
assert.equal(client.poolCount, undefined,
'after pool is destroyed, count should be undefined');
});
})
});
});

@ -1,43 +0,0 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var libDir = __dirname + '/../../../lib';
var defaults = require(libDir + '/defaults');
var poolsFactory = require(libDir + '/pool');
var poolId = 0;
require(__dirname + '/../../test-helper');
var FakeClient = function() {
EventEmitter.call(this);
}
util.inherits(FakeClient, EventEmitter);
FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
FakeClient.prototype.end = function() {
this.endCalled = true;
}
defaults.poolIdleTimeout = 10;
defaults.reapIntervalMillis = 10;
var pools = poolsFactory(FakeClient)
test('client times out from idle', function() {
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done) {
done();
});
process.nextTick(function() {
assert.equal(p.availableObjectsCount(), 1);
assert.equal(p.getPoolSize(), 1);
setTimeout(function() {
assert.equal(p.availableObjectsCount(), 0);
assert.equal(p.getPoolSize(), 0);
}, 50);
});
});
Loading…
Cancel
Save