Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include DataChannel stats in PeerConnection.get_stats #162

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,11 @@ defmodule ExWebRTC.PeerConnection do
|> Enum.flat_map(&RTPTransceiver.get_stats(&1, timestamp))
|> Map.new(fn stats -> {stats.id, stats} end)

data_channel_stats =
state.sctp_transport
|> SCTPTransport.get_stats(timestamp)
|> Map.new(fn stats -> {stats.id, stats} end)

stats = %{
peer_connection: %{
id: :peer_connection,
Expand Down Expand Up @@ -1057,6 +1062,7 @@ defmodule ExWebRTC.PeerConnection do
|> Map.merge(local_cands)
|> Map.merge(remote_cands)
|> Map.merge(rtp_stats)
|> Map.merge(data_channel_stats)

{:reply, stats, state}
end
Expand Down
70 changes: 62 additions & 8 deletions lib/ex_webrtc/sctp_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ defmodule ExWebRTC.SCTPTransport do
connected: false,
id_type: nil,
timer: nil,
channels: %{}
channels: %{},
stats: %{}
}
end

Expand All @@ -45,6 +46,24 @@ defmodule ExWebRTC.SCTPTransport do
sctp_transport.channels != %{}
end

@spec get_stats(t(), non_neg_integer()) :: [map()]
def get_stats(sctp_transport, timestamp) do
Enum.map(sctp_transport.channels, fn {ref, channel} ->
stats = Map.fetch!(sctp_transport.stats, ref)

%{
id: inspect(channel.ref),
type: :data_channel,
timestamp: timestamp,
data_channel_identifier: channel.id,
label: channel.label,
protocol: channel.protocol,
state: channel.ready_state
}
|> Map.merge(stats)
end)
end

@spec add_channel(
t(),
String.t(),
Expand All @@ -67,7 +86,8 @@ defmodule ExWebRTC.SCTPTransport do
}

channels = Map.put(sctp_transport.channels, channel.ref, channel)
sctp_transport = %{sctp_transport | channels: channels}
stats = Map.put(sctp_transport.stats, channel.ref, initial_stats())
sctp_transport = %{sctp_transport | channels: channels, stats: stats}

{events, sctp_transport} =
if sctp_transport.connected do
Expand All @@ -90,7 +110,8 @@ defmodule ExWebRTC.SCTPTransport do
{[], sctp_transport}

{%DataChannel{id: id}, channels} ->
sctp_transport = %{sctp_transport | channels: channels}
stats = Map.delete(sctp_transport.stats, ref)
sctp_transport = %{sctp_transport | channels: channels, stats: stats}

{events, sctp_transport} =
if id != nil do
Expand All @@ -114,8 +135,9 @@ defmodule ExWebRTC.SCTPTransport do

case Map.fetch(sctp_transport.channels, ref) do
{:ok, %DataChannel{ready_state: :open, id: id}} when id != nil ->
stats = update_stats(sctp_transport.stats, ref, data, :sent)
:ok = ExSCTP.send(sctp_transport.ref, id, ppi, data)
handle_events(sctp_transport)
handle_events(%{sctp_transport | stats: stats})

{:ok, %DataChannel{id: id}} ->
Logger.warning(
Expand Down Expand Up @@ -207,8 +229,9 @@ defmodule ExWebRTC.SCTPTransport do
case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{ref: ref}} ->
channels = Map.delete(sctp_transport.channels, ref)
stats = Map.delete(sctp_transport.stats, ref)
event = {:state_change, ref, :closed}
{event, %{sctp_transport | channels: channels}}
{event, %{sctp_transport | channels: channels, stats: stats}}

_other ->
{nil, sctp_transport}
Expand Down Expand Up @@ -257,7 +280,9 @@ defmodule ExWebRTC.SCTPTransport do
case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{}} ->
channels = Map.delete(sctp_transport.channels, ref)
{{:state_change, ref, :closed}, %{sctp_transport | channels: channels}}
stats = Map.delete(sctp_transport.stats, ref)
sctp_transport = %{sctp_transport | channels: channels, stats: stats}
{{:state_change, ref, :closed}, sctp_transport}

nil ->
{nil, sctp_transport}
Expand All @@ -269,7 +294,8 @@ defmodule ExWebRTC.SCTPTransport do
with {:ok, data} <- from_raw_data(data, ppi),
{ref, %DataChannel{ready_state: :open}} <-
Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{{:data, ref, data}, sctp_transport}
stats = update_stats(sctp_transport.stats, ref, data, :received)
{{:data, ref, data}, %{sctp_transport | stats: stats}}
else
{_ref, %DataChannel{}} ->
Logger.warning("Received data on DataChannel with id #{id} that is not open. Discarding")
Expand Down Expand Up @@ -309,7 +335,8 @@ defmodule ExWebRTC.SCTPTransport do
# In theory, we should also send the :open event here (W3C 6.2.3)
# TODO
channels = Map.put(sctp_transport.channels, channel.ref, channel)
sctp_transport = %{sctp_transport | channels: channels}
stats = Map.put(sctp_transport.stats, channel.ref, initial_stats())
sctp_transport = %{sctp_transport | channels: channels, stats: stats}

case ExSCTP.configure_stream(
sctp_transport.ref,
Expand Down Expand Up @@ -396,4 +423,31 @@ defmodule ExWebRTC.SCTPTransport do
{:odd, 1} -> max_id + 2
end
end

defp initial_stats() do
%{
messages_sent: 0,
messages_received: 0,
bytes_sent: 0,
bytes_received: 0
}
end

defp update_stats(stats, ref, data, type) do
Map.update!(stats, ref, fn stat ->
if type == :sent do
%{
stat
| messages_sent: stat.messages_sent + 1,
bytes_sent: stat.bytes_sent + byte_size(data)
}
else
%{
stat
| messages_received: stat.messages_received + 1,
bytes_received: stat.bytes_received + byte_size(data)
}
end
end)
end
end
18 changes: 17 additions & 1 deletion test/ex_webrtc/data_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ defmodule ExWebRTC.DataChannelTest do
%{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2}
end

test "message from initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
test "from initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
data1 = <<1, 2, 3>>
:ok = PeerConnection.send_data(pc1, ref1, data1)
assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, ^data1}}
Expand Down Expand Up @@ -200,5 +200,21 @@ defmodule ExWebRTC.DataChannelTest do
:ok = PeerConnection.send_data(pc2, ref2, msg)
assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data}}
end

test "and collecting stats about it", %{pc1: pc1, pc2: pc2, ref1: ref1} do
for _ <- 1..10 do
:ok = PeerConnection.send_data(pc1, ref1, <<1, 2, 3>>)
end

stats1 = PeerConnection.get_stats(pc1)
assert {_ref, channel_stats1} = Enum.find(stats1, fn {_, v} -> v.type == :data_channel end)
assert channel_stats1.bytes_sent == 30
assert channel_stats1.messages_sent == 10

stats2 = PeerConnection.get_stats(pc2)
assert {_ref, channel_stats2} = Enum.find(stats2, fn {_, v} -> v.type == :data_channel end)
assert channel_stats2.bytes_received == 30
assert channel_stats2.messages_received == 10
end
end
end