2014-09-14 00:32:53 +08:00
|
|
|
var Native = require('pg-native');
|
|
|
|
var EventEmitter = require('events').EventEmitter;
|
|
|
|
var util = require('util');
|
2014-09-14 13:23:02 +08:00
|
|
|
var ConnectionParameters = require(__dirname + '/../connection-parameters');
|
2014-09-14 00:32:53 +08:00
|
|
|
|
|
|
|
var NativeQuery = require('./query');
|
|
|
|
|
2014-09-14 10:37:30 +08:00
|
|
|
//Client built on top of the bindings loaded from 'pg-native'.
//`config` is passed as-is to ConnectionParameters and the resulting
//object is stored on `this.connectionParameters`.
var Client = module.exports = function(config) {
  //set up the EventEmitter part of the instance
  EventEmitter.call(this);

  //instance of the native bindings
  this.native = new Native();

  //queries wait in this array until they are submitted
  this._queryQueue = [];

  //becomes true once the native connection has been established
  this._connected = false;

  var params = this.connectionParameters = new ConnectionParameters(config);

  //keep these on the object for legacy reasons
  //for the time being. TODO: deprecate all this jazz
  var legacyFields = ['user', 'password', 'database', 'host', 'port'];
  for(var i = 0; i < legacyFields.length; i++) {
    this[legacyFields[i]] = params[legacyFields[i]];
  }

  //a hash to hold named queries
  this.namedQueries = {};
};

//give Client the EventEmitter prototype methods (on, once, emit, ...)
util.inherits(Client, EventEmitter);
|
|
|
|
|
|
|
|
//connect to the backend
//
//`cb` is optional. It is called without arguments once connected, or
//with the error if building the connection string or connecting fails.
//If no callback is passed and there is a connection error,
//the client will emit an 'error' event instead.
Client.prototype.connect = function(cb) {
  var client = this;

  //hand a connection failure to the callback when there is one,
  //otherwise emit it as an 'error' event on the client
  var fail = function(err) {
    return cb ? cb(err) : client.emit('error', err);
  };

  //handle connection errors from the native layer
  var onNativeError = function(err) {
    var active = client._activeQuery;
    if(active && active.state != 'end') {
      //error will be handled by active query
      return;
    }
    client.emit('error', err);
  };

  //re-emit native notifications as { channel, payload } objects
  var onNotification = function(msg) {
    client.emit('notification', {
      channel: msg.relname,
      payload: msg.extra
    });
  };

  //runs when the native connect call reports back
  var onConnected = function(err) {
    if(err) return fail(err);

    //set internal states to connected
    client._connected = true;
    client.emit('connect');
    //start any queries that were queued before the connection was up
    client._pulseQueryQueue(true);

    //native listeners are attached only after a successful connect
    client.native.on('error', onNativeError);
    client.native.on('notification', onNotification);

    //possibly call the optional callback
    if(cb) cb();
  };

  this.connectionParameters.getLibpqConnectionString(function(err, conString) {
    if(err) return fail(err);
    client.native.connect(conString, onConnected);
  });
};
|
|
|
|
|
|
|
|
//Create a query, append it to the queue, pulse the queue and return
//the NativeQuery object.
//
//supported call styles:
//  query('text', callback)
//  query('text', ['values'], callback)
//  query({text: ..., values: ..., name: ..., callback: ...})
//  query({...}, callback)
//  query({...}, ['values'], callback)
//
//`values` and `callback` arguments, when given, take precedence over
//the corresponding properties of a config object.
Client.prototype.query = function(config, values, callback) {
  var query = new NativeQuery(this.native);

  //support query('text', ...) style calls
  if(typeof config === 'string') {
    query.text = config;
  }

  //support passing everything in via a config object
  if(typeof config === 'object') {
    query.text = config.text;
    query.values = config.values;
    query.name = config.name;
    query.callback = config.callback;
  }

  //support query({...}, function() {}) style calls
  //& support query(..., ['values'], ...) style calls
  if(typeof values === 'function') {
    query.callback = values;
  }
  //Array.isArray is used in place of the deprecated util.isArray alias
  else if(Array.isArray(values)) {
    query.values = values;
  }

  //an explicit third-argument callback wins over everything else
  if(typeof callback === 'function') {
    query.callback = callback;
  }

  this._queryQueue.push(query);
  this._pulseQueryQueue();
  return query;
};
|
|
|
|
|
|
|
|
//End the native connection. When the native layer calls back, a query
//that is still active gets a 'Connection terminated' error (and the
//pending queue is emptied), then 'end' is emitted and the optional
//callback `cb` is called without arguments.
Client.prototype.end = function(cb) {
  var client = this;

  var onNativeEnd = function() {
    //send an error to the active query
    if(client._hasActiveQuery()) {
      var msg = 'Connection terminated';
      //the queue is emptied before the active query is failed
      client._queryQueue.length = 0;
      client._activeQuery.handleError(new Error(msg));
    }

    client.emit('end');
    if(cb) cb();
  };

  this.native.end(onNativeEnd);
};
|
|
|
|
|
2014-09-15 09:11:51 +08:00
|
|
|
//Truthy when there is an active query whose state is neither 'error'
//nor 'end'. When no active query is set, the falsy value of
//`this._activeQuery` itself is returned.
Client.prototype._hasActiveQuery = function() {
  var active = this._activeQuery;
  if(!active) {
    return active;
  }
  return !(active.state == 'error' || active.state == 'end');
};
|
|
|
|
|
2014-09-14 00:32:53 +08:00
|
|
|
//Submit the next queued query, if any.
//
//Does nothing while the client is not connected or while a query is
//still active. When the queue is empty a 'drain' event is emitted,
//unless `initialConnection` is truthy (the pulse made from connect).
Client.prototype._pulseQueryQueue = function(initialConnection) {
  if(!this._connected || this._hasActiveQuery()) {
    return;
  }

  var client = this;
  var next = client._queryQueue.shift();

  if(next) {
    client._activeQuery = next;
    next.submit(client);
    //once this query signals '_done', move on to the following one
    next.once('_done', function() {
      client._pulseQueryQueue();
    });
    return;
  }

  //nothing left to run
  if(!initialConnection) {
    client.emit('drain');
  }
};
|