Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of data channels #4536

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1174,12 +1174,12 @@ class CallActivity : CallBaseActivity() {
if (isConnectionEstablished && othersInCall) {
if (!hasMCU) {
for (peerConnectionWrapper in peerConnectionWrapperList) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage))
peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage))
}
} else {
for (peerConnectionWrapper in peerConnectionWrapperList) {
if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage))
peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage))
break
}
}
Expand Down Expand Up @@ -1370,12 +1370,12 @@ class CallActivity : CallBaseActivity() {
if (isConnectionEstablished) {
if (!hasMCU) {
for (peerConnectionWrapper in peerConnectionWrapperList) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(message))
peerConnectionWrapper.send(DataChannelMessage(message))
}
} else {
for (peerConnectionWrapper in peerConnectionWrapperList) {
if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(message))
peerConnectionWrapper.send(DataChannelMessage(message))
break
}
}
Expand Down Expand Up @@ -2563,7 +2563,7 @@ class CallActivity : CallBaseActivity() {
}

override fun onNext(aLong: Long) {
peerConnectionWrapper.sendChannelData(dataChannelMessage)
peerConnectionWrapper.send(dataChannelMessage)
}

override fun onError(e: Throwable) {
Expand Down
184 changes: 140 additions & 44 deletions app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
*/
package com.nextcloud.talk.webrtc;

import android.content.Context;
import android.util.Log;

import com.bluelinelabs.logansquare.LoganSquare;
import com.nextcloud.talk.application.NextcloudTalkApplication;
import com.nextcloud.talk.models.json.signaling.DataChannelMessage;
import com.nextcloud.talk.models.json.signaling.NCIceCandidate;
import com.nextcloud.talk.models.json.signaling.NCMessagePayload;
Expand All @@ -36,21 +34,15 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.inject.Inject;

import androidx.annotation.Nullable;
import autodagger.AutoInjector;

@AutoInjector(NextcloudTalkApplication.class)
public class PeerConnectionWrapper {

@Inject
Context context;

private static final String TAG = PeerConnectionWrapper.class.getCanonicalName();

private final SignalingMessageReceiver signalingMessageReceiver;
Expand All @@ -66,7 +58,8 @@ public class PeerConnectionWrapper {
private PeerConnection peerConnection;
private String sessionId;
private final MediaConstraints mediaConstraints;
private DataChannel dataChannel;
private final Map<String, DataChannel> dataChannels = new HashMap<>();
private final List<DataChannelMessage> pendingDataChannelMessages = new ArrayList<>();
private final SdpObserver sdpObserver;

private final boolean hasInitiated;
Expand All @@ -81,6 +74,9 @@ public class PeerConnectionWrapper {
/**
* Listener for data channel messages.
* <p>
* Messages might have been received on any data channel, independently of its label or whether it was open by the
* local or the remote peer.
* <p>
* The messages are bound to a specific peer connection, so each listener is expected to handle messages only for
* a single peer connection.
* <p>
Expand Down Expand Up @@ -117,9 +113,6 @@ public PeerConnectionWrapper(PeerConnectionFactory peerConnectionFactory,
boolean isMCUPublisher, boolean hasMCU, String videoStreamType,
SignalingMessageReceiver signalingMessageReceiver,
SignalingMessageSender signalingMessageSender) {

Objects.requireNonNull(NextcloudTalkApplication.Companion.getSharedApplication()).getComponentApplication().inject(this);

this.localStream = localStream;
this.videoStreamType = videoStreamType;

Expand Down Expand Up @@ -153,8 +146,11 @@ public PeerConnectionWrapper(PeerConnectionFactory peerConnectionFactory,
if (hasMCU || hasInitiated) {
DataChannel.Init init = new DataChannel.Init();
init.negotiated = false;
dataChannel = peerConnection.createDataChannel("status", init);
dataChannel.registerObserver(new DataChannelObserver());

DataChannel statusDataChannel = peerConnection.createDataChannel("status", init);
statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel));
dataChannels.put("status", statusDataChannel);

if (isMCUPublisher) {
peerConnection.createOffer(sdpObserver, mediaConstraints);
} else if (hasMCU && "video".equals(this.videoStreamType)) {
Expand Down Expand Up @@ -239,16 +235,15 @@ public MediaStream getStream() {
return stream;
}

public void removePeerConnection() {
public synchronized void removePeerConnection() {
signalingMessageReceiver.removeListener(webRtcMessageListener);

if (dataChannel != null) {
for (DataChannel dataChannel: dataChannels.values()) {
Log.d(TAG, "Disposed DataChannel " + dataChannel.label());

dataChannel.dispose();
dataChannel = null;
Log.d(TAG, "Disposed DataChannel");
} else {
Log.d(TAG, "DataChannel is null.");
}
dataChannels.clear();

if (peerConnection != null) {
peerConnection.close();
Expand Down Expand Up @@ -278,15 +273,51 @@ private void addCandidate(IceCandidate iceCandidate) {
}
}

public void sendChannelData(DataChannelMessage dataChannelMessage) {
ByteBuffer buffer;
if (dataChannel != null && dataChannelMessage != null) {
try {
buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes());
dataChannel.send(new DataChannel.Buffer(buffer, false));
} catch (Exception e) {
Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage);
}
/**
* Sends a data channel message.
* <p>
* Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used,
* messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel;
* messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the
* "status" data channel subscriber connections will receive it on a data channel with a different label, as
* Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel
* messages on it, independently of on which data channel they were originally sent.
* <p>
* Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be
* queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will
* be received by other participants, as it is only known when the data channel of the publisher was opened, but
* not if the data channel of the subscribers was. However, in general this should be a concern only during the
* first seconds after a participant joins; after some time the subscriber connections should be established and
* their data channels open.
*
* @param dataChannelMessage the message to send
*/
public synchronized void send(DataChannelMessage dataChannelMessage) {
if (dataChannelMessage == null) {
return;
}

DataChannel statusDataChannel = dataChannels.get("status");
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN ||
!pendingDataChannelMessages.isEmpty()) {
Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId);

pendingDataChannelMessages.add(dataChannelMessage);

return;
}

sendWithoutQueuing(statusDataChannel, dataChannelMessage);
}

private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) {
try {
Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId);

ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes());
statusDataChannel.send(new DataChannel.Buffer(buffer, false));
} catch (Exception e) {
Log.w(TAG, "Failed to send data channel message");
}
}

