diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b40676b1..b9341434 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -14,8 +14,8 @@ jobs:
name: CI on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}}
strategy:
matrix:
- otp: ['26']
- elixir: ['1.16']
+ otp: ['27']
+ elixir: ['1.17']
steps:
- name: Set up Elixir
uses: erlef/setup-beam@v1
diff --git a/examples/chat/.formatter.exs b/examples/chat/.formatter.exs
new file mode 100644
index 00000000..d2cda26e
--- /dev/null
+++ b/examples/chat/.formatter.exs
@@ -0,0 +1,4 @@
+# Used by "mix format"
+[
+ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
+]
diff --git a/examples/chat/.gitignore b/examples/chat/.gitignore
new file mode 100644
index 00000000..e8c8f34d
--- /dev/null
+++ b/examples/chat/.gitignore
@@ -0,0 +1,26 @@
+# The directory Mix will write compiled artifacts to.
+/_build/
+
+# If you run "mix test --cover", coverage assets end up here.
+/cover/
+
+# The directory Mix downloads your dependencies sources to.
+/deps/
+
+# Where third-party dependencies like ExDoc output generated docs.
+/doc/
+
+# Ignore .fetch files in case you like to edit your project deps locally.
+/.fetch
+
+# If the VM crashes, it generates a dump, let's ignore it too.
+erl_crash.dump
+
+# Also ignore archive artifacts (built via "mix archive.build").
+*.ez
+
+# Ignore package tarball (built via "mix hex.build").
+chat-*.tar
+
+# Temporary files, for example, from tests.
+/tmp/
diff --git a/examples/chat/README.md b/examples/chat/README.md
new file mode 100644
index 00000000..bf2dd62f
--- /dev/null
+++ b/examples/chat/README.md
@@ -0,0 +1,11 @@
+# Chat
+
+Receive text chat messages using DataChannel and send them to other peers.
+
+While in `examples/chat` directory
+
+1. Run `mix deps.get`
+2. Run `mix run --no-halt`
+3. Visit `http://127.0.0.1:8829/index.html` in your browser.
+
+The IP and port of the app can be configured in `config/config.exs`.
diff --git a/examples/chat/config/config.exs b/examples/chat/config/config.exs
new file mode 100644
index 00000000..fd6796eb
--- /dev/null
+++ b/examples/chat/config/config.exs
@@ -0,0 +1,8 @@
+import Config
+
+config :logger, level: :info
+
+# normally you take these from env variables in `config/runtime.exs`
+config :chat,
+ ip: {127, 0, 0, 1},
+ port: 8829
diff --git a/examples/chat/lib/chat.ex b/examples/chat/lib/chat.ex
new file mode 100644
index 00000000..bf112273
--- /dev/null
+++ b/examples/chat/lib/chat.ex
@@ -0,0 +1,16 @@
+defmodule Chat do
+ use Application
+
+ @ip Application.compile_env!(:chat, :ip)
+ @port Application.compile_env!(:chat, :port)
+
+ @impl true
+ def start(_type, _args) do
+ children = [
+ {Bandit, plug: __MODULE__.Router, ip: @ip, port: @port},
+ {Registry, name: __MODULE__.PubSub, keys: :duplicate}
+ ]
+
+ Supervisor.start_link(children, strategy: :one_for_one)
+ end
+end
diff --git a/examples/chat/lib/chat/peer_handler.ex b/examples/chat/lib/chat/peer_handler.ex
new file mode 100644
index 00000000..8f54188b
--- /dev/null
+++ b/examples/chat/lib/chat/peer_handler.ex
@@ -0,0 +1,107 @@
+defmodule Chat.PeerHandler do
+ require Logger
+
+ alias ExWebRTC.{
+ DataChannel,
+ ICECandidate,
+ PeerConnection,
+ SessionDescription
+ }
+
+ @behaviour WebSock
+
+ @ice_servers [
+ %{urls: "stun:stun.l.google.com:19302"}
+ ]
+
+ @impl true
+ def init(_) do
+ {:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers)
+ {:ok, _} = Registry.register(Chat.PubSub, "chat", [])
+
+ state = %{
+ peer_connection: pc,
+ channel_ref: nil
+ }
+
+ {:ok, state}
+ end
+
+ @impl true
+ def handle_in({msg, [opcode: :text]}, state) do
+ msg
+ |> Jason.decode!()
+ |> handle_ws_msg(state)
+ end
+
+ @impl true
+ def handle_info({:ex_webrtc, _from, msg}, state) do
+ handle_webrtc_msg(msg, state)
+ end
+
+ @impl true
+ def handle_info({:chat_msg, msg}, state) do
+ :ok = PeerConnection.send_data(state.peer_connection, state.channel_ref, msg)
+ {:ok, state}
+ end
+
+ @impl true
+ def terminate(reason, _state) do
+ Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}")
+ end
+
+ defp handle_ws_msg(%{"type" => "offer", "data" => data}, state) do
+ Logger.info("Received SDP offer:\n#{data["sdp"]}")
+
+ offer = SessionDescription.from_json(data)
+ :ok = PeerConnection.set_remote_description(state.peer_connection, offer)
+
+ {:ok, answer} = PeerConnection.create_answer(state.peer_connection)
+ :ok = PeerConnection.set_local_description(state.peer_connection, answer)
+
+ answer_json = SessionDescription.to_json(answer)
+
+ msg =
+ %{"type" => "answer", "data" => answer_json}
+ |> Jason.encode!()
+
+ Logger.info("Sent SDP answer:\n#{answer_json["sdp"]}")
+
+ {:push, {:text, msg}, state}
+ end
+
+ defp handle_ws_msg(%{"type" => "ice", "data" => data}, state) do
+ Logger.info("Received ICE candidate: #{data["candidate"]}")
+
+ candidate = ICECandidate.from_json(data)
+ :ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate)
+ {:ok, state}
+ end
+
+ defp handle_webrtc_msg({:ice_candidate, candidate}, state) do
+ candidate_json = ICECandidate.to_json(candidate)
+
+ msg =
+ %{"type" => "ice", "data" => candidate_json}
+ |> Jason.encode!()
+
+ Logger.info("Sent ICE candidate: #{candidate_json["candidate"]}")
+
+ {:push, {:text, msg}, state}
+ end
+
+ defp handle_webrtc_msg({:data_channel, %DataChannel{ref: ref}}, state) do
+ state = %{state | channel_ref: ref}
+ {:ok, state}
+ end
+
+ defp handle_webrtc_msg({:data, ref, data}, %{channel_ref: ref} = state) do
+ Registry.dispatch(Chat.PubSub, "chat", fn entries ->
+ for {pid, _} <- entries, do: send(pid, {:chat_msg, data})
+ end)
+
+ {:ok, state}
+ end
+
+ defp handle_webrtc_msg(_msg, state), do: {:ok, state}
+end
diff --git a/examples/chat/lib/chat/router.ex b/examples/chat/lib/chat/router.ex
new file mode 100644
index 00000000..20fe22b4
--- /dev/null
+++ b/examples/chat/lib/chat/router.ex
@@ -0,0 +1,15 @@
+defmodule Chat.Router do
+ use Plug.Router
+
+ plug(Plug.Static, at: "/", from: :chat)
+ plug(:match)
+ plug(:dispatch)
+
+ get "/ws" do
+ WebSockAdapter.upgrade(conn, Chat.PeerHandler, %{}, [])
+ end
+
+ match _ do
+ send_resp(conn, 404, "not found")
+ end
+end
diff --git a/examples/chat/mix.exs b/examples/chat/mix.exs
new file mode 100644
index 00000000..fee3faed
--- /dev/null
+++ b/examples/chat/mix.exs
@@ -0,0 +1,30 @@
+defmodule Chat.MixProject do
+ use Mix.Project
+
+ def project do
+ [
+ app: :chat,
+ version: "0.1.0",
+ elixir: "~> 1.15",
+ start_permanent: Mix.env() == :prod,
+ deps: deps()
+ ]
+ end
+
+ def application do
+ [
+ extra_applications: [:logger],
+ mod: {Chat, []}
+ ]
+ end
+
+ defp deps do
+ [
+ {:plug, "~> 1.15.0"},
+ {:bandit, "~> 1.2.0"},
+ {:websock_adapter, "~> 0.5.0"},
+ {:jason, "~> 1.4.0"},
+ {:ex_webrtc, path: "../../."}
+ ]
+ end
+end
diff --git a/examples/chat/mix.lock b/examples/chat/mix.lock
new file mode 100644
index 00000000..2ebe8a71
--- /dev/null
+++ b/examples/chat/mix.lock
@@ -0,0 +1,40 @@
+%{
+ "bandit": {:hex, :bandit, "1.2.3", "a98d664a96fec23b68e776062296d76a94b4459795b38209f4ae89cb4225709c", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3e29150245a9b5f56944434e5240966e75c917dad248f689ab589b32187a81af"},
+ "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
+ "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"},
+ "bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"},
+ "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"},
+ "crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"},
+ "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"},
+ "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
+ "ex_dtls": {:git, "https://github.com/elixir-webrtc/ex_dtls.git", "dff8ec1998dfb556b2d3dafbd30574d0da18b958", []},
+ "ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"},
+ "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
+ "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
+ "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
+ "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "1576207bda0eba3634a2b0075899042e9b309e60", []},
+ "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
+ "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
+ "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
+ "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"},
+ "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
+ "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
+ "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"},
+ "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
+ "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
+ "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
+ "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
+ "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"},
+ "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
+ "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
+ "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
+ "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"},
+ "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"},
+ "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
+ "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
+ "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
+ "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"},
+ "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
+ "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"},
+ "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"},
+}
diff --git a/examples/chat/priv/static/index.html b/examples/chat/priv/static/index.html
new file mode 100644
index 00000000..9f3ae6c6
--- /dev/null
+++ b/examples/chat/priv/static/index.html
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+ Elixir WebRTC Chat Example
+
+
+ Elixir WebRTC Chat Example
+
+
+
+
+
+
diff --git a/examples/chat/priv/static/script.js b/examples/chat/priv/static/script.js
new file mode 100644
index 00000000..7129de8a
--- /dev/null
+++ b/examples/chat/priv/static/script.js
@@ -0,0 +1,53 @@
+const pcConfig = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' },] };
+const chatInput = document.getElementById("chatInput");
+const chatMessages = document.getElementById("chatMessages");
+
+const proto = window.location.protocol === "https:" ? "wss:" : "ws:"
+const ws = new WebSocket(`${proto}//${window.location.host}/ws`);
+ws.onopen = _ => start_connection(ws);
+ws.onclose = event => console.log("WebSocket connection was terminated:", event);
+
+const start_connection = async (ws) => {
+ const pc = new RTCPeerConnection(pcConfig);
+ pc.onicecandidate = event => {
+ if (event.candidate === null) return;
+
+ console.log("Sent ICE candidate:", event.candidate);
+ ws.send(JSON.stringify({ type: "ice", data: event.candidate }));
+ };
+
+ const dataChannel = pc.createDataChannel("chat");
+
+ dataChannel.onmessage = event => {
+ const msg = document.createElement("p");
+ msg.innerText = event.data;
+ chatMessages.appendChild(msg);
+ };
+
+ chatInput.onkeydown = event => {
+ if (event.code !== "Enter") return;
+ if (dataChannel.readyState !== "open") return;
+
+ dataChannel.send(chatInput.value);
+ chatInput.value = "";
+ };
+
+ ws.onmessage = async event => {
+ const {type, data} = JSON.parse(event.data);
+
+ switch (type) {
+ case "answer":
+ console.log("Received SDP answer:", data);
+ await pc.setRemoteDescription(data)
+ break;
+ case "ice":
+ console.log("Received ICE candidate:", data);
+ await pc.addIceCandidate(data);
+ }
+ };
+
+ const offer = await pc.createOffer();
+ await pc.setLocalDescription(offer);
+ console.log("Sent SDP offer:", offer)
+ ws.send(JSON.stringify({type: "offer", data: offer}));
+};
diff --git a/lib/ex_webrtc/data_channel.ex b/lib/ex_webrtc/data_channel.ex
new file mode 100644
index 00000000..9e37902a
--- /dev/null
+++ b/lib/ex_webrtc/data_channel.ex
@@ -0,0 +1,55 @@
+defmodule ExWebRTC.DataChannel do
+ @moduledoc """
+ Implementation of the [RTCDataChannel](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel).
+ """
+
+ @type order() :: :ordered | :unordered
+
+ @type ref() :: reference()
+
+ @type id() :: non_neg_integer()
+
+ @type ready_state() :: :connecting | :open | :closing | :closed
+
+ @typedoc """
+ Options used when creating a new DataChannel.
+
+ For more information refer to `ExWebRTC.PeerConnection.create_data_channel/3`.
+
+ As of now, Elixir WebRTC does not support `negotiated: true` option, all DataChannels need to be
+ negotiated in-band.
+ """
+ @type options() :: [
+ ordered: order(),
+ max_packet_life_time: non_neg_integer(),
+ max_retransmits: non_neg_integer(),
+ protocol: String.t()
+ ]
+
+ @typedoc """
+ Struct representing the DataChannel.
+
+ All of the fields have the same meaning as in [RTCDataChannel](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel)
+ except for `ref` which is a local identifier used when refering to this DataChannel in
+ received messages or when calling `ExWebRTC.PeerConnection.send_data/3` function.
+
+ It's worth to mention that `id` and `label` can be used by the other peer to identify the data
+ channel, althought be careful as:
+ * `label` does not have to be unique, channels can share a single label,
+ * `id` is only assigned after the SCTP connection has been established (which means
+ that DataChannels created before first negotiation will have `id` set to `nil`)
+ """
+ @type t() :: %__MODULE__{
+ ref: ref(),
+ id: non_neg_integer() | nil,
+ label: String.t(),
+ max_packet_life_time: non_neg_integer() | nil,
+ max_retransmits: non_neg_integer() | nil,
+ ordered: order(),
+ protocol: String.t(),
+ ready_state: ready_state()
+ }
+
+ @enforce_keys [:ref, :id, :label, :ordered, :protocol, :ready_state]
+ defstruct @enforce_keys ++ [:max_packet_life_time, :max_retransmits]
+end
diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex
index 070f89f4..3ce5f86b 100644
--- a/lib/ex_webrtc/dtls_transport.ex
+++ b/lib/ex_webrtc/dtls_transport.ex
@@ -91,6 +91,11 @@ defmodule ExWebRTC.DTLSTransport do
GenServer.cast(dtls_transport, {:send_rtcp, data})
end
+ @spec send_data(dtls_transport(), binary()) :: :ok
+ def send_data(dtls_transport, data) do
+ GenServer.cast(dtls_transport, {:send_data, data})
+ end
+
@spec stop(dtls_transport()) :: :ok
def stop(dtls_transport) do
GenServer.stop(dtls_transport)
@@ -254,6 +259,16 @@ defmodule ExWebRTC.DTLSTransport do
{:noreply, state}
end
+ @impl true
+ def handle_cast({:send_data, data}, state) do
+ case ExDTLS.write_data(state.dtls, data) do
+ {:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
+ {:error, reason} -> Logger.warning("Unable to protect data: #{inspect(reason)}")
+ end
+
+ {:noreply, state}
+ end
+
@impl true
def handle_info(:dtls_timeout, %{buffered_local_packets: buffered_local_packets} = state) do
case ExDTLS.handle_timeout(state.dtls) do
@@ -309,7 +324,7 @@ defmodule ExWebRTC.DTLSTransport do
{:ok, state}
end
- defp handle_ice_data({:data, <> = data}, state) when f in 20..64 do
+ defp handle_ice_data({:data, <> = data}, state) when f in 20..63 do
case ExDTLS.handle_data(state.dtls, data) do
{:handshake_packets, packets, timeout} when state.ice_connected ->
:ok = state.ice_transport.send_data(state.ice_pid, packets)
@@ -361,6 +376,10 @@ defmodule ExWebRTC.DTLSTransport do
:handshake_want_read ->
{:ok, state}
+ {:ok, data} ->
+ notify(state.owner, {:data, data})
+ {:ok, state}
+
{:error, reason} = error ->
# TODO: consider buffering DTLS packets that came out of order during the handshake
Logger.debug("DTLS error: #{reason}")
diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex
index 495c5976..15df00e8 100644
--- a/lib/ex_webrtc/peer_connection.ex
+++ b/lib/ex_webrtc/peer_connection.ex
@@ -12,12 +12,14 @@ defmodule ExWebRTC.PeerConnection do
alias __MODULE__.{Configuration, Demuxer, TWCCRecorder}
alias ExWebRTC.{
+ DataChannel,
DefaultICETransport,
DTLSTransport,
ICECandidate,
MediaStreamTrack,
RTPTransceiver,
RTPSender,
+ SCTPTransport,
SDPUtils,
SessionDescription,
Utils
@@ -63,6 +65,7 @@ defmodule ExWebRTC.PeerConnection do
Most of the messages match the [RTCPeerConnection events](https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection#events),
except for:
* `:track_muted`, `:track_ended` - these match the [MediaStreamTrack events](https://developer.mozilla.org/en-US/docs/Web/API/MediaStreamTrack#events).
+ * `:data` - data received from DataChannel identified by its `ref`.
* `:rtp` and `:rtcp` - these contain packets received by the PeerConnection. The third element of `:rtp` tuple is a simulcast RID and is set to `nil` if simulcast
is not used.
* each of the packets in `:rtcp` message is in the form of `{track_id, packet}` tuple, where `track_id` is the id of the corrsponding track.
@@ -77,9 +80,12 @@ defmodule ExWebRTC.PeerConnection do
| {:ice_gathering_state_change, ice_gathering_state()}
| :negotiation_needed
| {:signaling_state_change, signaling_state()}
+ | {:data_channel_state_change, DataChannel.ref(), DataChannel.ready_state()}
+ | {:data_channel, DataChannel.t()}
| {:track, MediaStreamTrack.t()}
| {:track_muted, MediaStreamTrack.id()}
| {:track_ended, MediaStreamTrack.id()}
+ | {:data, DataChannel.ref(), binary()}
| {:rtp, MediaStreamTrack.id(), String.t() | nil, ExRTP.Packet.t()}}
| {:rtcp, [{MediaStreamTrack.id() | nil, ExRTCP.Packet.packet()}]}
@@ -163,6 +169,16 @@ defmodule ExWebRTC.PeerConnection do
GenServer.cast(peer_connection, {:send_pli, track_id, rid})
end
+ @doc """
+ Sends data over DataChannel, using channel identified by `ref`.
+
+ Requires the channel to be in `:open` state.
+ """
+ @spec send_data(peer_connection(), DataChannel.ref(), binary()) :: :ok
+ def send_data(peer_connection, channel_ref, data) do
+ GenServer.cast(peer_connection, {:send_data, channel_ref, data})
+ end
+
#### MDN-API ####
@doc """
@@ -408,6 +424,17 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:remove_track, sender_id})
end
+ @doc """
+ Creates a new DataChannel.
+
+ For more information, refer to the [RTCPeerConnection: createDataChannel() method](https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/createDataChannel).
+ """
+ @spec create_data_channel(peer_connection(), String.t(), DataChannel.options()) ::
+ {:ok, DataChannel.t()} | {:error, atom()}
+ def create_data_channel(peer_connection, label, opts \\ []) do
+ GenServer.call(peer_connection, {:create_data_channel, label, opts})
+ end
+
@doc """
Closes the PeerConnection.
@@ -455,6 +482,7 @@ defmodule ExWebRTC.PeerConnection do
current_remote_desc: nil,
pending_remote_desc: nil,
negotiation_needed: false,
+ sctp_transport: SCTPTransport.new(),
ice_transport: DefaultICETransport,
ice_pid: ice_pid,
dtls_transport: dtls_transport,
@@ -591,8 +619,13 @@ defmodule ExWebRTC.PeerConnection do
mlines =
Enum.map(remote_offer.media, fn mline ->
{:mid, mid} = ExSDP.get_attribute(mline, :mid)
- {_ix, transceiver} = find_transceiver(state.transceivers, mid)
- RTPTransceiver.to_answer_mline(transceiver, mline, opts)
+
+ if SDPUtils.data_channel?(mline) do
+ generate_data_mline(mid, opts)
+ else
+ {_ix, transceiver} = find_transceiver(state.transceivers, mid)
+ RTPTransceiver.to_answer_mline(transceiver, mline, opts)
+ end
end)
mids = SDPUtils.get_bundle_mids(mlines)
@@ -851,6 +884,34 @@ defmodule ExWebRTC.PeerConnection do
end
end
+ @impl true
+ def handle_call({:create_data_channel, label, opts}, _from, state) do
+ ordered = Keyword.get(opts, :ordered, true)
+ lifetime = Keyword.get(opts, :max_packet_life_time)
+ max_rtx = Keyword.get(opts, :max_retransmits)
+ protocol = Keyword.get(opts, :protocol, "")
+
+ with true <- byte_size(label) < 65_535,
+ true <- lifetime == nil or max_rtx == nil do
+ {events, channel, sctp_transport} =
+ SCTPTransport.add_channel(
+ state.sctp_transport,
+ label,
+ ordered,
+ protocol,
+ lifetime,
+ max_rtx
+ )
+
+ state = update_negotiation_needed(%{state | sctp_transport: sctp_transport})
+
+ handle_sctp_events(events, state)
+ {:reply, {:ok, channel}, state}
+ else
+ _other -> {:reply, {:error, :invalid_option}, state}
+ end
+ end
+
@impl true
def handle_call(:get_stats, _from, state) do
timestamp = System.os_time(:millisecond)
@@ -1032,6 +1093,17 @@ defmodule ExWebRTC.PeerConnection do
end
end
+ @impl true
+ def handle_cast({:send_data, channel_ref, data}, state) do
+ # TODO: allow for configuring the type of data
+ {events, sctp_transport} =
+ SCTPTransport.send(state.sctp_transport, channel_ref, :string, data)
+
+ handle_sctp_events(events, state)
+
+ {:noreply, %{state | sctp_transport: sctp_transport}}
+ end
+
@impl true
def handle_info({:ex_ice, _from, {:connection_state_change, new_ice_state}}, state) do
state = %{state | ice_state: new_ice_state}
@@ -1075,9 +1147,13 @@ defmodule ExWebRTC.PeerConnection do
@impl true
def handle_info({:dtls_transport, _pid, {:state_change, new_dtls_state}}, state) do
- state = %{state | dtls_state: new_dtls_state}
next_conn_state = next_conn_state(state.ice_state, new_dtls_state)
- state = update_conn_state(state, next_conn_state)
+
+ state =
+ %{state | dtls_state: new_dtls_state}
+ |> update_conn_state(next_conn_state)
+ |> maybe_connect_sctp()
+
{:noreply, state}
end
@@ -1159,6 +1235,14 @@ defmodule ExWebRTC.PeerConnection do
end
end
+ @impl true
+ def handle_info({:dtls_transport, _pid, {:data, data}}, state) do
+ {events, sctp_transport} = SCTPTransport.handle_data(state.sctp_transport, data)
+ handle_sctp_events(events, state)
+
+ {:noreply, %{state | sctp_transport: sctp_transport}}
+ end
+
@impl true
def handle_info(:send_twcc_feedback, %{twcc_recorder: twcc_recorder} = state) do
Process.send_after(self(), :send_twcc_feedback, @twcc_interval)
@@ -1229,6 +1313,14 @@ defmodule ExWebRTC.PeerConnection do
{:noreply, %{state | transceivers: transceivers}}
end
+ @impl true
+ def handle_info(:sctp_timeout, state) do
+ {events, sctp_transport} = SCTPTransport.handle_timeout(state.sctp_transport)
+ handle_sctp_events(events, state)
+
+ {:noreply, %{state | sctp_transport: sctp_transport}}
+ end
+
@impl true
def handle_info(msg, state) do
Logger.info("Received unexpected message: #{inspect(msg)}")
@@ -1248,7 +1340,7 @@ defmodule ExWebRTC.PeerConnection do
# converting them into mlines
next_mid = find_next_mid(state)
- {transceivers, _next_mid} =
+ {transceivers, next_mid} =
Enum.map_reduce(state.transceivers, next_mid, fn
# In the initial offer, we can't have stopped transceivers, only stopping ones.
# Also, stopped transceivers are immediately removed.
@@ -1267,7 +1359,14 @@ defmodule ExWebRTC.PeerConnection do
|> Enum.reject(fn tr -> tr.stopping == true end)
|> Enum.map(&RTPTransceiver.to_offer_mline(&1, opts))
- {transceivers, mlines}
+ data_mline =
+ if SCTPTransport.data_channels?(state.sctp_transport) do
+ [generate_data_mline(next_mid, opts)]
+ else
+ []
+ end
+
+ {transceivers, mlines ++ data_mline}
end
defp generate_offer_mlines(state, opts) do
@@ -1275,7 +1374,8 @@ defmodule ExWebRTC.PeerConnection do
next_mid = find_next_mid(state)
next_mline_idx = Enum.count(last_answer.media)
- transceivers = assign_mlines(state.transceivers, last_answer, next_mid, next_mline_idx)
+ {transceivers, next_mid} =
+ assign_mlines(state.transceivers, last_answer, next_mid, next_mline_idx)
# The idea is as follows:
# * Iterate over current local mlines
@@ -1293,13 +1393,17 @@ defmodule ExWebRTC.PeerConnection do
current_local_desc.media
|> Stream.with_index()
|> Enum.map(fn {local_mline, idx} ->
- case Enum.find(transceivers, &(&1.mline_idx == idx)) do
- # if there is no transceiver, the mline must have been rejected
- # in the past (in the offer or answer) so we always set the port to 0
- nil ->
+ tr = Enum.find(transceivers, &(&1.mline_idx == idx))
+
+ cond do
+ SDPUtils.data_channel?(local_mline) ->
+ {:mid, mid} = ExSDP.get_attribute(local_mline, :mid)
+ generate_data_mline(mid, opts)
+
+ tr == nil ->
%{local_mline | port: 0}
- tr ->
+ true ->
RTPTransceiver.to_offer_mline(tr, opts)
end
end)
@@ -1313,7 +1417,37 @@ defmodule ExWebRTC.PeerConnection do
final_mlines = final_mlines ++ rem_mlines
- {transceivers, final_mlines}
+ data_mline =
+ if SCTPTransport.data_channels?(state.sctp_transport) and
+ not Enum.any?(final_mlines, &SDPUtils.data_channel?(&1)) do
+ [generate_data_mline(next_mid, opts)]
+ else
+ []
+ end
+
+ {transceivers, final_mlines ++ data_mline}
+ end
+
+ def generate_data_mline(mid, opts) do
+ attributes =
+ [
+ {:mid, mid},
+ {:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)},
+ {:ice_pwd, Keyword.fetch!(opts, :ice_pwd)},
+ {:ice_options, Keyword.fetch!(opts, :ice_options)},
+ {:fingerprint, Keyword.fetch!(opts, :fingerprint)},
+ {:setup, Keyword.fetch!(opts, :setup)},
+ {"sctp-port", "5000"}
+ ]
+
+ # NOTICE: Media.new puts fmtp (`webrtc-datachannel`) into a list
+ %ExSDP.Media{
+ ExSDP.Media.new("application", 9, "UDP/DTLS/SCTP", "webrtc-datachannel")
+ | # mline must be followed by a cline, which must contain
+ # the default value "IN IP4 0.0.0.0" (as there are no candidates yet)
+ connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}]
+ }
+ |> ExSDP.add_attributes(attributes)
end
# next_mline_idx is future mline idx to use if there are no mlines to recycle
@@ -1327,7 +1461,7 @@ defmodule ExWebRTC.PeerConnection do
result \\ []
)
- defp assign_mlines([], _, _, _, _, result), do: Enum.reverse(result)
+ defp assign_mlines([], _, next_mid, _, _, result), do: {Enum.reverse(result), next_mid}
defp assign_mlines(
[%{mid: nil, mline_idx: nil, stopped: false} = tr | trs],
@@ -1369,7 +1503,10 @@ defmodule ExWebRTC.PeerConnection do
state.ice_transport.gather_candidates(state.ice_pid)
end
- transceivers = process_mlines_local(sdp.media, state.transceivers, type, state.owner)
+ transceivers =
+ sdp.media
+ |> Enum.reject(&SDPUtils.data_channel?/1)
+ |> process_mlines_local(state.transceivers, type, state.owner)
# TODO re-think order of those functions
# and demuxer update
@@ -1412,11 +1549,14 @@ defmodule ExWebRTC.PeerConnection do
state = %{state | config: config, twcc_extension_id: twcc_id}
transceivers =
- process_mlines_remote(sdp.media, state.transceivers, type, state.config, state.owner)
+ sdp.media
+ |> Enum.reject(&SDPUtils.data_channel?/1)
+ |> process_mlines_remote(state.transceivers, type, state.config, state.owner)
# infer our role from the remote role
dtls_role = if dtls_role in [:actpass, :passive], do: :active, else: :passive
DTLSTransport.start_dtls(state.dtls_transport, dtls_role, peer_fingerprint)
+ sctp_transport = SCTPTransport.set_role(state.sctp_transport, dtls_role)
# ice_creds will be nil if all of the mlines in the description are rejected
# in such case, if this is the first remote description, connection won't be established
@@ -1439,7 +1579,9 @@ defmodule ExWebRTC.PeerConnection do
|> Map.replace!(:transceivers, transceivers)
|> remove_stopped_transceivers(type, sdp)
|> update_signaling_state(next_sig_state)
+ |> Map.replace!(:sctp_transport, sctp_transport)
|> Map.update!(:demuxer, &Demuxer.update(&1, sdp))
+ |> maybe_connect_sctp()
if state.signaling_state == :stable do
state = %{state | negotiation_needed: false}
@@ -1751,7 +1893,9 @@ defmodule ExWebRTC.PeerConnection do
do: state
defp update_negotiation_needed(state) do
- negotiation_needed = negotiation_needed?(state.transceivers, state)
+ negotiation_needed =
+ tr_negotiation_needed?(state.transceivers, state) or
+ dc_negotiation_needed?(state)
cond do
negotiation_needed == true and state.negotiation_needed == true ->
@@ -1770,14 +1914,26 @@ defmodule ExWebRTC.PeerConnection do
end
end
+ defp dc_negotiation_needed?(state) do
+ first_channel = map_size(state.sctp_transport.channels) == 1
+
+ has_channels =
+ case state.current_local_desc do
+ nil -> false
+ {_, desc} -> Enum.any?(desc.media, &SDPUtils.data_channel?(&1))
+ end
+
+ first_channel and not has_channels
+ end
+
# We don't support MSIDs and stopping transceivers so
# we only check 5.2 and 5.3 from 4.7.3#check-if-negotiation-is-needed
# https://www.w3.org/TR/webrtc/#dfn-check-if-negotiation-is-needed
- defp negotiation_needed?([], _), do: false
+ defp tr_negotiation_needed?([], _), do: false
- defp negotiation_needed?([tr | _transceivers], _state) when tr.mid == nil, do: true
+ defp tr_negotiation_needed?([tr | _transceivers], _state) when tr.mid == nil, do: true
- defp negotiation_needed?([tr | transceivers], state) do
+ defp tr_negotiation_needed?([tr | transceivers], state) do
{local_desc_type, local_desc} = state.current_local_desc
{_, remote_desc} = state.current_remote_desc
@@ -1802,10 +1958,25 @@ defmodule ExWebRTC.PeerConnection do
true
true ->
- negotiation_needed?(transceivers, state)
+ tr_negotiation_needed?(transceivers, state)
+ end
+ end
+
+ defp maybe_connect_sctp(%{current_remote_desc: {:answer, sdp}} = state) do
+ has_channel? = Enum.any?(sdp.media, &SDPUtils.data_channel?(&1))
+ connected? = state.dtls_state == :connected
+
+ if has_channel? and connected? do
+ {events, sctp_transport} = SCTPTransport.connect(state.sctp_transport)
+ handle_sctp_events(events, state)
+ %{state | sctp_transport: sctp_transport}
+ else
+ state
end
end
+ defp maybe_connect_sctp(state), do: state
+
defp handle_rtcp_packet(state, %ExRTCP.Packet.ReceiverReport{} = report) do
with true <- :rtcp_reports in state.config.features,
{:ok, mid} <- Demuxer.demux_ssrc(state.demuxer, report.ssrc),
@@ -1876,6 +2047,24 @@ defmodule ExWebRTC.PeerConnection do
defp handle_rtcp_packet(state, _packet), do: {nil, state}
+ defp handle_sctp_events(events, state) do
+ for event <- events do
+ case event do
+ {:transmit, packets} ->
+ Enum.each(packets, &DTLSTransport.send_data(state.dtls_transport, &1))
+
+ {:channel, channel} ->
+ notify(state.owner, {:data_channel, channel})
+
+ {:state_change, ref, new_state} ->
+ notify(state.owner, {:data_channel_state_change, ref, new_state})
+
+ {:data, ref, data} ->
+ notify(state.owner, {:data, ref, data})
+ end
+ end
+ end
+
defp do_get_description(nil, _candidates), do: nil
defp do_get_description({type, sdp}, candidates) do
diff --git a/lib/ex_webrtc/sctp_transport.ex b/lib/ex_webrtc/sctp_transport.ex
new file mode 100644
index 00000000..009e6add
--- /dev/null
+++ b/lib/ex_webrtc/sctp_transport.ex
@@ -0,0 +1,315 @@
+defmodule ExWebRTC.SCTPTransport do
+ @moduledoc false
+
+ require Logger
+
+ alias __MODULE__.DCEP
+ alias ExWebRTC.DataChannel
+
+ @dcep_ppi 50
+
+ @opaque t() :: map()
+
+ @type event() ::
+ {:transmit, binary()}
+ | {:data, DataChannel.ref(), binary()}
+ | {:channel, DataChannel.t()}
+ | {:state_change, DataChannel.ref(), DataChannel.ready_state()}
+
+ @spec new() :: t()
+ def new do
+ %{
+ ref: ExSCTP.new(),
+ connected: false,
+ id_type: nil,
+ timer: nil,
+ channels: %{}
+ }
+ end
+
+ @spec connect(t()) :: {[event()], t()}
+ def connect(%{connected: true} = sctp_transport), do: {[], sctp_transport}
+
+ def connect(sctp_transport) do
+ :ok = ExSCTP.connect(sctp_transport.ref)
+ handle_events(sctp_transport)
+ end
+
+ @spec set_role(t(), :active | :passive) :: t()
+ def set_role(%{id_type: t} = sctp_transport, _type) when t != nil, do: sctp_transport
+ def set_role(sctp_transport, :active), do: %{sctp_transport | id_type: :even}
+ def set_role(sctp_transport, :passive), do: %{sctp_transport | id_type: :odd}
+
+ @spec data_channels?(t()) :: boolean()
+ def data_channels?(sctp_transport) do
+ sctp_transport.channels != %{}
+ end
+
+ @spec add_channel(
+ t(),
+ String.t(),
+ boolean(),
+ String.t(),
+ non_neg_integer() | nil,
+ non_neg_integer() | nil
+ ) ::
+ {[event()], DataChannel.t(), t()}
+ def add_channel(sctp_transport, label, ordered, protocol, lifetime, max_rtx) do
+ channel = %DataChannel{
+ ref: make_ref(),
+ id: nil,
+ label: label,
+ ordered: ordered,
+ protocol: protocol,
+ ready_state: :connecting,
+ max_packet_life_time: lifetime,
+ max_retransmits: max_rtx
+ }
+
+ channels = Map.put(sctp_transport.channels, channel.ref, channel)
+ sctp_transport = %{sctp_transport | channels: channels}
+
+ {events, sctp_transport} =
+ if sctp_transport.connected do
+ sctp_transport = handle_pending_channels(sctp_transport)
+ handle_events(sctp_transport)
+ else
+ {[], sctp_transport}
+ end
+
+ {events, channel, sctp_transport}
+ end
+
+ # TODO: close channel
+
+ @spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()}
+ def send(sctp_transport, ref, type, data) do
+ {ppi, data} = to_raw_data(data, type)
+
+ case Map.fetch(sctp_transport.channels, ref) do
+ {:ok, %DataChannel{ready_state: :open, id: id}} when id != nil ->
+ :ok = ExSCTP.send(sctp_transport.ref, id, ppi, data)
+ handle_events(sctp_transport)
+
+ {:ok, %DataChannel{id: id}} ->
+ Logger.warning(
+ "Trying to send data over DataChannel with id #{id} that is not opened yet"
+ )
+
+ {[], sctp_transport}
+
+ :error ->
+ Logger.warning("Trying to send data over non-existing DataChannel with ref #{ref}")
+ {[], sctp_transport}
+ end
+ end
+
+ @spec handle_timeout(t()) :: {[event()], t()}
+ def handle_timeout(sctp_transport) do
+ ExSCTP.handle_timeout(sctp_transport.ref)
+ handle_events(sctp_transport)
+ end
+
+ @spec handle_data(t(), binary()) :: {[event()], t()}
+ def handle_data(sctp_transport, data) do
+ :ok = ExSCTP.handle_data(sctp_transport.ref, data)
+ handle_events(sctp_transport)
+ end
+
+ defp handle_pending_channels(sctp_transport) do
+ sctp_transport.channels
+ |> Map.values()
+ |> Enum.filter(fn channel -> channel.id == nil end)
+ |> Enum.reduce(sctp_transport, fn channel, transport ->
+ handle_pending_channel(transport, channel)
+ end)
+ end
+
+ defp handle_pending_channel(sctp_transport, channel) do
+ id = new_id(sctp_transport)
+ :ok = ExSCTP.open_stream(sctp_transport.ref, id)
+
+ {reliability, param} =
+ cond do
+ channel.max_packet_life_time != nil -> {:timed, channel.max_packet_life_time}
+ channel.max_retransmits != nil -> {:rexmit, channel.max_retransmits}
+ true -> {:reliable, 0}
+ end
+
+ dco = %DCEP.DataChannelOpen{
+ reliability: reliability,
+ order: if(channel.ordered, do: :ordered, else: :unordered),
+ label: channel.label,
+ protocol: channel.protocol,
+ param: param,
+ priority: 0
+ }
+
+ :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(dco))
+
+ channel = %DataChannel{channel | id: id}
+ %{sctp_transport | channels: Map.replace!(sctp_transport.channels, channel.ref, channel)}
+ end
+
+ defp handle_events(sctp_transport, events \\ []) do
+ event = ExSCTP.poll(sctp_transport.ref)
+
+ case handle_event(sctp_transport, event) do
+ {:none, transport} -> {Enum.reverse(events), transport}
+ {nil, transport} -> handle_events(transport, events)
+ {other, transport} -> handle_events(transport, [other | events])
+ end
+ end
+
+ # if SCTP disconnected, most likely DTLS disconnected, so we won't handle this here explcitly
+ defp handle_event(sctp_transport, :disconnected), do: {nil, sctp_transport}
+ defp handle_event(sctp_transport, :none), do: {:none, sctp_transport}
+ defp handle_event(sctp_transport, {:transmit, _data} = event), do: {event, sctp_transport}
+
+ defp handle_event(sctp_transport, {:stream_opened, id}) do
+ Logger.debug("SCTP stream #{id} has been opened")
+ {nil, sctp_transport}
+ end
+
+ defp handle_event(sctp_transport, {:stream_closed, _id}) do
+ # TODO: handle closing channels
+ {nil, sctp_transport}
+ end
+
+ defp handle_event(sctp_transport, :connected) do
+ Logger.debug("SCTP connection has been established")
+
+ sctp_transport =
+ %{sctp_transport | connected: true}
+ |> handle_pending_channels()
+
+ {nil, sctp_transport}
+ end
+
+ defp handle_event(sctp_transport, {:timeout, val}) do
+ # TODO: this seems to work
+ # but sometimes the data is send after quite a substensial timeout
+ # calling `handle_timeout` periodically (i.e. every 50s) seems to work better
+ # which is wierd, to investigate
+ if sctp_transport.timer != nil do
+ Process.cancel_timer(sctp_transport.timer)
+ end
+
+ timer =
+ case val do
+ nil -> nil
+ ms -> Process.send_after(self(), :sctp_timeout, ms)
+ end
+
+ {nil, %{sctp_transport | timer: timer}}
+ end
+
+ defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do
+ with {:ok, dcep} <- DCEP.decode(data),
+ {:ok, sctp_transport, event} <- handle_dcep(sctp_transport, id, dcep) do
+ {event, sctp_transport}
+ else
+ :error ->
+ # TODO: close the channel
+ Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}")
+ {nil, sctp_transport}
+ end
+ end
+
+ defp handle_event(sctp_transport, {:data, id, ppi, data}) do
+ with {:ok, data} <- from_raw_data(data, ppi),
+ {ref, %DataChannel{ready_state: :open}} <-
+ Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
+ {{:data, ref, data}, sctp_transport}
+ else
+ {_ref, %DataChannel{}} ->
+ Logger.warning("Received data on DataChannel with id #{id} that is not open. Discarding")
+ {nil, sctp_transport}
+
+ nil ->
+ Logger.warning(
+ "Received data over non-existing DataChannel on stream with id #{id}. Discarding"
+ )
+
+ {nil, sctp_transport}
+
+ _other ->
+ Logger.warning("Received data in invalid format on stream with id #{id}. Discarding")
+ {nil, sctp_transport}
+ end
+ end
+
+ defp handle_dcep(sctp_transport, id, %DCEP.DataChannelOpen{} = dco) do
+ with false <- Enum.any?(sctp_transport.channels, fn {_k, v} -> v.id == id end),
+ true <- valid_id?(sctp_transport, id) do
+ :ok = ExSCTP.send(sctp_transport.ref, id, @dcep_ppi, DCEP.encode(%DCEP.DataChannelAck{}))
+
+ Logger.debug("Remote opened DataChannel #{id} succesfully")
+
+ channel = %DataChannel{
+ ref: make_ref(),
+ id: id,
+ label: dco.label,
+ ordered: dco.order == :ordered,
+ protocol: dco.protocol,
+ ready_state: :open,
+ max_packet_life_time: if(dco.reliability == :timed, do: dco.param, else: nil),
+ max_retransmits: if(dco.reliability == :rexmit, do: dco.param, else: nil)
+ }
+
+ # In theory, we should also send the :open event here (W3C 6.2.3)
+ channels = Map.put(sctp_transport.channels, channel.ref, channel)
+ {:ok, %{sctp_transport | channels: channels}, {:channel, channel}}
+ else
+ _other -> :error
+ end
+ end
+
+ defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do
+ case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
+ {ref, %DataChannel{ready_state: :connecting} = channel} ->
+ Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully")
+ # TODO: set the parameters
+ channel = %DataChannel{channel | ready_state: :open}
+ channels = Map.put(sctp_transport.channels, ref, channel)
+ event = {:state_change, ref, :open}
+
+ {:ok, %{sctp_transport | channels: channels}, event}
+
+ _other ->
+ # TODO: close the channel
+ Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}")
+ {:ok, sctp_transport, nil}
+ end
+ end
+
+ defp from_raw_data(data, ppi) when ppi in [51, 53], do: {:ok, data}
+ defp from_raw_data(_data, ppi) when ppi in [56, 57], do: {:ok, <<>>}
+ defp from_raw_data(_data, _ppi), do: :error
+
+ defp to_raw_data(<<>>, :string), do: {56, <<0>>}
+ defp to_raw_data(data, :string), do: {51, data}
+ defp to_raw_data(<<>>, :binary), do: {57, <<0>>}
+ defp to_raw_data(data, :binary), do: {53, data}
+
+ # for remote ids (so must be opposite than ours)
+ defp valid_id?(%{id_type: :even}, id), do: rem(id, 2) == 1
+ defp valid_id?(%{id_type: :odd}, id), do: rem(id, 2) == 0
+
+ defp new_id(sctp_transport) do
+ max_id =
+ sctp_transport.channels
+ |> Enum.filter(fn {_k, v} -> v.id != nil end)
+ |> Enum.map(fn {_k, v} -> v.id end)
+ |> Enum.max(&>=/2, fn -> -1 end)
+
+ case {sctp_transport.id_type, rem(max_id, 2)} do
+ {:even, -1} -> 0
+ {:odd, -1} -> 1
+ {:even, 0} -> max_id + 2
+ {:even, 1} -> max_id + 1
+ {:odd, 0} -> max_id + 1
+ {:odd, 1} -> max_id + 2
+ end
+ end
+end
diff --git a/lib/ex_webrtc/sctp_transport/dcep.ex b/lib/ex_webrtc/sctp_transport/dcep.ex
new file mode 100644
index 00000000..7f1efe7e
--- /dev/null
+++ b/lib/ex_webrtc/sctp_transport/dcep.ex
@@ -0,0 +1,76 @@
+defmodule ExWebRTC.SCTPTransport.DCEP do
+ @moduledoc false
+ # based on RFC 8832
+
+ defmodule DataChannelAck do
+ @moduledoc false
+
+ defstruct []
+
+ def decode(<<>>), do: {:ok, %__MODULE__{}}
+ def decode(_other), do: :error
+
+ def encode(%__MODULE__{}), do: <<0x02>>
+ end
+
+ defmodule DataChannelOpen do
+ @moduledoc false
+
+ @enforce_keys [:reliability, :order, :label, :protocol, :priority, :param]
+ defstruct @enforce_keys
+
+ def decode(
+ <>
+ ) do
+ with {:ok, reliability, order} <- to_channel_type(ch_type),
+ <> <- rest,
+ <> <- rest do
+ dca =
+ %__MODULE__{
+ reliability: reliability,
+ order: order,
+ param: param,
+ label: label,
+ protocol: protocol,
+ priority: priority
+ }
+
+ {:ok, dca}
+ else
+ _other -> :error
+ end
+ end
+
+ def encode(%__MODULE__{} = dco) do
+ ch_type = from_channel_type(dco.reliability, dco.order)
+ label_len = byte_size(dco.label)
+ proto_len = byte_size(dco.protocol)
+
+ <<0x03::8, ch_type::8, dco.priority::16, dco.param::32, label_len::16, proto_len::16,
+ dco.label::binary-size(label_len), dco.protocol::binary-size(proto_len)>>
+ end
+
+ # most significant bit determines order,
+ # least significant 2 bits determine reliability
+ defp to_channel_type(0x00), do: {:ok, :reliable, :ordered}
+ defp to_channel_type(0x80), do: {:ok, :reliable, :unordered}
+ defp to_channel_type(0x01), do: {:ok, :rexmit, :ordered}
+ defp to_channel_type(0x81), do: {:ok, :rexmit, :unordered}
+ defp to_channel_type(0x02), do: {:ok, :timed, :ordered}
+ defp to_channel_type(0x82), do: {:ok, :timed, :unordered}
+ defp to_channel_type(_other), do: :error
+
+ defp from_channel_type(:reliable, :ordered), do: 0x00
+ defp from_channel_type(:reliable, :unordered), do: 0x80
+ defp from_channel_type(:rexmit, :ordered), do: 0x01
+ defp from_channel_type(:rexmit, :unordered), do: 0x81
+ defp from_channel_type(:timed, :ordered), do: 0x02
+ defp from_channel_type(:timed, :unordered), do: 0x82
+ end
+
+ def decode(<<0x03::8, rest::binary>>), do: DataChannelOpen.decode(rest)
+ def decode(<<0x02::8, rest::binary>>), do: DataChannelAck.decode(rest)
+ def decode(_other), do: :error
+
+ def encode(%mod{} = dcep), do: mod.encode(dcep)
+end
diff --git a/lib/ex_webrtc/sdp_utils.ex b/lib/ex_webrtc/sdp_utils.ex
index ad8c1422..b16985eb 100644
--- a/lib/ex_webrtc/sdp_utils.ex
+++ b/lib/ex_webrtc/sdp_utils.ex
@@ -12,7 +12,8 @@ defmodule ExWebRTC.SDPUtils do
def ensure_valid(sdp) do
with :ok <- ensure_non_empty(sdp),
:ok <- ensure_mid(sdp),
- :ok <- ensure_bundle(sdp) do
+ :ok <- ensure_bundle(sdp),
+ :ok <- ensure_data_channel_valid(sdp) do
ensure_rtcp_mux(sdp)
end
end
@@ -65,10 +66,23 @@ defmodule ExWebRTC.SDPUtils do
Enum.filter(groups, fn %ExSDP.Attribute.Group{semantics: name} -> name == to_filter end)
end
+ defp ensure_data_channel_valid(sdp) do
+ with [data_mline] <- Enum.filter(sdp.media, &data_channel?/1),
+ "webrtc-datachannel" <- data_mline.fmt,
+ sctp_port when not is_nil(sctp_port) <- ExSDP.get_attribute(data_mline, "sctp-port") do
+ :ok
+ else
+ [] -> :ok
+ mlines when is_list(mlines) -> {:error, :multiple_data_mlines}
+ fmt when is_binary(fmt) -> {:error, :invalid_datachannel_format}
+ nil -> {:error, :no_sctp_port}
+ end
+ end
+
defp ensure_rtcp_mux(sdp) do
# Firefox does not add `rtcp_mux` in rejected mlines
sdp.media
- |> Enum.reject(&rejected?/1)
+ |> Enum.reject(&(rejected?(&1) or data_channel?(&1)))
|> Enum.all?(&(ExSDP.get_attribute(&1, :rtcp_mux) == :rtcp_mux))
|> case do
true -> :ok
@@ -364,6 +378,11 @@ defmodule ExWebRTC.SDPUtils do
def rejected?(%ExSDP.Media{}), do: false
+ @spec data_channel?(ExSDP.Media.t()) :: boolean()
+ def data_channel?(%ExSDP.Media{fmt: "webrtc-datachannel"}), do: true
+ def data_channel?(%ExSDP.Media{fmt: ["webrtc-datachannel"]}), do: true
+ def data_channel?(%ExSDP.Media{}), do: false
+
defp do_get_ice_credentials(sdp_or_mline) do
ice_ufrag =
case ExSDP.get_attribute(sdp_or_mline, :ice_ufrag) do
diff --git a/mix.exs b/mix.exs
index dda7bb08..da47b341 100644
--- a/mix.exs
+++ b/mix.exs
@@ -58,10 +58,11 @@ defmodule ExWebRTC.MixProject do
[
{:ex_sdp, "~> 1.0"},
{:ex_ice, "~> 0.8.0"},
- {:ex_dtls, "~> 0.15.0"},
+ {:ex_dtls, github: "elixir-webrtc/ex_dtls"},
{:ex_libsrtp, "~> 0.7.1"},
{:ex_rtp, "~> 0.4.0"},
{:ex_rtcp, "~> 0.4.0"},
+ {:ex_sctp, github: "elixir-webrtc/ex_sctp"},
{:crc, "~> 0.10"},
# dev/test
diff --git a/mix.lock b/mix.lock
index 5bdd05cd..708962ed 100644
--- a/mix.lock
+++ b/mix.lock
@@ -12,11 +12,12 @@
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"},
- "ex_dtls": {:hex, :ex_dtls, "0.15.2", "6c8c0f8eb67525216551bd3e0322ab33c9d851d56ef3e065efab4fd277a8fbb9", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "6b852bc926bbdc9c1b9c4ecc6cfc73a89d4e106042802cefea2c1503072a9f2a"},
+ "ex_dtls": {:git, "https://github.com/elixir-webrtc/ex_dtls.git", "dff8ec1998dfb556b2d3dafbd30574d0da18b958", []},
"ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"},
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
+ "ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "3de986b3cd00796a1f2a5ac40b001682d5712264", []},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
@@ -36,8 +37,10 @@
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
+ "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"},
"shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
+ "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"},
"zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"},
}
diff --git a/test/ex_webrtc/data_channel_test.exs b/test/ex_webrtc/data_channel_test.exs
new file mode 100644
index 00000000..916d4aed
--- /dev/null
+++ b/test/ex_webrtc/data_channel_test.exs
@@ -0,0 +1,168 @@
+defmodule ExWebRTC.DataChannelTest do
+ use ExUnit.Case, async: true
+
+ import ExWebRTC.Support.TestUtils
+
+ alias ExWebRTC.{DataChannel, PeerConnection, MediaStreamTrack}
+
+ test "establishing channels" do
+ {:ok, pc1} = PeerConnection.start_link()
+ {:ok, pc2} = PeerConnection.start_link()
+
+ label1 = "my label 1"
+ {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1)
+ assert_receive {:ex_webrtc, ^pc1, :negotiation_needed}
+
+ :ok = negotiate(pc1, pc2)
+
+ refute_receive {:ex_webrtc, ^pc2, {:data_channel, _}}
+
+ :ok = connect(pc1, pc2)
+
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan1}}
+ assert %DataChannel{id: 1, label: ^label1, ordered: true} = chan1
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}
+
+ label2 = "my label 2"
+ protocol = "my proto"
+
+ {:ok, %DataChannel{ref: ref2}} =
+ PeerConnection.create_data_channel(pc1, label2, protocol: protocol, ordered: false)
+
+ refute_receive {:ex_webrtc, ^pc1, :negotiation_needed}
+
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan2}}
+ assert %DataChannel{id: 3, label: ^label2, protocol: ^protocol, ordered: false} = chan2
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref2, :open}}
+
+ label3 = "my label 3"
+ {:ok, %DataChannel{ref: ref3}} = PeerConnection.create_data_channel(pc2, label3)
+
+ refute_receive {:ex_webrtc, ^pc2, :negotiation_needed}
+
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel, chan3}}
+ assert %DataChannel{id: 4, label: ^label3} = chan3
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref3, :open}}
+ end
+
+ describe "negotiating" do
+ test "with only channel added" do
+ {:ok, pc1} = PeerConnection.start_link()
+ {:ok, pc2} = PeerConnection.start_link()
+
+ label = "my label"
+ {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label)
+
+ :ok = negotiate(pc1, pc2)
+ :ok = connect(pc1, pc2)
+
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{label: ^label}}}
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}
+ end
+
+ test "with channel mixed with transceivers" do
+ {:ok, pc1} = PeerConnection.start_link()
+ {:ok, pc2} = PeerConnection.start_link()
+
+ {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:audio))
+ label1 = "my label"
+ {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1)
+ {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:video))
+
+ :ok = negotiate(pc1, pc2)
+ :ok = connect(pc1, pc2)
+
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{label: ^label1}}}
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}
+
+ # add more tracks and channels and renegotiate
+ {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:video))
+ :ok = negotiate(pc1, pc2)
+
+ label2 = "my label 2"
+ {:ok, %DataChannel{ref: ref2}} = PeerConnection.create_data_channel(pc2, label2)
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel, %DataChannel{label: ^label2}}}
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :open}}
+ end
+
+ test "with channel added only in renegotiation" do
+ {:ok, pc1} = PeerConnection.start_link()
+ {:ok, pc2} = PeerConnection.start_link()
+ {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:audio))
+ {:ok, _sender} = PeerConnection.add_track(pc1, MediaStreamTrack.new(:video))
+
+ :ok = negotiate(pc1, pc2)
+ :ok = connect(pc1, pc2)
+
+ label = "my label"
+ {:ok, %DataChannel{ref: ref}} = PeerConnection.create_data_channel(pc2, label)
+
+ refute_receive {:ex_webrtc, ^pc1, {:data_channel, _}}
+
+ :ok = negotiate(pc2, pc1)
+
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel, %DataChannel{label: ^label}}}
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref, :open}}
+ end
+ end
+
+ describe "sending data" do
+ setup do
+ {:ok, pc1} = PeerConnection.start_link()
+ {:ok, pc2} = PeerConnection.start_link()
+ {:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, "label")
+
+ :ok = negotiate(pc1, pc2)
+ :ok = connect(pc1, pc2)
+
+ assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{ref: ref2}}}
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}
+
+ %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2}
+ end
+
+ test "message from initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
+ data1 = <<1, 2, 3>>
+ :ok = PeerConnection.send_data(pc1, ref1, data1)
+ assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, ^data1}}
+
+ data2 = for i <- 1..2000, into: <<>>, do: <>
+ :ok = PeerConnection.send_data(pc1, ref1, data2)
+ assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, ^data2}}
+
+ :ok = PeerConnection.send_data(pc1, ref1, <<>>)
+ assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, <<>>}}
+ end
+
+ test "from other peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
+ data1 = <<1, 2, 3>>
+ :ok = PeerConnection.send_data(pc2, ref2, data1)
+ assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data1}}
+
+ data2 = <<>>
+ :ok = PeerConnection.send_data(pc2, ref2, data2)
+ assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data2}}
+ end
+
+ test "back and forth", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
+ data = for i <- 1..1024, into: <<>>, do: <>
+ :ok = PeerConnection.send_data(pc2, ref2, data)
+ assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, msg}}
+ :ok = PeerConnection.send_data(pc1, ref1, msg)
+ assert_receive {:ex_webrtc, ^pc2, {:data, ^ref2, next_msg}}
+ :ok = PeerConnection.send_data(pc2, ref2, next_msg)
+ assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data}}
+ end
+
+ test "over distinct channels", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
+ {:ok, %DataChannel{ref: ref3}} = PeerConnection.create_data_channel(pc2, "next label")
+ assert_receive {:ex_webrtc, ^pc1, {:data_channel, %DataChannel{ref: ref4}}}
+
+ data = for i <- 1..1024, into: <<>>, do: <>
+ :ok = PeerConnection.send_data(pc1, ref4, data)
+ assert_receive {:ex_webrtc, ^pc2, {:data, ^ref3, msg}}
+ :ok = PeerConnection.send_data(pc2, ref2, msg)
+ assert_receive {:ex_webrtc, ^pc1, {:data, ^ref1, ^data}}
+ end
+ end
+end
diff --git a/test/ex_webrtc/peer_connection_test.exs b/test/ex_webrtc/peer_connection_test.exs
index 71f5c66a..6300d0e5 100644
--- a/test/ex_webrtc/peer_connection_test.exs
+++ b/test/ex_webrtc/peer_connection_test.exs
@@ -1390,18 +1390,4 @@ defmodule ExWebRTC.PeerConnectionTest do
refute_receive {:ex_webrtc, ^pc1, :negotiation_needed}
refute_receive {:ex_webrtc, ^pc2, :negotiation_needed}, 0
end
-
- defp connect(pc1, pc2) do
- # exchange ICE candidates
- assert_receive {:ex_webrtc, ^pc1, {:ice_candidate, candidate}}
- :ok = PeerConnection.add_ice_candidate(pc2, candidate)
- assert_receive {:ex_webrtc, ^pc2, {:ice_candidate, candidate}}
- :ok = PeerConnection.add_ice_candidate(pc1, candidate)
-
- # wait to establish connection
- assert_receive {:ex_webrtc, ^pc1, {:connection_state_change, :connected}}
- assert_receive {:ex_webrtc, ^pc2, {:connection_state_change, :connected}}
-
- :ok
- end
end
diff --git a/test/ex_webrtc/sctp_transport/dcep_test.exs b/test/ex_webrtc/sctp_transport/dcep_test.exs
new file mode 100644
index 00000000..c1eaf09f
--- /dev/null
+++ b/test/ex_webrtc/sctp_transport/dcep_test.exs
@@ -0,0 +1,39 @@
+defmodule ExWebRTC.SCTPTransport.DCEPTest do
+ use ExUnit.Case, async: true
+
+ alias ExWebRTC.SCTPTransport.DCEP
+
+ @encoded_dco <<3, 130, 0, 5, 0, 0, 0, 100, 0, 5, 0, 6, 104, 101, 108, 108, 111, 119, 101, 98,
+ 114, 116, 99>>
+ @decoded_dco %DCEP.DataChannelOpen{
+ reliability: :timed,
+ order: :unordered,
+ label: "hello",
+ protocol: "webrtc",
+ priority: 5,
+ param: 100
+ }
+
+ @encoded_dca <<2>>
+ @decoded_dca %DCEP.DataChannelAck{}
+
+ describe "decode/1" do
+ test "DataChannelAck" do
+ assert DCEP.encode(@decoded_dca) == @encoded_dca
+ end
+
+ test "DataChannelOpen" do
+ assert DCEP.encode(@decoded_dco) == @encoded_dco
+ end
+ end
+
+ describe "encode/1" do
+ test "DataChannelAck" do
+ assert {:ok, @decoded_dca} = DCEP.decode(@encoded_dca)
+ end
+
+ test "DataChannelOpen" do
+ assert {:ok, @decoded_dco} = DCEP.decode(@encoded_dco)
+ end
+ end
+end
diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex
index bc09215b..4fb9a3ed 100644
--- a/test/support/test_utils.ex
+++ b/test/support/test_utils.ex
@@ -13,4 +13,27 @@ defmodule ExWebRTC.Support.TestUtils do
:ok = PeerConnection.set_remote_description(pc1, answer)
:ok
end
+
+ @spec connect(PeerConnection.peer_connection(), PeerConnection.peer_connection()) :: :ok
+ def connect(pc1, pc2) do
+ # exchange ICE candidates
+ for {pc1, pc2} <- [{pc1, pc2}, {pc2, pc1}] do
+ receive do
+ {:ex_webrtc, ^pc1, {:ice_candidate, candidate}} ->
+ :ok = PeerConnection.add_ice_candidate(pc2, candidate)
+ after
+ 2000 -> raise "Unable to connect"
+ end
+ end
+
+ for pc <- [pc1, pc2] do
+ receive do
+ {:ex_webrtc, ^pc, {:connection_state_change, :connected}} -> :ok
+ after
+ 2000 -> raise "Unable to connect"
+ end
+ end
+
+ :ok
+ end
end