diff --git a/lib/api/middlewares/affected-tables.js b/lib/api/middlewares/affected-tables.js index 71d19c6c..bed65391 100644 --- a/lib/api/middlewares/affected-tables.js +++ b/lib/api/middlewares/affected-tables.js @@ -19,8 +19,7 @@ module.exports = function affectedTables () { return next(); }) .catch(err => { - err.message = `Error on query explain "${sql}": ${err.message}`; - logger.warn({ error: err }); + logger.warn({ exception: err }, 'Error on query explain'); return next(); }); diff --git a/lib/api/middlewares/error.js b/lib/api/middlewares/error.js index d409a167..108dd2cc 100644 --- a/lib/api/middlewares/error.js +++ b/lib/api/middlewares/error.js @@ -8,7 +8,7 @@ module.exports = function error () { const errorHandler = errorHandlerFactory(err); const errorResponse = errorHandler.getResponse(); - logger.error({ error: err }); + logger.error({ exception: err }, 'Error while handling the request'); // Force inline content disposition res.header('Content-Disposition', 'inline'); diff --git a/lib/api/middlewares/log-query.js b/lib/api/middlewares/log-query.js index 0bfa6507..e23318f5 100644 --- a/lib/api/middlewares/log-query.js +++ b/lib/api/middlewares/log-query.js @@ -12,7 +12,7 @@ module.exports = function logQuery () { return function logQueryMiddleware (req, res, next) { const { logger } = res.locals; - logger.info({ sql: ensureMaxQueryLength(res.locals.params.sql) }); + logger.info({ sql: ensureMaxQueryLength(res.locals.params.sql) }, 'Input query'); return next(); }; diff --git a/lib/api/middlewares/log-req-res.js b/lib/api/middlewares/log-req-res.js new file mode 100644 index 00000000..0a3e1f65 --- /dev/null +++ b/lib/api/middlewares/log-req-res.js @@ -0,0 +1,10 @@ +'use strict'; + +module.exports = function logReqRes ({ logOnEvent = 'finish' } = {}) { + return function logReqResMiddleware (req, res, next) { + const { logger } = res.locals; + logger.info({ client_request: req }, 'Incoming request'); + res.on(logOnEvent, () => logger.info({ server_response: res, status: res.statusCode }, 'Response sent')); + next(); + }; +}; diff --git a/lib/api/middlewares/logger.js b/lib/api/middlewares/logger.js index ff7ee8bd..9be06b0b 100644 --- a/lib/api/middlewares/logger.js +++ b/lib/api/middlewares/logger.js @@ -2,15 +2,10 @@ const uuid = require('uuid'); -module.exports = function initLogger ({ logger, logOnEvent = 'finish' }) { +module.exports = function initLogger ({ logger }) { return function initLoggerMiddleware (req, res, next) { - const id = req.get('X-Request-Id') || uuid.v4(); - res.locals.logger = logger.child({ id }); - - res.locals.logger.info({ request: req }); - res.on(logOnEvent, () => res.locals.logger.info({ response: res })); - res.on('close', () => res.locals.logger.info({ end: true })); - + const requestId = req.get('X-Request-Id') || uuid.v4(); + res.locals.logger = logger.child({ request_id: requestId }); next(); }; }; diff --git a/lib/api/middlewares/profiler.js b/lib/api/middlewares/profiler.js index ced8a9f2..c80113cf 100644 --- a/lib/api/middlewares/profiler.js +++ b/lib/api/middlewares/profiler.js @@ -18,12 +18,13 @@ module.exports = function profiler ({ statsClient, logOnEvent = 'finish' }) { res.on(logOnEvent, () => { req.profiler.add({ response: new Date() - start }); req.profiler.end(); - logger.info({ stats: req.profiler.toJSON() }); + const stats = req.profiler.toJSON(); + logger.info({ stats, duration: stats.response / 1000, duration_ms: stats.response }, 'Request profiling stats'); try { req.profiler.sendStats(); } catch (err) { - logger.warn({ error: err }); + logger.warn({ exception: err }, 'Could not send stats to StatsD'); } }); diff --git a/lib/api/middlewares/tag.js b/lib/api/middlewares/tag.js index 172cf030..42b145c6 100644 --- a/lib/api/middlewares/tag.js +++ b/lib/api/middlewares/tag.js @@ -8,7 +8,7 @@ module.exports = function tag ({ tags, logOnEvent = 'finish' }) { return function tagMiddleware (req, res, next) { const { logger } = res.locals; res.locals.tags = tags; - res.on(logOnEvent, () => logger.info({ tags: res.locals.tags })); + res.on(logOnEvent, () => logger.info({ tags: res.locals.tags }, 'Request tagged')); next(); }; diff --git a/lib/api/middlewares/user.js b/lib/api/middlewares/user.js index dd234509..b3d0307b 100644 --- a/lib/api/middlewares/user.js +++ b/lib/api/middlewares/user.js @@ -6,11 +6,8 @@ module.exports = function user (metadataBackend) { const cdbRequest = new CdbRequest(); return function userMiddleware (req, res, next) { - const { logger } = res.locals; - try { res.locals.user = getUserNameFromRequest(req, cdbRequest); - logger.info({ user: res.locals.user }); res.set('Carto-User', res.locals.user); } catch (err) { return next(err); @@ -29,6 +26,7 @@ module.exports = function user (metadataBackend) { res.locals.userId = userId; res.set('Carto-User-Id', `${userId}`); + res.locals.logger = res.locals.logger.child({ 'cdb-user': res.locals.user }); return next(); }); }; diff --git a/lib/api/sql/copy-controller.js b/lib/api/sql/copy-controller.js index b401c179..572dc86b 100644 --- a/lib/api/sql/copy-controller.js +++ b/lib/api/sql/copy-controller.js @@ -1,6 +1,7 @@ 'use strict'; const initLogger = require('../middlewares/logger'); +const logReqRes = require('../middlewares/log-req-res'); const profiler = require('../middlewares/profiler'); const user = require('../middlewares/user'); const authorization = require('../middlewares/authorization'); @@ -31,9 +32,10 @@ module.exports = class CopyController { const copyFromMiddlewares = endpointGroup => { return [ initLogger({ logger: this.logger }), + user(this.metadataBackend), + logReqRes(), profiler({ statsClient: this.statsdClient }), tag({ tags: ['ingestion', 'copyfrom'] }), - user(this.metadataBackend), rateLimits(this.userLimitsService, endpointGroup), authorization(this.metadataBackend), connectionParams(this.userDatabaseService), @@ -46,11 +48,12 @@ module.exports = class CopyController { const copyToMiddlewares = endpointGroup => { return [ - initLogger({ logger: this.logger, logOnEvent: 'log' }), + initLogger({ logger: this.logger }), + user(this.metadataBackend), + logReqRes({ logOnEvent: 'log' }), profiler({ statsClient: this.statsdClient, logOnEvent: 'log' }), tag({ tags: ['ingestion', 'copyto'], logOnEvent: 'log' }), bodyParser(), - user(this.metadataBackend), rateLimits(this.userLimitsService, endpointGroup), authorization(this.metadataBackend), connectionParams(this.userDatabaseService), @@ -69,7 +72,7 @@ module.exports = class CopyController { function handleCopyTo ({ logger: mainLogger }) { return function handleCopyToMiddleware (req, res, next) { const { logger } = res.locals; - const { id: requestId } = logger.bindings(); + const { request_id: requestId } = logger.bindings(); const { userDbParams, user } = res.locals; const { sql, filename } = res.locals.params; @@ -99,10 +102,10 @@ function handleCopyTo ({ logger: mainLogger }) { const pid = streamCopy.clientProcessID; streamCopy.cancel(pid, StreamCopy.ACTION_TO, (err) => { if (err) { - return mainLogger.error({ error: err }, `Unable to cancel "copy ${StreamCopy.ACTION_TO}" stream query (pid: ${pid}, requestId: ${requestId})`); + return mainLogger.error({ request_id: requestId, exception: err, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Unable to cancel stream query'); } - mainLogger.info(`Canceled "copy ${StreamCopy.ACTION_TO}" stream query successfully (pid: ${pid}, requestId: ${requestId})`); + mainLogger.info({ request_id: requestId, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Canceled stream query successfully'); }); return next(err); @@ -201,7 +204,7 @@ function errorHandler () { return function errorHandlerMiddleware (err, req, res, next) { const { logger } = res.locals; if (res.headersSent) { - logger.error({ error: err }); + logger.error({ exception: err }, 'Error while handling the request'); const errorHandler = errorHandlerFactory(err); res.write(JSON.stringify(errorHandler.getResponse())); res.end(); diff --git a/lib/api/sql/job-controller.js b/lib/api/sql/job-controller.js index c60be449..662b0303 100644 --- a/lib/api/sql/job-controller.js +++ b/lib/api/sql/job-controller.js @@ -3,6 +3,7 @@ const util = require('util'); const initLogger = require('../middlewares/logger'); +const logReqRes = require('../middlewares/log-req-res'); const profiler = require('../middlewares/profiler'); const bodyParser = require('../middlewares/body-parser'); const user = require('../middlewares/user'); @@ -34,6 +35,8 @@ module.exports = class JobController { sqlRouter.post('/job', [ initLogger({ logger: this.logger }), + user(this.metadataBackend), + logReqRes(), profiler({ statsClient: this.statsdClient }), tag({ tags: ['job', 'create'] }), bodyParser(), @@ -44,6 +47,8 @@ module.exports = class JobController { sqlRouter.get('/job/:job_id', [ initLogger({ logger: this.logger }), + user(this.metadataBackend), + logReqRes(), profiler({ statsClient: this.statsdClient }), tag({ tags: ['job', 'retrieve'] }), bodyParser(), @@ -52,6 +57,8 @@ module.exports = class JobController { sqlRouter.delete('/job/:job_id', [ initLogger({ logger: this.logger }), + user(this.metadataBackend), + logReqRes(), profiler({ statsClient: this.statsdClient }), tag({ tags: ['job', 'cancel'] }), bodyParser(), @@ -65,7 +72,6 @@ function composeJobMiddlewares (metadataBackend, userDatabaseService, jobService const forceToBeMaster = true; return [ - user(metadataBackend), rateLimits(userLimitsService, endpointGroup), authorization(metadataBackend, forceToBeMaster), connectionParams(userDatabaseService), @@ -134,7 +140,7 @@ function createJob (jobService) { return next(err); } - logger.info({ job: job.toJSON() }); + logger.info({ job: job.toJSON() }, 'Batch query job created'); res.statusCode = 201; res.body = job.serialize(); diff --git a/lib/api/sql/query-controller.js b/lib/api/sql/query-controller.js index 6e56dd5f..f8be5e15 100644 --- a/lib/api/sql/query-controller.js +++ b/lib/api/sql/query-controller.js @@ -1,6 +1,7 @@ 'use strict'; const initLogger = require('../middlewares/logger'); +const logReqRes = require('../middlewares/log-req-res'); const profiler = require('../middlewares/profiler'); const bodyParser = require('../middlewares/body-parser'); const user = require('../middlewares/user'); @@ -40,10 +41,11 @@ module.exports = class QueryController { const queryMiddlewares = () => { return [ initLogger({ logger: this.logger }), + user(this.metadataBackend), + logReqRes(), profiler({ statsClient: this.statsdClient }), tag({ tags: ['query'] }), bodyParser(), - user(this.metadataBackend), rateLimits(this.userLimitsService, RATE_LIMIT_ENDPOINTS_GROUPS.QUERY), authorization(this.metadataBackend, forceToBeMaster), connectionParams(this.userDatabaseService), diff --git a/lib/batch/batch.js b/lib/batch/batch.js index 408a211f..329acb4e 100644 --- a/lib/batch/batch.js +++ b/lib/batch/batch.js @@ -116,9 +116,9 @@ Batch.prototype.processJob = function (user, callback) { } if (JobStatus.isFinal(job.data.status)) { - self.logger.info({ job: job.toJSON() }); + self.logger.info({ job: job.toJSON() }, 'Batch query job finished'); } else { - self.logger.debug({ job: job.toJSON() }); + self.logger.debug({ job: job.toJSON() }, 'Batch query job: query done'); } return callback(null, !EMPTY_QUEUE); diff --git a/lib/services/stream-copy-metrics.js b/lib/services/stream-copy-metrics.js index f608fa75..4e04cbf5 100644 --- a/lib/services/stream-copy-metrics.js +++ b/lib/services/stream-copy-metrics.js @@ -82,6 +82,6 @@ module.exports = class StreamCopyMetrics { logData.success = this.success; - this.logger.info({ ingestion: logData }); + this.logger.info({ ingestion: logData }, 'Copy to/from query metrics'); } }; diff --git a/lib/services/stream-copy.js b/lib/services/stream-copy.js index f9e9c243..0ac210c1 100644 --- a/lib/services/stream-copy.js +++ b/lib/services/stream-copy.js @@ -51,7 +51,7 @@ module.exports = class StreamCopy { if (action === ACTION_TO) { pgstream.on('end', () => done()); - pgstream.on('warning', (msg) => this.logger.warn({ error: new Error(msg) })); + pgstream.on('warning', (msg) => this.logger.warn({ exception: new Error(msg) }, msg)); } else if (action === ACTION_FROM) { pgstream.on('finish', () => done()); pgstream.on('client-close', err => client.connection.sendCopyFail(err.message)); diff --git a/lib/utils/logger.js b/lib/utils/logger.js index 3acc1166..f47f9634 100644 --- a/lib/utils/logger.js +++ b/lib/utils/logger.js @@ -15,10 +15,21 @@ module.exports = class Logger { const options = { base: null, // Do not bind hostname, pid and friends by default level: LOG_LEVEL || logLevelFromNodeEnv, + formatters: { + level (label) { + if (label === 'warn') { + return { levelname: 'warning' }; + } + + return { levelname: label }; + } + }, + messageKey: 'event_message', + timestamp: () => `,"timestamp":"${new Date(Date.now()).toISOString()}"`, serializers: { - request: requestSerializer, - response: responseSerializer, - error: (error) => Array.isArray(error) ? error.map((err) => errorSerializer(err)) : [errorSerializer(error)] + client_request: requestSerializer, + server_response: responseSerializer, + exception: errorSerializer } }; const dest = pino.destination({ sync: false }); // stdout diff --git a/metro/config.json b/metro/config.json new file mode 100644 index 00000000..70d472f2 --- /dev/null +++ b/metro/config.json @@ -0,0 +1,94 @@ +{ + "metrics": { + "port": 9144, + "definitions": [ + { + "type": "counter", + "options": { + "name": "sql_api_requests_total", + "help": "SQL API requests total" + }, + "valuePath": "server_response.statusCode", + "shouldMeasure": "({ value }) => Number.isFinite(value)", + "measure": "({ metric }) => metric.inc()" + }, + { + "type": "counter", + "options": { + "name": "sql_api_requests_ok_total", + "help": "SQL API requests ok total" + }, + "valuePath": "server_response.statusCode", + "shouldMeasure": "({ value }) => value >= 200 && value < 400", + "measure": "({ metric }) => metric.inc()" + }, + { + "type": "counter", + "options": { + "name": "sql_api_requests_errors_total", + "help": "SQL API requests errors total" + }, + "valuePath": "server_response.statusCode", + "shouldMeasure": "({ value }) => value >= 400", + "measure": "({ metric }) => metric.inc()" + }, + { + "type": "histogram", + "options": { + "name": "sql_api_response_time_total", + "help": "SQL API response time total" + }, + "valuePath": "stats.response", + "shouldMeasure": "({ value }) => Number.isFinite(value)", + "measure": "({ metric, value }) => metric.observe(value)" + }, + { + "type": "counter", + "options": { + "name": "sql_api_requests", + "help": "SQL API requests per user", + "labelNames": ["user", "http_code"] + }, + "labelPaths": ["cdb-user", "server_response.statusCode"], + "shouldMeasure": "({ labels }) => labels.every((label) => label !== undefined)", + "measure": "({ metric, labels }) => metric.labels(...labels).inc()" + }, + { + "type": "counter", + "options": { + "name": "sql_api_requests_ok", + "help": "SQL API requests per user with success HTTP code", + "labelNames": ["user", "http_code"] + }, + "labelPaths": ["cdb-user", "server_response.statusCode"], + "valuePath": "server_response.statusCode", + "shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 200 && value < 400", + "measure": "({ metric, labels }) => metric.labels(...labels).inc()" + }, + { + "type": "counter", + "options": { + "name": "sql_api_requests_errors", + "help": "SQL API requests per user with error HTTP code", + "labelNames": ["user", "http_code"] + }, + "labelPaths": ["cdb-user", "server_response.statusCode"], + "valuePath": "server_response.statusCode", + "shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 400", + "measure": "({ metric, labels }) => metric.labels(...labels).inc()" + }, + { + "type": "histogram", + "options": { + "name": "sql_api_response_time", + "help": "SQL API response time total", + "labelNames": ["user"] + }, + "labelPaths": ["cdb-user"], + "valuePath": "stats.response", + "shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && Number.isFinite(value)", + "measure": "({ metric, labels, value }) => metric.labels(...labels).observe(value)" + } + ] + } +} diff --git a/metro/index.js b/metro/index.js index 123e147f..d9425d12 100644 --- a/metro/index.js +++ b/metro/index.js @@ -1,29 +1,40 @@ 'use strict'; -const split = require('split2'); -const logCollector = require('./log-collector'); -const metricsCollector = require('./metrics-collector'); +const metro = require('./metro'); +const path = require('path'); +const fs = require('fs'); -const streams = [process.stdin, split(), logCollector(), metricsCollector(), process.stdout]; +const { CONFIG_PATH = path.resolve(__dirname, './config.json') } = process.env; +const existsConfigFile = fs.existsSync(CONFIG_PATH); -pipeline('pipe', streams); +if (!existsConfigFile) { + exit(4)(new Error(`Wrong path for CONFIG_PATH env variable: ${CONFIG_PATH} no such file`)); +} -process.on('SIGINT', exitProcess(0)); -process.on('SIGTERM', exitProcess(0)); -process.on('uncaughtException', exitProcess(1)); -process.on('unhandledRejection', exitProcess(1)); +let config; -function pipeline (action, streams) { - for (let index = 0; index < streams.length - 1; index++) { - const source = streams[index]; - const destination = streams[index + 1]; - source[action](destination); +if (existsConfigFile) { + config = fs.readFileSync(CONFIG_PATH); + try { + config = JSON.parse(config); + } catch (e) { + exit(5)(new Error('Wrong config format: invalid JSON')); } } -function exitProcess (code = 0) { - return function exitProcess (signal) { - pipeline('unpipe', streams); +metro({ metrics: config && config.metrics }) + .then(exit(0)) + .catch(exit(1)); + +process.on('uncaughtException', exit(2)); +process.on('unhandledRejection', exit(3)); + +function exit (code = 1) { + return function (err) { + if (err) { + console.error(err); + } + process.exit(code); }; } diff --git a/metro/log-collector.js b/metro/log-collector.js deleted file mode 100644 index 88123db9..00000000 --- a/metro/log-collector.js +++ /dev/null @@ -1,105 +0,0 @@ -'use strict' - -const fs = require('fs'); -const split = require('split2'); -const assingDeep = require('assign-deep'); -const { Transform } = require('stream'); -const DEV_ENVS = ['test', 'development']; -const dumpPath = `${__dirname}/dump.json`; - -let logs; - -const LEVELS = { - 10: 'trace', - 20: 'debug', - 30: 'info', - 40: 'warn', - 50: 'error', - 60: 'fatal' -}; - -module.exports = function logCollector () { - const stream = new Transform({ - transform (chunk, enc, callback) { - let entry; - - try { - entry = JSON.parse(chunk); - const { level, time } = entry; - - if (level === undefined && time === undefined) { - throw new Error('Entry log is not valid'); - } - } catch (e) { - if (DEV_ENVS.includes(process.env.NODE_ENV)) { - this.push(chunk + '\n'); - } - return callback(); - } - - const { id } = entry; - - if (id === undefined) { - entry.level = LEVELS[entry.level]; - this.push(`${JSON.stringify(entry)}\n`); - return callback(); - } - - if (logs.has(id)) { - const accEntry = logs.get(id); - const { end } = entry; - - if (end === true) { - accEntry.level = LEVELS[accEntry.level]; - accEntry.time = entry.time; - this.push(`${JSON.stringify(accEntry)}\n`); - logs.delete(id); - return callback(); - } - - if (accEntry.level > entry.level) { - delete entry.level; - } - - if (hasProperty(accEntry, 'error') && hasProperty(entry, 'error')) { - logs.set(id, assingDeep({}, accEntry, entry, { error: accEntry.error.concat(entry.error) })); - } else { - logs.set(id, assingDeep({}, accEntry, entry)); - } - } else { - logs.set(id, entry); - } - - callback(); - } - }); - - stream.on('pipe', () => { - if (!fs.existsSync(dumpPath)) { - logs = new Map(); - return; - } - - try { - const dump = require(dumpPath); - logs = new Map(dump); - } catch (err) { - console.error(`Cannot read the dump for unfinished logs: ${err}`); - logs = new Map(); - } - }); - - stream.on('unpipe', () => { - try { - fs.writeFileSync(dumpPath, JSON.stringify([...logs])); - } catch (err) { - console.error(`Cannot create a dump for unfinished logs: ${err}`); - } - }); - - return stream; -}; - -function hasProperty (obj, prop) { - return Object.prototype.hasOwnProperty.call(obj, prop); -} diff --git a/metro/metrics-collector.js b/metro/metrics-collector.js index 0d63c587..168c504d 100644 --- a/metro/metrics-collector.js +++ b/metro/metrics-collector.js @@ -2,119 +2,101 @@ const http = require('http'); const { Counter, Histogram, register } = require('prom-client'); -const { Transform } = require('stream'); +const flatten = require('flat'); +const { Transform, PassThrough } = require('stream'); const DEV_ENVS = ['test', 'development']; -const requestCounter = new Counter({ - name: 'sql_api_requests_total', - help: 'SQL API requests total' -}); +const factory = { + counter: Counter, + histogram: Histogram +}; -const requestOkCounter = new Counter({ - name: 'sql_api_requests_ok_total', - help: 'SQL API requests ok total' -}); +module.exports = class MetricsCollector { + constructor ({ port = 0, definitions } = {}) { + this._port = port; + this._definitions = definitions; + this._server = null; + this._stream = createTransformStream(this._definitions); + } -const requestErrorCounter = new Counter({ - name: 'sql_api_requests_errors_total', - help: 'SQL API requests errors total' -}); + get stream () { + return this._stream; + } -const responseTimeHistogram = new Histogram({ - name: 'sql_api_response_time_total', - help: 'SQL API response time total' -}); + start () { + return new Promise((resolve, reject) => { + this._server = http.createServer((req, res) => { + res.writeHead(200, { 'Content-Type': register.contentType }); + res.end(register.metrics()); + }); -const userRequestCounter = new Counter({ - name: 'sql_api_requests', - help: 'SQL API requests per user', - labelNames: ['user', 'http_code'] -}); + this._server.once('error', err => reject(err)); + this._server.once('listening', () => resolve()); + this._server.listen(this._port); + }); + } -const userRequestOkCounter = new Counter({ - name: 'sql_api_requests_ok', - help: 'SQL API requests per user with success HTTP code', - labelNames: ['user', 'http_code'] -}); + stop () { + return new Promise((resolve) => { + register.clear(); + if (!this._server) { + return resolve(); + } -const userRequestErrorCounter = new Counter({ - name: 'sql_api_requests_errors', - help: 'SQL API requests per user with error HTTP code', - labelNames: ['user', 'http_code'] -}); + this._server.once('close', () => { + this._server = null; + resolve(); + }); -const userResponseTimeHistogram = new Histogram({ - name: 'sql_api_response_time', - help: 'SQL API response time total', - labelNames: ['user'] -}); + this._server.close(); + }); + }; +}; + +function createTransformStream (definitions) { + if (typeof definitions !== 'object') { + return new PassThrough(); + } + + const metrics = []; + + for (const { type, options, valuePath, labelPaths, shouldMeasure, measure } of definitions) { + metrics.push({ + instance: new factory[type](options), + valuePath, + labelPaths, + shouldMeasure: eval(shouldMeasure), // eslint-disable-line no-eval + measure: eval(measure) // eslint-disable-line no-eval + }); + } -module.exports = function metricsCollector () { return new Transform({ transform (chunk, enc, callback) { let entry; try { entry = JSON.parse(chunk); - const { level, time } = entry; - - if (level === undefined && time === undefined) { - throw new Error('Entry log is not valid'); - } } catch (e) { if (DEV_ENVS.includes(process.env.NODE_ENV)) { - this.push(chunk); + this.push(chunk + '\n'); } return callback(); } - const { request, response, stats } = entry; + const flatEntry = flatten(entry); - if (request === undefined || response === undefined || stats === undefined) { - this.push(chunk); - return callback(); - } + for (const metric of metrics) { + const value = flatEntry[metric.valuePath]; + const labels = Array.isArray(metric.labelPaths) && metric.labelPaths.map(path => flatEntry[path]); - const { statusCode, headers } = response; - const { 'carto-user': user } = headers; - - requestCounter.inc(); - - if (statusCode !== undefined && user !== undefined) { - userRequestCounter.labels(user, `${statusCode}`).inc(); - } - - if (statusCode >= 200 && statusCode < 400) { - requestOkCounter.inc(); - if (user !== undefined) { - userRequestOkCounter.labels(user, `${statusCode}`).inc(); - } - } else if (statusCode >= 400) { - requestErrorCounter.inc(); - if (user !== undefined) { - userRequestErrorCounter.labels(user, `${statusCode}`).inc(); + if (metric.shouldMeasure({ labels, value })) { + metric.measure({ metric: metric.instance, labels, value }); } } - const { response: responseTime } = stats; + this.push(`${JSON.stringify(entry)}\n`); - if (Number.isFinite(responseTime)) { - responseTimeHistogram.observe(responseTime); - userResponseTimeHistogram.labels(user).observe(responseTime); - } - - this.push(chunk); - callback(); + return callback(); } }); -}; - -const port = process.env.PORT || 9144; - -http - .createServer((req, res) => { - res.writeHead(200, { 'Content-Type': register.contentType }); - res.end(register.metrics()); - }) - .listen(port) - .unref(); +} diff --git a/metro/metro.js b/metro/metro.js new file mode 100644 index 00000000..241c49ee --- /dev/null +++ b/metro/metro.js @@ -0,0 +1,19 @@ +'use strict'; + +const util = require('util'); +const stream = require('stream'); +const pipeline = util.promisify(stream.pipeline); +const split = require('split2'); +const MetricsCollector = require('./metrics-collector'); + +module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) { + const metricsCollector = new MetricsCollector(metrics); + const { stream: metricsStream } = metricsCollector; + + try { + await metricsCollector.start(); + await pipeline(input, split(), metricsStream, output); + } finally { + await metricsCollector.stop(); + } +};