var sys = require('sys'); var net = require('net'); var crypto = require('crypto'); var EventEmitter = require('events').EventEmitter; var utils = require(__dirname + '/utils'); var BufferList = require(__dirname + '/buffer-list'); var Connection = function(config) { EventEmitter.call(this); config = config || {}; this.stream = config.stream || new net.Stream(); this.lastBuffer = false; this.lastOffset = 0; this.buffer = null; this.offset = null; this.encoding = 'utf8'; }; sys.inherits(Connection, EventEmitter); var p = Connection.prototype; p.connect = function(port, host) { if(this.stream.readyState === 'closed'){ this.stream.connect(port, host); } else if(this.stream.readyState == 'open') { this.emit('connect'); } var self = this; this.stream.on('connect', function() { self.emit('connect'); }); this.stream.on('data', function(buffer) { self.setBuffer(buffer); var msg; while(msg = self.parseMessage()) { self.emit('message', msg); self.emit(msg.name, msg); } }); }; p.startup = function(config) { var buffer = new BufferList() .addInt16(3) .addInt16(0) .addCString('user') .addCString(config.user) .addCString('database') .addCString(config.database) .addCString(''); this.send(false, buffer.join()); }; p.password = function(password) { this.send('p', Buffer(password + '\0', this.encoding)); }; p.send = function(code, bodyBuffer) { var length = bodyBuffer.length + 4; var buffer = Buffer(length + (code ? 1 : 0)); var offset = 0; if(code) { buffer[offset++] = Buffer(code, this.encoding) [0]; } this.writeInt32(buffer, offset, length); bodyBuffer.copy(buffer, offset+4, 0); return this.stream.write(buffer); }; p.writeInt32 = function(buffer, offset, value) { buffer[offset++] = value >>> 24 & 0xFF; buffer[offset++] = value >>> 16 & 0xFF; buffer[offset++] = value >>> 8 & 0xFF; buffer[offset++] = value >>> 0 & 0xFF; }; p.end = function() { var terminationBuffer = new Buffer([0x58,0,0,0,4]); var wrote = this.stream.end(terminationBuffer); }; p.query = function(text) { this.send('Q', new Buffer(text + '\0', this.encoding)); }; p.parse = function(query) { //expect something like this: // { name: 'queryName', // text: 'select * from blah', // types: ['int8', 'bool'] } //normalize missing query names to allow for null query.name = query.name || ''; //normalize null type array query.types = query.types || []; var len = query.types.length; var buffer = new BufferList() .addCString(query.name) //name of query .addCString(query.text) //actual query text .addInt16(len); for(var i = 0; i < len; i++) { buffer.addInt32(query.types[i]); } this.send('P', buffer.join()); return this; }; p.bind = function(config) { //normalize config config = config || {}; config.portal = config.portal || ''; config.statement = config.statement || ''; var values = config.values || []; var len = values.length; var buffer = new BufferList() .addCString(config.portal) .addCString(config.statement) .addInt16(0) //always use default text format .addInt16(len); //number of parameters for(var i = 0; i < len; i++) { var val = values[i]; if(val === null) { buffer.addInt32(-1); } else { val = val.toString(); buffer.addInt32(Buffer.byteLength(val)); buffer.add(Buffer(val,this.encoding)); } } buffer.addInt16(0); //no format codes, use text this.send('B', buffer.join()); }; p.execute = function(config) { config = config || {}; config.portal = config.portal || ''; config.rows = config.rows || ''; var buffer = new BufferList() .addCString(config.portal) .addInt32(config.rows) .join(); this.send('E', buffer); }; p.flush = function() { this.send('H',Buffer(0)); } p.sync = function() { this.send('S', Buffer(0)); }; p.end = function() { this.send('X', Buffer(0)); }; //parsing methods p.setBuffer = function(buffer) { if(this.lastBuffer) { //we have unfinished biznaz //need to combine last two buffers var remaining = this.lastBuffer.length - this.lastOffset; var combinedBuffer = new Buffer(buffer.length + remaining); this.lastBuffer.copy(combinedBuffer, 0, this.lastOffset); buffer.copy(combinedBuffer, remaining, 0); buffer = combinedBuffer; } this.buffer = buffer; this.offset = 0; }; var messageNames = { R: 'authenticationOk', S: 'parameterStatus', K: 'backendKeyData', C: 'commandComplete', Z: 'readyForQuery', T: 'rowDescription', D: 'dataRow', E: 'error', N: 'notice', 1: 'parseComplete', 2: 'bindComplete', A: 'notification' }; p.parseMessage = function() { var remaining = this.buffer.length - (this.offset); if(remaining < 5) { //cannot read id + length without at least 5 bytes //just abort the read now this.lastBuffer = this.buffer; this.lastOffset = this.offset; return; } var id = this.readChar(); var message = { id: id, name: messageNames[id], length: this.parseInt32() }; if(remaining <= message.length) { this.lastBuffer = this.buffer; //rewind the last 5 bytes we read this.lastOffset = this.offset-5; return false; } return this["parse"+message.id](message); }; p.parseR = function(msg) { var code = 0; if(msg.length === 8) { code = this.parseInt32(); if(code === 3) { msg.name = 'authenticationCleartextPassword'; } return msg; } if(msg.length === 12) { code = this.parseInt32(); if(code === 5) { //md5 required msg.name = 'authenticationMD5Password'; msg.salt = new Buffer(4); this.buffer.copy(msg.salt, 0, this.offset, this.offset + 4); this.offset += 4; return msg; } } throw new Error("Unknown authenticatinOk message type" + sys.inspect(msg)); }; p.parseS = function(msg) { msg.parameterName = this.parseCString(); msg.parameterValue = this.parseCString(); return msg; }; p.parseK = function(msg) { msg.processID = this.parseInt32(); msg.secretKey = this.parseInt32(); return msg; }; p.parseC = function(msg) { msg.text = this.parseCString(); return msg; }; p.parseZ = function(msg) { msg.status = this.readChar(); return msg; }; p.parseT = function(msg) { msg.fieldCount = this.parseInt16(); var fields = []; for(var i = 0; i < msg.fieldCount; i++){ fields[i] = this.parseField(); } msg.fields = fields; return msg; }; p.parseField = function() { var field = { name: this.parseCString(), tableID: this.parseInt32(), columnID: this.parseInt16(), dataTypeID: this.parseInt32(), dataTypeSize: this.parseInt16(), dataTypeModifier: this.parseInt32(), format: this.parseInt16() === 0 ? 'text' : 'binary' }; return field; }; p.parseD = function(msg) { var fieldCount = this.parseInt16(); var fields = []; for(var i = 0; i < fieldCount; i++) { var length = this.parseInt32(); fields[i] = (length === -1 ? null : this.readString(length)) }; msg.fieldCount = fieldCount; msg.fields = fields; return msg; }; //parses error p.parseE = function(msg) { var fields = {}; var fieldType = this.readString(1); while(fieldType != '\0') { fields[fieldType] = this.parseCString(); fieldType = this.readString(1); } msg.severity = fields.S; msg.code = fields.C; msg.message = fields.M; msg.detail = fields.D; msg.hint = fields.H; msg.position = fields.P; msg.internalPosition = fields.p; msg.internalQuery = fields.q; msg.where = fields.W; msg.file = fields.F; msg.line = fields.L; msg.routine = fields.R; return msg; }; //same thing, different name p.parseN = p.parseE; //parses parseComplete p.parse1 = function(msg) { return msg; }; //parses bindComplete p.parse2 = function(msg) { return msg; }; p.parseA = function(msg) { msg.processId = this.parseInt32(); msg.channel = this.parseCString(); msg.payload = this.parseCString(); return msg; }; p.readChar = function() { return Buffer([this.buffer[this.offset++]]).toString(this.encoding); }; p.parseInt32 = function() { var value = this.peekInt32(); this.offset += 4; return value; }; p.peekInt32 = function(offset) { offset = offset || this.offset; var buffer = this.buffer; return ((buffer[offset++] << 24) + (buffer[offset++] << 16) + (buffer[offset++] << 8) + buffer[offset++]); }; p.parseInt16 = function() { return ((this.buffer[this.offset++] << 8) + (this.buffer[this.offset++] << 0)); }; p.readString = function(length) { return this.buffer.toString(this.encoding, this.offset, (this.offset += length)); }; p.parseCString = function() { var start = this.offset; while(this.buffer[this.offset++]) { }; return this.buffer.toString(this.encoding, start, this.offset - 1); }; //end parsing methods module.exports = Connection;