Add callbacks to pg.connect
And call them to return connections to the pool.
This commit is contained in:
parent
b59ae1d057
commit
994e8a702b
@ -10,7 +10,7 @@ module.exports = {
|
|||||||
let metrics = new StreamCopyMetrics(logger, 'copyto', sql);
|
let metrics = new StreamCopyMetrics(logger, 'copyto', sql);
|
||||||
|
|
||||||
const pg = new PSQL(userDbParams);
|
const pg = new PSQL(userDbParams);
|
||||||
pg.connect(function (err, client) {
|
pg.connect(function (err, client, done) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
@ -20,6 +20,7 @@ module.exports = {
|
|||||||
res
|
res
|
||||||
.on('error', err => {
|
.on('error', err => {
|
||||||
pgstream.unpipe(res);
|
pgstream.unpipe(res);
|
||||||
|
done();
|
||||||
return cb(err);
|
return cb(err);
|
||||||
})
|
})
|
||||||
.on('close', () => {
|
.on('close', () => {
|
||||||
@ -29,11 +30,11 @@ module.exports = {
|
|||||||
const runningClient = client;
|
const runningClient = client;
|
||||||
const cancelingClient = new Client(runningClient.connectionParameters);
|
const cancelingClient = new Client(runningClient.connectionParameters);
|
||||||
const connection = cancelingClient.connection;
|
const connection = cancelingClient.connection;
|
||||||
connection.connect(runningClient.port, runningClient.host);
|
|
||||||
connection.on('connect', () => {
|
connection.on('connect', () => {
|
||||||
connection.cancel(runningClient.processID, runningClient.secretKey);
|
connection.cancel(runningClient.processID, runningClient.secretKey);
|
||||||
|
done();
|
||||||
});
|
});
|
||||||
|
connection.connect(runningClient.port, runningClient.host);
|
||||||
return cb(new Error('Connection closed by client'));
|
return cb(new Error('Connection closed by client'));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -44,11 +45,13 @@ module.exports = {
|
|||||||
pgstream
|
pgstream
|
||||||
.on('error', err => {
|
.on('error', err => {
|
||||||
pgstream.unpipe(res);
|
pgstream.unpipe(res);
|
||||||
|
done();
|
||||||
return cb(err);
|
return cb(err);
|
||||||
})
|
})
|
||||||
.on('data', data => metrics.addSize(data.length))
|
.on('data', data => metrics.addSize(data.length))
|
||||||
.on('end', () => {
|
.on('end', () => {
|
||||||
metrics.end(copyToStream.rowCount);
|
metrics.end(copyToStream.rowCount);
|
||||||
|
done();
|
||||||
return cb(null, metrics);
|
return cb(null, metrics);
|
||||||
})
|
})
|
||||||
.pipe(res);
|
.pipe(res);
|
||||||
@ -59,7 +62,7 @@ module.exports = {
|
|||||||
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, gzip);
|
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, gzip);
|
||||||
|
|
||||||
const pg = new PSQL(userDbParams);
|
const pg = new PSQL(userDbParams);
|
||||||
pg.connect(function (err, client) {
|
pg.connect(function (err, client, done) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
@ -69,10 +72,12 @@ module.exports = {
|
|||||||
pgstream
|
pgstream
|
||||||
.on('error', err => {
|
.on('error', err => {
|
||||||
req.unpipe(pgstream);
|
req.unpipe(pgstream);
|
||||||
|
done();
|
||||||
return cb(err);
|
return cb(err);
|
||||||
})
|
})
|
||||||
.on('end', function () {
|
.on('end', function () {
|
||||||
metrics.end(copyFromStream.rowCount);
|
metrics.end(copyFromStream.rowCount);
|
||||||
|
done();
|
||||||
return cb(null, metrics);
|
return cb(null, metrics);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -82,6 +87,7 @@ module.exports = {
|
|||||||
.on('error', err => {
|
.on('error', err => {
|
||||||
req.unpipe(pgstream);
|
req.unpipe(pgstream);
|
||||||
pgstream.end();
|
pgstream.end();
|
||||||
|
done();
|
||||||
return cb(err);
|
return cb(err);
|
||||||
})
|
})
|
||||||
.on('close', () => {
|
.on('close', () => {
|
||||||
@ -89,6 +95,7 @@ module.exports = {
|
|||||||
const connection = client.connection;
|
const connection = client.connection;
|
||||||
connection.sendCopyFail('CARTO SQL API: Connection closed by client');
|
connection.sendCopyFail('CARTO SQL API: Connection closed by client');
|
||||||
req.unpipe(pgstream);
|
req.unpipe(pgstream);
|
||||||
|
done();
|
||||||
return cb(new Error('Connection closed by client'));
|
return cb(new Error('Connection closed by client'));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user