Expand All @@ -301,15 +332,15 @@ public String getSessionId() {
private void sendInitialMediaStatus() {
if (localStream != null) {
if (localStream.videoTracks.size() == 1 && localStream.videoTracks.get(0).enabled()) {
sendChannelData(new DataChannelMessage("videoOn"));
send(new DataChannelMessage("videoOn"));
} else {
sendChannelData(new DataChannelMessage("videoOff"));
send(new DataChannelMessage("videoOff"));
}

if (localStream.audioTracks.size() == 1 && localStream.audioTracks.get(0).enabled()) {
sendChannelData(new DataChannelMessage("audioOn"));
send(new DataChannelMessage("audioOn"));
} else {
sendChannelData(new DataChannelMessage("audioOff"));
send(new DataChannelMessage("audioOff"));
}
}
}
Expand Down Expand Up @@ -373,31 +404,63 @@ public void onEndOfCandidates() {

private class DataChannelObserver implements DataChannel.Observer {

private final DataChannel dataChannel;
private final String dataChannelLabel;

public DataChannelObserver(DataChannel dataChannel) {
this.dataChannel = Objects.requireNonNull(dataChannel);
this.dataChannelLabel = dataChannel.label();
}

@Override
public void onBufferedAmountChange(long l) {

}

@Override
public void onStateChange() {
if (dataChannel != null &&
dataChannel.state() == DataChannel.State.OPEN) {
sendInitialMediaStatus();
synchronized (PeerConnectionWrapper.this) {
// The PeerConnection could have been removed in parallel even with the synchronization (as just after
// "onStateChange" was called "removePeerConnection" could have acquired the lock).
if (peerConnection == null) {
return;
}

if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) {
for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) {
sendWithoutQueuing(dataChannel, dataChannelMessage);
}
pendingDataChannelMessages.clear();
}

if (dataChannel.state() == DataChannel.State.OPEN) {
sendInitialMediaStatus();
}
}
}

@Override
public void onMessage(DataChannel.Buffer buffer) {
synchronized (PeerConnectionWrapper.this) {
// It is assumed that, even if its data channel was disposed, its buffers can be used while there is
// a reference to them, so it would not be necessary to check this from a thread-safety point of view.
// Nevertheless, if the remote peer connection was removed it would not make sense to notify the
// listeners anyway.
if (peerConnection == null) {
return;
}
}

if (buffer.binary) {
Log.d(TAG, "Received binary msg over " + TAG + " " + sessionId);
Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId);
return;
}

ByteBuffer data = buffer.data;
final byte[] bytes = new byte[data.capacity()];
data.get(bytes);
String strData = new String(bytes);
Log.d(TAG, "Got msg: " + strData + " over " + TAG + " " + sessionId);
Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannelLabel + " " + sessionId);

DataChannelMessage dataChannelMessage;
try {
Expand Down Expand Up @@ -517,12 +580,45 @@ public void onRemoveStream(MediaStream mediaStream) {

@Override
public void onDataChannel(DataChannel dataChannel) {
if (PeerConnectionWrapper.this.dataChannel != null) {
Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label()
+ " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label());
synchronized (PeerConnectionWrapper.this) {
// Another data channel with the same label, no matter if the same instance or a different one, should
// not be added, but this is handled just in case.
// Moreover, if it were possible that an already added data channel was added again there would be a
// potential race condition with "removePeerConnection", even with the synchronization, as it would
// be possible that "onDataChannel" was called, then "removePeerConnection" disposed the data
// channel, and then "onDataChannel" continued in the synchronized statements and tried to get the
// label, which would throw an exception due to the data channel having been disposed already.
String dataChannelLabel;
try {
dataChannelLabel = dataChannel.label();
} catch (IllegalStateException e) {
// The data channel was disposed already, nothing to do.
return;
}

DataChannel oldDataChannel = dataChannels.get(dataChannelLabel);
if (oldDataChannel == dataChannel) {
Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again");

return;
}

if (oldDataChannel != null) {
Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists");

oldDataChannel.dispose();
}

// If the peer connection was removed in parallel dispose the data channel instead of adding it.
if (peerConnection == null) {
dataChannel.dispose();

return;
}

dataChannel.registerObserver(new DataChannelObserver(dataChannel));
dataChannels.put(dataChannel.label(), dataChannel);
}
PeerConnectionWrapper.this.dataChannel = dataChannel;
PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver());
}

@Override
Expand Down
51 changes: 51 additions & 0 deletions app/src/test/java/android/util/Log.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Nextcloud Talk - Android Client
*
* SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez <[email protected]>
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package android.util;

/**
* Dummy implementation of android.util.Log to be used in unit tests.
* <p>
* The Android Gradle plugin provides a library with the APIs of the Android framework that throws an exception if any
* of them are called. This class is loaded before that library and therefore becomes the implementation used during the
* tests, simply printing the messages to the system console.
*/
public class Log {

public static int d(String tag, String msg) {
System.out.println("DEBUG: " + tag + ": " + msg);

return 1;
}

public static int e(String tag, String msg) {
System.out.println("ERROR: " + tag + ": " + msg);

return 1;
}

public static int i(String tag, String msg) {
System.out.println("INFO: " + tag + ": " + msg);

return 1;
}

public static boolean isLoggable(String tag, int level) {
return true;
}

public static int v(String tag, String msg) {
System.out.println("VERBOSE: " + tag + ": " + msg);

return 1;
}

public static int w(String tag, String msg) {
System.out.println("WARN: " + tag + ": " + msg);

return 1;
}
}
Loading
Loading