From 57d04fc8010ef53f0193e5493d4735ea45a37782 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 5 Feb 2024 15:24:53 +0100 Subject: [PATCH] Add get_stats/1 (#64) --- lib/ex_webrtc/app.ex | 10 ++ lib/ex_webrtc/dtls_transport.ex | 62 +++++++++ lib/ex_webrtc/ice_transport.ex | 3 + lib/ex_webrtc/peer_connection.ex | 176 ++++++++++++++++++++++-- lib/ex_webrtc/rtp_receiver.ex | 37 ++++- lib/ex_webrtc/rtp_sender.ex | 45 +++++- lib/ex_webrtc/utils.ex | 4 + mix.exs | 9 +- mix.lock | 2 +- test/ex_webrtc/dtls_transport_test.exs | 3 + test/ex_webrtc/peer_connection_test.exs | 165 ++++++++++++++++++++-- test/ex_webrtc/rtp_receiver_test.exs | 52 +++++++ test/ex_webrtc/rtp_sender_test.exs | 48 ++++++- 13 files changed, 579 insertions(+), 37 deletions(-) create mode 100644 lib/ex_webrtc/app.ex create mode 100644 test/ex_webrtc/rtp_receiver_test.exs diff --git a/lib/ex_webrtc/app.ex b/lib/ex_webrtc/app.ex new file mode 100644 index 00000000..38ce0239 --- /dev/null +++ b/lib/ex_webrtc/app.ex @@ -0,0 +1,10 @@ +defmodule ExWebRTC.App do + @moduledoc false + use Application + + @impl true + def start(_type, _args) do + children = [{Registry, keys: :unique, name: ExWebRTC.Registry}] + Supervisor.start_link(children, strategy: :one_for_one) + end +end diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 2a9909de..fd2a56c1 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -33,6 +33,19 @@ defmodule ExWebRTC.DTLSTransport do """ @type dtls_state() :: :new | :connecting | :connected | :closed | :failed + @typedoc """ + Information about DTLS certificate. + + * `fingerprint` - hex dump of the cert fingerprint + * `fingerprint_algorithm` - always `:sha_256` + * `base64_certificate` - base 64 encoded certificate + """ + @type cert_info :: %{ + fingerprint: binary(), + fingerprint_algorithm: :sha_256, + base64_certificate: binary() + } + @spec start_link(ICETransport.t(), pid()) :: GenServer.on_start() def start_link(ice_transport \\ DefaultICETransport, ice_pid) do behaviour = ice_transport.__info__(:attributes)[:behaviour] || [] @@ -49,6 +62,14 @@ defmodule ExWebRTC.DTLSTransport do GenServer.call(dtls_transport, :set_ice_connected) end + @spec get_certs_info(dtls_transport()) :: %{ + local_cert_info: cert_info(), + remote_cert_info: cert_info() | nil + } + def get_certs_info(dtls_transport) do + GenServer.call(dtls_transport, :get_certs_info) + end + @spec get_fingerprint(dtls_transport()) :: binary() def get_fingerprint(dtls_transport) do GenServer.call(dtls_transport, :get_fingerprint) @@ -87,8 +108,12 @@ defmodule ExWebRTC.DTLSTransport do ice_connected: false, buffered_packets: nil, cert: cert, + base64_cert: Base.encode64(cert), pkey: pkey, fingerprint: fingerprint, + remote_cert: nil, + remote_base64_cert: nil, + remote_fingerprint: nil, in_srtp: ExLibSRTP.new(), out_srtp: ExLibSRTP.new(), # sha256 hex dump @@ -133,6 +158,33 @@ defmodule ExWebRTC.DTLSTransport do end end + @impl true + def handle_call(:get_certs_info, _from, state) do + local_cert_info = %{ + fingerprint: Utils.hex_dump(state.fingerprint), + fingerprint_algorithm: :sha_256, + base64_certificate: state.base64_cert + } + + remote_cert_info = + if state.dtls_state == :connected do + %{ + fingerprint: Utils.hex_dump(state.remote_fingerprint), + fingerprint_algorithm: :sha_256, + base64_certificate: state.remote_base64_cert + } + else + nil + end + + certs_info = %{ + local_cert_info: local_cert_info, + remote_cert_info: remote_cert_info + } + + {:reply, certs_info, state} + end + @impl true def handle_call(:get_fingerprint, _from, state) do {:reply, state.fingerprint, state} @@ -247,6 +299,7 @@ defmodule ExWebRTC.DTLSTransport do {:handshake_finished, lkm, rkm, profile, packets} -> Logger.debug("DTLS handshake finished") + state = update_remote_cert_info(state) state.ice_transport.send_data(state.ice_pid, packets) peer_fingerprint = @@ -269,6 +322,7 @@ defmodule ExWebRTC.DTLSTransport do Logger.debug("DTLS handshake finished") :ok = setup_srtp(state, lkm, rkm, profile) state = update_dtls_state(state, :connected) + state = update_remote_cert_info(state) {:ok, state} :handshake_want_read -> @@ -340,5 +394,13 @@ defmodule ExWebRTC.DTLSTransport do %{state | dtls_state: new_dtls_state} end + defp update_remote_cert_info(state) do + cert = ExDTLS.get_cert(state.dtls) + fingerprint = ExDTLS.get_cert_fingerprint(cert) + base64_cert = Base.encode64(cert) + + %{state | remote_cert: cert, remote_base64_cert: base64_cert, remote_fingerprint: fingerprint} + end + defp notify(dst, msg), do: send(dst, {:dtls_transport, self(), msg}) end diff --git a/lib/ex_webrtc/ice_transport.ex b/lib/ex_webrtc/ice_transport.ex index 32efebb0..49110093 100644 --- a/lib/ex_webrtc/ice_transport.ex +++ b/lib/ex_webrtc/ice_transport.ex @@ -14,6 +14,7 @@ defmodule ExWebRTC.ICETransport do @callback restart(pid()) :: :ok @callback send_data(pid(), binary()) :: :ok @callback set_remote_credentials(pid(), ufrag :: binary(), pwd :: binary()) :: :ok + @callback get_stats(pid()) :: map() @callback stop(pid()) :: :ok end @@ -43,5 +44,7 @@ defmodule ExWebRTC.DefaultICETransport do @impl true defdelegate set_remote_credentials(pid, ufrag, pwd), to: ICEAgent @impl true + defdelegate get_stats(pid), to: ICEAgent + @impl true defdelegate stop(pid), to: ICEAgent end diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 946b3e29..e6fedc84 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -18,6 +18,7 @@ defmodule ExWebRTC.PeerConnection do MediaStreamTrack, RTPTransceiver, RTPSender, + RTPReceiver, SDPUtils, SessionDescription, Utils @@ -59,6 +60,15 @@ defmodule ExWebRTC.PeerConnection do @type connection_state() :: :closed | :failed | :disconnected | :new | :connecting | :connected #### API #### + @doc """ + Returns a list of all running peer connections. + """ + @spec get_all_running() :: [pid()] + def get_all_running() do + Registry.select(ExWebRTC.Registry, [{{:_, :"$1", :_}, [], [:"$1"]}]) + |> Enum.filter(fn pid -> Process.alive?(pid) end) + end + @spec start_link(Configuration.options()) :: GenServer.on_start() def start_link(options \\ []) do configuration = Configuration.from_options!(options) @@ -162,6 +172,16 @@ defmodule ExWebRTC.PeerConnection do GenServer.cast(peer_connection, {:send_rtp, track_id, packet}) end + @doc """ + Returns peer connection's statistics. + + See [RTCStatsReport](https://www.w3.org/TR/webrtc/#rtcstatsreport-object) for the output structure. + """ + @spec get_stats(peer_connection()) :: %{(atom() | integer()) => map()} + def get_stats(peer_connection) do + GenServer.call(peer_connection, :get_stats) + end + @spec close(peer_connection()) :: :ok def close(peer_connection) do GenServer.stop(peer_connection) @@ -171,6 +191,7 @@ defmodule ExWebRTC.PeerConnection do @impl true def init({owner, config}) do + {:ok, _} = Registry.register(ExWebRTC.Registry, self(), self()) ice_config = [stun_servers: config.ice_servers, ip_filter: config.ice_ip_filter, on_data: nil] {:ok, ice_pid} = DefaultICETransport.start_link(:controlled, ice_config) {:ok, dtls_transport} = DTLSTransport.start_link(DefaultICETransport, ice_pid) @@ -519,7 +540,7 @@ defmodule ExWebRTC.PeerConnection do {:reply, :ok, state} true -> - # that's not compliant with the W3C but it's safer not + # that's not compliant with the W3C but it's safer not # to allow for this until we have clear use case {:reply, {:error, :invalid_transceiver_direction}, state} end @@ -558,6 +579,135 @@ defmodule ExWebRTC.PeerConnection do end end + @impl true + def handle_call(:get_stats, _from, state) do + timestamp = System.os_time(:millisecond) + + ice_stats = state.ice_transport.get_stats(state.ice_pid) + + %{local_cert_info: local_cert_info, remote_cert_info: remote_cert_info} = + DTLSTransport.get_certs_info(state.dtls_transport) + + remote_certificate = + if remote_cert_info != nil do + %{ + id: :remote_certificate, + type: :certificate, + timestamp: timestamp, + fingerprint: remote_cert_info.fingerprint, + fingerprint_algorithm: remote_cert_info.fingerprint_algorithm, + base64_certificate: remote_cert_info.base64_certificate + } + else + %{ + id: :remote_certificate, + type: :certificate, + timestamp: timestamp, + fingerprint: nil, + fingerprint_algorithm: nil, + base64_certificate: nil + } + end + + to_stats_candidate = fn cand, type, timestamp -> + %{ + id: cand.id, + timestamp: timestamp, + type: type, + address: cand.address, + port: cand.port, + protocol: cand.transport, + candidate_type: cand.type, + priority: cand.priority, + foundation: cand.foundation, + related_address: cand.base_address, + related_port: cand.base_port + } + end + + local_cands = + Map.new(ice_stats.local_candidates, fn local_cand -> + cand = to_stats_candidate.(local_cand, :local_candidate, timestamp) + {cand.id, cand} + end) + + remote_cands = + Map.new(ice_stats.remote_candidates, fn remote_cand -> + cand = to_stats_candidate.(remote_cand, :remote_candidate, timestamp) + {cand.id, cand} + end) + + rtp_stats = + Enum.flat_map(state.transceivers, fn tr -> + tr_stats = %{kind: tr.kind, mid: tr.mid} + + case tr.current_direction do + :sendonly -> + stats = RTPSender.get_stats(tr.sender, timestamp) + [Map.merge(stats, tr_stats)] + + :recvonly -> + stats = RTPReceiver.get_stats(tr.receiver, timestamp) + [Map.merge(stats, tr_stats)] + + :sendrecv -> + sender_stats = RTPSender.get_stats(tr.sender, timestamp) + receiver_stats = RTPReceiver.get_stats(tr.receiver, timestamp) + + [ + Map.merge(sender_stats, tr_stats), + Map.merge(receiver_stats, tr_stats) + ] + + _other -> + [] + end + end) + |> Map.new(fn stats -> {stats.id, stats} end) + + stats = %{ + peer_connection: %{ + id: :peer_connection, + type: :peer_connection, + timestamp: timestamp, + signaling_state: state.signaling_state, + negotiation_needed: state.negotiation_needed, + connection_state: state.conn_state + }, + local_certificate: %{ + id: :local_certificate, + type: :certificate, + timestamp: timestamp, + fingerprint: local_cert_info.fingerprint, + fingerprint_algorithm: local_cert_info.fingerprint_algorithm, + base64_certificate: local_cert_info.base64_certificate + }, + remote_certificate: remote_certificate, + transport: %{ + id: :transport, + type: :transport, + timestamp: timestamp, + ice_state: ice_stats.state, + ice_gathering_state: state.ice_gathering_state, + ice_role: ice_stats.role, + ice_local_ufrag: ice_stats.local_ufrag, + dtls_state: state.dtls_state, + bytes_sent: ice_stats.bytes_sent, + bytes_received: ice_stats.bytes_received, + packets_sent: ice_stats.packets_sent, + packets_received: ice_stats.packets_received + } + } + + stats = + stats + |> Map.merge(local_cands) + |> Map.merge(remote_cands) + |> Map.merge(rtp_stats) + + {:reply, stats, state} + end + @impl true def handle_cast({:send_rtp, track_id, packet}, state) do # TODO: iterating over transceivers is not optimal @@ -638,10 +788,12 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_info({:dtls_transport, _pid, {:rtp, data}}, state) do with {:ok, demuxer, mid, packet} <- Demuxer.demux(state.demuxer, data), - %RTPTransceiver{} = t <- Enum.find(state.transceivers, &(&1.mid == mid)) do - track_id = t.receiver.track.id - notify(state.owner, {:rtp, track_id, packet}) - {:noreply, %{state | demuxer: demuxer}} + {idx, %RTPTransceiver{} = t} <- find_transceiver(state.transceivers, mid) do + receiver = RTPReceiver.recv(t.receiver, packet, data) + transceivers = List.update_at(state.transceivers, idx, &%{&1 | receiver: receiver}) + state = %{state | demuxer: demuxer, transceivers: transceivers} + notify(state.owner, {:rtp, t.receiver.track.id, packet}) + {:noreply, state} else nil -> Logger.warning("Received RTP with unrecognized MID: #{inspect(data)}") @@ -717,7 +869,7 @@ defmodule ExWebRTC.PeerConnection do # mline from the last offer/answer, do it (i.e. recycle free mline) # * If there is no transceiver's mline, just rewrite # mline from the offer/answer respecting its port number i.e. whether - # it is rejected or not. + # it is rejected or not. # This is to preserve the same number of mlines # between subsequent offer/anser exchanges. # * At the end, add remaining transceiver mlines @@ -751,7 +903,7 @@ defmodule ExWebRTC.PeerConnection do end # next_mline_idx is future mline idx to use if there are no mlines to recycle - # next_mid is the next free mid + # next_mid is the next free mid defp assign_mlines( transceivers, last_answer, @@ -1167,7 +1319,7 @@ defmodule ExWebRTC.PeerConnection do # If signaling state is not stable i.e. we are during negotiation, # don't fire negotiation needed notification. - # We will do this when moving to the stable state as part of the + # We will do this when moving to the stable state as part of the # steps for setting remote description. defp update_negotiation_needed(%{signaling_state: sig_state} = state) when sig_state != :stable, do: state @@ -1185,14 +1337,14 @@ defmodule ExWebRTC.PeerConnection do negotiation_needed == false -> # We need to clear the flag. - # Consider scenario where we add a transceiver and then - # remove it without performing negotiation. + # Consider scenario where we add a transceiver and then + # remove it without performing negotiation. # At the end of the day, negotiation_needed flag has to be cleared. %{state | negotiation_needed: false} end end - # We don't support MSIDs and stopping transceivers so + # We don't support MSIDs and stopping transceivers so # we only check 5.2 and 5.3 from 4.7.3#check-if-negotiation-is-needed # https://www.w3.org/TR/webrtc/#dfn-check-if-negotiation-is-needed defp negotiation_needed?([], _), do: false @@ -1212,7 +1364,7 @@ defmodule ExWebRTC.PeerConnection do cond do # Consider the following scenario: # 1. offerer offers sendrecv - # 2. answerer answers with recvonly + # 2. answerer answers with recvonly # 3. offerer changes from sendrecv to sendonly # We don't need to renegotiate in such a case. local_desc_type == :offer and diff --git a/lib/ex_webrtc/rtp_receiver.ex b/lib/ex_webrtc/rtp_receiver.ex index 82304531..52b541f4 100644 --- a/lib/ex_webrtc/rtp_receiver.ex +++ b/lib/ex_webrtc/rtp_receiver.ex @@ -3,11 +3,42 @@ defmodule ExWebRTC.RTPReceiver do Implementation of the [RTCRtpReceiver](https://www.w3.org/TR/webrtc/#rtcrtpreceiver-interface). """ - alias ExWebRTC.MediaStreamTrack + alias ExWebRTC.{MediaStreamTrack, Utils} @type t() :: %__MODULE__{ - track: MediaStreamTrack.t() | nil + track: MediaStreamTrack.t(), + ssrc: non_neg_integer() | nil, + bytes_received: non_neg_integer(), + packets_received: non_neg_integer(), + markers_received: non_neg_integer() } - defstruct [:track] + defstruct [:track, :ssrc, bytes_received: 0, packets_received: 0, markers_received: 0] + + @doc false + @spec recv(t(), ExRTP.Packet.t(), binary()) :: t() + def recv(receiver, packet, raw_packet) do + # TODO assign ssrc when applying local/remote description. + %__MODULE__{ + receiver + | ssrc: packet.ssrc, + bytes_received: receiver.bytes_received + byte_size(raw_packet), + packets_received: receiver.packets_received + 1, + markers_received: receiver.markers_received + Utils.to_int(packet.marker) + } + end + + @doc false + @spec get_stats(t(), non_neg_integer()) :: map() + def get_stats(receiver, timestamp) do + %{ + id: receiver.track.id, + type: :inbound_rtp, + timestamp: timestamp, + ssrc: receiver.ssrc, + bytes_received: receiver.bytes_received, + packets_received: receiver.packets_received, + markers_received: receiver.markers_received + } + end end diff --git a/lib/ex_webrtc/rtp_sender.ex b/lib/ex_webrtc/rtp_sender.ex index beedb19f..767d41b5 100644 --- a/lib/ex_webrtc/rtp_sender.ex +++ b/lib/ex_webrtc/rtp_sender.ex @@ -19,11 +19,25 @@ defmodule ExWebRTC.RTPSender do mid: String.t() | nil, pt: non_neg_integer() | nil, ssrc: non_neg_integer() | nil, - last_seq_num: non_neg_integer() + last_seq_num: non_neg_integer(), + packets_sent: non_neg_integer(), + bytes_sent: non_neg_integer(), + markers_sent: non_neg_integer() } @enforce_keys [:id, :last_seq_num] - defstruct @enforce_keys ++ [:track, :codec, :mid, :pt, :ssrc, rtp_hdr_exts: %{}] + defstruct @enforce_keys ++ + [ + :track, + :codec, + :mid, + :pt, + :ssrc, + rtp_hdr_exts: %{}, + packets_sent: 0, + bytes_sent: 0, + markers_sent: 0 + ] @doc false @spec new( @@ -78,13 +92,34 @@ defmodule ExWebRTC.RTPSender do next_seq_num = sender.last_seq_num + 1 &&& 0xFFFF packet = %{packet | payload_type: sender.pt, ssrc: sender.ssrc, sequence_number: next_seq_num} - packet = + data = packet |> ExRTP.Packet.add_extension(mid_ext) |> ExRTP.Packet.encode() - sender = %{sender | last_seq_num: next_seq_num} - {packet, sender} + sender = %{ + sender + | last_seq_num: next_seq_num, + packets_sent: sender.packets_sent + 1, + bytes_sent: sender.bytes_sent + byte_size(data), + markers_sent: sender.markers_sent + Utils.to_int(packet.marker) + } + + {data, sender} + end + + @doc false + @spec get_stats(t(), non_neg_integer()) :: map() + def get_stats(sender, timestamp) do + %{ + timestamp: timestamp, + type: :outbound_rtp, + id: sender.id, + ssrc: sender.ssrc, + packets_sent: sender.packets_sent, + bytes_sent: sender.bytes_sent, + markers_sent: sender.markers_sent + } end defp random_seq_num(), do: Enum.random(0..65_535) diff --git a/lib/ex_webrtc/utils.ex b/lib/ex_webrtc/utils.ex index 9549f0f7..2d2ef1f9 100644 --- a/lib/ex_webrtc/utils.ex +++ b/lib/ex_webrtc/utils.ex @@ -13,4 +13,8 @@ defmodule ExWebRTC.Utils do <> = :crypto.strong_rand_bytes(12) id end + + @spec to_int(boolean()) :: 0 | 1 + def to_int(false), do: 0 + def to_int(true), do: 1 end diff --git a/mix.exs b/mix.exs index e7360c81..1a4ad638 100644 --- a/mix.exs +++ b/mix.exs @@ -33,6 +33,7 @@ defmodule ExWebRTC.MixProject do def application do [ + mod: {ExWebRTC.App, []}, extra_applications: [:logger] ] end @@ -50,7 +51,7 @@ defmodule ExWebRTC.MixProject do defp deps do [ {:ex_sdp, "~> 0.14.0"}, - {:ex_ice, "~> 0.5.0"}, + {:ex_ice, "~> 0.6.0"}, {:ex_dtls, "~> 0.15.0"}, {:ex_libsrtp, "~> 0.7.1"}, {:ex_rtp, "~> 0.3.0"}, @@ -83,15 +84,15 @@ defmodule ExWebRTC.MixProject do defp before_closing_body_tag(:html) do # highlight JS code blocks """ - +