Fix NoticeResponse test and handle other messages (#52)

This commit is contained in:
jeromew 2016-07-28 23:13:22 +02:00 committed by Brian C
parent f155899570
commit 2a4db2920e
3 changed files with 73 additions and 44 deletions

View File

@ -9,7 +9,7 @@ var code = require('./message-formats')
var CopyStreamQuery = function(text, options) { var CopyStreamQuery = function(text, options) {
Transform.call(this, options) Transform.call(this, options)
this.text = text this.text = text
this._copyOutResponse = null this._gotCopyOutResponse = false
this.rowCount = 0 this.rowCount = 0
} }
@ -30,50 +30,69 @@ CopyStreamQuery.prototype._detach = function() {
this.connection.stream.resume() this.connection.stream.resume()
} }
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
var offset = 0 var offset = 0
var Byte1Len = 1;
var Int32Len = 4;
if(this._remainder && chunk) { if(this._remainder && chunk) {
chunk = Buffer.concat([this._remainder, chunk]) chunk = Buffer.concat([this._remainder, chunk])
} }
if(!this._copyOutResponse) {
this._copyOutResponse = true var length;
if(chunk[0] == code.ErrorResponse) { var messageCode;
this._detach() var needPush = false;
this.push(null)
return cb(); while((chunk.length - offset) > (Byte1Len + Int32Len)) {
}
if(chunk[0] != code.CopyOutResponse) {
this.emit('error', new Error('Expected CopyOutResponse code (H)'))
}
var length = chunk.readUInt32BE(1)
offset = 1
offset += length
}
while((chunk.length - offset) > 5) {
var messageCode = chunk[offset] var messageCode = chunk[offset]
//complete or error
if(messageCode == code.CopyDone || messageCode == code.ErrorResponse) { //console.log(c, w, offset, 'PostgreSQL message ' + String.fromCharCode(messageCode))
switch(messageCode) {
// detect COPY start
case code.CopyOutResponse:
if (!this._gotCopyOutResponse) {
this._gotCopyOutResponse = true
} else {
this.emit('error', new Error('Unexpected CopyOutResponse message (H)'))
}
break;
// meaningful row
case code.CopyData:
needPush = true;
break;
// standard interspersed messages. discard
case code.ParameterStatus:
case code.NoticeResponse:
case code.NotificationResponse:
break;
case code.ErrorResponse:
case code.CopyDone:
this._detach() this._detach()
this.push(null) this.push(null)
return cb(); return cb();
break;
default:
this.emit('error', new Error('Unexpected PostgreSQL message ' + String.fromCharCode(messageCode)))
} }
//something bad happened
if(messageCode != code.CopyData) { length = chunk.readUInt32BE(offset+Byte1Len)
return this.emit('error', new Error('Expected CopyData code (d)')) if(chunk.length > (offset + Byte1Len + length)) {
} offset += Byte1Len + Int32Len
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32 if (needPush) {
//can we read the next row? var row = chunk.slice(offset, offset + length - Int32Len)
if(chunk.length > (offset + length + 5)) {
offset += 5
var slice = chunk.slice(offset, offset + length)
offset += length
this.push(slice)
this.rowCount++ this.rowCount++
this.push(row)
}
offset += (length - Int32Len)
} else { } else {
// we need more chunks for a complete message
break; break;
} }
} }
if(chunk.length - offset) { if(chunk.length - offset) {
var slice = chunk.slice(offset) var slice = chunk.slice(offset)
this._remainder = slice this._remainder = slice

View File

@ -30,16 +30,18 @@ CopyStreamQuery.prototype.submit = function(connection) {
var copyDataBuffer = Buffer([code.CopyData]) var copyDataBuffer = Buffer([code.CopyData])
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
var Int32Len = 4;
this.push(copyDataBuffer) this.push(copyDataBuffer)
var lenBuffer = Buffer(4) var lenBuffer = Buffer(Int32Len)
lenBuffer.writeUInt32BE(chunk.length + 4, 0) lenBuffer.writeUInt32BE(chunk.length + Int32Len, 0)
this.push(lenBuffer) this.push(lenBuffer)
this.push(chunk) this.push(chunk)
cb() cb()
} }
CopyStreamQuery.prototype._flush = function(cb) { CopyStreamQuery.prototype._flush = function(cb) {
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, 4]) var Int32Len = 4;
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, Int32Len])
this.push(finBuffer) this.push(finBuffer)
cb() cb()
} }

View File

@ -7,11 +7,19 @@
* https://www.postgresql.org/docs/current/static/protocol-flow.html * https://www.postgresql.org/docs/current/static/protocol-flow.html
*/ */
module.exports = { module.exports = {
ErrorResponse: 0x45, ErrorResponse: 0x45, // E
CopyInResponse: 0x47, CopyInResponse: 0x47, // G
CopyOutResponse: 0x48, CopyOutResponse: 0x48, // H
CopyBothResponse: 0x57, CopyBothResponse: 0x57, // W
CopyData: 0x64, CopyDone: 0x63, // c
CopyDone: 0x63, CopyData: 0x64, // d
CopyFail: 0x66 CopyFail: 0x66, // f
// It is possible for NoticeResponse and ParameterStatus messages to be interspersed between CopyData messages;
// frontends must handle these cases, and should be prepared for other asynchronous message types as well
// (see Section 50.2.6).
// Otherwise, any message type other than CopyData or CopyDone may be treated as terminating copy-out mode.
NotificationResponse: 0x41, // A
NoticeResponse: 0x4E, // N
ParameterStatus: 0x53 // S
} }