From 48354721a2941ff148fcfcde39c5e5e18f06d3ab Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 17 Nov 2020 17:06:49 +0100 Subject: [PATCH] VoIP: start handling negotiation flow (wip) --- .../sdk/api/session/call/CallListener.kt | 6 + .../android/sdk/api/session/call/CallState.kt | 5 + .../android/sdk/api/session/call/MxCall.kt | 13 +- .../api/session/room/model/call/SdpType.kt | 19 +- .../session/call/CallListenersDispatcher.kt | 5 + .../call/DefaultCallSignalingService.kt | 13 + .../internal/session/call/model/MxCallImpl.kt | 14 + .../app/features/call/SdpObserverAdapter.kt | 4 +- .../call/WebRtcPeerConnectionManager.kt | 337 +++++++++++------- .../features/call/utils/PeerConnectionExt.kt | 82 +++++ 10 files changed, 371 insertions(+), 127 deletions(-) create mode 100644 vector/src/main/java/im/vector/app/features/call/utils/PeerConnectionExt.kt diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallListener.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallListener.kt index 255f156f0f..c68b6494e6 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallListener.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallListener.kt @@ -20,6 +20,7 @@ import org.matrix.android.sdk.api.session.room.model.call.CallAnswerContent import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent +import org.matrix.android.sdk.api.session.room.model.call.CallNegotiateContent import org.matrix.android.sdk.api.session.room.model.call.CallRejectContent import org.matrix.android.sdk.api.session.room.model.call.CallSelectAnswerContent @@ -51,5 +52,10 @@ interface CallListener { */ fun onCallSelectAnswerReceived(callSelectAnswerContent: CallSelectAnswerContent) + /** + * Called when a negotiation is sent + */ + fun onCallNegotiateReceived(callNegotiateContent: CallNegotiateContent) + fun onCallManagedByOtherSession(callId: String) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallState.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallState.kt index e55546e12d..e012365de2 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallState.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/CallState.kt @@ -23,6 +23,11 @@ sealed class CallState { /** Idle, setting up objects */ object Idle : CallState() + /** + * CreateOffer. Intermediate state between Idle and Dialing. + */ + object CreateOffer: CallState() + /** Dialing. Outgoing call is signaling the remote peer */ object Dialing : CallState() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/MxCall.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/MxCall.kt index 2a84fd658d..7c3d377b79 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/MxCall.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/call/MxCall.kt @@ -25,11 +25,7 @@ interface MxCallDetail { val isOutgoing: Boolean val roomId: String val opponentUserId: String - val ourPartyId: String val isVideoCall: Boolean - - var opponentPartyId: Optional? - var opponentVersion: Int } /** @@ -41,7 +37,9 @@ interface MxCall : MxCallDetail { const val VOIP_PROTO_VERSION = 0 } - + val ourPartyId: String + var opponentPartyId: Optional? + var opponentVersion: Int var state: CallState @@ -51,6 +49,11 @@ interface MxCall : MxCallDetail { */ fun accept(sdp: SessionDescription) + /** + * SDP negotiation for media pause, hold/resume, ICE restarts and voice/video call up/downgrading + */ + fun negotiate(sdp: SessionDescription) + /** * This has to be sent by the caller's client once it has chosen an answer. */ diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/call/SdpType.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/call/SdpType.kt index ff393135ea..69181f0d1b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/call/SdpType.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/call/SdpType.kt @@ -18,6 +18,7 @@ package org.matrix.android.sdk.api.session.room.model.call import com.squareup.moshi.Json import com.squareup.moshi.JsonClass +import org.webrtc.SessionDescription @JsonClass(generateAdapter = false) enum class SdpType { @@ -25,5 +26,21 @@ enum class SdpType { OFFER, @Json(name = "answer") - ANSWER + ANSWER; +} + +fun SdpType.asWebRTC(): SessionDescription.Type { + return if (this == SdpType.OFFER) { + SessionDescription.Type.OFFER + } else { + SessionDescription.Type.ANSWER + } +} + +fun SessionDescription.Type.toSdpType(): SdpType { + return if (this == SessionDescription.Type.OFFER) { + SdpType.OFFER + } else { + SdpType.ANSWER + } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/CallListenersDispatcher.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/CallListenersDispatcher.kt index 302b5e01e7..38826a194f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/CallListenersDispatcher.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/CallListenersDispatcher.kt @@ -23,6 +23,7 @@ import org.matrix.android.sdk.api.session.room.model.call.CallAnswerContent import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent +import org.matrix.android.sdk.api.session.room.model.call.CallNegotiateContent import org.matrix.android.sdk.api.session.room.model.call.CallRejectContent import org.matrix.android.sdk.api.session.room.model.call.CallSelectAnswerContent @@ -59,6 +60,10 @@ class CallListenersDispatcher(private val listeners: Set) : CallLi it.onCallSelectAnswerReceived(callSelectAnswerContent) } + override fun onCallNegotiateReceived(callNegotiateContent: CallNegotiateContent) = dispatch { + it.onCallNegotiateReceived(callNegotiateContent) + } + private fun dispatch(lambda: (CallListener) -> Unit) { listeners.toList().forEach { tryOrNull { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt index 76b0b1d630..a1cd1c018e 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt @@ -156,9 +156,22 @@ internal class DefaultCallSignalingService @Inject constructor( EventType.CALL_SELECT_ANSWER -> { handleCallSelectAnswerEvent(event) } + EventType.CALL_NEGOTIATE -> { + handleCallNegotiateEvent(event) + } } } + private fun handleCallNegotiateEvent(event: Event) { + val content = event.getClearContent().toModel() ?: return + val call = content.getCall() ?: return + if (call.ourPartyId == content.partyId) { + // Ignore remote echo + return + } + callListenersDispatcher.onCallSelectAnswerReceived(content) + } + private fun handleCallSelectAnswerEvent(event: Event) { val content = event.getClearContent().toModel() ?: return val call = content.getCall() ?: return diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt index f07053fb28..b484315cac 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt @@ -28,8 +28,10 @@ import org.matrix.android.sdk.api.session.room.model.call.CallAnswerContent import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent +import org.matrix.android.sdk.api.session.room.model.call.CallNegotiateContent import org.matrix.android.sdk.api.session.room.model.call.CallRejectContent import org.matrix.android.sdk.api.session.room.model.call.CallSelectAnswerContent +import org.matrix.android.sdk.api.session.room.model.call.toSdpType import org.matrix.android.sdk.api.util.Optional import org.matrix.android.sdk.internal.session.call.DefaultCallSignalingService import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor @@ -164,6 +166,18 @@ internal class MxCallImpl( .also { eventSenderProcessor.postEvent(it) } } + override fun negotiate(sdp: SessionDescription) { + Timber.v("## VOIP negotiate $callId") + CallNegotiateContent( + callId = callId, + partyId = ourPartyId, + lifetime = DefaultCallSignalingService.CALL_TIMEOUT_MS, + description = CallNegotiateContent.Description(sdp = sdp.description, type = sdp.type.toSdpType()) + ) + .let { createEventAndLocalEcho(type = EventType.CALL_NEGOTIATE, roomId = roomId, content = it.toContent()) } + .also { eventSenderProcessor.postEvent(it) } + } + override fun selectAnswer() { Timber.v("## VOIP select answer $callId") if (isOutgoing) return diff --git a/vector/src/main/java/im/vector/app/features/call/SdpObserverAdapter.kt b/vector/src/main/java/im/vector/app/features/call/SdpObserverAdapter.kt index 0685928d1c..8cd7d0765b 100644 --- a/vector/src/main/java/im/vector/app/features/call/SdpObserverAdapter.kt +++ b/vector/src/main/java/im/vector/app/features/call/SdpObserverAdapter.kt @@ -30,10 +30,10 @@ open class SdpObserverAdapter : SdpObserver { } override fun onCreateSuccess(p0: SessionDescription?) { - Timber.e("## SdpObserver: onSetFailure $p0") + Timber.v("## SdpObserver: onCreateSuccess $p0") } override fun onCreateFailure(p0: String?) { - Timber.e("## SdpObserver: onSetFailure $p0") + Timber.e("## SdpObserver: onCreateFailure $p0") } } diff --git a/vector/src/main/java/im/vector/app/features/call/WebRtcPeerConnectionManager.kt b/vector/src/main/java/im/vector/app/features/call/WebRtcPeerConnectionManager.kt index d755bed698..18ab98b248 100644 --- a/vector/src/main/java/im/vector/app/features/call/WebRtcPeerConnectionManager.kt +++ b/vector/src/main/java/im/vector/app/features/call/WebRtcPeerConnectionManager.kt @@ -26,16 +26,26 @@ import im.vector.app.ActiveSessionDataSource import im.vector.app.core.services.BluetoothHeadsetReceiver import im.vector.app.core.services.CallService import im.vector.app.core.services.WiredHeadsetStateReceiver +import im.vector.app.features.call.utils.awaitCreateAnswer +import im.vector.app.features.call.utils.awaitCreateOffer +import im.vector.app.features.call.utils.awaitSetLocalDescription +import im.vector.app.features.call.utils.awaitSetRemoteDescription import im.vector.app.push.fcm.FcmHelper import io.reactivex.disposables.Disposable import io.reactivex.subjects.PublishSubject import io.reactivex.subjects.ReplaySubject -import org.matrix.android.sdk.api.MatrixCallback +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.extensions.tryOrNull import org.matrix.android.sdk.api.session.Session -import org.matrix.android.sdk.api.session.call.CallState import org.matrix.android.sdk.api.session.call.CallListener +import org.matrix.android.sdk.api.session.call.CallState import org.matrix.android.sdk.api.session.call.EglUtils import org.matrix.android.sdk.api.session.call.MxCall import org.matrix.android.sdk.api.session.call.TurnServerResponse @@ -43,8 +53,12 @@ import org.matrix.android.sdk.api.session.room.model.call.CallAnswerContent import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent +import org.matrix.android.sdk.api.session.room.model.call.CallNegotiateContent import org.matrix.android.sdk.api.session.room.model.call.CallRejectContent import org.matrix.android.sdk.api.session.room.model.call.CallSelectAnswerContent +import org.matrix.android.sdk.api.session.room.model.call.SdpType +import org.matrix.android.sdk.api.session.room.model.call.asWebRTC +import org.matrix.android.sdk.internal.util.awaitCallback import org.webrtc.AudioSource import org.webrtc.AudioTrack import org.webrtc.Camera1Enumerator @@ -59,6 +73,7 @@ import org.webrtc.MediaStream import org.webrtc.PeerConnection import org.webrtc.PeerConnectionFactory import org.webrtc.RtpReceiver +import org.webrtc.RtpTransceiver import org.webrtc.SessionDescription import org.webrtc.SurfaceTextureHelper import org.webrtc.SurfaceViewRenderer @@ -120,7 +135,11 @@ class WebRtcPeerConnectionManager @Inject constructor( var localVideoSource: VideoSource? = null, var localVideoTrack: VideoTrack? = null, - var remoteVideoTrack: VideoTrack? = null + var remoteVideoTrack: VideoTrack? = null, + + // Perfect negotiation state: https://www.w3.org/TR/webrtc/#perfect-negotiation-example + var makingOffer: Boolean = false, + var ignoreOffer: Boolean = false ) { var offerSdp: CallInviteContent.Offer? = null @@ -165,6 +184,7 @@ class WebRtcPeerConnectionManager @Inject constructor( // var localMediaStream: MediaStream? = null private val executor = Executors.newSingleThreadExecutor() + private val dispatcher = executor.asCoroutineDispatcher() private val rootEglBase by lazy { EglUtils.rootEglBase } @@ -291,39 +311,46 @@ class WebRtcPeerConnectionManager @Inject constructor( callContext.peerConnection = peerConnectionFactory?.createPeerConnection(iceServers, StreamObserver(callContext)) } - private fun sendSdpOffer(callContext: CallContext) { + private fun CoroutineScope.sendSdpOffer(callContext: CallContext) = launch(dispatcher) { val constraints = MediaConstraints() // These are deprecated options // constraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true")) // constraints.mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", if (currentCall?.mxCall?.isVideoCall == true) "true" else "false")) + val call = callContext.mxCall + val peerConnection = callContext.peerConnection ?: return@launch Timber.v("## VOIP creating offer...") - callContext.peerConnection?.createOffer(object : SdpObserverAdapter() { - override fun onCreateSuccess(p0: SessionDescription?) { - if (p0 == null) return -// localSdp = p0 - callContext.peerConnection?.setLocalDescription(object : SdpObserverAdapter() {}, p0) - // send offer to peer - currentCall?.mxCall?.offerSdp(p0) - - if(currentCall?.mxCall?.opponentPartyId?.hasValue().orFalse()){ - currentCall?.mxCall?.selectAnswer() - } + callContext.makingOffer = true + try { + val sessionDescription = peerConnection.awaitCreateOffer(constraints) ?: return@launch + peerConnection.awaitSetLocalDescription(sessionDescription) + if (peerConnection.iceGatheringState() == PeerConnection.IceGatheringState.GATHERING) { + // Allow a short time for initial candidates to be gathered + delay(200) } - }, constraints) + if (call.state == CallState.Terminated) { + return@launch + } + if (call.state == CallState.CreateOffer) { + // send offer to peer + call.offerSdp(sessionDescription) + } else { + call.negotiate(sessionDescription) + } + } catch (failure: Throwable) { + // Need to handle error properly. + Timber.v("Failure while creating offer") + } finally { + callContext.makingOffer = false + } } - private fun getTurnServer(callback: ((TurnServerResponse?) -> Unit)) { - currentSession?.callSignalingService() - ?.getTurnServer(object : MatrixCallback { - override fun onSuccess(data: TurnServerResponse?) { - callback(data) - } - - override fun onFailure(failure: Throwable) { - callback(null) - } - }) + private suspend fun getTurnServer(): TurnServerResponse? { + return tryOrNull { + awaitCallback { + currentSession?.callSignalingService()?.getTurnServer(it) + } + } } fun attachViewRenderers(localViewRenderer: SurfaceViewRenderer?, remoteViewRenderer: SurfaceViewRenderer, mode: String?) { @@ -349,10 +376,11 @@ class WebRtcPeerConnectionManager @Inject constructor( callId = mxCall.callId) } - getTurnServer { turnServer -> - val call = currentCall ?: return@getTurnServer + GlobalScope.launch(dispatcher) { + val turnServer = getTurnServer() + val call = currentCall ?: return@launch when (mode) { - VectorCallActivity.INCOMING_ACCEPT -> { + VectorCallActivity.INCOMING_ACCEPT -> { internalAcceptIncomingCall(call, turnServer) } VectorCallActivity.INCOMING_RINGING -> { @@ -360,28 +388,25 @@ class WebRtcPeerConnectionManager @Inject constructor( // TODO eventually we could already display local stream in PIP? } VectorCallActivity.OUTGOING_CREATED -> { - executor.execute { - // 1. Create RTCPeerConnection - createPeerConnection(call, turnServer) + call.mxCall.state = CallState.CreateOffer + // 1. Create RTCPeerConnection + createPeerConnection(call, turnServer) - // 2. Access camera (if video call) + microphone, create local stream - createLocalStream(call) + // 2. Access camera (if video call) + microphone, create local stream + createLocalStream(call) - // 3. add local stream - call.localMediaStream?.let { call.peerConnection?.addStream(it) } - attachViewRenderersInternal() + // 3. add local stream + call.localMediaStream?.let { call.peerConnection?.addStream(it) } + attachViewRenderersInternal() - // create an offer, set local description and send via signaling - sendSdpOffer(call) - - Timber.v("## VOIP remoteCandidateSource ${call.remoteCandidateSource}") - call.remoteIceCandidateDisposable = call.remoteCandidateSource?.subscribe({ - Timber.v("## VOIP adding remote ice candidate $it") - call.peerConnection?.addIceCandidate(it) - }, { - Timber.v("## VOIP failed to add remote ice candidate $it") - }) - } + Timber.v("## VOIP remoteCandidateSource ${call.remoteCandidateSource}") + call.remoteIceCandidateDisposable = call.remoteCandidateSource?.subscribe({ + Timber.v("## VOIP adding remote ice candidate $it") + call.peerConnection?.addIceCandidate(it) + }, { + Timber.v("## VOIP failed to add remote ice candidate $it") + }) + // Now wait for negotiation callback } else -> { // sink existing tracks (configuration change, e.g screen rotation) @@ -391,49 +416,49 @@ class WebRtcPeerConnectionManager @Inject constructor( } } - private fun internalAcceptIncomingCall(callContext: CallContext, turnServerResponse: TurnServerResponse?) { + private suspend fun internalAcceptIncomingCall(callContext: CallContext, turnServerResponse: TurnServerResponse?) { val mxCall = callContext.mxCall // Update service state - - val name = currentSession?.getUser(mxCall.opponentUserId)?.getBestName() - ?: mxCall.roomId - CallService.onPendingCall( - context = context, - isVideo = mxCall.isVideoCall, - roomName = name, - roomId = mxCall.roomId, - matrixId = currentSession?.myUserId ?: "", - callId = mxCall.callId - ) - executor.execute { - // 1) create peer connection - createPeerConnection(callContext, turnServerResponse) - - // create sdp using offer, and set remote description - // the offer has beed stored when invite was received - callContext.offerSdp?.sdp?.let { - SessionDescription(SessionDescription.Type.OFFER, it) - }?.let { - callContext.peerConnection?.setRemoteDescription(SdpObserverAdapter(), it) - } - // 2) Access camera + microphone, create local stream - createLocalStream(callContext) - - // 2) add local stream - currentCall?.localMediaStream?.let { callContext.peerConnection?.addStream(it) } - attachViewRenderersInternal() - - // create a answer, set local description and send via signaling - createAnswer() - - Timber.v("## VOIP remoteCandidateSource ${callContext.remoteCandidateSource}") - callContext.remoteIceCandidateDisposable = callContext.remoteCandidateSource?.subscribe({ - Timber.v("## VOIP adding remote ice candidate $it") - callContext.peerConnection?.addIceCandidate(it) - }, { - Timber.v("## VOIP failed to add remote ice candidate $it") - }) + withContext(Dispatchers.Main) { + val name = currentSession?.getUser(mxCall.opponentUserId)?.getBestName() + ?: mxCall.roomId + CallService.onPendingCall( + context = context, + isVideo = mxCall.isVideoCall, + roomName = name, + roomId = mxCall.roomId, + matrixId = currentSession?.myUserId ?: "", + callId = mxCall.callId + ) } + // 1) create peer connection + createPeerConnection(callContext, turnServerResponse) + + // create sdp using offer, and set remote description + // the offer has beed stored when invite was received + callContext.offerSdp?.sdp?.let { + SessionDescription(SessionDescription.Type.OFFER, it) + }?.let { + callContext.peerConnection?.setRemoteDescription(SdpObserverAdapter(), it) + } + // 2) Access camera + microphone, create local stream + createLocalStream(callContext) + + // 2) add local stream + currentCall?.localMediaStream?.let { callContext.peerConnection?.addStream(it) } + attachViewRenderersInternal() + + // create a answer, set local description and send via signaling + createAnswer()?.also { + callContext.mxCall.accept(it) + } + Timber.v("## VOIP remoteCandidateSource ${callContext.remoteCandidateSource}") + callContext.remoteIceCandidateDisposable = callContext.remoteCandidateSource?.subscribe({ + Timber.v("## VOIP adding remote ice candidate $it") + callContext.peerConnection?.addIceCandidate(it) + }, { + Timber.v("## VOIP failed to add remote ice candidate $it") + }) } private fun createLocalStream(callContext: CallContext) { @@ -544,10 +569,11 @@ class WebRtcPeerConnectionManager @Inject constructor( } fun acceptIncomingCall() { - Timber.v("## VOIP acceptIncomingCall from state ${currentCall?.mxCall?.state}") - val mxCall = currentCall?.mxCall - if (mxCall?.state == CallState.LocalRinging) { - getTurnServer { turnServer -> + GlobalScope.launch(dispatcher) { + Timber.v("## VOIP acceptIncomingCall from state ${currentCall?.mxCall?.state}") + val mxCall = currentCall?.mxCall + if (mxCall?.state == CallState.LocalRinging) { + val turnServer = getTurnServer() internalAcceptIncomingCall(currentCall!!, turnServer) } } @@ -739,22 +765,21 @@ class WebRtcPeerConnectionManager @Inject constructor( } } - private fun createAnswer() { + private suspend fun createAnswer(): SessionDescription? { Timber.w("## VOIP createAnswer") - val call = currentCall ?: return + val call = currentCall ?: return null + val peerConnection = call.peerConnection ?: return null val constraints = MediaConstraints().apply { mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true")) mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", if (call.mxCall.isVideoCall) "true" else "false")) } - executor.execute { - call.peerConnection?.createAnswer(object : SdpObserverAdapter() { - override fun onCreateSuccess(p0: SessionDescription?) { - if (p0 == null) return - call.peerConnection?.setLocalDescription(object : SdpObserverAdapter() {}, p0) - // Now need to send it - call.mxCall.accept(p0) - } - }, constraints) + return try { + val localDescription = peerConnection.awaitCreateAnswer(constraints) ?: return null + peerConnection.awaitSetLocalDescription(localDescription) + localDescription + } catch (failure: Throwable) { + Timber.v("Fail to create answer") + null } } @@ -862,11 +887,17 @@ class WebRtcPeerConnectionManager @Inject constructor( matrixId = currentSession?.myUserId ?: "", callId = mxCall.callId ) - executor.execute { + GlobalScope.launch(dispatcher) { Timber.v("## VOIP onCallAnswerReceived ${callAnswerContent.callId}") val sdp = SessionDescription(SessionDescription.Type.ANSWER, callAnswerContent.answer.sdp) - call.peerConnection?.setRemoteDescription(object : SdpObserverAdapter() { - }, sdp) + try { + call.peerConnection?.awaitSetRemoteDescription(sdp) + } catch (failure: Throwable) { + return@launch + } + if (call.mxCall.opponentPartyId?.hasValue().orFalse()) { + call.mxCall.selectAnswer() + } } } @@ -902,7 +933,50 @@ class WebRtcPeerConnectionManager @Inject constructor( call.mxCall.state = CallState.Terminated endCall(false) } + } + override fun onCallNegotiateReceived(callNegotiateContent: CallNegotiateContent) { + val call = currentCall ?: return + if (call.mxCall.callId != callNegotiateContent.callId) return Unit.also { + Timber.w("onCallNegotiateReceived for non active call? ${callNegotiateContent.callId}") + } + val description = callNegotiateContent.description + val type = description?.type + val sdpText = description?.sdp + if (type == null || sdpText == null) { + Timber.i("Ignoring invalid m.call.negotiate event"); + return; + } + val peerConnection = call.peerConnection ?: return + // Politeness always follows the direction of the call: in a glare situation, + // we pick either the inbound or outbound call, so one side will always be + // inbound and one outbound + val polite = !call.mxCall.isOutgoing + // Here we follow the perfect negotiation logic from + // https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Perfect_negotiation + val offerCollision = description.type == SdpType.OFFER + && (call.makingOffer || peerConnection.signalingState() != PeerConnection.SignalingState.STABLE) + + call.ignoreOffer = !polite && offerCollision + if (call.ignoreOffer) { + Timber.i("Ignoring colliding negotiate event because we're impolite") + return + } + + GlobalScope.launch(dispatcher) { + try { + val sdp = SessionDescription(type.asWebRTC(), sdpText) + peerConnection.awaitSetRemoteDescription(sdp) + if (type == SdpType.OFFER) { + // create a answer, set local description and send via signaling + createAnswer()?.also { + call.mxCall.negotiate(it) + } + } + } catch (failure: Throwable) { + Timber.e(failure, "Failed to complete negotiation") + } + } } override fun onCallManagedByOtherSession(callId: String) { @@ -921,6 +995,27 @@ class WebRtcPeerConnectionManager @Inject constructor( } } + /** + * Indicates whether we are 'on hold' to the remote party (ie. if true, + * they cannot hear us). Note that this will return true when we put the + * remote on hold too due to the way hold is implemented (since we don't + * wish to play hold music when we put a call on hold, we use 'inactive' + * rather than 'sendonly') + * @returns true if the other party has put us on hold + */ + private fun isLocalOnHold(callContext: CallContext): Boolean { + if (callContext.mxCall.state !is CallState.Connected) return false + var callOnHold = true + // We consider a call to be on hold only if *all* the tracks are on hold + // (is this the right thing to do?) + for (transceiver in callContext.peerConnection?.transceivers ?: emptyList()) { + val trackOnHold = transceiver.currentDirection == RtpTransceiver.RtpTransceiverDirection.INACTIVE + || transceiver.currentDirection == RtpTransceiver.RtpTransceiverDirection.RECV_ONLY + if (!trackOnHold) callOnHold = false; + } + return callOnHold; + } + private inner class StreamObserver(val callContext: CallContext) : PeerConnection.Observer { override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) { @@ -930,14 +1025,14 @@ class WebRtcPeerConnectionManager @Inject constructor( * Every ICE transport used by the connection is either in use (state "connected" or "completed") * or is closed (state "closed"); in addition, at least one transport is either "connected" or "completed" */ - PeerConnection.PeerConnectionState.CONNECTED -> { + PeerConnection.PeerConnectionState.CONNECTED -> { callContext.mxCall.state = CallState.Connected(newState) callAudioManager.onCallConnected(callContext.mxCall) } /** * One or more of the ICE transports on the connection is in the "failed" state. */ - PeerConnection.PeerConnectionState.FAILED -> { + PeerConnection.PeerConnectionState.FAILED -> { // This can be temporary, e.g when other ice not yet received... // callContext.mxCall.state = CallState.ERROR callContext.mxCall.state = CallState.Connected(newState) @@ -953,7 +1048,7 @@ class WebRtcPeerConnectionManager @Inject constructor( * One or more of the ICE transports are currently in the process of establishing a connection; * that is, their RTCIceConnectionState is either "checking" or "connected", and no transports are in the "failed" state */ - PeerConnection.PeerConnectionState.CONNECTING -> { + PeerConnection.PeerConnectionState.CONNECTING -> { callContext.mxCall.state = CallState.Connected(PeerConnection.PeerConnectionState.CONNECTING) } /** @@ -969,7 +1064,7 @@ class WebRtcPeerConnectionManager @Inject constructor( PeerConnection.PeerConnectionState.DISCONNECTED -> { callContext.mxCall.state = CallState.Connected(newState) } - null -> { + null -> { } } } @@ -994,14 +1089,14 @@ class WebRtcPeerConnectionManager @Inject constructor( * the ICE agent is gathering addresses or is waiting to be given remote candidates through * calls to RTCPeerConnection.addIceCandidate() (or both). */ - PeerConnection.IceConnectionState.NEW -> { + PeerConnection.IceConnectionState.NEW -> { } /** * The ICE agent has been given one or more remote candidates and is checking pairs of local and remote candidates * against one another to try to find a compatible match, but has not yet found a pair which will allow * the peer connection to be made. It's possible that gathering of candidates is also still underway. */ - PeerConnection.IceConnectionState.CHECKING -> { + PeerConnection.IceConnectionState.CHECKING -> { } /** @@ -1010,7 +1105,7 @@ class WebRtcPeerConnectionManager @Inject constructor( * It's possible that gathering is still underway, and it's also possible that the ICE agent is still checking * candidates against one another looking for a better connection to use. */ - PeerConnection.IceConnectionState.CONNECTED -> { + PeerConnection.IceConnectionState.CONNECTED -> { } /** * Checks to ensure that components are still connected failed for at least one component of the RTCPeerConnection. @@ -1024,7 +1119,7 @@ class WebRtcPeerConnectionManager @Inject constructor( * compatible matches for all components of the connection. * It is, however, possible that the ICE agent did find compatible connections for some components. */ - PeerConnection.IceConnectionState.FAILED -> { + PeerConnection.IceConnectionState.FAILED -> { // I should not hangup here.. // because new candidates could arrive // callContext.mxCall.hangUp() @@ -1032,12 +1127,12 @@ class WebRtcPeerConnectionManager @Inject constructor( /** * The ICE agent has finished gathering candidates, has checked all pairs against one another, and has found a connection for all components. */ - PeerConnection.IceConnectionState.COMPLETED -> { + PeerConnection.IceConnectionState.COMPLETED -> { } /** * The ICE agent for this RTCPeerConnection has shut down and is no longer handling requests. */ - PeerConnection.IceConnectionState.CLOSED -> { + PeerConnection.IceConnectionState.CLOSED -> { } } } @@ -1090,8 +1185,12 @@ class WebRtcPeerConnectionManager @Inject constructor( override fun onRenegotiationNeeded() { Timber.v("## VOIP StreamObserver onRenegotiationNeeded") - // Should not do anything, for now we follow a pre-agreed-upon - // signaling/negotiation protocol. + val call = currentCall ?: return + if (call.mxCall.state != CallState.CreateOffer && call.mxCall.opponentVersion == 0) { + Timber.v("Opponent does not support renegotiation: ignoring onRenegotiationNeeded event") + return + } + GlobalScope.sendSdpOffer(callContext) } /** diff --git a/vector/src/main/java/im/vector/app/features/call/utils/PeerConnectionExt.kt b/vector/src/main/java/im/vector/app/features/call/utils/PeerConnectionExt.kt new file mode 100644 index 0000000000..8cce0d9a75 --- /dev/null +++ b/vector/src/main/java/im/vector/app/features/call/utils/PeerConnectionExt.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2020 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.app.features.call.utils + +import im.vector.app.features.call.SdpObserverAdapter +import org.webrtc.MediaConstraints +import org.webrtc.PeerConnection +import org.webrtc.SessionDescription +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +suspend fun PeerConnection.awaitCreateOffer(mediaConstraints: MediaConstraints): SessionDescription? = suspendCoroutine { cont -> + createOffer(object : SdpObserverAdapter() { + override fun onCreateSuccess(p0: SessionDescription?) { + super.onCreateSuccess(p0) + cont.resume(p0) + } + + override fun onCreateFailure(p0: String?) { + super.onCreateFailure(p0) + cont.resumeWithException(IllegalStateException(p0)) + } + }, mediaConstraints) +} + +suspend fun PeerConnection.awaitCreateAnswer(mediaConstraints: MediaConstraints): SessionDescription? = suspendCoroutine { cont -> + createAnswer(object : SdpObserverAdapter() { + override fun onCreateSuccess(p0: SessionDescription?) { + super.onCreateSuccess(p0) + cont.resume(p0) + } + + override fun onCreateFailure(p0: String?) { + super.onCreateFailure(p0) + cont.resumeWithException(IllegalStateException(p0)) + } + }, mediaConstraints) +} + +suspend fun PeerConnection.awaitSetLocalDescription(sessionDescription: SessionDescription): Unit = suspendCoroutine { cont -> + setLocalDescription(object : SdpObserverAdapter() { + override fun onSetFailure(p0: String?) { + super.onSetFailure(p0) + cont.resumeWithException(IllegalStateException(p0)) + } + + override fun onSetSuccess() { + super.onSetSuccess() + cont.resume(Unit) + } + }, sessionDescription) +} + +suspend fun PeerConnection.awaitSetRemoteDescription(sessionDescription: SessionDescription): Unit = suspendCoroutine { cont -> + setRemoteDescription(object : SdpObserverAdapter() { + override fun onSetFailure(p0: String?) { + super.onSetFailure(p0) + cont.resumeWithException(IllegalStateException(p0)) + } + + override fun onSetSuccess() { + super.onSetSuccess() + cont.resume(Unit) + } + }, sessionDescription) +} +