Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jitter buffer #158

Merged
merged 10 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 84 additions & 18 deletions examples/save_to_file/lib/save_to_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ defmodule SaveToFile.PeerHandler do
}
mickel8 marked this conversation as resolved.
Show resolved Hide resolved

alias ExWebRTC.Media.{IVF, Ogg}
alias ExWebRTC.RTP.Depayloader
alias ExWebRTC.RTP.{Depayloader, JitterBuffer}

@behaviour WebSock

@jitter_buffer_latency_ms 100

@video_file "./video.ivf"
@audio_file "./audio.ogg"

Expand Down Expand Up @@ -53,8 +55,10 @@ defmodule SaveToFile.PeerHandler do
audio_track_id: nil,
video_writer: nil,
video_depayloader: nil,
video_buffer: nil,
audio_writer: nil,
audio_depayloader: nil,
audio_buffer: nil,
frames_cnt: 0
}

Expand All @@ -73,10 +77,22 @@ defmodule SaveToFile.PeerHandler do
handle_webrtc_msg(msg, state)
end

@impl true
def handle_info({:jitter_buffer_timer, kind}, state) do
case kind do
:video -> state.video_buffer
:audio -> state.audio_buffer
end
|> JitterBuffer.handle_timer()
|> handle_jitter_buffer_result(kind, state)
end

@impl true
def terminate(reason, state) do
Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}")

state = flush_jitter_buffers(state)

if state.video_writer, do: IVF.Writer.close(state.video_writer)
if state.audio_writer, do: Ogg.Writer.close(state.audio_writer)
end
Expand Down Expand Up @@ -142,11 +158,13 @@ defmodule SaveToFile.PeerHandler do
)

{:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new()
video_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms)

state = %{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
video_buffer: video_buffer,
video_track_id: id
}

Expand All @@ -157,11 +175,13 @@ defmodule SaveToFile.PeerHandler do
# by default uses 1 mono channel and 48k clock rate
{:ok, audio_writer} = Ogg.Writer.open(@audio_file)
{:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new()
audio_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms)

state = %{
state
| audio_depayloader: audio_depayloader,
audio_writer: audio_writer,
audio_buffer: audio_buffer,
audio_track_id: id
}

Expand All @@ -175,32 +195,78 @@ defmodule SaveToFile.PeerHandler do
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do
state.video_buffer
|> JitterBuffer.place_packet(packet)
|> handle_jitter_buffer_result(:video, state)
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
state.audio_buffer
|> JitterBuffer.place_packet(packet)
|> handle_jitter_buffer_result(:audio, state)
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}

defp handle_jitter_buffer_result({packets, timer, buffer}, kind, state) do
state =
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

%{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
frames_cnt: state.frames_cnt + 1
}
case kind do
:video -> %{state | video_buffer: buffer}
:audio -> %{state | audio_buffer: buffer}
end

state =
Enum.reduce(packets, state, fn packet, state -> handle_packet(packet, kind, state) end)

unless is_nil(timer), do: Process.send_after(self(), {:jitter_buffer_timer, kind}, timer)

{:ok, state}
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
defp handle_packet(packet, :video, state) do
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

%{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
frames_cnt: state.frames_cnt + 1
}
end
end

defp handle_packet(packet, :audio, state) do
{opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet)
{:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet)

{:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}}
%{state | audio_depayloader: depayloader, audio_writer: audio_writer}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
defp flush_jitter_buffers(state),
do: state |> flush_jitter_buffer(:video) |> flush_jitter_buffer(:audio)

defp flush_jitter_buffer(state, kind) do
buffer =
case kind do
:video -> state.video_buffer
:audio -> state.audio_buffer
end

if is_nil(buffer) do
state
else
{:ok, state} =
buffer
|> JitterBuffer.flush()
|> handle_jitter_buffer_result(kind, state)

