Update annotations streams logic

This commit is contained in:
Joao Siebel 2019-10-24 17:48:03 -03:00
parent 5f356338c8
commit 8c159b4165
7 changed files with 61 additions and 59 deletions

View File

@ -1,40 +0,0 @@
import { Meteor } from 'meteor/meteor';
import Users from '/imports/api/users';
const Streamer = new Meteor.Streamer('cursor', { retransmit: false, });
if (Meteor.isServer) {
import publishCursorUpdate from './server/methods/publishCursorUpdate';
Streamer.on('publish', (message) => {
publishCursorUpdate(message.credentials, message.payload);
});
Streamer.allowRead(function(eventName, ...args) {
return true;
});
Streamer.allowEmit(function(eventName, { meetingId }) {
return this.userId && this.userId.includes(meetingId);
});
Streamer.allowWrite(function(eventName, { credentials }) {
if (!this.userId || !credentials) return false;
const { meetingId, requesterUserId: userId, requesterToken: authToken } = credentials;
const user = Users.findOne({
meetingId,
userId,
authToken,
connectionId: this.connection.id,
validated: true,
connectionStatus: 'online',
});
if (!user) return false;
return true;
});
}
export const CursorStreamer = Streamer;

View File

@ -1,5 +1,5 @@
import { check } from 'meteor/check'; import { check } from 'meteor/check';
import { CursorStreamer } from '/imports/api/cursor'; import CursorStreamer from '/imports/api/cursor/server/streamer';
const CURSOR_PROCCESS_INTERVAL = 30; const CURSOR_PROCCESS_INTERVAL = 30;
@ -12,9 +12,9 @@ const proccess = () => {
return; return;
} }
cursorRecieverIsRunning = true; cursorRecieverIsRunning = true;
Object.keys(cursorQueue).forEach(meetingId => { Object.keys(cursorQueue).forEach((meetingId) => {
CursorStreamer.emit('message', { meetingId, cursors: cursorQueue[meetingId] }); CursorStreamer(meetingId).emit('message', { meetingId, cursors: cursorQueue[meetingId] });
}); });
cursorQueue = {}; cursorQueue = {};
@ -28,7 +28,7 @@ export default function handleCursorUpdate({ header, body }, meetingId) {
check(meetingId, String); check(meetingId, String);
check(userId, String); check(userId, String);
if(!cursorQueue.hasOwnProperty(meetingId)) { if (!cursorQueue.hasOwnProperty(meetingId)) {
cursorQueue[meetingId] = {}; cursorQueue[meetingId] = {};
} }
// overwrite since we dont care about the other positions // overwrite since we dont care about the other positions

View File

@ -0,0 +1,25 @@
import publishCursorUpdate from './methods/publishCursorUpdate';
export function removeCursorStreamer(meetingId) {
delete Meteor.StreamerCentral.instances[`cursor-${meetingId}`];
}
export function addCursorStreamer(meetingId) {
const streamer = new Meteor.Streamer(`cursor-${meetingId}`, { retransmit: false });
streamer.allowRead(function allowRead() {
return this.userId && this.userId.includes(meetingId);
});
streamer.allowWrite(function allowWrite() {
return this.userId && this.userId.includes(meetingId);
});
streamer.on('publish', (message) => {
publishCursorUpdate(message.credentials, message.payload);
});
}
export default function get(meetingId) {
return Meteor.StreamerCentral.instances[`cursor-${meetingId}`];
}

View File

@ -7,6 +7,7 @@ import Meetings, { RecordMeetings } from '/imports/api/meetings';
import Logger from '/imports/startup/server/logger'; import Logger from '/imports/startup/server/logger';
import createNote from '/imports/api/note/server/methods/createNote'; import createNote from '/imports/api/note/server/methods/createNote';
import createCaptions from '/imports/api/captions/server/methods/createCaptions'; import createCaptions from '/imports/api/captions/server/methods/createCaptions';
import { addCursorStreamer } from '/imports/api/cursor/server/streamer';
export default function addMeeting(meeting) { export default function addMeeting(meeting) {
const meetingId = meeting.meetingProp.intId; const meetingId = meeting.meetingProp.intId;
@ -174,5 +175,7 @@ export default function addMeeting(meeting) {
...recordProp, ...recordProp,
}, cbRecord); }, cbRecord);
addCursorStreamer(meetingId);
return Meetings.upsert(selector, modifier, cb); return Meetings.upsert(selector, modifier, cb);
} }

View File

