Refactor the ConferenceCallManager class

This commit is contained in:
Robert Long 2021-08-06 14:56:14 -07:00
parent dff8a1acd3
commit 8e2688b3db
7 changed files with 325 additions and 366 deletions

View File

@ -1,3 +1,19 @@
/*
Copyright 2021 New Vector Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import EventEmitter from "events";
export class ConferenceCallDebugger extends EventEmitter {
@ -11,17 +27,34 @@ export class ConferenceCallDebugger extends EventEmitter {
calls: new Map(),
};
this.bufferedEvents = [];
this.manager.on("call", this._onCall);
this.manager.on("debugstate", this._onDebugStateChanged);
this.manager.client.on("event", this._onEvent);
this.manager.on("entered", this._onEntered);
}
_onEntered = () => {
const eventCount = this.bufferedEvents.length;
for (let i = 0; i < eventCount; i++) {
const event = this.bufferedEvents.pop();
this._onEvent(event);
}
};
_onEvent = (event) => {
if (!this.manager.entered) {
this.bufferedEvents.push(event);
return;
}
const roomId = event.getRoomId();
const type = event.getType();
if (
roomId === this.manager.roomId &&
roomId === this.manager.room.roomId &&
(type.startsWith("m.call.") || type === "me.robertlong.call.info")
) {
const sender = event.getSender();
@ -332,7 +365,7 @@ export class ConferenceCallDebugger extends EventEmitter {
.map(processRemoteOutboundRTPStats);
this.manager.client.sendEvent(
this.manager.roomId,
this.manager.room.roomId,
"me.robertlong.call.info",
event
);
@ -373,7 +406,7 @@ export class ConferenceCallDebugger extends EventEmitter {
"icecandidateerror",
({ errorCode, url, errorText }) => {
this.manager.client.sendEvent(
this.manager.roomId,
this.manager.room.roomId,
"me.robertlong.call.ice_error",
{
call_id: call.callId,

View File

@ -131,343 +131,6 @@ export class ConferenceCallManager extends EventEmitter {
}
}
constructor(client) {
super();
this.client = client;
this.joined = false;
this.room = null;
const localUserId = client.getUserId();
this.localParticipant = {
local: true,
userId: localUserId,
stream: null,
call: null,
muted: true,
};
this.participants = [this.localParticipant];
this.pendingCalls = [];
this.client.on("RoomState.members", this._onMemberChanged);
this.client.on("Call.incoming", this._onIncomingCall);
this.callDebugger = new ConferenceCallDebugger(this);
}
setRoom(roomId) {
this.roomId = roomId;
this.room = this.client.getRoom(this.roomId);
}
async join() {
const mediaStream = await this.client.getLocalVideoStream();
this.localParticipant.stream = mediaStream;
this.joined = true;
this.emit("debugstate", this.client.getUserId(), null, "you");
const activeConf = this.room.currentState
.getStateEvents(CONF_ROOM, "")
?.getContent()?.active;
if (!activeConf) {
this.client.sendStateEvent(this.roomId, CONF_ROOM, { active: true }, "");
}
const roomMemberIds = this.room.getMembers().map(({ userId }) => userId);
roomMemberIds.forEach((userId) => {
this._processMember(userId);
});
for (const { call, onHangup, onReplaced } of this.pendingCalls) {
if (call.roomId !== this.roomId) {
continue;
}
call.removeListener("hangup", onHangup);
call.removeListener("replaced", onReplaced);
const userId = call.opponentMember.userId;
const existingParticipant = this.participants.find(
(p) => p.userId === userId
);
if (existingParticipant) {
existingParticipant.call = call;
}
this._addCall(call);
call.answer();
this.emit("call", call);
}
this.pendingCalls = [];
this._updateParticipantState();
}
_updateParticipantState = () => {
const userId = this.client.getUserId();
const currentMemberState = this.room.currentState.getStateEvents(
"m.room.member",
userId
);
this.client.sendStateEvent(
this.roomId,
"m.room.member",
{
...currentMemberState.getContent(),
[CONF_PARTICIPANT]: new Date().getTime(),
},
userId
);
this._participantStateTimeout = setTimeout(
this._updateParticipantState,
PARTICIPANT_TIMEOUT
);
};
_onMemberChanged = (_event, _state, member) => {
if (member.roomId !== this.roomId) {
return;
}
this._processMember(member.userId);
};
_processMember(userId) {
const localUserId = this.client.getUserId();
if (userId === localUserId) {
return;
}
// Don't process members until we've joined
if (!this.joined) {
return;
}
const participant = this.participants.find((p) => p.userId === userId);
if (participant) {
// Member has already been processed
return;
}
const memberStateEvent = this.room.currentState.getStateEvents(
"m.room.member",
userId
);
const participantTimeout = memberStateEvent.getContent()[CONF_PARTICIPANT];
if (
typeof participantTimeout !== "number" ||
new Date().getTime() - participantTimeout > PARTICIPANT_TIMEOUT
) {
// Member is inactive so don't call them.
this.emit("debugstate", userId, null, "inactive");
return;
}
// Only initiate a call with a user who has a userId that is lexicographically
// less than your own. Otherwise, that user will call you.
if (userId < localUserId) {
this.emit("debugstate", userId, null, "waiting for invite");
return;
}
const call = this.client.createCall(this.roomId, userId);
this._addCall(call);
call.placeVideoCall().then(() => {
this.emit("call", call);
});
}
_onIncomingCall = (call) => {
if (!this.joined) {
const onHangup = (call) => {
const index = this.pendingCalls.findIndex((p) => p.call === call);
if (index !== -1) {
this.pendingCalls.splice(index, 1);
}
};
const onReplaced = (call, newCall) => {
const index = this.pendingCalls.findIndex((p) => p.call === call);
if (index !== -1) {
this.pendingCalls.splice(index, 1, {
call: newCall,
onHangup: () => onHangup(newCall),
onReplaced: (nextCall) => onReplaced(newCall, nextCall),
});
}
};
this.pendingCalls.push({
call,
onHangup: () => onHangup(call),
onReplaced: (newCall) => onReplaced(call, newCall),
});
call.on("hangup", onHangup);
call.on("replaced", onReplaced);
return;
}
if (call.roomId !== this.roomId) {
return;
}
const userId = call.opponentMember.userId;
const existingParticipant = this.participants.find(
(p) => p.userId === userId
);
if (existingParticipant) {
existingParticipant.call = call;
}
this._addCall(call);
call.answer();
this.emit("call", call);
};
_addCall(call) {
const userId = call.opponentMember.userId;
this.participants.push({
userId,
stream: null,
call,
});
call.on("state", (state) =>
this.emit("debugstate", userId, call.callId, state)
);
call.on("feeds_changed", () => this._onCallFeedsChanged(call));
call.on("hangup", () => this._onCallHangup(call));
const onReplaced = (newCall) => {
this._onCallReplaced(call, newCall);
call.removeListener("replaced", onReplaced);
};
call.on("replaced", onReplaced);
this._onCallFeedsChanged(call);
this.emit("participants_changed");
}
_onCallFeedsChanged = (call) => {
for (const participant of this.participants) {
if (participant.local || participant.call !== call) {
continue;
}
const remoteFeeds = call.getRemoteFeeds();
if (
remoteFeeds.length > 0 &&
participant.stream !== remoteFeeds[0].stream
) {
participant.stream = remoteFeeds[0].stream;
this.emit("participants_changed");
}
}
};
_onCallHangup = (call) => {
const participantIndex = this.participants.findIndex(
(p) => !p.local && p.call === call
);
if (call.hangupReason === "replaced") {
return;
}
if (participantIndex === -1) {
return;
}
this.participants.splice(participantIndex, 1);
this.emit("participants_changed");
};
_onCallReplaced = (call, newCall) => {
const remoteParticipant = this.participants.find(
(p) => !p.local && p.call === call
);
remoteParticipant.call = newCall;
this.emit("call", newCall);
newCall.on("feeds_changed", () => this._onCallFeedsChanged(newCall));
newCall.on("hangup", () => this._onCallHangup(newCall));
newCall.on("replaced", (nextCall) =>
this._onCallReplaced(newCall, nextCall)
);
this._onCallFeedsChanged(newCall);
this.emit("participants_changed");
};
leaveCall() {
if (!this.joined) {
return;
}
const userId = this.client.getUserId();
const currentMemberState = this.room.currentState.getStateEvents(
"m.room.member",
userId
);
this.client.sendStateEvent(
this.roomId,
"m.room.member",
{
...currentMemberState.getContent(),
[CONF_PARTICIPANT]: null,
},
userId
);
for (const participant of this.participants) {
if (!participant.local && participant.call) {
participant.call.hangup("user_hangup", false);
}
}
this.client.stopLocalMediaStream();
this.joined = false;
this.participants = [this.localParticipant];
this.localParticipant.stream = null;
this.localParticipant.call = null;
this.emit("participants_changed");
}
logout() {
localStorage.removeItem("matrix-auth-store");
}
}
/**
* - incoming
* - you have not joined
* - you have joined
* - initial room members
* - new room members
*/
class ConferenceCallManager2 extends EventEmitter {
constructor(client) {
super();
@ -496,6 +159,7 @@ class ConferenceCallManager2 extends EventEmitter {
this.client.on("RoomState.members", this._onRoomStateMembers);
this.client.on("Call.incoming", this._onIncomingCall);
this.callDebugger = new ConferenceCallDebugger(this);
}
async enter(roomId, timeout = 30000) {
@ -505,7 +169,7 @@ class ConferenceCallManager2 extends EventEmitter {
// Get the room info, wait if it hasn't been fetched yet.
// Timeout after 30 seconds or the provided duration.
const room = await new Promise((resolve, reject) => {
const initialRoom = manager.client.getRoom(roomId);
const initialRoom = this.client.getRoom(roomId);
if (initialRoom) {
resolve(initialRoom);
@ -543,7 +207,10 @@ class ConferenceCallManager2 extends EventEmitter {
const stream = await this.client.getLocalVideoStream();
this.localParticipant = {
local: true,
userId,
sessionId: this.sessionId,
call: null,
stream,
};
@ -554,6 +221,16 @@ class ConferenceCallManager2 extends EventEmitter {
// Continue doing so every PARTICIPANT_TIMEOUT ms
this._updateMemberParticipantState();
this.entered = true;
// Answer any pending incoming calls.
const incomingCallCount = this._incomingCallQueue.length;
for (let i = 0; i < incomingCallCount; i++) {
const call = this._incomingCallQueue.pop();
this._onIncomingCall(call);
}
// Set up participants for the members currently in the room.
// Other members will be picked up by the RoomState.members event.
const initialMembers = room.getMembers();
@ -562,9 +239,55 @@ class ConferenceCallManager2 extends EventEmitter {
this._onMemberChanged(member);
}
this.entered = true;
this.emit("entered");
this.emit("participants_changed");
}
leaveCall() {
if (!this.entered) {
return;
}
const userId = this.client.getUserId();
const currentMemberState = this.room.currentState.getStateEvents(
"m.room.member",
userId
);
this.client.sendStateEvent(
this.room.roomId,
"m.room.member",
{
...currentMemberState.getContent(),
[CONF_PARTICIPANT]: null,
},
userId
);
for (const participant of this.participants) {
if (!participant.local && participant.call) {
participant.call.hangup("user_hangup", false);
}
}
this.client.stopLocalMediaStream();
this.entered = false;
this.participants = [this.localParticipant];
this.localParticipant.stream = null;
this.localParticipant.call = null;
this.emit("participants_changed");
}
logout() {
localStorage.removeItem("matrix-auth-store");
}
/**
* Call presence
*/
_updateMemberParticipantState = () => {
const userId = this.client.getUserId();
const currentMemberState = this.room.currentState.getStateEvents(
@ -585,6 +308,35 @@ class ConferenceCallManager2 extends EventEmitter {
userId
);
const now = new Date().getTime();
for (const participant of this.participants) {
if (participant.local) {
continue;
}
const memberStateEvent = this.room.currentState.getStateEvents(
"m.room.member",
participant.userId
);
const participantInfo = memberStateEvent.getContent()[CONF_PARTICIPANT];
if (
!participantInfo ||
(participantInfo.expiresAt && participantInfo.expiresAt < now)
) {
this.emit("debugstate", participant.userId, null, "inactive");
if (participant.call) {
// NOTE: This should remove the participant on the next tick
// since matrix-js-sdk awaits a promise before firing user_hangup
participant.call.hangup("user_hangup", false);
}
return;
}
}
this._memberParticipantStateTimeout = setTimeout(
this._updateMemberParticipantState,
PARTICIPANT_TIMEOUT
@ -601,32 +353,70 @@ class ConferenceCallManager2 extends EventEmitter {
*/
_onIncomingCall = (call) => {
// The incoming calls may be for another room, which we will ignore.
if (call.roomId !== this.room.roomId) {
return;
}
// If we haven't entered yet, add the call to a queue which we'll use later.
if (!this.entered) {
this._incomingCallQueue.push(call);
return;
}
// Check if the user calling has an existing participant and use this call instead.
// The incoming calls may be for another room, which we will ignore.
if (call.roomId !== this.room.roomId) {
return;
}
if (call.state !== "ringing") {
console.warn("Incoming call no longer in ringing state. Ignoring.");
return;
}
// Get the remote video stream if it exists.
const stream = call.getRemoteFeeds()[0]?.stream;
const userId = call.opponentMember.userId;
const existingParticipant = manager.participants.find(
const memberStateEvent = this.room.currentState.getStateEvents(
"m.room.member",
userId
);
const { sessionId } = memberStateEvent.getContent()[CONF_PARTICIPANT];
// Check if the user calling has an existing participant and use this call instead.
const existingParticipant = this.participants.find(
(p) => p.userId === userId
);
let participant;
if (existingParticipant) {
participant = existingParticipant;
// This also fires the hangup event and triggers those side-effects
existingParticipant.call.hangup("user_hangup", false);
existingParticipant.call.hangup("replaced", false);
existingParticipant.call = call;
existingParticipant.stream = stream;
existingParticipant.sessionId = sessionId;
} else {
participant = {
local: false,
userId,
sessionId,
call,
stream,
};
this.participants.push(participant);
}
call.on("state", (state) =>
this._onCallStateChanged(participant, call, state)
);
call.on("feeds_changed", () => this._onCallFeedsChanged(participant, call));
call.on("replaced", (newCall) =>
this._onCallReplaced(participant, call, newCall)
);
call.on("hangup", () => this._onCallHangup(participant, call));
call.answer();
this.emit("call", call);
this.emit("participants_changed");
};
_onRoomStateMembers = (_event, _state, member) => {
@ -644,19 +434,27 @@ class ConferenceCallManager2 extends EventEmitter {
return;
}
// Don't process your own member.
const localUserId = this.client.getUserId();
if (member.userId === localUserId) {
return;
}
// Get the latest member participant state event.
const memberStateEvent = this.room.currentState.getStateEvents(
"m.room.member",
member.userId
);
const { expiresAt, sessionId } =
memberStateEvent.getContent()[CONF_PARTICIPANT];
const participantInfo = memberStateEvent.getContent()[CONF_PARTICIPANT];
if (!participantInfo) {
return;
}
const { expiresAt, sessionId } = participantInfo;
// If the participant state has expired, ignore this user.
const now = new Date().getTime();
if (expiresAt < now) {
@ -664,15 +462,114 @@ class ConferenceCallManager2 extends EventEmitter {
return;
}
// Check the session id and expiration time of the existing participant to see if we should
// hang up the existing call and create a new one or ignore the changed member.
const participant = this.participants.find((p) => p.userId === userId);
// If there is an existing participant for this member check the session id.
// If the session id changed then we can hang up the old call and start a new one.
// Otherwise, ignore the member change event because we already have an active participant.
let participant = this.participants.find((p) => p.userId === member.userId);
if (participant && participant.sessionId !== sessionId) {
this.emit("debugstate", member.userId, null, "inactive");
participant.call.hangup("user_hangup", false);
if (participant) {
if (participant.sessionId !== sessionId) {
this.emit("debugstate", member.userId, null, "inactive");
participant.call.hangup("replaced", false);
} else {
return;
}
}
this.emit("call", call);
// Only initiate a call with a user who has a userId that is lexicographically
// less than your own. Otherwise, that user will call you.
if (member.userId < localUserId) {
this.emit("debugstate", member.userId, null, "waiting for invite");
return;
}
const call = this.client.createCall(this.room.roomId, member.userId);
if (participant) {
participant.sessionId = sessionId;
participant.call = call;
participant.stream = null;
} else {
participant = {
local: false,
userId: member.userId,
sessionId,
call,
stream: null,
};
this.participants.push(participant);
}
call.on("state", (state) =>
this._onCallStateChanged(participant, call, state)
);
call.on("feeds_changed", () => this._onCallFeedsChanged(participant, call));
call.on("replaced", (newCall) =>
this._onCallReplaced(participant, call, newCall)
);
call.on("hangup", () => this._onCallHangup(participant, call));
call.placeVideoCall().then(() => {
this.emit("call", call);
});
this.emit("participants_changed");
};
/**
* Call Event Handlers
*/
_onCallStateChanged = (participant, call, state) => {
this.emit("debugstate", participant.userId, call.callId, state);
};
_onCallFeedsChanged = (participant, call) => {
const feeds = call.getRemoteFeeds();
if (feeds.length > 0 && participant.stream !== feeds[0].stream) {
participant.stream = feeds[0].stream;
this.emit("participants_changed");
}
};
_onCallReplaced = (participant, call, newCall) => {
participant.call = newCall;
newCall.on("state", (state) =>
this._onCallStateChanged(participant, newCall, state)
);
newCall.on("feeds_changed", () =>
this._onCallFeedsChanged(participant, newCall)
);
newCall.on("replaced", (nextCall) =>
this._onCallReplaced(participant, newCall, nextCall)
);
newCall.on("hangup", () => this._onCallHangup(participant, newCall));
const feeds = newCall.getRemoteFeeds();
if (feeds.length > 0) {
participant.stream = feeds[0].stream;
}
this.emit("call", newCall);
this.emit("participants_changed");
};
_onCallHangup = (participant, call) => {
if (call.hangupReason === "replaced") {
return;
}
const participantIndex = this.participants.indexOf(participant);
if (participantIndex === -1) {
return;
}
this.participants.splice(participantIndex, 1);
this.emit("participants_changed");
};
}

View File

@ -170,7 +170,6 @@ export function useVideoRoom(manager, roomId, timeout = 5000) {
let initialRoom = manager.client.getRoom(roomId);
if (initialRoom) {
manager.setRoom(roomId);
setState((prevState) => ({
...prevState,
loading: false,
@ -186,7 +185,6 @@ export function useVideoRoom(manager, roomId, timeout = 5000) {
if (room && room.roomId === roomId) {
clearTimeout(timeoutId);
manager.client.removeListener("Room", roomCallback);
manager.setRoom(roomId);
setState((prevState) => ({
...prevState,
loading: false,
@ -226,7 +224,7 @@ export function useVideoRoom(manager, roomId, timeout = 5000) {
manager.on("participants_changed", onParticipantsChanged);
manager
.join()
.enter(roomId)
.then(() => {
setState((prevState) => ({
...prevState,

View File

@ -1,3 +1,19 @@
/*
Copyright 2021 New Vector Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import React, { useCallback, useEffect, useRef, useState } from "react";
import ColorHash from "color-hash";
import classNames from "classnames";
@ -56,7 +72,7 @@ export function DevTools({ manager }) {
};
}, [manager]);
if (!manager.joined) {
if (!manager.entered) {
return <div className={styles.devTools} />;
}

View File

@ -1,3 +1,19 @@
/*
Copyright 2021 New Vector Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
.devTools {
display: flex;
flex-direction: column;

View File

@ -104,12 +104,12 @@ export function Room({ manager }) {
);
}
function Participant({ userId, stream, muted, local }) {
function Participant({ userId, stream, local }) {
const videoRef = useRef();
useEffect(() => {
if (stream) {
if (muted) {
if (local) {
videoRef.current.muted = true;
}

View File

@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
.room {
position: relative;
display: flex;