Skip to content

Commit

Permalink
DataChannels: allow for closing, apply negotiated stream parameters (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala authored Aug 28, 2024
1 parent 637f2d7 commit 8109856
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 23 deletions.
5 changes: 5 additions & 0 deletions examples/chat/lib/chat/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,10 @@ defmodule Chat.PeerHandler do
{:ok, state}
end

defp handle_webrtc_msg({:data_channel_state_change, ref, :closed}, %{channel_ref: ref} = state) do
Logger.warning("Channel #{inspect(ref)} has been closed")
{:stop, :channel_closed, state}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
end
2 changes: 1 addition & 1 deletion examples/chat/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "1576207bda0eba3634a2b0075899042e9b309e60", []},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
Expand Down
43 changes: 43 additions & 0 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ defmodule ExWebRTC.PeerConnection do
Sends data over DataChannel, using channel identified by `ref`.
Requires the channel to be in `:open` state.
If `ref` does not identify any DataChannel, this function behaves like no-op.
"""
@spec send_data(peer_connection(), DataChannel.ref(), binary()) :: :ok
def send_data(peer_connection, channel_ref, data) do
Expand Down Expand Up @@ -435,6 +437,33 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:create_data_channel, label, opts})
end

@doc """
Closes a DataChannel identified by `ref`.
As of now, the closed channel directly transitions to closed state,
which is signaled with `{:data_channel_state_change, ref, :closed}` message.
For more information, refer to the [RTCDataChannel: close() method](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/close).
If `ref` does not identify any DataChannel, this function behaves like no-op.
"""
@spec close_data_channel(peer_connection(), DataChannel.ref()) :: :ok
def close_data_channel(peer_connection, channel_ref) do
GenServer.call(peer_connection, {:close_data_channel, channel_ref})
end

@doc """
Returns `t:ExWebRTC.DataChannel.t()` identified by `channel_ref` if it exists, `nil` otherwise.
This function can be especially helpful when you want to obtain DataChannel `id`. Normally,
before SCTP connection is established, `create_data_channel/3` will return DataChannel struct
with `id` set to `nil`. After receiving `{:data_channel_state_change, ref, :open}` message,
you can call this function to obtain the same struct, but with `id` set to proper value.
"""
@spec get_data_channel(peer_connection(), DataChannel.ref()) :: DataChannel.t() | nil
def get_data_channel(peer_connection, channel_ref) do
GenServer.call(peer_connection, {:get_data_channel, channel_ref})
end

@doc """
Closes the PeerConnection.
Expand Down Expand Up @@ -912,6 +941,20 @@ defmodule ExWebRTC.PeerConnection do
end
end

@impl true
def handle_call({:close_data_channel, channel_ref}, _from, state) do
{events, sctp_transport} = SCTPTransport.close_channel(state.sctp_transport, channel_ref)
handle_sctp_events(events, state)

{:reply, :ok, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_call({:get_data_channel, channel_ref}, _from, state) do
channel = SCTPTransport.get_channel(state.sctp_transport, channel_ref)
{:reply, channel, state}
end

@impl true
def handle_call(:get_stats, _from, state) do
timestamp = System.os_time(:millisecond)
Expand Down
118 changes: 101 additions & 17 deletions lib/ex_webrtc/sctp_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,33 @@ defmodule ExWebRTC.SCTPTransport do
{events, channel, sctp_transport}
end

# TODO: close channel
@spec close_channel(t(), DataChannel.ref()) :: {[event()], t()}
def close_channel(sctp_transport, ref) do
# TODO: according to spec, this should move to `closing` state
# and only then be closed, but oh well...
case Map.pop(sctp_transport.channels, ref) do
{nil, _channels} ->
Logger.warning("Trying to close non-existent channel with ref #{inspect(ref)}")
{[], sctp_transport}

{%DataChannel{id: id}, channels} ->
sctp_transport = %{sctp_transport | channels: channels}

{events, sctp_transport} =
if id != nil do
:ok = ExSCTP.close_stream(sctp_transport.ref, id)
handle_events(sctp_transport)
else
{[], sctp_transport}
end

event = {:state_change, ref, :closed}
{[event | events], sctp_transport}
end
end

@spec get_channel(t(), DataChannel.ref()) :: DataChannel.t() | nil
def get_channel(sctp_transport, ref), do: Map.get(sctp_transport.channels, ref)

@spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()}
def send(sctp_transport, ref, type, data) do
Expand All @@ -99,7 +125,10 @@ defmodule ExWebRTC.SCTPTransport do
{[], sctp_transport}

