Merge pull request #666 from CartoDB/feature/ch91877/remove-log-aggregation-in-metro
Adapt logger output to the new standard for the entire platform
This commit is contained in:
commit
ad2e37e89e
@ -19,8 +19,7 @@ module.exports = function affectedTables () {
|
||||
return next();
|
||||
})
|
||||
.catch(err => {
|
||||
err.message = `Error on query explain "${sql}": ${err.message}`;
|
||||
logger.warn({ error: err });
|
||||
logger.warn({ exception: err }, 'Error on query explain');
|
||||
|
||||
return next();
|
||||
});
|
||||
|
@ -8,7 +8,7 @@ module.exports = function error () {
|
||||
const errorHandler = errorHandlerFactory(err);
|
||||
const errorResponse = errorHandler.getResponse();
|
||||
|
||||
logger.error({ error: err });
|
||||
logger.error({ exception: err }, 'Error while handling the request');
|
||||
|
||||
// Force inline content disposition
|
||||
res.header('Content-Disposition', 'inline');
|
||||
|
@ -12,7 +12,7 @@ module.exports = function logQuery () {
|
||||
return function logQueryMiddleware (req, res, next) {
|
||||
const { logger } = res.locals;
|
||||
|
||||
logger.info({ sql: ensureMaxQueryLength(res.locals.params.sql) });
|
||||
logger.info({ sql: ensureMaxQueryLength(res.locals.params.sql) }, 'Input query');
|
||||
|
||||
return next();
|
||||
};
|
||||
|
10
lib/api/middlewares/log-req-res.js
Normal file
10
lib/api/middlewares/log-req-res.js
Normal file
@ -0,0 +1,10 @@
|
||||
'use strict';
|
||||
|
||||
module.exports = function logReqRes ({ logOnEvent = 'finish' } = {}) {
|
||||
return function logReqResMiddleware (req, res, next) {
|
||||
const { logger } = res.locals;
|
||||
logger.info({ client_request: req }, 'Incoming request');
|
||||
res.on(logOnEvent, () => logger.info({ server_response: res, status: res.statusCode }, 'Response sent'));
|
||||
next();
|
||||
};
|
||||
};
|
@ -2,15 +2,10 @@
|
||||
|
||||
const uuid = require('uuid');
|
||||
|
||||
module.exports = function initLogger ({ logger, logOnEvent = 'finish' }) {
|
||||
module.exports = function initLogger ({ logger }) {
|
||||
return function initLoggerMiddleware (req, res, next) {
|
||||
const id = req.get('X-Request-Id') || uuid.v4();
|
||||
res.locals.logger = logger.child({ id });
|
||||
|
||||
res.locals.logger.info({ request: req });
|
||||
res.on(logOnEvent, () => res.locals.logger.info({ response: res }));
|
||||
res.on('close', () => res.locals.logger.info({ end: true }));
|
||||
|
||||
const requestId = req.get('X-Request-Id') || uuid.v4();
|
||||
res.locals.logger = logger.child({ request_id: requestId });
|
||||
next();
|
||||
};
|
||||
};
|
||||
|
@ -18,12 +18,13 @@ module.exports = function profiler ({ statsClient, logOnEvent = 'finish' }) {
|
||||
res.on(logOnEvent, () => {
|
||||
req.profiler.add({ response: new Date() - start });
|
||||
req.profiler.end();
|
||||
logger.info({ stats: req.profiler.toJSON() });
|
||||
const stats = req.profiler.toJSON();
|
||||
logger.info({ stats, duration: stats.response / 1000, duration_ms: stats.response }, 'Request profiling stats');
|
||||
|
||||
try {
|
||||
req.profiler.sendStats();
|
||||
} catch (err) {
|
||||
logger.warn({ error: err });
|
||||
logger.warn({ exception: err }, 'Could not send stats to StatsD');
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -8,7 +8,7 @@ module.exports = function tag ({ tags, logOnEvent = 'finish' }) {
|
||||
return function tagMiddleware (req, res, next) {
|
||||
const { logger } = res.locals;
|
||||
res.locals.tags = tags;
|
||||
res.on(logOnEvent, () => logger.info({ tags: res.locals.tags }));
|
||||
res.on(logOnEvent, () => logger.info({ tags: res.locals.tags }, 'Request tagged'));
|
||||
|
||||
next();
|
||||
};
|
||||
|
@ -6,11 +6,8 @@ module.exports = function user (metadataBackend) {
|
||||
const cdbRequest = new CdbRequest();
|
||||
|
||||
return function userMiddleware (req, res, next) {
|
||||
const { logger } = res.locals;
|
||||
|
||||
try {
|
||||
res.locals.user = getUserNameFromRequest(req, cdbRequest);
|
||||
logger.info({ user: res.locals.user });
|
||||
res.set('Carto-User', res.locals.user);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
@ -29,6 +26,7 @@ module.exports = function user (metadataBackend) {
|
||||
|
||||
res.locals.userId = userId;
|
||||
res.set('Carto-User-Id', `${userId}`);
|
||||
res.locals.logger = res.locals.logger.child({ 'cdb-user': res.locals.user });
|
||||
return next();
|
||||
});
|
||||
};
|
||||
|
@ -1,6 +1,7 @@
|
||||
'use strict';
|
||||
|
||||
const initLogger = require('../middlewares/logger');
|
||||
const logReqRes = require('../middlewares/log-req-res');
|
||||
const profiler = require('../middlewares/profiler');
|
||||
const user = require('../middlewares/user');
|
||||
const authorization = require('../middlewares/authorization');
|
||||
@ -31,9 +32,10 @@ module.exports = class CopyController {
|
||||
const copyFromMiddlewares = endpointGroup => {
|
||||
return [
|
||||
initLogger({ logger: this.logger }),
|
||||
user(this.metadataBackend),
|
||||
logReqRes(),
|
||||
profiler({ statsClient: this.statsdClient }),
|
||||
tag({ tags: ['ingestion', 'copyfrom'] }),
|
||||
user(this.metadataBackend),
|
||||
rateLimits(this.userLimitsService, endpointGroup),
|
||||
authorization(this.metadataBackend),
|
||||
connectionParams(this.userDatabaseService),
|
||||
@ -46,11 +48,12 @@ module.exports = class CopyController {
|
||||
|
||||
const copyToMiddlewares = endpointGroup => {
|
||||
return [
|
||||
initLogger({ logger: this.logger, logOnEvent: 'log' }),
|
||||
initLogger({ logger: this.logger }),
|
||||
user(this.metadataBackend),
|
||||
logReqRes({ logOnEvent: 'log' }),
|
||||
profiler({ statsClient: this.statsdClient, logOnEvent: 'log' }),
|
||||
tag({ tags: ['ingestion', 'copyto'], logOnEvent: 'log' }),
|
||||
bodyParser(),
|
||||
user(this.metadataBackend),
|
||||
rateLimits(this.userLimitsService, endpointGroup),
|
||||
authorization(this.metadataBackend),
|
||||
connectionParams(this.userDatabaseService),
|
||||
@ -69,7 +72,7 @@ module.exports = class CopyController {
|
||||
function handleCopyTo ({ logger: mainLogger }) {
|
||||
return function handleCopyToMiddleware (req, res, next) {
|
||||
const { logger } = res.locals;
|
||||
const { id: requestId } = logger.bindings();
|
||||
const { request_id: requestId } = logger.bindings();
|
||||
const { userDbParams, user } = res.locals;
|
||||
const { sql, filename } = res.locals.params;
|
||||
|
||||
@ -99,10 +102,10 @@ function handleCopyTo ({ logger: mainLogger }) {
|
||||
const pid = streamCopy.clientProcessID;
|
||||
streamCopy.cancel(pid, StreamCopy.ACTION_TO, (err) => {
|
||||
if (err) {
|
||||
return mainLogger.error({ error: err }, `Unable to cancel "copy ${StreamCopy.ACTION_TO}" stream query (pid: ${pid}, requestId: ${requestId})`);
|
||||
return mainLogger.error({ request_id: requestId, exception: err, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Unable to cancel stream query');
|
||||
}
|
||||
|
||||
mainLogger.info(`Canceled "copy ${StreamCopy.ACTION_TO}" stream query successfully (pid: ${pid}, requestId: ${requestId})`);
|
||||
mainLogger.info({ request_id: requestId, action: `copy-${StreamCopy.ACTION_TO}`, pid }, 'Canceled stream query successfully');
|
||||
});
|
||||
|
||||
return next(err);
|
||||
@ -201,7 +204,7 @@ function errorHandler () {
|
||||
return function errorHandlerMiddleware (err, req, res, next) {
|
||||
const { logger } = res.locals;
|
||||
if (res.headersSent) {
|
||||
logger.error({ error: err });
|
||||
logger.error({ exception: err }, 'Error while handling the request');
|
||||
const errorHandler = errorHandlerFactory(err);
|
||||
res.write(JSON.stringify(errorHandler.getResponse()));
|
||||
res.end();
|
||||
|
@ -3,6 +3,7 @@
|
||||
const util = require('util');
|
||||
|
||||
const initLogger = require('../middlewares/logger');
|
||||
const logReqRes = require('../middlewares/log-req-res');
|
||||
const profiler = require('../middlewares/profiler');
|
||||
const bodyParser = require('../middlewares/body-parser');
|
||||
const user = require('../middlewares/user');
|
||||
@ -34,6 +35,8 @@ module.exports = class JobController {
|
||||
|
||||
sqlRouter.post('/job', [
|
||||
initLogger({ logger: this.logger }),
|
||||
user(this.metadataBackend),
|
||||
logReqRes(),
|
||||
profiler({ statsClient: this.statsdClient }),
|
||||
tag({ tags: ['job', 'create'] }),
|
||||
bodyParser(),
|
||||
@ -44,6 +47,8 @@ module.exports = class JobController {
|
||||
|
||||
sqlRouter.get('/job/:job_id', [
|
||||
initLogger({ logger: this.logger }),
|
||||
user(this.metadataBackend),
|
||||
logReqRes(),
|
||||
profiler({ statsClient: this.statsdClient }),
|
||||
tag({ tags: ['job', 'retrieve'] }),
|
||||
bodyParser(),
|
||||
@ -52,6 +57,8 @@ module.exports = class JobController {
|
||||
|
||||
sqlRouter.delete('/job/:job_id', [
|
||||
initLogger({ logger: this.logger }),
|
||||
user(this.metadataBackend),
|
||||
logReqRes(),
|
||||
profiler({ statsClient: this.statsdClient }),
|
||||
tag({ tags: ['job', 'cancel'] }),
|
||||
bodyParser(),
|
||||
@ -65,7 +72,6 @@ function composeJobMiddlewares (metadataBackend, userDatabaseService, jobService
|
||||
const forceToBeMaster = true;
|
||||
|
||||
return [
|
||||
user(metadataBackend),
|
||||
rateLimits(userLimitsService, endpointGroup),
|
||||
authorization(metadataBackend, forceToBeMaster),
|
||||
connectionParams(userDatabaseService),
|
||||
@ -134,7 +140,7 @@ function createJob (jobService) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
logger.info({ job: job.toJSON() });
|
||||
logger.info({ job: job.toJSON() }, 'Batch query job created');
|
||||
|
||||
res.statusCode = 201;
|
||||
res.body = job.serialize();
|
||||
|
@ -1,6 +1,7 @@
|
||||
'use strict';
|
||||
|
||||
const initLogger = require('../middlewares/logger');
|
||||
const logReqRes = require('../middlewares/log-req-res');
|
||||
const profiler = require('../middlewares/profiler');
|
||||
const bodyParser = require('../middlewares/body-parser');
|
||||
const user = require('../middlewares/user');
|
||||
@ -40,10 +41,11 @@ module.exports = class QueryController {
|
||||
const queryMiddlewares = () => {
|
||||
return [
|
||||
initLogger({ logger: this.logger }),
|
||||
user(this.metadataBackend),
|
||||
logReqRes(),
|
||||
profiler({ statsClient: this.statsdClient }),
|
||||
tag({ tags: ['query'] }),
|
||||
bodyParser(),
|
||||
user(this.metadataBackend),
|
||||
rateLimits(this.userLimitsService, RATE_LIMIT_ENDPOINTS_GROUPS.QUERY),
|
||||
authorization(this.metadataBackend, forceToBeMaster),
|
||||
connectionParams(this.userDatabaseService),
|
||||
|
@ -116,9 +116,9 @@ Batch.prototype.processJob = function (user, callback) {
|
||||
}
|
||||
|
||||
if (JobStatus.isFinal(job.data.status)) {
|
||||
self.logger.info({ job: job.toJSON() });
|
||||
self.logger.info({ job: job.toJSON() }, 'Batch query job finished');
|
||||
} else {
|
||||
self.logger.debug({ job: job.toJSON() });
|
||||
self.logger.debug({ job: job.toJSON() }, 'Batch query job: query done');
|
||||
}
|
||||
|
||||
return callback(null, !EMPTY_QUEUE);
|
||||
|
@ -82,6 +82,6 @@ module.exports = class StreamCopyMetrics {
|
||||
|
||||
logData.success = this.success;
|
||||
|
||||
this.logger.info({ ingestion: logData });
|
||||
this.logger.info({ ingestion: logData }, 'Copy to/from query metrics');
|
||||
}
|
||||
};
|
||||
|
@ -51,7 +51,7 @@ module.exports = class StreamCopy {
|
||||
|
||||
if (action === ACTION_TO) {
|
||||
pgstream.on('end', () => done());
|
||||
pgstream.on('warning', (msg) => this.logger.warn({ error: new Error(msg) }));
|
||||
pgstream.on('warning', (msg) => this.logger.warn({ exception: new Error(msg) }, msg));
|
||||
} else if (action === ACTION_FROM) {
|
||||
pgstream.on('finish', () => done());
|
||||
pgstream.on('client-close', err => client.connection.sendCopyFail(err.message));
|
||||
|
@ -15,10 +15,21 @@ module.exports = class Logger {
|
||||
const options = {
|
||||
base: null, // Do not bind hostname, pid and friends by default
|
||||
level: LOG_LEVEL || logLevelFromNodeEnv,
|
||||
formatters: {
|
||||
level (label) {
|
||||
if (label === 'warn') {
|
||||
return { levelname: 'warning' };
|
||||
}
|
||||
|
||||
return { levelname: label };
|
||||
}
|
||||
},
|
||||
messageKey: 'event_message',
|
||||
timestamp: () => `,"timestamp":"${new Date(Date.now()).toISOString()}"`,
|
||||
serializers: {
|
||||
request: requestSerializer,
|
||||
response: responseSerializer,
|
||||
error: (error) => Array.isArray(error) ? error.map((err) => errorSerializer(err)) : [errorSerializer(error)]
|
||||
client_request: requestSerializer,
|
||||
server_response: responseSerializer,
|
||||
exception: errorSerializer
|
||||
}
|
||||
};
|
||||
const dest = pino.destination({ sync: false }); // stdout
|
||||
|
94
metro/config.json
Normal file
94
metro/config.json
Normal file
@ -0,0 +1,94 @@
|
||||
{
|
||||
"metrics": {
|
||||
"port": 9144,
|
||||
"definitions": [
|
||||
{
|
||||
"type": "counter",
|
||||
"options": {
|
||||
"name": "sql_api_requests_total",
|
||||
"help": "SQL API requests total"
|
||||
},
|
||||
"valuePath": "server_response.statusCode",
|
||||
"shouldMeasure": "({ value }) => Number.isFinite(value)",
|
||||
"measure": "({ metric }) => metric.inc()"
|
||||
},
|
||||
{
|
||||
"type": "counter",
|
||||
"options": {
|
||||
"name": "sql_api_requests_ok_total",
|
||||
"help": "SQL API requests ok total"
|
||||
},
|
||||
"valuePath": "server_response.statusCode",
|
||||
"shouldMeasure": "({ value }) => value >= 200 && value < 400",
|
||||
"measure": "({ metric }) => metric.inc()"
|
||||
},
|
||||
{
|
||||
"type": "counter",
|
||||
"options": {
|
||||
"name": "sql_api_requests_errors_total",
|
||||
"help": "SQL API requests errors total"
|
||||
},
|
||||
"valuePath": "server_response.statusCode",
|
||||
"shouldMeasure": "({ value }) => value >= 400",
|
||||
"measure": "({ metric }) => metric.inc()"
|
||||
},
|
||||
{
|
||||
"type": "histogram",
|
||||
"options": {
|
||||
"name": "sql_api_response_time_total",
|
||||
"help": "SQL API response time total"
|
||||
},
|
||||
"valuePath": "stats.response",
|
||||
"shouldMeasure": "({ value }) => Number.isFinite(value)",
|
||||
"measure": "({ metric, value }) => metric.observe(value)"
|
||||
},
|
||||
{
|
||||
"type": "counter",
|
||||
"options": {
|
||||
"name": "sql_api_requests",
|
||||
"help": "SQL API requests per user",
|
||||
"labelNames": ["user", "http_code"]
|
||||
},
|
||||
"labelPaths": ["cdb-user", "server_response.statusCode"],
|
||||
"shouldMeasure": "({ labels }) => labels.every((label) => label !== undefined)",
|
||||
"measure": "({ metric, labels }) => metric.labels(...labels).inc()"
|
||||
},
|
||||
{
|
||||
"type": "counter",
|
||||
"options": {
|
||||
"name": "sql_api_requests_ok",
|
||||
"help": "SQL API requests per user with success HTTP code",
|
||||
"labelNames": ["user", "http_code"]
|
||||
},
|
||||
"labelPaths": ["cdb-user", "server_response.statusCode"],
|
||||
"valuePath": "server_response.statusCode",
|
||||
"shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 200 && value < 400",
|
||||
"measure": "({ metric, labels }) => metric.labels(...labels).inc()"
|
||||
},
|
||||
{
|
||||
"type": "counter",
|
||||
"options": {
|
||||
"name": "sql_api_requests_errors",
|
||||
"help": "SQL API requests per user with error HTTP code",
|
||||
"labelNames": ["user", "http_code"]
|
||||
},
|
||||
"labelPaths": ["cdb-user", "server_response.statusCode"],
|
||||
"valuePath": "server_response.statusCode",
|
||||
"shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && value >= 400",
|
||||
"measure": "({ metric, labels }) => metric.labels(...labels).inc()"
|
||||
},
|
||||
{
|
||||
"type": "histogram",
|
||||
"options": {
|
||||
"name": "sql_api_response_time",
|
||||
"help": "SQL API response time total",
|
||||
"labelNames": ["user"]
|
||||
},
|
||||
"labelPaths": ["cdb-user"],
|
||||
"valuePath": "stats.response",
|
||||
"shouldMeasure": "({ labels, value }) => labels.every((label) => label !== undefined) && Number.isFinite(value)",
|
||||
"measure": "({ metric, labels, value }) => metric.labels(...labels).observe(value)"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
@ -1,29 +1,40 @@
|
||||
'use strict';
|
||||
|
||||
const split = require('split2');
|
||||
const logCollector = require('./log-collector');
|
||||
const metricsCollector = require('./metrics-collector');
|
||||
const metro = require('./metro');
|
||||
const path = require('path');
|
||||
const fs = require('fs');
|
||||
|
||||
const streams = [process.stdin, split(), logCollector(), metricsCollector(), process.stdout];
|
||||
const { CONFIG_PATH = path.resolve(__dirname, './config.json') } = process.env;
|
||||
const existsConfigFile = fs.existsSync(CONFIG_PATH);
|
||||
|
||||
pipeline('pipe', streams);
|
||||
if (!existsConfigFile) {
|
||||
exit(4)(new Error(`Wrong path for CONFIG_PATH env variable: ${CONFIG_PATH} no such file`));
|
||||
}
|
||||
|
||||
process.on('SIGINT', exitProcess(0));
|
||||
process.on('SIGTERM', exitProcess(0));
|
||||
process.on('uncaughtException', exitProcess(1));
|
||||
process.on('unhandledRejection', exitProcess(1));
|
||||
let config;
|
||||
|
||||
function pipeline (action, streams) {
|
||||
for (let index = 0; index < streams.length - 1; index++) {
|
||||
const source = streams[index];
|
||||
const destination = streams[index + 1];
|
||||
source[action](destination);
|
||||
if (existsConfigFile) {
|
||||
config = fs.readFileSync(CONFIG_PATH);
|
||||
try {
|
||||
config = JSON.parse(config);
|
||||
} catch (e) {
|
||||
exit(5)(new Error('Wrong config format: invalid JSON'));
|
||||
}
|
||||
}
|
||||
|
||||
function exitProcess (code = 0) {
|
||||
return function exitProcess (signal) {
|
||||
pipeline('unpipe', streams);
|
||||
metro({ metrics: config && config.metrics })
|
||||
.then(exit(0))
|
||||
.catch(exit(1));
|
||||
|
||||
process.on('uncaughtException', exit(2));
|
||||
process.on('unhandledRejection', exit(3));
|
||||
|
||||
function exit (code = 1) {
|
||||
return function (err) {
|
||||
if (err) {
|
||||
console.error(err);
|
||||
}
|
||||
|
||||
process.exit(code);
|
||||
};
|
||||
}
|
||||
|
@ -1,105 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const fs = require('fs');
|
||||
const split = require('split2');
|
||||
const assingDeep = require('assign-deep');
|
||||
const { Transform } = require('stream');
|
||||
const DEV_ENVS = ['test', 'development'];
|
||||
const dumpPath = `${__dirname}/dump.json`;
|
||||
|
||||
let logs;
|
||||
|
||||
const LEVELS = {
|
||||
10: 'trace',
|
||||
20: 'debug',
|
||||
30: 'info',
|
||||
40: 'warn',
|
||||
50: 'error',
|
||||
60: 'fatal'
|
||||
};
|
||||
|
||||
module.exports = function logCollector () {
|
||||
const stream = new Transform({
|
||||
transform (chunk, enc, callback) {
|
||||
let entry;
|
||||
|
||||
try {
|
||||
entry = JSON.parse(chunk);
|
||||
const { level, time } = entry;
|
||||
|
||||
if (level === undefined && time === undefined) {
|
||||
throw new Error('Entry log is not valid');
|
||||
}
|
||||
} catch (e) {
|
||||
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
|
||||
this.push(chunk + '\n');
|
||||
}
|
||||
return callback();
|
||||
}
|
||||
|
||||
const { id } = entry;
|
||||
|
||||
if (id === undefined) {
|
||||
entry.level = LEVELS[entry.level];
|
||||
this.push(`${JSON.stringify(entry)}\n`);
|
||||
return callback();
|
||||
}
|
||||
|
||||
if (logs.has(id)) {
|
||||
const accEntry = logs.get(id);
|
||||
const { end } = entry;
|
||||
|
||||
if (end === true) {
|
||||
accEntry.level = LEVELS[accEntry.level];
|
||||
accEntry.time = entry.time;
|
||||
this.push(`${JSON.stringify(accEntry)}\n`);
|
||||
logs.delete(id);
|
||||
return callback();
|
||||
}
|
||||
|
||||
if (accEntry.level > entry.level) {
|
||||
delete entry.level;
|
||||
}
|
||||
|
||||
if (hasProperty(accEntry, 'error') && hasProperty(entry, 'error')) {
|
||||
logs.set(id, assingDeep({}, accEntry, entry, { error: accEntry.error.concat(entry.error) }));
|
||||
} else {
|
||||
logs.set(id, assingDeep({}, accEntry, entry));
|
||||
}
|
||||
} else {
|
||||
logs.set(id, entry);
|
||||
}
|
||||
|
||||
callback();
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('pipe', () => {
|
||||
if (!fs.existsSync(dumpPath)) {
|
||||
logs = new Map();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const dump = require(dumpPath);
|
||||
logs = new Map(dump);
|
||||
} catch (err) {
|
||||
console.error(`Cannot read the dump for unfinished logs: ${err}`);
|
||||
logs = new Map();
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('unpipe', () => {
|
||||
try {
|
||||
fs.writeFileSync(dumpPath, JSON.stringify([...logs]));
|
||||
} catch (err) {
|
||||
console.error(`Cannot create a dump for unfinished logs: ${err}`);
|
||||
}
|
||||
});
|
||||
|
||||
return stream;
|
||||
};
|
||||
|
||||
function hasProperty (obj, prop) {
|
||||
return Object.prototype.hasOwnProperty.call(obj, prop);
|
||||
}
|
@ -2,119 +2,101 @@
|
||||
|
||||
const http = require('http');
|
||||
const { Counter, Histogram, register } = require('prom-client');
|
||||
const { Transform } = require('stream');
|
||||
const flatten = require('flat');
|
||||
const { Transform, PassThrough } = require('stream');
|
||||
const DEV_ENVS = ['test', 'development'];
|
||||
|
||||
const requestCounter = new Counter({
|
||||
name: 'sql_api_requests_total',
|
||||
help: 'SQL API requests total'
|
||||
});
|
||||
const factory = {
|
||||
counter: Counter,
|
||||
histogram: Histogram
|
||||
};
|
||||
|
||||
const requestOkCounter = new Counter({
|
||||
name: 'sql_api_requests_ok_total',
|
||||
help: 'SQL API requests ok total'
|
||||
});
|
||||
module.exports = class MetricsCollector {
|
||||
constructor ({ port = 0, definitions } = {}) {
|
||||
this._port = port;
|
||||
this._definitions = definitions;
|
||||
this._server = null;
|
||||
this._stream = createTransformStream(this._definitions);
|
||||
}
|
||||
|
||||
const requestErrorCounter = new Counter({
|
||||
name: 'sql_api_requests_errors_total',
|
||||
help: 'SQL API requests errors total'
|
||||
});
|
||||
get stream () {
|
||||
return this._stream;
|
||||
}
|
||||
|
||||
const responseTimeHistogram = new Histogram({
|
||||
name: 'sql_api_response_time_total',
|
||||
help: 'SQL API response time total'
|
||||
});
|
||||
start () {
|
||||
return new Promise((resolve, reject) => {
|
||||
this._server = http.createServer((req, res) => {
|
||||
res.writeHead(200, { 'Content-Type': register.contentType });
|
||||
res.end(register.metrics());
|
||||
});
|
||||
|
||||
const userRequestCounter = new Counter({
|
||||
name: 'sql_api_requests',
|
||||
help: 'SQL API requests per user',
|
||||
labelNames: ['user', 'http_code']
|
||||
});
|
||||
this._server.once('error', err => reject(err));
|
||||
this._server.once('listening', () => resolve());
|
||||
this._server.listen(this._port);
|
||||
});
|
||||
}
|
||||
|
||||
const userRequestOkCounter = new Counter({
|
||||
name: 'sql_api_requests_ok',
|
||||
help: 'SQL API requests per user with success HTTP code',
|
||||
labelNames: ['user', 'http_code']
|
||||
});
|
||||
stop () {
|
||||
return new Promise((resolve) => {
|
||||
register.clear();
|
||||
if (!this._server) {
|
||||
return resolve();
|
||||
}
|
||||
|
||||
const userRequestErrorCounter = new Counter({
|
||||
name: 'sql_api_requests_errors',
|
||||
help: 'SQL API requests per user with error HTTP code',
|
||||
labelNames: ['user', 'http_code']
|
||||
});
|
||||
this._server.once('close', () => {
|
||||
this._server = null;
|
||||
resolve();
|
||||
});
|
||||
|
||||
const userResponseTimeHistogram = new Histogram({
|
||||
name: 'sql_api_response_time',
|
||||
help: 'SQL API response time total',
|
||||
labelNames: ['user']
|
||||
});
|
||||
this._server.close();
|
||||
});
|
||||
};
|
||||
};
|
||||
|
||||
function createTransformStream (definitions) {
|
||||
if (typeof definitions !== 'object') {
|
||||
return new PassThrough();
|
||||
}
|
||||
|
||||
const metrics = [];
|
||||
|
||||
for (const { type, options, valuePath, labelPaths, shouldMeasure, measure } of definitions) {
|
||||
metrics.push({
|
||||
instance: new factory[type](options),
|
||||
valuePath,
|
||||
labelPaths,
|
||||
shouldMeasure: eval(shouldMeasure), // eslint-disable-line no-eval
|
||||
measure: eval(measure) // eslint-disable-line no-eval
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = function metricsCollector () {
|
||||
return new Transform({
|
||||
transform (chunk, enc, callback) {
|
||||
let entry;
|
||||
|
||||
try {
|
||||
entry = JSON.parse(chunk);
|
||||
const { level, time } = entry;
|
||||
|
||||
if (level === undefined && time === undefined) {
|
||||
throw new Error('Entry log is not valid');
|
||||
}
|
||||
} catch (e) {
|
||||
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
|
||||
this.push(chunk);
|
||||
this.push(chunk + '\n');
|
||||
}
|
||||
return callback();
|
||||
}
|
||||
|
||||
const { request, response, stats } = entry;
|
||||
const flatEntry = flatten(entry);
|
||||
|
||||
if (request === undefined || response === undefined || stats === undefined) {
|
||||
this.push(chunk);
|
||||
return callback();
|
||||
}
|
||||
for (const metric of metrics) {
|
||||
const value = flatEntry[metric.valuePath];
|
||||
const labels = Array.isArray(metric.labelPaths) && metric.labelPaths.map(path => flatEntry[path]);
|
||||
|
||||
const { statusCode, headers } = response;
|
||||
const { 'carto-user': user } = headers;
|
||||
|
||||
requestCounter.inc();
|
||||
|
||||
if (statusCode !== undefined && user !== undefined) {
|
||||
userRequestCounter.labels(user, `${statusCode}`).inc();
|
||||
}
|
||||
|
||||
if (statusCode >= 200 && statusCode < 400) {
|
||||
requestOkCounter.inc();
|
||||
if (user !== undefined) {
|
||||
userRequestOkCounter.labels(user, `${statusCode}`).inc();
|
||||
}
|
||||
} else if (statusCode >= 400) {
|
||||
requestErrorCounter.inc();
|
||||
if (user !== undefined) {
|
||||
userRequestErrorCounter.labels(user, `${statusCode}`).inc();
|
||||
if (metric.shouldMeasure({ labels, value })) {
|
||||
metric.measure({ metric: metric.instance, labels, value });
|
||||
}
|
||||
}
|
||||
|
||||
const { response: responseTime } = stats;
|
||||
this.push(`${JSON.stringify(entry)}\n`);
|
||||
|
||||
if (Number.isFinite(responseTime)) {
|
||||
responseTimeHistogram.observe(responseTime);
|
||||
userResponseTimeHistogram.labels(user).observe(responseTime);
|
||||
}
|
||||
|
||||
this.push(chunk);
|
||||
callback();
|
||||
return callback();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const port = process.env.PORT || 9144;
|
||||
|
||||
http
|
||||
.createServer((req, res) => {
|
||||
res.writeHead(200, { 'Content-Type': register.contentType });
|
||||
res.end(register.metrics());
|
||||
})
|
||||
.listen(port)
|
||||
.unref();
|
||||
}
|
||||
|
19
metro/metro.js
Normal file
19
metro/metro.js
Normal file
@ -0,0 +1,19 @@
|
||||
'use strict';
|
||||
|
||||
const util = require('util');
|
||||
const stream = require('stream');
|
||||
const pipeline = util.promisify(stream.pipeline);
|
||||
const split = require('split2');
|
||||
const MetricsCollector = require('./metrics-collector');
|
||||
|
||||
module.exports = async function metro ({ input = process.stdin, output = process.stdout, metrics = {} } = {}) {
|
||||
const metricsCollector = new MetricsCollector(metrics);
|
||||
const { stream: metricsStream } = metricsCollector;
|
||||
|
||||
try {
|
||||
await metricsCollector.start();
|
||||
await pipeline(input, split(), metricsStream, output);
|
||||
} finally {
|
||||
await metricsCollector.stop();
|
||||
}
|
||||
};
|
Loading…
Reference in New Issue
Block a user