Merge pull request #11033 from antobinary/merge-31

Merge 2.2.31 into 2.3
This commit is contained in:
Anton Georgiev 2020-12-18 09:58:09 -05:00 committed by GitHub
commit 2d6565d4f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1096 additions and 320 deletions

View File

@ -201,7 +201,7 @@ HERE
}
disableMultipleKurentos() {
echo " - Configuring a single Kurento Media Server for listen only, webcam, and screeshare"
echo " - Configuring a single Kurento Media Server for listen only, webcam, and screenshare"
systemctl stop kurento-media-server.service
for i in `seq 8888 8890`; do
@ -269,6 +269,8 @@ source /etc/bigbluebutton/bbb-conf/apply-lib.sh
#setNumberOfHTML5Processes 2
# Shorten the FreeSWITCH "you have been muted" and "you have been unmuted" prompts
# cp -r /etc/bigbluebutton/bbb-conf/sounds /opt/freeswitch/share/freeswitch
HERE
chmod +x /etc/bigbluebutton/bbb-conf/apply-config.sh

45
bigbluebutton-config/bin/bbb-conf Executable file → Normal file
View File

@ -650,20 +650,22 @@ check_configuration() {
fi
fi
if [ "$IP" != "$NGINX_IP" ] && [ "_" != "$NGINX_IP" ]; then
if [ "$IP" != "$HOSTS" ]; then
echo "# IP does not match:"
echo "# IP from ifconfig: $IP"
echo "# /etc/nginx/sites-available/bigbluebutton: $NGINX_IP"
fi
fi
# Depreciated: BigBlueButton must be installed with a valid hostname, so comparing a hostnae to IP address
# does not provide any useful information.
#if [ "$IP" != "$NGINX_IP" ]; then
# if [ "$IP" != "$HOSTS" ]; then
# echo "# IP does not match:"
# echo "# IP from ifconfig: $IP"
# echo "# /etc/nginx/sites-available/bigbluebutton: $NGINX_IP"
# fi
#fi
if [ -f /var/lib/tomcat7/webapps/demo/bbb_api_conf.jsp ]; then
#
# Make sure the shared secret for the API matches the server
#
SECRET_PROPERTIES=$(cat ${SERVLET_DIR}/WEB-INF/classes/bigbluebutton.properties | grep -v '#' | tr -d '\r' | sed -n '/securitySalt/{s/.*=//;p}')
SECRET_DEMO=$(cat /var/lib/tomcat7/webapps/demo/bbb_api_conf.jsp | grep -v '^//' | tr -d '\r' | sed -n '/salt[ ]*=/{s/.*=[ ]*"//;s/".*//g;p}')
SECRET_DEMO=$(cat ${TOMCAT_DIR}/webapps/demo/bbb_api_conf.jsp | grep -v '^//' | tr -d '\r' | sed -n '/salt[ ]*=/{s/.*=[ ]*"//;s/".*//g;p}')
if [ "$SECRET_PROPERTIES" != "$SECRET_DEMO" ]; then
echo "#"
@ -677,12 +679,12 @@ check_configuration() {
echo
fi
API_IP=$(cat /var/lib/tomcat7/webapps/demo/bbb_api_conf.jsp | grep -v '^//' | sed -n '/String BigBlueButtonURL/{s/.*http[s]*:\/\///;s/\/.*//;p}' | tr -d '\015')
if [ "$IP" != "$API_IP" ]; then
echo "# Warning: API URL IPs do not match host:"
if ! grep -q https ${TOMCAT_DIR}/webapps/demo/bbb_api_conf.jsp; then
echo
echo "# Warning: Did not detect https for API demos in "
echo "#"
echo "# ${TOMCAT_DIR}/webapps/demo/bbb_api_conf.jsp"
echo "#"
echo "# IP from ifconfig: $IP"
echo "# ${TOMCAT_DIR}/demo/bbb_api_conf.jsp: $API_IP"
echo
fi
fi
@ -1006,7 +1008,8 @@ check_state() {
echo
fi
if [ "$SIP_NGINX_IP" != $IP ]; then
if [ "$(yq r /usr/share/meteor/bundle/programs/server/assets/app/config/settings.yml public.media.sipjsHackViaWs)" == "false" ]; then
if [ "$SIP_NGINX_IP" != $IP ]; then
if [ "$SIP_NGINX_IP" != "\$freeswitch_addr" ]; then
echo "# Warning: The setting of $SIP_NGINX_IP for proxy_pass in"
echo "#"
@ -1016,6 +1019,7 @@ check_state() {
echo "# (This is OK if you've manually changed the values)"
echo
fi
fi
fi
VARS_IP=$(cat $FREESWITCH_VARS | sed -n '/"local_ip_v4/{s/.*local_ip_v4=//;s/".*//;p}')
@ -1258,7 +1262,7 @@ if [ $CHECK ]; then
echo
echo "/etc/nginx/sites-available/bigbluebutton (nginx)"
echo " server name: $NGINX_IP"
echo " server_name: $NGINX_IP"
PORT=$(cat /etc/nginx/sites-available/bigbluebutton | grep -v '#' | sed -n '/listen/{s/.*listen[ ]*//;s/;//;p}' | grep -v ssl | tr --delete '\n' | sed 's/\[/, \[/g' | sed 's/0$/0\n/g')
echo " port: $PORT"
@ -1305,6 +1309,7 @@ if [ $CHECK ]; then
echo
echo "$SIP_CONFIG (sip.nginx)"
echo " proxy_pass: $SIP_NGINX_IP"
echo " protocol: $(cat /etc/bigbluebutton/nginx/sip.nginx | grep -v \# | sed -n '/proxy_pass/{s/.*proxy_pass [ ]*//;s/:.*//;p}' | head -n 1)"
fi
if [ -f $KURENTO_CONFIG ]; then
@ -1326,6 +1331,16 @@ if [ $CHECK ]; then
echo " build: $(yq r $HTML5_CONFIG public.app.html5ClientBuild)"
echo " kurentoUrl: $(yq r $HTML5_CONFIG public.kurento.wsUrl)"
echo " enableListenOnly: $(yq r $HTML5_CONFIG public.kurento.enableListenOnly)"
echo " sipjsHackViaWs: $(yq r $HTML5_CONFIG public.media.sipjsHackViaWs)"
fi
TURN=/usr/share/bbb-web/WEB-INF/classes/spring/turn-stun-servers.xml
STUN="$(xmlstarlet sel -N x="http://www.springframework.org/schema/beans" -t -m '_:beans/_:bean[@class="org.bigbluebutton.web.services.turn.StunTurnService"]/_:property[@name="stunServers"]/_:set/_:ref' -v @bean $TURN)"
if [ ! -z "$STUN" ]; then
echo
echo "$TURN (STUN Server)"
echo " $(xmlstarlet sel -N x="http://www.springframework.org/schema/beans" -t -m "_:beans/_:bean[@id=\"$STUN\"]/_:constructor-arg[@index=\"0\"]" -v @value $TURN)"
fi
if [ "$DISTRIB_CODENAME" == "xenial" ]; then

View File

@ -0,0 +1,3 @@
These wav files enable the administrator to shorten the FreeSWITCH audio prompts "you have been muted" and "you have been unmuted" to "muted" and "unmuted" versions.
cp -r /etc/bigbluebutton/bbb-conf/sounds /opt/freeswitch/share/freeswitch

View File

@ -1,6 +1,9 @@
import { check } from 'meteor/check';
import AnnotationsStreamer from '/imports/api/annotations/server/streamer';
import addAnnotation from '../modifiers/addAnnotation';
import Metrics from '/imports/startup/server/metrics';
const { queueMetrics } = Meteor.settings.private.redis.metrics;
const ANNOTATION_PROCCESS_INTERVAL = 60;
@ -15,6 +18,9 @@ const proccess = () => {
annotationsRecieverIsRunning = true;
Object.keys(annotationsQueue).forEach((meetingId) => {
AnnotationsStreamer(meetingId).emit('added', { meetingId, annotations: annotationsQueue[meetingId] });
if (queueMetrics) {
Metrics.setAnnotationQueueLength(meetingId, 0);
}
});
annotationsQueue = {};
@ -31,11 +37,14 @@ export default function handleWhiteboardSend({ header, body }, meetingId) {
const whiteboardId = annotation.wbId;
check(whiteboardId, String);
if(!annotationsQueue.hasOwnProperty(meetingId)) {
if (!annotationsQueue.hasOwnProperty(meetingId)) {
annotationsQueue[meetingId] = [];
}
annotationsQueue[meetingId].push({ meetingId, whiteboardId, userId, annotation });
if (queueMetrics) {
Metrics.setAnnotationQueueLength(meetingId, annotationsQueue[meetingId].length);
}
if (!annotationsRecieverIsRunning) proccess();
return addAnnotation(meetingId, whiteboardId, userId, annotation);

View File

@ -1,258 +1,53 @@
import BaseAudioBridge from './base';
import Auth from '/imports/ui/services/auth';
import { fetchWebRTCMappedStunTurnServers, getMappedFallbackStun } from '/imports/utils/fetchStunTurnServers';
import playAndRetry from '/imports/utils/mediaElementPlayRetry';
import logger from '/imports/startup/client/logger';
import ListenOnlyBroker from '/imports/ui/services/bbb-webrtc-sfu/listenonly-broker';
import loadAndPlayMediaStream from '/imports/ui/services/bbb-webrtc-sfu/load-play';
import {
fetchWebRTCMappedStunTurnServers,
getMappedFallbackStun
} from '/imports/utils/fetchStunTurnServers';
const SFU_URL = Meteor.settings.public.kurento.wsUrl;
const MEDIA = Meteor.settings.public.media;
const MEDIA_TAG = MEDIA.mediaTag.replace(/#/g, '');
const GLOBAL_AUDIO_PREFIX = 'GLOBAL_AUDIO_';
const RECONNECT_TIMEOUT_MS = MEDIA.listenOnlyCallTimeout || 15000;
const RECV_ROLE = 'recv';
const BRIDGE_NAME = 'kurento';
// SFU's base broker has distinct error codes so that it can be reused by different
// modules. Errors that have a valid, localized counterpart in audio manager are
// mapped so that the user gets a localized error message.
// The ones that haven't (ie SFU's servers-side errors), aren't mapped.
const errorCodeMap = {
1301: 1001,
1302: 1002,
1305: 1005,
1307: 1007,
}
const mapErrorCode = (error) => {
const { errorCode } = error;
const mappedErrorCode = errorCodeMap[errorCode];
if (errorCode == null || mappedErrorCode == null) return error;
error.errorCode = mappedErrorCode;
return error;
}
export default class KurentoAudioBridge extends BaseAudioBridge {
constructor(userData) {
super();
const {
userId,
username,
voiceBridge,
meetingId,
sessionToken,
} = userData;
this.user = {
userId,
name: username,
sessionToken,
};
this.internalMeetingID = userData.meetingId;
this.voiceBridge = userData.voiceBridge;
this.userId = userData.userId;
this.name = userData.username;
this.sessionToken = userData.sessionToken;
this.media = {
inputDevice: {},
};
this.internalMeetingID = meetingId;
this.voiceBridge = voiceBridge;
this.reconnectOngoing = false;
this.hasSuccessfullyStarted = false;
}
static normalizeError(error = {}) {
const errorMessage = error.name || error.message || error.reason || 'Unknown error';
const errorCode = error.code || undefined;
let errorReason = error.reason || error.id || 'Undefined reason';
// HOPEFULLY TEMPORARY
// The errors are often just strings so replace the errorReason if that's the case
if (typeof error === 'string') {
errorReason = error;
}
// END OF HOPEFULLY TEMPORARY
return { errorMessage, errorCode, errorReason };
}
joinAudio({ isListenOnly, inputStream }, callback) {
return new Promise(async (resolve, reject) => {
this.callback = callback;
let iceServers = [];
try {
logger.info({
logCode: 'sfuaudiobridge_stunturn_fetch_start',
extraInfo: { iceServers },
}, 'SFU audio bridge starting STUN/TURN fetch');
iceServers = await fetchWebRTCMappedStunTurnServers(this.user.sessionToken);
} catch (error) {
logger.error({ logCode: 'sfuaudiobridge_stunturn_fetch_failed' },
'SFU audio bridge failed to fetch STUN/TURN info, using default servers');
iceServers = getMappedFallbackStun();
} finally {
logger.info({
logCode: 'sfuaudiobridge_stunturn_fetch_sucess',
extraInfo: { iceServers },
}, 'SFU audio bridge got STUN/TURN servers');
const options = {
wsUrl: Auth.authenticateURL(SFU_URL),
userName: this.user.name,
caleeName: `${GLOBAL_AUDIO_PREFIX}${this.voiceBridge}`,
iceServers,
logger,
inputStream,
};
const audioTag = document.getElementById(MEDIA_TAG);
const playElement = () => {
const mediaTagPlayed = () => {
logger.info({
logCode: 'listenonly_media_play_success',
}, 'Listen only media played successfully');
resolve(this.callback({ status: this.baseCallStates.started }));
};
if (audioTag.paused) {
// Tag isn't playing yet. Play it.
audioTag.play()
.then(mediaTagPlayed)
.catch((error) => {
// NotAllowedError equals autoplay issues, fire autoplay handling event.
// This will be handled in audio-manager.
if (error.name === 'NotAllowedError') {
logger.error({
logCode: 'listenonly_error_autoplay',
extraInfo: { errorName: error.name },
}, 'Listen only media play failed due to autoplay error');
const tagFailedEvent = new CustomEvent('audioPlayFailed', { detail: { mediaElement: audioTag } });
window.dispatchEvent(tagFailedEvent);
resolve(this.callback({
status: this.baseCallStates.autoplayBlocked,
}));
} else {
// Tag failed for reasons other than autoplay. Log the error and
// try playing again a few times until it works or fails for good
const played = playAndRetry(audioTag);
if (!played) {
logger.error({
logCode: 'listenonly_error_media_play_failed',
extraInfo: { errorName: error.name },
}, `Listen only media play failed due to ${error.name}`);
} else {
mediaTagPlayed();
}
}
});
} else {
// Media tag is already playing, so log a success. This is really a
// logging fallback for a case that shouldn't happen. But if it does
// (ie someone re-enables the autoPlay prop in the element), then it
// means the stream is playing properly and it'll be logged.
mediaTagPlayed();
}
};
const onSuccess = () => {
const { webRtcPeer } = window.kurentoManager.kurentoAudio;
this.hasSuccessfullyStarted = true;
if (webRtcPeer) {
logger.info({
logCode: 'sfuaudiobridge_audio_negotiation_success',
}, 'SFU audio bridge negotiated audio with success');
const stream = webRtcPeer.getRemoteStream();
audioTag.pause();
audioTag.srcObject = stream;
audioTag.muted = false;
logger.info({
logCode: 'sfuaudiobridge_audio_ready_to_play',
}, 'SFU audio bridge is ready to play');
playElement();
} else {
logger.info({
logCode: 'sfuaudiobridge_audio_negotiation_failed',
}, 'SFU audio bridge failed to negotiate audio');
this.callback({
status: this.baseCallStates.failed,
error: this.baseErrorCodes.CONNECTION_ERROR,
bridgeError: 'No WebRTC Peer',
});
}
if (this.reconnectOngoing) {
this.reconnectOngoing = false;
clearTimeout(this.reconnectTimeout);
}
};
const onFail = (error) => {
const { errorMessage, errorCode, errorReason } = KurentoAudioBridge.normalizeError(error);
// Listen only connected successfully already and dropped mid-call.
// Try to reconnect ONCE (binded to reconnectOngoing flag)
if (this.hasSuccessfullyStarted && !this.reconnectOngoing) {
logger.error({
logCode: 'listenonly_error_try_to_reconnect',
extraInfo: { errorMessage, errorCode, errorReason },
}, `Listen only failed for an ongoing session, try to reconnect. - reason: ${errorReason}`);
window.kurentoExitAudio();
this.callback({ status: this.baseCallStates.reconnecting });
this.reconnectOngoing = true;
// Set up a reconnectionTimeout in case the server is unresponsive
// for some reason. If it gets triggered, end the session and stop
// trying to reconnect
this.reconnectTimeout = setTimeout(() => {
this.callback({
status: this.baseCallStates.failed,
error: this.baseErrorCodes.CONNECTION_ERROR,
bridgeError: 'Reconnect Timeout',
});
this.reconnectOngoing = false;
this.hasSuccessfullyStarted = false;
window.kurentoExitAudio();
}, RECONNECT_TIMEOUT_MS);
window.kurentoJoinAudio(
MEDIA_TAG,
this.voiceBridge,
this.user.userId,
this.internalMeetingID,
onFail,
onSuccess,
options,
);
} else {
// Already tried reconnecting once OR the user handn't succesfully
// connected firsthand. Just finish the session and reject with error
if (!this.reconnectOngoing) {
logger.error({
logCode: 'listenonly_error_failed_to_connect',
extraInfo: { errorMessage, errorCode, errorReason },
}, `Listen only failed when trying to start due to ${errorReason}`);
} else {
logger.error({
logCode: 'listenonly_error_reconnect_failed',
extraInfo: { errorMessage, errorCode, errorReason },
}, `Listen only failed when trying to reconnect due to ${errorReason}`);
}
this.reconnectOngoing = false;
this.hasSuccessfullyStarted = false;
window.kurentoExitAudio();
this.callback({
status: this.baseCallStates.failed,
error: this.baseErrorCodes.CONNECTION_ERROR,
bridgeError: errorReason,
});
reject(errorReason);
}
};
if (!isListenOnly) {
return reject(new Error('Invalid bridge option'));
}
logger.info({
logCode: 'sfuaudiobridge_ready_to_join_audio',
}, 'SFU audio bridge is ready to join audio');
window.kurentoJoinAudio(
MEDIA_TAG,
this.voiceBridge,
this.user.userId,
this.internalMeetingID,
onFail,
onSuccess,
options,
);
}
});
this.broker;
this.reconnecting = false;
}
async changeOutputDevice(value) {
@ -262,8 +57,10 @@ export default class KurentoAudioBridge extends BaseAudioBridge {
await audioContext.setSinkId(value);
this.media.outputDeviceId = value;
} catch (error) {
logger.error({ logCode: 'sfuaudiobridge_changeoutputdevice_error', extraInfo: { error } },
'SFU audio bridge failed to fetch STUN/TURN info, using default');
logger.error({
logCode: 'listenonly_changeoutputdevice_error',
extraInfo: { error, bridge: BRIDGE_NAME }
}, 'Audio bridge failed to change output device');
throw new Error(this.baseErrorCodes.MEDIA_ERROR);
}
}
@ -272,18 +69,175 @@ export default class KurentoAudioBridge extends BaseAudioBridge {
}
getPeerConnection() {
const { webRtcPeer } = window.kurentoManager.kurentoAudio;
if (webRtcPeer) {
return webRtcPeer.peerConnection;
}
const webRtcPeer = this.broker.webRtcPeer;
if (webRtcPeer) return webRtcPeer.peerConnection;
return null;
}
exitAudio() {
return new Promise((resolve) => {
this.hasSuccessfullyStarted = false;
window.kurentoExitAudio();
return resolve(this.callback({ status: this.baseCallStates.ended }));
handleTermination() {
return this.callback({ status: this.baseCallStates.ended, bridge: BRIDGE_NAME });
}
clearReconnectionTimeout() {
this.reconnecting = false;
if (this.reconnectionTimeout) {
clearTimeout(this.reconnectionTimeout);
this.reconnectionTimeout = null;
}
}
reconnect() {
this.broker.stop();
this.callback({ status: this.baseCallStates.reconnecting, bridge: BRIDGE_NAME });
this.reconnecting = true;
// Set up a reconnectionTimeout in case the server is unresponsive
// for some reason. If it gets triggered, end the session and stop
// trying to reconnect
this.reconnectionTimeout = setTimeout(() => {
this.callback({
status: this.baseCallStates.failed,
error: 1010,
bridgeError: 'Reconnection timeout',
bridge: BRIDGE_NAME,
});
this.broker.stop();
this.clearReconnectionTimeout();
}, RECONNECT_TIMEOUT_MS);
this.joinAudio({ isListenOnly: true }, this.callback).then(() => {
this.clearReconnectionTimeout();
}).catch(error => {
// Error handling is a no-op because it will be "handled" in handleBrokerFailure
logger.debug({
logCode: 'listenonly_reconnect_failed',
extraInfo: {
errorMessage: error.errorMessage,
reconnecting: this.reconnecting,
bridge: BRIDGE_NAME
},
}, 'Listen only reconnect failed');
});
}
handleBrokerFailure(error) {
return new Promise((resolve, reject) => {
mapErrorCode(error);
const { errorMessage, errorCause, errorCode } = error;
if (this.broker.started && !this.reconnecting) {
logger.error({
logCode: 'listenonly_error_try_to_reconnect',
extraInfo: { errorMessage, errorCode, errorCause, bridge: BRIDGE_NAME },
}, 'Listen only failed, try to reconnect');
this.reconnect();
return resolve();
} else {
// Already tried reconnecting once OR the user handn't succesfully
// connected firsthand. Just finish the session and reject with error
logger.error({
logCode: 'listenonly_error',
extraInfo: {
errorMessage, errorCode, errorCause,
reconnecting: this.reconnecting,
bridge: BRIDGE_NAME
},
}, 'Listen only failed');
this.clearReconnectionTimeout();
this.broker.stop();
this.callback({
status: this.baseCallStates.failed,
error: errorCode,
bridgeError: errorMessage,
bridge: BRIDGE_NAME,
});
return reject(error);
}
});
}
dispatchAutoplayHandlingEvent(mediaElement) {
const tagFailedEvent = new CustomEvent('audioPlayFailed', {
detail: { mediaElement }
});
window.dispatchEvent(tagFailedEvent);
this.callback({ status: this.baseCallStates.autoplayBlocked, bridge: BRIDGE_NAME });
}
handleStart() {
const stream = this.broker.webRtcPeer.getRemoteStream();
const mediaElement = document.getElementById(MEDIA_TAG);
return loadAndPlayMediaStream(stream, mediaElement, false).then(() => {
return this.callback({ status: this.baseCallStates.started, bridge: BRIDGE_NAME });
}).catch(error => {
// NotAllowedError equals autoplay issues, fire autoplay handling event.
// This will be handled in audio-manager.
if (error.name === 'NotAllowedError') {
logger.error({
logCode: 'listenonly_error_autoplay',
extraInfo: { errorName: error.name, bridge: BRIDGE_NAME },
}, 'Listen only media play failed due to autoplay error');
this.dispatchAutoplayHandlingEvent(mediaElement);
} else {
const normalizedError = {
errorCode: 1004,
errorMessage: error.message || 'AUDIO_PLAY_FAILED',
};
this.callback({
status: this.baseCallStates.failed,
error: normalizedError.errorCode,
bridgeError: normalizedError.errorMessage,
bridge: BRIDGE_NAME,
})
throw normalizedError;
}
});
}
joinAudio({ isListenOnly }, callback) {
return new Promise(async (resolve, reject) => {
if (!isListenOnly) return reject(new Error('Invalid bridge option'));
this.callback = callback;
let iceServers = [];
try {
iceServers = await fetchWebRTCMappedStunTurnServers(this.sessionToken);
} catch (error) {
logger.error({ logCode: 'listenonly_stunturn_fetch_failed' },
'SFU audio bridge failed to fetch STUN/TURN info, using default servers');
iceServers = getMappedFallbackStun();
} finally {
const options = {
userName: this.name,
caleeName: `${GLOBAL_AUDIO_PREFIX}${this.voiceBridge}`,
iceServers,
};
this.broker = new ListenOnlyBroker(
Auth.authenticateURL(SFU_URL),
this.voiceBridge,
this.userId,
this.internalMeetingID,
RECV_ROLE,
options,
);
this.broker.onended = this.handleTermination.bind(this);
this.broker.onerror = (error) => {
this.handleBrokerFailure(error).catch(reject);
}
this.broker.onstart = () => {
this.handleStart().then(resolve).catch(reject);
};
this.broker.listen().catch(reject);
}
});
}
exitAudio() {
this.broker.stop();
this.clearReconnectionTimeout();
return Promise.resolve();
}
}

View File

@ -32,6 +32,10 @@ const USER_AGENT_RECONNECTION_ATTEMPTS = 3;
const USER_AGENT_RECONNECTION_DELAY_MS = 5000;
const USER_AGENT_CONNECTION_TIMEOUT_MS = 5000;
const ICE_GATHERING_TIMEOUT = MEDIA.iceGatheringTimeout || 5000;
const BRIDGE_NAME = 'sip';
const WEBSOCKET_KEEP_ALIVE_INTERVAL = MEDIA.websocketKeepAliveInterval || 0;
const WEBSOCKET_KEEP_ALIVE_DEBOUNCE = MEDIA.websocketKeepAliveDebounce || 10;
const TRACE_SIP = MEDIA.traceSip || false;
const getAudioSessionNumber = () => {
let currItem = parseInt(sessionStorage.getItem(AUDIO_SESSION_NUM_KEY), 10);
@ -172,6 +176,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error: 1008,
bridgeError: 'Timeout on call transfer',
bridge: BRIDGE_NAME,
});
this.exitAudio();
@ -293,6 +298,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error: 1006,
bridgeError: 'Timeout on call hangup',
bridge: BRIDGE_NAME,
});
return reject(this.baseErrorCodes.REQUEST_TIMEOUT);
}
@ -365,6 +371,9 @@ class SIPSession {
transportOptions: {
server: `${(protocol === 'https:' ? 'wss://' : 'ws://')}${hostname}/ws?${token}`,
connectionTimeout: USER_AGENT_CONNECTION_TIMEOUT_MS,
keepAliveInterval: WEBSOCKET_KEEP_ALIVE_INTERVAL,
keepAliveDebounce: WEBSOCKET_KEEP_ALIVE_DEBOUNCE,
traceSip: TRACE_SIP,
},
sessionDescriptionHandlerFactoryOptions: {
peerConnectionConfiguration: {
@ -433,6 +442,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error,
bridgeError,
bridge: BRIDGE_NAME,
});
reject(this.baseErrorCodes.CONNECTION_ERROR);
});
@ -472,6 +482,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error: 1002,
bridgeError: 'Websocket failed to connect',
bridge: BRIDGE_NAME,
});
return reject({
type: this.baseErrorCodes.CONNECTION_ERROR,
@ -502,6 +513,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error: 1002,
bridgeError: 'Websocket failed to connect',
bridge: BRIDGE_NAME,
});
reject({
@ -660,7 +672,7 @@ class SIPSession {
},
}, 'Audio call - setup remote media');
this.callback({ status: this.baseCallStates.started });
this.callback({ status: this.baseCallStates.started, bridge: BRIDGE_NAME });
resolve();
}
};
@ -672,6 +684,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error: 1006,
bridgeError: `Call timed out on start after ${CALL_CONNECT_TIMEOUT / 1000}s`,
bridge: BRIDGE_NAME,
});
this.exitAudio();
@ -691,6 +704,7 @@ class SIPSession {
error: 1010,
bridgeError: 'ICE negotiation timeout after '
+ `${ICE_NEGOTIATION_TIMEOUT / 1000}s`,
bridge: BRIDGE_NAME,
});
this.exitAudio();
@ -726,6 +740,7 @@ class SIPSession {
error: 1007,
bridgeError: 'ICE negotiation failed. Current state '
+ `- ${peer.iceConnectionState}`,
bridge: BRIDGE_NAME,
});
};
@ -744,6 +759,7 @@ class SIPSession {
error: 1012,
bridgeError: 'ICE connection closed. Current state -'
+ `${peer.iceConnectionState}`,
bridge: BRIDGE_NAME,
});
};
@ -837,6 +853,7 @@ class SIPSession {
if (!message && !!this.userRequestedHangup) {
return this.callback({
status: this.baseCallStates.ended,
bridge: BRIDGE_NAME,
});
}
@ -864,6 +881,7 @@ class SIPSession {
status: this.baseCallStates.failed,
error: mappedCause,
bridgeError: cause,
bridge: BRIDGE_NAME,
});
};
@ -972,7 +990,7 @@ export default class SIPBridge extends BaseAudioBridge {
if (this.activeSession.webrtcConnected) {
// webrtc was able to connect so just try again
message.silenceNotifications = true;
callback({ status: this.baseCallStates.reconnecting });
callback({ status: this.baseCallStates.reconnecting, bridge: BRIDGE_NAME, });
shouldTryReconnect = true;
} else if (hasFallbackDomain === true && hostname !== IPV4_FALLBACK_DOMAIN) {
message.silenceNotifications = true;

View File

@ -1,12 +1,12 @@
import RedisPubSub from '/imports/startup/server/redis';
import handleGroupChatsMsgs from './handlers/groupChatsMsgs';
import handleGroupChatMsgBroadcast from './handlers/groupChatMsgBroadcast';
import handleClearPublicGroupChat from './handlers/clearPublicGroupChat';
import handleUserTyping from './handlers/userTyping';
import handleSyncGroupChatMsg from './handlers/syncGroupsChat';
import { processForHTML5ServerOnly } from '/imports/api/common/server/helpers';
RedisPubSub.on('GetGroupChatMsgsRespMsg', processForHTML5ServerOnly(handleGroupChatsMsgs));
RedisPubSub.on('GetGroupChatMsgsRespMsg', processForHTML5ServerOnly(handleSyncGroupChatMsg));
RedisPubSub.on('GroupChatMessageBroadcastEvtMsg', handleGroupChatMsgBroadcast);
RedisPubSub.on('ClearPublicChatHistoryEvtMsg', handleClearPublicGroupChat);
RedisPubSub.on('SyncGetGroupChatMsgsRespMsg', handleGroupChatsMsgs);
RedisPubSub.on('SyncGetGroupChatMsgsRespMsg', handleSyncGroupChatMsg);
RedisPubSub.on('UserTypingEvtMsg', handleUserTyping);

View File

@ -1,5 +1,13 @@
import { check } from 'meteor/check';
import _ from 'lodash';
import addGroupChatMsg from '../modifiers/addGroupChatMsg';
import addBulkGroupChatMsgs from '../modifiers/addBulkGroupChatMsgs';
const { bufferChatInsertsMs } = Meteor.settings.public.chat;
const msgBuffer = [];
const bulkFn = _.throttle(addBulkGroupChatMsgs, bufferChatInsertsMs);
export default function handleGroupChatMsgBroadcast({ body }, meetingId) {
const { chatId, msg } = body;
@ -8,5 +16,10 @@ export default function handleGroupChatMsgBroadcast({ body }, meetingId) {
check(chatId, String);
check(msg, Object);
addGroupChatMsg(meetingId, chatId, msg);
if (bufferChatInsertsMs) {
msgBuffer.push({ meetingId, chatId, msg });
bulkFn(msgBuffer);
} else {
addGroupChatMsg(meetingId, chatId, msg);
}
}

View File

@ -1,19 +0,0 @@
import { Match, check } from 'meteor/check';
import addGroupChatMsg from '../modifiers/addGroupChatMsg';
export default function handleGroupChatsMsgs({ body }, meetingId) {
const { chatId, msgs, msg } = body;
check(meetingId, String);
check(chatId, String);
check(msgs, Match.Maybe(Array));
check(msg, Match.Maybe(Array));
const msgsAdded = [];
(msgs || msg).forEach((m) => {
msgsAdded.push(addGroupChatMsg(meetingId, chatId, m));
});
return msgsAdded;
}

View File

@ -0,0 +1,12 @@
import { Match, check } from 'meteor/check';
import syncMeetingChatMsgs from '../modifiers/syncMeetingChatMsgs';
export default function handleSyncGroupChat({ body }, meetingId) {
const { chatId, msgs } = body;
check(meetingId, String);
check(chatId, String);
check(msgs, Match.Maybe(Array));
syncMeetingChatMsgs(meetingId, chatId, msgs);
}

View File

@ -10,7 +10,7 @@ export default function stopUserTyping() {
userId: requesterUserId,
});
if (userTyping) {
if (userTyping && meetingId && requesterUserId) {
stopTyping(meetingId, requesterUserId, true);
}
}

View File

@ -0,0 +1,30 @@
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import Logger from '/imports/startup/server/logger';
import flat from 'flat';
import { parseMessage } from './addGroupChatMsg';
export default async function addBulkGroupChatMsgs(msgs) {
if (!msgs.length) return;
const mappedMsgs = msgs
.map(({ chatId, meetingId, msg }) => ({
_id: new Mongo.ObjectID()._str,
...msg,
meetingId,
chatId,
message: parseMessage(msg.message),
sender: msg.sender.id,
}))
.map(el => flat(el, { safe: true }));
try {
const { insertedCount } = await GroupChatMsg.rawCollection().insertMany(mappedMsgs);
msgs.length = 0;
if (insertedCount) {
Logger.info(`Inserted ${insertedCount} messages`);
}
} catch (err) {
Logger.error(`Error on bulk insert. ${err}`);
}
}

View File

@ -4,7 +4,7 @@ import Logger from '/imports/startup/server/logger';
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import { BREAK_LINE } from '/imports/utils/lineEndings';
const parseMessage = (message) => {
export function parseMessage(message) {
let parsedMessage = message || '';
// Replace \r and \n to <br/>
@ -15,7 +15,7 @@ const parseMessage = (message) => {
parsedMessage = parsedMessage.split('<a href="event:').join('<a target="_blank" href="');
return parsedMessage;
};
}
export default function addGroupChatMsg(meetingId, chatId, msg) {
check(meetingId, String);
@ -28,7 +28,6 @@ export default function addGroupChatMsg(meetingId, chatId, msg) {
message: String,
correlationId: Match.Maybe(String),
});
const msgDocument = {
...msg,
meetingId,
@ -36,25 +35,13 @@ export default function addGroupChatMsg(meetingId, chatId, msg) {
message: parseMessage(msg.message),
};
const selector = {
meetingId,
chatId,
id: msg.id,
};
const modifier = {
$set: msgDocument,
};
try {
const { insertedId } = GroupChatMsg.upsert(selector, modifier);
const insertedId = GroupChatMsg.insert(msgDocument);
if (insertedId) {
Logger.info(`Added group-chat-msg msgId=${msg.id} chatId=${chatId} meetingId=${meetingId}`);
} else {
Logger.info(`Upserted group-chat-msg msgId=${msg.id} chatId=${chatId} meetingId=${meetingId}`);
}
} catch (err) {
Logger.error(`Adding group-chat-msg to collection: ${err}`);
Logger.error(`Error on adding group-chat-msg to collection: ${err}`);
}
}

View File

@ -0,0 +1,44 @@
import { Match, check } from 'meteor/check';
import flat from 'flat';
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import Logger from '/imports/startup/server/logger';
import { parseMessage } from './addGroupChatMsg';
export default function syncMeetingChatMsgs(meetingId, chatId, msgs) {
if (!msgs.length) return;
check(meetingId, String);
check(chatId, String);
check(msgs, Match.Maybe(Array));
try {
const bulkOperations = GroupChatMsg.rawCollection().initializeOrderedBulkOp();
msgs
.forEach((msg) => {
const msgToSync = {
...msg,
meetingId,
chatId,
message: parseMessage(msg.message),
sender: msg.sender.id,
};
const modifier = flat(msgToSync, { safe: true });
bulkOperations
.find({ chatId, meetingId, id: msg.id })
.upsert()
.updateOne({
$setOnInsert: { _id: new Mongo.ObjectID()._str },
$set: { ...modifier },
});
});
bulkOperations.execute();
Logger.info('Chat messages synchronized', { chatId, meetingId });
} catch (err) {
Logger.error(`Error on sync chat messages: ${err}`);
}
}

View File

@ -24,6 +24,7 @@ import clearRecordMeeting from './clearRecordMeeting';
import clearVoiceCallStates from '/imports/api/voice-call-states/server/modifiers/clearVoiceCallStates';
import clearVideoStreams from '/imports/api/video-streams/server/modifiers/clearVideoStreams';
import clearAuthTokenValidation from '/imports/api/auth-token-validation/server/modifiers/clearAuthTokenValidation';
import Metrics from '/imports/startup/server/metrics';
export default function meetingHasEnded(meetingId) {
removeAnnotationsStreamer(meetingId);
@ -50,6 +51,7 @@ export default function meetingHasEnded(meetingId) {
clearVideoStreams(meetingId);
clearAuthTokenValidation(meetingId);
BannedUsers.delete(meetingId);
Metrics.removeMeeting(meetingId);
Logger.info(`Cleared Meetings with id ${meetingId}`);
});

View File

@ -20,6 +20,11 @@ process.on('uncaughtException', (err) => {
process.exit(1);
});
process.on('uncaughtException', (err) => {
Logger.error(`uncaughtException: ${err}`);
process.exit(1);
});
Meteor.startup(() => {
const APP_CONFIG = Meteor.settings.public.app;
const env = Meteor.isDevelopment ? 'development' : 'production';

View File

@ -0,0 +1,144 @@
/* eslint-disable no-prototype-builtins */
import fs from 'fs';
import path from 'path';
import { Meteor } from 'meteor/meteor';
import Logger from './logger';
const {
metricsDumpIntervalMs,
metricsFolderPath,
removeMeetingOnEnd,
} = Meteor.settings.private.redis.metrics;
class Metrics {
constructor() {
this.metrics = {};
}
addEvent(meetingId, eventName, messageLength) {
if (!this.metrics.hasOwnProperty(meetingId)) {
this.metrics[meetingId] = {
currentlyInQueue: {},
wasInQueue: {},
};
}
const { currentlyInQueue } = this.metrics[meetingId];
if (!currentlyInQueue.hasOwnProperty(eventName)) {
currentlyInQueue[eventName] = {
count: 1,
payloadSize: messageLength,
};
} else {
currentlyInQueue[eventName].count += 1;
currentlyInQueue[eventName].payloadSize += messageLength;
}
}
processEvent(meetingId, eventName, size, processingStartTimestamp) {
const currentProcessingTimestamp = Date.now();
const processTime = currentProcessingTimestamp - processingStartTimestamp;
if (!this.metrics[meetingId].wasInQueue.hasOwnProperty(eventName)) {
this.metrics[meetingId].wasInQueue[eventName] = {
count: 1,
payloadSize: {
min: size,
max: size,
last: size,
total: size,
avg: size,
},
processingTime: {
min: processTime,
max: processTime,
last: processTime,
total: processTime,
avg: processTime,
},
};
this.metrics[meetingId].currentlyInQueue[eventName].count -= 1;
if (!this.metrics[meetingId].currentlyInQueue[eventName].count) {
delete this.metrics[meetingId].currentlyInQueue[eventName];
}
} else {
const { currentlyInQueue, wasInQueue } = this.metrics[meetingId];
currentlyInQueue[eventName].count -= 1;
if (!currentlyInQueue[eventName].count) {
delete currentlyInQueue[eventName];
}
const { payloadSize, processingTime } = wasInQueue[eventName];
wasInQueue[eventName].count += 1;
payloadSize.last = size;
payloadSize.total += size;
if (payloadSize.min > size) payloadSize.min = size;
if (payloadSize.max < size) payloadSize.max = size;
payloadSize.avg = payloadSize.total / wasInQueue[eventName].count;
if (processingTime.min > processTime) processingTime.min = processTime;
if (processingTime.max < processTime) processingTime.max = processTime;
processingTime.last = processTime;
processingTime.total += processTime;
processingTime.avg = processingTime.total / wasInQueue[eventName].count;
}
}
setAnnotationQueueLength(meetingId, size) {
this.metrics[meetingId].annotationQueueLength = size;
}
startDumpFile() {
Meteor.setInterval(() => {
try {
const fileDate = new Date();
const fullYear = fileDate.getFullYear();
const month = (fileDate.getMonth() + 1).toString().padStart(2, '0');
const day = fileDate.getDate().toString().padStart(2, '0');
const hour = fileDate.getHours().toString().padStart(2, '0');
const minutes = fileDate.getMinutes().toString().padStart(2, '0');
const seconds = fileDate.getSeconds().toString().padStart(2, '0');
const folderName = `${fullYear}${month}${day}_${hour}`;
const fileName = `${folderName}${minutes}${seconds}_metrics.json`;
const folderPath = path.join(metricsFolderPath, folderName);
const fullFilePath = path.join(folderPath, fileName);
if (!fs.existsSync(folderPath)) {
Logger.debug(`Creating folder: ${folderPath}`);
fs.mkdirSync(folderPath);
}
fs.writeFileSync(fullFilePath, JSON.stringify(this.metrics));
Logger.info('Metric file successfully written');
} catch (err) {
Logger.error('Error on writing metrics to disk.', err);
}
}, metricsDumpIntervalMs);
}
removeMeeting(meetingId) {
if (removeMeetingOnEnd) {
Logger.info(`Removing meeting ${meetingId} from metrics`);
delete this.metrics[meetingId];
} else {
Logger.info(`Skipping remove of meeting ${meetingId} from metrics`);
}
}
}
const metricsSingleton = new Metrics();
export default metricsSingleton;

View File

@ -3,11 +3,15 @@ import Redis from 'redis';
import { Meteor } from 'meteor/meteor';
import { EventEmitter2 } from 'eventemitter2';
import { check } from 'meteor/check';
import fs from 'fs';
import Logger from './logger';
import Metrics from './metrics';
// Fake meetingId used for messages that have no meetingId
const NO_MEETING_ID = '_';
const { queueMetrics } = Meteor.settings.private.redis.metrics;
const makeEnvelope = (channel, eventName, header, body, routing) => {
const envelope = {
envelope: {
@ -47,6 +51,7 @@ class MeetingMessageQueue {
const isAsync = this.asyncMessages.includes(channel)
|| this.asyncMessages.includes(eventName);
const beginHandleTimestamp = Date.now();
let called = false;
check(eventName, String);
@ -58,6 +63,14 @@ class MeetingMessageQueue {
Logger.debug(`Redis: ${eventName} completed ${isAsync ? 'async' : 'sync'}`);
}
called = true;
if (queueMetrics) {
const queueId = meetingId || NO_MEETING_ID;
const dataLength = JSON.stringify(data).length;
Metrics.processEvent(queueId, eventName, dataLength, beginHandleTimestamp);
}
const queueLength = this.queue.length();
if (queueLength > 100) {
Logger.warn(`Redis: MeetingMessageQueue for meetingId=${meetingId} has queue size=${queueLength} `);
@ -122,6 +135,10 @@ class RedisPubSub {
this.sub = Redis.createClient({ host, port });
}
if (queueMetrics) {
Metrics.startDumpFile();
}
this.emitter = new EventEmitter2();
this.mettingsQueues = {};
this.mettingsQueues[NO_MEETING_ID] = new MeetingMessageQueue(this.emitter, this.config.async, this.config.debug);
@ -202,10 +219,10 @@ class RedisPubSub {
}
}
if (channel !== this.customRedisChannel && queueId in this.mettingsQueues) {
Logger.error(`Consider routing ${eventName} to ${this.customRedisChannel}` );
// Logger.error(`Consider routing ${eventName} to ${this.customRedisChannel}` + message);
}
// if (channel !== this.customRedisChannel && queueId in this.mettingsQueues) {
// Logger.error(`Consider routing ${eventName} to ${this.customRedisChannel}` );
// // Logger.error(`Consider routing ${eventName} to ${this.customRedisChannel}` + message);
// }
if (channel === this.customRedisChannel || queueId in this.mettingsQueues) {
this.mettingsQueues[queueId].add({

View File

@ -1,5 +1,6 @@
import { Tracker } from 'meteor/tracker';
import KurentoBridge from '/imports/api/audio/client/bridge/kurento';
import Auth from '/imports/ui/services/auth';
import VoiceUsers from '/imports/api/voice-users';
import SIPBridge from '/imports/api/audio/client/bridge/sip';
@ -209,7 +210,7 @@ class AudioManager {
const exitKurentoAudio = () => {
if (this.useKurento) {
window.kurentoExitAudio();
bridge.exitAudio();
const audio = document.querySelector(MEDIA_TAG);
audio.muted = false;
}
@ -230,7 +231,7 @@ class AudioManager {
audioBridge: bridgeInUse,
retries,
},
}, `Listen only error - ${err} - bridge: ${bridgeInUse}`);
}, `Listen only error - ${errorReason} - bridge: ${bridgeInUse}`);
};
logger.info({ logCode: 'audiomanager_join_listenonly', extraInfo: { logType: 'user_action' } }, 'user requested to connect to audio conference as listen only');
@ -384,6 +385,7 @@ class AudioManager {
error,
bridgeError,
silenceNotifications,
bridge,
} = response;
if (status === STARTED) {
@ -401,6 +403,7 @@ class AudioManager {
extraInfo: {
errorCode: error,
cause: bridgeError,
bridge,
},
}, `Audio error - errorCode=${error}, cause=${bridgeError}`);
if (silenceNotifications !== true) {

View File

@ -0,0 +1,25 @@
export default SFU_BROKER_ERRORS = {
// 13xx errors are client-side bbb-webrtc-sfu's base broker errors
1301: "WEBSOCKET_DISCONNECTED",
1302: "WEBSOCKET_CONNECTION_FAILED",
1305: "PEER_NEGOTIATION_FAILED",
1307: "ICE_STATE_FAILED",
// 2xxx codes are server-side bbb-webrtc-sfu errors
2000: "MEDIA_SERVER_CONNECTION_ERROR",
2001: "MEDIA_SERVER_OFFLINE",
2002: "MEDIA_SERVER_NO_RESOURCES",
2003: "MEDIA_SERVER_REQUEST_TIMEOUT",
2004: "MEDIA_SERVER_GENERIC_ERROR",
2020: "ICE_ADD_CANDIDATE_FAILED",
2021: "ICE_GATHERING_FAILED",
2022: "ICE_STATE_FAILED",
2200: "MEDIA_GENERIC_ERROR",
2201: "MEDIA_NOT_FOUND",
2202: "MEDIA_INVALID_SDP",
2203: "MEDIA_NO_AVAILABLE_CODEC",
2208: "MEDIA_GENERIC_PROCESS_ERROR",
2209: "MEDIA_ADAPTER_OBJECT_NOT_FOUND",
2210: "MEDIA_CONNECT_ERROR",
2211: "MEDIA_NOT_FLOWING",
2300: "SFU_INVALID_REQUEST",
}

View File

@ -0,0 +1,163 @@
import logger from '/imports/startup/client/logger';
import BaseBroker from '/imports/ui/services/bbb-webrtc-sfu/sfu-base-broker';
const ON_ICE_CANDIDATE_MSG = 'iceCandidate';
const SFU_COMPONENT_NAME = 'audio';
class ListenOnlyBroker extends BaseBroker {
constructor(
wsUrl,
voiceBridge,
userId,
internalMeetingId,
role,
options = {},
) {
super(SFU_COMPONENT_NAME, wsUrl);
this.voiceBridge = voiceBridge;
this.userId = userId;
this.internalMeetingId = internalMeetingId;
this.role = role;
// Optional parameters are: userName, caleeName, iceServers
Object.assign(this, options);
}
joinListenOnly () {
return new Promise((resolve, reject) => {
const options = {
mediaConstraints: {
audio: true,
video: false,
},
onicecandidate: (candidate) => {
this.onIceCandidate(candidate, this.role);
},
};
this.addIceServers(options);
this.webRtcPeer = kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(options, (error) => {
if (error) {
// 1305: "PEER_NEGOTIATION_FAILED",
const normalizedError = BaseBroker.assembleError(1305);
logger.error({
logCode: `${this.logCodePrefix}_peer_creation_failed`,
extraInfo: {
errorMessage: error.name || error.message || 'Unknown error',
errorCode: normalizedError.errorCode,
sfuComponent: this.sfuComponent,
started: this.started,
},
}, `Listen only peer creation failed`);
this.onerror(normalizedError);
return reject(normalizedError);
}
this.webRtcPeer.iceQueue = [];
this.webRtcPeer.generateOffer(this.onOfferGenerated.bind(this));
});
this.webRtcPeer.peerConnection.onconnectionstatechange = this.handleConnectionStateChange.bind(this);
return resolve();
});
}
listen () {
return this.openWSConnection()
.then(this.joinListenOnly.bind(this));
}
onWSMessage (message) {
const parsedMessage = JSON.parse(message.data);
switch (parsedMessage.id) {
case 'startResponse':
this.processAnswer(parsedMessage);
break;
case 'iceCandidate':
this.handleIceCandidate(parsedMessage.candidate);
break;
case 'webRTCAudioSuccess':
this.onstart(parsedMessage.success);
this.started = true;
break;
case 'webRTCAudioError':
case 'error':
this.handleSFUError(parsedMessage);
break;
case 'pong':
break;
default:
logger.debug({
logCode: `${this.logCodePrefix}_invalid_req`,
extraInfo: { messageId: parsedMessage.id || 'Unknown', sfuComponent: this.sfuComponent }
}, `Discarded invalid SFU message`);
}
}
handleSFUError (sfuResponse) {
const { code, reason, role } = sfuResponse;
const error = BaseBroker.assembleError(code, reason);
logger.error({
logCode: `${this.logCodePrefix}_sfu_error`,
extraInfo: {
errorCode: code,
errorMessage: error.errorMessage,
role,
sfuComponent: this.sfuComponent,
started: this.started,
},
}, `Listen only failed in SFU`);
this.onerror(error);
}
onOfferGenerated (error, sdpOffer) {
if (error) {
logger.error({
logCode: `${this.logCodePrefix}_offer_failure`,
extraInfo: {
errorMessage: error.name || error.message || 'Unknown error',
sfuComponent: this.sfuComponent
},
}, `Listen only offer generation failed`);
// 1305: "PEER_NEGOTIATION_FAILED",
const normalizedError = BaseBroker.assembleError(1305);
return this.onerror(error);
}
const message = {
id: 'start',
type: this.sfuComponent,
role: this.role,
internalMeetingId: this.internalMeetingId,
voiceBridge: this.voiceBridge,
caleeName: this.caleeName,
userId: this.userId,
userName: this.userName,
sdpOffer,
};
logger.debug({
logCode: `${this.logCodePrefix}_offer_generated`,
extraInfo: { sfuComponent: this.sfuComponent, role: this.role },
}, `SFU audio offer generated`);
this.sendMessage(message);
}
onIceCandidate (candidate, role) {
const message = {
id: ON_ICE_CANDIDATE_MSG,
role,
type: this.sfuComponent,
voiceBridge: this.voiceBridge,
candidate,
};
this.sendMessage(message);
}
}
export default ListenOnlyBroker;

View File

@ -0,0 +1,30 @@
const playMediaElement = (mediaElement) => {
return new Promise((resolve, reject) => {
if (mediaElement.paused) {
// Tag isn't playing yet. Play it.
mediaElement.play()
.then(resolve)
.catch((error) => {
if (error.name === 'NotAllowedError') return reject(error);
// Tag failed for reasons other than autoplay. Log the error and
// try playing again a few times until it works or fails for good
const played = playAndRetry(mediaElement);
if (!played) return reject(error);
return resolve();
});
} else {
// Media tag is already playing, so log a success. This is really a
// logging fallback for a case that shouldn't happen. But if it does
// (ie someone re-enables the autoPlay prop in the mediaElement), then it
// means the mediaStream is playing properly and it'll be logged.
return resolve();
}
});
}
export default function loadAndPlayMediaStream (mediaStream, mediaElement, muted = true) {
mediaElement.muted = muted;
mediaElement.pause();
mediaElement.srcObject = mediaStream;
return playMediaElement(mediaElement);
}

View File

@ -0,0 +1,246 @@
import logger from '/imports/startup/client/logger';
import { notifyStreamStateChange } from '/imports/ui/services/bbb-webrtc-sfu/stream-state-service';
import SFU_BROKER_ERRORS from '/imports/ui/services/bbb-webrtc-sfu/broker-base-errors';
const PING_INTERVAL_MS = 15000;
class BaseBroker {
static assembleError(code, reason) {
const message = reason || SFU_BROKER_ERRORS[code];
const error = new Error(message);
error.errorCode = code;
// Duplicating key-vals because we can't settle on an error pattern... - prlanzarin
error.errorCause = error.message;
error.errorMessage = error.message;
return error;
}
constructor(sfuComponent, wsUrl) {
this.wsUrl = wsUrl;
this.sfuComponent = sfuComponent;
this.ws = null;
this.webRtcPeer = null;
this.pingInterval = null;
this.started = false;
this.signallingTransportOpen = false;
this.logCodePrefix = `${this.sfuComponent}_broker`;
this.onbeforeunload = this.onbeforeunload.bind(this);
window.addEventListener('beforeunload', this.onbeforeunload);
}
set started (val) {
this._started = val;
}
get started () {
return this._started;
}
onbeforeunload () {
return this.stop();
}
onstart () {
// To be implemented by inheritors
}
onerror (error) {
// To be implemented by inheritors
}
onended () {
// To be implemented by inheritors
}
openWSConnection () {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.wsUrl);
this.ws.onmessage = this.onWSMessage.bind(this);
this.ws.onclose = () => {
// 1301: "WEBSOCKET_DISCONNECTED",
this.onerror(BaseBroker.assembleError(1301));
};
this.ws.onerror = (error) => {
logger.error({
logCode: `${this.logCodePrefix}_websocket_error`,
extraInfo: {
errorMessage: error.name || error.message || 'Unknown error',
sfuComponent: this.sfuComponent,
}
}, 'WebSocket connection to SFU failed');
if (this.signallingTransportOpen) {
// 1301: "WEBSOCKET_DISCONNECTED", transport was already open
this.onerror(BaseBroker.assembleError(1301));
} else {
// 1302: "WEBSOCKET_CONNECTION_FAILED", transport errored before establishment
const normalized1302 = BaseBroker.assembleError(1302);
this.onerror(normalized1302);
return reject(normalized1302);
}
};
this.ws.onopen = () => {
this.pingInterval = setInterval(this.ping.bind(this), PING_INTERVAL_MS);
this.signallingTransportOpen = true;
return resolve();
};
});
}
sendMessage (message) {
const jsonMessage = JSON.stringify(message);
this.ws.send(jsonMessage);
}
ping () {
this.sendMessage({ id: 'ping' });
}
processAnswer (message) {
const { response, sdpAnswer, role, connectionId } = message;
if (response !== 'accepted') return this.handleSFUError(message);
logger.debug({
logCode: `${this.logCodePrefix}_start_success`,
extraInfo: {
sfuConnectionId: connectionId,
role,
sfuComponent: this.sfuComponent,
}
}, `Start request accepted for ${this.sfuComponent}`);
this.webRtcPeer.processAnswer(sdpAnswer, (error) => {
if (error) {
logger.error({
logCode: `${this.logCodePrefix}_processanswer_error`,
extraInfo: {
errorMessage: error.name || error.message || 'Unknown error',
sfuConnectionId: connectionId,
role,
sfuComponent: this.sfuComponent,
}
}, `Error processing SDP answer from SFU for ${this.sfuComponent}`);
// 1305: "PEER_NEGOTIATION_FAILED",
return this.onerror(BaseBroker.assembleError(1305));
}
// Mark the peer as negotiated and flush the ICE queue
this.webRtcPeer.negotiated = true;
this.processIceQueue();
});
}
addIceServers (options) {
if (this.iceServers && this.iceServers.length > 0) {
options.configuration = {};
options.configuration.iceServers = this.iceServers;
}
return options;
}
handleConnectionStateChange (eventIdentifier) {
if (this.webRtcPeer) {
const { peerConnection } = this.webRtcPeer;
const connectionState = peerConnection.connectionState;
if (eventIdentifier) {
notifyStreamStateChange(eventIdentifier, connectionState);
}
if (connectionState === 'failed' || connectionState === 'closed') {
this.webRtcPeer.peerConnection.onconnectionstatechange = null;
// 1307: "ICE_STATE_FAILED",
const error = BaseBroker.assembleError(1307);
this.onerror(error);
}
}
}
addIceCandidate (candidate) {
this.webRtcPeer.addIceCandidate(candidate, (error) => {
if (error) {
// Just log the error. We can't be sure if a candidate failure on add is
// fatal or not, so that's why we have a timeout set up for negotiations and
// listeners for ICE state transitioning to failures, so we won't act on it here
logger.error({
logCode: `${this.logCodePrefix}_addicecandidate_error`,
extraInfo: {
errorMessage: error.name || error.message || 'Unknown error',
errorCode: error.code || 'Unknown code',
sfuComponent: this.sfuComponent,
started: this.started,
}
}, `Adding ICE candidate failed`);
}
});
}
processIceQueue () {
const peer = this.webRtcPeer;
while (peer.iceQueue.length) {
const candidate = peer.iceQueue.shift();
this.addIceCandidate(candidate);
}
}
handleIceCandidate (candidate) {
const peer = this.webRtcPeer;
if (peer.negotiated) {
this.addIceCandidate(candidate);
} else {
// ICE candidates are queued until a SDP answer has been processed.
// This was done due to a long term iOS/Safari quirk where it'd (as of 2018)
// fail if candidates were added before the offer/answer cycle was completed.
// IT STILL HAPPENS - prlanzarin sept 2019
// still happens - prlanzarin sept 2020
peer.iceQueue.push(candidate);
}
}
disposePeer () {
if (this.webRtcPeer) {
this.webRtcPeer.dispose();
this.webRtcPeer = null;
}
}
stop () {
this.onstart = function(){};
this.onerror = function(){};
window.removeEventListener('beforeunload', this.onbeforeunload);
if (this.webRtcPeer) {
this.webRtcPeer.peerConnection.onconnectionstatechange = null;
}
if (this.ws !== null) {
this.ws.onclose = function (){};
this.ws.close();
}
if (this.pingInterval) {
clearInterval(this.pingInterval);
}
this.disposePeer();
this.started = false;
logger.debug({
logCode: `${this.logCodePrefix}_stop`,
extraInfo: { sfuComponent: this.sfuComponent },
}, `Stopped broker session for ${this.sfuComponent}`);
this.onended();
this.onended = function(){};
}
}
export default BaseBroker;

View File

@ -0,0 +1,45 @@
/*
* The idea behind this whole utilitary is proving a decoupled way of propagating
* peer connection states up and down the component tree without coming up with
* weird trackers, hooks and/or prop drilling. This is mainly aimed at component
* trees that aren't well organized in the first place (ie video-provider).
* The base use case for this is notifying stream state changes to correctly
* handle UI for reconnection scenarios.
*/
const STREAM_STATE_CHANGED_EVENT_PREFIX = 'streamStateChanged';
/*
* The event name format for notify/subscribe/unsubscribe is
* `${STREAM_STATE_CHANGED_EVENT_PREFIX}:${eventTag}`. eventTag can be any string.
* streamState must be a valid member of either RTCIceConnectionState or
* RTCPeerConnectionState enums
*/
export const notifyStreamStateChange = (eventTag, streamState) => {
const eventName = `${STREAM_STATE_CHANGED_EVENT_PREFIX}:${eventTag}`;
const streamStateChanged = new CustomEvent(
eventName,
{ detail: { eventTag, streamState } },
);
window.dispatchEvent(streamStateChanged);
}
// `callback` is the method to be called when a new state is notified
// via notifyStreamStateChange
export const subscribeToStreamStateChange = (eventTag, callback) => {
const eventName = `${STREAM_STATE_CHANGED_EVENT_PREFIX}:${eventTag}`;
window.addEventListener(eventName, callback, false);
}
export const unsubscribeFromStreamStateChange = (eventTag, callback) => {
const eventName = `${STREAM_STATE_CHANGED_EVENT_PREFIX}:${eventTag}`;
window.removeEventListener(eventName, callback);
}
export const isStreamStateUnhealthy = (streamState) => {
return streamState === 'disconnected' || streamState === 'failed' || streamState === 'closed';
}
export const isStreamStateHealthy = (streamState) => {
return streamState === 'connected' || streamState === 'completed';
}

View File

@ -43,7 +43,8 @@ public:
# can generate excessive overhead to the server. We recommend
# this value to be kept under 12.
breakoutRoomLimit: 8
customHeartbeat: false
# https://github.com/bigbluebutton/bigbluebutton/pull/10826
customHeartbeat: true
defaultSettings:
application:
animations: true
@ -273,6 +274,7 @@ public:
time: 5000
chat:
enabled: true
bufferChatInsertsMs: 0
startClosed: false
min_message_length: 1
max_message_length: 5000
@ -340,6 +342,16 @@ public:
#user activates microphone.
iceGatheringTimeout: 5000
sipjsHackViaWs: false
#Websocket keepAlive interval (seconds). You may set this to prevent
#websocket disconnection in some environments. When set, BBB will send
#'\r\n\r\n' string through SIP.js's websocket. If not set, default value
#is 0.
websocketKeepAliveInterval: 30
#Debounce time (seconds) for sending SIP.js's websocket keep alive message.
#If not set, default value is 10.
websocketKeepAliveDebounce: 10
#Trace sip/audio messages in browser. If not set, default value is false.
traceSip: false
presentation:
allowDownloadable: true
defaultPresentationFile: default.pdf
@ -500,6 +512,11 @@ private:
timeout: 5000
password: null
debug: false
metrics:
queueMetrics: false
metricsDumpIntervalMs: 60000
metricsFolderPath: METRICS_FOLDER
removeMeetingOnEnd: true
channels:
toAkkaApps: to-akka-apps-redis-channel
toThirdParty: to-third-party-redis-channel
@ -523,6 +540,8 @@ private:
enabled: false
heapdump:
enabled: false
heapdumpFolderPath: HEAPDUMP_FOLDER
heapdumpIntervalMs: 3600000
minBrowserVersions:
- browser: chrome
version: 72

View File

@ -237,6 +237,11 @@ if not FileTest.directory?(target_dir)
BigBlueButton.process_deskshare_videos(target_dir, temp_dir, meeting_id, deskshare_width, deskshare_height, presentation_props['video_formats'])
end
# Copy shared notes from raw files
if !Dir["#{raw_archive_dir}/notes/*"].empty?
FileUtils.cp_r("#{raw_archive_dir}/notes", target_dir)
end
process_done = File.new("#{recording_dir}/status/processed/#{meeting_id}-presentation.done", "w")
process_done.write("Processed #{meeting_id}")
process_done.close

View File

@ -31,7 +31,7 @@ require 'fastimage' # require fastimage to get the image size of the slides (gem
# This script lives in scripts/archive/steps while properties.yaml lives in scripts/
bbb_props = BigBlueButton.read_props
$presentation_props = YAML::load(File.open('presentation.yml'))
$presentation_props = YAML::load(File.read('presentation.yml'))
# There's a couple of places where stuff is mysteriously divided or multiplied
# by 2. This is just here to call out how spooky that is.
@ -874,7 +874,7 @@ def processPresentation(package_dir)
# Iterate through the events.xml and store the events, building the
# xml files as we go
last_timestamp = 0.0
events_xml = Nokogiri::XML(File.open("#{$process_dir}/events.xml"))
events_xml = Nokogiri::XML(File.read("#{$process_dir}/events.xml"))
events_xml.xpath('/recording/event').each do |event|
eventname = event['eventname']
last_timestamp = timestamp =
@ -1215,9 +1215,13 @@ begin
FileUtils.cp("#{$process_dir}/presentation_text.json", package_dir)
end
if File.exist?("#{$process_dir}/notes/notes.html")
FileUtils.cp("#{$process_dir}/notes/notes.html", package_dir)
end
processing_time = File.read("#{$process_dir}/processing_time")
@doc = Nokogiri::XML(File.open("#{$process_dir}/events.xml"))
@doc = Nokogiri::XML(File.read("#{$process_dir}/events.xml"))
# Retrieve record events and calculate total recording duration.
$rec_events = BigBlueButton::Events.match_start_and_stop_rec_events(
@ -1248,7 +1252,7 @@ begin
# Update state and add playback to metadata.xml
## Load metadata.xml
metadata = Nokogiri::XML(File.open("#{package_dir}/metadata.xml"))
metadata = Nokogiri::XML(File.read("#{package_dir}/metadata.xml"))
## Update state
recording = metadata.root
state = recording.at_xpath("state")