:error ->
Logger.warning("Trying to send data over non-existing DataChannel with ref #{ref}")
Logger.warning(
"Trying to send data over non-existent DataChannel with ref #{inspect(ref)}"
)

{[], sctp_transport}
end
end
Expand Down Expand Up @@ -157,6 +186,7 @@ defmodule ExWebRTC.SCTPTransport do
case handle_event(sctp_transport, event) do
{:none, transport} -> {Enum.reverse(events), transport}
{nil, transport} -> handle_events(transport, events)
{other, transport} when is_list(other) -> handle_events(transport, other ++ events)
{other, transport} -> handle_events(transport, [other | events])
end
end
Expand All @@ -171,9 +201,18 @@ defmodule ExWebRTC.SCTPTransport do
{nil, sctp_transport}
end

defp handle_event(sctp_transport, {:stream_closed, _id}) do
# TODO: handle closing channels
{nil, sctp_transport}
defp handle_event(sctp_transport, {:stream_closed, id}) do
Logger.debug("SCTP stream #{id} has been closed")

case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{ref: ref}} ->
channels = Map.delete(sctp_transport.channels, ref)
event = {:state_change, ref, :closed}
{event, %{sctp_transport | channels: channels}}

_other ->
{nil, sctp_transport}
end
end

defp handle_event(sctp_transport, :connected) do
Expand Down Expand Up @@ -206,13 +245,23 @@ defmodule ExWebRTC.SCTPTransport do

defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do
with {:ok, dcep} <- DCEP.decode(data),
{:ok, sctp_transport, event} <- handle_dcep(sctp_transport, id, dcep) do
{event, sctp_transport}
{:ok, sctp_transport, events} <- handle_dcep(sctp_transport, id, dcep) do
# events is either list or a single event
{events, sctp_transport}
else
:error ->
# TODO: close the channel
Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}")
{nil, sctp_transport}

ExSCTP.close_stream(sctp_transport.ref, id)

case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{}} ->
channels = Map.delete(sctp_transport.channels, ref)
{{:state_change, ref, :closed}, %{sctp_transport | channels: channels}}

nil ->
{nil, sctp_transport}
end
end
end

Expand All @@ -228,7 +277,7 @@ defmodule ExWebRTC.SCTPTransport do

nil ->
Logger.warning(
"Received data over non-existing DataChannel on stream with id #{id}. Discarding"
"Received data over non-existent DataChannel on stream with id #{id}. Discarding"
)

{nil, sctp_transport}
Expand Down Expand Up @@ -258,28 +307,63 @@ defmodule ExWebRTC.SCTPTransport do
}

# In theory, we should also send the :open event here (W3C 6.2.3)
# TODO
channels = Map.put(sctp_transport.channels, channel.ref, channel)
{:ok, %{sctp_transport | channels: channels}, {:channel, channel}}
sctp_transport = %{sctp_transport | channels: channels}

case ExSCTP.configure_stream(
sctp_transport.ref,
id,
channel.ordered,
dco.reliability,
dco.param
) do
:ok ->
# remote channels also result in open event
# even tho they already have ready_state open in the {:data_channel, ...} message
# W3C 6.2.3
events = [{:state_change, channel.ref, :open}, {:channel, channel}]
{:ok, sctp_transport, events}