state
end
end
end
6 changes: 2 additions & 4 deletions examples/save_to_file/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"},
"bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"},
"castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"},
"crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"},
"elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"},
"ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"},
"ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"},
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
Expand All @@ -22,7 +21,6 @@
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
Expand All @@ -33,6 +31,6 @@
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"},
"websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"},
"zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"},
}
164 changes: 164 additions & 0 deletions lib/ex_webrtc/rtp/jitter_buffer.ex
LVala marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
defmodule ExWebRTC.RTP.JitterBuffer do
sgfn marked this conversation as resolved.
Show resolved Hide resolved
@moduledoc """
Buffers and reorders RTP packets based on `sequence_number`, introducing controlled latency
in order to combat network jitter and improve the QoS.
"""

# Heavily inspired by:
# https://github.com/membraneframework/membrane_rtp_plugin/blob/23f3279540aea7dea3a194fd5a1680c2549aebae/lib/membrane/rtp/jitter_buffer.ex

alias ExWebRTC.RTP.JitterBuffer.PacketStore
alias ExRTP.Packet

@default_latency_ms 200

@typedoc """
Options that can be passed to `new/1`.

* `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default.
"""
@type options :: [latency: non_neg_integer()]

@typedoc """
Time (in milliseconds) after which `handle_timeout/1` should be called.
Can be `nil`, in which case no timer needs to be set.
"""
@type timer :: non_neg_integer() | nil

@typedoc """
The 3-element tuple returned by all functions other than `new/1`.

* `packets` - a list with packets flushed from the buffer as a result of the function call. May be empty.
* `timer_duration_ms` - see `t:timer/0`.
* `buffer` - `t:#{inspect(__MODULE__)}.t/0`.

Generally speaking, all results of this type can be handled in the same way.
"""
@type result :: {packets :: [Packet.t()], timer_duration_ms :: timer(), buffer :: t()}

@opaque t :: %__MODULE__{
latency: non_neg_integer(),
store: PacketStore.t(),
state: :initial_wait | :timer_set | :timer_not_set
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could replace this by started? boolean and changing maybe_set_timer to get_next_timeout.
Just return time to the next action.

  @spec get_next_timeout(t()) :: non_neg_integer()
  defp get_next_timeout(buffer)

  defp get_next_timeout(%{started: false} = buffer) do
    case PacketStore.first_record_timestamp(buffer.store) do
      # If we're inserting the very first packet, set the initial latency timer
      nil -> buffer.latency
      _ts -> nil
    end
  end

  defp get_next_timeout(%{started: true} = buffer) do
    case PacketStore.first_record_timestamp(buffer.store) do
      nil ->
        nil

      timestamp_ms ->
        since_insertion = System.monotonic_time(:millisecond) - timestamp_ms
        max(0, buffer.latency - since_insertion)
    end
  end

if we also decide that the latency for the first packet is not needed we could simplify this further into

  defp get_next_timeout(buffer) do
    case PacketStore.first_record_timestamp(buffer.store) do
      nil ->
        nil

      timestamp_ms ->
        since_insertion = System.monotonic_time(:millisecond) - timestamp_ms
        max(0, buffer.latency - since_insertion)
    end
  end

Does it make sense to you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not differentiating between having the timer set and unset means we'll be telling the user to set a timer every time a packet is inserted, which to me seems quite unnecessary and wasteful when it can easily be avoided with just a few lines of code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS I love that you chose to include the typespec for the private function here :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not differentiating between having the timer set and unset means we'll be telling the user to set a timer every time a packet is inserted

To be precise, every time when usser inserts something and we are waiting for some packet. In most cases, when packets are ordered, we will immediately flush them.

Anyway, this indeed might not be the bes UX, I didn't notice that earlier 🤔

On the other hand, right now, if someone calls handle_timeout too early, they will get nil as timer and will think everything is okay

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be precise, every time when usser inserts something and we are waiting for some packet. In most cases, when packets are ordered, we will immediately flush them.

Yeah, you're correct, my bad

On the other hand, right now, if someone calls handle_timeout too early, they will get nil as timer and will think everything is okay

I don't see how that would happen, we're changing the buffer state to :timer_not_set immediately after handle_timeout is called, so all of the timer-setting code is being executed

  def handle_timeout(buffer) do
    %__MODULE__{buffer | state: :timer_not_set} |> send_packets()
  end

}

