Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow selecting synchronization strategy #5

Merged
merged 7 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 147 additions & 69 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,33 @@ defmodule Membrane.TimestampQueue do
metric_unit: :buffers | :bytes | :time,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
registered_pads: MapSet.t(),
blocking_registered_pads: MapSet.t(),
registered_pads_offsets: %{optional(Pad.ref()) => integer()},
# :awaiting_pads contain at most one element at the time
awaiting_pads: [Pad.ref()],
mat-hek marked this conversation as resolved.
Show resolved Hide resolved
closed?: boolean(),
chunk_duration: nil | Membrane.Time.t(),
chunk_full?: boolean(),
next_chunk_boundary: nil | Membrane.Time.t(),
max_time_in_queues: Membrane.Time.t()
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
metric_unit: :buffers,
pad_queues: %{},
pads_heap: Heap.max(),
registered_pads: MapSet.new(),
blocking_registered_pads: MapSet.new(),
registered_pads_offsets: %{},
awaiting_pads: [],
closed?: false,
chunk_duration: nil,
chunk_full?: false,
next_chunk_boundary: nil,
max_time_in_queues: 0
synchronization_strategy: :synchronize_on_arrival

@typedoc """
Options passed to #{inspect(__MODULE__)}.new/1.
Options passed to `#{inspect(__MODULE__)}.new/1`.

Following options are allowed:
- `:pause_demand_boundary` - positive integer, `t:Membrane.Time.t()` or `:infinity` (default to `:infinity`). Tells,
Expand All @@ -69,47 +72,100 @@ defmodule Membrane.TimestampQueue do
`:pause_demand_boundary` is specified.
- `:chunk_duration` - `Membrane.Time.t()`. Specifies how long the fragments returned by
`#{inspect(__MODULE__)}.pop_chunked/1` will be approximately.
- `:synchronization_strategy` - `:synchronize_on_arrival` or `:exact_timestamps` (default to `:synchronize_on_arrival`).
Specyfies, how items from different pads will be synchronized with each other. If it is set to:
* `:synchronize_on_arrival` - in the moment of the arrival of the first buffer from a specific pad, there will be
caluclated timestamp offset for this pad. These offsets will be added to the buffers timestamps, to caluclate from
which pad items should be returned in the first order. Every offset will be calculated in such a way that the first
buffer from a new pad will be returned as the next item.
* `:explicit_offsets` - buffers from various pads will be sorted based on their timestamps and pads offsets. Pads
offsets can be set using `#{inspect(__MODULE__)}.register_pad/3` function. If pad offset is not explicitly set
before the first buffer from this pad, it will be equal 0.
"""
@type options :: [
pause_demand_boundary: pos_integer() | Membrane.Time.t() | :infinity,
pause_demand_boundary_unit: :buffers | :bytes | :time,
chunk_duration: Membrane.Time.t()
chunk_duration: Membrane.Time.t(),
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
]

@spec new(options) :: t()
def new(options \\ []) do
[
chunk_duration: chunk_duration,
pause_demand_boundary: boundary,
pause_demand_boundary_unit: unit
pause_demand_boundary_unit: unit,
synchronization_strategy: synchronization_strategy
] =
options
|> Keyword.validate!(
chunk_duration: nil,
pause_demand_boundary: :infinity,
pause_demand_boundary_unit: :buffers
pause_demand_boundary_unit: :buffers,
synchronization_strategy: :synchronize_on_arrival
)
|> Enum.sort()

%__MODULE__{
pause_demand_boundary: boundary,
metric_unit: unit,
chunk_duration: chunk_duration
chunk_duration: chunk_duration,
synchronization_strategy: synchronization_strategy
}
end

