Encapsulate cancel

This commit is contained in:
Daniel García Aubert 2019-05-27 19:24:48 +02:00
parent 73eb9656cd
commit 7a3e4d617d
2 changed files with 10 additions and 19 deletions

View File

@ -91,13 +91,8 @@ function handleCopyTo (logger) {
})
.on('end', () => metrics.end(streamCopy.getRowCount()))
.pipe(res)
.on('close', () => {
streamCopy.cancel();
pgstream.emit('error', new Error('Connection closed by client'));
})
.on('error', err => {
pgstream.emit('error', err);
});
.on('close', () => pgstream.emit('error', new Error('Connection closed by client')))
.on('error', err => pgstream.emit('error', err));
});
};
}
@ -124,21 +119,16 @@ function handleCopyFrom (logger) {
metrics.end(null, err);
pgstream.emit('error', err);
})
.on('close', () => {
streamCopy.cancel();
pgstream.emit('error', new Error('Connection closed by client'));
})
.on('close', () => pgstream.emit('error', new Error('Connection closed by client')))
.pipe(decompress)
.on('data', data => {
metrics.addSize(data.length);
if(metrics.size > dbRemainingQuota) {
streamCopy.cancel();
return pgstream.emit('error', new Error('DB Quota exceeded'));
}
if((metrics.gzipSize || metrics.size) > COPY_FROM_MAX_POST_SIZE) {
streamCopy.cancel();
return pgstream.emit('error', new Error(
`COPY FROM maximum POST size of ${COPY_FROM_MAX_POST_SIZE_PRETTY} exceeded`
));
@ -147,7 +137,6 @@ function handleCopyFrom (logger) {
.on('error', err => {
err.message = `Error while gunzipping: ${err.message}`;
metrics.end(null, err);
streamCopy.cancel();
pgstream.emit('error', err);
})
.pipe(pgstream)

View File

@ -47,7 +47,7 @@ module.exports = class StreamCopy {
this.clientProcessID = client.processID;
this.stream = action === ACTION_TO ? copyTo(this.sql) : copyFrom(this.sql);
this.stream = action === ACTION_TO ? copyTo(this.sql) : copyFrom(this.sql);
const pgstream = client.query(this.stream);
@ -57,7 +57,10 @@ module.exports = class StreamCopy {
pgstream.on('finish', () => done());
}
pgstream.on('error', err => done(err));
pgstream.on('error', err => {
this._cancel(client.processID, action);
done(err);
});
callback(null, pgstream);
});
@ -68,10 +71,9 @@ module.exports = class StreamCopy {
return this.stream.rowCount;
}
cancel () {
const pid = this.clientProcessID;
_cancel (pid, action) {
const pg = new PSQL(this.dbParams);
const actionType = this.action === ACTION_TO ? ACTION_TO : ACTION_FROM;
const actionType = action === ACTION_TO ? ACTION_TO : ACTION_FROM;
pg.query(cancelQuery(pid), (err, result) => {
if (err) {