{:error, _res} ->
Logger.warning("Unable to set stream #{id} parameters")
:error
end
else
_other -> :error
_other ->
Logger.warning("Received invalid DCEP Open on stream #{id}")
:error
end
end

defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do
case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{ready_state: :connecting} = channel} ->
Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully")
# TODO: set the parameters

channel = %DataChannel{channel | ready_state: :open}
channels = Map.put(sctp_transport.channels, ref, channel)
event = {:state_change, ref, :open}
sctp_transport = %{sctp_transport | channels: channels}

{rel_type, rel_param} =
case channel do
%DataChannel{max_packet_life_time: nil, max_retransmits: nil} -> {:reliable, 0}
%DataChannel{max_retransmits: v} when v != nil -> {:rexmit, v}
%DataChannel{max_packet_life_time: v} when v != nil -> {:timed, v}
end

case ExSCTP.configure_stream(sctp_transport.ref, id, channel.ordered, rel_type, rel_param) do
:ok ->
{:ok, sctp_transport, {:state_change, ref, :open}}

{:ok, %{sctp_transport | channels: channels}, event}
{:error, _res} ->
Logger.warning("Unable to set stream #{id} parameters")
:error
end

_other ->
# TODO: close the channel
Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}")
{:ok, sctp_transport, nil}
:error
end
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "3de986b3cd00796a1f2a5ac40b001682d5712264", []},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
Expand Down
44 changes: 40 additions & 4 deletions test/ex_webrtc/data_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule ExWebRTC.DataChannelTest do
{:ok, pc2} = PeerConnection.start_link()

label1 = "my label 1"
{:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1)
{:ok, %DataChannel{ref: ref1, id: nil}} = PeerConnection.create_data_channel(pc1, label1)
assert_receive {:ex_webrtc, ^pc1, :negotiation_needed}

:ok = negotiate(pc1, pc2)
Expand All @@ -20,9 +20,12 @@ defmodule ExWebRTC.DataChannelTest do
:ok = connect(pc1, pc2)

assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan1}}
assert %DataChannel{id: 1, label: ^label1, ordered: true} = chan1
assert %DataChannel{ref: rem_ref1, id: 1, label: ^label1, ordered: true} = chan1
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref1, :open}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}

assert %DataChannel{id: 1} = PeerConnection.get_data_channel(pc1, ref1)

label2 = "my label 2"
protocol = "my proto"

Expand All @@ -32,7 +35,11 @@ defmodule ExWebRTC.DataChannelTest do
refute_receive {:ex_webrtc, ^pc1, :negotiation_needed}

assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan2}}
assert %DataChannel{id: 3, label: ^label2, protocol: ^protocol, ordered: false} = chan2

assert %DataChannel{ref: rem_ref2, id: 3, label: ^label2, protocol: ^protocol, ordered: false} =
chan2

assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref2, :open}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref2, :open}}

label3 = "my label 3"
Expand All @@ -41,8 +48,37 @@ defmodule ExWebRTC.DataChannelTest do
refute_receive {:ex_webrtc, ^pc2, :negotiation_needed}

assert_receive {:ex_webrtc, ^pc1, {:data_channel, chan3}}
assert %DataChannel{id: 4, label: ^label3} = chan3
assert %DataChannel{ref: rem_ref3, id: 4, label: ^label3} = chan3
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref3, :open}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^rem_ref3, :open}}
end

describe "closing the channel" do
setup do
{:ok, pc1} = PeerConnection.start_link()
{:ok, pc2} = PeerConnection.start_link()
{:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, "label")

:ok = negotiate(pc1, pc2)
:ok = connect(pc1, pc2)

assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{ref: ref2}}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}

%{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2}
end

test "by initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
assert :ok = PeerConnection.close_data_channel(pc1, ref1)
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}}
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}}
end

test "by receiving peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
assert :ok = PeerConnection.close_data_channel(pc2, ref2)
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}}
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}}
end
end

describe "negotiating" do
Expand Down

0 comments on commit 8109856

Please sign in to comment.