From 416563760fc345ab306c3cac90ae9d0358bf05ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 11:32:51 +0100 Subject: [PATCH 01/16] Remove Dagger related code from PeerConnectionWrapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The PeerConnectionWrapper does not need to be injected in the application, nor the Context needs to be injected in the PeerConnectionWrapper. This all seems to be leftovers from the past, and removing them would ease adding unit tests for the PeerConnectionWrapper. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index fe44e797d8..b77950ec38 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -7,11 +7,9 @@ */ package com.nextcloud.talk.webrtc; -import android.content.Context; import android.util.Log; import com.bluelinelabs.logansquare.LoganSquare; -import com.nextcloud.talk.application.NextcloudTalkApplication; import com.nextcloud.talk.models.json.signaling.DataChannelMessage; import com.nextcloud.talk.models.json.signaling.NCIceCandidate; import com.nextcloud.talk.models.json.signaling.NCMessagePayload; @@ -38,19 +36,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; - -import javax.inject.Inject; import androidx.annotation.Nullable; -import autodagger.AutoInjector; -@AutoInjector(NextcloudTalkApplication.class) public class PeerConnectionWrapper { - @Inject - Context context; - private static final String TAG = PeerConnectionWrapper.class.getCanonicalName(); private final SignalingMessageReceiver signalingMessageReceiver; @@ -117,9 +107,6 @@ public PeerConnectionWrapper(PeerConnectionFactory peerConnectionFactory, boolean isMCUPublisher, boolean hasMCU, String videoStreamType, SignalingMessageReceiver signalingMessageReceiver, SignalingMessageSender signalingMessageSender) { - - Objects.requireNonNull(NextcloudTalkApplication.Companion.getSharedApplication()).getComponentApplication().inject(this); - this.localStream = localStream; this.videoStreamType = videoStreamType; From acf0db136e6a7624419352d05c43d1287ff57cbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:13:36 +0100 Subject: [PATCH 02/16] Add dummy Log implementation to be used in tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Log methods are static, so they can not be mocked using Mockito. Although it might be possible to use PowerMockito a dummy implementation was added instead, as Log uses are widespread and it is not something worth mocking anyway. Signed-off-by: Daniel Calviño Sánchez --- app/src/test/java/android/util/Log.java | 51 +++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 app/src/test/java/android/util/Log.java diff --git a/app/src/test/java/android/util/Log.java b/app/src/test/java/android/util/Log.java new file mode 100644 index 0000000000..7090f73cd5 --- /dev/null +++ b/app/src/test/java/android/util/Log.java @@ -0,0 +1,51 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package android.util; + +/** + * Dummy implementation of android.util.Log to be used in unit tests. + *

+ * The Android Gradle plugin provides a library with the APIs of the Android framework that throws an exception if any + * of them are called. This class is loaded before that library and therefore becomes the implementation used during the + * tests, simply printing the messages to the system console. + */ +public class Log { + + public static int d(String tag, String msg) { + System.out.println("DEBUG: " + tag + ": " + msg); + + return 1; + } + + public static int e(String tag, String msg) { + System.out.println("ERROR: " + tag + ": " + msg); + + return 1; + } + + public static int i(String tag, String msg) { + System.out.println("INFO: " + tag + ": " + msg); + + return 1; + } + + public static boolean isLoggable(String tag, int level) { + return true; + } + + public static int v(String tag, String msg) { + System.out.println("VERBOSE: " + tag + ": " + msg); + + return 1; + } + + public static int w(String tag, String msg) { + System.out.println("WARN: " + tag + ": " + msg); + + return 1; + } +} From bc29b0e25816835ff07e0c96ef78562189142b6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:34:06 +0100 Subject: [PATCH 03/16] Add unit tests for receiving data channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 3 + .../talk/webrtc/PeerConnectionWrapperTest.kt | 194 ++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index b77950ec38..fdc211e743 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -71,6 +71,9 @@ public class PeerConnectionWrapper { /** * Listener for data channel messages. *

+ * Messages might have been received on any data channel, independently of its label or whether it was open by the + * local or the remote peer. + *

* The messages are bound to a specific peer connection, so each listener is expected to handle messages only for * a single peer connection. *

diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt new file mode 100644 index 0000000000..98e2de4ae0 --- /dev/null +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -0,0 +1,194 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package com.nextcloud.talk.webrtc + +import com.bluelinelabs.logansquare.LoganSquare +import com.nextcloud.talk.models.json.signaling.DataChannelMessage +import com.nextcloud.talk.signaling.SignalingMessageReceiver +import com.nextcloud.talk.signaling.SignalingMessageSender +import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListener +import org.junit.Before +import org.junit.Test +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.eq +import org.mockito.Mockito +import org.mockito.Mockito.doNothing +import org.webrtc.DataChannel +import org.webrtc.MediaConstraints +import org.webrtc.PeerConnection +import org.webrtc.PeerConnectionFactory +import java.nio.ByteBuffer +import java.util.HashMap + +class PeerConnectionWrapperTest { + + private var peerConnectionWrapper: PeerConnectionWrapper? = null + private var mockedPeerConnection: PeerConnection? = null + private var mockedPeerConnectionFactory: PeerConnectionFactory? = null + private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null + private var mockedSignalingMessageSender: SignalingMessageSender? = null + + private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { + return DataChannel.Buffer( + ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), + false + ) + } + + @Before + fun setUp() { + mockedPeerConnection = Mockito.mock(PeerConnection::class.java) + mockedPeerConnectionFactory = Mockito.mock(PeerConnectionFactory::class.java) + mockedSignalingMessageReceiver = Mockito.mock(SignalingMessageReceiver::class.java) + mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) + } + + @Test + fun testReceiveDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + // The payload must be a map to be able to serialize it and, therefore, generate the data that would have been + // received from another participant, so it is not possible to test receiving the nick as a String payload. + val payloadMap = HashMap() + payloadMap["name"] = "the-nick-in-map" + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("nickChanged", null, payloadMap)) + ) + + Mockito.verify(mockedDataChannelMessageListener).onNickChanged("the-nick-in-map") + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("videoOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onVideoOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("videoOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onVideoOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + + @Test + fun testReceiveDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val randomIdDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + doNothing().`when`(mockedRandomIdDataChannel).registerObserver( + randomIdDataChannelObserverArgumentCaptor.capture() + ) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + randomIdDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } +} From 1b4d80dad6568e9ca6b8d47fbfa8ef1106853e9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:54:00 +0100 Subject: [PATCH 04/16] Unify log messages for received data channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index fdc211e743..34ea47eaa9 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -379,7 +379,7 @@ public void onStateChange() { @Override public void onMessage(DataChannel.Buffer buffer) { if (buffer.binary) { - Log.d(TAG, "Received binary msg over " + TAG + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + TAG + " " + sessionId); return; } @@ -387,7 +387,7 @@ public void onMessage(DataChannel.Buffer buffer) { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Got msg: " + strData + " over " + TAG + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + TAG + " " + sessionId); DataChannelMessage dataChannelMessage; try { From f2046629e44c05409dbfb9f6c9016a9fcf7ae068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:55:57 +0100 Subject: [PATCH 05/16] Include data channel label in log message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implicitly fixes trying to send the initial state on the latest remote data channel found (which is the one stored in the "dataChannel" attribute of the "PeerConnectionWrapper") when any other existing data channel changes its status to open. Nevertheless, as all this will be reworked, no unit test was added for it. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 34ea47eaa9..2d033d08e6 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import androidx.annotation.Nullable; @@ -144,7 +145,7 @@ public PeerConnectionWrapper(PeerConnectionFactory peerConnectionFactory, DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; dataChannel = peerConnection.createDataChannel("status", init); - dataChannel.registerObserver(new DataChannelObserver()); + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -363,6 +364,12 @@ public void onEndOfCandidates() { private class DataChannelObserver implements DataChannel.Observer { + private final DataChannel dataChannel; + + public DataChannelObserver(DataChannel dataChannel) { + this.dataChannel = Objects.requireNonNull(dataChannel); + } + @Override public void onBufferedAmountChange(long l) { @@ -370,8 +377,7 @@ public void onBufferedAmountChange(long l) { @Override public void onStateChange() { - if (dataChannel != null && - dataChannel.state() == DataChannel.State.OPEN) { + if (dataChannel.state() == DataChannel.State.OPEN) { sendInitialMediaStatus(); } } @@ -379,7 +385,7 @@ public void onStateChange() { @Override public void onMessage(DataChannel.Buffer buffer) { if (buffer.binary) { - Log.d(TAG, "Received binary data channel message over " + TAG + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + dataChannel.label() + " " + sessionId); return; } @@ -387,7 +393,7 @@ public void onMessage(DataChannel.Buffer buffer) { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Received data channel message (" + strData + ") over " + TAG + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannel.label() + " " + sessionId); DataChannelMessage dataChannelMessage; try { @@ -512,7 +518,7 @@ public void onDataChannel(DataChannel dataChannel) { + " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label()); } PeerConnectionWrapper.this.dataChannel = dataChannel; - PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver()); + PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver(dataChannel)); } @Override From e9daa6ec5fae88f6b9732cc6c3197389f02e579e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 14:00:57 +0100 Subject: [PATCH 06/16] Rename "sendChannelData" to "send" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The legacy name was a bit strange, so now it is renamed to just "send" as the parameter type ("DataChannelMessage") gives enough context. Signed-off-by: Daniel Calviño Sánchez --- .../java/com/nextcloud/talk/activities/CallActivity.kt | 10 +++++----- .../nextcloud/talk/webrtc/PeerConnectionWrapper.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt index 9220952c0d..3bcc7611b0 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt @@ -1174,12 +1174,12 @@ class CallActivity : CallBaseActivity() { if (isConnectionEstablished && othersInCall) { if (!hasMCU) { for (peerConnectionWrapper in peerConnectionWrapperList) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage)) + peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage)) } } else { for (peerConnectionWrapper in peerConnectionWrapperList) { if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage)) + peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage)) break } } @@ -1370,12 +1370,12 @@ class CallActivity : CallBaseActivity() { if (isConnectionEstablished) { if (!hasMCU) { for (peerConnectionWrapper in peerConnectionWrapperList) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(message)) + peerConnectionWrapper.send(DataChannelMessage(message)) } } else { for (peerConnectionWrapper in peerConnectionWrapperList) { if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(message)) + peerConnectionWrapper.send(DataChannelMessage(message)) break } } @@ -2563,7 +2563,7 @@ class CallActivity : CallBaseActivity() { } override fun onNext(aLong: Long) { - peerConnectionWrapper.sendChannelData(dataChannelMessage) + peerConnectionWrapper.send(dataChannelMessage) } override fun onError(e: Throwable) { diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 2d033d08e6..bf95befb82 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -269,7 +269,7 @@ private void addCandidate(IceCandidate iceCandidate) { } } - public void sendChannelData(DataChannelMessage dataChannelMessage) { + public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; if (dataChannel != null && dataChannelMessage != null) { try { @@ -292,15 +292,15 @@ public String getSessionId() { private void sendInitialMediaStatus() { if (localStream != null) { if (localStream.videoTracks.size() == 1 && localStream.videoTracks.get(0).enabled()) { - sendChannelData(new DataChannelMessage("videoOn")); + send(new DataChannelMessage("videoOn")); } else { - sendChannelData(new DataChannelMessage("videoOff")); + send(new DataChannelMessage("videoOff")); } if (localStream.audioTracks.size() == 1 && localStream.audioTracks.get(0).enabled()) { - sendChannelData(new DataChannelMessage("audioOn")); + send(new DataChannelMessage("audioOn")); } else { - sendChannelData(new DataChannelMessage("audioOff")); + send(new DataChannelMessage("audioOff")); } } } From 9384ef6ced95ad44bdb8d9032086f8fa6aef10a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 17:42:34 +0100 Subject: [PATCH 07/16] Send data channel messages using "status" data channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data channel messages are expected to be sent using the "status" data channel that is locally created. However, if another data channel was opened by the remote peer the reference to the "status" data channel was overwritten with the new data channel, and messages were sent instead on the remote data channel. In current Talk versions that was not a problem, and the change makes no difference either, because since the support for Janus 1.x was added data channel messages are listened on all data channels, independently of their label or whether they were created by the local or remote peer. However, in older Talk versions this fixes a regression introduced with the support for Janus 1.x. In those versions only messages coming from the "status" or "JanusDataChannel" data channels were taken into account. When Janus is not used the WebUI opens the legacy "simplewebrtc" data channel, so that data channel may be the one used to send data channel messages (if it is open after the "status" data channel), but the messages received on that data channel were ignored by the WebUI. Nevertheless, at this point this is more an academic problem than a real world problem, as it is unlikely that there are many Nextcloud servers with Talk < 16 and without HPB being used. Independently of all that, when the peer connection is removed only the "status" data channel is disposed, but none of the remote data channels are. This is just a variation of an already existing bug (the last open data channel was the one disposed due to being the last saved reference, but the rest were not) and it will be fixed in another commit. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 35 ++++--- .../talk/webrtc/PeerConnectionWrapperTest.kt | 97 +++++++++++++++++++ 2 files changed, 118 insertions(+), 14 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index bf95befb82..6413f0344b 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -57,7 +57,7 @@ public class PeerConnectionWrapper { private PeerConnection peerConnection; private String sessionId; private final MediaConstraints mediaConstraints; - private DataChannel dataChannel; + private DataChannel statusDataChannel; private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -144,8 +144,8 @@ public PeerConnectionWrapper(PeerConnectionFactory peerConnectionFactory, if (hasMCU || hasInitiated) { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; - dataChannel = peerConnection.createDataChannel("status", init); - dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + statusDataChannel = peerConnection.createDataChannel("status", init); + statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel)); if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -233,9 +233,9 @@ public MediaStream getStream() { public void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); - if (dataChannel != null) { - dataChannel.dispose(); - dataChannel = null; + if (statusDataChannel != null) { + statusDataChannel.dispose(); + statusDataChannel = null; Log.d(TAG, "Disposed DataChannel"); } else { Log.d(TAG, "DataChannel is null."); @@ -269,12 +269,24 @@ private void addCandidate(IceCandidate iceCandidate) { } } + /** + * Sends a data channel message. + *

+ * Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used, + * messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel; + * messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the + * "status" data channel subscriber connections will receive it on a data channel with a different label, as + * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel + * messages on it, independently of on which data channel they were originally sent. + * + * @param dataChannelMessage the message to send + */ public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; - if (dataChannel != null && dataChannelMessage != null) { + if (statusDataChannel != null && dataChannelMessage != null) { try { buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); - dataChannel.send(new DataChannel.Buffer(buffer, false)); + statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); } @@ -513,12 +525,7 @@ public void onRemoveStream(MediaStream mediaStream) { @Override public void onDataChannel(DataChannel dataChannel) { - if (PeerConnectionWrapper.this.dataChannel != null) { - Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label() - + " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label()); - } - PeerConnectionWrapper.this.dataChannel = dataChannel; - PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 98e2de4ae0..be628c2afd 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -14,10 +14,13 @@ import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListene import org.junit.Before import org.junit.Test import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatcher import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.argThat import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito import org.mockito.Mockito.doNothing +import org.mockito.Mockito.never import org.webrtc.DataChannel import org.webrtc.MediaConstraints import org.webrtc.PeerConnection @@ -33,6 +36,19 @@ class PeerConnectionWrapperTest { private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null private var mockedSignalingMessageSender: SignalingMessageSender? = null + /** + * Helper matcher for DataChannelMessages. + */ + private inner class MatchesDataChannelMessage( + private val expectedDataChannelMessage: DataChannelMessage + ) : ArgumentMatcher { + override fun matches(buffer: DataChannel.Buffer): Boolean { + // DataChannel.Buffer does not implement "equals", so the comparison needs to be done on the ByteBuffer + // instead. + return dataChannelMessageToBuffer(expectedDataChannelMessage).data.equals(buffer.data) + } + } + private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { return DataChannel.Buffer( ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), @@ -48,6 +64,87 @@ class PeerConnectionWrapperTest { mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) } + @Test + fun testSendDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + } + + @Test + fun testSendDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`( From 1ede0689df34c9e874d2f0dd2a9ad9f60a983115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 03:08:15 +0100 Subject: [PATCH 08/16] Fix remote data channels not disposed when removing peer connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 36 ++++++++++++---- .../talk/webrtc/PeerConnectionWrapperTest.kt | 43 +++++++++++++++++++ 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 6413f0344b..27ad9e439a 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -34,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -57,7 +58,7 @@ public class PeerConnectionWrapper { private PeerConnection peerConnection; private String sessionId; private final MediaConstraints mediaConstraints; - private DataChannel statusDataChannel; + private final Map dataChannels = new HashMap<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -144,8 +145,11 @@ public PeerConnectionWrapper(PeerConnectionFactory peerConnectionFactory, if (hasMCU || hasInitiated) { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; - statusDataChannel = peerConnection.createDataChannel("status", init); + + DataChannel statusDataChannel = peerConnection.createDataChannel("status", init); statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel)); + dataChannels.put("status", statusDataChannel); + if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -233,13 +237,12 @@ public MediaStream getStream() { public void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); - if (statusDataChannel != null) { - statusDataChannel.dispose(); - statusDataChannel = null; - Log.d(TAG, "Disposed DataChannel"); - } else { - Log.d(TAG, "DataChannel is null."); + for (DataChannel dataChannel: dataChannels.values()) { + Log.d(TAG, "Disposed DataChannel " + dataChannel.label()); + + dataChannel.dispose(); } + dataChannels.clear(); if (peerConnection != null) { peerConnection.close(); @@ -283,6 +286,7 @@ private void addCandidate(IceCandidate iceCandidate) { */ public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; + DataChannel statusDataChannel = dataChannels.get("status"); if (statusDataChannel != null && dataChannelMessage != null) { try { buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); @@ -525,7 +529,23 @@ public void onRemoveStream(MediaStream mediaStream) { @Override public void onDataChannel(DataChannel dataChannel) { + // Another data channel with the same label, no matter if the same instance or a different one, should not + // be added, but just in case. + DataChannel oldDataChannel = dataChannels.get(dataChannel.label()); + if (oldDataChannel == dataChannel) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); + + return; + } + + if (oldDataChannel != null) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); + + oldDataChannel.dispose(); + } + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannels.put(dataChannel.label(), dataChannel); } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index be628c2afd..450cc4fa83 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -288,4 +288,47 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedDataChannelMessageListener).onAudioOff() Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) } + + @Test + fun testRemovePeerConnectionWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.removePeerConnection() + + Mockito.verify(mockedStatusDataChannel).dispose() + Mockito.verify(mockedRandomIdDataChannel).dispose() + } } From 2f051033ee47e08a459a011e09ef9506f1354eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:13:49 +0100 Subject: [PATCH 09/16] Move variable declaration into try block MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 27ad9e439a..08c6b8fcf4 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -285,11 +285,10 @@ private void addCandidate(IceCandidate iceCandidate) { * @param dataChannelMessage the message to send */ public void send(DataChannelMessage dataChannelMessage) { - ByteBuffer buffer; DataChannel statusDataChannel = dataChannels.get("status"); if (statusDataChannel != null && dataChannelMessage != null) { try { - buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); From 0223d6700d6ea39b3380bcf9c11b0f9e16e5fdde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:14:58 +0100 Subject: [PATCH 10/16] Rewrite method to return early MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 08c6b8fcf4..9474471fea 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -286,13 +286,15 @@ private void addCandidate(IceCandidate iceCandidate) { */ public void send(DataChannelMessage dataChannelMessage) { DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel != null && dataChannelMessage != null) { - try { - ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); - statusDataChannel.send(new DataChannel.Buffer(buffer, false)); - } catch (Exception e) { - Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); - } + if (statusDataChannel == null || dataChannelMessage == null) { + return; + } + + try { + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); + statusDataChannel.send(new DataChannel.Buffer(buffer, false)); + } catch (Exception e) { + Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); } } From 3ebcd261dc320d4ac89b61910d007e246ea9a3bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:15:46 +0100 Subject: [PATCH 11/16] Split condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 9474471fea..26f09d31e7 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -285,8 +285,12 @@ private void addCandidate(IceCandidate iceCandidate) { * @param dataChannelMessage the message to send */ public void send(DataChannelMessage dataChannelMessage) { + if (dataChannelMessage == null) { + return; + } + DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null || dataChannelMessage == null) { + if (statusDataChannel == null) { return; } From c77b38598ab5c7247affdb88edefaf8c7ebfd546 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:29:22 +0100 Subject: [PATCH 12/16] Queue data channel messages sent when data channel is not open MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data channel messages can be sent only when the data channel is open. Otherwise the message is simply lost. Clients of the PeerConnectionWrapper do not need to be aware of that detail or keep track of whether the data channel was open already or not, so now data channel messages sent before the data channel is open are queued and sent once the data channel is opened. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 19 ++++++- .../talk/webrtc/PeerConnectionWrapperTest.kt | 50 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 26f09d31e7..be55a68635 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -59,6 +59,7 @@ public class PeerConnectionWrapper { private String sessionId; private final MediaConstraints mediaConstraints; private final Map dataChannels = new HashMap<>(); + private final List pendingDataChannelMessages = new ArrayList<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -281,6 +282,13 @@ private void addCandidate(IceCandidate iceCandidate) { * "status" data channel subscriber connections will receive it on a data channel with a different label, as * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel * messages on it, independently of on which data channel they were originally sent. + *

+ * Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be + * queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will + * be received by other participants, as it is only known when the data channel of the publisher was opened, but + * not if the data channel of the subscribers was. However, in general this should be a concern only during the + * first seconds after a participant joins; after some time the subscriber connections should be established and + * their data channels open. * * @param dataChannelMessage the message to send */ @@ -290,7 +298,9 @@ public void send(DataChannelMessage dataChannelMessage) { } DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null) { + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + pendingDataChannelMessages.add(dataChannelMessage); + return; } @@ -398,6 +408,13 @@ public void onBufferedAmountChange(long l) { @Override public void onStateChange() { + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) { + for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { + send(dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + if (dataChannel.state() == DataChannel.State.OPEN) { sendInitialMediaStatus(); } diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 450cc4fa83..61c5f5daa4 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -145,6 +145,56 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) } + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannel() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + peerConnectionWrapper!!.send(DataChannelMessage("another-message-type")) + + Mockito.verify(mockedStatusDataChannel, never()).send(any()) + + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + statusDataChannelObserverArgumentCaptor.value.onStateChange() + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type"))) + ) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`( From fbb7a71fb0633ae97c0dcf9ee8bad431b71ecac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:37:13 +0100 Subject: [PATCH 13/16] Add logs for sending data channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index be55a68635..7bc1f04695 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -299,16 +299,20 @@ public void send(DataChannelMessage dataChannelMessage) { DataChannel statusDataChannel = dataChannels.get("status"); if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId); + pendingDataChannelMessages.add(dataChannelMessage); return; } try { + Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId); + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { - Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); + Log.w(TAG, "Failed to send data channel message"); } } From b7a9bf7033341955bbb276653ed4d25f25748515 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Mon, 9 Dec 2024 03:11:54 +0100 Subject: [PATCH 14/16] Store data channel label MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Getting the label is no longer possible once the data channel has been disposed. This will help to make the observer thread-safe. Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 7bc1f04695..08da1f7267 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -400,9 +400,11 @@ public void onEndOfCandidates() { private class DataChannelObserver implements DataChannel.Observer { private final DataChannel dataChannel; + private final String dataChannelLabel; public DataChannelObserver(DataChannel dataChannel) { this.dataChannel = Objects.requireNonNull(dataChannel); + this.dataChannelLabel = dataChannel.label(); } @Override @@ -412,7 +414,7 @@ public void onBufferedAmountChange(long l) { @Override public void onStateChange() { - if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) { + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { send(dataChannelMessage); } @@ -427,7 +429,7 @@ public void onStateChange() { @Override public void onMessage(DataChannel.Buffer buffer) { if (buffer.binary) { - Log.d(TAG, "Received binary data channel message over " + dataChannel.label() + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId); return; } @@ -435,7 +437,7 @@ public void onMessage(DataChannel.Buffer buffer) { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannel.label() + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannelLabel + " " + sessionId); DataChannelMessage dataChannelMessage; try { From 58f46a9f6c9193afce4e14fd004e7554ab78282a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Sun, 8 Dec 2024 05:23:11 +0100 Subject: [PATCH 15/16] Fix "removePeerConnection" not being thread-safe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adding and disposing remote data channels is done from different threads; they are added from the WebRTC signaling thread when "onDataChannel" is called, while they can be disposed potentially from any thread when "removePeerConnection" is called. To prevent race conditions between them now both operations are synchronized. However, as "onDataChannel" belongs to an inner class it needs to use a synchronized statement with the outer class lock. This could still cause a race condition if the same data channel was added again; this should not happen, but it is handled just in case. Moreover, once a data channel is disposed it can be no longer used, and trying to call any of its methods throws an "IllegalStateException". Due to this, as sending can be also done potentially from any thread, it needs to be synchronized too with removing the peer connection. State changes on data channels as well as receiving messages are also done in the WebRTC signaling thread. State changes needs synchronization as well, although receiving messages should not, as it does not directly use the data channel (and it is assumed that using the buffers of a disposed data channel is safe). Nevertheless a little check (which in this case requires synchronization) was added to ignore the received messages if the peer connection was removed already. Finally, the synchronization added to "send" and "onStateChange" had the nice side effect of making the pending data channel messages thread-safe too, as before it could happen that a message was enqueued when the pending messages were being sent, which caused a "ConcurrentModificationException". Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 84 +++- .../talk/webrtc/PeerConnectionWrapperTest.kt | 375 ++++++++++++++++++ 2 files changed, 437 insertions(+), 22 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 08da1f7267..13bb5f2f7b 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -235,7 +235,7 @@ public MediaStream getStream() { return stream; } - public void removePeerConnection() { + public synchronized void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); for (DataChannel dataChannel: dataChannels.values()) { @@ -292,7 +292,7 @@ private void addCandidate(IceCandidate iceCandidate) { * * @param dataChannelMessage the message to send */ - public void send(DataChannelMessage dataChannelMessage) { + public synchronized void send(DataChannelMessage dataChannelMessage) { if (dataChannelMessage == null) { return; } @@ -414,20 +414,38 @@ public void onBufferedAmountChange(long l) { @Override public void onStateChange() { - if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { - for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { - send(dataChannelMessage); + synchronized (PeerConnectionWrapper.this) { + // The PeerConnection could have been removed in parallel even with the synchronization (as just after + // "onStateChange" was called "removePeerConnection" could have acquired the lock). + if (peerConnection == null) { + return; } - pendingDataChannelMessages.clear(); - } - if (dataChannel.state() == DataChannel.State.OPEN) { - sendInitialMediaStatus(); + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { + for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) { + send(dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + + if (dataChannel.state() == DataChannel.State.OPEN) { + sendInitialMediaStatus(); + } } } @Override public void onMessage(DataChannel.Buffer buffer) { + synchronized (PeerConnectionWrapper.this) { + // It is assumed that, even if its data channel was disposed, its buffers can be used while there is + // a reference to them, so it would not be necessary to check this from a thread-safety point of view. + // Nevertheless, if the remote peer connection was removed it would not make sense to notify the + // listeners anyway. + if (peerConnection == null) { + return; + } + } + if (buffer.binary) { Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId); return; @@ -557,23 +575,45 @@ public void onRemoveStream(MediaStream mediaStream) { @Override public void onDataChannel(DataChannel dataChannel) { - // Another data channel with the same label, no matter if the same instance or a different one, should not - // be added, but just in case. - DataChannel oldDataChannel = dataChannels.get(dataChannel.label()); - if (oldDataChannel == dataChannel) { - Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); + synchronized (PeerConnectionWrapper.this) { + // Another data channel with the same label, no matter if the same instance or a different one, should + // not be added, but this is handled just in case. + // Moreover, if it were possible that an already added data channel was added again there would be a + // potential race condition with "removePeerConnection", even with the synchronization, as it would + // be possible that "onDataChannel" was called, then "removePeerConnection" disposed the data + // channel, and then "onDataChannel" continued in the synchronized statements and tried to get the + // label, which would throw an exception due to the data channel having been disposed already. + String dataChannelLabel; + try { + dataChannelLabel = dataChannel.label(); + } catch (IllegalStateException e) { + // The data channel was disposed already, nothing to do. + return; + } - return; - } + DataChannel oldDataChannel = dataChannels.get(dataChannelLabel); + if (oldDataChannel == dataChannel) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); - if (oldDataChannel != null) { - Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); + return; + } - oldDataChannel.dispose(); - } + if (oldDataChannel != null) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); + + oldDataChannel.dispose(); + } + + // If the peer connection was removed in parallel dispose the data channel instead of adding it. + if (peerConnection == null) { + dataChannel.dispose(); - dataChannel.registerObserver(new DataChannelObserver(dataChannel)); - dataChannels.put(dataChannel.label(), dataChannel); + return; + } + + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannels.put(dataChannel.label(), dataChannel); + } } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 61c5f5daa4..a7819fa6f6 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -19,14 +19,20 @@ import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.argThat import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito +import org.mockito.Mockito.atLeast +import org.mockito.Mockito.atMostOnce +import org.mockito.Mockito.doAnswer import org.mockito.Mockito.doNothing import org.mockito.Mockito.never +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.webrtc.DataChannel import org.webrtc.MediaConstraints import org.webrtc.PeerConnection import org.webrtc.PeerConnectionFactory import java.nio.ByteBuffer import java.util.HashMap +import kotlin.concurrent.thread class PeerConnectionWrapperTest { @@ -36,6 +42,23 @@ class PeerConnectionWrapperTest { private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null private var mockedSignalingMessageSender: SignalingMessageSender? = null + /** + * Helper answer for DataChannel methods. + */ + private class ReturnValueOrThrowIfDisposed(val value: T) : + Answer { + override fun answer(currentInvocation: InvocationOnMock): T { + if (Mockito.mockingDetails(currentInvocation.mock).invocations.find { + it!!.method.name === "dispose" + } !== null + ) { + throw IllegalStateException("DataChannel has been disposed") + } + + return value + } + } + /** * Helper matcher for DataChannelMessages. */ @@ -195,6 +218,83 @@ class PeerConnectionWrapperTest { ) } + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannelWithDifferentThreads() { + // A brute force approach is used to test race conditions between different threads just repeating the test + // several times. Due to this the test passing could be a false positive, as it could have been a matter of + // luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with + // that number of reruns, it fails when it should). + for (i in 1..1000) { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel) + .registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelMessageCount = 5 + + val sendThread = thread { + for (j in 1..dataChannelMessageCount) { + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type-$j")) + } + } + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for ConcurrentModificationExceptions when iterating over the data channel messages). + var exceptionOnStateChange: Exception? = null + + val openDataChannelThread = thread { + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + + try { + statusDataChannelObserverArgumentCaptor.value.onStateChange() + } catch (e: Exception) { + exceptionOnStateChange = e + } + } + + sendThread.join() + openDataChannelThread.join() + + if (exceptionOnStateChange !== null) { + throw exceptionOnStateChange!! + } + + for (j in 1..dataChannelMessageCount) { + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j"))) + ) + } + } + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`( @@ -381,4 +481,279 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedStatusDataChannel).dispose() Mockito.verify(mockedRandomIdDataChannel).dispose() } + + @Test + fun testRemovePeerConnectionWhileAddingRemoteDataChannelsWithDifferentThreads() { + // A brute force approach is used to test race conditions between different threads just repeating the test + // several times. Due to this the test passing could be a false positive, as it could have been a matter of + // luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with + // that number of reruns, it fails when it should). + for (i in 1..1000) { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer( + ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN) + ) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelCount = 5 + + val mockedRandomIdDataChannels: MutableList = ArrayList() + val dataChannelObservers: MutableList = ArrayList() + for (j in 0.. + if (Mockito.mockingDetails(invocation.mock).invocations.find { + it!!.method.name === "dispose" + } !== null + ) { + throw IllegalStateException("DataChannel has been disposed") + } + + dataChannelObservers[j] = invocation.getArgument(0, DataChannel.Observer::class.java) + + null + }.`when`(mockedRandomIdDataChannels[j]).registerObserver(any()) + } + + val onDataChannelThread = thread { + // Add again "status" data channel to test that it is correctly disposed also in that case (which + // should not happen anyway even if it was added by the remote peer, but just in case) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedStatusDataChannel) + + for (j in 0.. = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()) + .thenAnswer(ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN)) + Mockito.`when`(mockedStatusDataChannel.send(any())).thenAnswer(ReturnValueOrThrowIfDisposed(true)) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelMessageCount = 5 + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for IllegalStateExceptions when using a disposed data channel). + var exceptionSend: Exception? = null + + val sendThread = thread { + try { + for (j in 0.. = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer( + ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN) + ) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel) + .registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for IllegalStateExceptions when using a disposed data channel). + var exceptionOnMessage: Exception? = null + + val onMessageThread = thread { + try { + // It is assumed that, even if its data channel was disposed, its buffers can be used while there + // is a reference to them, so no special mock behaviour is added to throw an exception in that case. + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + } catch (e: Exception) { + exceptionOnMessage = e + } + } + + val removePeerConnectionThread = thread { + peerConnectionWrapper!!.removePeerConnection() + } + + onMessageThread.join() + removePeerConnectionThread.join() + + if (exceptionOnMessage !== null) { + throw exceptionOnMessage!! + } + + Mockito.verify(mockedStatusDataChannel).registerObserver(any()) + Mockito.verify(mockedStatusDataChannel).dispose() + Mockito.verify(mockedStatusDataChannel, atLeast(0)).label() + Mockito.verify(mockedStatusDataChannel, atLeast(0)).state() + Mockito.verifyNoMoreInteractions(mockedStatusDataChannel) + Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOn() + Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + } } From 1ac2c413cdb5d365cf815fda7316f53f6d77c36c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Mon, 9 Dec 2024 18:36:22 +0100 Subject: [PATCH 16/16] Fix "send" not respecting order of pending messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the data channel is not open yet data channel messages are queued and then sent once opened. "onStateChange" is called from the WebRTC signaling thread, while "send" can be called potentially from any thread, so to send the data channel messages in the same order that they were added new messages need to be enqueued until all the pending messages have been sent. Otherwise, even if there is synchronization already, it could happen that "onStateChange" was called but, before getting the lock, "send" gets it and sends the new message before the pending messages were sent. Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 9 +++++++-- .../nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt | 5 ++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 13bb5f2f7b..9f2a90f859 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -298,7 +298,8 @@ public synchronized void send(DataChannelMessage dataChannelMessage) { } DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN || + !pendingDataChannelMessages.isEmpty()) { Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId); pendingDataChannelMessages.add(dataChannelMessage); @@ -306,6 +307,10 @@ public synchronized void send(DataChannelMessage dataChannelMessage) { return; } + sendWithoutQueuing(statusDataChannel, dataChannelMessage); + } + + private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) { try { Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId); @@ -423,7 +428,7 @@ public void onStateChange() { if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) { - send(dataChannelMessage); + sendWithoutQueuing(dataChannel, dataChannelMessage); } pendingDataChannelMessages.clear(); } diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index a7819fa6f6..f49c25ec09 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -23,6 +23,7 @@ import org.mockito.Mockito.atLeast import org.mockito.Mockito.atMostOnce import org.mockito.Mockito.doAnswer import org.mockito.Mockito.doNothing +import org.mockito.Mockito.inOrder import org.mockito.Mockito.never import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -287,8 +288,10 @@ class PeerConnectionWrapperTest { throw exceptionOnStateChange!! } + val inOrder = inOrder(mockedStatusDataChannel) + for (j in 1..dataChannelMessageCount) { - Mockito.verify(mockedStatusDataChannel).send( + inOrder.verify(mockedStatusDataChannel).send( argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j"))) ) }