Metro: stop aggregating log per request id, use new config.json file

remotes/origin/alasarr/gitignore
Daniel García Aubert 4 years ago
parent afd4ad500f
commit b32a073ac3

@ -198,7 +198,7 @@ function addHandlers (listener, killTimeout) {
process.on('unhandledRejection', (err) => exitProcess(err, listener, 'unhandledRejection', killTimeout));
process.on('ENOMEM', (err) => exitProcess(err, listener, 'ENOMEM', killTimeout));
process.on('SIGINT', () => exitProcess(null, listener, 'SIGINT', killTimeout));
process.on('SIGTERM', () => exitProcess(null, listener, 'SIGINT', killTimeout));
process.on('SIGTERM', () => exitProcess(null, listener, 'SIGTERM', killTimeout));
}
addHandlers(listener, 45000);

@ -9,7 +9,6 @@ module.exports = function initLogger ({ logger }) {
res.locals.logger.info({ request: req });
res.on('finish', () => res.locals.logger.info({ response: res }));
res.on('close', () => res.locals.logger.info({ end: true }));
next();
};

@ -21,7 +21,7 @@ module.exports = function profiler (options) {
res.on('finish', () => {
req.profiler.done('response');
req.profiler.end();
logger.info({ stats: req.profiler.toJSON() });
logger.info({ user: res.locals.user, stats: req.profiler.toJSON() });
try {
// May throw due to dns, see: http://github.com/CartoDB/Windshaft/issues/166

@ -0,0 +1,94 @@
{
"metrics": {
"port": 9145,
"definitions": [
{
"type": "counter",
"options": {
"name": "maps_api_requests_total",
"help": "MAPS API requests total"
},
"valuePath": "response.statusCode",
"shouldMeasure": "({ value }) => Number.isFinite(value)",
"measure": "({ metric }) => metric.inc()"
},
{
"type": "counter",
"options": {
"name": "maps_api_requests_ok_total",
"help": "MAPS API requests ok total"
},
"valuePath": "response.statusCode",
"shouldMeasure": "({ value }) => value >= 200 && value < 400",
"measure": "({ metric }) => metric.inc()"
},
{
"type": "counter",
"options": {
"name": "maps_api_requests_errors_total",
"help": "MAPS API requests errors total"
},
"valuePath": "response.statusCode",
"shouldMeasure": "({ value }) => value >= 400",
"measure": "({ metric }) => metric.inc()"
},
{
"type": "histogram",
"options": {
"name": "maps_api_response_time_total",
"help": "MAPS API response time total"
},
"valuePath": "stats.response",
"shouldMeasure": "({ value }) => Number.isFinite(value)",
"measure": "({ metric, value }) => metric.observe(value)"
},
{
"type": "counter",
"options": {
"name": "maps_api_requests",
"help": "MAPS API requests per user",
"labelNames": ["user", "http_code"]
},
"labelPaths": ["response.headers.carto-user", "response.statusCode"],
"shouldMeasure": "({ labels }) => labels.every((label) => label !== undefined)",
"measure": "({ metric, labels }) => metric.labels(...labels).inc()"
},
{
"type": "counter",
"options": {
"name": "maps_api_requests_ok",
"help": "MAPS API requests per user with success HTTP code",
"labelNames": ["user", "http_code"]
},
"labelPaths": ["response.headers.carto-user", "response.statusCode"],
"valuePath": "response.statusCode",
"shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 200 && value < 400",
"measure": "({ metric, labels }) => metric.labels(...labels).inc()"
},
{
"type": "counter",
"options": {
"name": "maps_api_requests_errors",
"help": "MAPS API requests per user with error HTTP code",
"labelNames": ["user", "http_code"]
},
"labelPaths": ["response.headers.carto-user", "response.statusCode"],
"valuePath": "response.statusCode",
"shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 400",
"measure": "({ metric, labels }) => metric.labels(...labels).inc()"
},
{
"type": "histogram",
"options": {
"name": "maps_api_response_time",
"help": "MAPS API response time total",
"labelNames": ["user"]
},
"labelPaths": ["user"],
"valuePath": "stats.response",
"shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && Number.isFinite(value)",
"measure": "({ metric, labels, value }) => metric.labels(...labels).observe(value)"
}
]
}
}

@ -1,29 +1,40 @@
'use strict';
const split = require('split2');
const logCollector = require('./log-collector');
const metricsCollector = require('./metrics-collector');
const metro = require('./metro');
const path = require('path');
const fs = require('fs');
const streams = [process.stdin, split(), logCollector(), metricsCollector(), process.stdout];
const { CONFIG_PATH = path.resolve(__dirname, './config.json') } = process.env;
const existsConfigFile = fs.existsSync(CONFIG_PATH);
pipeline('pipe', streams);
if (!existsConfigFile) {
exit(4)(new Error(`Wrong path for CONFIG_PATH env variable: ${CONFIG_PATH} no such file`));
}
process.on('SIGINT', exitProcess(0));
process.on('SIGTERM', exitProcess(0));
process.on('uncaughtException', exitProcess(1));
process.on('unhandledRejection', exitProcess(1));
let config;
function pipeline (action, streams) {
for (let index = 0; index < streams.length - 1; index++) {
const source = streams[index];
const destination = streams[index + 1];
source[action](destination);
if (existsConfigFile) {
config = fs.readFileSync(CONFIG_PATH);
try {
config = JSON.parse(config);
} catch (e) {
exit(5)(new Error('Wrong config format: invalid JSON'));
}
}
function exitProcess (code = 0) {
return function exitProcess (signal) {
pipeline('unpipe', streams);
metro({ metrics: config && config.metrics })
.then(exit(0))
.catch(exit(1));
process.on('uncaughtException', exit(2));
process.on('unhandledRejection', exit(3));
function exit (code = 1) {
return function (err) {
if (err) {
console.error(err);
}
process.exit(code);
};
}

@ -1,105 +0,0 @@
'use strict'
const fs = require('fs');
const split = require('split2');
const assingDeep = require('assign-deep');
const { Transform } = require('stream');
const DEV_ENVS = ['test', 'development'];
const dumpPath = `${__dirname}/dump.json`;
let logs;
const LEVELS = {
10: 'trace',
20: 'debug',
30: 'info',
40: 'warn',
50: 'error',
60: 'fatal'
};
module.exports = function logCollector () {
const stream = new Transform({
transform (chunk, enc, callback) {
let entry;
try {
entry = JSON.parse(chunk);
const { level, time } = entry;
if (level === undefined && time === undefined) {
throw new Error('Entry log is not valid');
}
} catch (e) {
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
this.push(chunk + '\n');
}
return callback();
}
const { id } = entry;
if (id === undefined) {
entry.level = LEVELS[entry.level];
this.push(`${JSON.stringify(entry)}\n`);
return callback();
}
if (logs.has(id)) {
const accEntry = logs.get(id);
const { end } = entry;
if (end === true) {
accEntry.level = LEVELS[accEntry.level];
accEntry.time = entry.time;
this.push(`${JSON.stringify(accEntry)}\n`);
logs.delete(id);
return callback();
}
if (accEntry.level > entry.level) {
delete entry.level;
}
if (hasProperty(accEntry, 'error') && hasProperty(entry, 'error')) {
logs.set(id, assingDeep({}, accEntry, entry, { error: accEntry.error.concat(entry.error) }));
} else {
logs.set(id, assingDeep({}, accEntry, entry));
}
} else {
logs.set(id, entry);
}
callback();
}
});
stream.on('pipe', () => {
if (!fs.existsSync(dumpPath)) {
logs = new Map();
return;
}
try {
const dump = require(dumpPath);
logs = new Map(dump);
} catch (err) {
console.error(`Cannot read the dump for unfinished logs: ${err}`);
logs = new Map();
}
});
stream.on('unpipe', () => {
try {
fs.writeFileSync(dumpPath, JSON.stringify([...logs]));
} catch (err) {
console.error(`Cannot create a dump for unfinished logs: ${err}`);
}
});
return stream;
};
function hasProperty (obj, prop) {
return Object.prototype.hasOwnProperty.call(obj, prop);
}

@ -0,0 +1,42 @@
'use strict';
const { Transform } = require('stream');
const DEV_ENVS = ['test', 'development'];
const LEVELS = {
10: 'trace',
20: 'debug',
30: 'info',
40: 'warn',
50: 'error',
60: 'fatal'
};
module.exports = function logParser () {
return new Transform({
transform (chunk, enc, callback) {
let entry;
try {
entry = JSON.parse(chunk);
} catch (e) {
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
this.push(chunk + '\n');
}
return callback();
}
if (entry.level && LEVELS[entry.level]) {
entry.level = LEVELS[entry.level];
}
if (Number.isFinite(entry.time)) {
entry.time = new Date(entry.time).toISOString();
}
this.push(`${JSON.stringify(entry)}\n`);
return callback();
}
});
};

@ -1,67 +1,81 @@
'use strict'
'use strict';
const http = require('http');
const { Counter, Histogram, register } = require('prom-client');
const split = require('split2');
const { Transform } = require('stream');
const flatten = require('flat');
const { Transform, PassThrough } = require('stream');
const DEV_ENVS = ['test', 'development'];
const requestCounter = new Counter({
name: 'maps_api_requests_total',
help: 'MAPS API requests total'
});
const requestOkCounter = new Counter({
name: 'maps_api_requests_ok_total',
help: 'MAPS API requests ok total'
});
const requestErrorCounter = new Counter({
name: 'maps_api_requests_errors_total',
help: 'MAPS API requests errors total'
});
const responseTimeHistogram = new Histogram({
name: 'maps_api_response_time_total',
help: 'MAPS API response time total'
});
const userRequestCounter = new Counter({
name: 'maps_api_requests',
help: 'MAPS API requests per user',
labelNames: ['user', 'http_code']
});
const userRequestOkCounter = new Counter({
name: 'maps_api_requests_ok',
help: 'MAPS API requests per user with success HTTP code',
labelNames: ['user', 'http_code']
});
const userRequestErrorCounter = new Counter({
name: 'maps_api_requests_errors',
help: 'MAPS API requests per user with error HTTP code',
labelNames: ['user', 'http_code']
});
const userResponseTimeHistogram = new Histogram({
name: 'maps_api_response_time',
help: 'MAPS API response time total',
labelNames: ['user']
});
module.exports = function metricsCollector () {
const factory = {
counter: Counter,
histogram: Histogram
};
module.exports = class MetricsCollector {
constructor ({ port = 0, definitions } = {}) {
this._port = port;
this._definitions = definitions;
this._server = null;
this._stream = createTransformStream(this._definitions);
}
get stream () {
return this._stream;
}
start () {
return new Promise((resolve, reject) => {
this._server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': register.contentType });
res.end(register.metrics());
});
this._server.once('error', err => reject(err));
this._server.once('listening', () => resolve());
this._server.listen(this._port);
});
}
stop () {
return new Promise((resolve) => {
register.clear();
if (!this._server) {
return resolve();
}
this._server.once('close', () => {
this._server = null;
resolve();
});
this._server.close();
});
};
};
function createTransformStream (definitions) {
if (typeof definitions !== 'object') {
return new PassThrough();
}
const metrics = [];
for (const { type, options, valuePath, labelPaths, shouldMeasure, measure } of definitions) {
metrics.push({
instance: new factory[type](options),
valuePath,
labelPaths,
shouldMeasure: eval(shouldMeasure), // eslint-disable-line no-eval
measure: eval(measure) // eslint-disable-line no-eval
});
}
return new Transform({
transform (chunk, enc, callback) {
let entry;
try {
entry = JSON.parse(chunk);
const { level, time } = entry;
if (level === undefined && time === undefined) {
throw new Error('Entry log is not valid');
}
} catch (e) {
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
this.push(chunk);
@ -69,53 +83,19 @@ module.exports = function metricsCollector () {
return callback();
}
const { request, response, stats } = entry;
if (request === undefined || response === undefined || stats === undefined) {
this.push(chunk);
return callback();
}
const { statusCode, headers } = response;
const { 'carto-user': user } = headers;
const flatEntry = flatten(entry);
requestCounter.inc();
for (const metric of metrics) {
const value = flatEntry[metric.valuePath];
const labels = Array.isArray(metric.labelPaths) && metric.labelPaths.map(path => flatEntry[path]);
if (statusCode !== undefined && user !== undefined) {
userRequestCounter.labels(user, `${statusCode}`).inc();
}
if (statusCode >= 200 && statusCode < 400) {
requestOkCounter.inc();
if (user !== undefined) {
userRequestOkCounter.labels(user, `${statusCode}`).inc();
if (metric.shouldMeasure({ labels, value })) {
metric.measure({ metric: metric.instance, labels, value });
}
} else if (statusCode >= 400) {
requestErrorCounter.inc();
if (user !== undefined) {
userRequestErrorCounter.labels(user, `${statusCode}`).inc();
}
}
const { response: responseTime } = stats;
if (Number.isFinite(responseTime)) {
responseTimeHistogram.observe(responseTime);
userResponseTimeHistogram.labels(user).observe(responseTime);
}
this.push(chunk);
callback();
}
})
});
}
const port = process.env.PORT || 9145;
http
.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': register.contentType });
res.end(register.metrics());
})
.listen(port)
.unref();

@ -0,0 +1,20 @@
'use strict';
const util = require('util');
const stream = require('stream');
const pipeline = util.promisify(stream.pipeline);
const split = require('split2');
const logParser = require('./log-parser');
const MetricsCollector = require('./metrics-collector');
module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) {
const metricsCollector = new MetricsCollector(metrics);
const { stream: metricsStream } = metricsCollector;
try {
await metricsCollector.start();
await pipeline(input, split(), logParser(), metricsStream, output);
} finally {
await metricsCollector.stop();
}
};
Loading…
Cancel
Save