var Native = require('pg-native'); var EventEmitter = require('events').EventEmitter; var util = require('util'); var ConnectionParameters = require(__dirname + '/../connection-parameters'); var NativeQuery = require('./query'); var Client = module.exports = function(config) { EventEmitter.call(this); this.native = new Native(require('pg-types')); this._queryQueue = []; this._connected = false; //keep these on the object for legacy reasons //for the time being. TODO: deprecate all this jazz var cp = this.connectionParameters = new ConnectionParameters(config); this.user = cp.user; this.password = cp.password; this.database = cp.database; this.host = cp.host; this.port = cp.port; //a hash to hold named queries this.namedQueries = {}; }; util.inherits(Client, EventEmitter); //connect to the backend //pass an optional callback to be called once connected //or with an error if there was a connection error //if no callback is passed and there is a connection error //the client will emit an error event. Client.prototype.connect = function(cb) { var self = this; var onError = function(err) { if(cb) return cb(err); return self.emit('error', err); }; this.connectionParameters.getLibpqConnectionString(function(err, conString) { if(err) return onError(err); self.native.connect(conString, function(err) { if(err) return onError(err); //set internal states to connected self._connected = true; self.emit('connect'); self._pulseQueryQueue(true); //handle connection errors from the native layer self.native.on('error', function(err) { //error will be handled by active query if(self._activeQuery && self._activeQuery.state != 'end') { return; } self.emit('error', err); }); self.native.on('notification', function(msg) { self.emit('notification', { channel: msg.relname, payload: msg.extra }); }); //possibly call the optional callback if(cb) cb(); }); }); }; Client.prototype.query = function(config, values, callback) { var query = new NativeQuery(this.native); //support query('text', ...) style calls if(typeof config == 'string') { query.text = config; } //support passing everything in via a config object if(typeof config == 'object') { query.text = config.text; query.values = config.values; query.name = config.name; query.callback = config.callback; } //support query({...}, function() {}) style calls //& support query(..., ['values'], ...) style calls if(typeof values == 'function') { query.callback = values; } else if(util.isArray(values)) { query.values = values; } if(typeof callback == 'function') { query.callback = callback; } this._queryQueue.push(query); this._pulseQueryQueue(); return query; }; Client.prototype.end = function(cb) { var self = this; if(!this._connected) { this.once('connect', this.end.bind(this, cb)); } this.native.end(function() { //send an error to the active query if(self._hasActiveQuery()) { var msg = 'Connection terminated'; self._queryQueue.length = 0; self._activeQuery.handleError(new Error(msg)); } self.emit('end'); if(cb) cb(); }); }; Client.prototype._hasActiveQuery = function() { return this._activeQuery && this._activeQuery.state != 'error' && this._activeQuery.state != 'end'; }; Client.prototype._pulseQueryQueue = function(initialConnection) { if(!this._connected) { return; } if(this._hasActiveQuery()) { return; } var query = this._queryQueue.shift(); if(!query) { if(!initialConnection) { this.emit('drain'); } return; } this._activeQuery = query; query.submit(this); var self = this; query.once('_done', function() { self._pulseQueryQueue(); }); };