WIP add bulk insert and throttle for chat messages

This commit is contained in:
Joao Siebel 2020-11-30 11:27:26 -03:00
parent fcd47b5371
commit b07786b9b8
6 changed files with 64 additions and 18 deletions

View File

@ -3,10 +3,11 @@ import handleGroupChatsMsgs from './handlers/groupChatsMsgs';
import handleGroupChatMsgBroadcast from './handlers/groupChatMsgBroadcast';
import handleClearPublicGroupChat from './handlers/clearPublicGroupChat';
import handleUserTyping from './handlers/userTyping';
import handleSyncGroupChatMsg from './handlers/syncGroupsChat';
import { processForHTML5ServerOnly } from '/imports/api/common/server/helpers';
RedisPubSub.on('GetGroupChatMsgsRespMsg', processForHTML5ServerOnly(handleGroupChatsMsgs));
RedisPubSub.on('GetGroupChatMsgsRespMsg', processForHTML5ServerOnly(handleSyncGroupChatMsg));
RedisPubSub.on('GroupChatMessageBroadcastEvtMsg', handleGroupChatMsgBroadcast);
RedisPubSub.on('ClearPublicChatHistoryEvtMsg', handleClearPublicGroupChat);
RedisPubSub.on('SyncGetGroupChatMsgsRespMsg', handleGroupChatsMsgs);
RedisPubSub.on('SyncGetGroupChatMsgsRespMsg', handleSyncGroupChatMsg);
RedisPubSub.on('UserTypingEvtMsg', handleUserTyping);

View File

@ -1,5 +1,13 @@
import { check } from 'meteor/check';
import _ from 'lodash';
import addGroupChatMsg from '../modifiers/addGroupChatMsg';
import addBulkGroupChatMsgs from '../modifiers/addBulkGroupChatMsgs';
const { bufferChatInsertsMs } = Meteor.settings.public.chat;
const msgBuffer = [];
const bulkFn = _.throttle(addBulkGroupChatMsgs, bufferChatInsertsMs);
export default function handleGroupChatMsgBroadcast({ body }, meetingId) {
const { chatId, msg } = body;
@ -8,5 +16,10 @@ export default function handleGroupChatMsgBroadcast({ body }, meetingId) {
check(chatId, String);
check(msg, Object);
addGroupChatMsg(meetingId, chatId, msg);
if (bufferChatInsertsMs) {
msgBuffer.push({ meetingId, chatId, msg });
bulkFn(msgBuffer);
} else {
addGroupChatMsg(meetingId, chatId, msg);
}
}

View File

@ -0,0 +1,12 @@
import { Match, check } from 'meteor/check';
import addBulkGroupChatMsgs from '../modifiers/addBulkGroupChatMsgs';
export default function handleSyncGroupChat({ body }, meetingId) {
const { chatId, msgs } = body;
check(meetingId, String);
check(chatId, String);
check(msgs, Match.Maybe(Array));
addBulkGroupChatMsgs(msgs);
}

View File

@ -0,0 +1,29 @@
import { Match, check } from 'meteor/check';
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import Logger from '/imports/startup/server/logger';
import flat from 'flat';
import { parseMessage } from './addGroupChatMsg';
export default async function addBulkGroupChatMsgs(msgs) {
const mappedMsgs = msgs
.map(({ chatId, meetingId, msg }) => ({
_id: new Mongo.ObjectID()._str,
...msg,
meetingId,
chatId,
message: parseMessage(msg.message),
sender: msg.sender.id,
}))
.map(el => flat(el, { safe: true }));
try {
const { insertedCount } = await GroupChatMsg.rawCollection().insertMany(mappedMsgs);
msgs.length = 0;
if (insertedCount) {
Logger.info(`Inserted ${insertedCount} messages`);
}
} catch (err) {
Logger.error(`Error on bulk insert. ${err}`);
}
}

View File

@ -4,7 +4,7 @@ import Logger from '/imports/startup/server/logger';
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import { BREAK_LINE } from '/imports/utils/lineEndings';
const parseMessage = (message) => {
export function parseMessage(message) {
let parsedMessage = message || '';
// Replace \r and \n to <br/>
@ -15,7 +15,7 @@ const parseMessage = (message) => {
parsedMessage = parsedMessage.split('<a href="event:').join('<a target="_blank" href="');
return parsedMessage;
};
}
export default function addGroupChatMsg(meetingId, chatId, msg) {
check(meetingId, String);
@ -37,25 +37,15 @@ export default function addGroupChatMsg(meetingId, chatId, msg) {
sender: msg.sender.id,
};
const selector = {
meetingId,
chatId,
id: msg.id,
};
const modifier = {
$set: flat(msgDocument, { safe: true }),
};
const modifier = flat(msgDocument, { safe: true });
try {
const { insertedId } = GroupChatMsg.upsert(selector, modifier);
const insertedId = GroupChatMsg.insert(modifier);
if (insertedId) {
Logger.info(`Added group-chat-msg msgId=${msg.id} chatId=${chatId} meetingId=${meetingId}`);
} else {
Logger.info(`Upserted group-chat-msg msgId=${msg.id} chatId=${chatId} meetingId=${meetingId}`);
}
} catch (err) {
Logger.error(`Adding group-chat-msg to collection: ${err}`);
Logger.error(`Error on adding group-chat-msg to collection: ${err}`);
}
}

View File

@ -244,6 +244,7 @@ public:
time: 5000
chat:
enabled: true
bufferChatInsertsMs: 100
startClosed: false
min_message_length: 1
max_message_length: 5000