commit d3d648a623c148bfdec42d10df950caccf6a64e3 Author: Brian M. Carlson Date: Mon Oct 28 17:31:11 2013 -0500 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c2658d7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules/ diff --git a/copy-to.js b/copy-to.js new file mode 100644 index 0000000..fab8b58 --- /dev/null +++ b/copy-to.js @@ -0,0 +1,105 @@ +module.exports = function(txt) { + return new CopyStreamQuery(txt) +} + +var Transform = require('stream').Transform +var util = require('util') + +var CopyStreamQuery = function(text) { + Transform.call(this) + this.text = text + this._listeners = null + this._copyOutResponse = null + this.rowsRead = 0 +} + +util.inherits(CopyStreamQuery, Transform) + +CopyStreamQuery.prototype.submit = function(connection) { + console.log('submitting') + connection.query(this.text) + this.connection = connection + this._listeners = connection.stream.listeners('data') + connection.stream.removeAllListeners('data') + connection.stream.pipe(this) +} + +var code = { + E: 69, //Error + H: 72, //CopyOutResponse + d: 0x64, //CopyData + c: 0x63 //CopyDone +} + +CopyStreamQuery.prototype._detach = function() { + this.connection.stream.unpipe() + this.connection.stream.removeAllListeners('data') + var self = this + this._listeners.forEach(function(listener) { + self.connection.stream.on('data', listener) + }) +} + +CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { + var offset = 0 + if(this._remainder && chunk) { + chunk = Buffer.concat([this._remainder, chunk]) + } + if(!this._copyOutResponse) { + this._copyOutResponse = true + if(chunk[0] == code.E) { + this._detach() + this.connection.stream.unshift(chunk) + this.push(null) + return cb(); + } + if(chunk[0] != code.H) { + this.emit('error', new Error('Expected copy out response')) + } + var length = chunk.readUInt32BE(1) + offset = 1 + offset += length + } + while((chunk.length - offset) > 5) { + var messageCode = chunk[offset] + //complete + if(messageCode == code.c) { + this._detach() + this.connection.stream.unshift(chunk.slice(offset + 5)) + this.push(null) + return cb(); + } + //something bad happened + if(messageCode != code.d) { + return this.emit('error', new Error('expected "d" (copydata message)')) + } + var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32 + //can we read the next row? + if(chunk.length > (offset + length + 5)) { + offset += 5 + var slice = chunk.slice(offset, offset + length) + offset += length + this.rowsRead++ + this.push(slice) + } else { + break; + } + } + if(chunk.length - offset) { + var slice = chunk.slice(offset) + this._remainder = slice + } else { + this._remainder = false + } + cb() +} + +CopyStreamQuery.prototype.handleError = function(e) { + this.emit('error', e) +} + +CopyStreamQuery.prototype.handleCommandComplete = function() { +} + +CopyStreamQuery.prototype.handleReadyForQuery = function() { +} diff --git a/index.js b/index.js new file mode 100644 index 0000000..34742bc --- /dev/null +++ b/index.js @@ -0,0 +1,66 @@ +var CopyToQueryStream = require('./copy-to') + +module.exports = { + to: function(txt) { + return new CopyToQueryStream(txt) + }, + from: function (txt) { + return new CopyStreamQuery(txt) + } +} + +var Transform = require('stream').Transform +var util = require('util') + +var CopyStreamQuery = function(text) { + Transform.call(this) + this.text = text + this._listeners = null + this._copyOutResponse = null +} + +util.inherits(CopyStreamQuery, Transform) + +CopyStreamQuery.prototype.submit = function(connection) { + this.connection = connection + connection.query(this.text) +} + +var code = { + H: 72, //CopyOutResponse + d: 0x64, //CopyData + c: 0x63 //CopyDone +} + +var copyDataBuffer = Buffer([code.d]) +CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { + this.push(copyDataBuffer) + var lenBuffer = Buffer(4) + lenBuffer.writeUInt32BE(chunk.length + 4, 0) + this.push(lenBuffer) + this.push(chunk) + cb() +} + +CopyStreamQuery.prototype._flush = function(cb) { + var finBuffer = Buffer([code.c, 0, 0, 0, 4]) + this.push(finBuffer) + //never call this callback, do not close underlying stream + //cb() +} + +CopyStreamQuery.prototype.handleError = function(e) { + this.emit('error', e) +} + +CopyStreamQuery.prototype.streamData = function(connection) { + this.pipe(connection.stream) +} + +CopyStreamQuery.prototype.handleCommandComplete = function() { + this.unpipe() + this.emit('end') +} + +CopyStreamQuery.prototype.handleReadyForQuery = function() { +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..f0732eb --- /dev/null +++ b/package.json @@ -0,0 +1,32 @@ +{ + "name": "pg-copy-streams", + "version": "0.0.0", + "description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using", + "main": "index.js", + "scripts": { + "test": "node test" + }, + "repository": { + "type": "git", + "url": "git://github.com/brianc/node-pg-copy-streams.git" + }, + "keywords": [ + "postgres", + "copy", + "in", + "out", + "stream" + ], + "author": "Brian M. Carlson", + "license": "MIT", + "bugs": { + "url": "https://github.com/brianc/node-pg-copy-streams/issues" + }, + "devDependencies": { + "pg.js": "~2.8.1", + "concat-stream": "~1.1.0", + "gonna": "0.0.0", + "lodash": "~2.2.1", + "heroku-env": "~0.1.1" + } +} diff --git a/test/copy-from.js b/test/copy-from.js new file mode 100644 index 0000000..9f0662e --- /dev/null +++ b/test/copy-from.js @@ -0,0 +1,45 @@ +var assert = require('assert') +var gonna = require('gonna') + +var concat = require('concat-stream') +var _ = require('lodash') +var pg = require('pg.js') + +var testRange = function(top) { + var client = function() { + var client = new pg.Client() + client.connect() + client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)') + return client + } + + var fromClient = client() + var copy = require('../').from + + + var txt = 'COPY numbers FROM STDIN' + + var stream = fromClient.query(copy(txt)) + for(var i = 0; i < top; i++) { + stream.write(Buffer('' + i + '\t' + i*10 + '\n')) + } + stream.end() + var countDone = gonna('have correct count') + stream.on('end', function() { + fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) { + assert.ifError(err) + assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count) + console.log('found ', res.rows.length, 'rows') + countDone() + var firstRowDone = gonna('have correct result') + fromClient.query('SELECT (max(num)) AS num FROM numbers', function(err, res) { + assert.ifError(err) + assert.equal(res.rows[0].num, top-1) + firstRowDone() + fromClient.end() + }) + }) + }) +} + +testRange(1000) diff --git a/test/copy-to.js b/test/copy-to.js new file mode 100644 index 0000000..5c1bb4f --- /dev/null +++ b/test/copy-to.js @@ -0,0 +1,33 @@ +var assert = require('assert') +var gonna = require('gonna') + +var concat = require('concat-stream') +var _ = require('lodash') +var pg = require('pg.js') + +var testRange = function(top) { + var client = function() { + var client = new pg.Client() + client.connect() + return client + } + + var fromClient = client() + var copy = require('../').to + + var txt = 'COPY (SELECT * from generate_series(0, ' + top + ')) TO STDOUT' + + var stream = fromClient.query(copy(txt)) + var done = gonna('finish piping out', 1000, function() { + fromClient.end() + }) + + stream.pipe(concat(function(buf) { + var res = buf.toString('utf8') + var expected = _.range(0, top+1).join('\n') + '\n' + assert.equal(res, expected) + done() + })) +} + +testRange(10000) diff --git a/test/index.js b/test/index.js new file mode 100644 index 0000000..21c5ea9 --- /dev/null +++ b/test/index.js @@ -0,0 +1,2 @@ +require('./copy-from') +require('./copy-to')