Skip to content

Commit

Permalink
Add jitter buffer (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn authored Sep 20, 2024
1 parent f310ba0 commit e42c09d
Show file tree
Hide file tree
Showing 12 changed files with 1,275 additions and 29 deletions.
102 changes: 84 additions & 18 deletions examples/save_to_file/lib/save_to_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ defmodule SaveToFile.PeerHandler do
}

alias ExWebRTC.Media.{IVF, Ogg}
alias ExWebRTC.RTP.Depayloader
alias ExWebRTC.RTP.{Depayloader, JitterBuffer}

@behaviour WebSock

@jitter_buffer_latency_ms 100

@video_file "./video.ivf"
@audio_file "./audio.ogg"

Expand Down Expand Up @@ -53,8 +55,10 @@ defmodule SaveToFile.PeerHandler do
audio_track_id: nil,
video_writer: nil,
video_depayloader: nil,
video_buffer: nil,
audio_writer: nil,
audio_depayloader: nil,
audio_buffer: nil,
frames_cnt: 0
}

Expand All @@ -73,10 +77,22 @@ defmodule SaveToFile.PeerHandler do
handle_webrtc_msg(msg, state)
end

@impl true
def handle_info({:jitter_buffer_timer, kind}, state) do
case kind do
:video -> state.video_buffer
:audio -> state.audio_buffer
end
|> JitterBuffer.handle_timer()
|> handle_jitter_buffer_result(kind, state)
end

@impl true
def terminate(reason, state) do
Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}")

state = flush_jitter_buffers(state)

if state.video_writer, do: IVF.Writer.close(state.video_writer)
if state.audio_writer, do: Ogg.Writer.close(state.audio_writer)
end
Expand Down Expand Up @@ -142,11 +158,13 @@ defmodule SaveToFile.PeerHandler do
)

{:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new()
video_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms)

state = %{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
video_buffer: video_buffer,
video_track_id: id
}

Expand All @@ -157,11 +175,13 @@ defmodule SaveToFile.PeerHandler do
# by default uses 1 mono channel and 48k clock rate
{:ok, audio_writer} = Ogg.Writer.open(@audio_file)
{:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new()
audio_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms)

state = %{
state
| audio_depayloader: audio_depayloader,
audio_writer: audio_writer,
audio_buffer: audio_buffer,
audio_track_id: id
}

Expand All @@ -175,32 +195,78 @@ defmodule SaveToFile.PeerHandler do
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do
state.video_buffer
|> JitterBuffer.place_packet(packet)
|> handle_jitter_buffer_result(:video, state)
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
state.audio_buffer
|> JitterBuffer.place_packet(packet)
|> handle_jitter_buffer_result(:audio, state)
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}

defp handle_jitter_buffer_result({packets, timer, buffer}, kind, state) do
state =
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

%{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
frames_cnt: state.frames_cnt + 1
}
case kind do
:video -> %{state | video_buffer: buffer}
:audio -> %{state | audio_buffer: buffer}
end

state =
Enum.reduce(packets, state, fn packet, state -> handle_packet(packet, kind, state) end)

unless is_nil(timer), do: Process.send_after(self(), {:jitter_buffer_timer, kind}, timer)

{:ok, state}
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
defp handle_packet(packet, :video, state) do
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

%{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
frames_cnt: state.frames_cnt + 1
}
end
end

defp handle_packet(packet, :audio, state) do
{opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet)
{:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet)

{:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}}
%{state | audio_depayloader: depayloader, audio_writer: audio_writer}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
defp flush_jitter_buffers(state),
do: state |> flush_jitter_buffer(:video) |> flush_jitter_buffer(:audio)

defp flush_jitter_buffer(state, kind) do
buffer =
case kind do
:video -> state.video_buffer
:audio -> state.audio_buffer
end

if is_nil(buffer) do
state
else
{:ok, state} =
buffer
|> JitterBuffer.flush()
|> handle_jitter_buffer_result(kind, state)

state
end
end
end
6 changes: 2 additions & 4 deletions examples/save_to_file/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"},
"bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"},
"castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"},
"crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"},
"elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"},
"ex_ice": {:hex, :ex_ice, "0.8.0", "f9bd181e8fd2f8ac9a808587ee8a47bf667143069d75f6e4892a62156d798aa7", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "b0476f6b18986f6df48fda4cecb3be5022323572790d1bb49da10b177c936b4e"},
"ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"},
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
Expand All @@ -22,7 +21,6 @@
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
Expand All @@ -33,6 +31,6 @@
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"},
"websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"},
"zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"},
}
164 changes: 164 additions & 0 deletions lib/ex_webrtc/rtp/jitter_buffer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
defmodule ExWebRTC.RTP.JitterBuffer do
@moduledoc """
Buffers and reorders RTP packets based on `sequence_number`, introducing controlled latency
in order to combat network jitter and improve the QoS.
"""

