Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataChannels: allow for closing, apply negotiated stream parameters #161

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/chat/lib/chat/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,10 @@ defmodule Chat.PeerHandler do
{:ok, state}
end

defp handle_webrtc_msg({:data_channel_state_change, ref, :closed}, %{channel_ref: ref} = state) do
Logger.warning("Channel #{inspect(ref)} has been closed")
{:stop, :channel_closed, state}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
end
2 changes: 1 addition & 1 deletion examples/chat/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "1576207bda0eba3634a2b0075899042e9b309e60", []},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
Expand Down
43 changes: 43 additions & 0 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ defmodule ExWebRTC.PeerConnection do
Sends data over DataChannel, using channel identified by `ref`.

Requires the channel to be in `:open` state.

If `ref` does not identify any DataChannel, this function behaves like no-op.
"""
@spec send_data(peer_connection(), DataChannel.ref(), binary()) :: :ok
def send_data(peer_connection, channel_ref, data) do
Expand Down Expand Up @@ -435,6 +437,33 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:create_data_channel, label, opts})
end

@doc """
Closes a DataChannel identified by `ref`.

As of now, the closed channel directly transitions to closed state,
which is signaled with `{:data_channel_state_change, ref, :closed}` message.
For more information, refer to the [RTCDataChannel: close() method](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/close).

If `ref` does not identify any DataChannel, this function behaves like no-op.
"""
@spec close_data_channel(peer_connection(), DataChannel.ref()) :: :ok
def close_data_channel(peer_connection, channel_ref) do
GenServer.call(peer_connection, {:close_data_channel, channel_ref})
end

@doc """
Returns `t:ExWebRTC.DataChannel.t()` identified by `channel_ref` if it exists, `nil` otherwise.

This function can be especially helpful when you want to obtain DataChannel `id`. Normally,
before SCTP connection is established, `create_data_channel/3` will return DataChannel struct
with `id` set to `nil`. After receiving `{:data_channel_state_change, ref, :open}` message,
you can call this function to obtain the same struct, but with `id` set to proper value.
"""
@spec get_data_channel(peer_connection(), DataChannel.ref()) :: DataChannel.t() | nil
def get_data_channel(peer_connection, channel_ref) do
GenServer.call(peer_connection, {:get_data_channel, channel_ref})
end

@doc """
Closes the PeerConnection.

Expand Down Expand Up @@ -912,6 +941,20 @@ defmodule ExWebRTC.PeerConnection do
end
end

@impl true
def handle_call({:close_data_channel, channel_ref}, _from, state) do
{events, sctp_transport} = SCTPTransport.close_channel(state.sctp_transport, channel_ref)
handle_sctp_events(events, state)

{:reply, :ok, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_call({:get_data_channel, channel_ref}, _from, state) do
channel = SCTPTransport.get_channel(state.sctp_transport, channel_ref)
{:reply, channel, state}
end

@impl true
def handle_call(:get_stats, _from, state) do
timestamp = System.os_time(:millisecond)
Expand Down
118 changes: 101 additions & 17 deletions lib/ex_webrtc/sctp_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,33 @@ defmodule ExWebRTC.SCTPTransport do
{events, channel, sctp_transport}
end

# TODO: close channel
@spec close_channel(t(), DataChannel.ref()) :: {[event()], t()}
def close_channel(sctp_transport, ref) do
# TODO: according to spec, this should move to `closing` state
# and only then be closed, but oh well...
case Map.pop(sctp_transport.channels, ref) do
{nil, _channels} ->
Logger.warning("Trying to close non-existent channel with ref #{inspect(ref)}")
{[], sctp_transport}

{%DataChannel{id: id}, channels} ->
sctp_transport = %{sctp_transport | channels: channels}

{events, sctp_transport} =
if id != nil do
:ok = ExSCTP.close_stream(sctp_transport.ref, id)
handle_events(sctp_transport)
else
{[], sctp_transport}
end

event = {:state_change, ref, :closed}
{[event | events], sctp_transport}
end
end

@spec get_channel(t(), DataChannel.ref()) :: DataChannel.t() | nil
def get_channel(sctp_transport, ref), do: Map.get(sctp_transport.channels, ref)

@spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()}
def send(sctp_transport, ref, type, data) do
Expand All @@ -99,7 +125,10 @@ defmodule ExWebRTC.SCTPTransport do
{[], sctp_transport}

