diff --git a/lib/api/sql/copy-controller.js b/lib/api/sql/copy-controller.js index 8071820e..97ed37bc 100644 --- a/lib/api/sql/copy-controller.js +++ b/lib/api/sql/copy-controller.js @@ -90,10 +90,17 @@ function handleCopyTo (logger) { pgstream .on('data', data => metrics.addSize(data.length)) - .on('error', err => { - metrics.end(null, err); + .on('error', error => { + const pid = streamCopy.clientProcessID; + streamCopy.cancel(pid, StreamCopy.ACTION_TO, (err) => { + if (err) { + logger.error({ error: err }); + } - return next(err); + metrics.end(null, error); + + return next(error); + }); }) .on('end', () => { metrics.end(streamCopy.getRowCount()); diff --git a/lib/services/stream-copy.js b/lib/services/stream-copy.js index 642e2118..f9e9c243 100644 --- a/lib/services/stream-copy.js +++ b/lib/services/stream-copy.js @@ -13,14 +13,13 @@ const terminateQuery = pid => `SELECT pg_terminate_backend(${pid}) as terminated const timeoutQuery = timeout => `SET statement_timeout=${timeout}`; module.exports = class StreamCopy { - constructor (sql, userDbParams, logger) { + constructor (sql, userDbParams) { this.dbParams = Object.assign({}, userDbParams, { port: global.settings.db_batch_port || userDbParams.port }); this.sql = sql; this.stream = null; this.timeout = global.settings.copy_timeout || DEFAULT_TIMEOUT; - this.logger = logger; } static get ACTION_TO () { @@ -52,7 +51,6 @@ module.exports = class StreamCopy { if (action === ACTION_TO) { pgstream.on('end', () => done()); - pgstream.on('error', () => this._cancel(client.processID, action)); pgstream.on('warning', (msg) => this.logger.warn({ error: new Error(msg) })); } else if (action === ACTION_FROM) { pgstream.on('finish', () => done()); @@ -70,33 +68,33 @@ module.exports = class StreamCopy { return this.stream.rowCount; } - _cancel (pid, action) { + cancel (pid, action, callback) { const pg = new PSQL(this.dbParams); const actionType = action === ACTION_TO ? ACTION_TO : ACTION_FROM; pg.query(cancelQuery(pid), (err, result) => { if (err) { - return this.logger.error({ error: err }); + return callback(err); } const isCancelled = result.rows.length && result.rows[0].cancelled; if (isCancelled) { - return this.logger.info(`Canceled "copy ${actionType}" stream query successfully (pid: ${pid})`); + return callback(); } return pg.query(terminateQuery(pid), (err, result) => { if (err) { - return this.logger.error({ error: err }); + return callback(err); } const isTerminated = result.rows.length && result.rows[0].terminated; if (!isTerminated) { - return this.logger.error({ error: new Error(`Unable to terminate "copy ${actionType}" stream query (pid: ${pid})`) }); + return callback(new Error(`Unable to terminate "copy ${actionType}" stream query (pid: ${pid})`)); } - return this.logger.info(`Terminated "copy ${actionType}" stream query successfully (pid: ${pid})`); + return callback(); }); }); }