Refactor message format codes handling (#45)
This commit is contained in:
parent
ee84aba89f
commit
ae5b344395
20
copy-to.js
20
copy-to.js
@ -4,6 +4,7 @@ module.exports = function(txt, options) {
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
var code = require('./message-formats')
|
||||
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
@ -23,13 +24,6 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.stream.pipe(this)
|
||||
}
|
||||
|
||||
var code = {
|
||||
E: 69, //Error
|
||||
H: 72, //CopyOutResponse
|
||||
d: 0x64, //CopyData
|
||||
c: 0x63 //CopyDone
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype._detach = function() {
|
||||
this.connection.stream.unpipe(this)
|
||||
// Unpipe can drop us out of flowing mode
|
||||
@ -44,13 +38,13 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
}
|
||||
if(!this._copyOutResponse) {
|
||||
this._copyOutResponse = true
|
||||
if(chunk[0] == code.E) {
|
||||
if(chunk[0] == code.ErrorResponse) {
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
}
|
||||
if(chunk[0] != code.H) {
|
||||
this.emit('error', new Error('Expected copy out response'))
|
||||
if(chunk[0] != code.CopyOutResponse) {
|
||||
this.emit('error', new Error('Expected CopyOutResponse code (H)'))
|
||||
}
|
||||
var length = chunk.readUInt32BE(1)
|
||||
offset = 1
|
||||
@ -59,14 +53,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
while((chunk.length - offset) > 5) {
|
||||
var messageCode = chunk[offset]
|
||||
//complete or error
|
||||
if(messageCode == code.c || messageCode == code.E) {
|
||||
if(messageCode == code.CopyDone || messageCode == code.ErrorResponse) {
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
}
|
||||
//something bad happened
|
||||
if(messageCode != code.d) {
|
||||
return this.emit('error', new Error('expected "d" (copydata message)'))
|
||||
if(messageCode != code.CopyData) {
|
||||
return this.emit('error', new Error('Expected CopyData code (d)'))
|
||||
}
|
||||
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
|
||||
//can we read the next row?
|
||||
|
10
index.js
10
index.js
@ -10,6 +10,7 @@ module.exports = {
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
var code = require('./message-formats')
|
||||
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
@ -26,13 +27,8 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.query(this.text)
|
||||
}
|
||||
|
||||
var code = {
|
||||
H: 72, //CopyOutResponse
|
||||
d: 0x64, //CopyData
|
||||
c: 0x63 //CopyDone
|
||||
}
|
||||
|
||||
var copyDataBuffer = Buffer([code.d])
|
||||
var copyDataBuffer = Buffer([code.CopyData])
|
||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
this.push(copyDataBuffer)
|
||||
var lenBuffer = Buffer(4)
|
||||
@ -43,7 +39,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype._flush = function(cb) {
|
||||
var finBuffer = Buffer([code.c, 0, 0, 0, 4])
|
||||
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, 4])
|
||||
this.push(finBuffer)
|
||||
cb()
|
||||
}
|
||||
|
17
message-formats.js
Normal file
17
message-formats.js
Normal file
@ -0,0 +1,17 @@
|
||||
/**
|
||||
* The COPY feature uses the following protocol codes.
|
||||
* The codes for the most recent protocol version are documented on
|
||||
* https://www.postgresql.org/docs/current/static/protocol-message-formats.html
|
||||
*
|
||||
* The protocol flow itself is described on
|
||||
* https://www.postgresql.org/docs/current/static/protocol-flow.html
|
||||
*/
|
||||
module.exports = {
|
||||
ErrorResponse: 0x45,
|
||||
CopyInResponse: 0x47,
|
||||
CopyOutResponse: 0x48,
|
||||
CopyBothResponse: 0x57,
|
||||
CopyData: 0x64,
|
||||
CopyDone: 0x63,
|
||||
CopyFail: 0x66
|
||||
}
|
Loading…
Reference in New Issue
Block a user