Merge branch 'pgcopy-stream' of github.com:CartoDB/CartoDB-SQL-API into pgcopy-stream

This commit is contained in:
Paul Ramsey 2018-06-04 11:35:00 -07:00
commit aee15ebc8d
5 changed files with 46 additions and 18 deletions

View File

@ -69,7 +69,8 @@ function handleCopyTo (logger) {
streamCopy.to(
res,
req.query.q,
res.locals.userDbParams,
res.locals.userDbParams,
res.locals.user,
logger,
function(err) {
if (err) {
@ -89,6 +90,7 @@ function handleCopyFrom (logger) {
req,
req.query.q,
res.locals.userDbParams,
res.locals.user,
req.get('content-encoding') === 'gzip',
logger,
function(err, metrics) { // TODO: remove when data-ingestion log works: {time, rows}

View File

@ -5,8 +5,8 @@ const copyFrom = require('pg-copy-streams').from;
const StreamCopyMetrics = require('./stream_copy_metrics');
module.exports = {
to (res, sql, userDbParams, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyto', sql);
to (res, sql, userDbParams, user, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user);
const pg = new PSQL(userDbParams);
pg.connect(function (err, client) {
@ -30,8 +30,8 @@ module.exports = {
});
},
from (req, sql, userDbParams, gzip, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, gzip);
from (req, sql, userDbParams, user, gzip, logger, cb) {
let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip);
const pg = new PSQL(userDbParams);
pg.connect(function (err, client) {

View File

@ -1,36 +1,61 @@
const { getFormatFromCopyQuery } = require('../utils/query_info');
module.exports = class StreamCopyMetrics {
constructor(logger, type, sql, gzip = null) {
constructor(logger, type, sql, user, gzip = null) {
this.logger = logger;
this.type = type;
this.type = type;
this.format = getFormatFromCopyQuery(sql);
this.gzip = gzip;
this.username = user;
this.size = 0;
this.rows = 0;
this.startTime = Date.now();
this.startTime = new Date();
this.endTime = null;
this.time = null;
this.error = null;
}
addSize (size) {
addSize(size) {
this.size += size;
}
end (rows = null) {
this.rows = rows;
this.endTime = Date.now();
this.time = (this.endTime - this.startTime) / 1000;
end(rows = null, error = null) {
if (Number.isInteger(rows)) {
this.rows = rows;
}
this.logger.info({
if (error instanceof Error) {
this.error = error;
}
this.endTime = new Date();
this.time = (this.endTime.getTime() - this.startTime.getTime()) / 1000;
this._log(
this.startTime.toISOString(),
this.error ? this.error.message : null
);
}
_log(timestamp, errorMessage = null) {
let logData = {
type: this.type,
format: this.format,
gzip: this.gzip,
size: this.size,
rows: this.rows,
time: this.time
});
gzip: this.gzip,
username: this.username,
time: this.time,
timestamp
};
if (errorMessage) {
logData.error = errorMessage;
}
this.logger.info(logData);
}
};

View File

@ -10,7 +10,7 @@ module.exports = {
return false;
}
if(copyQuery.includes(' WITH ') && copyQuery.includes('FORMAT ')) {
if(copyQuery.includes(' WITH') && copyQuery.includes('FORMAT ')) {
const regex = /\bFORMAT\s+(\w+)/;
const result = regex.exec(copyQuery);

View File

@ -9,6 +9,7 @@ describe('query info', function () {
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV , DELIMITER ',', HEADER true)",
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV)",
"COPY copy_endpoints_test FROM STDIN WITH(FORMAT csv,HEADER true)"
];
validQueries.forEach(query => {