@ -1,6 +1,8 @@
import Meetings from '/imports/api/meetings'; import Meetings from '/imports/api/meetings';
import Logger from '/imports/startup/server/logger'; import Logger from '/imports/startup/server/logger';
import { removeCursorStreamer } from '/imports/api/cursor/server/streamer';
import clearUsers from '/imports/api/users/server/modifiers/clearUsers'; import clearUsers from '/imports/api/users/server/modifiers/clearUsers';
import clearUsersSettings from '/imports/api/users-settings/server/modifiers/clearUsersSettings'; import clearUsersSettings from '/imports/api/users-settings/server/modifiers/clearUsersSettings';
import clearGroupChat from '/imports/api/group-chat/server/modifiers/clearGroupChat'; import clearGroupChat from '/imports/api/group-chat/server/modifiers/clearGroupChat';
@ -18,6 +20,8 @@ import clearLocalSettings from '/imports/api/local-settings/server/modifiers/cle
import clearRecordMeeting from './clearRecordMeeting'; import clearRecordMeeting from './clearRecordMeeting';
export default function meetingHasEnded(meetingId) { export default function meetingHasEnded(meetingId) {
removeCursorStreamer(meetingId);
return Meetings.remove({ meetingId }, () => { return Meetings.remove({ meetingId }, () => {
clearCaptions(meetingId); clearCaptions(meetingId);
clearGroupChat(meetingId); clearGroupChat(meetingId);

View File

@ -1,8 +1,8 @@
import Auth from '/imports/ui/services/auth'; import Auth from '/imports/ui/services/auth';
import { CursorStreamer } from '/imports/api/cursor';
import { throttle } from 'lodash'; import { throttle } from 'lodash';
const Cursor = new Mongo.Collection(null); const Cursor = new Mongo.Collection(null);
let cursorStreamListener = null;
function updateCursor(userId, payload) { function updateCursor(userId, payload) {
const selector = { const selector = {
@ -20,22 +20,30 @@ function updateCursor(userId, payload) {
return Cursor.upsert(selector, modifier); return Cursor.upsert(selector, modifier);
} }
CursorStreamer.on('message', ({ meetingID, cursors }) => {
Object.keys(cursors).forEach((userId) => {
if (Auth.userID === userId) return;
updateCursor(userId, cursors[userId]);
});
});
const throttledEmit = throttle(CursorStreamer.emit.bind(CursorStreamer), 30, { trailing: true });
export function publishCursorUpdate(payload) { export function publishCursorUpdate(payload) {
throttledEmit('publish', { if (cursorStreamListener) {
credentials: Auth.credentials, const throttledEmit = throttle(cursorStreamListener.emit.bind(cursorStreamListener), 30, { trailing: true });
payload,
}); throttledEmit('publish', {
credentials: Auth.credentials,
payload,
});
}
return updateCursor(Auth.userID, payload); return updateCursor(Auth.userID, payload);
} }
export function initCursorStreamListener() {
if (!cursorStreamListener) {
cursorStreamListener = new Meteor.Streamer(`cursor-${Auth.meetingID}`, { retransmit: false });
cursorStreamListener.on('message', ({ cursors }) => {
Object.keys(cursors).forEach((userId) => {
if (Auth.userID === userId) return;
updateCursor(userId, cursors[userId]);
});
});
}
}
export default Cursor; export default Cursor;

View File

@ -7,6 +7,7 @@ import Users from '/imports/api/users';
import Annotations from '/imports/api/annotations'; import Annotations from '/imports/api/annotations';
import AnnotationsTextService from '/imports/ui/components/whiteboard/annotations/text/service'; import AnnotationsTextService from '/imports/ui/components/whiteboard/annotations/text/service';
import AnnotationsLocal from '/imports/ui/components/whiteboard/service'; import AnnotationsLocal from '/imports/ui/components/whiteboard/service';
import { initCursorStreamListener } from '/imports/ui/components/cursor/service';
const CHAT_CONFIG = Meteor.settings.public.chat; const CHAT_CONFIG = Meteor.settings.public.chat;
@ -94,6 +95,7 @@ export default withTracker(() => {
Meteor.subscribe('users', credentials, userIsModerator, subscriptionErrorHandler); Meteor.subscribe('users', credentials, userIsModerator, subscriptionErrorHandler);
Meteor.subscribe('breakouts', credentials, userIsModerator, subscriptionErrorHandler); Meteor.subscribe('breakouts', credentials, userIsModerator, subscriptionErrorHandler);
Meteor.subscribe('meetings', credentials, userIsModerator, subscriptionErrorHandler); Meteor.subscribe('meetings', credentials, userIsModerator, subscriptionErrorHandler);
initCursorStreamListener();
} }
const annotationsHandler = Meteor.subscribe('annotations', credentials, { const annotationsHandler = Meteor.subscribe('annotations', credentials, {