From 61bc713e0cd60ae1ef18c035787f75d9abbf7fc1 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Fri, 8 Jun 2018 12:49:22 +0200 Subject: [PATCH] Improve performance of COPY TO #56 Under some circumstances, the COPY TO streamming can be CPU-bound, particularly when PG holds the resultset in memory buffers and the size of the rows << chunk (64 KB in my linux box). This commits improves the situation by creating a buffer of `chunk` size and fitting in as many rows as it can before pushing them. This results in more balanced read and writes (in terms of size and in bigger chunks) as well as more frequent calls to the callback, thus freeing the main loop for other events to be processed, and therefore avoiding starvation. --- copy-to.js | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/copy-to.js b/copy-to.js index a45ee6e..4063abb 100644 --- a/copy-to.js +++ b/copy-to.js @@ -42,6 +42,10 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { var messageCode; var needPush = false; + var buffer = Buffer.alloc(chunk.length); + var buffer_offset = 0; + var buffer_sent = false; + while((chunk.length - offset) >= (Byte1Len + Int32Len)) { var messageCode = chunk[offset] @@ -70,6 +74,11 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { case code.ErrorResponse: case code.CopyDone: + if(needPush && !buffer_sent && buffer_offset > 0) { + this.push(buffer.slice(0, buffer_offset)) + buffer_sent = true; + buffer_offset = 0; + } this._detach() this.push(null) return cb(); @@ -84,7 +93,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { if (needPush) { var row = chunk.slice(offset, offset + length - Int32Len) this.rowCount++ - this.push(row) + row.copy(buffer, buffer_offset); + buffer_offset += row.length; } offset += (length - Int32Len) } else { @@ -93,6 +103,12 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { } } + if(needPush && !buffer_sent && buffer_offset > 0) { + this.push(buffer.slice(0, buffer_offset)) + buffer_sent = true; + buffer_offset = 0; + } + if(chunk.length - offset) { var slice = chunk.slice(offset) this._remainder = slice