From 738771f367d32dd4e1c332d30a92820b41629f15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Thu, 29 Aug 2024 14:17:47 +0200 Subject: [PATCH] Make DataChannel support optional (#163) --- README.md | 14 + examples/chat/mix.exs | 3 +- examples/chat/mix.lock | 6 +- lib/ex_webrtc/peer_connection.ex | 2 +- lib/ex_webrtc/sctp_transport.ex | 802 +++++++++++++++++-------------- mix.exs | 2 +- 6 files changed, 449 insertions(+), 380 deletions(-) diff --git a/README.md b/README.md index 882c6e1..0dcf696 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,20 @@ def deps do end ``` +Elixir WebRTC comes with optional support for DataChannels, but it must be explicitely turned on by +adding optional `ex_sctp` dependency + +```elixir +def deps do + [ + {:ex_webrtc, "~> 0.4.1"}, + {:ex_sctp, "~> 0.1.0"} + ] +end +``` + +Please note that `ex_sctp` requires you to have Rust installed in order to compile. + ## Getting started To get started with Elixir WebRTC, check out: diff --git a/examples/chat/mix.exs b/examples/chat/mix.exs index fee3fae..6437db8 100644 --- a/examples/chat/mix.exs +++ b/examples/chat/mix.exs @@ -24,7 +24,8 @@ defmodule Chat.MixProject do {:bandit, "~> 1.2.0"}, {:websock_adapter, "~> 0.5.0"}, {:jason, "~> 1.4.0"}, - {:ex_webrtc, path: "../../."} + {:ex_webrtc, path: "../../."}, + {:ex_sctp, github: "elixir-webrtc/ex_sctp"} ] end end diff --git a/examples/chat/mix.lock b/examples/chat/mix.lock index c48bf44..8c53151 100644 --- a/examples/chat/mix.lock +++ b/examples/chat/mix.lock @@ -8,7 +8,7 @@ "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "ex_dtls": {:git, "https://github.com/elixir-webrtc/ex_dtls.git", "dff8ec1998dfb556b2d3dafbd30574d0da18b958", []}, - "ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"}, + "ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, @@ -30,11 +30,11 @@ "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, - "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"}, "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, } diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 1277264..4f2dcda 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -1964,7 +1964,7 @@ defmodule ExWebRTC.PeerConnection do end defp dc_negotiation_needed?(state) do - first_channel = map_size(state.sctp_transport.channels) == 1 + first_channel = SCTPTransport.channel_count(state.sctp_transport) == 1 has_channels = case state.current_local_desc do diff --git a/lib/ex_webrtc/sctp_transport.ex b/lib/ex_webrtc/sctp_transport.ex index 79c1d76..cb71319 100644 --- a/lib/ex_webrtc/sctp_transport.ex +++ b/lib/ex_webrtc/sctp_transport.ex @@ -1,453 +1,507 @@ -defmodule ExWebRTC.SCTPTransport do - @moduledoc false - - require Logger - - alias __MODULE__.DCEP - alias ExWebRTC.DataChannel - - @dcep_ppi 50 - - @opaque t() :: map() - - @type event() :: - {:transmit, binary()} - | {:data, DataChannel.ref(), binary()} - | {:channel, DataChannel.t()} - | {:state_change, DataChannel.ref(), DataChannel.ready_state()} - - @spec new() :: t() - def new do - %{ - ref: ExSCTP.new(), - connected: false, - id_type: nil, - timer: nil, - channels: %{}, - stats: %{} - } - end +if Code.ensure_loaded?(ExSCTP) do + defmodule ExWebRTC.SCTPTransport do + @moduledoc false - @spec connect(t()) :: {[event()], t()} - def connect(%{connected: true} = sctp_transport), do: {[], sctp_transport} + require Logger - def connect(sctp_transport) do - :ok = ExSCTP.connect(sctp_transport.ref) - handle_events(sctp_transport) - end + alias __MODULE__.DCEP + alias ExWebRTC.DataChannel - @spec set_role(t(), :active | :passive) :: t() - def set_role(%{id_type: t} = sctp_transport, _type) when t != nil, do: sctp_transport - def set_role(sctp_transport, :active), do: %{sctp_transport | id_type: :even} - def set_role(sctp_transport, :passive), do: %{sctp_transport | id_type: :odd} + @dcep_ppi 50 - @spec data_channels?(t()) :: boolean() - def data_channels?(sctp_transport) do - sctp_transport.channels != %{} - end + @opaque t() :: map() - @spec get_stats(t(), non_neg_integer()) :: [map()] - def get_stats(sctp_transport, timestamp) do - Enum.map(sctp_transport.channels, fn {ref, channel} -> - stats = Map.fetch!(sctp_transport.stats, ref) + @type event() :: + {:transmit, binary()} + | {:data, DataChannel.ref(), binary()} + | {:channel, DataChannel.t()} + | {:state_change, DataChannel.ref(), DataChannel.ready_state()} + @spec new() :: t() + def new do %{ - id: inspect(channel.ref), - type: :data_channel, - timestamp: timestamp, - data_channel_identifier: channel.id, - label: channel.label, - protocol: channel.protocol, - state: channel.ready_state + ref: ExSCTP.new(), + connected: false, + id_type: nil, + timer: nil, + channels: %{}, + stats: %{} } - |> Map.merge(stats) - end) - end + end - @spec add_channel( - t(), - String.t(), - boolean(), - String.t(), - non_neg_integer() | nil, - non_neg_integer() | nil - ) :: - {[event()], DataChannel.t(), t()} - def add_channel(sctp_transport, label, ordered, protocol, lifetime, max_rtx) do - channel = %DataChannel{ - ref: make_ref(), - id: nil, - label: label, - ordered: ordered, - protocol: protocol, - ready_state: :connecting, - max_packet_life_time: lifetime, - max_retransmits: max_rtx - } - - channels = Map.put(sctp_transport.channels, channel.ref, channel) - stats = Map.put(sctp_transport.stats, channel.ref, initial_stats()) - sctp_transport = %{sctp_transport | channels: channels, stats: stats} - - {events, sctp_transport} = - if sctp_transport.connected do - sctp_transport = handle_pending_channels(sctp_transport) - handle_events(sctp_transport) - else - {[], sctp_transport} - end + @spec connect(t()) :: {[event()], t()} + def connect(%{connected: true} = sctp_transport), do: {[], sctp_transport} - {events, channel, sctp_transport} - end + def connect(sctp_transport) do + :ok = ExSCTP.connect(sctp_transport.ref) + handle_events(sctp_transport) + end - @spec close_channel(t(), DataChannel.ref()) :: {[event()], t()} - def close_channel(sctp_transport, ref) do - # TODO: according to spec, this should move to `closing` state - # and only then be closed, but oh well... - case Map.pop(sctp_transport.channels, ref) do - {nil, _channels} -> - Logger.warning("Trying to close non-existent channel with ref #{inspect(ref)}") - {[], sctp_transport} - - {%DataChannel{id: id}, channels} -> - stats = Map.delete(sctp_transport.stats, ref) - sctp_transport = %{sctp_transport | channels: channels, stats: stats} + @spec set_role(t(), :active | :passive) :: t() + def set_role(%{id_type: t} = sctp_transport, _type) when t != nil, do: sctp_transport + def set_role(sctp_transport, :active), do: %{sctp_transport | id_type: :even} + def set_role(sctp_transport, :passive), do: %{sctp_transport | id_type: :odd} - {events, sctp_transport} = - if id != nil do - :ok = ExSCTP.close_stream(sctp_transport.ref, id) - handle_events(sctp_transport) - else - {[], sctp_transport} - end + @spec data_channels?(t()) :: boolean() + def data_channels?(sctp_transport), do: sctp_transport.channels != %{} - event = {:state_change, ref, :closed} - {[event | events], sctp_transport} - end - end + @spec channel_count(t()) :: non_neg_integer() + def channel_count(sctp_transport), do: map_size(sctp_transport.channels) - @spec get_channel(t(), DataChannel.ref()) :: DataChannel.t() | nil - def get_channel(sctp_transport, ref), do: Map.get(sctp_transport.channels, ref) + @spec get_stats(t(), non_neg_integer()) :: [map()] + def get_stats(sctp_transport, timestamp) do + Enum.map(sctp_transport.channels, fn {ref, channel} -> + stats = Map.fetch!(sctp_transport.stats, ref) - @spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()} - def send(sctp_transport, ref, type, data) do - {ppi, data} = to_raw_data(data, type) + %{ + id: inspect(channel.ref), + type: :data_channel, + timestamp: timestamp, + data_channel_identifier: channel.id, + label: channel.label, + protocol: channel.protocol, + state: channel.ready_state + } + |> Map.merge(stats) + end) + end - case Map.fetch(sctp_transport.channels, ref) do - {:ok, %DataChannel{ready_state: :open, id: id}} when id != nil -> - stats = update_stats(sctp_transport.stats, ref, data, :sent) - :ok = ExSCTP.send(sctp_transport.ref, id, ppi, data) - handle_events(%{sctp_transport | stats: stats}) + @spec add_channel( + t(), + String.t(), + boolean(), + String.t(), + non_neg_integer() | nil, + non_neg_integer() | nil + ) :: + {[event()], DataChannel.t(), t()} + def add_channel(sctp_transport, label, ordered, protocol, lifetime, max_rtx) do + channel = %DataChannel{ + ref: make_ref(), + id: nil, + label: label, + ordered: ordered, + protocol: protocol, + ready_state: :connecting, + max_packet_life_time: lifetime, + max_retransmits: max_rtx + } - {:ok, %DataChannel{id: id}} -> - Logger.warning( - "Trying to send data over DataChannel with id #{id} that is not opened yet" - ) + channels = Map.put(sctp_transport.channels, channel.ref, channel) + stats = Map.put(sctp_transport.stats, channel.ref, initial_stats()) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} - {[], sctp_transport} + {events, sctp_transport} = + if sctp_transport.connected do + sctp_transport = handle_pending_channels(sctp_transport) + handle_events(sctp_transport) + else + {[], sctp_transport} + end - :error -> - Logger.warning( - "Trying to send data over non-existent DataChannel with ref #{inspect(ref)}" - ) + {events, channel, sctp_transport} + end - {[], sctp_transport} + @spec close_channel(t(), DataChannel.ref()) :: {[event()], t()} + def close_channel(sctp_transport, ref) do + # TODO: according to spec, this should move to `closing` state + # and only then be closed, but oh well... + case Map.pop(sctp_transport.channels, ref) do + {nil, _channels} -> + Logger.warning("Trying to close non-existent channel with ref #{inspect(ref)}") + {[], sctp_transport} + + {%DataChannel{id: id}, channels} -> + stats = Map.delete(sctp_transport.stats, ref) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} + + {events, sctp_transport} = + if id != nil do + :ok = ExSCTP.close_stream(sctp_transport.ref, id) + handle_events(sctp_transport) + else + {[], sctp_transport} + end + + event = {:state_change, ref, :closed} + {[event | events], sctp_transport} + end end - end - @spec handle_timeout(t()) :: {[event()], t()} - def handle_timeout(sctp_transport) do - ExSCTP.handle_timeout(sctp_transport.ref) - handle_events(sctp_transport) - end + @spec get_channel(t(), DataChannel.ref()) :: DataChannel.t() | nil + def get_channel(sctp_transport, ref), do: Map.get(sctp_transport.channels, ref) - @spec handle_data(t(), binary()) :: {[event()], t()} - def handle_data(sctp_transport, data) do - :ok = ExSCTP.handle_data(sctp_transport.ref, data) - handle_events(sctp_transport) - end + @spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()} + def send(sctp_transport, ref, type, data) do + {ppi, data} = to_raw_data(data, type) - defp handle_pending_channels(sctp_transport) do - sctp_transport.channels - |> Map.values() - |> Enum.filter(fn channel -> channel.id == nil end) - |> Enum.reduce(sctp_transport, fn channel, transport -> - handle_pending_channel(transport, channel) - end) - end + case Map.fetch(sctp_transport.channels, ref) do + {:ok, %DataChannel{ready_state: :open, id: id}} when id != nil -> + stats = update_stats(sctp_transport.stats, ref, data, :sent) + :ok = ExSCTP.send(sctp_transport.ref, id, ppi, data) + handle_events(%{sctp_transport | stats: stats}) - defp handle_pending_channel(sctp_transport, channel) do - id = new_id(sctp_transport) - :ok = ExSCTP.open_stream(sctp_transport.ref, id) + {:ok, %DataChannel{id: id}} -> + Logger.warning( + "Trying to send data over DataChannel with id #{id} that is not opened yet" + ) - {reliability, param} = - cond do - channel.max_packet_life_time != nil -> {:timed, channel.max_packet_life_time} - channel.max_retransmits != nil -> {:rexmit, channel.max_retransmits} - true -> {:reliable, 0} - end + {[], sctp_transport} - dco = %DCEP.DataChannelOpen{ - reliability: reliability, - order: if(channel.ordered, do: :ordered, else: :unordered), - label: channel.label, - protocol: channel.protocol, - param: param, - priority: 0 - } + :error -> + Logger.warning( + "Trying to send data over non-existent DataChannel with ref #{inspect(ref)}" + ) - :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(dco)) + {[], sctp_transport} + end + end - channel = %DataChannel{channel | id: id} - %{sctp_transport | channels: Map.replace!(sctp_transport.channels, channel.ref, channel)} - end + @spec handle_timeout(t()) :: {[event()], t()} + def handle_timeout(sctp_transport) do + ExSCTP.handle_timeout(sctp_transport.ref) + handle_events(sctp_transport) + end - defp handle_events(sctp_transport, events \\ []) do - event = ExSCTP.poll(sctp_transport.ref) + @spec handle_data(t(), binary()) :: {[event()], t()} + def handle_data(sctp_transport, data) do + :ok = ExSCTP.handle_data(sctp_transport.ref, data) + handle_events(sctp_transport) + end - case handle_event(sctp_transport, event) do - {:none, transport} -> {Enum.reverse(events), transport} - {nil, transport} -> handle_events(transport, events) - {other, transport} when is_list(other) -> handle_events(transport, other ++ events) - {other, transport} -> handle_events(transport, [other | events]) + defp handle_pending_channels(sctp_transport) do + sctp_transport.channels + |> Map.values() + |> Enum.filter(fn channel -> channel.id == nil end) + |> Enum.reduce(sctp_transport, fn channel, transport -> + handle_pending_channel(transport, channel) + end) end - end - # if SCTP disconnected, most likely DTLS disconnected, so we won't handle this here explcitly - defp handle_event(sctp_transport, :disconnected), do: {nil, sctp_transport} - defp handle_event(sctp_transport, :none), do: {:none, sctp_transport} - defp handle_event(sctp_transport, {:transmit, _data} = event), do: {event, sctp_transport} + defp handle_pending_channel(sctp_transport, channel) do + id = new_id(sctp_transport) + :ok = ExSCTP.open_stream(sctp_transport.ref, id) - defp handle_event(sctp_transport, {:stream_opened, id}) do - Logger.debug("SCTP stream #{id} has been opened") - {nil, sctp_transport} - end + {reliability, param} = + cond do + channel.max_packet_life_time != nil -> {:timed, channel.max_packet_life_time} + channel.max_retransmits != nil -> {:rexmit, channel.max_retransmits} + true -> {:reliable, 0} + end - defp handle_event(sctp_transport, {:stream_closed, id}) do - Logger.debug("SCTP stream #{id} has been closed") + dco = %DCEP.DataChannelOpen{ + reliability: reliability, + order: if(channel.ordered, do: :ordered, else: :unordered), + label: channel.label, + protocol: channel.protocol, + param: param, + priority: 0 + } - case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do - {ref, %DataChannel{ref: ref}} -> - channels = Map.delete(sctp_transport.channels, ref) - stats = Map.delete(sctp_transport.stats, ref) - event = {:state_change, ref, :closed} - {event, %{sctp_transport | channels: channels, stats: stats}} + :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(dco)) - _other -> - {nil, sctp_transport} + channel = %DataChannel{channel | id: id} + %{sctp_transport | channels: Map.replace!(sctp_transport.channels, channel.ref, channel)} end - end - defp handle_event(sctp_transport, :connected) do - Logger.debug("SCTP connection has been established") + defp handle_events(sctp_transport, events \\ []) do + event = ExSCTP.poll(sctp_transport.ref) - sctp_transport = - %{sctp_transport | connected: true} - |> handle_pending_channels() + case handle_event(sctp_transport, event) do + {:none, transport} -> {Enum.reverse(events), transport} + {nil, transport} -> handle_events(transport, events) + {other, transport} when is_list(other) -> handle_events(transport, other ++ events) + {other, transport} -> handle_events(transport, [other | events]) + end + end - {nil, sctp_transport} - end + # if SCTP disconnected, most likely DTLS disconnected, so we won't handle this here explcitly + defp handle_event(sctp_transport, :disconnected), do: {nil, sctp_transport} + defp handle_event(sctp_transport, :none), do: {:none, sctp_transport} + defp handle_event(sctp_transport, {:transmit, _data} = event), do: {event, sctp_transport} - defp handle_event(sctp_transport, {:timeout, val}) do - # TODO: this seems to work - # but sometimes the data is send after quite a substensial timeout - # calling `handle_timeout` periodically (i.e. every 50s) seems to work better - # which is wierd, to investigate - if sctp_transport.timer != nil do - Process.cancel_timer(sctp_transport.timer) + defp handle_event(sctp_transport, {:stream_opened, id}) do + Logger.debug("SCTP stream #{id} has been opened") + {nil, sctp_transport} end - timer = - case val do - nil -> nil - ms -> Process.send_after(self(), :sctp_timeout, ms) + defp handle_event(sctp_transport, {:stream_closed, id}) do + Logger.debug("SCTP stream #{id} has been closed") + + case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {ref, %DataChannel{ref: ref}} -> + channels = Map.delete(sctp_transport.channels, ref) + stats = Map.delete(sctp_transport.stats, ref) + event = {:state_change, ref, :closed} + {event, %{sctp_transport | channels: channels, stats: stats}} + + _other -> + {nil, sctp_transport} end + end - {nil, %{sctp_transport | timer: timer}} - end + defp handle_event(sctp_transport, :connected) do + Logger.debug("SCTP connection has been established") + + sctp_transport = + %{sctp_transport | connected: true} + |> handle_pending_channels() + + {nil, sctp_transport} + end + + defp handle_event(sctp_transport, {:timeout, val}) do + # TODO: this seems to work + # but sometimes the data is send after quite a substensial timeout + # calling `handle_timeout` periodically (i.e. every 50s) seems to work better + # which is wierd, to investigate + if sctp_transport.timer != nil do + Process.cancel_timer(sctp_transport.timer) + end - defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do - with {:ok, dcep} <- DCEP.decode(data), - {:ok, sctp_transport, events} <- handle_dcep(sctp_transport, id, dcep) do - # events is either list or a single event - {events, sctp_transport} - else - :error -> - Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}") - - ExSCTP.close_stream(sctp_transport.ref, id) - - case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do - {ref, %DataChannel{}} -> - channels = Map.delete(sctp_transport.channels, ref) - stats = Map.delete(sctp_transport.stats, ref) - sctp_transport = %{sctp_transport | channels: channels, stats: stats} - {{:state_change, ref, :closed}, sctp_transport} - - nil -> - {nil, sctp_transport} + timer = + case val do + nil -> nil + ms -> Process.send_after(self(), :sctp_timeout, ms) end + + {nil, %{sctp_transport | timer: timer}} end - end - defp handle_event(sctp_transport, {:data, id, ppi, data}) do - with {:ok, data} <- from_raw_data(data, ppi), - {ref, %DataChannel{ready_state: :open}} <- - Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do - stats = update_stats(sctp_transport.stats, ref, data, :received) - {{:data, ref, data}, %{sctp_transport | stats: stats}} - else - {_ref, %DataChannel{}} -> - Logger.warning("Received data on DataChannel with id #{id} that is not open. Discarding") - {nil, sctp_transport} - - nil -> - Logger.warning( - "Received data over non-existent DataChannel on stream with id #{id}. Discarding" - ) - - {nil, sctp_transport} - - _other -> - Logger.warning("Received data in invalid format on stream with id #{id}. Discarding") - {nil, sctp_transport} + defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do + with {:ok, dcep} <- DCEP.decode(data), + {:ok, sctp_transport, events} <- handle_dcep(sctp_transport, id, dcep) do + # events is either list or a single event + {events, sctp_transport} + else + :error -> + Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}") + + ExSCTP.close_stream(sctp_transport.ref, id) + + case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {ref, %DataChannel{}} -> + channels = Map.delete(sctp_transport.channels, ref) + stats = Map.delete(sctp_transport.stats, ref) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} + {{:state_change, ref, :closed}, sctp_transport} + + nil -> + {nil, sctp_transport} + end + end end - end - defp handle_dcep(sctp_transport, id, %DCEP.DataChannelOpen{} = dco) do - with false <- Enum.any?(sctp_transport.channels, fn {_k, v} -> v.id == id end), - true <- valid_id?(sctp_transport, id) do - :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(%DCEP.DataChannelAck{})) + defp handle_event(sctp_transport, {:data, id, ppi, data}) do + with {:ok, data} <- from_raw_data(data, ppi), + {ref, %DataChannel{ready_state: :open}} <- + Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + stats = update_stats(sctp_transport.stats, ref, data, :received) + {{:data, ref, data}, %{sctp_transport | stats: stats}} + else + {_ref, %DataChannel{}} -> + Logger.warning( + "Received data on DataChannel with id #{id} that is not open. Discarding" + ) - Logger.debug("Remote opened DataChannel #{id} succesfully") + {nil, sctp_transport} - channel = %DataChannel{ - ref: make_ref(), - id: id, - label: dco.label, - ordered: dco.order == :ordered, - protocol: dco.protocol, - ready_state: :open, - max_packet_life_time: if(dco.reliability == :timed, do: dco.param, else: nil), - max_retransmits: if(dco.reliability == :rexmit, do: dco.param, else: nil) - } + nil -> + Logger.warning( + "Received data over non-existent DataChannel on stream with id #{id}. Discarding" + ) - # In theory, we should also send the :open event here (W3C 6.2.3) - # TODO - channels = Map.put(sctp_transport.channels, channel.ref, channel) - stats = Map.put(sctp_transport.stats, channel.ref, initial_stats()) - sctp_transport = %{sctp_transport | channels: channels, stats: stats} + {nil, sctp_transport} - case ExSCTP.configure_stream( - sctp_transport.ref, - id, - channel.ordered, - dco.reliability, - dco.param - ) do - :ok -> - # remote channels also result in open event - # even tho they already have ready_state open in the {:data_channel, ...} message - # W3C 6.2.3 - events = [{:state_change, channel.ref, :open}, {:channel, channel}] - {:ok, sctp_transport, events} - - {:error, _res} -> - Logger.warning("Unable to set stream #{id} parameters") - :error + _other -> + Logger.warning("Received data in invalid format on stream with id #{id}. Discarding") + {nil, sctp_transport} end - else - _other -> - Logger.warning("Received invalid DCEP Open on stream #{id}") - :error end - end - - defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do - case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do - {ref, %DataChannel{ready_state: :connecting} = channel} -> - Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully") - channel = %DataChannel{channel | ready_state: :open} - channels = Map.put(sctp_transport.channels, ref, channel) - sctp_transport = %{sctp_transport | channels: channels} + defp handle_dcep(sctp_transport, id, %DCEP.DataChannelOpen{} = dco) do + with false <- Enum.any?(sctp_transport.channels, fn {_k, v} -> v.id == id end), + true <- valid_id?(sctp_transport, id) do + :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(%DCEP.DataChannelAck{})) + + Logger.debug("Remote opened DataChannel #{id} succesfully") + + channel = %DataChannel{ + ref: make_ref(), + id: id, + label: dco.label, + ordered: dco.order == :ordered, + protocol: dco.protocol, + ready_state: :open, + max_packet_life_time: if(dco.reliability == :timed, do: dco.param, else: nil), + max_retransmits: if(dco.reliability == :rexmit, do: dco.param, else: nil) + } - {rel_type, rel_param} = - case channel do - %DataChannel{max_packet_life_time: nil, max_retransmits: nil} -> {:reliable, 0} - %DataChannel{max_retransmits: v} when v != nil -> {:rexmit, v} - %DataChannel{max_packet_life_time: v} when v != nil -> {:timed, v} - end + # In theory, we should also send the :open event here (W3C 6.2.3) + # TODO + channels = Map.put(sctp_transport.channels, channel.ref, channel) + stats = Map.put(sctp_transport.stats, channel.ref, initial_stats()) + sctp_transport = %{sctp_transport | channels: channels, stats: stats} - case ExSCTP.configure_stream(sctp_transport.ref, id, channel.ordered, rel_type, rel_param) do + case ExSCTP.configure_stream( + sctp_transport.ref, + id, + channel.ordered, + dco.reliability, + dco.param + ) do :ok -> - {:ok, sctp_transport, {:state_change, ref, :open}} + # remote channels also result in open event + # even tho they already have ready_state open in the {:data_channel, ...} message + # W3C 6.2.3 + events = [{:state_change, channel.ref, :open}, {:channel, channel}] + {:ok, sctp_transport, events} {:error, _res} -> Logger.warning("Unable to set stream #{id} parameters") :error end + else + _other -> + Logger.warning("Received invalid DCEP Open on stream #{id}") + :error + end + end + + defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do + case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {ref, %DataChannel{ready_state: :connecting} = channel} -> + Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully") + + channel = %DataChannel{channel | ready_state: :open} + channels = Map.put(sctp_transport.channels, ref, channel) + sctp_transport = %{sctp_transport | channels: channels} + + {rel_type, rel_param} = + case channel do + %DataChannel{max_packet_life_time: nil, max_retransmits: nil} -> {:reliable, 0} + %DataChannel{max_retransmits: v} when v != nil -> {:rexmit, v} + %DataChannel{max_packet_life_time: v} when v != nil -> {:timed, v} + end + + case ExSCTP.configure_stream( + sctp_transport.ref, + id, + channel.ordered, + rel_type, + rel_param + ) do + :ok -> + {:ok, sctp_transport, {:state_change, ref, :open}} + + {:error, _res} -> + Logger.warning("Unable to set stream #{id} parameters") + :error + end + + _other -> + Logger.warning( + "Received DCEP Ack without sending the DCEP Open message on stream #{id}" + ) + + :error + end + end + + defp from_raw_data(data, ppi) when ppi in [51, 53], do: {:ok, data} + defp from_raw_data(_data, ppi) when ppi in [56, 57], do: {:ok, <<>>} + defp from_raw_data(_data, _ppi), do: :error + + defp to_raw_data(<<>>, :string), do: {56, <<0>>} + defp to_raw_data(data, :string), do: {51, data} + defp to_raw_data(<<>>, :binary), do: {57, <<0>>} + defp to_raw_data(data, :binary), do: {53, data} + + # for remote ids (so must be opposite than ours) + defp valid_id?(%{id_type: :even}, id), do: rem(id, 2) == 1 + defp valid_id?(%{id_type: :odd}, id), do: rem(id, 2) == 0 + + defp new_id(sctp_transport) do + max_id = + sctp_transport.channels + |> Enum.filter(fn {_k, v} -> v.id != nil end) + |> Enum.map(fn {_k, v} -> v.id end) + |> Enum.max(&>=/2, fn -> -1 end) + + case {sctp_transport.id_type, rem(max_id, 2)} do + {:even, -1} -> 0 + {:odd, -1} -> 1 + {:even, 0} -> max_id + 2 + {:even, 1} -> max_id + 1 + {:odd, 0} -> max_id + 1 + {:odd, 1} -> max_id + 2 + end + end - _other -> - Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}") - :error + defp initial_stats() do + %{ + messages_sent: 0, + messages_received: 0, + bytes_sent: 0, + bytes_received: 0 + } + end + + defp update_stats(stats, ref, data, type) do + Map.update!(stats, ref, fn stat -> + if type == :sent do + %{ + stat + | messages_sent: stat.messages_sent + 1, + bytes_sent: stat.bytes_sent + byte_size(data) + } + else + %{ + stat + | messages_received: stat.messages_received + 1, + bytes_received: stat.bytes_received + byte_size(data) + } + end + end) end end +else + defmodule ExWebRTC.SCTPTransport do + @moduledoc false - defp from_raw_data(data, ppi) when ppi in [51, 53], do: {:ok, data} - defp from_raw_data(_data, ppi) when ppi in [56, 57], do: {:ok, <<>>} - defp from_raw_data(_data, _ppi), do: :error + require Logger - defp to_raw_data(<<>>, :string), do: {56, <<0>>} - defp to_raw_data(data, :string), do: {51, data} - defp to_raw_data(<<>>, :binary), do: {57, <<0>>} - defp to_raw_data(data, :binary), do: {53, data} + @tip "Install Rust and add `ex_sctp` dependency to your project in order to enable DataChannels." - # for remote ids (so must be opposite than ours) - defp valid_id?(%{id_type: :even}, id), do: rem(id, 2) == 1 - defp valid_id?(%{id_type: :odd}, id), do: rem(id, 2) == 0 + def new, do: nil + def set_role(_, _), do: nil + def data_channels?(_), do: false + def channel_count(_), do: 0 + def get_stats(_, _), do: [] - defp new_id(sctp_transport) do - max_id = - sctp_transport.channels - |> Enum.filter(fn {_k, v} -> v.id != nil end) - |> Enum.map(fn {_k, v} -> v.id end) - |> Enum.max(&>=/2, fn -> -1 end) - - case {sctp_transport.id_type, rem(max_id, 2)} do - {:even, -1} -> 0 - {:odd, -1} -> 1 - {:even, 0} -> max_id + 2 - {:even, 1} -> max_id + 1 - {:odd, 0} -> max_id + 1 - {:odd, 1} -> max_id + 2 + def add_channel(_, _, _, _, _, _), do: error() + def close_channel(_, _), do: error() + def get_channel(_, _), do: error() + def send(_, _, _, _), do: error() + def handle_timeout(_), do: error() + + def handle_data(_, _) do + text = "Received SCTP data from remote peer, but DataChannel support is turned off." + Logger.warning("#{text} #{@tip}") + + {[], nil} end - end - defp initial_stats() do - %{ - messages_sent: 0, - messages_received: 0, - bytes_sent: 0, - bytes_received: 0 - } - end + def connect(_) do + text = "Attempting to establish SCTP connection, but DataChannel support is turned off." + Logger.warning("#{text} #{@tip}") - defp update_stats(stats, ref, data, type) do - Map.update!(stats, ref, fn stat -> - if type == :sent do - %{ - stat - | messages_sent: stat.messages_sent + 1, - bytes_sent: stat.bytes_sent + byte_size(data) - } - else - %{ - stat - | messages_received: stat.messages_received + 1, - bytes_received: stat.bytes_received + byte_size(data) - } - end - end) + {[], nil} + end + + defp error do + text = "DataChannel support is turned off." + raise("#{text} #{@tip}") + end end end diff --git a/mix.exs b/mix.exs index da47b34..4acc6f7 100644 --- a/mix.exs +++ b/mix.exs @@ -62,7 +62,7 @@ defmodule ExWebRTC.MixProject do {:ex_libsrtp, "~> 0.7.1"}, {:ex_rtp, "~> 0.4.0"}, {:ex_rtcp, "~> 0.4.0"}, - {:ex_sctp, github: "elixir-webrtc/ex_sctp"}, + {:ex_sctp, github: "elixir-webrtc/ex_sctp", optional: true}, {:crc, "~> 0.10"}, # dev/test