Initial commit

This commit is contained in:
Brian M. Carlson 2013-10-28 17:31:11 -05:00
commit d3d648a623
7 changed files with 284 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
node_modules/

105
copy-to.js Normal file
View File

@ -0,0 +1,105 @@
module.exports = function(txt) {
return new CopyStreamQuery(txt)
}
var Transform = require('stream').Transform
var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
this.text = text
this._listeners = null
this._copyOutResponse = null
this.rowsRead = 0
}
util.inherits(CopyStreamQuery, Transform)
CopyStreamQuery.prototype.submit = function(connection) {
console.log('submitting')
connection.query(this.text)
this.connection = connection
this._listeners = connection.stream.listeners('data')
connection.stream.removeAllListeners('data')
connection.stream.pipe(this)
}
var code = {
E: 69, //Error
H: 72, //CopyOutResponse
d: 0x64, //CopyData
c: 0x63 //CopyDone
}
CopyStreamQuery.prototype._detach = function() {
this.connection.stream.unpipe()
this.connection.stream.removeAllListeners('data')
var self = this
this._listeners.forEach(function(listener) {
self.connection.stream.on('data', listener)
})
}
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
var offset = 0
if(this._remainder && chunk) {
chunk = Buffer.concat([this._remainder, chunk])
}
if(!this._copyOutResponse) {
this._copyOutResponse = true
if(chunk[0] == code.E) {
this._detach()
this.connection.stream.unshift(chunk)
this.push(null)
return cb();
}
if(chunk[0] != code.H) {
this.emit('error', new Error('Expected copy out response'))
}
var length = chunk.readUInt32BE(1)
offset = 1
offset += length
}
while((chunk.length - offset) > 5) {
var messageCode = chunk[offset]
//complete
if(messageCode == code.c) {
this._detach()
this.connection.stream.unshift(chunk.slice(offset + 5))
this.push(null)
return cb();
}
//something bad happened
if(messageCode != code.d) {
return this.emit('error', new Error('expected "d" (copydata message)'))
}
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
//can we read the next row?
if(chunk.length > (offset + length + 5)) {
offset += 5
var slice = chunk.slice(offset, offset + length)
offset += length
this.rowsRead++
this.push(slice)
} else {
break;
}
}
if(chunk.length - offset) {
var slice = chunk.slice(offset)
this._remainder = slice
} else {
this._remainder = false
}
cb()
}
CopyStreamQuery.prototype.handleError = function(e) {
this.emit('error', e)
}
CopyStreamQuery.prototype.handleCommandComplete = function() {
}
CopyStreamQuery.prototype.handleReadyForQuery = function() {
}

66
index.js Normal file
View File

@ -0,0 +1,66 @@
var CopyToQueryStream = require('./copy-to')
module.exports = {
to: function(txt) {
return new CopyToQueryStream(txt)
},
from: function (txt) {
return new CopyStreamQuery(txt)
}
}
var Transform = require('stream').Transform
var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
this.text = text
this._listeners = null
this._copyOutResponse = null
}
util.inherits(CopyStreamQuery, Transform)
CopyStreamQuery.prototype.submit = function(connection) {
this.connection = connection
connection.query(this.text)
}
var code = {
H: 72, //CopyOutResponse
d: 0x64, //CopyData
c: 0x63 //CopyDone
}
var copyDataBuffer = Buffer([code.d])
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
this.push(copyDataBuffer)
var lenBuffer = Buffer(4)
lenBuffer.writeUInt32BE(chunk.length + 4, 0)
this.push(lenBuffer)
this.push(chunk)
cb()
}
CopyStreamQuery.prototype._flush = function(cb) {
var finBuffer = Buffer([code.c, 0, 0, 0, 4])
this.push(finBuffer)
//never call this callback, do not close underlying stream
//cb()
}
CopyStreamQuery.prototype.handleError = function(e) {
this.emit('error', e)
}
CopyStreamQuery.prototype.streamData = function(connection) {
this.pipe(connection.stream)
}
CopyStreamQuery.prototype.handleCommandComplete = function() {
this.unpipe()
this.emit('end')
}
CopyStreamQuery.prototype.handleReadyForQuery = function() {
}

32
package.json Normal file
View File

@ -0,0 +1,32 @@
{
"name": "pg-copy-streams",
"version": "0.0.0",
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
"main": "index.js",
"scripts": {
"test": "node test"
},
"repository": {
"type": "git",
"url": "git://github.com/brianc/node-pg-copy-streams.git"
},
"keywords": [
"postgres",
"copy",
"in",
"out",
"stream"
],
"author": "Brian M. Carlson",
"license": "MIT",
"bugs": {
"url": "https://github.com/brianc/node-pg-copy-streams/issues"
},
"devDependencies": {
"pg.js": "~2.8.1",
"concat-stream": "~1.1.0",
"gonna": "0.0.0",
"lodash": "~2.2.1",
"heroku-env": "~0.1.1"
}
}

45
test/copy-from.js Normal file
View File

@ -0,0 +1,45 @@
var assert = require('assert')
var gonna = require('gonna')
var concat = require('concat-stream')
var _ = require('lodash')
var pg = require('pg.js')
var testRange = function(top) {
var client = function() {
var client = new pg.Client()
client.connect()
client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
return client
}
var fromClient = client()
var copy = require('../').from
var txt = 'COPY numbers FROM STDIN'
var stream = fromClient.query(copy(txt))
for(var i = 0; i < top; i++) {
stream.write(Buffer('' + i + '\t' + i*10 + '\n'))
}
stream.end()
var countDone = gonna('have correct count')
stream.on('end', function() {
fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) {
assert.ifError(err)
assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count)
console.log('found ', res.rows.length, 'rows')
countDone()
var firstRowDone = gonna('have correct result')
fromClient.query('SELECT (max(num)) AS num FROM numbers', function(err, res) {
assert.ifError(err)
assert.equal(res.rows[0].num, top-1)
firstRowDone()
fromClient.end()
})
})
})
}
testRange(1000)

33
test/copy-to.js Normal file
View File

@ -0,0 +1,33 @@
var assert = require('assert')
var gonna = require('gonna')
var concat = require('concat-stream')
var _ = require('lodash')
var pg = require('pg.js')
var testRange = function(top) {
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var fromClient = client()
var copy = require('../').to
var txt = 'COPY (SELECT * from generate_series(0, ' + top + ')) TO STDOUT'
var stream = fromClient.query(copy(txt))
var done = gonna('finish piping out', 1000, function() {
fromClient.end()
})
stream.pipe(concat(function(buf) {
var res = buf.toString('utf8')
var expected = _.range(0, top+1).join('\n') + '\n'
assert.equal(res, expected)
done()
}))
}
testRange(10000)

2
test/index.js Normal file
View File

@ -0,0 +1,2 @@
require('./copy-from')
require('./copy-to')