# Heavily inspired by:
# https://github.com/membraneframework/membrane_rtp_plugin/blob/23f3279540aea7dea3a194fd5a1680c2549aebae/lib/membrane/rtp/jitter_buffer.ex

alias ExWebRTC.RTP.JitterBuffer.PacketStore
alias ExRTP.Packet

@default_latency_ms 200

@typedoc """
Options that can be passed to `new/1`.
* `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default.
"""
@type options :: [latency: non_neg_integer()]

@typedoc """
Time (in milliseconds) after which `handle_timeout/1` should be called.
Can be `nil`, in which case no timer needs to be set.
"""
@type timer :: non_neg_integer() | nil

@typedoc """
The 3-element tuple returned by all functions other than `new/1`.
* `packets` - a list with packets flushed from the buffer as a result of the function call. May be empty.
* `timer_duration_ms` - see `t:timer/0`.
* `buffer` - `t:#{inspect(__MODULE__)}.t/0`.
Generally speaking, all results of this type can be handled in the same way.
"""
@type result :: {packets :: [Packet.t()], timer_duration_ms :: timer(), buffer :: t()}

@opaque t :: %__MODULE__{
latency: non_neg_integer(),
store: PacketStore.t(),
state: :initial_wait | :timer_set | :timer_not_set
}

@enforce_keys [:latency]
defstruct @enforce_keys ++
[
store: %PacketStore{},
state: :initial_wait
]

@doc """
Creates a new `t:#{inspect(__MODULE__)}.t/0`.
"""
@spec new(options()) :: t()
def new(opts \\ []) do
%__MODULE__{
latency: opts[:latency] || @default_latency_ms
}
end

@doc """
Places a packet in the JitterBuffer.
Note: The initial latency timer will be set after the first packet is inserted into the buffer.
If you want to start it at your own discretion, schedule a `handle_timeout/1` call prior to that.
"""
@spec insert(t(), Packet.t()) :: result()
def insert(buffer, packet)

def insert(%{state: :initial_wait} = buffer, packet) do
{buffer, timer} = maybe_set_timer(buffer)
{_result, buffer} = try_insert_packet(buffer, packet)

{[], timer, buffer}
end

def insert(buffer, packet) do
case try_insert_packet(buffer, packet) do
{:ok, buffer} -> send_packets(buffer)
{:error, buffer} -> {[], nil, buffer}
end
end

@doc """
Flushes all remaining packets and resets the JitterBuffer.
Note: After flushing, the rollover counter is reset to `0`.
"""
@spec flush(t()) :: result()
def flush(buffer) do
packets =
buffer.store
|> PacketStore.dump()
|> handle_missing_packets()

{packets, nil, %__MODULE__{latency: buffer.latency}}
end

@doc """
Handles the end of a previously set timer.
"""
@spec handle_timeout(t()) :: result()
def handle_timeout(buffer) do
%__MODULE__{buffer | state: :timer_not_set} |> send_packets()
end

@spec try_insert_packet(t(), Packet.t()) :: {:ok | :error, t()}
defp try_insert_packet(buffer, packet) do
case PacketStore.insert(buffer.store, packet) do
{:ok, store} -> {:ok, %__MODULE__{buffer | store: store}}
{:error, :late_packet} -> {:error, buffer}
end
end

@spec send_packets(t()) :: result()
defp send_packets(%{store: store} = buffer) do
# Flush packets that stayed in queue longer than latency and any gaps before them
{too_old_packets, store} = PacketStore.flush_older_than(store, buffer.latency)
# Additionally, flush packets as long as there are no gaps
{gapless_packets, store} = PacketStore.flush_ordered(store)

packets =
too_old_packets
|> Stream.concat(gapless_packets)
|> handle_missing_packets()

{buffer, timer} = maybe_set_timer(%__MODULE__{buffer | store: store})

{packets, timer, buffer}
end

@spec handle_missing_packets(Enumerable.t(Packet.t() | nil)) :: [Packet.t()]
defp handle_missing_packets(packets) do
# TODO: nil -- missing packet (maybe owner should be notified about that)
Enum.reject(packets, &is_nil/1)
end

@spec maybe_set_timer(t()) :: {t(), timer()}
defp maybe_set_timer(buffer)

defp maybe_set_timer(%{state: :initial_wait} = buffer) do
case PacketStore.first_packet_timestamp(buffer.store) do
# If we're inserting the very first packet, set the initial latency timer
nil -> {buffer, buffer.latency}
_ts -> {buffer, nil}
end
end

defp maybe_set_timer(%{state: :timer_not_set} = buffer) do
case PacketStore.first_packet_timestamp(buffer.store) do
nil ->
{buffer, nil}

timestamp_ms ->
since_insertion = System.monotonic_time(:millisecond) - timestamp_ms
send_after_time = max(0, buffer.latency - since_insertion)

{%__MODULE__{buffer | state: :timer_set}, send_after_time}
end
end

defp maybe_set_timer(%{state: :timer_set} = buffer), do: {buffer, nil}
end
Loading

0 comments on commit e42c09d

Please sign in to comment.