2020-07-21 01:33:33 +08:00
|
|
|
'use strict';
|
2020-06-09 02:16:00 +08:00
|
|
|
|
2020-06-09 15:40:24 +08:00
|
|
|
const http = require('http');
|
2020-06-09 02:16:00 +08:00
|
|
|
const { Counter, Histogram, register } = require('prom-client');
|
2020-07-21 01:33:33 +08:00
|
|
|
const flatten = require('flat');
|
|
|
|
const { Transform, PassThrough } = require('stream');
|
2020-06-09 02:16:00 +08:00
|
|
|
const DEV_ENVS = ['test', 'development'];
|
|
|
|
|
2020-07-21 01:33:33 +08:00
|
|
|
const factory = {
|
|
|
|
counter: Counter,
|
|
|
|
histogram: Histogram
|
|
|
|
};
|
|
|
|
|
|
|
|
module.exports = class MetricsCollector {
|
|
|
|
constructor ({ port = 0, definitions } = {}) {
|
|
|
|
this._port = port;
|
|
|
|
this._definitions = definitions;
|
|
|
|
this._server = null;
|
|
|
|
this._stream = createTransformStream(this._definitions);
|
|
|
|
}
|
|
|
|
|
|
|
|
get stream () {
|
|
|
|
return this._stream;
|
|
|
|
}
|
|
|
|
|
|
|
|
start () {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
this._server = http.createServer((req, res) => {
|
|
|
|
res.writeHead(200, { 'Content-Type': register.contentType });
|
|
|
|
res.end(register.metrics());
|
|
|
|
});
|
|
|
|
|
|
|
|
this._server.once('error', err => reject(err));
|
|
|
|
this._server.once('listening', () => resolve());
|
|
|
|
this._server.listen(this._port);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
stop () {
|
|
|
|
return new Promise((resolve) => {
|
|
|
|
register.clear();
|
|
|
|
if (!this._server) {
|
|
|
|
return resolve();
|
|
|
|
}
|
|
|
|
|
|
|
|
this._server.once('close', () => {
|
|
|
|
this._server = null;
|
|
|
|
resolve();
|
|
|
|
});
|
|
|
|
|
|
|
|
this._server.close();
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
function createTransformStream (definitions) {
|
|
|
|
if (typeof definitions !== 'object') {
|
|
|
|
return new PassThrough();
|
|
|
|
}
|
|
|
|
|
|
|
|
const metrics = [];
|
|
|
|
|
|
|
|
for (const { type, options, valuePath, labelPaths, shouldMeasure, measure } of definitions) {
|
|
|
|
metrics.push({
|
|
|
|
instance: new factory[type](options),
|
|
|
|
valuePath,
|
|
|
|
labelPaths,
|
|
|
|
shouldMeasure: eval(shouldMeasure), // eslint-disable-line no-eval
|
|
|
|
measure: eval(measure) // eslint-disable-line no-eval
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2020-06-09 02:16:00 +08:00
|
|
|
return new Transform({
|
|
|
|
transform (chunk, enc, callback) {
|
|
|
|
let entry;
|
|
|
|
|
|
|
|
try {
|
|
|
|
entry = JSON.parse(chunk);
|
|
|
|
} catch (e) {
|
|
|
|
if (DEV_ENVS.includes(process.env.NODE_ENV)) {
|
2020-07-21 22:36:23 +08:00
|
|
|
this.push(chunk + '\n');
|
2020-06-09 02:16:00 +08:00
|
|
|
}
|
|
|
|
return callback();
|
|
|
|
}
|
|
|
|
|
2020-07-21 01:33:33 +08:00
|
|
|
const flatEntry = flatten(entry);
|
2020-06-09 02:16:00 +08:00
|
|
|
|
2020-07-21 01:33:33 +08:00
|
|
|
for (const metric of metrics) {
|
|
|
|
const value = flatEntry[metric.valuePath];
|
|
|
|
const labels = Array.isArray(metric.labelPaths) && metric.labelPaths.map(path => flatEntry[path]);
|
2020-06-19 16:18:06 +08:00
|
|
|
|
2020-07-21 01:33:33 +08:00
|
|
|
if (metric.shouldMeasure({ labels, value })) {
|
|
|
|
metric.measure({ metric: metric.instance, labels, value });
|
2020-06-19 16:18:06 +08:00
|
|
|
}
|
2020-06-09 02:16:00 +08:00
|
|
|
}
|
|
|
|
|
2020-07-21 22:36:23 +08:00
|
|
|
this.push(`${JSON.stringify(entry)}\n`);
|
|
|
|
|
|
|
|
return callback();
|
2020-06-09 02:16:00 +08:00
|
|
|
}
|
2020-07-21 01:33:33 +08:00
|
|
|
});
|
2020-06-09 02:16:00 +08:00
|
|
|
}
|