From 9b3a2cc9e5a83061f60402e3cb806f4ac8c5fa0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Wed, 28 Aug 2024 12:32:36 +0200 Subject: [PATCH 1/2] Include DataChannel stats in `get_state` --- lib/ex_webrtc/peer_connection.ex | 6 +++ lib/ex_webrtc/sctp_transport.ex | 70 ++++++++++++++++++++++++++++---- 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index d6b19d4..1277264 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -1018,6 +1018,11 @@ defmodule ExWebRTC.PeerConnection do |> Enum.flat_map(&RTPTransceiver.get_stats(&1, timestamp)) |> Map.new(fn stats -> {stats.id, stats} end) + data_channel_stats = + state.sctp_transport + |> SCTPTransport.get_stats(timestamp) + |> Map.new(fn stats -> {stats.id, stats} end) + stats = %{ peer_connection: %{ id: :peer_connection, @@ -1057,6 +1062,7 @@ defmodule ExWebRTC.PeerConnection do |> Map.merge(local_cands) |> Map.merge(remote_cands) |> Map.merge(rtp_stats) + |> Map.merge(data_channel_stats) {:reply, stats, state} end diff --git a/lib/ex_webrtc/sctp_transport.ex b/lib/ex_webrtc/sctp_transport.ex index 594e1c7..79c1d76 100644 --- a/lib/ex_webrtc/sctp_transport.ex +++ b/lib/ex_webrtc/sctp_transport.ex @@ -23,7 +23,8 @@ defmodule ExWebRTC.SCTPTransport do connected: false, id_type: nil, timer: nil, - channels: %{} + channels: %{}, + stats: %{} } end @@ -45,6 +46,24 @@ defmodule ExWebRTC.SCTPTransport do sctp_transport.channels != %{} end + @spec get_stats(t(), non_neg_integer()) :: [map()] + def get_stats(sctp_transport, timestamp) do + Enum.map(sctp_transport.channels, fn {ref, channel} -> + stats = Map.fetch!(sctp_transport.stats, ref) + + %{ + id: inspect(channel.ref), + type: :data_channel, + timestamp: timestamp, + data_channel_identifier: channel.id, + label: channel.label, + protocol: channel.protocol, + state: channel.ready_state + } + |> Map.merge(stats) + end) + end + @spec add_channel( t(), String.t(), @@ -67,7 +86,8 @@ defmodule ExWebRTC.SCTPTransport do } channels = Map.put(sctp_transport.channels, channel.ref, channel) - sctp_transport = %{sctp_transport | channels: channels} + stats = Map.put(sctp_transport.stats, channel.ref, initial_stats()) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} {events, sctp_transport} = if sctp_transport.connected do @@ -90,7 +110,8 @@ defmodule ExWebRTC.SCTPTransport do {[], sctp_transport} {%DataChannel{id: id}, channels} -> - sctp_transport = %{sctp_transport | channels: channels} + stats = Map.delete(sctp_transport.stats, ref) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} {events, sctp_transport} = if id != nil do @@ -114,8 +135,9 @@ defmodule ExWebRTC.SCTPTransport do case Map.fetch(sctp_transport.channels, ref) do {:ok, %DataChannel{ready_state: :open, id: id}} when id != nil -> + stats = update_stats(sctp_transport.stats, ref, data, :sent) :ok = ExSCTP.send(sctp_transport.ref, id, ppi, data) - handle_events(sctp_transport) + handle_events(%{sctp_transport | stats: stats}) {:ok, %DataChannel{id: id}} -> Logger.warning( @@ -207,8 +229,9 @@ defmodule ExWebRTC.SCTPTransport do case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do {ref, %DataChannel{ref: ref}} -> channels = Map.delete(sctp_transport.channels, ref) + stats = Map.delete(sctp_transport.stats, ref) event = {:state_change, ref, :closed} - {event, %{sctp_transport | channels: channels}} + {event, %{sctp_transport | channels: channels, stats: stats}} _other -> {nil, sctp_transport} @@ -257,7 +280,9 @@ defmodule ExWebRTC.SCTPTransport do case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do {ref, %DataChannel{}} -> channels = Map.delete(sctp_transport.channels, ref) - {{:state_change, ref, :closed}, %{sctp_transport | channels: channels}} + stats = Map.delete(sctp_transport.stats, ref) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} + {{:state_change, ref, :closed}, sctp_transport} nil -> {nil, sctp_transport} @@ -269,7 +294,8 @@ defmodule ExWebRTC.SCTPTransport do with {:ok, data} <- from_raw_data(data, ppi), {ref, %DataChannel{ready_state: :open}} <- Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do - {{:data, ref, data}, sctp_transport} + stats = update_stats(sctp_transport.stats, ref, data, :received) + {{:data, ref, data}, %{sctp_transport | stats: stats}} else {_ref, %DataChannel{}} -> Logger.warning("Received data on DataChannel with id #{id} that is not open. Discarding") @@ -309,7 +335,8 @@ defmodule ExWebRTC.SCTPTransport do # In theory, we should also send the :open event here (W3C 6.2.3) # TODO channels = Map.put(sctp_transport.channels, channel.ref, channel) - sctp_transport = %{sctp_transport | channels: channels} + stats = Map.put(sctp_transport.stats, channel.ref, initial_stats()) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} case ExSCTP.configure_stream( sctp_transport.ref, @@ -396,4 +423,31 @@ defmodule ExWebRTC.SCTPTransport do {:odd, 1} -> max_id + 2 end end + + defp initial_stats() do + %{ + messages_sent: 0, + messages_received: 0, + bytes_sent: 0, + bytes_received: 0 + } + end + + defp update_stats(stats, ref, data, type) do + Map.update!(stats, ref, fn stat -> + if type == :sent do + %{ + stat + | messages_sent: stat.messages_sent + 1, + bytes_sent: stat.bytes_sent + byte_size(data) + } + else + %{ + stat + | messages_received: stat.messages_received + 1, + bytes_received: stat.bytes_received + byte_size(data) + } + end + end) + end end From 8db92484e560260dbf4dc6167f809e55173e60cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Wed, 28 Aug 2024 13:01:18 +0200 Subject: [PATCH 2/2] Add test --- test/ex_webrtc/data_channel_test.exs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/test/ex_webrtc/data_channel_test.exs b/test/ex_webrtc/data_channel_test.exs index bfff4a3..1ca7115 100644 --- a/test/ex_webrtc/data_channel_test.exs +++ b/test/ex_webrtc/data_channel_test.exs @@ -157,7 +157,7 @@ defmodule ExWebRTC.DataChannelTest do %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} end - test "message from initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + test "from initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do data1 = <<1, 2, 3>> :ok = PeerConnection.send_data(pc1, ref1, data1) assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, ^data1}} @@ -200,5 +200,21 @@ defmodule ExWebRTC.DataChannelTest do :ok = PeerConnection.send_data(pc2, ref2, msg) assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data}} end + + test "and collecting stats about it", %{pc1: pc1, pc2: pc2, ref1: ref1} do + for _ <- 1..10 do + :ok = PeerConnection.send_data(pc1, ref1, <<1, 2, 3>>) + end + + stats1 = PeerConnection.get_stats(pc1) + assert {_ref, channel_stats1} = Enum.find(stats1, fn {_, v} -> v.type == :data_channel end) + assert channel_stats1.bytes_sent == 30 + assert channel_stats1.messages_sent == 10 + + stats2 = PeerConnection.get_stats(pc2) + assert {_ref, channel_stats2} = Enum.find(stats2, fn {_, v} -> v.type == :data_channel end) + assert channel_stats2.bytes_received == 30 + assert channel_stats2.messages_received == 10 + end end end