node-pg-copy-streams/index.js

69 lines
1.5 KiB
JavaScript
Raw Normal View History

2013-10-29 06:31:11 +08:00
var CopyToQueryStream = require('./copy-to')
module.exports = {
to: function(txt) {
return new CopyToQueryStream(txt)
},
from: function (txt) {
return new CopyStreamQuery(txt)
}
}
var Transform = require('stream').Transform
var util = require('util')
/**
 * Transform stream carrying a query text.
 *
 * @constructor
 * @param {string} text - query text; sent through connection.query() by submit()
 */
var CopyStreamQuery = function(text) {
  // Initialise the Transform base class on this instance first.
  Transform.call(this)
  this.text = text
  this._listeners = null
  this._copyOutResponse = null
  // Incremented once per chunk that passes through _transform().
  this.rowCount = 0
}
// Wire up prototype inheritance so instances get the Transform stream API.
util.inherits(CopyStreamQuery, Transform)
/**
 * Remember the connection and send this query's text over it.
 * NOTE(review): presumably invoked by the client that owns the connection
 * when the query is dispatched — confirm against the caller.
 *
 * @param {object} connection - object exposing a query(text) method
 */
CopyStreamQuery.prototype.submit = function(connection) {
  var queryText = this.text
  this.connection = connection
  connection.query(queryText)
}
// Single-byte message type codes (ASCII values of the key letters).
var code = {
  H: 0x48, // 'H' - CopyOutResponse
  d: 0x64, // 'd' - CopyData
  c: 0x63  // 'c' - CopyDone
}

// One-byte CopyData type marker, allocated once and pushed in front of every
// chunk. Buffer.from() replaces the deprecated Buffer(array) constructor call.
var copyDataBuffer = Buffer.from([code.d])
/**
 * Wrap one incoming chunk in a CopyData frame and push it downstream.
 *
 * Three pieces are pushed in order: the one-byte CopyData marker, a 4-byte
 * big-endian length, and the chunk itself unchanged.
 *
 * @param {Buffer} chunk - payload to frame (its .length is used as a byte count)
 * @param {string} enc - encoding supplied by the stream machinery; unused
 * @param {function} cb - called with no arguments once the frame is pushed
 */
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
  this.push(copyDataBuffer)

  // The length field counts its own 4 bytes in addition to the payload.
  // Buffer.alloc() replaces the deprecated Buffer(size) constructor call.
  var lenBuffer = Buffer.alloc(4)
  lenBuffer.writeUInt32BE(chunk.length + 4, 0)
  this.push(lenBuffer)

  this.push(chunk)

  // Counts chunks seen by this method, whatever each chunk contains.
  this.rowCount++
  cb()
}
/**
 * Push the closing CopyDone message once all input has been written.
 *
 * The message is the CopyDone type byte followed by a 4-byte big-endian
 * length of 4 (the length field counts only itself; there is no payload).
 *
 * @param {function} cb - flush callback; deliberately NOT invoked (see below)
 */
CopyStreamQuery.prototype._flush = function(cb) {
  // Buffer.from() replaces the deprecated Buffer(array) constructor call.
  var finBuffer = Buffer.from([code.c, 0, 0, 0, 4])
  this.push(finBuffer)
  // cb is intentionally never called: completing the flush would end this
  // stream and thereby close the underlying stream it is piped into.
  // 'end' is emitted manually from handleCommandComplete() instead.
}
/**
 * Re-emit a reported error as an 'error' event on this stream.
 *
 * @param {*} e - the error value, forwarded untouched to listeners
 */
CopyStreamQuery.prototype.handleError = function(e) {
  var failure = e
  this.emit('error', failure)
}
2013-12-10 00:40:59 +08:00
/**
 * Start sending framed data by piping this stream into the connection's
 * underlying stream.
 * NOTE(review): presumably called when the server answers the query with a
 * CopyInResponse message — confirm against the caller.
 *
 * @param {object} connection - object whose `stream` property is a writable stream
 */
CopyStreamQuery.prototype.handleCopyInResponse = function(connection) {
  this.pipe(connection.stream)
}
/**
 * Detach this stream from every destination it is piped into, then emit
 * 'end' by hand. The manual emit is needed because _flush() never calls its
 * callback, so the stream machinery does not end this stream itself.
 */
CopyStreamQuery.prototype.handleCommandComplete = function() {
  var self = this
  self.unpipe()
  self.emit('end')
}
/**
 * Handler stub: takes no arguments and performs no action.
 */
CopyStreamQuery.prototype.handleReadyForQuery = function() {
  // nothing to do
}