node-postgres/lib/connection.js
Kibae Shin 4f790deb73 Support for logical streaming replication (#1271)
* Support for logical streaming replication

* Wrong compare expr in getLibpqConnectionString

* Simplify codes for replication parameter
2017-04-24 13:24:30 -05:00

674 lines
17 KiB
JavaScript

/**
* Copyright (c) 2010-2016 Brian Carlson (brian.m.carlson@gmail.com)
* All rights reserved.
*
* This source code is licensed under the MIT license found in the
* README.md file in the root directory of this source tree.
*/
var net = require('net');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Writer = require('buffer-writer');
var Reader = require('packet-reader');
var indexOf =
'indexOf' in Buffer.prototype ?
function indexOf(buffer, value, start) {
return buffer.indexOf(value, start);
} :
function indexOf(buffer, value, start) {
for (var i = start, len = buffer.length; i < len; i++) {
if (buffer[i] === value) {
return i;
}
}
return -1;
};
var TEXT_MODE = 0;
var BINARY_MODE = 1;
var Connection = function(config) {
EventEmitter.call(this);
config = config || {};
this.stream = config.stream || new net.Stream();
this._keepAlive = config.keepAlive;
this.lastBuffer = false;
this.lastOffset = 0;
this.buffer = null;
this.offset = null;
this.encoding = 'utf8';
this.parsedStatements = {};
this.writer = new Writer();
this.ssl = config.ssl || false;
this._ending = false;
this._mode = TEXT_MODE;
this._emitMessage = false;
this._reader = new Reader({
headerSize: 1,
lengthPadding: -4
});
var self = this;
this.on('newListener', function(eventName) {
if(eventName == 'message') {
self._emitMessage = true;
}
});
};
util.inherits(Connection, EventEmitter);
Connection.prototype.connect = function(port, host) {
if(this.stream.readyState === 'closed') {
this.stream.connect(port, host);
} else if(this.stream.readyState == 'open') {
this.emit('connect');
}
var self = this;
this.stream.on('connect', function() {
if (self._keepAlive) {
self.stream.setKeepAlive(true);
}
self.emit('connect');
});
this.stream.on('error', function(error) {
//don't raise ECONNRESET errors - they can & should be ignored
//during disconnect
if(self._ending && error.code == 'ECONNRESET') {
return;
}
self.emit('error', error);
});
this.stream.on('close', function() {
// NOTE: node-0.10 emits both 'end' and 'close'
// for streams closed by the peer, while
// node-0.8 only emits 'close'
self.emit('end');
});
if(!this.ssl) {
return this.attachListeners(this.stream);
}
this.stream.once('data', function(buffer) {
var responseCode = buffer.toString('utf8');
if(responseCode != 'S') {
return self.emit('error', new Error('The server does not support SSL connections'));
}
var tls = require('tls');
self.stream = tls.connect({
socket: self.stream,
servername: host,
rejectUnauthorized: self.ssl.rejectUnauthorized,
ca: self.ssl.ca,
pfx: self.ssl.pfx,
key: self.ssl.key,
passphrase: self.ssl.passphrase,
cert: self.ssl.cert,
NPNProtocols: self.ssl.NPNProtocols
});
self.attachListeners(self.stream);
self.emit('sslconnect');
self.stream.on('error', function(error){
self.emit('error', error);
});
});
};
Connection.prototype.attachListeners = function(stream) {
var self = this;
stream.on('data', function(buff) {
self._reader.addChunk(buff);
var packet = self._reader.read();
while(packet) {
var msg = self.parseMessage(packet);
if(self._emitMessage) {
self.emit('message', msg);
}
self.emit(msg.name, msg);
packet = self._reader.read();
}
});
stream.on('end', function() {
self.emit('end');
});
};
Connection.prototype.requestSsl = function() {
this.checkSslResponse = true;
var bodyBuffer = this.writer
.addInt16(0x04D2)
.addInt16(0x162F).flush();
var length = bodyBuffer.length + 4;
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join();
this.stream.write(buffer);
};
Connection.prototype.startup = function(config) {
var writer = this.writer
.addInt16(3)
.addInt16(0)
;
Object.keys(config).forEach(function(key){
var val = config[key];
writer.addCString(key).addCString(val);
});
writer.addCString('client_encoding').addCString("'utf-8'");
var bodyBuffer = writer.addCString('').flush();
//this message is sent without a code
var length = bodyBuffer.length + 4;
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join();
this.stream.write(buffer);
};
Connection.prototype.cancel = function(processID, secretKey) {
var bodyBuffer = this.writer
.addInt16(1234)
.addInt16(5678)
.addInt32(processID)
.addInt32(secretKey)
.flush();
var length = bodyBuffer.length + 4;
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join();
this.stream.write(buffer);
};
Connection.prototype.password = function(password) {
//0x70 = 'p'
this._send(0x70, this.writer.addCString(password));
};
Connection.prototype._send = function(code, more) {
if(!this.stream.writable) { return false; }
if(more === true) {
this.writer.addHeader(code);
} else {
return this.stream.write(this.writer.flush(code));
}
};
Connection.prototype.query = function(text) {
//0x51 = Q
this.stream.write(this.writer.addCString(text).flush(0x51));
};
//send parse message
//"more" === true to buffer the message until flush() is called
Connection.prototype.parse = function(query, more) {
//expect something like this:
// { name: 'queryName',
// text: 'select * from blah',
// types: ['int8', 'bool'] }
//normalize missing query names to allow for null
query.name = query.name || '';
if (query.name.length > 63) {
console.error('Warning! Postgres only supports 63 characters for query names.');
console.error('You supplied', query.name, '(', query.name.length, ')');
console.error('This can cause conflicts and silent errors executing queries');
}
//normalize null type array
query.types = query.types || [];
var len = query.types.length;
var buffer = this.writer
.addCString(query.name) //name of query
.addCString(query.text) //actual query text
.addInt16(len);
for(var i = 0; i < len; i++) {
buffer.addInt32(query.types[i]);
}
var code = 0x50;
this._send(code, more);
};
//send bind message
//"more" === true to buffer the message until flush() is called
Connection.prototype.bind = function(config, more) {
//normalize config
config = config || {};
config.portal = config.portal || '';
config.statement = config.statement || '';
config.binary = config.binary || false;
var values = config.values || [];
var len = values.length;
var useBinary = false;
for (var j = 0; j < len; j++)
useBinary |= values[j] instanceof Buffer;
var buffer = this.writer
.addCString(config.portal)
.addCString(config.statement);
if (!useBinary)
buffer.addInt16(0);
else {
buffer.addInt16(len);
for (j = 0; j < len; j++)
buffer.addInt16(values[j] instanceof Buffer);
}
buffer.addInt16(len);
for(var i = 0; i < len; i++) {
var val = values[i];
if(val === null || typeof val === "undefined") {
buffer.addInt32(-1);
} else if (val instanceof Buffer) {
buffer.addInt32(val.length);
buffer.add(val);
} else {
buffer.addInt32(Buffer.byteLength(val));
buffer.addString(val);
}
}
if(config.binary) {
buffer.addInt16(1); // format codes to use binary
buffer.addInt16(1);
}
else {
buffer.addInt16(0); // format codes to use text
}
//0x42 = 'B'
this._send(0x42, more);
};
//send execute message
//"more" === true to buffer the message until flush() is called
Connection.prototype.execute = function(config, more) {
config = config || {};
config.portal = config.portal || '';
config.rows = config.rows || '';
this.writer
.addCString(config.portal)
.addInt32(config.rows);
//0x45 = 'E'
this._send(0x45, more);
};
var emptyBuffer = Buffer(0);
Connection.prototype.flush = function() {
//0x48 = 'H'
this.writer.add(emptyBuffer);
this._send(0x48);
};
Connection.prototype.sync = function() {
//clear out any pending data in the writer
this.writer.flush(0);
this.writer.add(emptyBuffer);
this._ending = true;
this._send(0x53);
};
Connection.prototype.end = function() {
//0x58 = 'X'
this.writer.add(emptyBuffer);
this._ending = true;
this._send(0x58);
};
Connection.prototype.close = function(msg, more) {
this.writer.addCString(msg.type + (msg.name || ''));
this._send(0x43, more);
};
Connection.prototype.describe = function(msg, more) {
this.writer.addCString(msg.type + (msg.name || ''));
this._send(0x44, more);
};
Connection.prototype.sendCopyFromChunk = function (chunk) {
this.stream.write(this.writer.add(chunk).flush(0x64));
};
Connection.prototype.endCopyFrom = function () {
this.stream.write(this.writer.add(emptyBuffer).flush(0x63));
};
Connection.prototype.sendCopyFail = function (msg) {
//this.stream.write(this.writer.add(emptyBuffer).flush(0x66));
this.writer.addCString(msg);
this._send(0x66);
};
var Message = function(name, length) {
this.name = name;
this.length = length;
};
Connection.prototype.parseMessage = function(buffer) {
this.offset = 0;
var length = buffer.length + 4;
switch(this._reader.header)
{
case 0x52: //R
return this.parseR(buffer, length);
case 0x53: //S
return this.parseS(buffer, length);
case 0x4b: //K
return this.parseK(buffer, length);
case 0x43: //C
return this.parseC(buffer, length);
case 0x5a: //Z
return this.parseZ(buffer, length);
case 0x54: //T
return this.parseT(buffer, length);
case 0x44: //D
return this.parseD(buffer, length);
case 0x45: //E
return this.parseE(buffer, length);
case 0x4e: //N
return this.parseN(buffer, length);
case 0x31: //1
return new Message('parseComplete', length);
case 0x32: //2
return new Message('bindComplete', length);
case 0x33: //3
return new Message('closeComplete', length);
case 0x41: //A
return this.parseA(buffer, length);
case 0x6e: //n
return new Message('noData', length);
case 0x49: //I
return new Message('emptyQuery', length);
case 0x73: //s
return new Message('portalSuspended', length);
case 0x47: //G
return this.parseG(buffer, length);
case 0x48: //H
return this.parseH(buffer, length);
case 0x57: //W
return new Message('replicationStart', length);
case 0x63: //c
return new Message('copyDone', length);
case 0x64: //d
return this.parsed(buffer, length);
}
};
Connection.prototype.parseR = function(buffer, length) {
var code = 0;
var msg = new Message('authenticationOk', length);
if(msg.length === 8) {
code = this.parseInt32(buffer);
if(code === 3) {
msg.name = 'authenticationCleartextPassword';
}
return msg;
}
if(msg.length === 12) {
code = this.parseInt32(buffer);
if(code === 5) { //md5 required
msg.name = 'authenticationMD5Password';
msg.salt = new Buffer(4);
buffer.copy(msg.salt, 0, this.offset, this.offset + 4);
this.offset += 4;
return msg;
}
}
throw new Error("Unknown authenticationOk message type" + util.inspect(msg));
};
Connection.prototype.parseS = function(buffer, length) {
var msg = new Message('parameterStatus', length);
msg.parameterName = this.parseCString(buffer);
msg.parameterValue = this.parseCString(buffer);
return msg;
};
Connection.prototype.parseK = function(buffer, length) {
var msg = new Message('backendKeyData', length);
msg.processID = this.parseInt32(buffer);
msg.secretKey = this.parseInt32(buffer);
return msg;
};
Connection.prototype.parseC = function(buffer, length) {
var msg = new Message('commandComplete', length);
msg.text = this.parseCString(buffer);
return msg;
};
Connection.prototype.parseZ = function(buffer, length) {
var msg = new Message('readyForQuery', length);
msg.name = 'readyForQuery';
msg.status = this.readString(buffer, 1);
return msg;
};
var ROW_DESCRIPTION = 'rowDescription';
Connection.prototype.parseT = function(buffer, length) {
var msg = new Message(ROW_DESCRIPTION, length);
msg.fieldCount = this.parseInt16(buffer);
var fields = [];
for(var i = 0; i < msg.fieldCount; i++){
fields.push(this.parseField(buffer));
}
msg.fields = fields;
return msg;
};
var Field = function() {
this.name = null;
this.tableID = null;
this.columnID = null;
this.dataTypeID = null;
this.dataTypeSize = null;
this.dataTypeModifier = null;
this.format = null;
};
var FORMAT_TEXT = 'text';
var FORMAT_BINARY = 'binary';
Connection.prototype.parseField = function(buffer) {
var field = new Field();
field.name = this.parseCString(buffer);
field.tableID = this.parseInt32(buffer);
field.columnID = this.parseInt16(buffer);
field.dataTypeID = this.parseInt32(buffer);
field.dataTypeSize = this.parseInt16(buffer);
field.dataTypeModifier = this.parseInt32(buffer);
if(this.parseInt16(buffer) === TEXT_MODE) {
this._mode = TEXT_MODE;
field.format = FORMAT_TEXT;
} else {
this._mode = BINARY_MODE;
field.format = FORMAT_BINARY;
}
return field;
};
var DATA_ROW = 'dataRow';
var DataRowMessage = function(length, fieldCount) {
this.name = DATA_ROW;
this.length = length;
this.fieldCount = fieldCount;
this.fields = [];
};
//extremely hot-path code
Connection.prototype.parseD = function(buffer, length) {
var fieldCount = this.parseInt16(buffer);
var msg = new DataRowMessage(length, fieldCount);
for(var i = 0; i < fieldCount; i++) {
msg.fields.push(this._readValue(buffer));
}
return msg;
};
//extremely hot-path code
Connection.prototype._readValue = function(buffer) {
var length = this.parseInt32(buffer);
if(length === -1) return null;
if(this._mode === TEXT_MODE) {
return this.readString(buffer, length);
}
return this.readBytes(buffer, length);
};
//parses error
Connection.prototype.parseE = function(buffer, length) {
var fields = {};
var msg, item;
var input = new Message('error', length);
var fieldType = this.readString(buffer, 1);
while(fieldType != '\0') {
fields[fieldType] = this.parseCString(buffer);
fieldType = this.readString(buffer, 1);
}
if(input.name === 'error') {
// the msg is an Error instance
msg = new Error(fields.M);
for (item in input) {
// copy input properties to the error
if(input.hasOwnProperty(item)) {
msg[item] = input[item];
}
}
} else {
// the msg is an object literal
msg = input;
msg.message = fields.M;
}
msg.severity = fields.S;
msg.code = fields.C;
msg.detail = fields.D;
msg.hint = fields.H;
msg.position = fields.P;
msg.internalPosition = fields.p;
msg.internalQuery = fields.q;
msg.where = fields.W;
msg.schema = fields.s;
msg.table = fields.t;
msg.column = fields.c;
msg.dataType = fields.d;
msg.constraint = fields.n;
msg.file = fields.F;
msg.line = fields.L;
msg.routine = fields.R;
return msg;
};
//same thing, different name
Connection.prototype.parseN = function(buffer, length) {
var msg = this.parseE(buffer, length);
msg.name = 'notice';
return msg;
};
Connection.prototype.parseA = function(buffer, length) {
var msg = new Message('notification', length);
msg.processId = this.parseInt32(buffer);
msg.channel = this.parseCString(buffer);
msg.payload = this.parseCString(buffer);
return msg;
};
Connection.prototype.parseG = function (buffer, length) {
var msg = new Message('copyInResponse', length);
return this.parseGH(buffer, msg);
};
Connection.prototype.parseH = function(buffer, length) {
var msg = new Message('copyOutResponse', length);
return this.parseGH(buffer, msg);
};
Connection.prototype.parseGH = function (buffer, msg) {
var isBinary = buffer[this.offset] !== 0;
this.offset++;
msg.binary = isBinary;
var columnCount = this.parseInt16(buffer);
msg.columnTypes = [];
for(var i = 0; i<columnCount; i++) {
msg.columnTypes.push(this.parseInt16(buffer));
}
return msg;
};
Connection.prototype.parsed = function (buffer, length) {
var msg = new Message('copyData', length);
msg.chunk = this.readBytes(buffer, msg.length - 4);
return msg;
};
Connection.prototype.parseInt32 = function(buffer) {
var value = buffer.readInt32BE(this.offset, true);
this.offset += 4;
return value;
};
Connection.prototype.parseInt16 = function(buffer) {
var value = buffer.readInt16BE(this.offset, true);
this.offset += 2;
return value;
};
Connection.prototype.readString = function(buffer, length) {
return buffer.toString(this.encoding, this.offset, (this.offset += length));
};
Connection.prototype.readBytes = function(buffer, length) {
return buffer.slice(this.offset, this.offset += length);
};
Connection.prototype.parseCString = function(buffer) {
var start = this.offset;
var end = indexOf(buffer, 0, start);
this.offset = end + 1;
return buffer.toString(this.encoding, start, end);
};
//end parsing methods
module.exports = Connection;