Merge pull request #11239 from jfsiebel/sync-with-active-connections
Synchronize users with active connections
This commit is contained in:
commit
d122639042
@ -4,6 +4,7 @@ import Users from '/imports/api/users';
|
||||
import userJoin from './userJoin';
|
||||
import pendingAuthenticationsStore from '../store/pendingAuthentications';
|
||||
import createDummyUser from '../modifiers/createDummyUser';
|
||||
import ClientConnections from '/imports/startup/server/ClientConnections';
|
||||
|
||||
import upsertValidationState from '/imports/api/auth-token-validation/server/modifiers/upsertValidationState';
|
||||
import { ValidationStates } from '/imports/api/auth-token-validation';
|
||||
@ -75,6 +76,7 @@ export default function handleValidateAuthToken({ body }, meetingId) {
|
||||
createDummyUser(meetingId, userId, authToken);
|
||||
}
|
||||
|
||||
ClientConnections.add(sessionId, methodInvocationObject.connection);
|
||||
upsertValidationState(meetingId, userId, ValidationStates.VALIDATED, methodInvocationObject.connection.id);
|
||||
|
||||
/* End of logic migrated from validateAuthToken */
|
||||
|
@ -1,18 +1,18 @@
|
||||
import { Meteor } from 'meteor/meteor';
|
||||
import RedisPubSub from '/imports/startup/server/redis';
|
||||
import Logger from '/imports/startup/server/logger';
|
||||
import ClientConnections from '/imports/startup/server/ClientConnections';
|
||||
import upsertValidationState from '/imports/api/auth-token-validation/server/modifiers/upsertValidationState';
|
||||
import { ValidationStates } from '/imports/api/auth-token-validation';
|
||||
import pendingAuthenticationsStore from '../store/pendingAuthentications';
|
||||
import BannedUsers from '../store/bannedUsers';
|
||||
import Users from '/imports/api/users';
|
||||
|
||||
export default function validateAuthToken(meetingId, requesterUserId, requesterToken, externalId) {
|
||||
const REDIS_CONFIG = Meteor.settings.private.redis;
|
||||
const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
|
||||
const EVENT_NAME = 'ValidateAuthTokenReqMsg';
|
||||
|
||||
Logger.debug('ValidateAuthToken method called', { meetingId, requesterUserId, requesterToken, externalId });
|
||||
|
||||
// Check if externalId is banned from the meeting
|
||||
if (externalId) {
|
||||
if (BannedUsers.has(meetingId, externalId)) {
|
||||
@ -21,24 +21,7 @@ export default function validateAuthToken(meetingId, requesterUserId, requesterT
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent users who have left or been ejected to use the same sessionToken again.
|
||||
const isUserInvalid = Users.findOne({
|
||||
meetingId,
|
||||
userId: requesterUserId,
|
||||
authToken: requesterToken,
|
||||
$or: [{ ejected: true }, { loggedOut: true }],
|
||||
});
|
||||
|
||||
if (isUserInvalid) {
|
||||
Logger.warn(`An invalid sessionToken tried to validateAuthToken meetingId=${meetingId} authToken=${requesterToken}`);
|
||||
return {
|
||||
invalid: true,
|
||||
reason: `User has an invalid sessionToken due to ${isUserInvalid.ejected ? 'ejection' : 'log out'}`,
|
||||
error_type: `invalid_session_token_due_to_${isUserInvalid.ejected ? 'eject' : 'log_out'}`,
|
||||
};
|
||||
}
|
||||
|
||||
ClientConnections.add(`${meetingId}--${requesterUserId}`, this.connection);
|
||||
if (!meetingId) return false;
|
||||
|
||||
// Store reference of methodInvocationObject ( to postpone the connection userId definition )
|
||||
pendingAuthenticationsStore.add(meetingId, requesterUserId, requesterToken, this);
|
||||
|
@ -1,6 +1,10 @@
|
||||
import Logger from './logger';
|
||||
import userLeaving from '/imports/api/users/server/methods/userLeaving';
|
||||
import { extractCredentials } from '/imports/api/common/server/helpers';
|
||||
import AuthTokenValidation from '/imports/api/auth-token-validation';
|
||||
import Users from '/imports/api/users';
|
||||
|
||||
const { enabled, syncInterval } = Meteor.settings.public.syncUsersWithConnectionManager;
|
||||
|
||||
class ClientConnections {
|
||||
constructor() {
|
||||
@ -11,10 +15,15 @@ class ClientConnections {
|
||||
this.print();
|
||||
}, 30000);
|
||||
|
||||
// setTimeout(() => {
|
||||
// this.syncConnectionsWithServer();
|
||||
// }, 10000);
|
||||
if (enabled) {
|
||||
const syncConnections = Meteor.bindEnvironment(() => {
|
||||
this.syncConnectionsWithServer();
|
||||
});
|
||||
|
||||
setInterval(() => {
|
||||
syncConnections();
|
||||
}, syncInterval);
|
||||
}
|
||||
}
|
||||
|
||||
add(sessionId, connection) {
|
||||
@ -29,6 +38,13 @@ class ClientConnections {
|
||||
|
||||
const { meetingId, requesterUserId: userId } = extractCredentials(sessionId);
|
||||
|
||||
if (!meetingId) {
|
||||
Logger.error('Error on add new client connection. sessionId=${sessionId} connection=${connection.id}',
|
||||
{ logCode: 'client_connections_add_error_meeting_id_null', extraInfo: { meetingId, userId } }
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this.exists(meetingId)) {
|
||||
Logger.info(`Meeting not found in connections: meetingId=${meetingId}`);
|
||||
this.createMeetingConnections(meetingId);
|
||||
@ -92,7 +108,7 @@ class ClientConnections {
|
||||
Logger.info(`Removing connectionId for user. sessionId=${sessionId} connectionId=${connectionId}`);
|
||||
const { meetingId, requesterUserId: userId } = extractCredentials(sessionId);
|
||||
|
||||
const meetingConnections = this.connections.get(meetingId)
|
||||
const meetingConnections = this.connections.get(meetingId);
|
||||
|
||||
if (meetingConnections?.has(userId)) {
|
||||
const filteredConnections = meetingConnections.get(userId).filter(c => c !== connectionId);
|
||||
@ -109,7 +125,38 @@ class ClientConnections {
|
||||
}
|
||||
|
||||
syncConnectionsWithServer() {
|
||||
console.error('syncConnectionsWithServer', Array.from(Meteor.server.sessions.keys()), Meteor.server);
|
||||
Logger.info('Syncing ClientConnections with server');
|
||||
const activeConnections = Array.from(Meteor.server.sessions.keys());
|
||||
|
||||
Logger.debug(`Found ${activeConnections.length} active connections in server`);
|
||||
|
||||
const onlineUsers = AuthTokenValidation
|
||||
.find(
|
||||
{ connectionId: { $in: activeConnections } },
|
||||
{ fields: { meetingId: 1, userId: 1 } }
|
||||
)
|
||||
.fetch();
|
||||
|
||||
const onlineUsersId = onlineUsers.map(({ userId }) => userId);
|
||||
|
||||
const usersQuery = { userId: { $nin: onlineUsersId } };
|
||||
|
||||
const userWithoutConnectionIds = Users.find(usersQuery, { fields: { meetingId: 1, userId: 1 } }).fetch();
|
||||
|
||||
const removedUsersWithoutConnection = Users.remove(usersQuery);
|
||||
|
||||
if (removedUsersWithoutConnection) {
|
||||
Logger.info(`Removed ${removedUsersWithoutConnection} users that are not connected`);
|
||||
Logger.info(`Clearing connections`);
|
||||
try {
|
||||
userWithoutConnectionIds
|
||||
.forEach(({ meetingId, userId }) => {
|
||||
this.removeClientConnection(`${meetingId}--${userId}`);
|
||||
});
|
||||
} catch (err) {
|
||||
Logger.error('Error on sync ClientConnections', err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -272,9 +272,9 @@ public:
|
||||
mobilePageSizes:
|
||||
moderator: 2
|
||||
viewer: 2
|
||||
pingPong:
|
||||
clearUsersInSeconds: 180
|
||||
pongTimeInSeconds: 15
|
||||
syncUsersWithConnectionManager:
|
||||
enabled: false
|
||||
syncInterval: 60000
|
||||
allowOutsideCommands:
|
||||
toggleRecording: false
|
||||
toggleSelfVoice: false
|
||||
|
Loading…
Reference in New Issue
Block a user