diff --git a/copy-to.js b/copy-to.js index 6eefc36..716a072 100644 --- a/copy-to.js +++ b/copy-to.js @@ -9,7 +9,7 @@ var code = require('./message-formats') var CopyStreamQuery = function(text, options) { Transform.call(this, options) this.text = text - this._copyOutResponse = null + this._gotCopyOutResponse = false this.rowCount = 0 } @@ -30,50 +30,69 @@ CopyStreamQuery.prototype._detach = function() { this.connection.stream.resume() } - CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { var offset = 0 + var Byte1Len = 1; + var Int32Len = 4; if(this._remainder && chunk) { chunk = Buffer.concat([this._remainder, chunk]) } - if(!this._copyOutResponse) { - this._copyOutResponse = true - if(chunk[0] == code.ErrorResponse) { - this._detach() - this.push(null) - return cb(); - } - if(chunk[0] != code.CopyOutResponse) { - this.emit('error', new Error('Expected CopyOutResponse code (H)')) - } - var length = chunk.readUInt32BE(1) - offset = 1 - offset += length - } - while((chunk.length - offset) > 5) { + + var length; + var messageCode; + var needPush = false; + + while((chunk.length - offset) > (Byte1Len + Int32Len)) { var messageCode = chunk[offset] - //complete or error - if(messageCode == code.CopyDone || messageCode == code.ErrorResponse) { - this._detach() - this.push(null) - return cb(); - } - //something bad happened - if(messageCode != code.CopyData) { - return this.emit('error', new Error('Expected CopyData code (d)')) + + //console.log(c, w, offset, 'PostgreSQL message ' + String.fromCharCode(messageCode)) + switch(messageCode) { + + // detect COPY start + case code.CopyOutResponse: + if (!this._gotCopyOutResponse) { + this._gotCopyOutResponse = true + } else { + this.emit('error', new Error('Unexpected CopyOutResponse message (H)')) + } + break; + + // meaningful row + case code.CopyData: + needPush = true; + break; + + // standard interspersed messages. discard + case code.ParameterStatus: + case code.NoticeResponse: + case code.NotificationResponse: + break; + + case code.ErrorResponse: + case code.CopyDone: + this._detach() + this.push(null) + return cb(); + break; + default: + this.emit('error', new Error('Unexpected PostgreSQL message ' + String.fromCharCode(messageCode))) } - var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32 - //can we read the next row? - if(chunk.length > (offset + length + 5)) { - offset += 5 - var slice = chunk.slice(offset, offset + length) - offset += length - this.push(slice) - this.rowCount++ + + length = chunk.readUInt32BE(offset+Byte1Len) + if(chunk.length > (offset + Byte1Len + length)) { + offset += Byte1Len + Int32Len + if (needPush) { + var row = chunk.slice(offset, offset + length - Int32Len) + this.rowCount++ + this.push(row) + } + offset += (length - Int32Len) } else { + // we need more chunks for a complete message break; } } + if(chunk.length - offset) { var slice = chunk.slice(offset) this._remainder = slice diff --git a/index.js b/index.js index 25e02c2..d6f0399 100644 --- a/index.js +++ b/index.js @@ -30,16 +30,18 @@ CopyStreamQuery.prototype.submit = function(connection) { var copyDataBuffer = Buffer([code.CopyData]) CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { + var Int32Len = 4; this.push(copyDataBuffer) - var lenBuffer = Buffer(4) - lenBuffer.writeUInt32BE(chunk.length + 4, 0) + var lenBuffer = Buffer(Int32Len) + lenBuffer.writeUInt32BE(chunk.length + Int32Len, 0) this.push(lenBuffer) this.push(chunk) cb() } CopyStreamQuery.prototype._flush = function(cb) { - var finBuffer = Buffer([code.CopyDone, 0, 0, 0, 4]) + var Int32Len = 4; + var finBuffer = Buffer([code.CopyDone, 0, 0, 0, Int32Len]) this.push(finBuffer) cb() } diff --git a/message-formats.js b/message-formats.js index 9827498..41325cc 100644 --- a/message-formats.js +++ b/message-formats.js @@ -7,11 +7,19 @@ * https://www.postgresql.org/docs/current/static/protocol-flow.html */ module.exports = { - ErrorResponse: 0x45, - CopyInResponse: 0x47, - CopyOutResponse: 0x48, - CopyBothResponse: 0x57, - CopyData: 0x64, - CopyDone: 0x63, - CopyFail: 0x66 + ErrorResponse: 0x45, // E + CopyInResponse: 0x47, // G + CopyOutResponse: 0x48, // H + CopyBothResponse: 0x57, // W + CopyDone: 0x63, // c + CopyData: 0x64, // d + CopyFail: 0x66, // f + + // It is possible for NoticeResponse and ParameterStatus messages to be interspersed between CopyData messages; + // frontends must handle these cases, and should be prepared for other asynchronous message types as well + // (see Section 50.2.6). + // Otherwise, any message type other than CopyData or CopyDone may be treated as terminating copy-out mode. + NotificationResponse: 0x41, // A + NoticeResponse: 0x4E, // N + ParameterStatus: 0x53 // S }