@enforce_keys [:latency]
defstruct @enforce_keys ++
[
store: %PacketStore{},
state: :initial_wait
]

@doc """
Creates a new `t:#{inspect(__MODULE__)}.t/0`.
"""
@spec new(options()) :: t()
def new(opts \\ []) do
%__MODULE__{
latency: opts[:latency] || @default_latency_ms
}
end

@doc """
Places a packet in the JitterBuffer.

Note: The initial latency timer will be set after the first packet is inserted into the buffer.
If you want to start it at your own discretion, schedule a `handle_timeout/1` call prior to that.
"""
@spec insert(t(), Packet.t()) :: result()
def insert(buffer, packet)

def insert(%{state: :initial_wait} = buffer, packet) do
{buffer, timer} = maybe_set_timer(buffer)
{_result, buffer} = try_insert_packet(buffer, packet)

{[], timer, buffer}
end

def insert(buffer, packet) do
case try_insert_packet(buffer, packet) do
{:ok, buffer} -> send_packets(buffer)
{:error, buffer} -> {[], nil, buffer}
end
end

@doc """
Flushes all remaining packets and resets the JitterBuffer.

Note: After flushing, the rollover counter is reset to `0`.
"""
@spec flush(t()) :: result()
def flush(buffer) do
packets =
buffer.store
|> PacketStore.dump()
|> handle_missing_packets()

{packets, nil, %__MODULE__{latency: buffer.latency}}
end

@doc """
Handles the end of a previously set timer.
"""
@spec handle_timeout(t()) :: result()
def handle_timeout(buffer) do
%__MODULE__{buffer | state: :timer_not_set} |> send_packets()
end

@spec try_insert_packet(t(), Packet.t()) :: {:ok | :error, t()}
LVala marked this conversation as resolved.
Show resolved Hide resolved
defp try_insert_packet(buffer, packet) do
case PacketStore.insert(buffer.store, packet) do
{:ok, store} -> {:ok, %__MODULE__{buffer | store: store}}
{:error, :late_packet} -> {:error, buffer}
end
end

@spec send_packets(t()) :: result()
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
defp send_packets(%{store: store} = buffer) do
# Flush packets that stayed in queue longer than latency and any gaps before them
{too_old_packets, store} = PacketStore.flush_older_than(store, buffer.latency)
# Additionally, flush packets as long as there are no gaps
{gapless_packets, store} = PacketStore.flush_ordered(store)

packets =
too_old_packets
|> Stream.concat(gapless_packets)
|> handle_missing_packets()

{buffer, timer} = maybe_set_timer(%__MODULE__{buffer | store: store})

{packets, timer, buffer}
end

@spec handle_missing_packets(Enumerable.t(Packet.t() | nil)) :: [Packet.t()]
defp handle_missing_packets(packets) do
# TODO: nil -- missing packet (maybe owner should be notified about that)
Enum.reject(packets, &is_nil/1)
end

@spec maybe_set_timer(t()) :: {t(), timer()}
defp maybe_set_timer(buffer)

defp maybe_set_timer(%{state: :initial_wait} = buffer) do
case PacketStore.first_packet_timestamp(buffer.store) do
# If we're inserting the very first packet, set the initial latency timer
nil -> {buffer, buffer.latency}
_ts -> {buffer, nil}
end
end

defp maybe_set_timer(%{state: :timer_not_set} = buffer) do
case PacketStore.first_packet_timestamp(buffer.store) do
nil ->
{buffer, nil}

timestamp_ms ->
since_insertion = System.monotonic_time(:millisecond) - timestamp_ms
send_after_time = max(0, buffer.latency - since_insertion)

{%__MODULE__{buffer | state: :timer_set}, send_after_time}
end
end

defp maybe_set_timer(%{state: :timer_set} = buffer), do: {buffer, nil}
end
Loading