@ -3,93 +3,95 @@ var EventEmitter = require('events').EventEmitter;
var defaults = require ( './defaults' ) ;
var genericPool = require ( 'generic-pool' ) ;
var pools = {
//dictionary of all key:pool pairs
all : { } ,
//reference to the client constructor - can override in tests or for require('pg').native
Client : require ( './client' ) ,
getOrCreate : function ( clientConfig ) {
clientConfig = clientConfig || { } ;
var name = JSON . stringify ( clientConfig ) ;
var pool = pools . all [ name ] ;
if ( pool ) {
return pool ;
}
pool = genericPool . Pool ( {
name : name ,
max : clientConfig . poolSize || defaults . poolSize ,
idleTimeoutMillis : clientConfig . poolIdleTimeout || defaults . poolIdleTimeout ,
reapIntervalMillis : clientConfig . reapIntervalMillis || defaults . reapIntervalMillis ,
log : clientConfig . poolLog || defaults . poolLog ,
create : function ( cb ) {
var client = new pools . Client ( clientConfig ) ;
// Ignore errors on pooled clients until they are connected.
client . on ( 'error' , Function . prototype ) ;
client . connect ( function ( err ) {
if ( err ) return cb ( err , null ) ;
// Remove the noop error handler after a connection has been established.
client . removeListener ( 'error' , Function . prototype ) ;
module . exports = function ( Client ) {
var pools = {
//dictionary of all key:pool pairs
all : { } ,
//reference to the client constructor - can override in tests or for require('pg').native
getOrCreate : function ( clientConfig ) {
clientConfig = clientConfig || { } ;
var name = JSON . stringify ( clientConfig ) ;
var pool = pools . all [ name ] ;
if ( pool ) {
return pool ;
}
pool = genericPool . Pool ( {
name : name ,
max : clientConfig . poolSize || defaults . poolSize ,
idleTimeoutMillis : clientConfig . poolIdleTimeout || defaults . poolIdleTimeout ,
reapIntervalMillis : clientConfig . reapIntervalMillis || defaults . reapIntervalMillis ,
log : clientConfig . poolLog || defaults . poolLog ,
create : function ( cb ) {
var client = new Client ( clientConfig ) ;
// Ignore errors on pooled clients until they are connected.
client . on ( 'error' , Function . prototype ) ;
client . connect ( function ( err ) {
if ( err ) return cb ( err , null ) ;
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client . on ( 'error' , function ( e ) {
pool . emit ( 'error' , e , client ) ;
// Remove the noop error handler after a connection has been established.
client . removeListener ( 'error' , Function . prototype ) ;
// If the client is already being destroyed, the error
// occurred during stream ending. Do not attempt to destroy
// the client again.
if ( ! client . _destroying ) {
pool . destroy ( client ) ;
}
} ) ;
//handle connected client background errors by emitting event
//via the pg object and then removing errored client from the pool
client . on ( 'error' , function ( e ) {
pool . emit ( 'error' , e , client ) ;
// Remove connection from pool on disconnect
client . on ( 'end' , function ( e ) {
// Do not enter infinite loop between pool.destroy
// and client 'end' event...
if ( ! client . _destroying ) {
// If the client is already being destroyed, the error
// occurred during stream ending. Do not attempt to destroy
// the client again.
if ( ! client . _destroying ) {
pool . destroy ( client ) ;
}
} ) ;
// Remove connection from pool on disconnect
client . on ( 'end' , function ( e ) {
// Do not enter infinite loop between pool.destroy
// and client 'end' event...
if ( ! client . _destroying ) {
pool . destroy ( client ) ;
}
} ) ;
client . poolCount = 0 ;
return cb ( null , client ) ;
} ) ;
} ,
destroy : function ( client ) {
client . _destroying = true ;
client . poolCount = undefined ;
client . end ( ) ;
}
} ) ;
pools . all [ name ] = pool ;
//mixin EventEmitter to pool
EventEmitter . call ( pool ) ;
for ( var key in EventEmitter . prototype ) {
if ( EventEmitter . prototype . hasOwnProperty ( key ) ) {
pool [ key ] = EventEmitter . prototype [ key ] ;
}
}
//monkey-patch with connect method
pool . connect = function ( cb ) {
var domain = process . domain ;
pool . acquire ( function ( err , client ) {
if ( domain ) {
cb = domain . bind ( cb ) ;
}
if ( err ) return cb ( err , null , function ( ) { /*NOOP*/ } ) ;
client . poolCount ++ ;
cb ( null , client , function ( err ) {
if ( err ) {
pool . destroy ( client ) ;
} else {
pool . release ( client ) ;
}
} ) ;
client . poolCount = 0 ;
return cb ( null , client ) ;
} ) ;
} ,
destroy : function ( client ) {
client . _destroying = true ;
client . poolCount = undefined ;
client . end ( ) ;
}
} ) ;
pools . all [ name ] = pool ;
//mixin EventEmitter to pool
EventEmitter . call ( pool ) ;
for ( var key in EventEmitter . prototype ) {
if ( EventEmitter . prototype . hasOwnProperty ( key ) ) {
pool [ key ] = EventEmitter . prototype [ key ] ;
}
} ;
return pool ;
}
//monkey-patch with connect method
pool . connect = function ( cb ) {
var domain = process . domain ;
pool . acquire ( function ( err , client ) {
if ( domain ) {
cb = domain . bind ( cb ) ;
}
if ( err ) return cb ( err , null , function ( ) { /*NOOP*/ } ) ;
client . poolCount ++ ;
cb ( null , client , function ( err ) {
if ( err ) {
pool . destroy ( client ) ;
} else {
pool . release ( client ) ;
}
} ) ;
} ) ;
} ;
return pool ;
}
} ;
} ;
module . exports = pools ;
return pools ;
} ;