:error ->
Logger.warning("Trying to send data over non-existing DataChannel with ref #{ref}")
Logger.warning(
"Trying to send data over non-existent DataChannel with ref #{inspect(ref)}"
)

{[], sctp_transport}
end
end
Expand Down Expand Up @@ -157,6 +186,7 @@ defmodule ExWebRTC.SCTPTransport do
case handle_event(sctp_transport, event) do
{:none, transport} -> {Enum.reverse(events), transport}
{nil, transport} -> handle_events(transport, events)
{other, transport} when is_list(other) -> handle_events(transport, other ++ events)
{other, transport} -> handle_events(transport, [other | events])
end
end
Expand All @@ -171,9 +201,18 @@ defmodule ExWebRTC.SCTPTransport do
{nil, sctp_transport}
end

defp handle_event(sctp_transport, {:stream_closed, _id}) do
# TODO: handle closing channels
{nil, sctp_transport}
defp handle_event(sctp_transport, {:stream_closed, id}) do
Logger.debug("SCTP stream #{id} has been closed")

case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{ref: ref}} ->
channels = Map.delete(sctp_transport.channels, ref)
event = {:state_change, ref, :closed}
{event, %{sctp_transport | channels: channels}}

_other ->
{nil, sctp_transport}
end
end

defp handle_event(sctp_transport, :connected) do
Expand Down Expand Up @@ -206,13 +245,23 @@ defmodule ExWebRTC.SCTPTransport do

defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do
with {:ok, dcep} <- DCEP.decode(data),
{:ok, sctp_transport, event} <- handle_dcep(sctp_transport, id, dcep) do
{event, sctp_transport}
{:ok, sctp_transport, events} <- handle_dcep(sctp_transport, id, dcep) do
# events is either list or a single event
{events, sctp_transport}
else
:error ->
# TODO: close the channel
Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}")
{nil, sctp_transport}

ExSCTP.close_stream(sctp_transport.ref, id)

case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{}} ->
channels = Map.delete(sctp_transport.channels, ref)
{{:state_change, ref, :closed}, %{sctp_transport | channels: channels}}

nil ->
{nil, sctp_transport}
end
end
end

Expand All @@ -228,7 +277,7 @@ defmodule ExWebRTC.SCTPTransport do

nil ->
Logger.warning(
"Received data over non-existing DataChannel on stream with id #{id}. Discarding"
"Received data over non-existent DataChannel on stream with id #{id}. Discarding"
)

{nil, sctp_transport}
Expand Down Expand Up @@ -258,28 +307,63 @@ defmodule ExWebRTC.SCTPTransport do
}

# In theory, we should also send the :open event here (W3C 6.2.3)
# TODO
channels = Map.put(sctp_transport.channels, channel.ref, channel)
{:ok, %{sctp_transport | channels: channels}, {:channel, channel}}
sctp_transport = %{sctp_transport | channels: channels}

case ExSCTP.configure_stream(
sctp_transport.ref,
id,
channel.ordered,
dco.reliability,
dco.param
) do
:ok ->
# remote channels also result in open event
# even tho they already have ready_state open in the {:data_channel, ...} message
# W3C 6.2.3
events = [{:state_change, channel.ref, :open}, {:channel, channel}]
{:ok, sctp_transport, events}

{:error, _res} ->
Logger.warning("Unable to set stream #{id} parameters")
:error
end
else
_other -> :error
_other ->
Logger.warning("Received invalid DCEP Open on stream #{id}")
:error
end
end

defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do
case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
{ref, %DataChannel{ready_state: :connecting} = channel} ->
Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully")
# TODO: set the parameters

channel = %DataChannel{channel | ready_state: :open}
channels = Map.put(sctp_transport.channels, ref, channel)
event = {:state_change, ref, :open}
sctp_transport = %{sctp_transport | channels: channels}

{rel_type, rel_param} =
case channel do
%DataChannel{max_packet_life_time: nil, max_retransmits: nil} -> {:reliable, 0}
%DataChannel{max_retransmits: v} when v != nil -> {:rexmit, v}
%DataChannel{max_packet_life_time: v} when v != nil -> {:timed, v}
end

case ExSCTP.configure_stream(sctp_transport.ref, id, channel.ordered, rel_type, rel_param) do
:ok ->
{:ok, sctp_transport, {:state_change, ref, :open}}

{:ok, %{sctp_transport | channels: channels}, event}
{:error, _res} ->
Logger.warning("Unable to set stream #{id} parameters")
:error
end

_other ->
# TODO: close the channel
Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}")
{:ok, sctp_transport, nil}
:error
end
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "3de986b3cd00796a1f2a5ac40b001682d5712264", []},
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
Expand Down
44 changes: 40 additions & 4 deletions test/ex_webrtc/data_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule ExWebRTC.DataChannelTest do
{:ok, pc2} = PeerConnection.start_link()

label1 = "my label 1"
{:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1)
{:ok, %DataChannel{ref: ref1, id: nil}} = PeerConnection.create_data_channel(pc1, label1)
assert_receive {:ex_webrtc, ^pc1, :negotiation_needed}

:ok = negotiate(pc1, pc2)
Expand All @@ -20,9 +20,12 @@ defmodule ExWebRTC.DataChannelTest do
:ok = connect(pc1, pc2)

assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan1}}
assert %DataChannel{id: 1, label: ^label1, ordered: true} = chan1
assert %DataChannel{ref: rem_ref1, id: 1, label: ^label1, ordered: true} = chan1
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref1, :open}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}

assert %DataChannel{id: 1} = PeerConnection.get_data_channel(pc1, ref1)

label2 = "my label 2"
protocol = "my proto"

Expand All @@ -32,7 +35,11 @@ defmodule ExWebRTC.DataChannelTest do
refute_receive {:ex_webrtc, ^pc1, :negotiation_needed}

assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan2}}
assert %DataChannel{id: 3, label: ^label2, protocol: ^protocol, ordered: false} = chan2

assert %DataChannel{ref: rem_ref2, id: 3, label: ^label2, protocol: ^protocol, ordered: false} =
chan2

assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref2, :open}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref2, :open}}

label3 = "my label 3"
Expand All @@ -41,8 +48,37 @@ defmodule ExWebRTC.DataChannelTest do
refute_receive {:ex_webrtc, ^pc2, :negotiation_needed}

assert_receive {:ex_webrtc, ^pc1, {:data_channel, chan3}}
assert %DataChannel{id: 4, label: ^label3} = chan3
assert %DataChannel{ref: rem_ref3, id: 4, label: ^label3} = chan3
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref3, :open}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^rem_ref3, :open}}
end

describe "closing the channel" do
setup do
{:ok, pc1} = PeerConnection.start_link()
{:ok, pc2} = PeerConnection.start_link()
{:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, "label")

:ok = negotiate(pc1, pc2)
:ok = connect(pc1, pc2)

assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{ref: ref2}}}
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}

%{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2}
end

test "by initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
assert :ok = PeerConnection.close_data_channel(pc1, ref1)
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}}
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}}
end

test "by receiving peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
assert :ok = PeerConnection.close_data_channel(pc2, ref2)
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}}
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}}
end
end

describe "negotiating" do
Expand Down