From 637f2d7f940a20c66d2895096ad31fd67c664737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Tue, 27 Aug 2024 14:26:31 +0200 Subject: [PATCH] Add DataChannels (#157) --- .github/workflows/ci.yml | 4 +- examples/chat/.formatter.exs | 4 + examples/chat/.gitignore | 26 ++ examples/chat/README.md | 11 + examples/chat/config/config.exs | 8 + examples/chat/lib/chat.ex | 16 + examples/chat/lib/chat/peer_handler.ex | 107 +++++++ examples/chat/lib/chat/router.ex | 15 + examples/chat/mix.exs | 30 ++ examples/chat/mix.lock | 40 +++ examples/chat/priv/static/index.html | 16 + examples/chat/priv/static/script.js | 53 ++++ lib/ex_webrtc/data_channel.ex | 55 ++++ lib/ex_webrtc/dtls_transport.ex | 21 +- lib/ex_webrtc/peer_connection.ex | 231 ++++++++++++-- lib/ex_webrtc/sctp_transport.ex | 315 ++++++++++++++++++++ lib/ex_webrtc/sctp_transport/dcep.ex | 76 +++++ lib/ex_webrtc/sdp_utils.ex | 23 +- mix.exs | 3 +- mix.lock | 5 +- test/ex_webrtc/data_channel_test.exs | 168 +++++++++++ test/ex_webrtc/peer_connection_test.exs | 14 - test/ex_webrtc/sctp_transport/dcep_test.exs | 39 +++ test/support/test_utils.ex | 23 ++ 24 files changed, 1261 insertions(+), 42 deletions(-) create mode 100644 examples/chat/.formatter.exs create mode 100644 examples/chat/.gitignore create mode 100644 examples/chat/README.md create mode 100644 examples/chat/config/config.exs create mode 100644 examples/chat/lib/chat.ex create mode 100644 examples/chat/lib/chat/peer_handler.ex create mode 100644 examples/chat/lib/chat/router.ex create mode 100644 examples/chat/mix.exs create mode 100644 examples/chat/mix.lock create mode 100644 examples/chat/priv/static/index.html create mode 100644 examples/chat/priv/static/script.js create mode 100644 lib/ex_webrtc/data_channel.ex create mode 100644 lib/ex_webrtc/sctp_transport.ex create mode 100644 lib/ex_webrtc/sctp_transport/dcep.ex create mode 100644 test/ex_webrtc/data_channel_test.exs create mode 100644 test/ex_webrtc/sctp_transport/dcep_test.exs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b40676b1..b9341434 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,8 +14,8 @@ jobs: name: CI on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}} strategy: matrix: - otp: ['26'] - elixir: ['1.16'] + otp: ['27'] + elixir: ['1.17'] steps: - name: Set up Elixir uses: erlef/setup-beam@v1 diff --git a/examples/chat/.formatter.exs b/examples/chat/.formatter.exs new file mode 100644 index 00000000..d2cda26e --- /dev/null +++ b/examples/chat/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/examples/chat/.gitignore b/examples/chat/.gitignore new file mode 100644 index 00000000..e8c8f34d --- /dev/null +++ b/examples/chat/.gitignore @@ -0,0 +1,26 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +chat-*.tar + +# Temporary files, for example, from tests. +/tmp/ diff --git a/examples/chat/README.md b/examples/chat/README.md new file mode 100644 index 00000000..bf2dd62f --- /dev/null +++ b/examples/chat/README.md @@ -0,0 +1,11 @@ +# Chat + +Receive text chat messages using DataChannel and send them to other peers. + +While in `examples/chat` directory + +1. Run `mix deps.get` +2. Run `mix run --no-halt` +3. Visit `http://127.0.0.1:8829/index.html` in your browser. + +The IP and port of the app can be configured in `config/config.exs`. diff --git a/examples/chat/config/config.exs b/examples/chat/config/config.exs new file mode 100644 index 00000000..fd6796eb --- /dev/null +++ b/examples/chat/config/config.exs @@ -0,0 +1,8 @@ +import Config + +config :logger, level: :info + +# normally you take these from env variables in `config/runtime.exs` +config :chat, + ip: {127, 0, 0, 1}, + port: 8829 diff --git a/examples/chat/lib/chat.ex b/examples/chat/lib/chat.ex new file mode 100644 index 00000000..bf112273 --- /dev/null +++ b/examples/chat/lib/chat.ex @@ -0,0 +1,16 @@ +defmodule Chat do + use Application + + @ip Application.compile_env!(:chat, :ip) + @port Application.compile_env!(:chat, :port) + + @impl true + def start(_type, _args) do + children = [ + {Bandit, plug: __MODULE__.Router, ip: @ip, port: @port}, + {Registry, name: __MODULE__.PubSub, keys: :duplicate} + ] + + Supervisor.start_link(children, strategy: :one_for_one) + end +end diff --git a/examples/chat/lib/chat/peer_handler.ex b/examples/chat/lib/chat/peer_handler.ex new file mode 100644 index 00000000..8f54188b --- /dev/null +++ b/examples/chat/lib/chat/peer_handler.ex @@ -0,0 +1,107 @@ +defmodule Chat.PeerHandler do + require Logger + + alias ExWebRTC.{ + DataChannel, + ICECandidate, + PeerConnection, + SessionDescription + } + + @behaviour WebSock + + @ice_servers [ + %{urls: "stun:stun.l.google.com:19302"} + ] + + @impl true + def init(_) do + {:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers) + {:ok, _} = Registry.register(Chat.PubSub, "chat", []) + + state = %{ + peer_connection: pc, + channel_ref: nil + } + + {:ok, state} + end + + @impl true + def handle_in({msg, [opcode: :text]}, state) do + msg + |> Jason.decode!() + |> handle_ws_msg(state) + end + + @impl true + def handle_info({:ex_webrtc, _from, msg}, state) do + handle_webrtc_msg(msg, state) + end + + @impl true + def handle_info({:chat_msg, msg}, state) do + :ok = PeerConnection.send_data(state.peer_connection, state.channel_ref, msg) + {:ok, state} + end + + @impl true + def terminate(reason, _state) do + Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}") + end + + defp handle_ws_msg(%{"type" => "offer", "data" => data}, state) do + Logger.info("Received SDP offer:\n#{data["sdp"]}") + + offer = SessionDescription.from_json(data) + :ok = PeerConnection.set_remote_description(state.peer_connection, offer) + + {:ok, answer} = PeerConnection.create_answer(state.peer_connection) + :ok = PeerConnection.set_local_description(state.peer_connection, answer) + + answer_json = SessionDescription.to_json(answer) + + msg = + %{"type" => "answer", "data" => answer_json} + |> Jason.encode!() + + Logger.info("Sent SDP answer:\n#{answer_json["sdp"]}") + + {:push, {:text, msg}, state} + end + + defp handle_ws_msg(%{"type" => "ice", "data" => data}, state) do + Logger.info("Received ICE candidate: #{data["candidate"]}") + + candidate = ICECandidate.from_json(data) + :ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate) + {:ok, state} + end + + defp handle_webrtc_msg({:ice_candidate, candidate}, state) do + candidate_json = ICECandidate.to_json(candidate) + + msg = + %{"type" => "ice", "data" => candidate_json} + |> Jason.encode!() + + Logger.info("Sent ICE candidate: #{candidate_json["candidate"]}") + + {:push, {:text, msg}, state} + end + + defp handle_webrtc_msg({:data_channel, %DataChannel{ref: ref}}, state) do + state = %{state | channel_ref: ref} + {:ok, state} + end + + defp handle_webrtc_msg({:data, ref, data}, %{channel_ref: ref} = state) do + Registry.dispatch(Chat.PubSub, "chat", fn entries -> + for {pid, _} <- entries, do: send(pid, {:chat_msg, data}) + end) + + {:ok, state} + end + + defp handle_webrtc_msg(_msg, state), do: {:ok, state} +end diff --git a/examples/chat/lib/chat/router.ex b/examples/chat/lib/chat/router.ex new file mode 100644 index 00000000..20fe22b4 --- /dev/null +++ b/examples/chat/lib/chat/router.ex @@ -0,0 +1,15 @@ +defmodule Chat.Router do + use Plug.Router + + plug(Plug.Static, at: "/", from: :chat) + plug(:match) + plug(:dispatch) + + get "/ws" do + WebSockAdapter.upgrade(conn, Chat.PeerHandler, %{}, []) + end + + match _ do + send_resp(conn, 404, "not found") + end +end diff --git a/examples/chat/mix.exs b/examples/chat/mix.exs new file mode 100644 index 00000000..fee3faed --- /dev/null +++ b/examples/chat/mix.exs @@ -0,0 +1,30 @@ +defmodule Chat.MixProject do + use Mix.Project + + def project do + [ + app: :chat, + version: "0.1.0", + elixir: "~> 1.15", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + def application do + [ + extra_applications: [:logger], + mod: {Chat, []} + ] + end + + defp deps do + [ + {:plug, "~> 1.15.0"}, + {:bandit, "~> 1.2.0"}, + {:websock_adapter, "~> 0.5.0"}, + {:jason, "~> 1.4.0"}, + {:ex_webrtc, path: "../../."} + ] + end +end diff --git a/examples/chat/mix.lock b/examples/chat/mix.lock new file mode 100644 index 00000000..2ebe8a71 --- /dev/null +++ b/examples/chat/mix.lock @@ -0,0 +1,40 @@ +%{ + "bandit": {:hex, :bandit, "1.2.3", "a98d664a96fec23b68e776062296d76a94b4459795b38209f4ae89cb4225709c", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3e29150245a9b5f56944434e5240966e75c917dad248f689ab589b32187a81af"}, + "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, + "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, + "bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"}, + "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, + "crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"}, + "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, + "ex_dtls": {:git, "https://github.com/elixir-webrtc/ex_dtls.git", "dff8ec1998dfb556b2d3dafbd30574d0da18b958", []}, + "ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"}, + "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, + "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, + "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, + "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "1576207bda0eba3634a2b0075899042e9b309e60", []}, + "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"}, + "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, + "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, + "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"}, + "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, + "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, + "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, + "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, + "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, + "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, + "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, + "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"}, + "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, +} diff --git a/examples/chat/priv/static/index.html b/examples/chat/priv/static/index.html new file mode 100644 index 00000000..9f3ae6c6 --- /dev/null +++ b/examples/chat/priv/static/index.html @@ -0,0 +1,16 @@ + + + + + + + Elixir WebRTC Chat Example + + +

Elixir WebRTC Chat Example

+ + +
+ + + diff --git a/examples/chat/priv/static/script.js b/examples/chat/priv/static/script.js new file mode 100644 index 00000000..7129de8a --- /dev/null +++ b/examples/chat/priv/static/script.js @@ -0,0 +1,53 @@ +const pcConfig = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' },] }; +const chatInput = document.getElementById("chatInput"); +const chatMessages = document.getElementById("chatMessages"); + +const proto = window.location.protocol === "https:" ? "wss:" : "ws:" +const ws = new WebSocket(`${proto}//${window.location.host}/ws`); +ws.onopen = _ => start_connection(ws); +ws.onclose = event => console.log("WebSocket connection was terminated:", event); + +const start_connection = async (ws) => { + const pc = new RTCPeerConnection(pcConfig); + pc.onicecandidate = event => { + if (event.candidate === null) return; + + console.log("Sent ICE candidate:", event.candidate); + ws.send(JSON.stringify({ type: "ice", data: event.candidate })); + }; + + const dataChannel = pc.createDataChannel("chat"); + + dataChannel.onmessage = event => { + const msg = document.createElement("p"); + msg.innerText = event.data; + chatMessages.appendChild(msg); + }; + + chatInput.onkeydown = event => { + if (event.code !== "Enter") return; + if (dataChannel.readyState !== "open") return; + + dataChannel.send(chatInput.value); + chatInput.value = ""; + }; + + ws.onmessage = async event => { + const {type, data} = JSON.parse(event.data); + + switch (type) { + case "answer": + console.log("Received SDP answer:", data); + await pc.setRemoteDescription(data) + break; + case "ice": + console.log("Received ICE candidate:", data); + await pc.addIceCandidate(data); + } + }; + + const offer = await pc.createOffer(); + await pc.setLocalDescription(offer); + console.log("Sent SDP offer:", offer) + ws.send(JSON.stringify({type: "offer", data: offer})); +}; diff --git a/lib/ex_webrtc/data_channel.ex b/lib/ex_webrtc/data_channel.ex new file mode 100644 index 00000000..9e37902a --- /dev/null +++ b/lib/ex_webrtc/data_channel.ex @@ -0,0 +1,55 @@ +defmodule ExWebRTC.DataChannel do + @moduledoc """ + Implementation of the [RTCDataChannel](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel). + """ + + @type order() :: :ordered | :unordered + + @type ref() :: reference() + + @type id() :: non_neg_integer() + + @type ready_state() :: :connecting | :open | :closing | :closed + + @typedoc """ + Options used when creating a new DataChannel. + + For more information refer to `ExWebRTC.PeerConnection.create_data_channel/3`. + + As of now, Elixir WebRTC does not support `negotiated: true` option, all DataChannels need to be + negotiated in-band. + """ + @type options() :: [ + ordered: order(), + max_packet_life_time: non_neg_integer(), + max_retransmits: non_neg_integer(), + protocol: String.t() + ] + + @typedoc """ + Struct representing the DataChannel. + + All of the fields have the same meaning as in [RTCDataChannel](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel) + except for `ref` which is a local identifier used when refering to this DataChannel in + received messages or when calling `ExWebRTC.PeerConnection.send_data/3` function. + + It's worth to mention that `id` and `label` can be used by the other peer to identify the data + channel, althought be careful as: + * `label` does not have to be unique, channels can share a single label, + * `id` is only assigned after the SCTP connection has been established (which means + that DataChannels created before first negotiation will have `id` set to `nil`) + """ + @type t() :: %__MODULE__{ + ref: ref(), + id: non_neg_integer() | nil, + label: String.t(), + max_packet_life_time: non_neg_integer() | nil, + max_retransmits: non_neg_integer() | nil, + ordered: order(), + protocol: String.t(), + ready_state: ready_state() + } + + @enforce_keys [:ref, :id, :label, :ordered, :protocol, :ready_state] + defstruct @enforce_keys ++ [:max_packet_life_time, :max_retransmits] +end diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 070f89f4..3ce5f86b 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -91,6 +91,11 @@ defmodule ExWebRTC.DTLSTransport do GenServer.cast(dtls_transport, {:send_rtcp, data}) end + @spec send_data(dtls_transport(), binary()) :: :ok + def send_data(dtls_transport, data) do + GenServer.cast(dtls_transport, {:send_data, data}) + end + @spec stop(dtls_transport()) :: :ok def stop(dtls_transport) do GenServer.stop(dtls_transport) @@ -254,6 +259,16 @@ defmodule ExWebRTC.DTLSTransport do {:noreply, state} end + @impl true + def handle_cast({:send_data, data}, state) do + case ExDTLS.write_data(state.dtls, data) do + {:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected) + {:error, reason} -> Logger.warning("Unable to protect data: #{inspect(reason)}") + end + + {:noreply, state} + end + @impl true def handle_info(:dtls_timeout, %{buffered_local_packets: buffered_local_packets} = state) do case ExDTLS.handle_timeout(state.dtls) do @@ -309,7 +324,7 @@ defmodule ExWebRTC.DTLSTransport do {:ok, state} end - defp handle_ice_data({:data, <> = data}, state) when f in 20..64 do + defp handle_ice_data({:data, <> = data}, state) when f in 20..63 do case ExDTLS.handle_data(state.dtls, data) do {:handshake_packets, packets, timeout} when state.ice_connected -> :ok = state.ice_transport.send_data(state.ice_pid, packets) @@ -361,6 +376,10 @@ defmodule ExWebRTC.DTLSTransport do :handshake_want_read -> {:ok, state} + {:ok, data} -> + notify(state.owner, {:data, data}) + {:ok, state} + {:error, reason} = error -> # TODO: consider buffering DTLS packets that came out of order during the handshake Logger.debug("DTLS error: #{reason}") diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 495c5976..15df00e8 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -12,12 +12,14 @@ defmodule ExWebRTC.PeerConnection do alias __MODULE__.{Configuration, Demuxer, TWCCRecorder} alias ExWebRTC.{ + DataChannel, DefaultICETransport, DTLSTransport, ICECandidate, MediaStreamTrack, RTPTransceiver, RTPSender, + SCTPTransport, SDPUtils, SessionDescription, Utils @@ -63,6 +65,7 @@ defmodule ExWebRTC.PeerConnection do Most of the messages match the [RTCPeerConnection events](https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection#events), except for: * `:track_muted`, `:track_ended` - these match the [MediaStreamTrack events](https://developer.mozilla.org/en-US/docs/Web/API/MediaStreamTrack#events). + * `:data` - data received from DataChannel identified by its `ref`. * `:rtp` and `:rtcp` - these contain packets received by the PeerConnection. The third element of `:rtp` tuple is a simulcast RID and is set to `nil` if simulcast is not used. * each of the packets in `:rtcp` message is in the form of `{track_id, packet}` tuple, where `track_id` is the id of the corrsponding track. @@ -77,9 +80,12 @@ defmodule ExWebRTC.PeerConnection do | {:ice_gathering_state_change, ice_gathering_state()} | :negotiation_needed | {:signaling_state_change, signaling_state()} + | {:data_channel_state_change, DataChannel.ref(), DataChannel.ready_state()} + | {:data_channel, DataChannel.t()} | {:track, MediaStreamTrack.t()} | {:track_muted, MediaStreamTrack.id()} | {:track_ended, MediaStreamTrack.id()} + | {:data, DataChannel.ref(), binary()} | {:rtp, MediaStreamTrack.id(), String.t() | nil, ExRTP.Packet.t()}} | {:rtcp, [{MediaStreamTrack.id() | nil, ExRTCP.Packet.packet()}]} @@ -163,6 +169,16 @@ defmodule ExWebRTC.PeerConnection do GenServer.cast(peer_connection, {:send_pli, track_id, rid}) end + @doc """ + Sends data over DataChannel, using channel identified by `ref`. + + Requires the channel to be in `:open` state. + """ + @spec send_data(peer_connection(), DataChannel.ref(), binary()) :: :ok + def send_data(peer_connection, channel_ref, data) do + GenServer.cast(peer_connection, {:send_data, channel_ref, data}) + end + #### MDN-API #### @doc """ @@ -408,6 +424,17 @@ defmodule ExWebRTC.PeerConnection do GenServer.call(peer_connection, {:remove_track, sender_id}) end + @doc """ + Creates a new DataChannel. + + For more information, refer to the [RTCPeerConnection: createDataChannel() method](https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/createDataChannel). + """ + @spec create_data_channel(peer_connection(), String.t(), DataChannel.options()) :: + {:ok, DataChannel.t()} | {:error, atom()} + def create_data_channel(peer_connection, label, opts \\ []) do + GenServer.call(peer_connection, {:create_data_channel, label, opts}) + end + @doc """ Closes the PeerConnection. @@ -455,6 +482,7 @@ defmodule ExWebRTC.PeerConnection do current_remote_desc: nil, pending_remote_desc: nil, negotiation_needed: false, + sctp_transport: SCTPTransport.new(), ice_transport: DefaultICETransport, ice_pid: ice_pid, dtls_transport: dtls_transport, @@ -591,8 +619,13 @@ defmodule ExWebRTC.PeerConnection do mlines = Enum.map(remote_offer.media, fn mline -> {:mid, mid} = ExSDP.get_attribute(mline, :mid) - {_ix, transceiver} = find_transceiver(state.transceivers, mid) - RTPTransceiver.to_answer_mline(transceiver, mline, opts) + + if SDPUtils.data_channel?(mline) do + generate_data_mline(mid, opts) + else + {_ix, transceiver} = find_transceiver(state.transceivers, mid) + RTPTransceiver.to_answer_mline(transceiver, mline, opts) + end end) mids = SDPUtils.get_bundle_mids(mlines) @@ -851,6 +884,34 @@ defmodule ExWebRTC.PeerConnection do end end + @impl true + def handle_call({:create_data_channel, label, opts}, _from, state) do + ordered = Keyword.get(opts, :ordered, true) + lifetime = Keyword.get(opts, :max_packet_life_time) + max_rtx = Keyword.get(opts, :max_retransmits) + protocol = Keyword.get(opts, :protocol, "") + + with true <- byte_size(label) < 65_535, + true <- lifetime == nil or max_rtx == nil do + {events, channel, sctp_transport} = + SCTPTransport.add_channel( + state.sctp_transport, + label, + ordered, + protocol, + lifetime, + max_rtx + ) + + state = update_negotiation_needed(%{state | sctp_transport: sctp_transport}) + + handle_sctp_events(events, state) + {:reply, {:ok, channel}, state} + else + _other -> {:reply, {:error, :invalid_option}, state} + end + end + @impl true def handle_call(:get_stats, _from, state) do timestamp = System.os_time(:millisecond) @@ -1032,6 +1093,17 @@ defmodule ExWebRTC.PeerConnection do end end + @impl true + def handle_cast({:send_data, channel_ref, data}, state) do + # TODO: allow for configuring the type of data + {events, sctp_transport} = + SCTPTransport.send(state.sctp_transport, channel_ref, :string, data) + + handle_sctp_events(events, state) + + {:noreply, %{state | sctp_transport: sctp_transport}} + end + @impl true def handle_info({:ex_ice, _from, {:connection_state_change, new_ice_state}}, state) do state = %{state | ice_state: new_ice_state} @@ -1075,9 +1147,13 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_info({:dtls_transport, _pid, {:state_change, new_dtls_state}}, state) do - state = %{state | dtls_state: new_dtls_state} next_conn_state = next_conn_state(state.ice_state, new_dtls_state) - state = update_conn_state(state, next_conn_state) + + state = + %{state | dtls_state: new_dtls_state} + |> update_conn_state(next_conn_state) + |> maybe_connect_sctp() + {:noreply, state} end @@ -1159,6 +1235,14 @@ defmodule ExWebRTC.PeerConnection do end end + @impl true + def handle_info({:dtls_transport, _pid, {:data, data}}, state) do + {events, sctp_transport} = SCTPTransport.handle_data(state.sctp_transport, data) + handle_sctp_events(events, state) + + {:noreply, %{state | sctp_transport: sctp_transport}} + end + @impl true def handle_info(:send_twcc_feedback, %{twcc_recorder: twcc_recorder} = state) do Process.send_after(self(), :send_twcc_feedback, @twcc_interval) @@ -1229,6 +1313,14 @@ defmodule ExWebRTC.PeerConnection do {:noreply, %{state | transceivers: transceivers}} end + @impl true + def handle_info(:sctp_timeout, state) do + {events, sctp_transport} = SCTPTransport.handle_timeout(state.sctp_transport) + handle_sctp_events(events, state) + + {:noreply, %{state | sctp_transport: sctp_transport}} + end + @impl true def handle_info(msg, state) do Logger.info("Received unexpected message: #{inspect(msg)}") @@ -1248,7 +1340,7 @@ defmodule ExWebRTC.PeerConnection do # converting them into mlines next_mid = find_next_mid(state) - {transceivers, _next_mid} = + {transceivers, next_mid} = Enum.map_reduce(state.transceivers, next_mid, fn # In the initial offer, we can't have stopped transceivers, only stopping ones. # Also, stopped transceivers are immediately removed. @@ -1267,7 +1359,14 @@ defmodule ExWebRTC.PeerConnection do |> Enum.reject(fn tr -> tr.stopping == true end) |> Enum.map(&RTPTransceiver.to_offer_mline(&1, opts)) - {transceivers, mlines} + data_mline = + if SCTPTransport.data_channels?(state.sctp_transport) do + [generate_data_mline(next_mid, opts)] + else + [] + end + + {transceivers, mlines ++ data_mline} end defp generate_offer_mlines(state, opts) do @@ -1275,7 +1374,8 @@ defmodule ExWebRTC.PeerConnection do next_mid = find_next_mid(state) next_mline_idx = Enum.count(last_answer.media) - transceivers = assign_mlines(state.transceivers, last_answer, next_mid, next_mline_idx) + {transceivers, next_mid} = + assign_mlines(state.transceivers, last_answer, next_mid, next_mline_idx) # The idea is as follows: # * Iterate over current local mlines @@ -1293,13 +1393,17 @@ defmodule ExWebRTC.PeerConnection do current_local_desc.media |> Stream.with_index() |> Enum.map(fn {local_mline, idx} -> - case Enum.find(transceivers, &(&1.mline_idx == idx)) do - # if there is no transceiver, the mline must have been rejected - # in the past (in the offer or answer) so we always set the port to 0 - nil -> + tr = Enum.find(transceivers, &(&1.mline_idx == idx)) + + cond do + SDPUtils.data_channel?(local_mline) -> + {:mid, mid} = ExSDP.get_attribute(local_mline, :mid) + generate_data_mline(mid, opts) + + tr == nil -> %{local_mline | port: 0} - tr -> + true -> RTPTransceiver.to_offer_mline(tr, opts) end end) @@ -1313,7 +1417,37 @@ defmodule ExWebRTC.PeerConnection do final_mlines = final_mlines ++ rem_mlines - {transceivers, final_mlines} + data_mline = + if SCTPTransport.data_channels?(state.sctp_transport) and + not Enum.any?(final_mlines, &SDPUtils.data_channel?(&1)) do + [generate_data_mline(next_mid, opts)] + else + [] + end + + {transceivers, final_mlines ++ data_mline} + end + + def generate_data_mline(mid, opts) do + attributes = + [ + {:mid, mid}, + {:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)}, + {:ice_pwd, Keyword.fetch!(opts, :ice_pwd)}, + {:ice_options, Keyword.fetch!(opts, :ice_options)}, + {:fingerprint, Keyword.fetch!(opts, :fingerprint)}, + {:setup, Keyword.fetch!(opts, :setup)}, + {"sctp-port", "5000"} + ] + + # NOTICE: Media.new puts fmtp (`webrtc-datachannel`) into a list + %ExSDP.Media{ + ExSDP.Media.new("application", 9, "UDP/DTLS/SCTP", "webrtc-datachannel") + | # mline must be followed by a cline, which must contain + # the default value "IN IP4 0.0.0.0" (as there are no candidates yet) + connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}] + } + |> ExSDP.add_attributes(attributes) end # next_mline_idx is future mline idx to use if there are no mlines to recycle @@ -1327,7 +1461,7 @@ defmodule ExWebRTC.PeerConnection do result \\ [] ) - defp assign_mlines([], _, _, _, _, result), do: Enum.reverse(result) + defp assign_mlines([], _, next_mid, _, _, result), do: {Enum.reverse(result), next_mid} defp assign_mlines( [%{mid: nil, mline_idx: nil, stopped: false} = tr | trs], @@ -1369,7 +1503,10 @@ defmodule ExWebRTC.PeerConnection do state.ice_transport.gather_candidates(state.ice_pid) end - transceivers = process_mlines_local(sdp.media, state.transceivers, type, state.owner) + transceivers = + sdp.media + |> Enum.reject(&SDPUtils.data_channel?/1) + |> process_mlines_local(state.transceivers, type, state.owner) # TODO re-think order of those functions # and demuxer update @@ -1412,11 +1549,14 @@ defmodule ExWebRTC.PeerConnection do state = %{state | config: config, twcc_extension_id: twcc_id} transceivers = - process_mlines_remote(sdp.media, state.transceivers, type, state.config, state.owner) + sdp.media + |> Enum.reject(&SDPUtils.data_channel?/1) + |> process_mlines_remote(state.transceivers, type, state.config, state.owner) # infer our role from the remote role dtls_role = if dtls_role in [:actpass, :passive], do: :active, else: :passive DTLSTransport.start_dtls(state.dtls_transport, dtls_role, peer_fingerprint) + sctp_transport = SCTPTransport.set_role(state.sctp_transport, dtls_role) # ice_creds will be nil if all of the mlines in the description are rejected # in such case, if this is the first remote description, connection won't be established @@ -1439,7 +1579,9 @@ defmodule ExWebRTC.PeerConnection do |> Map.replace!(:transceivers, transceivers) |> remove_stopped_transceivers(type, sdp) |> update_signaling_state(next_sig_state) + |> Map.replace!(:sctp_transport, sctp_transport) |> Map.update!(:demuxer, &Demuxer.update(&1, sdp)) + |> maybe_connect_sctp() if state.signaling_state == :stable do state = %{state | negotiation_needed: false} @@ -1751,7 +1893,9 @@ defmodule ExWebRTC.PeerConnection do do: state defp update_negotiation_needed(state) do - negotiation_needed = negotiation_needed?(state.transceivers, state) + negotiation_needed = + tr_negotiation_needed?(state.transceivers, state) or + dc_negotiation_needed?(state) cond do negotiation_needed == true and state.negotiation_needed == true -> @@ -1770,14 +1914,26 @@ defmodule ExWebRTC.PeerConnection do end end + defp dc_negotiation_needed?(state) do + first_channel = map_size(state.sctp_transport.channels) == 1 + + has_channels = + case state.current_local_desc do + nil -> false + {_, desc} -> Enum.any?(desc.media, &SDPUtils.data_channel?(&1)) + end + + first_channel and not has_channels + end + # We don't support MSIDs and stopping transceivers so # we only check 5.2 and 5.3 from 4.7.3#check-if-negotiation-is-needed # https://www.w3.org/TR/webrtc/#dfn-check-if-negotiation-is-needed - defp negotiation_needed?([], _), do: false + defp tr_negotiation_needed?([], _), do: false - defp negotiation_needed?([tr | _transceivers], _state) when tr.mid == nil, do: true + defp tr_negotiation_needed?([tr | _transceivers], _state) when tr.mid == nil, do: true - defp negotiation_needed?([tr | transceivers], state) do + defp tr_negotiation_needed?([tr | transceivers], state) do {local_desc_type, local_desc} = state.current_local_desc {_, remote_desc} = state.current_remote_desc @@ -1802,10 +1958,25 @@ defmodule ExWebRTC.PeerConnection do true true -> - negotiation_needed?(transceivers, state) + tr_negotiation_needed?(transceivers, state) + end + end + + defp maybe_connect_sctp(%{current_remote_desc: {:answer, sdp}} = state) do + has_channel? = Enum.any?(sdp.media, &SDPUtils.data_channel?(&1)) + connected? = state.dtls_state == :connected + + if has_channel? and connected? do + {events, sctp_transport} = SCTPTransport.connect(state.sctp_transport) + handle_sctp_events(events, state) + %{state | sctp_transport: sctp_transport} + else + state end end + defp maybe_connect_sctp(state), do: state + defp handle_rtcp_packet(state, %ExRTCP.Packet.ReceiverReport{} = report) do with true <- :rtcp_reports in state.config.features, {:ok, mid} <- Demuxer.demux_ssrc(state.demuxer, report.ssrc), @@ -1876,6 +2047,24 @@ defmodule ExWebRTC.PeerConnection do defp handle_rtcp_packet(state, _packet), do: {nil, state} + defp handle_sctp_events(events, state) do + for event <- events do + case event do + {:transmit, packets} -> + Enum.each(packets, &DTLSTransport.send_data(state.dtls_transport, &1)) + + {:channel, channel} -> + notify(state.owner, {:data_channel, channel}) + + {:state_change, ref, new_state} -> + notify(state.owner, {:data_channel_state_change, ref, new_state}) + + {:data, ref, data} -> + notify(state.owner, {:data, ref, data}) + end + end + end + defp do_get_description(nil, _candidates), do: nil defp do_get_description({type, sdp}, candidates) do diff --git a/lib/ex_webrtc/sctp_transport.ex b/lib/ex_webrtc/sctp_transport.ex new file mode 100644 index 00000000..009e6add --- /dev/null +++ b/lib/ex_webrtc/sctp_transport.ex @@ -0,0 +1,315 @@ +defmodule ExWebRTC.SCTPTransport do + @moduledoc false + + require Logger + + alias __MODULE__.DCEP + alias ExWebRTC.DataChannel + + @dcep_ppi 50 + + @opaque t() :: map() + + @type event() :: + {:transmit, binary()} + | {:data, DataChannel.ref(), binary()} + | {:channel, DataChannel.t()} + | {:state_change, DataChannel.ref(), DataChannel.ready_state()} + + @spec new() :: t() + def new do + %{ + ref: ExSCTP.new(), + connected: false, + id_type: nil, + timer: nil, + channels: %{} + } + end + + @spec connect(t()) :: {[event()], t()} + def connect(%{connected: true} = sctp_transport), do: {[], sctp_transport} + + def connect(sctp_transport) do + :ok = ExSCTP.connect(sctp_transport.ref) + handle_events(sctp_transport) + end + + @spec set_role(t(), :active | :passive) :: t() + def set_role(%{id_type: t} = sctp_transport, _type) when t != nil, do: sctp_transport + def set_role(sctp_transport, :active), do: %{sctp_transport | id_type: :even} + def set_role(sctp_transport, :passive), do: %{sctp_transport | id_type: :odd} + + @spec data_channels?(t()) :: boolean() + def data_channels?(sctp_transport) do + sctp_transport.channels != %{} + end + + @spec add_channel( + t(), + String.t(), + boolean(), + String.t(), + non_neg_integer() | nil, + non_neg_integer() | nil + ) :: + {[event()], DataChannel.t(), t()} + def add_channel(sctp_transport, label, ordered, protocol, lifetime, max_rtx) do + channel = %DataChannel{ + ref: make_ref(), + id: nil, + label: label, + ordered: ordered, + protocol: protocol, + ready_state: :connecting, + max_packet_life_time: lifetime, + max_retransmits: max_rtx + } + + channels = Map.put(sctp_transport.channels, channel.ref, channel) + sctp_transport = %{sctp_transport | channels: channels} + + {events, sctp_transport} = + if sctp_transport.connected do + sctp_transport = handle_pending_channels(sctp_transport) + handle_events(sctp_transport) + else + {[], sctp_transport} + end + + {events, channel, sctp_transport} + end + + # TODO: close channel + + @spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()} + def send(sctp_transport, ref, type, data) do + {ppi, data} = to_raw_data(data, type) + + case Map.fetch(sctp_transport.channels, ref) do + {:ok, %DataChannel{ready_state: :open, id: id}} when id != nil -> + :ok = ExSCTP.send(sctp_transport.ref, id, ppi, data) + handle_events(sctp_transport) + + {:ok, %DataChannel{id: id}} -> + Logger.warning( + "Trying to send data over DataChannel with id #{id} that is not opened yet" + ) + + {[], sctp_transport} + + :error -> + Logger.warning("Trying to send data over non-existing DataChannel with ref #{ref}") + {[], sctp_transport} + end + end + + @spec handle_timeout(t()) :: {[event()], t()} + def handle_timeout(sctp_transport) do + ExSCTP.handle_timeout(sctp_transport.ref) + handle_events(sctp_transport) + end + + @spec handle_data(t(), binary()) :: {[event()], t()} + def handle_data(sctp_transport, data) do + :ok = ExSCTP.handle_data(sctp_transport.ref, data) + handle_events(sctp_transport) + end + + defp handle_pending_channels(sctp_transport) do + sctp_transport.channels + |> Map.values() + |> Enum.filter(fn channel -> channel.id == nil end) + |> Enum.reduce(sctp_transport, fn channel, transport -> + handle_pending_channel(transport, channel) + end) + end + + defp handle_pending_channel(sctp_transport, channel) do + id = new_id(sctp_transport) + :ok = ExSCTP.open_stream(sctp_transport.ref, id) + + {reliability, param} = + cond do + channel.max_packet_life_time != nil -> {:timed, channel.max_packet_life_time} + channel.max_retransmits != nil -> {:rexmit, channel.max_retransmits} + true -> {:reliable, 0} + end + + dco = %DCEP.DataChannelOpen{ + reliability: reliability, + order: if(channel.ordered, do: :ordered, else: :unordered), + label: channel.label, + protocol: channel.protocol, + param: param, + priority: 0 + } + + :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(dco)) + + channel = %DataChannel{channel | id: id} + %{sctp_transport | channels: Map.replace!(sctp_transport.channels, channel.ref, channel)} + end + + defp handle_events(sctp_transport, events \\ []) do + event = ExSCTP.poll(sctp_transport.ref) + + case handle_event(sctp_transport, event) do + {:none, transport} -> {Enum.reverse(events), transport} + {nil, transport} -> handle_events(transport, events) + {other, transport} -> handle_events(transport, [other | events]) + end + end + + # if SCTP disconnected, most likely DTLS disconnected, so we won't handle this here explcitly + defp handle_event(sctp_transport, :disconnected), do: {nil, sctp_transport} + defp handle_event(sctp_transport, :none), do: {:none, sctp_transport} + defp handle_event(sctp_transport, {:transmit, _data} = event), do: {event, sctp_transport} + + defp handle_event(sctp_transport, {:stream_opened, id}) do + Logger.debug("SCTP stream #{id} has been opened") + {nil, sctp_transport} + end + + defp handle_event(sctp_transport, {:stream_closed, _id}) do + # TODO: handle closing channels + {nil, sctp_transport} + end + + defp handle_event(sctp_transport, :connected) do + Logger.debug("SCTP connection has been established") + + sctp_transport = + %{sctp_transport | connected: true} + |> handle_pending_channels() + + {nil, sctp_transport} + end + + defp handle_event(sctp_transport, {:timeout, val}) do + # TODO: this seems to work + # but sometimes the data is send after quite a substensial timeout + # calling `handle_timeout` periodically (i.e. every 50s) seems to work better + # which is wierd, to investigate + if sctp_transport.timer != nil do + Process.cancel_timer(sctp_transport.timer) + end + + timer = + case val do + nil -> nil + ms -> Process.send_after(self(), :sctp_timeout, ms) + end + + {nil, %{sctp_transport | timer: timer}} + end + + defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do + with {:ok, dcep} <- DCEP.decode(data), + {:ok, sctp_transport, event} <- handle_dcep(sctp_transport, id, dcep) do + {event, sctp_transport} + else + :error -> + # TODO: close the channel + Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}") + {nil, sctp_transport} + end + end + + defp handle_event(sctp_transport, {:data, id, ppi, data}) do + with {:ok, data} <- from_raw_data(data, ppi), + {ref, %DataChannel{ready_state: :open}} <- + Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {{:data, ref, data}, sctp_transport} + else + {_ref, %DataChannel{}} -> + Logger.warning("Received data on DataChannel with id #{id} that is not open. Discarding") + {nil, sctp_transport} + + nil -> + Logger.warning( + "Received data over non-existing DataChannel on stream with id #{id}. Discarding" + ) + + {nil, sctp_transport} + + _other -> + Logger.warning("Received data in invalid format on stream with id #{id}. Discarding") + {nil, sctp_transport} + end + end + + defp handle_dcep(sctp_transport, id, %DCEP.DataChannelOpen{} = dco) do + with false <- Enum.any?(sctp_transport.channels, fn {_k, v} -> v.id == id end), + true <- valid_id?(sctp_transport, id) do + :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(%DCEP.DataChannelAck{})) + + Logger.debug("Remote opened DataChannel #{id} succesfully") + + channel = %DataChannel{ + ref: make_ref(), + id: id, + label: dco.label, + ordered: dco.order == :ordered, + protocol: dco.protocol, + ready_state: :open, + max_packet_life_time: if(dco.reliability == :timed, do: dco.param, else: nil), + max_retransmits: if(dco.reliability == :rexmit, do: dco.param, else: nil) + } + + # In theory, we should also send the :open event here (W3C 6.2.3) + channels = Map.put(sctp_transport.channels, channel.ref, channel) + {:ok, %{sctp_transport | channels: channels}, {:channel, channel}} + else + _other -> :error + end + end + + defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do + case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do + {ref, %DataChannel{ready_state: :connecting} = channel} -> + Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully") + # TODO: set the parameters + channel = %DataChannel{channel | ready_state: :open} + channels = Map.put(sctp_transport.channels, ref, channel) + event = {:state_change, ref, :open} + + {:ok, %{sctp_transport | channels: channels}, event} + + _other -> + # TODO: close the channel + Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}") + {:ok, sctp_transport, nil} + end + end + + defp from_raw_data(data, ppi) when ppi in [51, 53], do: {:ok, data} + defp from_raw_data(_data, ppi) when ppi in [56, 57], do: {:ok, <<>>} + defp from_raw_data(_data, _ppi), do: :error + + defp to_raw_data(<<>>, :string), do: {56, <<0>>} + defp to_raw_data(data, :string), do: {51, data} + defp to_raw_data(<<>>, :binary), do: {57, <<0>>} + defp to_raw_data(data, :binary), do: {53, data} + + # for remote ids (so must be opposite than ours) + defp valid_id?(%{id_type: :even}, id), do: rem(id, 2) == 1 + defp valid_id?(%{id_type: :odd}, id), do: rem(id, 2) == 0 + + defp new_id(sctp_transport) do + max_id = + sctp_transport.channels + |> Enum.filter(fn {_k, v} -> v.id != nil end) + |> Enum.map(fn {_k, v} -> v.id end) + |> Enum.max(&>=/2, fn -> -1 end) + + case {sctp_transport.id_type, rem(max_id, 2)} do + {:even, -1} -> 0 + {:odd, -1} -> 1 + {:even, 0} -> max_id + 2 + {:even, 1} -> max_id + 1 + {:odd, 0} -> max_id + 1 + {:odd, 1} -> max_id + 2 + end + end +end diff --git a/lib/ex_webrtc/sctp_transport/dcep.ex b/lib/ex_webrtc/sctp_transport/dcep.ex new file mode 100644 index 00000000..7f1efe7e --- /dev/null +++ b/lib/ex_webrtc/sctp_transport/dcep.ex @@ -0,0 +1,76 @@ +defmodule ExWebRTC.SCTPTransport.DCEP do + @moduledoc false + # based on RFC 8832 + + defmodule DataChannelAck do + @moduledoc false + + defstruct [] + + def decode(<<>>), do: {:ok, %__MODULE__{}} + def decode(_other), do: :error + + def encode(%__MODULE__{}), do: <<0x02>> + end + + defmodule DataChannelOpen do + @moduledoc false + + @enforce_keys [:reliability, :order, :label, :protocol, :priority, :param] + defstruct @enforce_keys + + def decode( + <> + ) do + with {:ok, reliability, order} <- to_channel_type(ch_type), + <> <- rest, + <> <- rest do + dca = + %__MODULE__{ + reliability: reliability, + order: order, + param: param, + label: label, + protocol: protocol, + priority: priority + } + + {:ok, dca} + else + _other -> :error + end + end + + def encode(%__MODULE__{} = dco) do + ch_type = from_channel_type(dco.reliability, dco.order) + label_len = byte_size(dco.label) + proto_len = byte_size(dco.protocol) + + <<0x03::8, ch_type::8, dco.priority::16, dco.param::32, label_len::16, proto_len::16, + dco.label::binary-size(label_len), dco.protocol::binary-size(proto_len)>> + end + + # most significant bit determines order, + # least significant 2 bits determine reliability + defp to_channel_type(0x00), do: {:ok, :reliable, :ordered} + defp to_channel_type(0x80), do: {:ok, :reliable, :unordered} + defp to_channel_type(0x01), do: {:ok, :rexmit, :ordered} + defp to_channel_type(0x81), do: {:ok, :rexmit, :unordered} + defp to_channel_type(0x02), do: {:ok, :timed, :ordered} + defp to_channel_type(0x82), do: {:ok, :timed, :unordered} + defp to_channel_type(_other), do: :error + + defp from_channel_type(:reliable, :ordered), do: 0x00 + defp from_channel_type(:reliable, :unordered), do: 0x80 + defp from_channel_type(:rexmit, :ordered), do: 0x01 + defp from_channel_type(:rexmit, :unordered), do: 0x81 + defp from_channel_type(:timed, :ordered), do: 0x02 + defp from_channel_type(:timed, :unordered), do: 0x82 + end + + def decode(<<0x03::8, rest::binary>>), do: DataChannelOpen.decode(rest) + def decode(<<0x02::8, rest::binary>>), do: DataChannelAck.decode(rest) + def decode(_other), do: :error + + def encode(%mod{} = dcep), do: mod.encode(dcep) +end diff --git a/lib/ex_webrtc/sdp_utils.ex b/lib/ex_webrtc/sdp_utils.ex index ad8c1422..b16985eb 100644 --- a/lib/ex_webrtc/sdp_utils.ex +++ b/lib/ex_webrtc/sdp_utils.ex @@ -12,7 +12,8 @@ defmodule ExWebRTC.SDPUtils do def ensure_valid(sdp) do with :ok <- ensure_non_empty(sdp), :ok <- ensure_mid(sdp), - :ok <- ensure_bundle(sdp) do + :ok <- ensure_bundle(sdp), + :ok <- ensure_data_channel_valid(sdp) do ensure_rtcp_mux(sdp) end end @@ -65,10 +66,23 @@ defmodule ExWebRTC.SDPUtils do Enum.filter(groups, fn %ExSDP.Attribute.Group{semantics: name} -> name == to_filter end) end + defp ensure_data_channel_valid(sdp) do + with [data_mline] <- Enum.filter(sdp.media, &data_channel?/1), + "webrtc-datachannel" <- data_mline.fmt, + sctp_port when not is_nil(sctp_port) <- ExSDP.get_attribute(data_mline, "sctp-port") do + :ok + else + [] -> :ok + mlines when is_list(mlines) -> {:error, :multiple_data_mlines} + fmt when is_binary(fmt) -> {:error, :invalid_datachannel_format} + nil -> {:error, :no_sctp_port} + end + end + defp ensure_rtcp_mux(sdp) do # Firefox does not add `rtcp_mux` in rejected mlines sdp.media - |> Enum.reject(&rejected?/1) + |> Enum.reject(&(rejected?(&1) or data_channel?(&1))) |> Enum.all?(&(ExSDP.get_attribute(&1, :rtcp_mux) == :rtcp_mux)) |> case do true -> :ok @@ -364,6 +378,11 @@ defmodule ExWebRTC.SDPUtils do def rejected?(%ExSDP.Media{}), do: false + @spec data_channel?(ExSDP.Media.t()) :: boolean() + def data_channel?(%ExSDP.Media{fmt: "webrtc-datachannel"}), do: true + def data_channel?(%ExSDP.Media{fmt: ["webrtc-datachannel"]}), do: true + def data_channel?(%ExSDP.Media{}), do: false + defp do_get_ice_credentials(sdp_or_mline) do ice_ufrag = case ExSDP.get_attribute(sdp_or_mline, :ice_ufrag) do diff --git a/mix.exs b/mix.exs index dda7bb08..da47b341 100644 --- a/mix.exs +++ b/mix.exs @@ -58,10 +58,11 @@ defmodule ExWebRTC.MixProject do [ {:ex_sdp, "~> 1.0"}, {:ex_ice, "~> 0.8.0"}, - {:ex_dtls, "~> 0.15.0"}, + {:ex_dtls, github: "elixir-webrtc/ex_dtls"}, {:ex_libsrtp, "~> 0.7.1"}, {:ex_rtp, "~> 0.4.0"}, {:ex_rtcp, "~> 0.4.0"}, + {:ex_sctp, github: "elixir-webrtc/ex_sctp"}, {:crc, "~> 0.10"}, # dev/test diff --git a/mix.lock b/mix.lock index 5bdd05cd..708962ed 100644 --- a/mix.lock +++ b/mix.lock @@ -12,11 +12,12 @@ "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"}, - "ex_dtls": {:hex, :ex_dtls, "0.15.2", "6c8c0f8eb67525216551bd3e0322ab33c9d851d56ef3e065efab4fd277a8fbb9", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "6b852bc926bbdc9c1b9c4ecc6cfc73a89d4e106042802cefea2c1503072a9f2a"}, + "ex_dtls": {:git, "https://github.com/elixir-webrtc/ex_dtls.git", "dff8ec1998dfb556b2d3dafbd30574d0da18b958", []}, "ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, + "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "3de986b3cd00796a1f2a5ac40b001682d5712264", []}, "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, @@ -36,8 +37,10 @@ "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, + "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, } diff --git a/test/ex_webrtc/data_channel_test.exs b/test/ex_webrtc/data_channel_test.exs new file mode 100644 index 00000000..916d4aed --- /dev/null +++ b/test/ex_webrtc/data_channel_test.exs @@ -0,0 +1,168 @@ +defmodule ExWebRTC.DataChannelTest do + use ExUnit.Case, async: true + + import ExWebRTC.Support.TestUtils + + alias ExWebRTC.{DataChannel, PeerConnection, MediaStreamTrack} + + test "establishing channels" do + {:ok, pc1} = PeerConnection.start_link() + {:ok, pc2} = PeerConnection.start_link() + + label1 = "my label 1" + {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1) + assert_receive {:ex_webrtc, ^pc1, :negotiation_needed} + + :ok = negotiate(pc1, pc2) + + refute_receive {:ex_webrtc, ^pc2, {:data_channel, _}} + + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan1}} + assert %DataChannel{id: 1, label: ^label1, ordered: true} = chan1 + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}} + + label2 = "my label 2" + protocol = "my proto" + + {:ok, %DataChannel{ref: ref2}} = + PeerConnection.create_data_channel(pc1, label2, protocol: protocol, ordered: false) + + refute_receive {:ex_webrtc, ^pc1, :negotiation_needed} + + assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan2}} + assert %DataChannel{id: 3, label: ^label2, protocol: ^protocol, ordered: false} = chan2 + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref2, :open}} + + label3 = "my label 3" + {:ok, %DataChannel{ref: ref3}} = PeerConnection.create_data_channel(pc2, label3) + + refute_receive {:ex_webrtc, ^pc2, :negotiation_needed} + + assert_receive {:ex_webrtc, ^pc1, {:data_channel, chan3}} + assert %DataChannel{id: 4, label: ^label3} = chan3 + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref3, :open}} + end + + describe "negotiating" do + test "with only channel added" do + {:ok, pc1} = PeerConnection.start_link() + {:ok, pc2} = PeerConnection.start_link() + + label = "my label" + {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label) + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{label: ^label}}} + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}} + end + + test "with channel mixed with transceivers" do + {:ok, pc1} = PeerConnection.start_link() + {:ok, pc2} = PeerConnection.start_link() + + {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:audio)) + label1 = "my label" + {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1) + {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:video)) + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{label: ^label1}}} + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}} + + # add more tracks and channels and renegotiate + {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:video)) + :ok = negotiate(pc1, pc2) + + label2 = "my label 2" + {:ok, %DataChannel{ref: ref2}} = PeerConnection.create_data_channel(pc2, label2) + assert_receive {:ex_webrtc, ^pc1, {:data_channel, %DataChannel{label: ^label2}}} + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :open}} + end + + test "with channel added only in renegotiation" do + {:ok, pc1} = PeerConnection.start_link() + {:ok, pc2} = PeerConnection.start_link() + {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:audio)) + {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:video)) + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + label = "my label" + {:ok, %DataChannel{ref: ref}} = PeerConnection.create_data_channel(pc2, label) + + refute_receive {:ex_webrtc, ^pc1, {:data_channel, _}} + + :ok = negotiate(pc2, pc1) + + assert_receive {:ex_webrtc, ^pc1, {:data_channel, %DataChannel{label: ^label}}} + assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref, :open}} + end + end + + describe "sending data" do + setup do + {:ok, pc1} = PeerConnection.start_link() + {:ok, pc2} = PeerConnection.start_link() + {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, "label") + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{ref: ref2}}} + assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}} + + %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} + end + + test "message from initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + data1 = <<1, 2, 3>> + :ok = PeerConnection.send_data(pc1, ref1, data1) + assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, ^data1}} + + data2 = for i <- 1..2000, into: <<>>, do: <> + :ok = PeerConnection.send_data(pc1, ref1, data2) + assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, ^data2}} + + :ok = PeerConnection.send_data(pc1, ref1, <<>>) + assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, <<>>}} + end + + test "from other peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + data1 = <<1, 2, 3>> + :ok = PeerConnection.send_data(pc2, ref2, data1) + assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data1}} + + data2 = <<>> + :ok = PeerConnection.send_data(pc2, ref2, data2) + assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data2}} + end + + test "back and forth", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + data = for i <- 1..1024, into: <<>>, do: <> + :ok = PeerConnection.send_data(pc2, ref2, data) + assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, msg}} + :ok = PeerConnection.send_data(pc1, ref1, msg) + assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, next_msg}} + :ok = PeerConnection.send_data(pc2, ref2, next_msg) + assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data}} + end + + test "over distinct channels", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do + {:ok, %DataChannel{ref: ref3}} = PeerConnection.create_data_channel(pc2, "next label") + assert_receive {:ex_webrtc, ^pc1, {:data_channel, %DataChannel{ref: ref4}}} + + data = for i <- 1..1024, into: <<>>, do: <> + :ok = PeerConnection.send_data(pc1, ref4, data) + assert_receive {:ex_webrtc, ^pc2, {:data, ^ref3, msg}} + :ok = PeerConnection.send_data(pc2, ref2, msg) + assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data}} + end + end +end diff --git a/test/ex_webrtc/peer_connection_test.exs b/test/ex_webrtc/peer_connection_test.exs index 71f5c66a..6300d0e5 100644 --- a/test/ex_webrtc/peer_connection_test.exs +++ b/test/ex_webrtc/peer_connection_test.exs @@ -1390,18 +1390,4 @@ defmodule ExWebRTC.PeerConnectionTest do refute_receive {:ex_webrtc, ^pc1, :negotiation_needed} refute_receive {:ex_webrtc, ^pc2, :negotiation_needed}, 0 end - - defp connect(pc1, pc2) do - # exchange ICE candidates - assert_receive {:ex_webrtc, ^pc1, {:ice_candidate, candidate}} - :ok = PeerConnection.add_ice_candidate(pc2, candidate) - assert_receive {:ex_webrtc, ^pc2, {:ice_candidate, candidate}} - :ok = PeerConnection.add_ice_candidate(pc1, candidate) - - # wait to establish connection - assert_receive {:ex_webrtc, ^pc1, {:connection_state_change, :connected}} - assert_receive {:ex_webrtc, ^pc2, {:connection_state_change, :connected}} - - :ok - end end diff --git a/test/ex_webrtc/sctp_transport/dcep_test.exs b/test/ex_webrtc/sctp_transport/dcep_test.exs new file mode 100644 index 00000000..c1eaf09f --- /dev/null +++ b/test/ex_webrtc/sctp_transport/dcep_test.exs @@ -0,0 +1,39 @@ +defmodule ExWebRTC.SCTPTransport.DCEPTest do + use ExUnit.Case, async: true + + alias ExWebRTC.SCTPTransport.DCEP + + @encoded_dco <<3, 130, 0, 5, 0, 0, 0, 100, 0, 5, 0, 6, 104, 101, 108, 108, 111, 119, 101, 98, + 114, 116, 99>> + @decoded_dco %DCEP.DataChannelOpen{ + reliability: :timed, + order: :unordered, + label: "hello", + protocol: "webrtc", + priority: 5, + param: 100 + } + + @encoded_dca <<2>> + @decoded_dca %DCEP.DataChannelAck{} + + describe "decode/1" do + test "DataChannelAck" do + assert DCEP.encode(@decoded_dca) == @encoded_dca + end + + test "DataChannelOpen" do + assert DCEP.encode(@decoded_dco) == @encoded_dco + end + end + + describe "encode/1" do + test "DataChannelAck" do + assert {:ok, @decoded_dca} = DCEP.decode(@encoded_dca) + end + + test "DataChannelOpen" do + assert {:ok, @decoded_dco} = DCEP.decode(@encoded_dco) + end + end +end diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index bc09215b..4fb9a3ed 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -13,4 +13,27 @@ defmodule ExWebRTC.Support.TestUtils do :ok = PeerConnection.set_remote_description(pc1, answer) :ok end + + @spec connect(PeerConnection.peer_connection(), PeerConnection.peer_connection()) :: :ok + def connect(pc1, pc2) do + # exchange ICE candidates + for {pc1, pc2} <- [{pc1, pc2}, {pc2, pc1}] do + receive do + {:ex_webrtc, ^pc1, {:ice_candidate, candidate}} -> + :ok = PeerConnection.add_ice_candidate(pc2, candidate) + after + 2000 -> raise "Unable to connect" + end + end + + for pc <- [pc1, pc2] do + receive do + {:ex_webrtc, ^pc, {:connection_state_change, :connected}} -> :ok + after + 2000 -> raise "Unable to connect" + end + end + + :ok + end end