diff --git a/lib/connection.js b/lib/connection.js index f604857..3a6ad7c 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -41,7 +41,7 @@ Connection.prototype.connect = function(port, host) { }); this.stream.on('error', function(error) { - //don't raise ECONNRESET errors - they can & should be ignored + //don't raise ECONNRESET errors - they can & should be ignored //during disconnect if(self._ending && error.code == 'ECONNRESET') { return; @@ -81,8 +81,10 @@ Connection.prototype.connect = function(port, host) { Connection.prototype.attachListeners = function(stream) { var self = this; - stream.on('data', function(buffer) { - self.setBuffer(buffer); + stream.on('readable', function() { + var buff = stream.read(); + if(!buff) return; + self.setBuffer(buff); var msg = self.parseMessage(); while(msg) { self.emit('message', msg); @@ -322,8 +324,9 @@ Connection.prototype.parseMessage = function() { //read message id code var id = this.buffer[this.offset++]; + var buffer = this.buffer; //read message length - var length = this.parseInt32(); + var length = this.parseInt32(buffer); if(remaining <= length) { this.lastBuffer = this.buffer; @@ -340,95 +343,106 @@ Connection.prototype.parseMessage = function() { case 0x52: //R msg.name = 'authenticationOk'; - return this.parseR(msg); + msg = this.parseR(msg); + break; case 0x53: //S msg.name = 'parameterStatus'; - return this.parseS(msg); + msg = this.parseS(msg); + break; case 0x4b: //K msg.name = 'backendKeyData'; - return this.parseK(msg); + msg = this.parseK(msg); + break; case 0x43: //C msg.name = 'commandComplete'; - return this.parseC(msg); + msg = this.parseC(msg); + break; case 0x5a: //Z msg.name = 'readyForQuery'; - return this.parseZ(msg); + msg = this.parseZ(msg); + break; case 0x54: //T msg.name = 'rowDescription'; - return this.parseT(msg); + msg = this.parseT(msg); + break; case 0x44: //D - msg.name = 'dataRow'; - return this.parseD(msg); + msg = this.parseD(buffer, length); + break; case 0x45: //E msg.name = 'error'; - return this.parseE(msg); + msg = this.parseE(msg); + break; case 0x4e: //N msg.name = 'notice'; - return this.parseN(msg); + msg = this.parseN(msg); + break; case 0x31: //1 msg.name = 'parseComplete'; - return msg; + break; case 0x32: //2 msg.name = 'bindComplete'; - return msg; + break; case 0x41: //A msg.name = 'notification'; - return this.parseA(msg); + msg = this.parseA(msg); + break; case 0x6e: //n msg.name = 'noData'; - return msg; + break; case 0x49: //I msg.name = 'emptyQuery'; - return msg; + break; case 0x73: //s msg.name = 'portalSuspended'; - return msg; + break; case 0x47: //G msg.name = 'copyInResponse'; - return this.parseGH(msg); + msg = this.parseGH(msg); + break; case 0x48: //H msg.name = 'copyOutResponse'; - return this.parseGH(msg); + msg = this.parseGH(msg); + break; case 0x63: //c msg.name = 'copyDone'; - return msg; + break; case 0x64: //d msg.name = 'copyData'; - return this.parsed(msg); - - default: - throw new Error("Unrecognized message code " + id); + msg = this.parsed(msg); + break; } + return msg; }; Connection.prototype.parseR = function(msg) { var code = 0; + var buffer = this.buffer; if(msg.length === 8) { - code = this.parseInt32(); + code = this.parseInt32(buffer); if(code === 3) { msg.name = 'authenticationCleartextPassword'; } return msg; } if(msg.length === 12) { - code = this.parseInt32(); + code = this.parseInt32(buffer); if(code === 5) { //md5 required msg.name = 'authenticationMD5Password'; msg.salt = new Buffer(4); @@ -441,85 +455,103 @@ Connection.prototype.parseR = function(msg) { }; Connection.prototype.parseS = function(msg) { - msg.parameterName = this.parseCString(); - msg.parameterValue = this.parseCString(); + var buffer = this.buffer; + msg.parameterName = this.parseCString(buffer); + msg.parameterValue = this.parseCString(buffer); return msg; }; Connection.prototype.parseK = function(msg) { - msg.processID = this.parseInt32(); - msg.secretKey = this.parseInt32(); + var buffer = this.buffer; + msg.processID = this.parseInt32(buffer); + msg.secretKey = this.parseInt32(buffer); return msg; }; Connection.prototype.parseC = function(msg) { - msg.text = this.parseCString(); + var buffer = this.buffer; + msg.text = this.parseCString(buffer); return msg; }; Connection.prototype.parseZ = function(msg) { - msg.status = this.readChar(); + var buffer = this.buffer; + msg.status = this.readString(buffer, 1); return msg; }; Connection.prototype.parseT = function(msg) { - msg.fieldCount = this.parseInt16(); + var buffer = this.buffer; + msg.fieldCount = this.parseInt16(buffer); var fields = []; for(var i = 0; i < msg.fieldCount; i++){ - fields.push(this.parseField()); + fields.push(this.parseField(buffer)); } msg.fields = fields; return msg; }; -Connection.prototype.parseField = function() { +Connection.prototype.parseField = function(buffer) { var field = { - name: this.parseCString(), - tableID: this.parseInt32(), - columnID: this.parseInt16(), - dataTypeID: this.parseInt32(), - dataTypeSize: this.parseInt16(), - dataTypeModifier: this.parseInt32(), + name: this.parseCString(buffer), + tableID: this.parseInt32(buffer), + columnID: this.parseInt16(buffer), + dataTypeID: this.parseInt32(buffer), + dataTypeSize: this.parseInt16(buffer), + dataTypeModifier: this.parseInt32(buffer), format: undefined }; - if(this.parseInt16() === TEXT_MODE) { + if(this.parseInt16(buffer) === TEXT_MODE) { this._mode = TEXT_MODE; field.format = 'text'; } else { this._mode = BINARY_MODE; + this.readField = this.readBytes; field.format = 'binary'; } return field; }; -Connection.prototype.parseD = function(msg) { - var fieldCount = this.parseInt16(); - var fields = []; +var Message = function(name, length) { + this.name = name; + this.length = length; +}; + +var DataRowMessage = function(name, length, fieldCount) { + this.name = name; + this.length = length; + this.fieldCount = fieldCount; + this.fields = []; +} + +Connection.prototype.parseD = function(buffer, length) { + var fieldCount = this.parseInt16(buffer); + var msg = new DataRowMessage('dataRow', length, fieldCount); for(var i = 0; i < fieldCount; i++) { - var length = this.parseInt32(); - var value = null; - if(length !== -1) { - if(this._mode === TEXT_MODE) { - value = this.readString(length); - } else { - value = this.readBytes(length); - } - } - fields.push(value); + var value = this._readValue(buffer); + msg.fields.push(value); } - msg.fieldCount = fieldCount; - msg.fields = fields; return msg; }; +Connection.prototype._readValue = function(buffer) { + var length = this.parseInt32(buffer); + if(length === -1) return null; + if(this._mode === TEXT_MODE) { + return this.readString(buffer, length); + } + return this.readBytes(buffer, length); +}; + //parses error Connection.prototype.parseE = function(input) { + var buffer = this.buffer; var fields = {}; var msg, item; - var fieldType = this.readString(1); + var fieldType = this.readString(buffer, 1); while(fieldType != '\0') { - fields[fieldType] = this.parseCString(); - fieldType = this.readString(1); + fields[fieldType] = this.parseCString(buffer); + fieldType = this.readString(buffer, 1); } if(input.name === 'error') { // the msg is an Error instance @@ -553,57 +585,56 @@ Connection.prototype.parseE = function(input) { Connection.prototype.parseN = Connection.prototype.parseE; Connection.prototype.parseA = function(msg) { - msg.processId = this.parseInt32(); - msg.channel = this.parseCString(); - msg.payload = this.parseCString(); + var buffer = this.buffer; + msg.processId = this.parseInt32(buffer); + msg.channel = this.parseCString(buffer); + msg.payload = this.parseCString(buffer); return msg; }; Connection.prototype.parseGH = function (msg) { + var buffer = this.buffer; var isBinary = this.buffer[this.offset] !== 0; this.offset++; msg.binary = isBinary; - var columnCount = this.parseInt16(); + var columnCount = this.parseInt16(buffer); msg.columnTypes = []; for(var i = 0; i