Voice call states and voice users back-end migrated to new async API.

This commit is contained in:
imdt 2023-03-16 15:37:24 -03:00 committed by Ramón Souza
parent 3fb59bcb41
commit 6949e9b59b
22 changed files with 105 additions and 88 deletions

View File

@ -7,7 +7,7 @@ if (Meteor.isServer) {
// 1. intId
// 2. meetingId, intId
VoiceCallStates._ensureIndex({ meetingId: 1, userId: 1 });
VoiceCallStates.createIndexAsync({ meetingId: 1, userId: 1 });
}
export default VoiceCallStates;

View File

@ -4,7 +4,7 @@ import Logger from '/imports/startup/server/logger';
// "CALL_STARTED", "IN_ECHO_TEST", "IN_CONFERENCE", "CALL_ENDED"
export default function handleVoiceCallStateEvent({ body }, meetingId) {
export default async function handleVoiceCallStateEvent({ body }, meetingId) {
const {
voiceConf,
clientSession,
@ -37,7 +37,7 @@ export default function handleVoiceCallStateEvent({ body }, meetingId) {
};
try {
const { numberAffected } = VoiceCallState.upsert(selector, modifier);
const { numberAffected } = await VoiceCallState.upsertAsync(selector, modifier);
if (numberAffected) {
Logger.debug('Update voice call', {

View File

@ -1,10 +1,10 @@
import Logger from '/imports/startup/server/logger';
import VoiceCallStates from '/imports/api/voice-call-states';
export default function clearVoiceCallStates(meetingId) {
export default async function clearVoiceCallStates(meetingId) {
if (meetingId) {
try {
const numberAffected = VoiceCallStates.remove({ meetingId });
const numberAffected = await VoiceCallStates.removeAsync({ meetingId });
if (numberAffected) {
Logger.info(`Cleared VoiceCallStates in (${meetingId})`);
@ -14,7 +14,7 @@ export default function clearVoiceCallStates(meetingId) {
}
} else {
try {
const numberAffected = VoiceCallStates.remove({});
const numberAffected = await VoiceCallStates.removeAsync({});
if (numberAffected) {
Logger.info('Cleared VoiceCallStates in all meetings');

View File

@ -3,8 +3,9 @@ import { Meteor } from 'meteor/meteor';
import Logger from '/imports/startup/server/logger';
import AuthTokenValidation, { ValidationStates } from '/imports/api/auth-token-validation';
function voiceCallStates() {
const tokenValidation = AuthTokenValidation.findOne({ connectionId: this.connection.id });
async function voiceCallStates() {
const tokenValidation = await AuthTokenValidation
.findOneAsync({ connectionId: this.connection.id });
if (!tokenValidation || tokenValidation.validationStatus !== ValidationStates.VALIDATED) {
Logger.warn(`Publishing VoiceCallStates was requested by unauth connection ${this.connection.id}`);

View File

@ -11,8 +11,8 @@ if (Meteor.isServer) {
// 1. intId
// 2. meetingId, intId
VoiceUsers._ensureIndex({ intId: 1 });
VoiceUsers._ensureIndex({ meetingId: 1, intId: 1 });
VoiceUsers.createIndexAsync({ intId: 1 });
VoiceUsers.createIndexAsync({ meetingId: 1, intId: 1 });
}
export default VoiceUsers;

View File

@ -5,24 +5,26 @@ import addVoiceUser from '../modifiers/addVoiceUser';
import removeVoiceUser from '../modifiers/removeVoiceUser';
import updateVoiceUser from '../modifiers/updateVoiceUser';
export default function handleGetVoiceUsers({ body }, meetingId) {
export default async function handleGetVoiceUsers({ body }, meetingId) {
const { users } = body;
check(meetingId, String);
check(users, Array);
const meeting = Meetings.findOne({ meetingId }, { fields: { 'voiceProp.voiceConf': 1 } });
const usersIds = users.map(m => m.intId);
const meeting = await Meetings.findOneAsync({ meetingId }, { fields: { 'voiceProp.voiceConf': 1 } });
const usersIds = users.map((m) => m.intId);
const voiceUsersIdsToUpdate = VoiceUsers.find({
const voiceUsersFetch = await VoiceUsers.find({
meetingId,
intId: { $in: usersIds },
}, { fields: { intId: 1 } }).fetch().map(m => m.intId);
}, { fields: { intId: 1 } }).fetchAsync();
users.forEach((user) => {
const voiceUsersIdsToUpdate = voiceUsersFetch.map((m) => m.intId);
await Promise.all(users.map(async (user) => {
if (voiceUsersIdsToUpdate.indexOf(user.intId) >= 0) {
// user already exist, then update
updateVoiceUser(meetingId, {
await updateVoiceUser(meetingId, {
intId: user.intId,
voiceUserId: user.voiceUserId,
talking: user.talking,
@ -32,7 +34,7 @@ export default function handleGetVoiceUsers({ body }, meetingId) {
});
} else {
// user doesn't exist yet, then add it
addVoiceUser(meetingId, {
await addVoiceUser(meetingId, {
voiceUserId: user.voiceUserId,
intId: user.intId,
callerName: user.callerName,
@ -45,16 +47,17 @@ export default function handleGetVoiceUsers({ body }, meetingId) {
joined: true,
});
}
});
}));
// removing extra users already existing in Mongo
const voiceUsersToRemove = VoiceUsers.find({
const voiceUsersToRemove = await VoiceUsers.find({
meetingId,
intId: { $nin: usersIds },
}).fetch();
voiceUsersToRemove.forEach(user => removeVoiceUser(meetingId, {
voiceConf: meeting.voiceProp.voiceConf,
voiceUserId: user.voiceUserId,
intId: user.intId,
}).fetchAsync();
await Promise.all(voiceUsersToRemove.map(async (user) => {
await removeVoiceUser(meetingId, {
voiceConf: meeting.voiceProp.voiceConf,
voiceUserId: user.voiceUserId,
intId: user.intId,
});
}));
}

View File

@ -4,7 +4,7 @@ import addDialInUser from '/imports/api/users/server/modifiers/addDialInUser';
import addVoiceUser from '../modifiers/addVoiceUser';
export default function handleJoinVoiceUser({ body }, meetingId) {
export default async function handleJoinVoiceUser({ body }, meetingId) {
const voiceUser = body;
voiceUser.joined = true;
@ -26,15 +26,16 @@ export default function handleJoinVoiceUser({ body }, meetingId) {
intId,
} = voiceUser;
const User = Users.findOne({
const User = await Users.findOneAsync({
meetingId,
intId,
});
if (!User) {
/* voice-only user - called into the conference */
addDialInUser(meetingId, voiceUser);
await addDialInUser(meetingId, voiceUser);
}
return addVoiceUser(meetingId, voiceUser);
const result = await addVoiceUser(meetingId, voiceUser);
return result;
}

View File

@ -4,7 +4,7 @@ import removeVoiceUser from '/imports/api/voice-users/server/modifiers/removeVoi
import removeUser from '/imports/api/users/server/modifiers/removeUser';
import Users from '/imports/api/users';
export default function handleVoiceUpdate({ body }, meetingId) {
export default async function handleVoiceUpdate({ body }, meetingId) {
const voiceUser = body;
check(meetingId, String);
@ -19,10 +19,14 @@ export default function handleVoiceUpdate({ body }, meetingId) {
voiceUserId,
} = voiceUser;
const isDialInUser = (userId, meetingID) => !!Users.findOne({ meetingId: meetingID, userId, clientType: 'dial-in-user' });
const isDialInUser = async (userId, meetingID) => {
const user = await Users.findOneAsync({ meetingId: meetingID, userId, clientType: 'dial-in-user' });
return !!user;
};
// if the user is dial-in, leaving voice also means leaving userlist
if (isDialInUser(voiceUserId, meetingId)) removeUser(voiceUser, meetingId);
if (await isDialInUser(voiceUserId, meetingId)) removeUser(voiceUser, meetingId);
return removeVoiceUser(meetingId, voiceUser);
const result = await removeVoiceUser(meetingId, voiceUser);
return result;
}

View File

@ -1,5 +1,5 @@
import changeMuteMeeting from '../modifiers/changeMuteMeeting';
export default function handleMeetingMuted({ body }, meetingId) {
changeMuteMeeting(meetingId, body);
export default async function handleMeetingMuted({ body }, meetingId) {
await changeMuteMeeting(meetingId, body);
}

View File

@ -2,7 +2,7 @@ import { check } from 'meteor/check';
import updateVoiceUser from '../modifiers/updateVoiceUser';
export default function handleVoiceUpdate({ body }, meetingId) {
export default async function handleVoiceUpdate({ body }, meetingId) {
const voiceUser = body;
check(meetingId, String);
@ -12,5 +12,6 @@ export default function handleVoiceUpdate({ body }, meetingId) {
voiceUser.talking = false;
}
return updateVoiceUser(meetingId, voiceUser);
const result = await updateVoiceUser(meetingId, voiceUser);
return result;
}

View File

@ -2,10 +2,11 @@ import { check } from 'meteor/check';
import updateVoiceUser from '../modifiers/updateVoiceUser';
export default function handleVoiceUpdate({ body }, meetingId) {
export default async function handleVoiceUpdate({ body }, meetingId) {
const voiceUser = body;
check(meetingId, String);
return updateVoiceUser(meetingId, voiceUser);
const result = await updateVoiceUser(meetingId, voiceUser);
return result;
}

View File

@ -6,22 +6,24 @@ import updateVoiceUser from '../modifiers/updateVoiceUser';
import addVoiceUser from '../modifiers/addVoiceUser';
export default function handleVoiceUsers({ header, body }) {
export default async function handleVoiceUsers({ header, body }) {
const { voiceUsers } = body;
const { meetingId } = header;
const meeting = Meetings.findOne({ meetingId }, { fields: { 'voiceProp.voiceConf': 1 } });
const usersIds = voiceUsers.map(m => m.intId);
const meeting = await Meetings.findOneAsync({ meetingId }, { fields: { 'voiceProp.voiceConf': 1 } });
const usersIds = voiceUsers.map((m) => m.intId);
const voiceUsersIdsToUpdate = VoiceUsers.find({
const voiceUsersFetch = await VoiceUsers.find({
meetingId,
intId: { $in: usersIds },
}, { fields: { intId: 1 } }).fetch().map(m => m.intId);
}, { fields: { intId: 1 } }).fetchAsync();
voiceUsers.forEach((voice) => {
const voiceUsersIdsToUpdate = voiceUsersFetch.map((m) => m.intId);
await Promise.all(voiceUsers.map(async (voice) => {
if (voiceUsersIdsToUpdate.indexOf(voice.intId) >= 0) {
// user already exist, then update
updateVoiceUser(meetingId, {
await updateVoiceUser(meetingId, {
intId: voice.intId,
voiceUserId: voice.voiceUserId,
talking: voice.talking,
@ -31,7 +33,7 @@ export default function handleVoiceUsers({ header, body }) {
});
} else {
// user doesn't exist yet, then add it
addVoiceUser(meetingId, {
await addVoiceUser(meetingId, {
voiceUserId: voice.voiceUserId,
intId: voice.intId,
callerName: voice.callerName,
@ -44,18 +46,21 @@ export default function handleVoiceUsers({ header, body }) {
joined: true,
});
addDialInUser(meetingId, voice);
await addDialInUser(meetingId, voice);
}
});
}));
// removing extra users already existing in Mongo
const voiceUsersToRemove = VoiceUsers.find({
const voiceUsersToRemove = await VoiceUsers.find({
meetingId,
intId: { $nin: usersIds },
}, { fields: { voiceUserId: 1, intId: 1 } }).fetch();
voiceUsersToRemove.forEach(user => removeVoiceUser(meetingId, {
voiceConf: meeting.voiceProp.voiceConf,
voiceUserId: user.voiceUserId,
intId: user.intId,
}, { fields: { voiceUserId: 1, intId: 1 } }).fetchAsync();
await Promise.all(voiceUsersToRemove.map(async (user) => {
await removeVoiceUser(meetingId, {
voiceConf: meeting.voiceProp.voiceConf,
voiceUserId: user.voiceUserId,
intId: user.intId,
});
}));
}

View File

@ -5,7 +5,7 @@ import { extractCredentials } from '/imports/api/common/server/helpers';
import { check } from 'meteor/check';
import Logger from '/imports/startup/server/logger';
export default function muteAllExceptPresenterToggle() {
export default async function muteAllExceptPresenterToggle() {
try {
const REDIS_CONFIG = Meteor.settings.private.redis;
const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
@ -16,7 +16,7 @@ export default function muteAllExceptPresenterToggle() {
check(meetingId, String);
check(requesterUserId, String);
const meeting = Meetings.findOne({ meetingId });
const meeting = await Meetings.findOneAsync({ meetingId });
const toggleMeetingMuted = !meeting.voiceProp.muteOnStart;
const payload = {

View File

@ -5,7 +5,7 @@ import { extractCredentials } from '/imports/api/common/server/helpers';
import { check } from 'meteor/check';
import Logger from '/imports/startup/server/logger';
export default function muteAllToggle() {
export default async function muteAllToggle() {
try {
const REDIS_CONFIG = Meteor.settings.private.redis;
const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
@ -16,7 +16,7 @@ export default function muteAllToggle() {
check(meetingId, String);
check(requesterUserId, String);
const meeting = Meetings.findOne({ meetingId });
const meeting = await Meetings.findOneAsync({ meetingId });
const toggleMeetingMuted = !meeting.voiceProp.muteOnStart;
const payload = {

View File

@ -7,7 +7,7 @@ import Meetings from '/imports/api/meetings';
import Logger from '/imports/startup/server/logger';
import { check } from 'meteor/check';
export default function muteToggle(uId, toggle) {
export default async function muteToggle(uId, toggle) {
try {
const REDIS_CONFIG = Meteor.settings.private.redis;
const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
@ -20,12 +20,12 @@ export default function muteToggle(uId, toggle) {
const userToMute = uId || requesterUserId;
const requester = Users.findOne({
const requester = await Users.findOneAsync({
meetingId,
userId: requesterUserId,
});
const voiceUser = VoiceUsers.findOne({
const voiceUser = await VoiceUsers.findOneAsync({
intId: userToMute,
meetingId,
});
@ -37,7 +37,7 @@ export default function muteToggle(uId, toggle) {
// if allowModsToUnmuteUsers is false, users will be kicked out for attempting to unmute others
if (requesterUserId !== userToMute && muted) {
const meeting = Meetings.findOne({ meetingId },
const meeting = await Meetings.findOneAsync({ meetingId },
{ fields: { 'usersProp.allowModsToUnmuteUsers': 1 } });
if (meeting.usersProp && !meeting.usersProp.allowModsToUnmuteUsers) {
Logger.warn(`Attempted unmuting by another user meetingId:${meetingId} requester: ${requesterUserId} userId: ${userToMute}`);

View File

@ -4,7 +4,7 @@ import VoiceUsers from '/imports/api/voice-users';
import Users from '/imports/api/users';
import flat from 'flat';
export default function addVoiceUser(meetingId, voiceUser) {
export default async function addVoiceUser(meetingId, voiceUser) {
check(meetingId, String);
check(voiceUser, {
voiceUserId: String,
@ -33,7 +33,7 @@ export default function addVoiceUser(meetingId, voiceUser) {
),
};
const user = Users.findOne({ meetingId, userId: intId }, {
const user = await Users.findOneAsync({ meetingId, userId: intId }, {
fields: {
color: 1,
},
@ -42,7 +42,7 @@ export default function addVoiceUser(meetingId, voiceUser) {
if (user) modifier.$set.color = user.color;
try {
const { numberAffected } = VoiceUsers.upsert(selector, modifier);
const { numberAffected } = await VoiceUsers.upsertAsync(selector, modifier);
if (numberAffected) {
Logger.info(`Add voice user=${intId} meeting=${meetingId}`);

View File

@ -2,7 +2,7 @@ import Meetings from '/imports/api/meetings';
import { check } from 'meteor/check';
import Logger from '/imports/startup/server/logger';
export default function changeMuteMeeting(meetingId, payload) {
export default async function changeMuteMeeting(meetingId, payload) {
check(meetingId, String);
check(payload, {
muted: Boolean,
@ -20,7 +20,7 @@ export default function changeMuteMeeting(meetingId, payload) {
};
try {
const { numberAffected } = Meetings.upsert(selector, modifier);
const { numberAffected } = await Meetings.upsertAsync(selector, modifier);
if (numberAffected) {
Logger.info(`Changed meeting mute status meeting=${meetingId}`);

View File

@ -1,12 +1,12 @@
import Logger from '/imports/startup/server/logger';
import VoiceUsers from '/imports/api/voice-users';
export default function clearVoiceUser(meetingId, intId) {
export default async function clearVoiceUser(meetingId, intId) {
try {
check(meetingId, String);
check(intId, String);
const numberAffected = VoiceUsers.remove({ meetingId, intId });
const numberAffected = await VoiceUsers.removeAsync({ meetingId, intId });
if (numberAffected) {
Logger.info(`Remove voiceUser=${intId} meeting=${meetingId} (clear)`);

View File

@ -1,10 +1,10 @@
import Logger from '/imports/startup/server/logger';
import VoiceUsers from '/imports/api/voice-users';
export default function clearVoiceUser(meetingId) {
export default async function clearVoiceUser(meetingId) {
if (meetingId) {
try {
const numberAffected = VoiceUsers.remove({ meetingId });
const numberAffected = await VoiceUsers.removeAsync({ meetingId });
if (numberAffected) {
Logger.info(`Cleared VoiceUsers in (${meetingId})`);
@ -14,7 +14,7 @@ export default function clearVoiceUser(meetingId) {
}
} else {
try {
const numberAffected = VoiceUsers.remove({});
const numberAffected = await VoiceUsers.removeAsync({});
if (numberAffected) {
Logger.info('Cleared VoiceUsers in all meetings');

View File

@ -3,7 +3,7 @@ import Logger from '/imports/startup/server/logger';
import VoiceUsers from '/imports/api/voice-users';
import { clearSpokeTimeout } from '/imports/api/common/server/helpers';
export default function removeVoiceUser(meetingId, voiceUser) {
export default async function removeVoiceUser(meetingId, voiceUser) {
check(meetingId, String);
check(voiceUser, {
voiceConf: String,
@ -29,8 +29,8 @@ export default function removeVoiceUser(meetingId, voiceUser) {
};
try {
clearSpokeTimeout(meetingId, intId);
const numberAffected = VoiceUsers.update(selector, modifier);
await clearSpokeTimeout(meetingId, intId);
const numberAffected = await VoiceUsers.updateAsync(selector, modifier);
if (numberAffected) {
Logger.info(`Remove voiceUser=${intId} meeting=${meetingId}`);

View File

@ -6,7 +6,7 @@ import { spokeTimeoutHandles, clearSpokeTimeout } from '/imports/api/common/serv
const TALKING_TIMEOUT = 6000;
export default function updateVoiceUser(meetingId, voiceUser) {
export default async function updateVoiceUser(meetingId, voiceUser) {
check(meetingId, String);
check(voiceUser, {
intId: String,
@ -33,7 +33,7 @@ export default function updateVoiceUser(meetingId, voiceUser) {
};
if (voiceUser.talking) {
const user = VoiceUsers.findOne({ meetingId, intId }, {
const user = await VoiceUsers.findOneAsync({ meetingId, intId }, {
fields: {
startTime: 1,
},
@ -46,8 +46,8 @@ export default function updateVoiceUser(meetingId, voiceUser) {
}
if (!voiceUser.talking) {
const timeoutHandle = Meteor.setTimeout(() => {
const user = VoiceUsers.findOne({ meetingId, intId }, {
const timeoutHandle = Meteor.setTimeout(async () => {
const user = await VoiceUsers.findOneAsync({ meetingId, intId }, {
fields: {
endTime: 1,
talking: 1,
@ -61,7 +61,7 @@ export default function updateVoiceUser(meetingId, voiceUser) {
modifier.$set.spoke = false;
modifier.$set.startTime = null;
try {
const numberAffected = VoiceUsers.update(selector, modifier);
const numberAffected = await VoiceUsers.updateAsync(selector, modifier);
if (numberAffected) {
Logger.debug('Update voiceUser', { voiceUser: intId, meetingId });
@ -77,7 +77,7 @@ export default function updateVoiceUser(meetingId, voiceUser) {
}
try {
const numberAffected = VoiceUsers.update(selector, modifier);
const numberAffected = await VoiceUsers.updateAsync(selector, modifier);
if (numberAffected) {
Logger.debug('Update voiceUser', { voiceUser: intId, meetingId });

View File

@ -4,8 +4,9 @@ import Logger from '/imports/startup/server/logger';
import AuthTokenValidation, { ValidationStates } from '/imports/api/auth-token-validation';
import ejectUserFromVoice from './methods/ejectUserFromVoice';
function voiceUser() {
const tokenValidation = AuthTokenValidation.findOne({ connectionId: this.connection.id });
async function voiceUser() {
const tokenValidation = await AuthTokenValidation
.findOneAsync({ connectionId: this.connection.id });
if (!tokenValidation || tokenValidation.validationStatus !== ValidationStates.VALIDATED) {
Logger.warn(`Publishing VoiceUsers was requested by unauth connection ${this.connection.id}`);
@ -14,12 +15,12 @@ function voiceUser() {
const { meetingId, userId: requesterUserId } = tokenValidation;
const onCloseConnection = Meteor.bindEnvironment(() => {
const onCloseConnection = Meteor.bindEnvironment(async () => {
try {
// I used user because voiceUser is the function's name
const User = VoiceUsers.findOne({ meetingId, requesterUserId });
const User = await VoiceUsers.findOneAsync({ meetingId, requesterUserId });
if (User) {
ejectUserFromVoice(requesterUserId);
await ejectUserFromVoice(requesterUserId);
}
} catch (e) {
Logger.error(`Exception while executing ejectUserFromVoice for ${requesterUserId}: ${e}`);