diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js index 3cc30ac3..359dad16 100644 --- a/app/controllers/copy_controller.js +++ b/app/controllers/copy_controller.js @@ -91,13 +91,8 @@ function handleCopyTo (logger) { }) .on('end', () => metrics.end(streamCopy.getRowCount())) .pipe(res) - .on('close', () => { - streamCopy.cancel(); - pgstream.emit('error', new Error('Connection closed by client')); - }) - .on('error', err => { - pgstream.emit('error', err); - }); + .on('close', () => pgstream.emit('error', new Error('Connection closed by client'))) + .on('error', err => pgstream.emit('error', err)); }); }; } @@ -124,21 +119,16 @@ function handleCopyFrom (logger) { metrics.end(null, err); pgstream.emit('error', err); }) - .on('close', () => { - streamCopy.cancel(); - pgstream.emit('error', new Error('Connection closed by client')); - }) + .on('close', () => pgstream.emit('error', new Error('Connection closed by client'))) .pipe(decompress) .on('data', data => { metrics.addSize(data.length); if(metrics.size > dbRemainingQuota) { - streamCopy.cancel(); return pgstream.emit('error', new Error('DB Quota exceeded')); } if((metrics.gzipSize || metrics.size) > COPY_FROM_MAX_POST_SIZE) { - streamCopy.cancel(); return pgstream.emit('error', new Error( `COPY FROM maximum POST size of ${COPY_FROM_MAX_POST_SIZE_PRETTY} exceeded` )); @@ -147,7 +137,6 @@ function handleCopyFrom (logger) { .on('error', err => { err.message = `Error while gunzipping: ${err.message}`; metrics.end(null, err); - streamCopy.cancel(); pgstream.emit('error', err); }) .pipe(pgstream) diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js index e75ef568..ce3690d5 100644 --- a/app/services/stream_copy.js +++ b/app/services/stream_copy.js @@ -47,7 +47,7 @@ module.exports = class StreamCopy { this.clientProcessID = client.processID; - this.stream = action === ACTION_TO ? copyTo(this.sql) : copyFrom(this.sql); + this.stream = action === ACTION_TO ? copyTo(this.sql) : copyFrom(this.sql); const pgstream = client.query(this.stream); @@ -57,7 +57,10 @@ module.exports = class StreamCopy { pgstream.on('finish', () => done()); } - pgstream.on('error', err => done(err)); + pgstream.on('error', err => { + this._cancel(client.processID, action); + done(err); + }); callback(null, pgstream); }); @@ -68,10 +71,9 @@ module.exports = class StreamCopy { return this.stream.rowCount; } - cancel () { - const pid = this.clientProcessID; + _cancel (pid, action) { const pg = new PSQL(this.dbParams); - const actionType = this.action === ACTION_TO ? ACTION_TO : ACTION_FROM; + const actionType = action === ACTION_TO ? ACTION_TO : ACTION_FROM; pg.query(cancelQuery(pid), (err, result) => { if (err) {