Implement redis metrics object
This commit is contained in:
parent
a3b0d24395
commit
829591ffc7
@ -3,11 +3,20 @@ import Redis from 'redis';
|
||||
import { Meteor } from 'meteor/meteor';
|
||||
import { EventEmitter2 } from 'eventemitter2';
|
||||
import { check } from 'meteor/check';
|
||||
import fs from 'fs';
|
||||
import Logger from './logger';
|
||||
|
||||
// Fake meetingId used for messages that have no meetingId
|
||||
const NO_MEETING_ID = '_';
|
||||
|
||||
const metrics = {};
|
||||
|
||||
const {
|
||||
metricsDumpIntervalMs,
|
||||
metricsFolderPath,
|
||||
queueMetrics,
|
||||
} = Meteor.settings.private.redis.metrics;
|
||||
|
||||
const makeEnvelope = (channel, eventName, header, body, routing) => {
|
||||
const envelope = {
|
||||
envelope: {
|
||||
@ -34,6 +43,15 @@ class MeetingMessageQueue {
|
||||
this.queue = new PowerQueue();
|
||||
this.redisDebugEnabled = redisDebugEnabled;
|
||||
|
||||
Meteor.setInterval(() => {
|
||||
try {
|
||||
fs.writeFileSync(`${metricsFolderPath}/${new Date().getTime()}-metrics.json`, JSON.stringify(metrics));
|
||||
Logger.info('Metric file successfully writen');
|
||||
} catch (err) {
|
||||
Logger.error('Error on writing metrics to disk.', err);
|
||||
}
|
||||
}, metricsDumpIntervalMs);
|
||||
|
||||
this.handleTask = this.handleTask.bind(this);
|
||||
this.queue.taskHandler = this.handleTask;
|
||||
}
|
||||
@ -76,6 +94,35 @@ class MeetingMessageQueue {
|
||||
Logger.debug(`Redis: ${JSON.stringify(data.parsedMessage.core)} emitted`);
|
||||
}
|
||||
|
||||
if (queueMetrics) {
|
||||
const queueId = meetingId || NO_MEETING_ID;
|
||||
|
||||
const dataLength = JSON.stringify(data).length;
|
||||
if (!metrics[queueId].wasInQueue.hasOwnProperty(eventName)) {
|
||||
metrics[queueId].wasInQueue[eventName] = {
|
||||
count: 1,
|
||||
payloadSize: {
|
||||
min: dataLength,
|
||||
max: dataLength,
|
||||
last: dataLength,
|
||||
total: dataLength,
|
||||
avg: dataLength,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
metrics[queueId].currentlyInQueue[eventName].count -= 1;
|
||||
|
||||
metrics[queueId].wasInQueue[eventName].count += 1;
|
||||
|
||||
metrics[queueId].wasInQueue[eventName].payloadSize.last = dataLength;
|
||||
metrics[queueId].wasInQueue[eventName].payloadSize.total += dataLength;
|
||||
metrics[queueId].wasInQueue[eventName].payloadSize.min > dataLength ? metrics[queueId].wasInQueue[eventName].payloadSize.min = dataLength : null
|
||||
metrics[queueId].wasInQueue[eventName].payloadSize.max < dataLength ? metrics[queueId].wasInQueue[eventName].payloadSize.max = dataLength : null
|
||||
|
||||
metrics[queueId].wasInQueue[eventName].payloadSize.avg = metrics[queueId].wasInQueue[eventName].payloadSize.total / metrics[queueId].wasInQueue[eventName].count;
|
||||
}
|
||||
}
|
||||
|
||||
if (isAsync) {
|
||||
callNext();
|
||||
}
|
||||
@ -182,6 +229,25 @@ class RedisPubSub {
|
||||
|
||||
const queueId = meetingId || NO_MEETING_ID;
|
||||
|
||||
if (queueMetrics) {
|
||||
if (!metrics.hasOwnProperty(queueId)) {
|
||||
metrics[queueId] = {
|
||||
currentlyInQueue: {},
|
||||
wasInQueue: {},
|
||||
};
|
||||
}
|
||||
|
||||
if (!metrics[queueId].currentlyInQueue.hasOwnProperty(eventName)) {
|
||||
metrics[queueId].currentlyInQueue[eventName] = {
|
||||
count: 1,
|
||||
payloadSize: message.length,
|
||||
};
|
||||
} else {
|
||||
metrics[queueId].currentlyInQueue[eventName].count += 1;
|
||||
metrics[queueId].currentlyInQueue[eventName].payloadSize += message.length;
|
||||
}
|
||||
}
|
||||
|
||||
if (!(queueId in this.mettingsQueues)) {
|
||||
this.mettingsQueues[meetingId] = new MeetingMessageQueue(this.emitter, async, this.redisDebugEnabled);
|
||||
}
|
||||
|
@ -442,6 +442,10 @@ private:
|
||||
timeout: 5000
|
||||
password: null
|
||||
debug: false
|
||||
metrics:
|
||||
queueMetrics: true
|
||||
metricsDumpIntervalMs: 30000
|
||||
metricsFolderPath: "/tmp/metrics"
|
||||
channels:
|
||||
toAkkaApps: to-akka-apps-redis-channel
|
||||
toThirdParty: to-third-party-redis-channel
|
||||
|
Loading…
Reference in New Issue
Block a user