Merge pull request #10993 from jfsiebel/redis-metrics

Generate data from redis messages
This commit is contained in:
Anton Georgiev 2020-12-08 10:33:33 -05:00 committed by GitHub
commit 185df30e3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 0 deletions

View File

@ -3,11 +3,20 @@ import Redis from 'redis';
import { Meteor } from 'meteor/meteor';
import { EventEmitter2 } from 'eventemitter2';
import { check } from 'meteor/check';
import fs from 'fs';
import Logger from './logger';
// Fake meetingId used for messages that have no meetingId
const NO_MEETING_ID = '_';
const metrics = {};
const {
metricsDumpIntervalMs,
metricsFolderPath,
queueMetrics,
} = Meteor.settings.private.redis.metrics;
const makeEnvelope = (channel, eventName, header, body, routing) => {
const envelope = {
envelope: {
@ -59,6 +68,49 @@ class MeetingMessageQueue {
Logger.debug(`Redis: ${eventName} completed ${isAsync ? 'async' : 'sync'}`);
}
called = true;
if (queueMetrics) {
const queueId = meetingId || NO_MEETING_ID;
const dataLength = JSON.stringify(data).length;
if (!metrics[queueId].wasInQueue.hasOwnProperty(eventName)) {
metrics[queueId].wasInQueue[eventName] = {
count: 1,
payloadSize: {
min: dataLength,
max: dataLength,
last: dataLength,
total: dataLength,
avg: dataLength,
},
};
metrics[queueId].currentlyInQueue[eventName].count -= 1;
if (!metrics[queueId].currentlyInQueue[eventName].count) delete metrics[queueId].currentlyInQueue[eventName];
} else {
metrics[queueId].currentlyInQueue[eventName].count -= 1;
if (!metrics[queueId].currentlyInQueue[eventName].count) delete metrics[queueId].currentlyInQueue[eventName];
const processedEventMetrics = metrics[queueId].wasInQueue[eventName];
processedEventMetrics.count += 1;
processedEventMetrics.payloadSize.last = dataLength;
processedEventMetrics.payloadSize.total += dataLength;
if (processedEventMetrics.payloadSize.min > dataLength) {
processedEventMetrics.payloadSize.min = dataLength;
}
if (processedEventMetrics.payloadSize.max < dataLength) {
processedEventMetrics.payloadSize.max = dataLength;
}
processedEventMetrics.payloadSize.avg = processedEventMetrics.payloadSize.total / processedEventMetrics.count;
}
}
const queueLength = this.queue.length();
if (queueLength > 100) {
Logger.warn(`Redis: MeetingMessageQueue for meetingId=${meetingId} has queue size=${queueLength} `);
@ -119,6 +171,17 @@ class RedisPubSub {
this.sub = Redis.createClient({ host, port });
}
if (queueMetrics) {
Meteor.setInterval(() => {
try {
fs.writeFileSync(`${metricsFolderPath}/${new Date().getTime()}-metrics.json`, JSON.stringify(metrics));
Logger.info('Metric file successfully writen');
} catch (err) {
Logger.error('Error on writing metrics to disk.', err);
}
}, metricsDumpIntervalMs || 10000);
}
this.emitter = new EventEmitter2();
this.mettingsQueues = {};
@ -182,6 +245,25 @@ class RedisPubSub {
const queueId = meetingId || NO_MEETING_ID;
if (queueMetrics) {
if (!metrics.hasOwnProperty(queueId)) {
metrics[queueId] = {
currentlyInQueue: {},
wasInQueue: {},
};
}
if (!metrics[queueId].currentlyInQueue.hasOwnProperty(eventName)) {
metrics[queueId].currentlyInQueue[eventName] = {
count: 1,
payloadSize: message.length,
};
} else {
metrics[queueId].currentlyInQueue[eventName].count += 1;
metrics[queueId].currentlyInQueue[eventName].payloadSize += message.length;
}
}
if (!(queueId in this.mettingsQueues)) {
this.mettingsQueues[meetingId] = new MeetingMessageQueue(this.emitter, async, this.redisDebugEnabled);
}

View File

@ -442,6 +442,10 @@ private:
timeout: 5000
password: null
debug: false
metrics:
queueMetrics: false
metricsDumpIntervalMs: 60000
metricsFolderPath: METRICS_FOLDER
channels:
toAkkaApps: to-akka-apps-redis-channel
toThirdParty: to-third-party-redis-channel