@typedoc """
Options passed to `#{inspect(__MODULE__)}.register_pad/3`.

Following options are allowed:
- `:wait_on_buffers?` - `boolean()`, default to `true`. Specyfies, if the queue will wait with returning buffers
in `pop_*` functions, until it receives the first buffer from a pad passed as a second argument to the function.
- `:timestamp_offset` - integer. Specyfies, what will be the timestamp offset of a pad passed as a second argument
to the function. Allowed only if `#{inspect(__MODULE__)}` synchronization strategy is `:explicit_offsets`.
"""
@type register_pad_options :: [
timestamp_offset: integer(),
wait_on_buffers?: boolean()
]

@doc """
Registers an input pad in the queue without pushing anything on that pad.

Once a pad is registered, the `pop_available_items/3` function won't return buffers
until a `buffer` or `end_of_stream` is available on the registered pad.
Once a pad is registered with option `wait_on_buffers?: true` (default), the `pop_available_items/3` function won't
return any buffers until a `buffer` or `end_of_stream` is available on the registered pad.

Pushing a buffer on an unregistered pad automatically registers it.
"""
@spec register_pad(t(), Pad.ref()) :: t()
def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:registered_pads, &MapSet.put(&1, pad_ref))
@spec register_pad(t(), Pad.ref(), register_pad_options()) :: t()
def register_pad(%__MODULE__{} = timestamp_queue, pad_ref, opts \\ []) do
[timestamp_offset: offset, wait_on_buffers?: wait?] =
opts
|> Keyword.validate!(timestamp_offset: nil, wait_on_buffers?: true)
|> Enum.sort()

if offset != nil and timestamp_queue.synchronization_strategy == :synchronize_on_arrival do
raise """
Option :timestamp_offset in #{inspect(__MODULE__)}.register_pad/3 cannot be set if #{inspect(__MODULE__)} \
synchronization strategy is :synchronize_on_arrival (default).
"""
end

with %{timestamp_offset: offset} when offset != nil <-
Map.get(timestamp_queue.pad_queues, pad_ref) do
raise """
Cannot register pad `#{inspect(pad_ref)}, because buffers from it are already in `#{inspect(__MODULE__)}. \
Every pad can be registered only before pushing the first buffer from it on the queue.
"""
end

timestamp_queue =
if wait?,
do: Map.update!(timestamp_queue, :blocking_registered_pads, &MapSet.put(&1, pad_ref)),
else: timestamp_queue

if offset != nil,
do: put_in(timestamp_queue, [:registered_pads_offsets, pad_ref], offset),
else: timestamp_queue
end

@doc """
Expand All @@ -134,23 +190,14 @@ defmodule Membrane.TimestampQueue do
def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
{actions, timestamp_queue} =
timestamp_queue
|> push_item(pad_ref, {:buffer, buffer})
|> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
|> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end))
|> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
old_buffers_size = pad_queue.buffers_size

pad_queue =
pad_queue
|> Map.update!(:buffers_number, &(&1 + 1))
|> Map.update!(:timestamp_offset, fn
nil -> timestamp_queue.current_queue_time - (buffer.dts || buffer.pts)
valid_offset -> valid_offset
end)
|> Map.update!(:use_pts?, fn
nil -> buffer.dts == nil
valid_boolean -> valid_boolean
end)
|> maybe_handle_first_buffer(pad_ref, buffer, timestamp_queue)
|> increase_buffers_size(buffer, timestamp_queue.metric_unit)
|> check_timestamps_consistency!(buffer, pad_ref)

Expand All @@ -169,18 +216,42 @@ defmodule Membrane.TimestampQueue do

timestamp_queue =
timestamp_queue
|> Map.update!(:max_time_in_queues, &max(&1, buff_time))
|> Map.update!(:next_chunk_boundary, fn
nil when timestamp_queue.chunk_duration != nil ->
buff_time + timestamp_queue.chunk_duration

other ->
other
end)
|> remove_pad_from_registered_and_awaiting_pads(pad_ref)
|> push_pad_on_heap_if_qex_empty(pad_ref, -buff_time, pad_queue)
|> push_item_on_qex(pad_ref, {:buffer, buffer})

{actions, timestamp_queue}
end

defp maybe_handle_first_buffer(
%{timestamp_offset: nil} = pad_queue,
pad_ref,
buffer,
timestamp_queue
) do
offset =
case timestamp_queue.synchronization_strategy do
:synchronize_on_arrival ->
(buffer.dts || buffer.pts) - timestamp_queue.current_queue_time

:explicit_offsets ->
Map.get(timestamp_queue.registered_pads_offsets, pad_ref, 0)
end

use_pts? = buffer.pts != nil

%{pad_queue | timestamp_offset: offset, use_pts?: use_pts?}
end

defp maybe_handle_first_buffer(pad_queue, _pad_ref, _buffer, _timestamp_queue), do: pad_queue

defp check_timestamps_consistency!(pad_queue, buffer, pad_ref) do
if not pad_queue.use_pts? and buffer.dts == nil do
raise """
Expand Down Expand Up @@ -236,49 +307,21 @@ defmodule Membrane.TimestampQueue do
timestamp_queue
|> push_item(pad_ref, :end_of_stream)
|> put_in([:pad_queues, pad_ref, :end_of_stream?], true)
|> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
|> remove_pad_from_registered_and_awaiting_pads(pad_ref)
end

