Emit 'row' events on streams for progress tracking

This commit is contained in:
Brian M. Carlson 2013-10-28 21:27:15 -05:00
parent d3d648a623
commit c9c843fd07
4 changed files with 14 additions and 5 deletions

View File

@ -10,13 +10,11 @@ var CopyStreamQuery = function(text) {
this.text = text this.text = text
this._listeners = null this._listeners = null
this._copyOutResponse = null this._copyOutResponse = null
this.rowsRead = 0
} }
util.inherits(CopyStreamQuery, Transform) util.inherits(CopyStreamQuery, Transform)
CopyStreamQuery.prototype.submit = function(connection) { CopyStreamQuery.prototype.submit = function(connection) {
console.log('submitting')
connection.query(this.text) connection.query(this.text)
this.connection = connection this.connection = connection
this._listeners = connection.stream.listeners('data') this._listeners = connection.stream.listeners('data')
@ -79,8 +77,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
offset += 5 offset += 5
var slice = chunk.slice(offset, offset + length) var slice = chunk.slice(offset, offset + length)
offset += length offset += length
this.rowsRead++
this.push(slice) this.push(slice)
this.emit('row')
} else { } else {
break; break;
} }

View File

@ -39,6 +39,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
lenBuffer.writeUInt32BE(chunk.length + 4, 0) lenBuffer.writeUInt32BE(chunk.length + 4, 0)
this.push(lenBuffer) this.push(lenBuffer)
this.push(chunk) this.push(chunk)
this.emit('row')
cb() cb()
} }

View File

@ -20,6 +20,10 @@ var testRange = function(top) {
var txt = 'COPY numbers FROM STDIN' var txt = 'COPY numbers FROM STDIN'
var stream = fromClient.query(copy(txt)) var stream = fromClient.query(copy(txt))
var rowEmitCount = 0
stream.on('row', function() {
rowEmitCount++
})
for(var i = 0; i < top; i++) { for(var i = 0; i < top; i++) {
stream.write(Buffer('' + i + '\t' + i*10 + '\n')) stream.write(Buffer('' + i + '\t' + i*10 + '\n'))
} }
@ -32,6 +36,7 @@ var testRange = function(top) {
console.log('found ', res.rows.length, 'rows') console.log('found ', res.rows.length, 'rows')
countDone() countDone()
var firstRowDone = gonna('have correct result') var firstRowDone = gonna('have correct result')
assert.equal(rowEmitCount, top, 'should have emitted "row" event ' + top + ' times')
fromClient.query('SELECT (max(num)) AS num FROM numbers', function(err, res) { fromClient.query('SELECT (max(num)) AS num FROM numbers', function(err, res) {
assert.ifError(err) assert.ifError(err)
assert.equal(res.rows[0].num, top-1) assert.equal(res.rows[0].num, top-1)

View File

@ -15,17 +15,22 @@ var testRange = function(top) {
var fromClient = client() var fromClient = client()
var copy = require('../').to var copy = require('../').to
var txt = 'COPY (SELECT * from generate_series(0, ' + top + ')) TO STDOUT' var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
var stream = fromClient.query(copy(txt)) var stream = fromClient.query(copy(txt))
var rowEmitCount = 0
stream.on('row', function() {
rowEmitCount++
})
var done = gonna('finish piping out', 1000, function() { var done = gonna('finish piping out', 1000, function() {
fromClient.end() fromClient.end()
}) })
stream.pipe(concat(function(buf) { stream.pipe(concat(function(buf) {
var res = buf.toString('utf8') var res = buf.toString('utf8')
var expected = _.range(0, top+1).join('\n') + '\n' var expected = _.range(0, top).join('\n') + '\n'
assert.equal(res, expected) assert.equal(res, expected)
assert.equal(rowEmitCount, top, 'should have emitted "row" ' + top + ' times but got ' + rowEmitCount)
done() done()
})) }))
} }