Improve copy-to overall performance by keeping the in-out chunk balance (ported from CartoDB's fork)
This commit is contained in:
parent
1c3271ad06
commit
f01c278615
18
copy-to.js
18
copy-to.js
@ -44,6 +44,16 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
var messageCode;
|
var messageCode;
|
||||||
var needPush = false;
|
var needPush = false;
|
||||||
|
|
||||||
|
var buffer = Buffer.alloc(chunk.length);
|
||||||
|
var buffer_offset = 0;
|
||||||
|
|
||||||
|
this.pushBufferIfneeded = function() {
|
||||||
|
if (needPush && buffer_offset > 0) {
|
||||||
|
this.push(buffer.slice(0, buffer_offset))
|
||||||
|
buffer_offset = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while((chunk.length - offset) >= (Byte1Len + Int32Len)) {
|
while((chunk.length - offset) >= (Byte1Len + Int32Len)) {
|
||||||
var messageCode = chunk[offset]
|
var messageCode = chunk[offset]
|
||||||
|
|
||||||
@ -69,9 +79,10 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
case code.NoticeResponse:
|
case code.NoticeResponse:
|
||||||
case code.NotificationResponse:
|
case code.NotificationResponse:
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case code.ErrorResponse:
|
case code.ErrorResponse:
|
||||||
case code.CopyDone:
|
case code.CopyDone:
|
||||||
|
this.pushBufferIfneeded();
|
||||||
this._detach()
|
this._detach()
|
||||||
this.push(null)
|
this.push(null)
|
||||||
return cb();
|
return cb();
|
||||||
@ -86,7 +97,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
if (needPush) {
|
if (needPush) {
|
||||||
var row = chunk.slice(offset, offset + length - Int32Len)
|
var row = chunk.slice(offset, offset + length - Int32Len)
|
||||||
this.rowCount++
|
this.rowCount++
|
||||||
this.push(row)
|
row.copy(buffer, buffer_offset);
|
||||||
|
buffer_offset += row.length;
|
||||||
}
|
}
|
||||||
offset += (length - Int32Len)
|
offset += (length - Int32Len)
|
||||||
} else {
|
} else {
|
||||||
@ -95,6 +107,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.pushBufferIfneeded();
|
||||||
|
|
||||||
if(chunk.length - offset) {
|
if(chunk.length - offset) {
|
||||||
var slice = chunk.slice(offset)
|
var slice = chunk.slice(offset)
|
||||||
this._remainder = slice
|
this._remainder = slice
|
||||||
|
Loading…
Reference in New Issue
Block a user