diff --git a/examples/save_to_file/lib/save_to_file/peer_handler.ex b/examples/save_to_file/lib/save_to_file/peer_handler.ex index ca32a73..793cfa1 100644 --- a/examples/save_to_file/lib/save_to_file/peer_handler.ex +++ b/examples/save_to_file/lib/save_to_file/peer_handler.ex @@ -10,10 +10,12 @@ defmodule SaveToFile.PeerHandler do } alias ExWebRTC.Media.{IVF, Ogg} - alias ExWebRTC.RTP.Depayloader + alias ExWebRTC.RTP.{Depayloader, JitterBuffer} @behaviour WebSock + @jitter_buffer_latency_ms 100 + @video_file "./video.ivf" @audio_file "./audio.ogg" @@ -53,8 +55,10 @@ defmodule SaveToFile.PeerHandler do audio_track_id: nil, video_writer: nil, video_depayloader: nil, + video_buffer: nil, audio_writer: nil, audio_depayloader: nil, + audio_buffer: nil, frames_cnt: 0 } @@ -73,10 +77,22 @@ defmodule SaveToFile.PeerHandler do handle_webrtc_msg(msg, state) end + @impl true + def handle_info({:jitter_buffer_timer, kind}, state) do + case kind do + :video -> state.video_buffer + :audio -> state.audio_buffer + end + |> JitterBuffer.handle_timer() + |> handle_jitter_buffer_result(kind, state) + end + @impl true def terminate(reason, state) do Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}") + state = flush_jitter_buffers(state) + if state.video_writer, do: IVF.Writer.close(state.video_writer) if state.audio_writer, do: Ogg.Writer.close(state.audio_writer) end @@ -142,11 +158,13 @@ defmodule SaveToFile.PeerHandler do ) {:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new() + video_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms) state = %{ state | video_depayloader: video_depayloader, video_writer: video_writer, + video_buffer: video_buffer, video_track_id: id } @@ -157,11 +175,13 @@ defmodule SaveToFile.PeerHandler do # by default uses 1 mono channel and 48k clock rate {:ok, audio_writer} = Ogg.Writer.open(@audio_file) {:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new() + audio_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms) state = %{ state | audio_depayloader: audio_depayloader, audio_writer: audio_writer, + audio_buffer: audio_buffer, audio_track_id: id } @@ -175,32 +195,78 @@ defmodule SaveToFile.PeerHandler do end defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do + state.video_buffer + |> JitterBuffer.place_packet(packet) + |> handle_jitter_buffer_result(:video, state) + end + + defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do + state.audio_buffer + |> JitterBuffer.place_packet(packet) + |> handle_jitter_buffer_result(:audio, state) + end + + defp handle_webrtc_msg(_msg, state), do: {:ok, state} + + defp handle_jitter_buffer_result({packets, timer, buffer}, kind, state) do state = - case Depayloader.depayload(state.video_depayloader, packet) do - {nil, video_depayloader} -> - %{state | video_depayloader: video_depayloader} - - {vp8_frame, video_depayloader} -> - frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame} - {:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame) - - %{ - state - | video_depayloader: video_depayloader, - video_writer: video_writer, - frames_cnt: state.frames_cnt + 1 - } + case kind do + :video -> %{state | video_buffer: buffer} + :audio -> %{state | audio_buffer: buffer} end + state = + Enum.reduce(packets, state, fn packet, state -> handle_packet(packet, kind, state) end) + + unless is_nil(timer), do: Process.send_after(self(), {:jitter_buffer_timer, kind}, timer) + {:ok, state} end - defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do + defp handle_packet(packet, :video, state) do + case Depayloader.depayload(state.video_depayloader, packet) do + {nil, video_depayloader} -> + %{state | video_depayloader: video_depayloader} + + {vp8_frame, video_depayloader} -> + frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame} + {:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame) + + %{ + state + | video_depayloader: video_depayloader, + video_writer: video_writer, + frames_cnt: state.frames_cnt + 1 + } + end + end + + defp handle_packet(packet, :audio, state) do {opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet) {:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet) - {:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}} + %{state | audio_depayloader: depayloader, audio_writer: audio_writer} end - defp handle_webrtc_msg(_msg, state), do: {:ok, state} + defp flush_jitter_buffers(state), + do: state |> flush_jitter_buffer(:video) |> flush_jitter_buffer(:audio) + + defp flush_jitter_buffer(state, kind) do + buffer = + case kind do + :video -> state.video_buffer + :audio -> state.audio_buffer + end + + if is_nil(buffer) do + state + else + {:ok, state} = + buffer + |> JitterBuffer.flush() + |> handle_jitter_buffer_result(kind, state) + + state + end + end end diff --git a/examples/save_to_file/mix.lock b/examples/save_to_file/mix.lock index 9800460..63910ca 100644 --- a/examples/save_to_file/mix.lock +++ b/examples/save_to_file/mix.lock @@ -3,12 +3,11 @@ "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, "bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"}, - "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, "crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"}, "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"}, - "ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"}, + "ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, @@ -22,7 +21,6 @@ "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, - "nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, @@ -33,6 +31,6 @@ "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, - "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"}, "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, } diff --git a/lib/ex_webrtc/rtp/jitter_buffer.ex b/lib/ex_webrtc/rtp/jitter_buffer.ex new file mode 100644 index 0000000..c85a979 --- /dev/null +++ b/lib/ex_webrtc/rtp/jitter_buffer.ex @@ -0,0 +1,164 @@ +defmodule ExWebRTC.RTP.JitterBuffer do + @moduledoc """ + Buffers and reorders RTP packets based on `sequence_number`, introducing controlled latency + in order to combat network jitter and improve the QoS. + """ + + # Heavily inspired by: + # https://github.com/membraneframework/membrane_rtp_plugin/blob/23f3279540aea7dea3a194fd5a1680c2549aebae/lib/membrane/rtp/jitter_buffer.ex + + alias ExWebRTC.RTP.JitterBuffer.PacketStore + alias ExRTP.Packet + + @default_latency_ms 200 + + @typedoc """ + Options that can be passed to `new/1`. + + * `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default. + """ + @type options :: [latency: non_neg_integer()] + + @typedoc """ + Time (in milliseconds) after which `handle_timeout/1` should be called. + Can be `nil`, in which case no timer needs to be set. + """ + @type timer :: non_neg_integer() | nil + + @typedoc """ + The 3-element tuple returned by all functions other than `new/1`. + + * `packets` - a list with packets flushed from the buffer as a result of the function call. May be empty. + * `timer_duration_ms` - see `t:timer/0`. + * `buffer` - `t:#{inspect(__MODULE__)}.t/0`. + + Generally speaking, all results of this type can be handled in the same way. + """ + @type result :: {packets :: [Packet.t()], timer_duration_ms :: timer(), buffer :: t()} + + @opaque t :: %__MODULE__{ + latency: non_neg_integer(), + store: PacketStore.t(), + state: :initial_wait | :timer_set | :timer_not_set + } + + @enforce_keys [:latency] + defstruct @enforce_keys ++ + [ + store: %PacketStore{}, + state: :initial_wait + ] + + @doc """ + Creates a new `t:#{inspect(__MODULE__)}.t/0`. + """ + @spec new(options()) :: t() + def new(opts \\ []) do + %__MODULE__{ + latency: opts[:latency] || @default_latency_ms + } + end + + @doc """ + Places a packet in the JitterBuffer. + + Note: The initial latency timer will be set after the first packet is inserted into the buffer. + If you want to start it at your own discretion, schedule a `handle_timeout/1` call prior to that. + """ + @spec insert(t(), Packet.t()) :: result() + def insert(buffer, packet) + + def insert(%{state: :initial_wait} = buffer, packet) do + {buffer, timer} = maybe_set_timer(buffer) + {_result, buffer} = try_insert_packet(buffer, packet) + + {[], timer, buffer} + end + + def insert(buffer, packet) do + case try_insert_packet(buffer, packet) do + {:ok, buffer} -> send_packets(buffer) + {:error, buffer} -> {[], nil, buffer} + end + end + + @doc """ + Flushes all remaining packets and resets the JitterBuffer. + + Note: After flushing, the rollover counter is reset to `0`. + """ + @spec flush(t()) :: result() + def flush(buffer) do + packets = + buffer.store + |> PacketStore.dump() + |> handle_missing_packets() + + {packets, nil, %__MODULE__{latency: buffer.latency}} + end + + @doc """ + Handles the end of a previously set timer. + """ + @spec handle_timeout(t()) :: result() + def handle_timeout(buffer) do + %__MODULE__{buffer | state: :timer_not_set} |> send_packets() + end + + @spec try_insert_packet(t(), Packet.t()) :: {:ok | :error, t()} + defp try_insert_packet(buffer, packet) do + case PacketStore.insert(buffer.store, packet) do + {:ok, store} -> {:ok, %__MODULE__{buffer | store: store}} + {:error, :late_packet} -> {:error, buffer} + end + end + + @spec send_packets(t()) :: result() + defp send_packets(%{store: store} = buffer) do + # Flush packets that stayed in queue longer than latency and any gaps before them + {too_old_packets, store} = PacketStore.flush_older_than(store, buffer.latency) + # Additionally, flush packets as long as there are no gaps + {gapless_packets, store} = PacketStore.flush_ordered(store) + + packets = + too_old_packets + |> Stream.concat(gapless_packets) + |> handle_missing_packets() + + {buffer, timer} = maybe_set_timer(%__MODULE__{buffer | store: store}) + + {packets, timer, buffer} + end + + @spec handle_missing_packets(Enumerable.t(Packet.t() | nil)) :: [Packet.t()] + defp handle_missing_packets(packets) do + # TODO: nil -- missing packet (maybe owner should be notified about that) + Enum.reject(packets, &is_nil/1) + end + + @spec maybe_set_timer(t()) :: {t(), timer()} + defp maybe_set_timer(buffer) + + defp maybe_set_timer(%{state: :initial_wait} = buffer) do + case PacketStore.first_packet_timestamp(buffer.store) do + # If we're inserting the very first packet, set the initial latency timer + nil -> {buffer, buffer.latency} + _ts -> {buffer, nil} + end + end + + defp maybe_set_timer(%{state: :timer_not_set} = buffer) do + case PacketStore.first_packet_timestamp(buffer.store) do + nil -> + {buffer, nil} + + timestamp_ms -> + since_insertion = System.monotonic_time(:millisecond) - timestamp_ms + send_after_time = max(0, buffer.latency - since_insertion) + + {%__MODULE__{buffer | state: :timer_set}, send_after_time} + end + end + + defp maybe_set_timer(%{state: :timer_set} = buffer), do: {buffer, nil} +end diff --git a/lib/ex_webrtc/rtp/jitter_buffer/heap.ex b/lib/ex_webrtc/rtp/jitter_buffer/heap.ex new file mode 100644 index 0000000..6166f59 --- /dev/null +++ b/lib/ex_webrtc/rtp/jitter_buffer/heap.ex @@ -0,0 +1,118 @@ +defmodule ExWebRTC.RTP.JitterBuffer.Heap do + @moduledoc false + # Implementation of a heap (min-heap by default) + # + # At the moment, this module uses a regular `Map` (with O(log n) access) to store the data. + # + # TODO: Run performance tests and determine if it would be better to use: + # - ETS + # - :array + # - some other data structure? + + @type comparator :: (term(), term() -> boolean()) + + @opaque t :: %__MODULE__{ + comparator: comparator(), + tree: %{non_neg_integer() => term()} + } + + defstruct [:comparator, :tree] + + defimpl Enumerable do + alias ExWebRTC.RTP.JitterBuffer.Heap + + def count(heap), do: {:ok, map_size(heap.tree)} + def member?(heap, elem), do: {:ok, heap.tree |> Map.values() |> Enum.member?(elem)} + + def reduce(_heap, {:halt, acc}, _fun), do: {:halted, acc} + def reduce(heap, {:suspend, acc}, fun), do: {:suspended, acc, &reduce(heap, &1, fun)} + def reduce(heap, {:cont, acc}, _fun) when map_size(heap.tree) == 0, do: {:done, acc} + + def reduce(heap, {:cont, acc}, fun) do + elem = Heap.root(heap) + + heap |> Heap.pop() |> reduce(fun.(elem, acc), fun) + end + + def slice(_heap), do: {:error, __MODULE__} + end + + @spec new(comparator()) :: t() + def new(comparator \\ & _first, ^last_idx => last} = heap.tree + + tree = + heap.tree + |> Map.delete(last_idx) + |> Map.put(0, last) + + heapify(%{heap | tree: tree}, 0) + end + + defp restore_heap(heap, 0), do: heap + + defp restore_heap(heap, idx) do + p = parent(idx) + + heap + |> heapify(p) + |> restore_heap(p) + end + + defp heapify(heap, i) do + n = map_size(heap.tree) + + max_idx = + [i, left(i), right(i)] + |> Stream.filter(&(&1 < n)) + |> Enum.max_by(&heap.tree[&1], heap.comparator, fn -> nil end) + + if max_idx != i do + %{^i => t1, ^max_idx => t2} = heap.tree + + tree = + heap.tree + |> Map.put(i, t2) + |> Map.put(max_idx, t1) + + heapify(%{heap | tree: tree}, max_idx) + else + heap + end + end + + defp left(i), do: 2 * i + 1 + defp right(i), do: 2 * i + 2 + defp parent(i), do: div(i - 1, 2) +end diff --git a/lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex b/lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex new file mode 100644 index 0000000..1b7cd39 --- /dev/null +++ b/lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex @@ -0,0 +1,267 @@ +defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do + @moduledoc false + + # Store for RTP packets. Packets are stored in `Heap` ordered by packet index. Packet index is + # defined in RFC 3711 (SRTP) as: 2^16 * rollover count + sequence number. + + import Bitwise + + alias ExWebRTC.RTP.JitterBuffer.Heap + + defmodule Entry do + @moduledoc false + # Describes a structure that is stored in the PacketStore. + + @enforce_keys [:index, :timestamp_ms, :packet] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + index: non_neg_integer(), + timestamp_ms: integer(), + packet: ExRTP.Packet.t() + } + + @spec new(ExRTP.Packet.t(), non_neg_integer()) :: t() + def new(packet, index) do + %__MODULE__{ + index: index, + timestamp_ms: System.monotonic_time(:millisecond), + packet: packet + } + end + + @doc """ + Compares two entries. + + Returns true if the first entry is older than the second one. + """ + @spec comparator(t(), t()) :: boolean() + # Designed to be used with `JitterBuffer.Heap` + def comparator(%__MODULE__{index: l_index}, %__MODULE__{index: r_index}), + do: l_index < r_index + end + + @seq_number_limit bsl(1, 16) + + defstruct flush_index: nil, + highest_incoming_index: nil, + heap: Heap.new(&Entry.comparator/2), + set: MapSet.new(), + rollover_count: 0 + + @typedoc """ + Type describing PacketStore structure. + + Fields: + - `flush_index` - index of the last packet that has been emitted (or would have been + emitted, but never arrived) as a result of a call to one of the `flush` functions + - `highest_incoming_index` - the highest index in the buffer so far, mapping to the most recently produced + RTP packet placed in JitterBuffer + - `rollover_count` - count of all performed rollovers (cycles of sequence number) + - `heap` - contains entries containing packets + - `set` - helper structure for faster read operations; content is the same as in `heap` + """ + @type t :: %__MODULE__{ + flush_index: non_neg_integer() | nil, + highest_incoming_index: non_neg_integer() | nil, + heap: Heap.t(), + set: MapSet.t(), + rollover_count: non_neg_integer() + } + + @doc """ + Inserts a packet into the Store. + + Each subsequent packet must have sequence number greater than the previously returned + one or be part of a rollover. + """ + @spec insert(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:error, :late_packet} + def insert(store, %{sequence_number: seq_num} = packet) do + do_insert_packet(store, packet, seq_num) + end + + defp do_insert_packet(%__MODULE__{flush_index: nil} = store, packet, 0) do + store = add_entry(store, Entry.new(packet, @seq_number_limit), :next) + {:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}} + end + + defp do_insert_packet(%__MODULE__{flush_index: nil} = store, packet, seq_num) do + store = add_entry(store, Entry.new(packet, seq_num), :current) + {:ok, %__MODULE__{store | flush_index: seq_num - 1}} + end + + defp do_insert_packet( + %__MODULE__{ + flush_index: flush_index, + highest_incoming_index: highest_incoming_index, + rollover_count: roc + } = store, + packet, + seq_num + ) do + highest_seq_num = rem(highest_incoming_index, @seq_number_limit) + + {rollover, index} = + case from_which_rollover(highest_seq_num, seq_num, @seq_number_limit) do + :current -> {:current, seq_num + roc * @seq_number_limit} + :previous -> {:previous, seq_num + (roc - 1) * @seq_number_limit} + :next -> {:next, seq_num + (roc + 1) * @seq_number_limit} + end + + if index > flush_index do + entry = Entry.new(packet, index) + {:ok, add_entry(store, entry, rollover)} + else + {:error, :late_packet} + end + end + + @doc """ + Flushes the store until the first gap in sequence numbers of entries + """ + @spec flush_ordered(t()) :: {[ExRTP.Packet.t() | nil], t()} + def flush_ordered(store) do + flush_while(store, fn %__MODULE__{flush_index: flush_index}, %Entry{index: index} -> + index == flush_index + 1 + end) + end + + @doc """ + Flushes the store as long as it contains a packet with the timestamp older than provided duration + """ + @spec flush_older_than(t(), non_neg_integer()) :: {[ExRTP.Packet.t() | nil], t()} + def flush_older_than(store, max_age_ms) do + max_age_timestamp = System.monotonic_time(:millisecond) - max_age_ms + + flush_while(store, fn _store, %Entry{timestamp_ms: timestamp} -> + timestamp <= max_age_timestamp + end) + end + + @doc """ + Returns all packets that are stored in the `PacketStore`. + """ + @spec dump(t()) :: [ExRTP.Packet.t() | nil] + def dump(%__MODULE__{} = store) do + {packets, _store} = flush_while(store, fn _store, _entry -> true end) + packets + end + + @doc """ + Returns timestamp (time of insertion) of the packet with the lowest index + """ + @spec first_packet_timestamp(t()) :: integer() | nil + def first_packet_timestamp(%__MODULE__{heap: heap}) do + case Heap.root(heap) do + %Entry{timestamp_ms: time} -> time + nil -> nil + end + end + + @spec from_which_rollover(number() | nil, number(), number()) :: :current | :previous | :next + def from_which_rollover(previous_value, new_value, rollover_length) + + def from_which_rollover(nil, _new, _rollover_length), do: :current + + def from_which_rollover(previous_value, new_value, rollover_length) do + # a) current rollover + distance_if_current = abs(previous_value - new_value) + # b) new_value is from the previous rollover + distance_if_previous = abs(previous_value - (new_value - rollover_length)) + # c) new_value is in the next rollover + distance_if_next = abs(previous_value - (new_value + rollover_length)) + + [ + {:current, distance_if_current}, + {:previous, distance_if_previous}, + {:next, distance_if_next} + ] + |> Enum.min_by(fn {_atom, distance} -> distance end) + |> then(fn {result, _value} -> result end) + end + + @doc false + @spec flush_one(t()) :: {Entry.t() | nil, t()} + # Flushes the store to the packet with the next sequence number. + # + # If this packet is present, it will be returned. + # Otherwise it will be treated as late and rejected on attempt to insert into the store. + # + # Should be called directly only when testing this module + def flush_one(store) + + def flush_one(%__MODULE__{flush_index: nil} = store) do + {nil, store} + end + + def flush_one(%__MODULE__{flush_index: flush_index, heap: heap, set: set} = store) do + record = Heap.root(heap) + + expected_next_index = flush_index + 1 + + {result, store} = + if record != nil and record.index == expected_next_index do + updated_heap = Heap.pop(heap) + updated_set = MapSet.delete(set, record.index) + + updated_store = %__MODULE__{store | heap: updated_heap, set: updated_set} + + {record, updated_store} + else + # TODO: instead of nil use expected_next_index to notify owner about missing packet + {nil, store} + end + + {result, %__MODULE__{store | flush_index: expected_next_index}} + end + + defp flush_while(%__MODULE__{heap: heap} = store, fun, acc \\ []) do + heap + |> Heap.root() + |> case do + nil -> + {Enum.reverse(acc), store} + + entry -> + if fun.(store, entry) do + {entry, store} = flush_one(store) + packet = get_packet(entry) + flush_while(store, fun, [packet | acc]) + else + {Enum.reverse(acc), store} + end + end + end + + defp add_entry(%__MODULE__{heap: heap, set: set} = store, %Entry{} = entry, entry_rollover) do + if set |> MapSet.member?(entry.index) do + store + else + %__MODULE__{store | heap: Heap.push(heap, entry), set: MapSet.put(set, entry.index)} + |> update_highest_incoming_index(entry.index) + |> update_roc(entry_rollover) + end + end + + defp update_highest_incoming_index( + %__MODULE__{highest_incoming_index: last} = store, + added_index + ) + when added_index > last or last == nil, + do: %__MODULE__{store | highest_incoming_index: added_index} + + defp update_highest_incoming_index( + %__MODULE__{highest_incoming_index: last} = store, + added_index + ) + when last >= added_index, + do: store + + defp update_roc(%{rollover_count: roc} = store, :next), + do: %__MODULE__{store | rollover_count: roc + 1} + + defp update_roc(store, _entry_rollover), do: store + + defp get_packet(nil), do: nil + defp get_packet(entry), do: entry.packet +end diff --git a/mix.lock b/mix.lock index b9b74f3..c024172 100644 --- a/mix.lock +++ b/mix.lock @@ -3,7 +3,6 @@ "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, "bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, - "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, "crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"}, "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, @@ -13,7 +12,7 @@ "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"}, "ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"}, - "ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"}, + "ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, @@ -21,9 +20,9 @@ "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, - "excoveralls": {:hex, :excoveralls, "0.18.2", "86efd87a0676a3198ff50b8c77620ea2f445e7d414afa9ec6c4ba84c9f8bdcc2", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "230262c418f0de64077626a498bd4fdf1126d5c2559bb0e6b43deac3005225a4"}, - "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, - "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "excoveralls": {:hex, :excoveralls, "0.18.3", "bca47a24d69a3179951f51f1db6d3ed63bca9017f476fe520eb78602d45f7756", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "746f404fcd09d5029f1b211739afb8fb8575d775b21f6a3908e7ce3e640724c6"}, + "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, + "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, @@ -39,7 +38,7 @@ "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, diff --git a/test/ex_webrtc/rtp/jitter_buffer/heap_test.exs b/test/ex_webrtc/rtp/jitter_buffer/heap_test.exs new file mode 100644 index 0000000..ac66a9d --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer/heap_test.exs @@ -0,0 +1,74 @@ +defmodule ExWebRTC.RTP.JitterBuffer.HeapTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTP.JitterBuffer.Heap + + test "stores things and reports size" do + heap = Heap.new() + assert Heap.size(heap) == 0 + assert Heap.root(heap) == nil + + heap = Heap.push(heap, 123) + assert Heap.size(heap) == 1 + assert Heap.root(heap) == 123 + + heap = Heap.push(heap, 0) + assert Heap.size(heap) == 2 + assert Heap.root(heap) == 0 + + heap = Heap.pop(heap) + heap = Heap.pop(heap) + assert Heap.size(heap) == 0 + assert Heap.root(heap) == nil + + # popping on empty heap shouldn't raise + _heap = Heap.pop(heap) + end + + test "sorts integers" do + test_base = 1..100 + heap = shuffle_into_heap(test_base, Heap.new()) + + Enum.reduce(test_base, heap, fn num, heap -> + assert Heap.root(heap) == num + Heap.pop(heap) + end) + end + + test "sorts using comparator" do + test_base = 1..100//-1 + heap = shuffle_into_heap(test_base, Heap.new(&>/2)) + + Enum.reduce(test_base, heap, fn num, heap -> + assert Heap.root(heap) == num + Heap.pop(heap) + end) + end + + test "implements Enumerable" do + heap = Heap.new() + + assert Enum.member?(heap, 123) == false + heap = Heap.push(heap, 123) + assert Enum.member?(heap, 123) == true + + assert Enum.count(heap) == 1 + + test_base = 1..100 + + heap = shuffle_into_heap(test_base, Heap.new()) + + test_base + |> Enum.zip(heap) + |> Enum.each(fn {num, elem} -> + assert num == elem + end) + end + + defp shuffle_into_heap(range, heap) do + range + |> Enum.into([]) + |> Enum.shuffle() + |> Enum.reduce(heap, fn elem, heap -> Heap.push(heap, elem) end) + end +end diff --git a/test/ex_webrtc/rtp/jitter_buffer/packet_store_test.exs b/test/ex_webrtc/rtp/jitter_buffer/packet_store_test.exs new file mode 100644 index 0000000..5c8ebd4 --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer/packet_store_test.exs @@ -0,0 +1,293 @@ +defmodule ExWebRTC.RTP.JitterBuffer.PacketStoreTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTP.PacketFactory + alias ExWebRTC.RTP.JitterBuffer.PacketStore.Entry + alias ExWebRTC.RTP.JitterBuffer.{Heap, PacketStore} + + @seq_number_limit 65_536 + @base_index 65_505 + @next_index @base_index + 1 + + setup_all do + [base_store: new_testing_store(@base_index)] + end + + describe "When adding packet to the PacketStore it" do + test "accepts the first packet" do + packet = PacketFactory.sample_packet(@base_index) + + assert {:ok, updated_store} = PacketStore.insert(%PacketStore{}, packet) + assert has_packet(updated_store, packet) + end + + test "refuses packet with a seq_number smaller than last served", %{base_store: store} do + packet = PacketFactory.sample_packet(@base_index - 1) + + assert {:error, :late_packet} = PacketStore.insert(store, packet) + end + + test "accepts a packet that got in time", %{base_store: store} do + packet = PacketFactory.sample_packet(@next_index) + assert {:ok, updated_store} = PacketStore.insert(store, packet) + assert has_packet(updated_store, packet) + end + + test "puts it to the rollover if a sequence number has rolled over", %{base_store: store} do + packet = PacketFactory.sample_packet(10) + assert {:ok, store} = PacketStore.insert(store, packet) + assert has_packet(store, packet) + end + + test "handles first packets starting with sequence_number 0" do + store = %PacketStore{} + packet_a = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert(store, packet_a) + + {record_a, store} = PacketStore.flush_one(store) + + assert record_a.index == @seq_number_limit + assert record_a.packet.sequence_number == 0 + + packet_b = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert(store, packet_b) + + {record_b, _store} = PacketStore.flush_one(store) + assert record_b.index == @seq_number_limit + 1 + assert record_b.packet.sequence_number == 1 + end + + test "handles packets with very big gaps" do + store = %PacketStore{} + first_packet = PacketFactory.sample_packet(20_072) + assert {:ok, store} = PacketStore.insert(store, first_packet) + + second_packet = PacketFactory.sample_packet(52_840) + assert {:ok, store} = PacketStore.insert(store, second_packet) + + third_packet = PacketFactory.sample_packet(52_841) + assert {:ok, _store} = PacketStore.insert(store, third_packet) + end + + test "handles late packets when starting with sequence_number 0" do + store = %PacketStore{} + packet = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert(store, packet) + + packet = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert(store, packet) + + packet = PacketFactory.sample_packet(@seq_number_limit - 1) + assert {:error, :late_packet} = PacketStore.insert(store, packet) + end + + test "handles rollover before any packet was sent" do + store = %PacketStore{} + packet = PacketFactory.sample_packet(@seq_number_limit - 1) + assert {:ok, store} = PacketStore.insert(store, packet) + + packet = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert(store, packet) + + packet = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert(store, packet) + + seq_numbers = + store + |> dump_store() + |> Enum.map(& &1.packet.sequence_number) + + assert seq_numbers == [65_535, 0, 1] + + indexes = + store + |> dump_store() + |> Enum.map(& &1.index) + + assert indexes == [@seq_number_limit - 1, @seq_number_limit, @seq_number_limit + 1] + end + + test "handles late packet after rollover" do + store = %PacketStore{} + first_packet = PacketFactory.sample_packet(@seq_number_limit - 1) + assert {:ok, store} = PacketStore.insert(store, first_packet) + + second_packet = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert(store, second_packet) + + packet = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert(store, packet) + + assert {%Entry{packet: ^first_packet}, store} = PacketStore.flush_one(store) + assert {%Entry{packet: ^second_packet}, store} = PacketStore.flush_one(store) + + packet = PacketFactory.sample_packet(@seq_number_limit - 2) + assert {:error, :late_packet} = PacketStore.insert(store, packet) + + seq_numbers = + store + |> dump_store() + |> Enum.map(& &1.packet.sequence_number) + + assert seq_numbers == [1] + end + end + + describe "When getting a packet from PacketStore it" do + setup %{base_store: base_store} do + packet = PacketFactory.sample_packet(@next_index) + {:ok, store} = PacketStore.insert(base_store, packet) + + [ + store: store, + packet: packet + ] + end + + test "returns the root packet and initializes it", %{store: store, packet: packet} do + assert {%Entry{} = record, empty_store} = PacketStore.flush_one(store) + assert record.packet == packet + assert Heap.size(empty_store.heap) == 0 + assert empty_store.flush_index == record.index + end + + test "returns nil when store is empty and bumps flush_index", %{base_store: store} do + assert {nil, new_store} = PacketStore.flush_one(store) + assert new_store.flush_index == store.flush_index + 1 + end + + test "returns nil when heap is not empty, but the next packet is not present", %{ + store: store + } do + broken_store = %PacketStore{store | flush_index: @base_index - 1} + assert {nil, new_store} = PacketStore.flush_one(broken_store) + assert new_store.flush_index == @base_index + end + + test "sorts packets by index number", %{base_store: store} do + test_base = 1..100 + + test_base + |> Enum.into([]) + |> Enum.shuffle() + |> enum_into_store(store) + |> (fn store -> store.heap end).() + |> Enum.zip(test_base) + |> Enum.each(fn {record, base_element} -> + assert %Entry{index: index} = record + assert rem(index, 65_536) == base_element + end) + end + + test "handles rollover", %{base_store: base_store} do + store = %PacketStore{base_store | flush_index: 65_533} + before_rollover_seq_nums = 65_534..65_535 + after_rollover_seq_nums = 0..10 + + combined = Enum.into(before_rollover_seq_nums, []) ++ Enum.into(after_rollover_seq_nums, []) + combined_store = enum_into_store(combined, store) + + store = + Enum.reduce(combined, combined_store, fn elem, store -> + {record, store} = PacketStore.flush_one(store) + assert %Entry{packet: packet} = record + assert %ExRTP.Packet{sequence_number: seq_number} = packet + assert seq_number == elem + store + end) + + assert store.rollover_count == 1 + end + + test "handles empty rollover", %{base_store: base_store} do + store = %PacketStore{base_store | flush_index: 65_533} + base_data = Enum.into(65_534..65_535, []) + store = enum_into_store(base_data, store) + + Enum.reduce(base_data, store, fn elem, store -> + {record, store} = PacketStore.flush_one(store) + assert %Entry{index: ^elem} = record + store + end) + end + + test "handles later rollovers" do + m = @seq_number_limit + + flush_index = 3 * m - 6 + + store = %PacketStore{ + flush_index: flush_index, + highest_incoming_index: flush_index, + rollover_count: 2 + } + + store = + (Enum.into((m - 5)..(m - 1), []) ++ Enum.into(0..4, [])) + |> enum_into_store(store) + + store_content = PacketStore.dump(store) + assert length(store_content) == 10 + end + + test "handles late packets after a rollover" do + indexes = [65_535, 0, 65_534] + + store = + enum_into_store(indexes, %PacketStore{flush_index: 65_533, highest_incoming_index: 65_533}) + + Enum.each(indexes, fn _index -> + assert {%Entry{}, _store} = PacketStore.flush_one(store) + end) + end + end + + describe "When dumping it" do + test "returns list that contains packets from heap" do + store = enum_into_store(1..10) + result = PacketStore.dump(store) + assert is_list(result) + assert Enum.count(result) == 10 + end + + test "returns empty list if no records are inside" do + assert PacketStore.dump(%PacketStore{}) == [] + end + end + + defp new_testing_store(index) do + %PacketStore{ + flush_index: index, + highest_incoming_index: index, + heap: Heap.new(&Entry.comparator/2) + } + end + + defp enum_into_store(enumerable, store \\ %PacketStore{}) do + Enum.reduce(enumerable, store, fn elem, acc -> + packet = PacketFactory.sample_packet(elem) + {:ok, store} = PacketStore.insert(acc, packet) + store + end) + end + + defp has_packet(%PacketStore{heap: heap}, %ExRTP.Packet{sequence_number: seq_num}) do + assert is_integer(seq_num) + + heap + |> Enum.to_list() + |> Enum.map(& &1.packet.sequence_number) + |> Enum.member?(seq_num) + end + + defp dump_store(store, acc \\ []) do + case PacketStore.first_packet_timestamp(store) do + nil -> + Enum.reverse(acc) + + _ts -> + {entry, store} = PacketStore.flush_one(store) + dump_store(store, [entry | acc]) + end + end +end diff --git a/test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs b/test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs new file mode 100644 index 0000000..5b711aa --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs @@ -0,0 +1,117 @@ +defmodule ExWebRTC.RTP.JitterBuffer.RealtimeTest do + use ExUnit.Case + + alias ExWebRTC.RTP.{JitterBuffer, PacketFactory} + alias ExRTP.Packet + + @seq_number_limit 65_536 + + defmodule PacketSource do + @moduledoc false + use GenServer + + @seq_number_limit 65_536 + + @impl true + def init(state) do + {:ok, state, {:continue, :after_init}} + end + + @impl true + def handle_continue( + :after_init, + %{ + packet_delay_ms: delay_ms, + packet_num: packet_num, + max_latency: max_latency + } = state + ) do + now = System.monotonic_time(:millisecond) + + 1..packet_num + |> Enum.each(fn n -> + time = + cond do + # Delay less than max latency + rem(n, 15) == 0 -> n * delay_ms + div(max_latency, 2) + # Delay more than max latency + rem(n, 19) == 0 -> n * delay_ms + max_latency * 2 + true -> n * delay_ms + end + + if rem(n, 50) < 30 or rem(n, 50) > 32 do + seq_number = rem(n, @seq_number_limit) + Process.send_after(self(), {:push_packet, seq_number}, now + time, abs: true) + end + end) + + {:noreply, state} + end + + @impl true + def handle_info({:push_packet, n}, %{buffer: buffer} = state) do + buffer + |> JitterBuffer.insert(PacketFactory.sample_packet(n)) + |> handle_jitter_buffer_result(state) + end + + @impl true + def handle_info(:jitter_buffer_timer, %{buffer: buffer} = state) do + buffer + |> JitterBuffer.handle_timeout() + |> handle_jitter_buffer_result(state) + end + + defp handle_jitter_buffer_result({packets, timer, buffer}, state) do + for packet <- packets do + send(state.owner, packet) + end + + unless is_nil(timer), do: Process.send_after(self(), :jitter_buffer_timer, timer) + + {:noreply, %{state | buffer: buffer}} + end + end + + test "Jitter Buffer works in a pipeline with small latency" do + test_pipeline(300, 10, 200) + end + + test "Jitter Buffer works in a pipeline with large latency" do + test_pipeline(100, 30, 1000) + end + + @tag :long_running + @tag timeout: 70_000 * 10 + 20_000 + test "Jitter Buffer works in a long-running pipeline with small latency" do + test_pipeline(70_000, 10, 100) + end + + defp test_pipeline(packets, packet_delay_ms, latency_ms) do + {:ok, _pid} = + GenServer.start_link(PacketSource, %{ + owner: self(), + buffer: JitterBuffer.new(latency: latency_ms), + packet_num: packets, + packet_delay_ms: packet_delay_ms, + max_latency: latency_ms + }) + + timeout = latency_ms + packet_delay_ms + 200 + + Enum.each(1..packets, fn n -> + seq_num = rem(n, @seq_number_limit) + + cond do + rem(n, 50) >= 30 and rem(n, 50) <= 32 -> + refute_receive %Packet{sequence_number: ^seq_num}, timeout + + rem(n, 19) == 0 and rem(n, 15) != 0 -> + refute_receive %Packet{sequence_number: ^seq_num}, timeout + + true -> + assert_receive %Packet{sequence_number: ^seq_num}, timeout + end + end) + end +end diff --git a/test/ex_webrtc/rtp/jitter_buffer_test.exs b/test/ex_webrtc/rtp/jitter_buffer_test.exs new file mode 100644 index 0000000..80cc1e9 --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer_test.exs @@ -0,0 +1,126 @@ +defmodule ExWebRTC.RTP.JitterBufferTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTP.JitterBuffer.PacketStore + alias ExWebRTC.RTP.{JitterBuffer, PacketFactory} + + @base_seq_number PacketFactory.base_seq_number() + @buffer_latency_ms 10 + + setup do + packet = PacketFactory.sample_packet(@base_seq_number) + + buffer = JitterBuffer.new(latency: @buffer_latency_ms) + buffer = %{buffer | state: :timer_not_set} + + [buffer: buffer, packet: packet] + end + + describe "When JitterBuffer is in initial_wait state" do + setup do + [buffer: JitterBuffer.new(latency: @buffer_latency_ms)] + end + + test "first packet starts timer that changes state", %{buffer: buffer, packet: packet} do + assert buffer.state == :initial_wait + {[], timer, buffer} = JitterBuffer.insert(buffer, packet) + assert timer == buffer.latency + {_packets, _timer, buffer} = JitterBuffer.handle_timeout(buffer) + assert buffer.state != :initial_wait + end + + test "any new packet is kept", %{buffer: buffer, packet: packet} do + {[], _timer, buffer} = JitterBuffer.flush(buffer) + {[], _timer, buffer} = JitterBuffer.insert(buffer, packet) + + {[^packet], _timer, buffer} = JitterBuffer.flush(buffer) + {[], _timer, _buffer} = JitterBuffer.flush(buffer) + end + end + + describe "When new packet arrives when not waiting and already pushed some packet" do + setup %{buffer: buffer} do + flush_index = @base_seq_number - 1 + store = %{buffer.store | flush_index: flush_index, highest_incoming_index: flush_index} + [buffer: %{buffer | state: :timer_not_set, store: store}] + end + + test "outputs it immediately if it is in order", %{buffer: buffer, packet: packet} do + {[^packet], _timer, buffer} = JitterBuffer.insert(buffer, packet) + {[], _timer, _buffer} = JitterBuffer.flush(buffer) + end + + test "refuses to add that packet when it comes too late", %{buffer: buffer} do + late_packet = PacketFactory.sample_packet(@base_seq_number - 2) + {[], nil, new_buffer} = JitterBuffer.insert(buffer, late_packet) + assert new_buffer == buffer + end + + test "adds it and when it fills the gap, returns all packets in order", %{buffer: buffer} do + first_packet = PacketFactory.sample_packet(@base_seq_number) + second_packet = PacketFactory.sample_packet(@base_seq_number + 1) + third_packet = PacketFactory.sample_packet(@base_seq_number + 2) + + flush_index = @base_seq_number - 1 + + store = %PacketStore{ + buffer.store + | flush_index: flush_index, + highest_incoming_index: flush_index + } + + buffer = %{buffer | store: store} + + {[], _timer, buffer} = JitterBuffer.insert(buffer, second_packet) + {[], _timer, buffer} = JitterBuffer.insert(buffer, third_packet) + + {packets, _timer, buffer} = JitterBuffer.insert(buffer, first_packet) + + assert packets == [first_packet, second_packet, third_packet] + + {[], _timer, _buffer} = JitterBuffer.flush(buffer) + end + end + + describe "When latency passes without filling the gap, JitterBuffer" do + test "outputs the late packet", %{buffer: buffer, packet: packet} do + flush_index = @base_seq_number - 2 + + store = %PacketStore{ + buffer.store + | flush_index: flush_index, + highest_incoming_index: flush_index + } + + buffer = %{buffer | store: store, state: :timer_not_set} + + {[], timer, buffer} = JitterBuffer.insert(buffer, packet) + assert timer != nil + assert buffer.state == :timer_set + + Process.sleep(buffer.latency + 5) + {[^packet], _timer, _buffer} = JitterBuffer.handle_timeout(buffer) + end + end + + describe "When asked to flush, JitterBuffer" do + test "dumps store and resets itself", %{buffer: buffer, packet: packet} do + flush_index = @base_seq_number - 2 + + store = %PacketStore{ + buffer.store + | flush_index: flush_index, + highest_incoming_index: flush_index + } + + buffer = %{buffer | store: store} + {[], _timer, buffer} = JitterBuffer.insert(buffer, packet) + + {[^packet], nil, buffer} = JitterBuffer.flush(buffer) + + assert buffer.store == %PacketStore{} + + {[], nil, _buffer} = JitterBuffer.flush(buffer) + end + end +end diff --git a/test/support/packet_factory.ex b/test/support/packet_factory.ex new file mode 100644 index 0000000..69e54cd --- /dev/null +++ b/test/support/packet_factory.ex @@ -0,0 +1,24 @@ +defmodule ExWebRTC.RTP.PacketFactory do + @moduledoc false + + alias ExRTP.Packet + + @timestamp_increment 30_000 + @base_seq_number 50 + + @spec base_seq_number() :: Packet.uint16() + def base_seq_number(), do: @base_seq_number + + @spec sample_packet(Packet.uint16()) :: Packet.t() + def sample_packet(seq_num) do + seq_num_offset = seq_num - @base_seq_number + + Packet.new( + <<0, 255>>, + payload_type: 127, + ssrc: 0xDEADCAFE, + timestamp: seq_num_offset * @timestamp_increment, + sequence_number: seq_num + ) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index fb40ccc..d40f0ff 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1 @@ -ExUnit.start(capture_log: true, assert_receive_timeout: 2000) +ExUnit.start(capture_log: true, exclude: [:long_running], assert_receive_timeout: 2000)