diff --git a/app.js b/app.js index 8e231c54..3f6e9e35 100755 --- a/app.js +++ b/app.js @@ -198,7 +198,7 @@ function addHandlers (listener, killTimeout) { process.on('unhandledRejection', (err) => exitProcess(err, listener, 'unhandledRejection', killTimeout)); process.on('ENOMEM', (err) => exitProcess(err, listener, 'ENOMEM', killTimeout)); process.on('SIGINT', () => exitProcess(null, listener, 'SIGINT', killTimeout)); - process.on('SIGTERM', () => exitProcess(null, listener, 'SIGINT', killTimeout)); + process.on('SIGTERM', () => exitProcess(null, listener, 'SIGTERM', killTimeout)); } addHandlers(listener, 45000); diff --git a/lib/api/middlewares/logger.js b/lib/api/middlewares/logger.js index c081c8ea..09b92ef0 100644 --- a/lib/api/middlewares/logger.js +++ b/lib/api/middlewares/logger.js @@ -9,7 +9,6 @@ module.exports = function initLogger ({ logger }) { res.locals.logger.info({ request: req }); res.on('finish', () => res.locals.logger.info({ response: res })); - res.on('close', () => res.locals.logger.info({ end: true })); next(); }; diff --git a/lib/api/middlewares/profiler.js b/lib/api/middlewares/profiler.js index f4af21d7..3807036c 100644 --- a/lib/api/middlewares/profiler.js +++ b/lib/api/middlewares/profiler.js @@ -21,7 +21,7 @@ module.exports = function profiler (options) { res.on('finish', () => { req.profiler.done('response'); req.profiler.end(); - logger.info({ stats: req.profiler.toJSON() }); + logger.info({ user: res.locals.user, stats: req.profiler.toJSON() }); try { // May throw due to dns, see: http://github.com/CartoDB/Windshaft/issues/166 diff --git a/metro/config.json b/metro/config.json new file mode 100644 index 00000000..8735e605 --- /dev/null +++ b/metro/config.json @@ -0,0 +1,94 @@ +{ + "metrics": { + "port": 9145, + "definitions": [ + { + "type": "counter", + "options": { + "name": "maps_api_requests_total", + "help": "MAPS API requests total" + }, + "valuePath": "response.statusCode", + "shouldMeasure": "({ value }) => Number.isFinite(value)", + "measure": "({ metric }) => metric.inc()" + }, + { + "type": "counter", + "options": { + "name": "maps_api_requests_ok_total", + "help": "MAPS API requests ok total" + }, + "valuePath": "response.statusCode", + "shouldMeasure": "({ value }) => value >= 200 && value < 400", + "measure": "({ metric }) => metric.inc()" + }, + { + "type": "counter", + "options": { + "name": "maps_api_requests_errors_total", + "help": "MAPS API requests errors total" + }, + "valuePath": "response.statusCode", + "shouldMeasure": "({ value }) => value >= 400", + "measure": "({ metric }) => metric.inc()" + }, + { + "type": "histogram", + "options": { + "name": "maps_api_response_time_total", + "help": "MAPS API response time total" + }, + "valuePath": "stats.response", + "shouldMeasure": "({ value }) => Number.isFinite(value)", + "measure": "({ metric, value }) => metric.observe(value)" + }, + { + "type": "counter", + "options": { + "name": "maps_api_requests", + "help": "MAPS API requests per user", + "labelNames": ["user", "http_code"] + }, + "labelPaths": ["response.headers.carto-user", "response.statusCode"], + "shouldMeasure": "({ labels }) => labels.every((label) => label !== undefined)", + "measure": "({ metric, labels }) => metric.labels(...labels).inc()" + }, + { + "type": "counter", + "options": { + "name": "maps_api_requests_ok", + "help": "MAPS API requests per user with success HTTP code", + "labelNames": ["user", "http_code"] + }, + "labelPaths": ["response.headers.carto-user", "response.statusCode"], + "valuePath": "response.statusCode", + "shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 200 && value < 400", + "measure": "({ metric, labels }) => metric.labels(...labels).inc()" + }, + { + "type": "counter", + "options": { + "name": "maps_api_requests_errors", + "help": "MAPS API requests per user with error HTTP code", + "labelNames": ["user", "http_code"] + }, + "labelPaths": ["response.headers.carto-user", "response.statusCode"], + "valuePath": "response.statusCode", + "shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 400", + "measure": "({ metric, labels }) => metric.labels(...labels).inc()" + }, + { + "type": "histogram", + "options": { + "name": "maps_api_response_time", + "help": "MAPS API response time total", + "labelNames": ["user"] + }, + "labelPaths": ["user"], + "valuePath": "stats.response", + "shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && Number.isFinite(value)", + "measure": "({ metric, labels, value }) => metric.labels(...labels).observe(value)" + } + ] + } +} diff --git a/metro/index.js b/metro/index.js index 123e147f..d9425d12 100644 --- a/metro/index.js +++ b/metro/index.js @@ -1,29 +1,40 @@ 'use strict'; -const split = require('split2'); -const logCollector = require('./log-collector'); -const metricsCollector = require('./metrics-collector'); +const metro = require('./metro'); +const path = require('path'); +const fs = require('fs'); -const streams = [process.stdin, split(), logCollector(), metricsCollector(), process.stdout]; +const { CONFIG_PATH = path.resolve(__dirname, './config.json') } = process.env; +const existsConfigFile = fs.existsSync(CONFIG_PATH); -pipeline('pipe', streams); +if (!existsConfigFile) { + exit(4)(new Error(`Wrong path for CONFIG_PATH env variable: ${CONFIG_PATH} no such file`)); +} -process.on('SIGINT', exitProcess(0)); -process.on('SIGTERM', exitProcess(0)); -process.on('uncaughtException', exitProcess(1)); -process.on('unhandledRejection', exitProcess(1)); +let config; -function pipeline (action, streams) { - for (let index = 0; index < streams.length - 1; index++) { - const source = streams[index]; - const destination = streams[index + 1]; - source[action](destination); +if (existsConfigFile) { + config = fs.readFileSync(CONFIG_PATH); + try { + config = JSON.parse(config); + } catch (e) { + exit(5)(new Error('Wrong config format: invalid JSON')); } } -function exitProcess (code = 0) { - return function exitProcess (signal) { - pipeline('unpipe', streams); +metro({ metrics: config && config.metrics }) + .then(exit(0)) + .catch(exit(1)); + +process.on('uncaughtException', exit(2)); +process.on('unhandledRejection', exit(3)); + +function exit (code = 1) { + return function (err) { + if (err) { + console.error(err); + } + process.exit(code); }; } diff --git a/metro/log-collector.js b/metro/log-collector.js deleted file mode 100644 index 88123db9..00000000 --- a/metro/log-collector.js +++ /dev/null @@ -1,105 +0,0 @@ -'use strict' - -const fs = require('fs'); -const split = require('split2'); -const assingDeep = require('assign-deep'); -const { Transform } = require('stream'); -const DEV_ENVS = ['test', 'development']; -const dumpPath = `${__dirname}/dump.json`; - -let logs; - -const LEVELS = { - 10: 'trace', - 20: 'debug', - 30: 'info', - 40: 'warn', - 50: 'error', - 60: 'fatal' -}; - -module.exports = function logCollector () { - const stream = new Transform({ - transform (chunk, enc, callback) { - let entry; - - try { - entry = JSON.parse(chunk); - const { level, time } = entry; - - if (level === undefined && time === undefined) { - throw new Error('Entry log is not valid'); - } - } catch (e) { - if (DEV_ENVS.includes(process.env.NODE_ENV)) { - this.push(chunk + '\n'); - } - return callback(); - } - - const { id } = entry; - - if (id === undefined) { - entry.level = LEVELS[entry.level]; - this.push(`${JSON.stringify(entry)}\n`); - return callback(); - } - - if (logs.has(id)) { - const accEntry = logs.get(id); - const { end } = entry; - - if (end === true) { - accEntry.level = LEVELS[accEntry.level]; - accEntry.time = entry.time; - this.push(`${JSON.stringify(accEntry)}\n`); - logs.delete(id); - return callback(); - } - - if (accEntry.level > entry.level) { - delete entry.level; - } - - if (hasProperty(accEntry, 'error') && hasProperty(entry, 'error')) { - logs.set(id, assingDeep({}, accEntry, entry, { error: accEntry.error.concat(entry.error) })); - } else { - logs.set(id, assingDeep({}, accEntry, entry)); - } - } else { - logs.set(id, entry); - } - - callback(); - } - }); - - stream.on('pipe', () => { - if (!fs.existsSync(dumpPath)) { - logs = new Map(); - return; - } - - try { - const dump = require(dumpPath); - logs = new Map(dump); - } catch (err) { - console.error(`Cannot read the dump for unfinished logs: ${err}`); - logs = new Map(); - } - }); - - stream.on('unpipe', () => { - try { - fs.writeFileSync(dumpPath, JSON.stringify([...logs])); - } catch (err) { - console.error(`Cannot create a dump for unfinished logs: ${err}`); - } - }); - - return stream; -}; - -function hasProperty (obj, prop) { - return Object.prototype.hasOwnProperty.call(obj, prop); -} diff --git a/metro/log-parser.js b/metro/log-parser.js new file mode 100644 index 00000000..2022eb41 --- /dev/null +++ b/metro/log-parser.js @@ -0,0 +1,42 @@ +'use strict'; + +const { Transform } = require('stream'); +const DEV_ENVS = ['test', 'development']; + +const LEVELS = { + 10: 'trace', + 20: 'debug', + 30: 'info', + 40: 'warn', + 50: 'error', + 60: 'fatal' +}; + +module.exports = function logParser () { + return new Transform({ + transform (chunk, enc, callback) { + let entry; + + try { + entry = JSON.parse(chunk); + } catch (e) { + if (DEV_ENVS.includes(process.env.NODE_ENV)) { + this.push(chunk + '\n'); + } + return callback(); + } + + if (entry.level && LEVELS[entry.level]) { + entry.level = LEVELS[entry.level]; + } + + if (Number.isFinite(entry.time)) { + entry.time = new Date(entry.time).toISOString(); + } + + this.push(`${JSON.stringify(entry)}\n`); + + return callback(); + } + }); +}; diff --git a/metro/metrics-collector.js b/metro/metrics-collector.js index e20377e8..5db7f8c4 100644 --- a/metro/metrics-collector.js +++ b/metro/metrics-collector.js @@ -1,67 +1,81 @@ -'use strict' +'use strict'; const http = require('http'); const { Counter, Histogram, register } = require('prom-client'); -const split = require('split2'); -const { Transform } = require('stream'); +const flatten = require('flat'); +const { Transform, PassThrough } = require('stream'); const DEV_ENVS = ['test', 'development']; -const requestCounter = new Counter({ - name: 'maps_api_requests_total', - help: 'MAPS API requests total' -}); - -const requestOkCounter = new Counter({ - name: 'maps_api_requests_ok_total', - help: 'MAPS API requests ok total' -}); - -const requestErrorCounter = new Counter({ - name: 'maps_api_requests_errors_total', - help: 'MAPS API requests errors total' -}); - -const responseTimeHistogram = new Histogram({ - name: 'maps_api_response_time_total', - help: 'MAPS API response time total' -}); - -const userRequestCounter = new Counter({ - name: 'maps_api_requests', - help: 'MAPS API requests per user', - labelNames: ['user', 'http_code'] -}); - -const userRequestOkCounter = new Counter({ - name: 'maps_api_requests_ok', - help: 'MAPS API requests per user with success HTTP code', - labelNames: ['user', 'http_code'] -}); - -const userRequestErrorCounter = new Counter({ - name: 'maps_api_requests_errors', - help: 'MAPS API requests per user with error HTTP code', - labelNames: ['user', 'http_code'] -}); - -const userResponseTimeHistogram = new Histogram({ - name: 'maps_api_response_time', - help: 'MAPS API response time total', - labelNames: ['user'] -}); - -module.exports = function metricsCollector () { +const factory = { + counter: Counter, + histogram: Histogram +}; + +module.exports = class MetricsCollector { + constructor ({ port = 0, definitions } = {}) { + this._port = port; + this._definitions = definitions; + this._server = null; + this._stream = createTransformStream(this._definitions); + } + + get stream () { + return this._stream; + } + + start () { + return new Promise((resolve, reject) => { + this._server = http.createServer((req, res) => { + res.writeHead(200, { 'Content-Type': register.contentType }); + res.end(register.metrics()); + }); + + this._server.once('error', err => reject(err)); + this._server.once('listening', () => resolve()); + this._server.listen(this._port); + }); + } + + stop () { + return new Promise((resolve) => { + register.clear(); + if (!this._server) { + return resolve(); + } + + this._server.once('close', () => { + this._server = null; + resolve(); + }); + + this._server.close(); + }); + }; +}; + +function createTransformStream (definitions) { + if (typeof definitions !== 'object') { + return new PassThrough(); + } + + const metrics = []; + + for (const { type, options, valuePath, labelPaths, shouldMeasure, measure } of definitions) { + metrics.push({ + instance: new factory[type](options), + valuePath, + labelPaths, + shouldMeasure: eval(shouldMeasure), // eslint-disable-line no-eval + measure: eval(measure) // eslint-disable-line no-eval + }); + } + return new Transform({ transform (chunk, enc, callback) { let entry; try { entry = JSON.parse(chunk); - const { level, time } = entry; - - if (level === undefined && time === undefined) { - throw new Error('Entry log is not valid'); - } } catch (e) { if (DEV_ENVS.includes(process.env.NODE_ENV)) { this.push(chunk); @@ -69,53 +83,19 @@ module.exports = function metricsCollector () { return callback(); } - const { request, response, stats } = entry; - - if (request === undefined || response === undefined || stats === undefined) { - this.push(chunk); - return callback(); - } - - const { statusCode, headers } = response; - const { 'carto-user': user } = headers; + const flatEntry = flatten(entry); - requestCounter.inc(); + for (const metric of metrics) { + const value = flatEntry[metric.valuePath]; + const labels = Array.isArray(metric.labelPaths) && metric.labelPaths.map(path => flatEntry[path]); - if (statusCode !== undefined && user !== undefined) { - userRequestCounter.labels(user, `${statusCode}`).inc(); - } - - if (statusCode >= 200 && statusCode < 400) { - requestOkCounter.inc(); - if (user !== undefined) { - userRequestOkCounter.labels(user, `${statusCode}`).inc(); + if (metric.shouldMeasure({ labels, value })) { + metric.measure({ metric: metric.instance, labels, value }); } - } else if (statusCode >= 400) { - requestErrorCounter.inc(); - if (user !== undefined) { - userRequestErrorCounter.labels(user, `${statusCode}`).inc(); - } - } - - const { response: responseTime } = stats; - - if (Number.isFinite(responseTime)) { - responseTimeHistogram.observe(responseTime); - userResponseTimeHistogram.labels(user).observe(responseTime); } this.push(chunk); callback(); } - }) + }); } - -const port = process.env.PORT || 9145; - -http - .createServer((req, res) => { - res.writeHead(200, { 'Content-Type': register.contentType }); - res.end(register.metrics()); - }) - .listen(port) - .unref(); diff --git a/metro/metro.js b/metro/metro.js new file mode 100644 index 00000000..c26f373a --- /dev/null +++ b/metro/metro.js @@ -0,0 +1,20 @@ +'use strict'; + +const util = require('util'); +const stream = require('stream'); +const pipeline = util.promisify(stream.pipeline); +const split = require('split2'); +const logParser = require('./log-parser'); +const MetricsCollector = require('./metrics-collector'); + +module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) { + const metricsCollector = new MetricsCollector(metrics); + const { stream: metricsStream } = metricsCollector; + + try { + await metricsCollector.start(); + await pipeline(input, split(), logParser(), metricsStream, output); + } finally { + await metricsCollector.stop(); + } +};