diff --git a/examples/chat/lib/chat/peer_handler.ex b/examples/chat/lib/chat/peer_handler.ex index 8f54188..e0fd606 100644 --- a/examples/chat/lib/chat/peer_handler.ex +++ b/examples/chat/lib/chat/peer_handler.ex @@ -103,5 +103,10 @@ defmodule Chat.PeerHandler do {:ok, state} end + defp handle_webrtc_msg({:data_channel_state_change, ref, :closed}, %{channel_ref: ref} = state) do + Logger.warning("Channel #{inspect(ref)} has been closed") + {:stop, :channel_closed, state} + end + defp handle_webrtc_msg(_msg, state), do: {:ok, state} end diff --git a/examples/chat/mix.lock b/examples/chat/mix.lock index 2ebe8a7..c48bf44 100644 --- a/examples/chat/mix.lock +++ b/examples/chat/mix.lock @@ -12,7 +12,7 @@ "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, - "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "1576207bda0eba3634a2b0075899042e9b309e60", []}, + "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []}, "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 15df00e..d6b19d4 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -173,6 +173,8 @@ defmodule ExWebRTC.PeerConnection do Sends data over DataChannel, using channel identified by `ref`. Requires the channel to be in `:open` state. + + If `ref` does not identify any DataChannel, this function behaves like no-op. """ @spec send_data(peer_connection(), DataChannel.ref(), binary()) :: :ok def send_data(peer_connection, channel_ref, data) do @@ -435,6 +437,33 @@ defmodule ExWebRTC.PeerConnection do GenServer.call(peer_connection, {:create_data_channel, label, opts}) end + @doc """ + Closes a DataChannel identified by `ref`. + + As of now, the closed channel directly transitions to closed state, + which is signaled with `{:data_channel_state_change, ref, :closed}` message. + For more information, refer to the [RTCDataChannel: close() method](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/close). + + If `ref` does not identify any DataChannel, this function behaves like no-op. + """ + @spec close_data_channel(peer_connection(), DataChannel.ref()) :: :ok + def close_data_channel(peer_connection, channel_ref) do + GenServer.call(peer_connection, {:close_data_channel, channel_ref}) + end + + @doc """ + Returns `t:ExWebRTC.DataChannel.t()` identified by `channel_ref` if it exists, `nil` otherwise. + + This function can be especially helpful when you want to obtain DataChannel `id`. Normally, + before SCTP connection is established, `create_data_channel/3` will return DataChannel struct + with `id` set to `nil`. After receiving `{:data_channel_state_change, ref, :open}` message, + you can call this function to obtain the same struct, but with `id` set to proper value. + """ + @spec get_data_channel(peer_connection(), DataChannel.ref()) :: DataChannel.t() | nil + def get_data_channel(peer_connection, channel_ref) do + GenServer.call(peer_connection, {:get_data_channel, channel_ref}) + end + @doc """ Closes the PeerConnection. @@ -912,6 +941,20 @@ defmodule ExWebRTC.PeerConnection do end end + @impl true + def handle_call({:close_data_channel, channel_ref}, _from, state) do + {events, sctp_transport} = SCTPTransport.close_channel(state.sctp_transport, channel_ref) + handle_sctp_events(events, state) + + {:reply, :ok, %{state | sctp_transport: sctp_transport}} + end + + @impl true + def handle_call({:get_data_channel, channel_ref}, _from, state) do + channel = SCTPTransport.get_channel(state.sctp_transport, channel_ref) + {:reply, channel, state} + end + @impl true def handle_call(:get_stats, _from, state) do timestamp = System.os_time(:millisecond) diff --git a/lib/ex_webrtc/sctp_transport.ex b/lib/ex_webrtc/sctp_transport.ex index 009e6ad..594e1c7 100644 --- a/lib/ex_webrtc/sctp_transport.ex +++ b/lib/ex_webrtc/sctp_transport.ex @@ -80,7 +80,33 @@ defmodule ExWebRTC.SCTPTransport do {events, channel, sctp_transport} end - # TODO: close channel + @spec close_channel(t(), DataChannel.ref()) :: {[event()], t()} + def close_channel(sctp_transport, ref) do + # TODO: according to spec, this should move to `closing` state + # and only then be closed, but oh well... + case Map.pop(sctp_transport.channels, ref) do + {nil, _channels} -> + Logger.warning("Trying to close non-existent channel with ref #{inspect(ref)}") + {[], sctp_transport} + + {%DataChannel{id: id}, channels} -> + sctp_transport = %{sctp_transport | channels: channels} + + {events, sctp_transport} = + if id != nil do + :ok = ExSCTP.close_stream(sctp_transport.ref, id) + handle_events(sctp_transport) + else + {[], sctp_transport} + end + + event = {:state_change, ref, :closed} + {[event | events], sctp_transport} + end + end + + @spec get_channel(t(), DataChannel.ref()) :: DataChannel.t() | nil + def get_channel(sctp_transport, ref), do: Map.get(sctp_transport.channels, ref) @spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()} def send(sctp_transport, ref, type, data) do @@ -99,7 +125,10 @@ defmodule ExWebRTC.SCTPTransport do {[], sctp_transport} :error -> - Logger.warning("Trying to send data over non-existing DataChannel with ref #{ref}") + Logger.warning( + "Trying to send data over non-existent DataChannel with ref #{inspect(ref)}" + ) + {[], sctp_transport} end end @@ -157,6 +186,7 @@ defmodule ExWebRTC.SCTPTransport do case handle_event(sctp_transport, event) do {:none, transport} -> {Enum.reverse(events), transport} {nil, transport} -> handle_events(transport, events) + {other, transport} when is_list(other) -> handle_events(transport, other ++ events) {other, transport} -> handle_events(transport, [other | events]) end end @@ -171,9 +201,18 @@ defmodule ExWebRTC.SCTPTransport do {nil, sctp_transport} end - defp handle_event(sctp_transport, {:stream_closed, _id}) do - # TODO: handle closing channels - {nil, sctp_transport} + defp handle_event(sctp_transport, {:stream_closed, id}) do + Logger.debug("SCTP stream #{id} has been closed") + + case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {ref, %DataChannel{ref: ref}} -> + channels = Map.delete(sctp_transport.channels, ref) + event = {:state_change, ref, :closed} + {event, %{sctp_transport | channels: channels}} + + _other -> + {nil, sctp_transport} + end end defp handle_event(sctp_transport, :connected) do @@ -206,13 +245,23 @@ defmodule ExWebRTC.SCTPTransport do defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do with {:ok, dcep} <- DCEP.decode(data), - {:ok, sctp_transport, event} <- handle_dcep(sctp_transport, id, dcep) do - {event, sctp_transport} + {:ok, sctp_transport, events} <- handle_dcep(sctp_transport, id, dcep) do + # events is either list or a single event + {events, sctp_transport} else :error -> - # TODO: close the channel Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}") - {nil, sctp_transport} + + ExSCTP.close_stream(sctp_transport.ref, id) + + case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {ref, %DataChannel{}} -> + channels = Map.delete(sctp_transport.channels, ref) + {{:state_change, ref, :closed}, %{sctp_transport | channels: channels}} + + nil -> + {nil, sctp_transport} + end end end @@ -228,7 +277,7 @@ defmodule ExWebRTC.SCTPTransport do nil -> Logger.warning( - "Received data over non-existing DataChannel on stream with id #{id}. Discarding" + "Received data over non-existent DataChannel on stream with id #{id}. Discarding" ) {nil, sctp_transport} @@ -258,10 +307,32 @@ defmodule ExWebRTC.SCTPTransport do } # In theory, we should also send the :open event here (W3C 6.2.3) + # TODO channels = Map.put(sctp_transport.channels, channel.ref, channel) - {:ok, %{sctp_transport | channels: channels}, {:channel, channel}} + sctp_transport = %{sctp_transport | channels: channels} + + case ExSCTP.configure_stream( + sctp_transport.ref, + id, + channel.ordered, + dco.reliability, + dco.param + ) do + :ok -> + # remote channels also result in open event + # even tho they already have ready_state open in the {:data_channel, ...} message + # W3C 6.2.3 + events = [{:state_change, channel.ref, :open}, {:channel, channel}] + {:ok, sctp_transport, events} + + {:error, _res} -> + Logger.warning("Unable to set stream #{id} parameters") + :error + end else - _other -> :error + _other -> + Logger.warning("Received invalid DCEP Open on stream #{id}") + :error end end @@ -269,17 +340,30 @@ defmodule ExWebRTC.SCTPTransport do case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do {ref, %DataChannel{ready_state: :connecting} = channel} -> Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully") - # TODO: set the parameters + channel = %DataChannel{channel | ready_state: :open} channels = Map.put(sctp_transport.channels, ref, channel) - event = {:state_change, ref, :open} + sctp_transport = %{sctp_transport | channels: channels} + + {rel_type, rel_param} = + case channel do + %DataChannel{max_packet_life_time: nil, max_retransmits: nil} -> {:reliable, 0} + %DataChannel{max_retransmits: v} when v != nil -> {:rexmit, v} + %DataChannel{max_packet_life_time: v} when v != nil -> {:timed, v} + end + + case ExSCTP.configure_stream(sctp_transport.ref, id, channel.ordered, rel_type, rel_param) do + :ok -> + {:ok, sctp_transport, {:state_change, ref, :open}} - {:ok, %{sctp_transport | channels: channels}, event} + {:error, _res} -> + Logger.warning("Unable to set stream #{id} parameters") + :error + end _other -> - # TODO: close the channel Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}") - {:ok, sctp_transport, nil} + :error end end diff --git a/mix.lock b/mix.lock index 708962e..a0e9638 100644 --- a/mix.lock +++ b/mix.lock @@ -17,7 +17,7 @@ "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, - "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "3de986b3cd00796a1f2a5ac40b001682d5712264", []}, + "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []}, "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, diff --git a/test/ex_webrtc/data_channel_test.exs b/test/ex_webrtc/data_channel_test.exs index 916d4ae..bfff4a3 100644 --- a/test/ex_webrtc/data_channel_test.exs +++ b/test/ex_webrtc/data_channel_test.exs @@ -10,7 +10,7 @@ defmodule ExWebRTC.DataChannelTest do {:ok, pc2} = PeerConnection.start_link() label1 = "my label 1" - {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1) + {:ok, %DataChannel{ref: ref1, id: nil}} = PeerConnection.create_data_channel(pc1, label1) assert_receive {:ex_webrtc, ^pc1, :negotiation_needed} :ok = negotiate(pc1, pc2) @@ -20,9 +20,12 @@ defmodule ExWebRTC.DataChannelTest do :ok = connect(pc1, pc2) assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan1}} - assert %DataChannel{id: 1, label: ^label1, ordered: true} = chan1 + assert %DataChannel{ref: rem_ref1, id: 1, label: ^label1, ordered: true} = chan1 + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref1, :open}} assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}} + assert %DataChannel{id: 1} = PeerConnection.get_data_channel(pc1, ref1) + label2 = "my label 2" protocol = "my proto" @@ -32,7 +35,11 @@ defmodule ExWebRTC.DataChannelTest do refute_receive {:ex_webrtc, ^pc1, :negotiation_needed} assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan2}} - assert %DataChannel{id: 3, label: ^label2, protocol: ^protocol, ordered: false} = chan2 + + assert %DataChannel{ref: rem_ref2, id: 3, label: ^label2, protocol: ^protocol, ordered: false} = + chan2 + + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref2, :open}} assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref2, :open}} label3 = "my label 3" @@ -41,8 +48,37 @@ defmodule ExWebRTC.DataChannelTest do refute_receive {:ex_webrtc, ^pc2, :negotiation_needed} assert_receive {:ex_webrtc, ^pc1, {:data_channel, chan3}} - assert %DataChannel{id: 4, label: ^label3} = chan3 + assert %DataChannel{ref: rem_ref3, id: 4, label: ^label3} = chan3 assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref3, :open}} + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^rem_ref3, :open}} + end + + describe "closing the channel" do + setup do + {:ok, pc1} = PeerConnection.start_link() + {:ok, pc2} = PeerConnection.start_link() + {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, "label") + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{ref: ref2}}} + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}} + + %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} + end + + test "by initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + assert :ok = PeerConnection.close_data_channel(pc1, ref1) + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}} + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}} + end + + test "by receiving peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + assert :ok = PeerConnection.close_data_channel(pc2, ref2) + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}} + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}} + end end describe "negotiating" do