Merge pull request #686 from CartoDB/feature/ch145435/reduce-sql-api-log-verbosity
Reduce log verbosity
This commit is contained in:
commit
27a7a90d1a
@ -5,6 +5,7 @@ module.exports = function logReqRes ({ logOnEvent = 'finish' } = {}) {
|
|||||||
const { logger } = res.locals;
|
const { logger } = res.locals;
|
||||||
logger.info({ client_request: req }, 'Incoming request');
|
logger.info({ client_request: req }, 'Incoming request');
|
||||||
res.on(logOnEvent, () => logger.info({ server_response: res, status: res.statusCode }, 'Response sent'));
|
res.on(logOnEvent, () => logger.info({ server_response: res, status: res.statusCode }, 'Response sent'));
|
||||||
|
res.on('close', () => res.locals.logger.info({ end: true }, 'Request done'));
|
||||||
next();
|
next();
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
@ -102,10 +102,10 @@ function handleCopyTo ({ logger: mainLogger }) {
|
|||||||
const pid = streamCopy.clientProcessID;
|
const pid = streamCopy.clientProcessID;
|
||||||
streamCopy.cancel(pid, StreamCopy.ACTION_TO, (err) => {
|
streamCopy.cancel(pid, StreamCopy.ACTION_TO, (err) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
return mainLogger.error({ request_id: requestId, exception: err, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Unable to cancel stream query');
|
return mainLogger.error({ 'cdb-user': user, request_id: requestId, exception: err, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Unable to cancel stream query');
|
||||||
}
|
}
|
||||||
|
|
||||||
mainLogger.info({ request_id: requestId, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Canceled stream query successfully');
|
mainLogger.info({ 'cdb-user': user, request_id: requestId, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Canceled stream query successfully');
|
||||||
});
|
});
|
||||||
|
|
||||||
return next(err);
|
return next(err);
|
||||||
|
@ -140,7 +140,7 @@ function createJob (jobService) {
|
|||||||
return next(err);
|
return next(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info({ job: job.toJSON() }, 'Batch query job created');
|
logger.info({ 'cdb-user': res.locals.user, job: job.toJSON() }, 'Batch query job created');
|
||||||
|
|
||||||
res.statusCode = 201;
|
res.statusCode = 201;
|
||||||
res.body = job.serialize();
|
res.body = job.serialize();
|
||||||
|
@ -116,9 +116,9 @@ Batch.prototype.processJob = function (user, callback) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (JobStatus.isFinal(job.data.status)) {
|
if (JobStatus.isFinal(job.data.status)) {
|
||||||
self.logger.info({ job: job.toJSON() }, 'Batch query job finished');
|
self.logger.info({ 'cdb-user': user, job: job.toJSON() }, 'Batch query job finished');
|
||||||
} else {
|
} else {
|
||||||
self.logger.debug({ job: job.toJSON() }, 'Batch query job: query done');
|
self.logger.debug({ 'cdb-user': user, job: job.toJSON() }, 'Batch query job: query done');
|
||||||
}
|
}
|
||||||
|
|
||||||
return callback(null, !EMPTY_QUEUE);
|
return callback(null, !EMPTY_QUEUE);
|
||||||
|
@ -82,6 +82,6 @@ module.exports = class StreamCopyMetrics {
|
|||||||
|
|
||||||
logData.success = this.success;
|
logData.success = this.success;
|
||||||
|
|
||||||
this.logger.info({ ingestion: logData }, 'Copy to/from query metrics');
|
this.logger.info({ 'cdb-user': this.username, ingestion: logData }, 'Copy to/from query metrics');
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -29,7 +29,7 @@ module.exports = class Logger {
|
|||||||
serializers: {
|
serializers: {
|
||||||
client_request: requestSerializer,
|
client_request: requestSerializer,
|
||||||
server_response: responseSerializer,
|
server_response: responseSerializer,
|
||||||
exception: errorSerializer
|
exception: (err) => Array.isArray(err) ? err.map((err) => errorSerializer(err)) : [errorSerializer(err)]
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
const dest = pino.destination({ sync: false }); // stdout
|
const dest = pino.destination({ sync: false }); // stdout
|
||||||
|
105
metro/log-collector.js
Normal file
105
metro/log-collector.js
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const fs = require('fs');
|
||||||
|
const split = require('split2');
|
||||||
|
const assingDeep = require('assign-deep');
|
||||||
|
const { Transform } = require('stream');
|
||||||
|
const DEV_ENVS = ['test', 'development'];
|
||||||
|
const dumpPath = `${__dirname}/dump.json`;
|
||||||
|
|
||||||
|
let logs;
|
||||||
|
|
||||||
|
const LEVELS = {
|
||||||
|
'trace': 10,
|
||||||
|
'debug': 20,
|
||||||
|
'info': 30,
|
||||||
|
'warning': 40,
|
||||||
|
'error': 50,
|
||||||
|
'fatal': 60
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = function logCollector () {
|
||||||
|
const stream = new Transform({
|
||||||
|
transform (chunk, enc, callback) {
|
||||||
|
let entry;
|
||||||
|
|
||||||
|
try {
|
||||||
|
entry = JSON.parse(chunk);
|
||||||
|
const { levelname, timestamp } = entry;
|
||||||
|
|
||||||
|
|
||||||
|
if (levelname === undefined && timestamp === undefined) {
|
||||||
|
throw new Error('Entry log is not valid');
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
|
||||||
|
this.push(chunk + '\n');
|
||||||
|
}
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
const { request_id: id } = entry;
|
||||||
|
|
||||||
|
if (id === undefined) {
|
||||||
|
this.push(`${JSON.stringify(entry)}\n`);
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logs.has(id)) {
|
||||||
|
const accEntry = logs.get(id);
|
||||||
|
const { end } = entry;
|
||||||
|
|
||||||
|
if (end === true) {
|
||||||
|
accEntry.timestamp = entry.timestamp;
|
||||||
|
accEntry.event_message = entry.event_message;
|
||||||
|
this.push(`${JSON.stringify(accEntry)}\n`);
|
||||||
|
logs.delete(id);
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LEVELS[accEntry.levelname] > LEVELS[entry.levelname]) {
|
||||||
|
delete entry.levelname;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasProperty(accEntry, 'exception') && hasProperty(entry, 'exception')) {
|
||||||
|
logs.set(id, assingDeep({}, accEntry, entry, { exception: accEntry.exception.concat(entry.exception) }));
|
||||||
|
} else {
|
||||||
|
logs.set(id, assingDeep({}, accEntry, entry));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logs.set(id, entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.on('pipe', () => {
|
||||||
|
if (!fs.existsSync(dumpPath)) {
|
||||||
|
logs = new Map();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const dump = require(dumpPath);
|
||||||
|
logs = new Map(dump);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`Cannot read the dump for unfinished logs: ${err}`);
|
||||||
|
logs = new Map();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.on('unpipe', () => {
|
||||||
|
try {
|
||||||
|
fs.writeFileSync(dumpPath, JSON.stringify([...logs]));
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`Cannot create a dump for unfinished logs: ${err}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return stream;
|
||||||
|
};
|
||||||
|
|
||||||
|
function hasProperty (obj, prop) {
|
||||||
|
return Object.prototype.hasOwnProperty.call(obj, prop);
|
||||||
|
}
|
@ -4,6 +4,7 @@ const util = require('util');
|
|||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
const pipeline = util.promisify(stream.pipeline);
|
const pipeline = util.promisify(stream.pipeline);
|
||||||
const split = require('split2');
|
const split = require('split2');
|
||||||
|
const logCollector = require('./log-collector');
|
||||||
const MetricsCollector = require('./metrics-collector');
|
const MetricsCollector = require('./metrics-collector');
|
||||||
|
|
||||||
module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) {
|
module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) {
|
||||||
@ -12,7 +13,7 @@ module.exports = async function metro ({ input = process.stdin, output = process
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
await metricsCollector.start();
|
await metricsCollector.start();
|
||||||
await pipeline(input, split(), metricsStream, output);
|
await pipeline(input, split(), logCollector(), metricsStream, output);
|
||||||
} finally {
|
} finally {
|
||||||
await metricsCollector.stop();
|
await metricsCollector.stop();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user