//source: node-postgres/lib/native/index.js (194 lines, 5.2 KiB, JavaScript)

//third-party and sibling modules used by the native client
var Native = require('pg-native');
var TypeOverrides = require('../type-overrides');
var semver = require('semver');
var pkg = require('../../package.json');
var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var ConnectionParameters = require('../connection-parameters');

//fail at load time when the installed pg-native is older than
//the minimum version declared in package.json
var msg = 'Version >= ' + pkg.minNativeVersion + ' of pg-native required.';
assert(semver.gte(Native.version, pkg.minNativeVersion), msg);

//loaded only after the version check above has passed
var NativeQuery = require('./query');
//client implementation backed by the pg-native bindings
//config is optional; it is handed to ConnectionParameters and
//config.types is handed to TypeOverrides
var Client = module.exports = function(config) {
  EventEmitter.call(this);
  config = config || {};

  //the same type overrides instance is shared with the native layer
  this._types = new TypeOverrides(config.types);
  this.native = new Native({
    types: this._types
  });

  //queries waiting to be submitted, in submission order
  this._queryQueue = [];
  //flipped to true by connect() once the native connection succeeds
  this._connected = false;

  //keep these on the object for legacy reasons
  //for the time being. TODO: deprecate all this jazz
  var cp = this.connectionParameters = new ConnectionParameters(config);
  this.user = cp.user;
  this.password = cp.password;
  this.database = cp.database;
  this.host = cp.host;
  this.port = cp.port;

  //a hash to hold named queries
  this.namedQueries = {};
};

util.inherits(Client, EventEmitter);
//connect to the backend
//pass an optional callback to be called once connected
//or with an error if there was a connection error
//if no callback is passed and there is a connection error
//the client will emit an error event.
Client.prototype.connect = function(cb) {
  var self = this;

  //route a connection failure to the callback when one was given,
  //otherwise surface it as an 'error' event
  var onError = function(err) {
    if(cb) return cb(err);
    return self.emit('error', err);
  };

  this.connectionParameters.getLibpqConnectionString(function(err, conString) {
    if(err) return onError(err);
    self.native.connect(conString, function(err) {
      if(err) return onError(err);

      //set internal states to connected
      self._connected = true;

      //handle connection errors from the native layer
      self.native.on('error', function(err) {
        //error will be handled by active query
        if(self._activeQuery && self._activeQuery.state != 'end') {
          return;
        }
        self.emit('error', err);
      });

      //re-emit native notifications with channel/payload field names
      self.native.on('notification', function(msg) {
        self.emit('notification', {
          channel: msg.relname,
          payload: msg.extra
        });
      });

      //signal we are connected now
      self.emit('connect');
      //start any query queued before the connection was ready;
      //passing true keeps an empty queue from emitting 'drain'
      self._pulseQueryQueue(true);

      //possibly call the optional callback
      if(cb) cb();
    });
  });
};
//send a query to the server
//this method is highly overloaded to take
//1) string query, optional array of parameters, optional function callback
//2) object query with {
//    string text
//    optional array values,
//    optional function callback instead of as a separate parameter
//    optional string name to name & cache the query plan
//    optional string rowMode = 'array' for an array of results
//  }
//returns the query object that was queued
//throws a TypeError when config is null
Client.prototype.query = function(config, values, callback) {
  //typeof null is 'object', so reject it up front with a clear message
  //instead of failing on a property access further down
  if(config === null) {
    throw new TypeError('Client was passed a null query');
  }

  var query = new NativeQuery(this.native);

  //support query('text', ...) style calls
  if(typeof config === 'string') {
    query.text = config;
  }

  //support passing everything in via a config object
  if(typeof config === 'object') {
    query.text = config.text;
    query.values = config.values;
    query.name = config.name;
    query.callback = config.callback;
    query._arrayMode = config.rowMode == 'array';
  }

  //support query({...}, function() {}) style calls
  //& support query(..., ['values'], ...) style calls
  if(typeof values === 'function') {
    query.callback = values;
  }
  else if(Array.isArray(values)) {
    query.values = values;
  }

  //an explicit third argument wins over any callback set above
  if(typeof callback === 'function') {
    query.callback = callback;
  }

  this._queryQueue.push(query);
  this._pulseQueryQueue();
  return query;
};
//disconnect from the backend server
//cb is optional and is called after the 'end' event has been emitted
Client.prototype.end = function(cb) {
  var self = this;

  //not connected yet: schedule another end() for when 'connect' fires
  //NOTE(review): there is no early return here, so native.end() below is
  //still called right away in this case - confirm that is intended
  if(!this._connected) {
    this.once('connect', this.end.bind(this, cb));
  }

  this.native.end(function() {
    //send an error to the active query
    if(self._hasActiveQuery()) {
      var msg = 'Connection terminated';
      //discard everything still waiting in the queue
      self._queryQueue.length = 0;
      self._activeQuery.handleError(new Error(msg));
    }

    self.emit('end');
    if(cb) cb();
  });
};
//truthy while there is an active query that has neither errored nor ended
//when no query is active the falsy _activeQuery value itself is returned
Client.prototype._hasActiveQuery = function() {
  var active = this._activeQuery;
  if(!active) {
    return active;
  }
  return !(active.state == 'error' || active.state == 'end');
};
//start the next queued query when the client is connected and idle
//initialConnection is passed as true by connect(); it keeps an
//empty queue from emitting 'drain' on that first pulse
Client.prototype._pulseQueryQueue = function(initialConnection) {
  //nothing can be submitted before the connection is up
  if(!this._connected) {
    return;
  }

  //only one query runs at a time
  if(this._hasActiveQuery()) {
    return;
  }

  var query = this._queryQueue.shift();
  if(!query) {
    //queue is empty: report it unless this is the pulse made on connect
    if(!initialConnection) {
      this.emit('drain');
    }
    return;
  }

  this._activeQuery = query;
  query.submit(this);

  //move on to the next queued query once this one reports '_done'
  var self = this;
  query.once('_done', function() {
    self._pulseQueryQueue();
  });
};
//attempt to cancel an in-progress query
//the active query is cancelled through the native layer;
//a query that is still queued is simply removed from the queue
Client.prototype.cancel = function(query) {
  if(this._activeQuery == query) {
    this.native.cancel(function() {});
    return;
  }

  //look the position up once and reuse it for the removal
  var position = this._queryQueue.indexOf(query);
  if(position != -1) {
    this._queryQueue.splice(position, 1);
  }
};
//forward to setTypeParser on this client's type overrides
//and hand back whatever it returns
Client.prototype.setTypeParser = function(oid, format, parseFn) {
  var overrides = this._types;
  return overrides.setTypeParser(oid, format, parseFn);
};
//forward to getTypeParser on this client's type overrides
//and hand back whatever it returns
Client.prototype.getTypeParser = function(oid, format) {
  var overrides = this._types;
  return overrides.getTypeParser(oid, format);
};