From 935f08c5b5933822ab5ebc7841978c1898fe9344 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?=
Date: Mon, 5 Apr 2021 16:03:33 +0200
Subject: [PATCH 1/2] feat: reduce log verbosity

Collapse the per-request log output of the SQL API into a single line per
request. A new transform stream, metro/log-collector.js, groups entries by
request_id, merges them with assign-deep, keeps the most severe levelname,
concatenates exception arrays, and flushes the merged entry when it sees
the { end: true } marker that the log-req-res middleware now emits on
response close. Entries still unfinished when the stream unpipes are
dumped to metro/dump.json and reloaded on the next run.
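
A minimal usage sketch of the collector (the entries are hypothetical;
it assumes the snippet sits next to metro/log-collector.js, runs on
Node 12+, and that the split2 and assign-deep dependencies are installed):

    'use strict';

    const { Readable } = require('stream');
    const split = require('split2');
    const logCollector = require('./log-collector');

    const entries = [
        { request_id: 'req-1', levelname: 'info', timestamp: 1, event_message: 'Incoming request' },
        { request_id: 'req-1', levelname: 'error', timestamp: 2, exception: [{ message: 'boom' }] },
        { request_id: 'req-1', levelname: 'info', timestamp: 3, end: true, event_message: 'Request done' }
    ].map((entry) => JSON.stringify(entry)).join('\n');

    // Prints a single merged line for 'req-1': the highest severity wins
    // ('error'), while timestamp and event_message are taken from the
    // entry flagged with `end: true`. When the input ends, any leftover
    // entries are persisted to metro/dump.json.
    Readable.from(entries).pipe(split()).pipe(logCollector()).pipe(process.stdout);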
---
 lib/api/middlewares/log-req-res.js |   1 +
 lib/utils/logger.js                |   2 +-
 metro/log-collector.js             | 103 +++++++++++++++++++++++++++++
 metro/metro.js                     |   3 +-
 4 files changed, 107 insertions(+), 2 deletions(-)
 create mode 100644 metro/log-collector.js

diff --git a/lib/api/middlewares/log-req-res.js b/lib/api/middlewares/log-req-res.js
index 0a3e1f65..03181215 100644
--- a/lib/api/middlewares/log-req-res.js
+++ b/lib/api/middlewares/log-req-res.js
@@ -5,6 +5,7 @@ module.exports = function logReqRes ({ logOnEvent = 'finish' } = {}) {
         const { logger } = res.locals;
         logger.info({ client_request: req }, 'Incoming request');
         res.on(logOnEvent, () => logger.info({ server_response: res, status: res.statusCode }, 'Response sent'));
+        res.on('close', () => logger.info({ end: true }, 'Request done'));
         next();
     };
 };
diff --git a/lib/utils/logger.js b/lib/utils/logger.js
index f47f9634..128a5f76 100644
--- a/lib/utils/logger.js
+++ b/lib/utils/logger.js
@@ -29,7 +29,7 @@ module.exports = class Logger {
             serializers: {
                 client_request: requestSerializer,
                 server_response: responseSerializer,
-                exception: errorSerializer
+                exception: (err) => Array.isArray(err) ? err.map((error) => errorSerializer(error)) : [errorSerializer(err)]
             }
         };
         const dest = pino.destination({ sync: false }); // stdout
diff --git a/metro/log-collector.js b/metro/log-collector.js
new file mode 100644
index 00000000..1c4a1c0c
--- /dev/null
+++ b/metro/log-collector.js
@@ -0,0 +1,103 @@
+'use strict';
+
+const fs = require('fs');
+const assignDeep = require('assign-deep');
+const { Transform } = require('stream');
+const DEV_ENVS = ['test', 'development'];
+const dumpPath = `${__dirname}/dump.json`;
+
+let logs;
+
+const LEVELS = {
+    'trace': 10,
+    'debug': 20,
+    'info': 30,
+    'warning': 40,
+    'error': 50,
+    'fatal': 60
+};
+
+module.exports = function logCollector () {
+    const stream = new Transform({
+        transform (chunk, enc, callback) {
+            let entry;
+
+            try {
+                entry = JSON.parse(chunk);
+                const { levelname, timestamp } = entry;
+
+                if (levelname === undefined && timestamp === undefined) {
+                    throw new Error('Log entry is not valid');
+                }
+            } catch (e) {
+                if (DEV_ENVS.includes(process.env.NODE_ENV)) {
+                    this.push(chunk + '\n');
+                }
+                return callback();
+            }
+
+            const { request_id: id } = entry;
+
+            if (id === undefined) {
+                this.push(`${JSON.stringify(entry)}\n`);
+                return callback();
+            }
+
+            if (logs.has(id)) {
+                const accEntry = logs.get(id);
+                const { end } = entry;
+
+                if (end === true) {
+                    accEntry.timestamp = entry.timestamp;
+                    accEntry.event_message = entry.event_message;
+                    this.push(`${JSON.stringify(accEntry)}\n`);
+                    logs.delete(id);
+                    return callback();
+                }
+
+                if (LEVELS[accEntry.levelname] > LEVELS[entry.levelname]) {
+                    delete entry.levelname;
+                }
+
+                if (hasProperty(accEntry, 'exception') && hasProperty(entry, 'exception')) {
+                    logs.set(id, assignDeep({}, accEntry, entry, { exception: accEntry.exception.concat(entry.exception) }));
+                } else {
+                    logs.set(id, assignDeep({}, accEntry, entry));
+                }
+            } else {
+                logs.set(id, entry);
+            }
+
+            callback();
+        }
+    });
+
+    stream.on('pipe', () => {
+        if (!fs.existsSync(dumpPath)) {
+            logs = new Map();
+            return;
+        }
+
+        try {
+            const dump = JSON.parse(fs.readFileSync(dumpPath, 'utf8'));
+            logs = new Map(dump);
+        } catch (err) {
+            console.error(`Cannot read the dump for unfinished logs: ${err}`);
+            logs = new Map();
+        }
+    });
+
+    stream.on('unpipe', () => {
+        try {
+            fs.writeFileSync(dumpPath, JSON.stringify([...logs]));
+        } catch (err) {
+            console.error(`Cannot create a dump for unfinished logs: ${err}`);
+        }
+    });
+
+    return stream;
+};
+
+function hasProperty (obj, prop) {
+    return Object.prototype.hasOwnProperty.call(obj, prop);
+}
diff --git a/metro/metro.js b/metro/metro.js
index 241c49ee..851539fb 100644
--- a/metro/metro.js
+++ b/metro/metro.js
@@ -4,6 +4,7 @@ const util = require('util');
 const stream = require('stream');
 const pipeline = util.promisify(stream.pipeline);
 const split = require('split2');
+const logCollector = require('./log-collector');
 const MetricsCollector = require('./metrics-collector');
 
 module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) {
@@ -12,7 +13,7 @@ module.exports = async function metro ({ input = process.stdin, output = process
 
     try {
         await metricsCollector.start();
-        await pipeline(input, split(), metricsStream, output);
+        await pipeline(input, split(), logCollector(), metricsStream, output);
     } finally {
         await metricsCollector.stop();
     }

From 53f69dd6e49c8a6091ac0cf8a42f20b829d2f99a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?=
Date: Thu, 8 Apr 2021 17:05:34 +0200
Subject: [PATCH 2/2] fix: add cdb-user field to data ingestion and batch query logs

Tag copy-cancelation, job-creation, job-completion, and ingestion-metrics
log entries with the user that issued the query, so the merged lines
produced by metro can be attributed to a user (see the note after the
diff for an illustrative merged entry).
---
 lib/api/sql/copy-controller.js      | 4 ++--
 lib/api/sql/job-controller.js       | 2 +-
 lib/batch/batch.js                  | 4 ++--
 lib/services/stream-copy-metrics.js | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/lib/api/sql/copy-controller.js b/lib/api/sql/copy-controller.js
index 572dc86b..efad79de 100644
--- a/lib/api/sql/copy-controller.js
+++ b/lib/api/sql/copy-controller.js
@@ -102,10 +102,10 @@ function handleCopyTo ({ logger: mainLogger }) {
             const pid = streamCopy.clientProcessID;
             streamCopy.cancel(pid, StreamCopy.ACTION_TO, (err) => {
                 if (err) {
-                    return mainLogger.error({ request_id: requestId, exception: err, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Unable to cancel stream query');
+                    return mainLogger.error({ 'cdb-user': user, request_id: requestId, exception: err, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Unable to cancel stream query');
                 }
 
-                mainLogger.info({ request_id: requestId, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Canceled stream query successfully');
+                mainLogger.info({ 'cdb-user': user, request_id: requestId, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Canceled stream query successfully');
             });
 
             return next(err);
diff --git a/lib/api/sql/job-controller.js b/lib/api/sql/job-controller.js
index 662b0303..39782bb2 100644
--- a/lib/api/sql/job-controller.js
+++ b/lib/api/sql/job-controller.js
@@ -140,7 +140,7 @@ function createJob (jobService) {
             return next(err);
         }
 
-        logger.info({ job: job.toJSON() }, 'Batch query job created');
+        logger.info({ 'cdb-user': res.locals.user, job: job.toJSON() }, 'Batch query job created');
 
         res.statusCode = 201;
         res.body = job.serialize();
diff --git a/lib/batch/batch.js b/lib/batch/batch.js
index 329acb4e..f51b4864 100644
--- a/lib/batch/batch.js
+++ b/lib/batch/batch.js
@@ -116,9 +116,9 @@ Batch.prototype.processJob = function (user, callback) {
         }
 
         if (JobStatus.isFinal(job.data.status)) {
-            self.logger.info({ job: job.toJSON() }, 'Batch query job finished');
+            self.logger.info({ 'cdb-user': user, job: job.toJSON() }, 'Batch query job finished');
         } else {
-            self.logger.debug({ job: job.toJSON() }, 'Batch query job: query done');
+            self.logger.debug({ 'cdb-user': user, job: job.toJSON() }, 'Batch query job: query done');
         }
 
         return callback(null, !EMPTY_QUEUE);
diff --git a/lib/services/stream-copy-metrics.js b/lib/services/stream-copy-metrics.js
index 4e04cbf5..3fd8b723 100644
--- a/lib/services/stream-copy-metrics.js
+++ b/lib/services/stream-copy-metrics.js
@@ -82,6 +82,6 @@ module.exports = class StreamCopyMetrics {
 
         logData.success = this.success;
 
-        this.logger.info({ ingestion: logData }, 'Copy to/from query metrics');
+        this.logger.info({ 'cdb-user': this.username, ingestion: logData }, 'Copy to/from query metrics');
     }
 };
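
-- 
Note (illustrative, not part of the patch): with both changes applied, the
merged line that metro emits for a finished batch-query request could look
roughly like this, assuming the logger maps pino's level/msg to the
levelname/event_message fields that metro/log-collector.js expects (all
values are hypothetical):

    {"levelname":"info","request_id":"req-1","cdb-user":"alice","job":{"status":"done"},"timestamp":"2021-04-08T17:05:34.000Z","event_message":"Request done"}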