node-postgres/lib/client.js

327 lines
8.2 KiB
JavaScript
Raw Normal View History

2010-10-23 07:16:40 +08:00
var crypto = require('crypto');
2010-10-24 01:46:27 +08:00
var EventEmitter = require('events').EventEmitter;
2011-10-11 08:40:52 +08:00
var util = require('util');
2013-12-06 07:01:51 +08:00
var pgPass = require('pgpass');
var ConnectionParameters = require(__dirname + '/connection-parameters');
var Query = require(__dirname + '/query');
2010-11-21 04:09:18 +08:00
var defaults = require(__dirname + '/defaults');
2010-10-24 08:02:13 +08:00
var Connection = require(__dirname + '/connection');
var CopyFromStream = require(__dirname + '/copystream').CopyFromStream;
var CopyToStream = require(__dirname + '/copystream').CopyToStream;
2013-01-24 08:15:37 +08:00
2010-10-11 11:37:30 +08:00
var Client = function(config) {
EventEmitter.call(this);
this.connectionParameters = new ConnectionParameters(config);
this.user = this.connectionParameters.user;
this.database = this.connectionParameters.database;
this.port = this.connectionParameters.port;
this.host = this.connectionParameters.host;
this.password = this.connectionParameters.password;
var c = config || {};
this.connection = c.connection || new Connection({
stream: c.stream,
ssl: this.connectionParameters.ssl
});
2010-10-11 11:37:30 +08:00
this.queryQueue = [];
this.binary = c.binary || defaults.binary;
2010-10-20 13:34:16 +08:00
this.encoding = 'utf8';
2011-11-02 23:07:14 +08:00
this.processID = null;
this.secretKey = null;
this.ssl = this.connectionParameters.ssl || false;
2010-10-11 11:37:30 +08:00
};
2011-10-11 08:40:52 +08:00
util.inherits(Client, EventEmitter);
2010-10-11 11:37:30 +08:00
Client.prototype.connect = function(callback) {
2010-10-11 11:37:30 +08:00
var self = this;
2010-10-24 08:02:13 +08:00
var con = this.connection;
2013-12-06 07:01:51 +08:00
if(this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port);
} else {
con.connect(this.port, this.host);
}
2010-10-24 09:26:24 +08:00
2010-10-25 11:52:12 +08:00
//once connection is established send startup message
2010-10-24 09:26:24 +08:00
con.on('connect', function() {
if(self.ssl) {
con.requestSsl();
} else {
con.startup({
user: self.user,
database: self.database
});
}
});
con.on('sslconnect', function() {
con.startup({
2010-10-24 09:26:24 +08:00
user: self.user,
database: self.database
});
});
2013-12-06 07:01:51 +08:00
function checkPgPass(cb) {
return function(msg) {
if (null !== self.password) {
cb(msg);
} else {
pgPass(self.connectionParameters, function(pass){
if (undefined !== pass) {
self.connectionParameters.password = self.password = pass;
}
cb(msg);
});
}
};
}
2010-10-25 11:52:12 +08:00
//password request handling
2013-12-06 07:01:51 +08:00
con.on('authenticationCleartextPassword', checkPgPass(function() {
con.password(self.password);
2013-12-06 07:01:51 +08:00
}));
2010-10-25 11:52:12 +08:00
//password request handling
2013-12-06 07:01:51 +08:00
con.on('authenticationMD5Password', checkPgPass(function(msg) {
2010-10-24 09:26:24 +08:00
var inner = Client.md5(self.password + self.user);
var outer = Client.md5(inner + msg.salt.toString('binary'));
var md5password = "md5" + outer;
con.password(md5password);
2013-12-06 07:01:51 +08:00
}));
2011-11-02 23:07:14 +08:00
con.once('backendKeyData', function(msg) {
2012-03-23 04:32:56 +08:00
self.processID = msg.processID;
self.secretKey = msg.secretKey;
});
2011-11-02 23:07:14 +08:00
2011-02-05 10:03:41 +08:00
//hook up query handling events to connection
//after the connection initially becomes ready for queries
2011-02-05 08:51:23 +08:00
con.once('readyForQuery', function() {
//delegate rowDescription to active query
con.on('rowDescription', function(msg) {
self.activeQuery.handleRowDescription(msg);
});
//delegate dataRow to active query
con.on('dataRow', function(msg) {
self.activeQuery.handleDataRow(msg);
});
//delegate portalSuspended to active query
con.on('portalSuspended', function(msg) {
self.activeQuery.handlePortalSuspended(con);
});
//delegate commandComplete to active query
con.on('commandComplete', function(msg) {
self.activeQuery.handleCommandComplete(msg, con);
});
con.on('copyInResponse', function(msg) {
2013-10-22 13:23:43 +08:00
self.activeQuery.handleCopyInResponse(self.connection);
});
con.on('copyOutResponse', function(msg) {
2013-10-22 13:23:43 +08:00
if(self.activeQuery.stream === undefined) {
self.activeQuery._canceledDueToError = new Error('No destination stream defined');
2013-01-24 08:15:37 +08:00
//canceling query requires creation of new connection
//look for postgres frontend/backend protocol
2013-10-22 13:23:43 +08:00
//TODO - this needs to die/be refactored
2013-01-24 08:15:37 +08:00
(new self.constructor({port: self.port, host: self.host}))
.cancel(self, self.activeQuery);
}
});
con.on('copyData', function (msg) {
2013-10-22 13:23:43 +08:00
self.activeQuery.handleCopyData(msg, self.connection);
});
2011-03-05 03:30:19 +08:00
con.on('notification', function(msg) {
self.emit('notification', msg);
});
2011-03-05 03:30:19 +08:00
//process possible callback argument to Client#connect
if (callback) {
callback(null, self);
//remove callback for proper error handling
//after the connect event
callback = null;
}
self.emit('connect');
2011-02-05 10:03:41 +08:00
});
2011-02-05 08:51:23 +08:00
2011-02-05 10:03:41 +08:00
con.on('readyForQuery', function() {
var activeQuery = self.activeQuery;
self.activeQuery = null;
2011-02-05 10:03:41 +08:00
self.readyForQuery = true;
self._pulseQueryQueue();
if(activeQuery) {
activeQuery.handleReadyForQuery();
}
2010-10-25 11:52:12 +08:00
});
2010-10-26 14:51:12 +08:00
con.on('error', function(error) {
if(self.activeQuery) {
var activeQuery = self.activeQuery;
self.activeQuery = null;
return activeQuery.handleError(error, con);
}
if(!callback) {
return self.emit('error', error);
}
callback(error);
2010-10-26 14:51:12 +08:00
});
con.once('end', function() {
if(self.activeQuery) {
2013-10-21 22:39:49 +08:00
var disconnectError = new Error('Stream unexpectedly ended during query execution');
self.activeQuery.handleError(disconnectError);
self.activeQuery = null;
}
self.emit('end');
});
con.on('notice', function(msg) {
self.emit('notice', msg);
});
2010-10-24 02:12:49 +08:00
};
Client.prototype.cancel = function(client, query) {
if(client.activeQuery == query) {
2012-03-23 04:32:56 +08:00
var con = this.connection;
if(this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port);
} else {
con.connect(this.port, this.host);
}
//once connection is established send cancel message
con.on('connect', function() {
con.cancel(client.processID, client.secretKey);
});
} else if(client.queryQueue.indexOf(query) != -1) {
2012-03-23 04:32:56 +08:00
client.queryQueue.splice(client.queryQueue.indexOf(query), 1);
}
2011-11-02 23:07:14 +08:00
};
// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c
Client.prototype.escapeIdentifier = function(str) {
var escaped = '"';
for(var i = 0; i < str.length; i++) {
var c = str[i];
if(c === '"') {
escaped += c + c;
} else {
escaped += c;
}
}
escaped += '"';
return escaped;
2013-07-12 07:23:59 +08:00
};
// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c
Client.prototype.escapeLiteral = function(str) {
var hasBackslash = false;
var escaped = '\'';
for(var i = 0; i < str.length; i++) {
var c = str[i];
if(c === '\'') {
escaped += c + c;
} else if (c === '\\') {
escaped += c + c;
hasBackslash = true;
} else {
escaped += c;
}
}
escaped += '\'';
if(hasBackslash === true) {
escaped = ' E' + escaped;
}
return escaped;
2013-07-12 07:23:59 +08:00
};
Client.prototype._pulseQueryQueue = function() {
if(this.readyForQuery===true) {
2011-02-05 10:03:41 +08:00
this.activeQuery = this.queryQueue.shift();
if(this.activeQuery) {
this.readyForQuery = false;
this.hasExecuted = true;
2011-02-05 10:03:41 +08:00
this.activeQuery.submit(this.connection);
} else if(this.hasExecuted) {
this.activeQuery = null;
2013-03-08 06:12:09 +08:00
this.emit('drain');
}
2010-10-11 11:37:30 +08:00
}
};
Client.prototype._copy = function (text, stream) {
var config = {};
config.text = text;
config.stream = stream;
config.callback = function (error) {
if(error) {
config.stream.error(error);
} else {
config.stream.close();
}
2013-01-21 21:40:18 +08:00
};
var query = new Query(config);
this.queryQueue.push(query);
this._pulseQueryQueue();
return config.stream;
2010-10-11 11:37:30 +08:00
};
Client.prototype.copyFrom = function (text) {
return this._copy(text, new CopyFromStream());
2013-01-21 21:40:18 +08:00
};
Client.prototype.copyTo = function (text) {
return this._copy(text, new CopyToStream());
2013-01-21 21:40:18 +08:00
};
Client.prototype.query = function(config, values, callback) {
2012-08-10 07:31:32 +08:00
//can take in strings, config object or query object
var query = (typeof config.submit == 'function') ? config :
2013-01-24 08:15:37 +08:00
new Query(config, values, callback);
if(this.binary && !query.binary) {
2012-08-10 07:31:32 +08:00
query.binary = true;
}
2011-02-05 09:45:30 +08:00
2010-10-25 12:32:18 +08:00
this.queryQueue.push(query);
this._pulseQueryQueue();
2010-10-25 12:32:18 +08:00
return query;
2010-10-24 02:12:49 +08:00
};
Client.prototype.end = function() {
2010-10-26 10:24:17 +08:00
this.connection.end();
};
2010-10-25 11:52:12 +08:00
Client.md5 = function(string) {
return crypto.createHash('md5').update(string).digest('hex');
2010-10-24 02:12:49 +08:00
};
// expose a Query constructor
Client.Query = Query;
2010-10-11 11:37:30 +08:00
module.exports = Client;