defp push_item(%__MODULE__{closed?: true}, pad_ref, item) do
inspected_item =
case item do
:end_of_stream -> "end of stream"
{:stream_format, value} -> "stream format #{inspect(value)}"
{type, value} -> "#{type} #{inspect(value)}"
end

raise """
Unable to push #{inspected_item} from pad #{inspect(pad_ref)} on the already closed #{inspect(__MODULE__)}. \
After calling #{inspect(__MODULE__)}.flush_and_close/1 queue is not capable to handle new items and new \
queue has to be created.
"""
defp remove_pad_from_registered_and_awaiting_pads(timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:blocking_registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:registered_pads_offsets, &Map.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
end

defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> maybe_push_pad_on_heap_on_new_item(pad_ref, item)
|> Map.update!(:pad_queues, &Map.put_new(&1, pad_ref, new_pad_queue()))
|> update_in([:pad_queues, pad_ref, :qex], &Qex.push(&1, item))
end

defp maybe_push_pad_on_heap_on_new_item(timestamp_queue, pad_ref, item) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)
empty_qex = Qex.new()

case {item, pad_queue} do
{{:buffer, _buffer}, nil} ->
push_pad_on_heap(timestamp_queue, pad_ref, -timestamp_queue.current_queue_time)

{{:buffer, buffer}, pad_queue} when pad_queue.qex == empty_qex ->
push_pad_on_heap(timestamp_queue, pad_ref, -buffer_time(buffer, pad_queue))

{_non_buffer, pad_queue} when pad_queue == nil or pad_queue.qex == empty_qex ->
push_pad_on_heap(timestamp_queue, pad_ref, :infinity)

_else ->
timestamp_queue
end
|> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end))
|> push_pad_on_heap_if_qex_empty(pad_ref, :infinity)
|> push_item_on_qex(pad_ref, item)
end

defp new_pad_queue() do
Expand Down Expand Up @@ -340,10 +383,10 @@ defmodule Membrane.TimestampQueue do
do: %{pad_queue | buffers_size: pad_queue.buffers_size - byte_size(payload)}

defp buffer_time(%Buffer{dts: dts}, %{use_pts?: false, timestamp_offset: timestamp_offset}),
do: dts + timestamp_offset
do: dts - timestamp_offset

defp buffer_time(%Buffer{pts: pts}, %{use_pts?: true, timestamp_offset: timestamp_offset}),
do: pts + timestamp_offset
do: pts - timestamp_offset

@type item ::
{:stream_format, StreamFormat.t()}
Expand Down Expand Up @@ -422,7 +465,7 @@ defmodule Membrane.TimestampQueue do
min_max_time

