refactoring copy controller middlewares
This commit is contained in:
parent
433bd01c27
commit
f01191472b
@ -34,8 +34,8 @@ CopyController.prototype.route = function (app) {
|
|||||||
authorizationMiddleware(this.metadataBackend),
|
authorizationMiddleware(this.metadataBackend),
|
||||||
connectionParamsMiddleware(this.userDatabaseService),
|
connectionParamsMiddleware(this.userDatabaseService),
|
||||||
timeoutLimitsMiddleware(this.metadataBackend),
|
timeoutLimitsMiddleware(this.metadataBackend),
|
||||||
this.handleCopyFrom.bind(this),
|
handleCopyFrom(),
|
||||||
this.responseCopyFrom.bind(this),
|
responseCopyFrom(),
|
||||||
errorMiddleware()
|
errorMiddleware()
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
@ -106,80 +106,83 @@ function handleCopyTo (statsClient) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
CopyController.prototype.handleCopyFrom = function (req, res, next) {
|
function handleCopyFrom () {
|
||||||
const { sql } = req.query;
|
return function handleCopyFromMiddleware (req, res, next) {
|
||||||
|
const { sql } = req.query;
|
||||||
|
|
||||||
if (!sql) {
|
if (!sql) {
|
||||||
return next(new Error("Parameter 'sql' is missing, must be in URL or first field in POST"));
|
return next(new Error("Parameter 'sql' is missing, must be in URL or first field in POST"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only accept SQL that starts with 'COPY'
|
// Only accept SQL that starts with 'COPY'
|
||||||
if (!sql.toUpperCase().startsWith("COPY ")) {
|
if (!sql.toUpperCase().startsWith("COPY ")) {
|
||||||
return next(new Error("SQL must start with COPY"));
|
return next(new Error("SQL must start with COPY"));
|
||||||
}
|
}
|
||||||
|
|
||||||
res.locals.copyFromSize = 0;
|
res.locals.copyFromSize = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const start_time = Date.now();
|
const start_time = Date.now();
|
||||||
|
|
||||||
// Connect and run the COPY
|
// Connect and run the COPY
|
||||||
const pg = new PSQL(res.locals.userDbParams);
|
const pg = new PSQL(res.locals.userDbParams);
|
||||||
pg.connect(function (err, client) {
|
pg.connect(function (err, client) {
|
||||||
if (err) {
|
if (err) {
|
||||||
return next(err);
|
return next(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
let copyFromStream = copyFrom(sql);
|
let copyFromStream = copyFrom(sql);
|
||||||
const pgstream = client.query(copyFromStream);
|
const pgstream = client.query(copyFromStream);
|
||||||
pgstream.on('error', next);
|
pgstream.on('error', next);
|
||||||
pgstream.on('end', function () {
|
pgstream.on('end', function () {
|
||||||
const end_time = Date.now();
|
const end_time = Date.now();
|
||||||
res.body = {
|
res.body = {
|
||||||
time: (end_time - start_time) / 1000,
|
time: (end_time - start_time) / 1000,
|
||||||
total_rows: copyFromStream.rowCount
|
total_rows: copyFromStream.rowCount
|
||||||
};
|
};
|
||||||
|
|
||||||
return next();
|
return next();
|
||||||
|
});
|
||||||
|
|
||||||
|
if (req.get('content-encoding') === 'gzip') {
|
||||||
|
req
|
||||||
|
.pipe(zlib.createGunzip())
|
||||||
|
.on('data', data => res.locals.copyFromSize += data.length)
|
||||||
|
.pipe(pgstream);
|
||||||
|
} else {
|
||||||
|
req
|
||||||
|
.on('data', data => res.locals.copyFromSize += data.length)
|
||||||
|
.pipe(pgstream);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (req.get('content-encoding') === 'gzip') {
|
} catch (err) {
|
||||||
req
|
next(err);
|
||||||
.pipe(zlib.createGunzip())
|
}
|
||||||
.on('data', data => res.locals.copyFromSize += data.length)
|
};
|
||||||
.pipe(pgstream);
|
}
|
||||||
} else {
|
|
||||||
req
|
|
||||||
.on('data', data => res.locals.copyFromSize += data.length)
|
|
||||||
.pipe(pgstream);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
} catch (err) {
|
function responseCopyFrom () {
|
||||||
next(err);
|
return function responseCopyFromMiddleware (req, res, next) {
|
||||||
}
|
if (!res.body || !res.body.total_rows) {
|
||||||
|
return next(new Error("No rows copied"));
|
||||||
|
}
|
||||||
|
|
||||||
};
|
if (req.profiler) {
|
||||||
|
const copyFromMetrics = {
|
||||||
|
size: res.locals.copyFromSize, //bytes
|
||||||
|
format: getFormatFromCopyQuery(req.query.sql),
|
||||||
|
time: res.body.time, //seconds
|
||||||
|
total_rows: res.body.total_rows,
|
||||||
|
gzip: req.get('content-encoding') === 'gzip'
|
||||||
|
};
|
||||||
|
|
||||||
CopyController.prototype.responseCopyFrom = function (req, res, next) {
|
req.profiler.add({ copyFrom: copyFromMetrics });
|
||||||
if (!res.body || !res.body.total_rows) {
|
res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
|
||||||
return next(new Error("No rows copied"));
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (req.profiler) {
|
res.send(res.body);
|
||||||
const copyFromMetrics = {
|
};
|
||||||
size: res.locals.copyFromSize, //bytes
|
}
|
||||||
format: getFormatFromCopyQuery(req.query.sql),
|
|
||||||
time: res.body.time, //seconds
|
|
||||||
total_rows: res.body.total_rows,
|
|
||||||
gzip: req.get('content-encoding') === 'gzip'
|
|
||||||
};
|
|
||||||
|
|
||||||
req.profiler.add({ copyFrom: copyFromMetrics });
|
|
||||||
res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
|
|
||||||
}
|
|
||||||
|
|
||||||
res.send(res.body);
|
|
||||||
};
|
|
||||||
|
|
||||||
module.exports = CopyController;
|
module.exports = CopyController;
|
||||||
|
Loading…
Reference in New Issue
Block a user