initial crack at connection pooling -- still dirty

This commit is contained in:
Brian Carlson 2010-12-10 17:32:34 -06:00
parent 5a87972983
commit ab13d0c1eb
10 changed files with 184 additions and 14 deletions

View File

@ -1,11 +1,106 @@
var EventEmitter = require('events').EventEmitter;
var sys = require('sys');
var net = require('net');
var Pool = require(__dirname + '/utils').Pool;
var Client = require(__dirname+'/client');
var defaults = require(__dirname + '/defaults');
//connection pool global cache
var clientPools = {
}
var poolEnabled = function() {
return defaults.poolSize;
}
var log = function() {
}
var log = function() {
console.log.apply(console, arguments);
}
var getPooledClient = function(config, callback) {
//lookup pool using config as key
//TODO this don't work so hot w/ object configs
var pool = clientPools[config];
//create pool if doesn't exist
if(!pool) {
log("creating pool %s", config)
pool = clientPools[config] = new Pool(defaults.poolSize, function() {
log("creating new client in pool %s", config)
var client = new Client(config);
client.connected = false;
return client;
})
}
pool.checkOut(function(err, client) {
//if client already connected just
//pass it along to the callback and return
if(client.connected) {
callback(null, client);
return;
}
var onError = function(error) {
client.connection.removeListener('readyForQuery', onReady);
callback(error);
pool.checkIn(client);
}
var onReady = function() {
client.removeListener('error', onError);
client.connected = true;
callback(null, client);
client.on('drain', function() {
pool.checkIn(client);
});
}
client.once('error', onError);
//TODO refactor
//i don't like reaching into the client's connection for attaching
//to specific events here
client.connection.once('readyForQuery', onReady);
client.connect();
});
}
//destroys the world
var end = function(callback) {
for(var name in clientPools) {
var pool = clientPools[name];
log("destroying pool %s", name);
pool.waits.forEach(function(wait) {
wait(new Error("Client is being destroyed"))
})
pool.waits = [];
pool.items.forEach(function(item) {
var client = item.ref;
if(client.activeQuery) {
log("client is still active, waiting for it to complete");
client.on('drain', client.end.bind(client))
} else {
client.end();
}
})
//remove reference to pool lookup
clientPools[name] = null;
delete(clientPools[name])
}
}
//wrap up common connection management boilerplate
var connect = function(config, callback) {
if(poolEnabled()) {
return getPooledClient(config, callback)
}
throw new Error("FUCK")
var client = new Client(config);
client.connect();
@ -32,5 +127,6 @@ module.exports = {
Client: Client,
Connection: require(__dirname + '/connection'),
connect: connect,
defaults: require(__dirname + '/defaults')
end: end,
defaults: defaults
}

View File

@ -1,4 +1,5 @@
var events = require('events');
var sys = require('sys');
if(typeof events.EventEmitter.prototype.once !== 'function') {
events.EventEmitter.prototype.once = function (type, listener) {
@ -10,12 +11,13 @@ if(typeof events.EventEmitter.prototype.once !== 'function') {
};
}
var Pool = function(maxSize, createFn) {
events.EventEmitter.call(this);
this.maxSize = maxSize;
this.createFn = createFn;
this.items = [];
this.waits = [];
}
sys.inherits(Pool, events.EventEmitter);
var p = Pool.prototype;
p.checkOut = function(callback) {
@ -27,10 +29,19 @@ p.checkOut = function(callback) {
}
}
//check if we can create a new item
if(len < this.maxSize && this.createFn) {
var item = {ref: this.createFn()}
if(this.items.length < this.maxSize && this.createFn) {
var result = this.createFn();
var item = result;
//create function can return item conforming to interface
//of stored items to allow for create function to create
//checked out items
if(typeof item.checkedIn == "undefined") {
var item = {ref: result, checkedIn: true}
}
this.items.push(item);
return this._pulse(item, callback)
if(item.checkedIn) {
return this._pulse(item, callback)
}
}
this.waits.push(callback);
return false; //did not execute sync
@ -42,17 +53,19 @@ p.checkIn = function(item) {
var currentItem = this.items[i];
if(currentItem.ref == item) {
currentItem.checkedIn = true;
return this._pulse(currentItem);
this._pulse(currentItem);
return true;
}
}
//add new item
var newItem = {ref: item, checkedIn: true};
this.items.push(newItem);
return this._pulse(newItem);
this._pulse(newItem);
return false;
}
p._pulse = function(item, cb) {
cb = cb || this.waits.pop()
cb = cb || this.waits.shift()
if(cb) {
item.checkedIn = false;
cb(null, item.ref)

View File

@ -36,7 +36,6 @@ test('api', function() {
}))
})
test('executing nested queries', function() {
pg.connect(helper.args, assert.calls(function(err, client) {
client.query('select now as now from NOW()', assert.calls(function(err, result) {
@ -50,9 +49,11 @@ test('executing nested queries', function() {
}))
})
test('raises error if cannot connect', function() {
pg.connect({database:'asdlfkajsdf there is no way this is a real database, right?!'}, assert.calls(function(err, client) {
var connectionString = "pg://asdf@seoiasfd:4884/ieieie";
pg.connect(connectionString, assert.calls(function(err, client) {
assert.ok(err, 'should have raised an error')
}))
})
pg.end();

View File

@ -0,0 +1,32 @@
var helper = require(__dirname + "/test-helper")
setTimeout(function() {
helper.pg.defaults.poolSize = 10;
test('executes a single pooled connection/query', function() {
var args = helper.args;
var conString = "pg://"+args.user+":"+args.password+"@"+args.host+":"+args.port+"/"+args.database;
var queryCount = 0;
helper.pg.connect(conString, assert.calls(function(err, client) {
assert.isNull(err);
client.query("select * from NOW()", assert.calls(function(err, result) {
assert.isNull(err);
queryCount++;
}))
}))
var id = setTimeout(function() {
assert.equal(queryCount, 1)
}, 1000)
var check = function() {
setTimeout(function() {
if(queryCount == 1) {
clearTimeout(id)
helper.pg.end();
} else {
check();
}
}, 50)
}
check();
})
}, 1000)

View File

@ -0,0 +1,4 @@
module.exports = {
args: require(__dirname + "/../test-helper").args,
pg: require(__dirname + "/../../../lib")
}

View File

@ -86,7 +86,7 @@ var expect = function(callback, timeout) {
var executed = false;
var id = setTimeout(function() {
assert.ok(executed, "Expected execution of " + callback + " fired");
}, timeout || 1000)
}, timeout || 2000)
return function(err, queryResult) {
clearTimeout(id);

View File

@ -60,7 +60,7 @@ test('an empty pool', function() {
}))
assert.ok(sync, "Should have generated item & called callback in sync")
})
test('creates again if item is checked out', function() {
var sync = pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, "first2")
@ -87,6 +87,30 @@ test('an empty pool', function() {
pool.checkIn(external);
})
})
})
test('when creating async new pool members', function() {
var count = 0;
var pool = new Pool(3, function() {
var item = {ref: {name: ++count}, checkedIn: false};
process.nextTick(function() {
pool.checkIn(item.ref)
})
return item;
})
test('one request recieves member', function() {
pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, 1)
pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, 2)
pool.checkOut(assert.calls(function(err, item) {
assert.equal(item.name, 3)
}))
}))
}))
})
})