{_pad_ref, %{max_timestamp_on_qex: max_ts, timestamp_offset: offset}}, min_max_time ->
min(min_max_time, max_ts + offset)
min(min_max_time, max_ts - offset)
end)

do_pop_chunked(timestamp_queue, min_max_time)
Expand Down Expand Up @@ -455,7 +498,8 @@ defmodule Membrane.TimestampQueue do

defp do_pop(%__MODULE__{} = timestamp_queue, actions_acc, items_acc, pop_chunk?) do
try_return_buffer? =
MapSet.size(timestamp_queue.registered_pads) == 0 and timestamp_queue.awaiting_pads == [] and
MapSet.size(timestamp_queue.blocking_registered_pads) == 0 and
timestamp_queue.awaiting_pads == [] and
not timestamp_queue.chunk_full?

case Heap.root(timestamp_queue.pads_heap) do
Expand Down Expand Up @@ -562,6 +606,23 @@ defmodule Membrane.TimestampQueue do
{actions, items, timestamp_queue}
end

defp push_item_on_qex(timestamp_queue, pad_ref, item) do
:ok = ensure_queue_not_closed!(timestamp_queue, pad_ref, item)

timestamp_queue
|> update_in([:pad_queues, pad_ref, :qex], &Qex.push(&1, item))
end

defp push_pad_on_heap_if_qex_empty(timestamp_queue, pad_ref, priority, pad_queue \\ nil) do
qex =
(pad_queue || Map.get(timestamp_queue.pad_queues, pad_ref))
|> Map.get(:qex)

if qex == Qex.new(),
do: push_pad_on_heap(timestamp_queue, pad_ref, priority),
else: timestamp_queue
end

defp push_pad_on_heap(timestamp_queue, pad_ref, priority) do
heap_item = {priority, pad_ref}
Map.update!(timestamp_queue, :pads_heap, &Heap.push(&1, heap_item))
Expand All @@ -580,7 +641,7 @@ defmodule Membrane.TimestampQueue do
"""
@spec flush_and_close(t()) :: {[Action.resume_auto_demand()], [popped_value()], t()}
def flush_and_close(%__MODULE__{} = timestamp_queue) do
%{timestamp_queue | closed?: true, registered_pads: MapSet.new(), awaiting_pads: []}
%{timestamp_queue | closed?: true, blocking_registered_pads: MapSet.new(), awaiting_pads: []}
|> Map.update!(
:pad_queues,
&Map.new(&1, fn {pad_ref, data} ->
Expand All @@ -589,4 +650,21 @@ defmodule Membrane.TimestampQueue do
)
|> pop_available_items()
end

defp ensure_queue_not_closed!(%__MODULE__{closed?: true}, pad_ref, item) do
inspected_item =
case item do
:end_of_stream -> "end of stream"
{:stream_format, value} -> "stream format #{inspect(value)}"
{type, value} -> "#{type} #{inspect(value)}"
end

raise """
Unable to push #{inspected_item} from pad #{inspect(pad_ref)} on the already closed #{inspect(__MODULE__)}. \
After calling #{inspect(__MODULE__)}.flush_and_close/1 queue is not capable to handle new items and new \
queue has to be created.
"""
end

defp ensure_queue_not_closed!(_timestamp_queue, _pad_ref, _item), do: :ok
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Membrane.TimestampQueue.Mixfile do
defp deps do
[
{:membrane_core, "~> 1.0"},
{:heap, "~> 3.0"},
{:heap, "~> 2.0"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because 3.0 has weird licence

{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
{:credo, ">= 0.0.0", only: :dev, runtime: false}
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"heap": {:hex, :heap, "3.0.0", "c6dbcd6e9a0b021432176e89cfd829dd065bd6c115981fdcd981a4251fff5fde", [:mix], [], "hexpm", "373eaca5787e2a2b009c42338e70414f590ceabcf96cfc786627ed762ad4dfc6"},
"heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
Expand Down
Loading