// node-pg-copy-streams/copy-to.js
// (repository web-view header: 117 lines, 2.9 KiB, JavaScript)

module.exports = function(txt, options) {
return new CopyStreamQuery(txt, options)
2013-10-29 06:31:11 +08:00
}
var Transform = require('stream').Transform
var util = require('util')
var code = require('./message-formats')
// (blame timestamp from the repository web view: 2013-10-29 06:31:11 +08:00)
/**
 * Transform stream wrapping a single query whose result is read as a stream
 * of rows.
 *
 * @constructor
 * @param {string} text - query text, sent to the server by `submit()`
 * @param {Object} [options] - passed straight to the stream.Transform
 *   constructor
 */
var CopyStreamQuery = function(text, options) {
  Transform.call(this, options)
  this.text = text
  // Flipped to true by _transform when the first CopyOutResponse is seen.
  this._gotCopyOutResponse = false
  // Number of CopyData payloads pushed downstream so far.
  this.rowCount = 0
}
util.inherits(CopyStreamQuery, Transform)
// Names of stream events. NOTE(review): nothing in the visible part of this
// file reads this array - confirm whether it is still needed.
var eventTypes = ['close', 'data', 'end', 'error']
// (blame timestamp from the repository web view: 2013-10-29 06:31:11 +08:00)
/**
 * Starts the query on the given connection and begins reading its raw bytes.
 *
 * Sends `this.text` with `connection.query()`, remembers the connection on
 * `this.connection`, removes every 'copyData' listener from it and finally
 * pipes `connection.stream` into this transform so that `_transform` receives
 * the wire bytes.
 *
 * @param {Object} connection - object exposing `query(text)`,
 *   `removeAllListeners(event)` and a readable `stream`
 *   (presumably a pg Connection - confirm against the caller)
 */
CopyStreamQuery.prototype.submit = function(connection) {
  connection.query(this.text)
  this.connection = connection
  // Rows are parsed from the piped bytes in _transform, so listeners for the
  // connection's 'copyData' event are dropped.
  this.connection.removeAllListeners('copyData')
  connection.stream.pipe(this)
}
/**
 * Stops receiving raw bytes from the connection: unpipes this transform from
 * `this.connection.stream` and then resumes that stream.
 *
 * Requires `submit()` to have stored `this.connection` beforehand.
 */
CopyStreamQuery.prototype._detach = function() {
  this.connection.stream.unpipe(this)
  // Unpipe can drop us out of flowing mode
  this.connection.stream.resume()
}
/**
 * Parses the raw backend messages piped in from the connection and pushes the
 * payload of every CopyData message downstream as one row.
 *
 * Framing used here: one byte of message code, then a 4-byte big-endian
 * length that counts itself and the payload but not the code byte, then the
 * payload. Bytes of a message that is not yet complete are kept in
 * `this._remainder` and prepended to the next chunk.
 *
 * A message is only acted upon once it is completely buffered, so a message
 * split across two chunks is handled exactly once.
 *
 * Per message code:
 *  - CopyOutResponse: expected once; a second one emits an 'error'.
 *  - CopyData: payload is pushed and `rowCount` is incremented.
 *  - ParameterStatus / NoticeResponse / NotificationResponse: skipped.
 *  - ErrorResponse / CopyDone: the stream detaches from the connection and
 *    ends; anything left in the chunk is not processed.
 *  - anything else: emits an 'error' and the message is skipped.
 *
 * @param {Buffer} chunk - bytes read from the connection stream
 * @param {string} enc - encoding supplied by the stream machinery (unused)
 * @param {Function} cb - invoked once the chunk has been consumed
 */
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
  var Byte1Len = 1
  var Int32Len = 4
  var headerLen = Byte1Len + Int32Len
  var offset = 0

  // Prepend whatever was left over from the previous call.
  if (this._remainder && chunk) {
    chunk = Buffer.concat([this._remainder, chunk])
  }

  while ((chunk.length - offset) >= headerLen) {
    var messageCode = chunk[offset]
    var length = chunk.readUInt32BE(offset + Byte1Len)
    var messageEnd = offset + Byte1Len + length

    if (chunk.length < messageEnd) {
      // we need more chunks for a complete message; nothing has been acted
      // upon yet, so it will be parsed from scratch on the next call
      break
    }

    // Decided per message, so a discarded message following a CopyData row
    // is never mistaken for a row.
    var needPush = false
    //console.log('PostgreSQL message ' + String.fromCharCode(messageCode))
    switch (messageCode) {
      // detect COPY start
      case code.CopyOutResponse:
        if (!this._gotCopyOutResponse) {
          this._gotCopyOutResponse = true
        } else {
          this.emit('error', new Error('Unexpected CopyOutResponse message (H)'))
        }
        break
      // meaningful row
      case code.CopyData:
        needPush = true
        break
      // standard interspersed messages. discard
      case code.ParameterStatus:
      case code.NoticeResponse:
      case code.NotificationResponse:
        break
      // NOTE(review): ErrorResponse ends the stream exactly like CopyDone,
      // without emitting 'error' - confirm this is intended.
      case code.ErrorResponse:
      case code.CopyDone:
        this._detach()
        this.push(null)
        return cb()
      default:
        this.emit('error', new Error('Unexpected PostgreSQL message ' + String.fromCharCode(messageCode)))
    }

    if (needPush) {
      this.rowCount++
      this.push(chunk.slice(offset + headerLen, messageEnd))
    }
    offset = messageEnd
  }

  // Keep the unparsed tail (partial header or partial message) for next time.
  if (chunk.length - offset) {
    this._remainder = chunk.slice(offset)
  } else {
    this._remainder = false
  }
  cb()
}
/**
 * Re-emits the given error as an 'error' event on this stream.
 *
 * @param {*} e - value forwarded as the event argument
 */
CopyStreamQuery.prototype.handleError = function(e) {
  var stream = this
  stream.emit('error', e)
}
/**
 * No-op: the argument is ignored and nothing is returned.
 * Presumably a hook called by the query dispatcher - confirm against caller.
 *
 * @param {*} chunk - ignored
 */
CopyStreamQuery.prototype.handleCopyData = function(chunk) {
  // nothing to do here
}
// (blame timestamp from the repository web view: 2013-10-29 06:31:11 +08:00)
/**
 * No-op: takes no arguments and returns nothing.
 * Presumably a hook called by the query dispatcher - confirm against caller.
 */
CopyStreamQuery.prototype.handleCommandComplete = function() {
  // nothing to do here
}
/**
 * No-op: takes no arguments and returns nothing.
 * Presumably a hook called by the query dispatcher - confirm against caller.
 */
CopyStreamQuery.prototype.handleReadyForQuery = function() {
  // nothing to do here
}