5ee0ff9af2
Same rationale as in video-provider's commit (34fa37ae4f092af4a5aef0cf01d96c033d97473c). This commit does the following: - Implement actual heartbeat checks to trigger reconnects when necessary - Properly catch and log WebSocket.send errors
365 lines
9.7 KiB
JavaScript
365 lines
9.7 KiB
JavaScript
import logger from '/imports/startup/client/logger';
|
|
import { notifyStreamStateChange } from '/imports/ui/services/bbb-webrtc-sfu/stream-state-service';
|
|
import { SFU_BROKER_ERRORS } from '/imports/ui/services/bbb-webrtc-sfu/broker-base-errors';
|
|
|
|
const WS_HEARTBEAT_OPTS = {
|
|
interval: 15000,
|
|
delay: 3000,
|
|
};
|
|
|
|
class BaseBroker {
|
|
static assembleError(code, reason) {
|
|
const message = reason || SFU_BROKER_ERRORS[code];
|
|
const error = new Error(message);
|
|
error.errorCode = code;
|
|
// Duplicating key-vals because we can't settle on an error pattern... - prlanzarin
|
|
error.errorCause = error.message;
|
|
error.errorMessage = error.message;
|
|
|
|
return error;
|
|
}
|
|
|
|
constructor(sfuComponent, wsUrl) {
|
|
this.wsUrl = wsUrl;
|
|
this.sfuComponent = sfuComponent;
|
|
this.ws = null;
|
|
this.webRtcPeer = null;
|
|
this.wsHeartbeat = null;
|
|
this.started = false;
|
|
this.signallingTransportOpen = false;
|
|
this.logCodePrefix = `${this.sfuComponent}_broker`;
|
|
this.peerConfiguration = {};
|
|
|
|
this.onbeforeunload = this.onbeforeunload.bind(this);
|
|
this._onWSError = this._onWSError.bind(this);
|
|
window.addEventListener('beforeunload', this.onbeforeunload);
|
|
}
|
|
|
|
set started (val) {
|
|
this._started = val;
|
|
}
|
|
|
|
get started () {
|
|
return this._started;
|
|
}
|
|
|
|
onbeforeunload () {
|
|
return this.stop();
|
|
}
|
|
|
|
onstart () {
|
|
// To be implemented by inheritors
|
|
}
|
|
|
|
onerror (error) {
|
|
// To be implemented by inheritors
|
|
}
|
|
|
|
onended () {
|
|
// To be implemented by inheritors
|
|
}
|
|
|
|
handleSFUError (sfuResponse) {
|
|
// To be implemented by inheritors
|
|
}
|
|
|
|
sendLocalDescription (localDescription) {
|
|
// To be implemented by inheritors
|
|
}
|
|
|
|
_onWSMessage(message) {
|
|
this._updateLastMsgTime();
|
|
this.onWSMessage(message);
|
|
}
|
|
|
|
onWSMessage(message) {
|
|
// To be implemented by inheritors
|
|
}
|
|
|
|
_onWSError(error) {
|
|
let normalizedError;
|
|
|
|
logger.error({
|
|
logCode: `${this.logCodePrefix}_websocket_error`,
|
|
extraInfo: {
|
|
errorMessage: error.name || error.message || 'Unknown error',
|
|
sfuComponent: this.sfuComponent,
|
|
}
|
|
}, 'WebSocket connection to SFU failed');
|
|
|
|
if (this.signallingTransportOpen) {
|
|
// 1301: "WEBSOCKET_DISCONNECTED", transport was already open
|
|
normalizedError = BaseBroker.assembleError(1301);
|
|
} else {
|
|
// 1302: "WEBSOCKET_CONNECTION_FAILED", transport errored before establishment
|
|
normalizedError = BaseBroker.assembleError(1302);
|
|
}
|
|
|
|
this.onerror(normalizedError);
|
|
return normalizedError;
|
|
}
|
|
|
|
openWSConnection () {
|
|
return new Promise((resolve, reject) => {
|
|
this.ws = new WebSocket(this.wsUrl);
|
|
|
|
this.ws.onmessage = this._onWSMessage.bind(this);
|
|
|
|
this.ws.onclose = () => {
|
|
// 1301: "WEBSOCKET_DISCONNECTED",
|
|
this.onerror(BaseBroker.assembleError(1301));
|
|
};
|
|
|
|
this.ws.onerror = (error) => reject(this._onWSError(error));
|
|
|
|
this.ws.onopen = () => {
|
|
this.setupWSHeartbeat();
|
|
this.signallingTransportOpen = true;
|
|
return resolve();
|
|
};
|
|
});
|
|
}
|
|
|
|
closeWs() {
|
|
this.clearWSHeartbeat();
|
|
|
|
if (this.ws !== null) {
|
|
this.ws.onclose = function (){};
|
|
this.ws.close();
|
|
}
|
|
}
|
|
|
|
_updateLastMsgTime() {
|
|
this.ws.isAlive = true;
|
|
this.ws.lastMsgTime = Date.now();
|
|
}
|
|
|
|
_getTimeSinceLastMsg() {
|
|
return Date.now() - this.ws.lastMsgTime;
|
|
}
|
|
|
|
setupWSHeartbeat() {
|
|
if (WS_HEARTBEAT_OPTS.interval === 0 || this.ws == null) return;
|
|
|
|
this.ws.isAlive = true;
|
|
this.wsHeartbeat = setInterval(() => {
|
|
if (this.ws.isAlive === false) {
|
|
logger.warn({
|
|
logCode: `${this.logCodePrefix}_ws_heartbeat_failed`,
|
|
}, `WS heartbeat failed (${this.sfuComponent})`);
|
|
this.closeWs();
|
|
this._onWSError(new Error('HeartbeatFailed'));
|
|
return;
|
|
}
|
|
|
|
if (this._getTimeSinceLastMsg() < (
|
|
WS_HEARTBEAT_OPTS.interval - WS_HEARTBEAT_OPTS.delay
|
|
)) {
|
|
return;
|
|
}
|
|
|
|
this.ws.isAlive = false;
|
|
this.ping();
|
|
}, WS_HEARTBEAT_OPTS.interval);
|
|
|
|
this.ping();
|
|
}
|
|
|
|
clearWSHeartbeat() {
|
|
if (this.wsHeartbeat) {
|
|
clearInterval(this.wsHeartbeat);
|
|
}
|
|
}
|
|
|
|
sendMessage (message) {
|
|
const jsonMessage = JSON.stringify(message);
|
|
|
|
try {
|
|
this.ws.send(jsonMessage);
|
|
} catch (error) {
|
|
logger.error({
|
|
logCode: `${this.logCodePrefix}_ws_send_error`,
|
|
extraInfo: {
|
|
errorName: error.name,
|
|
errorMessage: error.message,
|
|
sfuComponent: this.sfuComponent,
|
|
},
|
|
}, `Failed to send WebSocket message (${this.sfuComponent})`);
|
|
}
|
|
}
|
|
|
|
ping () {
|
|
this.sendMessage({ id: 'ping' });
|
|
}
|
|
|
|
_processRemoteDescription(localDescription = null) {
|
|
// There is a new local description; send it back to the server
|
|
if (localDescription) this.sendLocalDescription(localDescription);
|
|
// Mark the peer as negotiated and flush the ICE queue
|
|
this.webRtcPeer.negotiated = true;
|
|
this.processIceQueue();
|
|
}
|
|
|
|
_validateStartResponse (sfuResponse) {
|
|
const { response, role } = sfuResponse;
|
|
|
|
if (response !== 'accepted') {
|
|
this.handleSFUError(sfuResponse);
|
|
return false;
|
|
}
|
|
|
|
logger.debug({
|
|
logCode: `${this.logCodePrefix}_start_success`,
|
|
extraInfo: {
|
|
role,
|
|
sfuComponent: this.sfuComponent,
|
|
}
|
|
}, `Start request accepted for ${this.sfuComponent}`);
|
|
|
|
return true;
|
|
}
|
|
|
|
processOffer(sfuResponse) {
|
|
if (this._validateStartResponse(sfuResponse)) {
|
|
this.webRtcPeer.processOffer(sfuResponse.sdpAnswer)
|
|
.then(this._processRemoteDescription.bind(this))
|
|
.catch((error) => {
|
|
logger.error({
|
|
logCode: `${this.logCodePrefix}_processoffer_error`,
|
|
extraInfo: {
|
|
errorMessage: error.name || error.message || 'Unknown error',
|
|
sfuComponent: this.sfuComponent,
|
|
},
|
|
}, `Error processing offer from SFU for ${this.sfuComponent}`);
|
|
// 1305: "PEER_NEGOTIATION_FAILED",
|
|
this.onerror(BaseBroker.assembleError(1305));
|
|
});
|
|
}
|
|
}
|
|
|
|
processAnswer(sfuResponse) {
|
|
if (this._validateStartResponse(sfuResponse)) {
|
|
this.webRtcPeer.processAnswer(sfuResponse.sdpAnswer)
|
|
.then(this._processRemoteDescription.bind(this))
|
|
.catch((error) => {
|
|
logger.error({
|
|
logCode: `${this.logCodePrefix}_processanswer_error`,
|
|
extraInfo: {
|
|
errorMessage: error.name || error.message || 'Unknown error',
|
|
sfuComponent: this.sfuComponent,
|
|
},
|
|
}, `Error processing answer from SFU for ${this.sfuComponent}`);
|
|
// 1305: "PEER_NEGOTIATION_FAILED",
|
|
this.onerror(BaseBroker.assembleError(1305));
|
|
});
|
|
}
|
|
}
|
|
|
|
populatePeerConfiguration () {
|
|
this.addIceServers();
|
|
if (this.forceRelay) {
|
|
this.setRelayTransportPolicy();
|
|
}
|
|
|
|
return this.peerConfiguration;
|
|
}
|
|
|
|
addIceServers () {
|
|
if (this.iceServers && this.iceServers.length > 0) {
|
|
this.peerConfiguration.iceServers = this.iceServers;
|
|
}
|
|
}
|
|
|
|
setRelayTransportPolicy () {
|
|
this.peerConfiguration.iceTransportPolicy = 'relay';
|
|
}
|
|
|
|
handleConnectionStateChange (eventIdentifier) {
|
|
if (this.webRtcPeer) {
|
|
const { peerConnection } = this.webRtcPeer;
|
|
const connectionState = peerConnection.connectionState;
|
|
if (eventIdentifier) {
|
|
notifyStreamStateChange(eventIdentifier, connectionState);
|
|
}
|
|
|
|
if (connectionState === 'failed' || connectionState === 'closed') {
|
|
this.webRtcPeer.peerConnection.onconnectionstatechange = null;
|
|
// 1307: "ICE_STATE_FAILED",
|
|
const error = BaseBroker.assembleError(1307);
|
|
this.onerror(error);
|
|
}
|
|
}
|
|
}
|
|
|
|
addIceCandidate(candidate) {
|
|
this.webRtcPeer.addIceCandidate(candidate).catch((error) => {
|
|
// Just log the error. We can't be sure if a candidate failure on add is
|
|
// fatal or not, so that's why we have a timeout set up for negotiations and
|
|
// listeners for ICE state transitioning to failures, so we won't act on it here
|
|
logger.error({
|
|
logCode: `${this.logCodePrefix}_addicecandidate_error`,
|
|
extraInfo: {
|
|
errorMessage: error.name || error.message || 'Unknown error',
|
|
errorCode: error.code || 'Unknown code',
|
|
sfuComponent: this.sfuComponent,
|
|
started: this.started,
|
|
},
|
|
}, 'Adding ICE candidate failed');
|
|
});
|
|
}
|
|
|
|
processIceQueue () {
|
|
const peer = this.webRtcPeer;
|
|
while (peer.iceQueue.length) {
|
|
const candidate = peer.iceQueue.shift();
|
|
this.addIceCandidate(candidate);
|
|
}
|
|
}
|
|
|
|
handleIceCandidate (candidate) {
|
|
const peer = this.webRtcPeer;
|
|
|
|
if (peer.negotiated) {
|
|
this.addIceCandidate(candidate);
|
|
} else {
|
|
// ICE candidates are queued until a SDP answer has been processed.
|
|
// This was done due to a long term iOS/Safari quirk where it'd (as of 2018)
|
|
// fail if candidates were added before the offer/answer cycle was completed.
|
|
// IT STILL HAPPENS - prlanzarin sept 2019
|
|
// still happens - prlanzarin sept 2020
|
|
peer.iceQueue.push(candidate);
|
|
}
|
|
}
|
|
|
|
disposePeer () {
|
|
if (this.webRtcPeer) {
|
|
this.webRtcPeer.dispose();
|
|
this.webRtcPeer = null;
|
|
}
|
|
}
|
|
|
|
stop () {
|
|
this.onstart = function(){};
|
|
this.onerror = function(){};
|
|
window.removeEventListener('beforeunload', this.onbeforeunload);
|
|
|
|
if (this.webRtcPeer) {
|
|
this.webRtcPeer.peerConnection.onconnectionstatechange = null;
|
|
}
|
|
|
|
this.closeWs();
|
|
this.disposePeer();
|
|
this.started = false;
|
|
|
|
logger.debug({
|
|
logCode: `${this.logCodePrefix}_stop`,
|
|
extraInfo: { sfuComponent: this.sfuComponent },
|
|
}, `Stopped broker session for ${this.sfuComponent}`);
|
|
|
|
this.onended();
|
|
this.onended = function(){};
|
|
}
|
|
}
|
|
|
|
export default BaseBroker;
|