Refactor: Migrate video-streams for the Meteor 3.0 api

This commit is contained in:
Tainan Felipe 2023-03-20 09:23:39 -03:00 committed by Ramón Souza
parent 6949e9b59b
commit 3959eab633
14 changed files with 57 additions and 48 deletions

View File

@ -10,7 +10,7 @@ if (Meteor.isServer) {
// types of queries for the video users:
// 2. meetingId
VideoStreams._ensureIndex({ meetingId: 1 });
VideoStreams.createIndexAsync({ meetingId: 1 });
}
export default VideoStreams;

View File

@ -1,12 +1,12 @@
import { check } from 'meteor/check';
import floorChanged from '../modifiers/floorChanged';
export default function handleFloorChanged({ header, body }, meetingId) {
export default async function handleFloorChanged({ header, body }, meetingId) {
const { intId, floor, lastFloorTime } = body;
check(meetingId, String);
check(intId, String);
check(floor, Boolean);
check(lastFloorTime, String);
return floorChanged(meetingId, intId, floor, lastFloorTime);
const result = await floorChanged(meetingId, intId, floor, lastFloorTime);
return result;
}

View File

@ -1,13 +1,13 @@
import { check } from 'meteor/check';
import changePin from '../modifiers/changePin';
export default function userPinChanged({ body }, meetingId) {
export default async function userPinChanged({ body }, meetingId) {
const { userId, pin, changedBy } = body;
check(meetingId, String);
check(userId, String);
check(pin, Boolean);
check(changedBy, String);
return changePin(meetingId, userId, pin, changedBy);
const result = await changePin(meetingId, userId, pin, changedBy);
return result;
}

View File

@ -2,7 +2,7 @@ import { check } from 'meteor/check';
import sharedWebcam from '../modifiers/sharedWebcam';
import { isValidStream } from '/imports/api/video-streams/server/helpers';
export default function handleUserSharedHtml5Webcam({ header, body }, meetingId) {
export default async function handleUserSharedHtml5Webcam({ header, body }, meetingId) {
const { userId, stream } = body;
check(header, Object);
@ -11,6 +11,6 @@ export default function handleUserSharedHtml5Webcam({ header, body }, meetingId)
check(stream, String);
if (!isValidStream(stream)) return false;
return sharedWebcam(meetingId, userId, stream);
const result = await sharedWebcam(meetingId, userId, stream);
return result;
}

View File

@ -2,7 +2,7 @@ import { check } from 'meteor/check';
import unsharedWebcam from '../modifiers/unsharedWebcam';
import { isValidStream } from '/imports/api/video-streams/server/helpers';
export default function handleUserUnsharedHtml5Webcam({ header, body }, meetingId) {
export default async function handleUserUnsharedHtml5Webcam({ header, body }, meetingId) {
const { userId, stream } = body;
check(header, Object);
@ -11,6 +11,6 @@ export default function handleUserUnsharedHtml5Webcam({ header, body }, meetingI
check(stream, String);
if (!isValidStream(stream)) return false;
return unsharedWebcam(meetingId, userId, stream);
const result = await unsharedWebcam(meetingId, userId, stream);
return result;
}

View File

@ -4,7 +4,7 @@ import VideoStreams from '/imports/api/video-streams/';
import updatedVideoStream from '../modifiers/updatedVideoStream';
import unsharedWebcam from '../modifiers/unsharedWebcam';
export default function handleWebcamSync({ body }, meetingId) {
export default async function handleWebcamSync({ body }, meetingId) {
check(meetingId, String);
check(body, Object);
const { webcamListSync } = body;
@ -12,28 +12,29 @@ export default function handleWebcamSync({ body }, meetingId) {
const streamsIds = webcamListSync.map((webcam) => webcam.stream);
const webcamStreamsToUpdate = VideoStreams.find({
const webcamStreams = VideoStreams.find({
meetingId,
stream: { $in: streamsIds },
}, {
fields: {
stream: 1,
},
}).fetch()
.map((m) => m.stream);
}).fetchAsync();
webcamListSync.forEach((webcam) => {
const webcamStreamsToUpdate = webcamStreams.map((m) => m.stream);
await Promise.all(webcamListSync.map(async (webcam) => {
if (webcamStreamsToUpdate.indexOf(webcam.stream) >= 0) {
// stream already exist, then update
updatedVideoStream(meetingId, webcam);
await updatedVideoStream(meetingId, webcam);
} else {
// stream doesn't exist yet, then add it
addWebcamSync(meetingId, webcam);
await addWebcamSync(meetingId, webcam);
}
});
}));
// removing extra video streams already existing in Mongo
const videoStreamsToRemove = VideoStreams.find({
const videoStreamsToRemove = await VideoStreams.find({
meetingId,
stream: { $nin: streamsIds },
}, {
@ -41,8 +42,10 @@ export default function handleWebcamSync({ body }, meetingId) {
stream: 1,
userId: 1,
},
}).fetch();
}).fetchAsynch();
videoStreamsToRemove
.forEach((videoStream) => unsharedWebcam(meetingId, videoStream.userId, videoStream.stream));
await Promise.all(videoStreamsToRemove
.map(async (videoStream) => {
await unsharedWebcam(meetingId, videoStream.userId, videoStream.stream);
}));
}

View File

@ -12,8 +12,8 @@ const getDeviceId = (stream) => {
return stream;
};
const getUserName = (userId, meetingId) => {
const user = Users.findOne(
const getUserName = async (userId, meetingId) => {
const user = await Users.findOneAsync(
{ userId, meetingId },
{ fields: { name: 1 } },
);

View File

@ -1,7 +1,7 @@
import Logger from '/imports/startup/server/logger';
import VideoStreams from '/imports/api/video-streams';
export default function changePin(meetingId, userId, pin) {
export default async function changePin(meetingId, userId, pin) {
const selector = {
meetingId,
userId,
@ -14,7 +14,7 @@ export default function changePin(meetingId, userId, pin) {
};
try {
const numberAffected = VideoStreams.update(selector, modifier, { multi: true });
const numberAffected = await VideoStreams.updateAsync(selector, modifier, { multi: true });
if (numberAffected) {
Logger.info(`Updated user streams pinned userId=${userId} pinned=${pin}`);

View File

@ -1,10 +1,10 @@
import Logger from '/imports/startup/server/logger';
import VideoStreams from '/imports/api/video-streams';
export default function clearVideoStreams(meetingId) {
export default async function clearVideoStreams(meetingId) {
if (meetingId) {
try {
const numberAffected = VideoStreams.remove({ meetingId });
const numberAffected = await VideoStreams.removeAsync({ meetingId });
if (numberAffected) {
Logger.info(`Cleared VideoStreams in (${meetingId})`);
@ -14,7 +14,7 @@ export default function clearVideoStreams(meetingId) {
}
} else {
try {
const numberAffected = VideoStreams.remove({});
const numberAffected = await VideoStreams.removeAsync({});
if (numberAffected) {
Logger.info('Cleared VideoStreams in all meetings');

View File

@ -2,7 +2,12 @@ import Logger from '/imports/startup/server/logger';
import VideoStreams from '/imports/api/video-streams';
import { check } from 'meteor/check';
export default function floorChanged(meetingId, userId, floor, lastFloorTime) {
export default async function floorChanged(
meetingId,
userId,
floor,
lastFloorTime,
) {
check(meetingId, String);
check(userId, String);
check(floor, Boolean);
@ -11,7 +16,7 @@ export default function floorChanged(meetingId, userId, floor, lastFloorTime) {
const selector = {
meetingId,
userId,
}
};
const modifier = {
$set: {
@ -21,7 +26,7 @@ export default function floorChanged(meetingId, userId, floor, lastFloorTime) {
};
try {
const numberAffected = VideoStreams.update(selector, modifier, { multi: true });
const numberAffected = await VideoStreams.updateAsync(selector, modifier, { multi: true });
if (numberAffected) {
Logger.info(`Updated user streams floor times userId=${userId} floor=${floor} lastFloorTime=${lastFloorTime}`);

View File

@ -11,18 +11,18 @@ import { lowercaseTrim } from '/imports/utils/string-utils';
const BASE_FLOOR_TIME = "0";
export default function sharedWebcam(meetingId, userId, stream) {
export default async function sharedWebcam(meetingId, userId, stream) {
check(meetingId, String);
check(userId, String);
check(stream, String);
const deviceId = getDeviceId(stream);
const name = getUserName(userId, meetingId);
const vu = VoiceUsers.findOne(
const name = await getUserName(userId, meetingId);
const vu = await VoiceUsers.findOneAsync(
{ meetingId, intId: userId },
{ fields: { floor: 1, lastFloorTime: 1 }}
) || {};
const u = Users.findOne(
const u = await Users.findOneAsync(
{ meetingId, intId: userId },
{ fields: { pin: 1 } },
) || {};
@ -49,7 +49,7 @@ export default function sharedWebcam(meetingId, userId, stream) {
};
try {
const { insertedId } = VideoStreams.upsert(selector, modifier);
const { insertedId } = await VideoStreams.upsertAsync(selector, modifier);
if (insertedId) {
Logger.info(`Updated stream=${stream} meeting=${meetingId}`);
@ -59,7 +59,7 @@ export default function sharedWebcam(meetingId, userId, stream) {
}
}
export function addWebcamSync(meetingId, videoStream) {
export async function addWebcamSync(meetingId, videoStream) {
check(videoStream, {
userId: String,
stream: String,
@ -92,7 +92,7 @@ export function addWebcamSync(meetingId, videoStream) {
};
try {
const { insertedId } = VideoStreams.upsert(selector, modifier);
const { insertedId } = await VideoStreams.upsertAsync(selector, modifier);
if (insertedId) {
Logger.info(`Synced stream=${stream} meeting=${meetingId}`);

View File

@ -3,7 +3,7 @@ import VideoStreams from '/imports/api/video-streams';
import { check } from 'meteor/check';
import { getDeviceId } from '/imports/api/video-streams/server/helpers';
export default function unsharedWebcam(meetingId, userId, stream) {
export default async function unsharedWebcam(meetingId, userId, stream) {
check(meetingId, String);
check(userId, String);
check(stream, String);
@ -17,7 +17,7 @@ export default function unsharedWebcam(meetingId, userId, stream) {
};
try {
VideoStreams.remove(selector);
await VideoStreams.removeAsync(selector);
Logger.info(`Removed stream=${stream} meeting=${meetingId}`);
} catch (err) {

View File

@ -3,7 +3,7 @@ import Logger from '/imports/startup/server/logger';
import VideoStreams from '/imports/api/video-streams';
import flat from 'flat';
export default function updateVideoStream(meetingId, videoStream) {
export default async function updateVideoStream(meetingId, videoStream) {
check(meetingId, String);
check(videoStream, {
userId: String,
@ -29,7 +29,7 @@ export default function updateVideoStream(meetingId, videoStream) {
};
try {
const numberAffected = VideoStreams.update(selector, modifier);
const numberAffected = await VideoStreams.updateAsync(selector, modifier);
if (numberAffected) {
Logger.debug(`Update videoStream ${stream} for user ${userId} in ${meetingId}`);

View File

@ -3,8 +3,9 @@ import Logger from '/imports/startup/server/logger';
import VideoStreams from '/imports/api/video-streams';
import AuthTokenValidation, { ValidationStates } from '/imports/api/auth-token-validation';
function videoStreams() {
const tokenValidation = AuthTokenValidation.findOne({ connectionId: this.connection.id });
async function videoStreams() {
const tokenValidation = await AuthTokenValidation
.findOneAsync({ connectionId: this.connection.id });
if (!tokenValidation || tokenValidation.validationStatus !== ValidationStates.VALIDATED) {
Logger.warn(`Publishing VideoStreams was requested by unauth connection ${this.connection.id}`);