diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index 1250530..caad3ad 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -37,13 +37,15 @@ defmodule Membrane.TimestampQueue do metric_unit: :buffers | :bytes | :time, pad_queues: %{optional(Pad.ref()) => pad_queue()}, pads_heap: Heap.t(), - registered_pads: MapSet.t(), + blocking_registered_pads: MapSet.t(), + registered_pads_offsets: %{optional(Pad.ref()) => integer()}, + # :awaiting_pads contain at most one element at the time awaiting_pads: [Pad.ref()], closed?: boolean(), chunk_duration: nil | Membrane.Time.t(), chunk_full?: boolean(), next_chunk_boundary: nil | Membrane.Time.t(), - max_time_in_queues: Membrane.Time.t() + synchronization_strategy: :synchronize_on_arrival | :explicit_offsets } defstruct current_queue_time: Membrane.Time.seconds(0), @@ -51,16 +53,17 @@ defmodule Membrane.TimestampQueue do metric_unit: :buffers, pad_queues: %{}, pads_heap: Heap.max(), - registered_pads: MapSet.new(), + blocking_registered_pads: MapSet.new(), + registered_pads_offsets: %{}, awaiting_pads: [], closed?: false, chunk_duration: nil, chunk_full?: false, next_chunk_boundary: nil, - max_time_in_queues: 0 + synchronization_strategy: :synchronize_on_arrival @typedoc """ - Options passed to #{inspect(__MODULE__)}.new/1. + Options passed to `#{inspect(__MODULE__)}.new/1`. Following options are allowed: - `:pause_demand_boundary` - positive integer, `t:Membrane.Time.t()` or `:infinity` (default to `:infinity`). Tells, @@ -69,11 +72,21 @@ defmodule Membrane.TimestampQueue do `:pause_demand_boundary` is specified. - `:chunk_duration` - `Membrane.Time.t()`. Specifies how long the fragments returned by `#{inspect(__MODULE__)}.pop_chunked/1` will be approximately. + - `:synchronization_strategy` - `:synchronize_on_arrival` or `:exact_timestamps` (default to `:synchronize_on_arrival`). + Specyfies, how items from different pads will be synchronized with each other. If it is set to: + * `:synchronize_on_arrival` - in the moment of the arrival of the first buffer from a specific pad, there will be + caluclated timestamp offset for this pad. These offsets will be added to the buffers timestamps, to caluclate from + which pad items should be returned in the first order. Every offset will be calculated in such a way that the first + buffer from a new pad will be returned as the next item. + * `:explicit_offsets` - buffers from various pads will be sorted based on their timestamps and pads offsets. Pads + offsets can be set using `#{inspect(__MODULE__)}.register_pad/3` function. If pad offset is not explicitly set + before the first buffer from this pad, it will be equal 0. """ @type options :: [ pause_demand_boundary: pos_integer() | Membrane.Time.t() | :infinity, pause_demand_boundary_unit: :buffers | :bytes | :time, - chunk_duration: Membrane.Time.t() + chunk_duration: Membrane.Time.t(), + synchronization_strategy: :synchronize_on_arrival | :explicit_offsets ] @spec new(options) :: t() @@ -81,35 +94,78 @@ defmodule Membrane.TimestampQueue do [ chunk_duration: chunk_duration, pause_demand_boundary: boundary, - pause_demand_boundary_unit: unit + pause_demand_boundary_unit: unit, + synchronization_strategy: synchronization_strategy ] = options |> Keyword.validate!( chunk_duration: nil, pause_demand_boundary: :infinity, - pause_demand_boundary_unit: :buffers + pause_demand_boundary_unit: :buffers, + synchronization_strategy: :synchronize_on_arrival ) |> Enum.sort() %__MODULE__{ pause_demand_boundary: boundary, metric_unit: unit, - chunk_duration: chunk_duration + chunk_duration: chunk_duration, + synchronization_strategy: synchronization_strategy } end + @typedoc """ + Options passed to `#{inspect(__MODULE__)}.register_pad/3`. + + Following options are allowed: + - `:wait_on_buffers?` - `boolean()`, default to `true`. Specyfies, if the queue will wait with returning buffers + in `pop_*` functions, until it receives the first buffer from a pad passed as a second argument to the function. + - `:timestamp_offset` - integer. Specyfies, what will be the timestamp offset of a pad passed as a second argument + to the function. Allowed only if `#{inspect(__MODULE__)}` synchronization strategy is `:explicit_offsets`. + """ + @type register_pad_options :: [ + timestamp_offset: integer(), + wait_on_buffers?: boolean() + ] + @doc """ Registers an input pad in the queue without pushing anything on that pad. - Once a pad is registered, the `pop_available_items/3` function won't return buffers - until a `buffer` or `end_of_stream` is available on the registered pad. + Once a pad is registered with option `wait_on_buffers?: true` (default), the `pop_available_items/3` function won't + return any buffers until a `buffer` or `end_of_stream` is available on the registered pad. Pushing a buffer on an unregistered pad automatically registers it. """ - @spec register_pad(t(), Pad.ref()) :: t() - def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do - timestamp_queue - |> Map.update!(:registered_pads, &MapSet.put(&1, pad_ref)) + @spec register_pad(t(), Pad.ref(), register_pad_options()) :: t() + def register_pad(%__MODULE__{} = timestamp_queue, pad_ref, opts \\ []) do + [timestamp_offset: offset, wait_on_buffers?: wait?] = + opts + |> Keyword.validate!(timestamp_offset: nil, wait_on_buffers?: true) + |> Enum.sort() + + if offset != nil and timestamp_queue.synchronization_strategy == :synchronize_on_arrival do + raise """ + Option :timestamp_offset in #{inspect(__MODULE__)}.register_pad/3 cannot be set if #{inspect(__MODULE__)} \ + synchronization strategy is :synchronize_on_arrival (default). + """ + end + + with %{timestamp_offset: offset} when offset != nil <- + Map.get(timestamp_queue.pad_queues, pad_ref) do + raise """ + Cannot register pad `#{inspect(pad_ref)}, because buffers from it are already in `#{inspect(__MODULE__)}. \ + Every pad can be registered only before pushing the first buffer from it on the queue. + """ + end + + timestamp_queue = + if wait?, + do: Map.update!(timestamp_queue, :blocking_registered_pads, &MapSet.put(&1, pad_ref)), + else: timestamp_queue + + if offset != nil, + do: put_in(timestamp_queue, [:registered_pads_offsets, pad_ref], offset), + else: timestamp_queue end @doc """ @@ -134,23 +190,14 @@ defmodule Membrane.TimestampQueue do def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do {actions, timestamp_queue} = timestamp_queue - |> push_item(pad_ref, {:buffer, buffer}) - |> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref)) - |> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref)) + |> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end)) |> get_and_update_in([:pad_queues, pad_ref], fn pad_queue -> old_buffers_size = pad_queue.buffers_size pad_queue = pad_queue |> Map.update!(:buffers_number, &(&1 + 1)) - |> Map.update!(:timestamp_offset, fn - nil -> timestamp_queue.current_queue_time - (buffer.dts || buffer.pts) - valid_offset -> valid_offset - end) - |> Map.update!(:use_pts?, fn - nil -> buffer.dts == nil - valid_boolean -> valid_boolean - end) + |> maybe_handle_first_buffer(pad_ref, buffer, timestamp_queue) |> increase_buffers_size(buffer, timestamp_queue.metric_unit) |> check_timestamps_consistency!(buffer, pad_ref) @@ -169,7 +216,6 @@ defmodule Membrane.TimestampQueue do timestamp_queue = timestamp_queue - |> Map.update!(:max_time_in_queues, &max(&1, buff_time)) |> Map.update!(:next_chunk_boundary, fn nil when timestamp_queue.chunk_duration != nil -> buff_time + timestamp_queue.chunk_duration @@ -177,10 +223,35 @@ defmodule Membrane.TimestampQueue do other -> other end) + |> remove_pad_from_registered_and_awaiting_pads(pad_ref) + |> push_pad_on_heap_if_qex_empty(pad_ref, -buff_time, pad_queue) + |> push_item_on_qex(pad_ref, {:buffer, buffer}) {actions, timestamp_queue} end + defp maybe_handle_first_buffer( + %{timestamp_offset: nil} = pad_queue, + pad_ref, + buffer, + timestamp_queue + ) do + offset = + case timestamp_queue.synchronization_strategy do + :synchronize_on_arrival -> + (buffer.dts || buffer.pts) - timestamp_queue.current_queue_time + + :explicit_offsets -> + Map.get(timestamp_queue.registered_pads_offsets, pad_ref, 0) + end + + use_pts? = buffer.pts != nil + + %{pad_queue | timestamp_offset: offset, use_pts?: use_pts?} + end + + defp maybe_handle_first_buffer(pad_queue, _pad_ref, _buffer, _timestamp_queue), do: pad_queue + defp check_timestamps_consistency!(pad_queue, buffer, pad_ref) do if not pad_queue.use_pts? and buffer.dts == nil do raise """ @@ -236,49 +307,21 @@ defmodule Membrane.TimestampQueue do timestamp_queue |> push_item(pad_ref, :end_of_stream) |> put_in([:pad_queues, pad_ref, :end_of_stream?], true) - |> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref)) - |> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref)) + |> remove_pad_from_registered_and_awaiting_pads(pad_ref) end - defp push_item(%__MODULE__{closed?: true}, pad_ref, item) do - inspected_item = - case item do - :end_of_stream -> "end of stream" - {:stream_format, value} -> "stream format #{inspect(value)}" - {type, value} -> "#{type} #{inspect(value)}" - end - - raise """ - Unable to push #{inspected_item} from pad #{inspect(pad_ref)} on the already closed #{inspect(__MODULE__)}. \ - After calling #{inspect(__MODULE__)}.flush_and_close/1 queue is not capable to handle new items and new \ - queue has to be created. - """ + defp remove_pad_from_registered_and_awaiting_pads(timestamp_queue, pad_ref) do + timestamp_queue + |> Map.update!(:blocking_registered_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:registered_pads_offsets, &Map.delete(&1, pad_ref)) + |> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref)) end defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do timestamp_queue - |> maybe_push_pad_on_heap_on_new_item(pad_ref, item) - |> Map.update!(:pad_queues, &Map.put_new(&1, pad_ref, new_pad_queue())) - |> update_in([:pad_queues, pad_ref, :qex], &Qex.push(&1, item)) - end - - defp maybe_push_pad_on_heap_on_new_item(timestamp_queue, pad_ref, item) do - pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref) - empty_qex = Qex.new() - - case {item, pad_queue} do - {{:buffer, _buffer}, nil} -> - push_pad_on_heap(timestamp_queue, pad_ref, -timestamp_queue.current_queue_time) - - {{:buffer, buffer}, pad_queue} when pad_queue.qex == empty_qex -> - push_pad_on_heap(timestamp_queue, pad_ref, -buffer_time(buffer, pad_queue)) - - {_non_buffer, pad_queue} when pad_queue == nil or pad_queue.qex == empty_qex -> - push_pad_on_heap(timestamp_queue, pad_ref, :infinity) - - _else -> - timestamp_queue - end + |> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end)) + |> push_pad_on_heap_if_qex_empty(pad_ref, :infinity) + |> push_item_on_qex(pad_ref, item) end defp new_pad_queue() do @@ -340,10 +383,10 @@ defmodule Membrane.TimestampQueue do do: %{pad_queue | buffers_size: pad_queue.buffers_size - byte_size(payload)} defp buffer_time(%Buffer{dts: dts}, %{use_pts?: false, timestamp_offset: timestamp_offset}), - do: dts + timestamp_offset + do: dts - timestamp_offset defp buffer_time(%Buffer{pts: pts}, %{use_pts?: true, timestamp_offset: timestamp_offset}), - do: pts + timestamp_offset + do: pts - timestamp_offset @type item :: {:stream_format, StreamFormat.t()} @@ -422,7 +465,7 @@ defmodule Membrane.TimestampQueue do min_max_time {_pad_ref, %{max_timestamp_on_qex: max_ts, timestamp_offset: offset}}, min_max_time -> - min(min_max_time, max_ts + offset) + min(min_max_time, max_ts - offset) end) do_pop_chunked(timestamp_queue, min_max_time) @@ -455,7 +498,8 @@ defmodule Membrane.TimestampQueue do defp do_pop(%__MODULE__{} = timestamp_queue, actions_acc, items_acc, pop_chunk?) do try_return_buffer? = - MapSet.size(timestamp_queue.registered_pads) == 0 and timestamp_queue.awaiting_pads == [] and + MapSet.size(timestamp_queue.blocking_registered_pads) == 0 and + timestamp_queue.awaiting_pads == [] and not timestamp_queue.chunk_full? case Heap.root(timestamp_queue.pads_heap) do @@ -562,6 +606,23 @@ defmodule Membrane.TimestampQueue do {actions, items, timestamp_queue} end + defp push_item_on_qex(timestamp_queue, pad_ref, item) do + :ok = ensure_queue_not_closed!(timestamp_queue, pad_ref, item) + + timestamp_queue + |> update_in([:pad_queues, pad_ref, :qex], &Qex.push(&1, item)) + end + + defp push_pad_on_heap_if_qex_empty(timestamp_queue, pad_ref, priority, pad_queue \\ nil) do + qex = + (pad_queue || Map.get(timestamp_queue.pad_queues, pad_ref)) + |> Map.get(:qex) + + if qex == Qex.new(), + do: push_pad_on_heap(timestamp_queue, pad_ref, priority), + else: timestamp_queue + end + defp push_pad_on_heap(timestamp_queue, pad_ref, priority) do heap_item = {priority, pad_ref} Map.update!(timestamp_queue, :pads_heap, &Heap.push(&1, heap_item)) @@ -580,7 +641,7 @@ defmodule Membrane.TimestampQueue do """ @spec flush_and_close(t()) :: {[Action.resume_auto_demand()], [popped_value()], t()} def flush_and_close(%__MODULE__{} = timestamp_queue) do - %{timestamp_queue | closed?: true, registered_pads: MapSet.new(), awaiting_pads: []} + %{timestamp_queue | closed?: true, blocking_registered_pads: MapSet.new(), awaiting_pads: []} |> Map.update!( :pad_queues, &Map.new(&1, fn {pad_ref, data} -> @@ -589,4 +650,21 @@ defmodule Membrane.TimestampQueue do ) |> pop_available_items() end + + defp ensure_queue_not_closed!(%__MODULE__{closed?: true}, pad_ref, item) do + inspected_item = + case item do + :end_of_stream -> "end of stream" + {:stream_format, value} -> "stream format #{inspect(value)}" + {type, value} -> "#{type} #{inspect(value)}" + end + + raise """ + Unable to push #{inspected_item} from pad #{inspect(pad_ref)} on the already closed #{inspect(__MODULE__)}. \ + After calling #{inspect(__MODULE__)}.flush_and_close/1 queue is not capable to handle new items and new \ + queue has to be created. + """ + end + + defp ensure_queue_not_closed!(_timestamp_queue, _pad_ref, _item), do: :ok end diff --git a/mix.exs b/mix.exs index 5a1973d..0750beb 100644 --- a/mix.exs +++ b/mix.exs @@ -38,7 +38,7 @@ defmodule Membrane.TimestampQueue.Mixfile do defp deps do [ {:membrane_core, "~> 1.0"}, - {:heap, "~> 3.0"}, + {:heap, "~> 2.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, {:credo, ">= 0.0.0", only: :dev, runtime: false} diff --git a/mix.lock b/mix.lock index 8b15e82..3b6f655 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, - "heap": {:hex, :heap, "3.0.0", "c6dbcd6e9a0b021432176e89cfd829dd065bd6c115981fdcd981a4251fff5fde", [:mix], [], "hexpm", "373eaca5787e2a2b009c42338e70414f590ceabcf96cfc786627ed762ad4dfc6"}, + "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, diff --git a/test/membrane_timestamp_queue/unit_test.exs b/test/membrane_timestamp_queue/unit_test.exs index 54bef8b..d14f81b 100644 --- a/test/membrane_timestamp_queue/unit_test.exs +++ b/test/membrane_timestamp_queue/unit_test.exs @@ -339,7 +339,7 @@ defmodule Membrane.TimestampQueue.UnitTest do end) end - test "queue doesn't return any buffer, if it should wait on buffer from registered pad" do + test "queue doesn't return any buffer, if it should wait on buffer from the registered pad" do queue = TimestampQueue.new() |> TimestampQueue.register_pad(:a) @@ -390,6 +390,45 @@ defmodule Membrane.TimestampQueue.UnitTest do MapSet.new([buffers, List.delete_at(buffers, 999)]) end + test "queue doesn't wait on buffers from pad registered with option wait_on_buffers?: false" do + buffer = %Buffer{dts: 0, payload: ""} + + assert {[], [a: {:buffer, ^buffer}], _queue} = + TimestampQueue.new() + |> TimestampQueue.register_pad(:b, wait_on_buffers?: false) + |> TimestampQueue.push_buffer_and_pop_available_items(:a, buffer) + end + + test "queue correctly sorts items from various pads when synchronization stratey is :explicit_offsets" do + offsets = %{a: 0, b: 10_001, c: 20_002, d: 30_003} + + queue = + TimestampQueue.new(synchronization_strategy: :explicit_offsets) + |> TimestampQueue.register_pad(:b, timestamp_offset: offsets.b, wait_on_buffers?: false) + |> TimestampQueue.register_pad(:c, timestamp_offset: offsets.c, wait_on_buffers?: false) + |> TimestampQueue.register_pad(:d, timestamp_offset: offsets.d, wait_on_buffers?: false) + + {data, queue} = + 1..100_000//10 + |> Enum.map_reduce(queue, fn i, queue -> + pad_ref = Enum.random([:a, :b, :c, :d]) + buffer = %Buffer{pts: i, payload: ""} + + {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + + {{pad_ref, {:buffer, buffer}}, queue} + end) + + expected_batch = + Enum.sort_by(data, fn {pad_ref, {:buffer, buffer}} -> + buffer.pts - offsets[pad_ref] + end) + + {[], given_batch, _queue} = TimestampQueue.flush_and_close(queue) + + assert given_batch == expected_batch + end + test "queue returns events and stream formats, even if it cannot return next buffer" do queue = TimestampQueue.new() @@ -471,6 +510,8 @@ defmodule Membrane.TimestampQueue.UnitTest do assert closed_queue.pads_heap == popped_queue.pads_heap assert closed_queue.pad_queues == popped_queue.pad_queues + assert closed_queue.closed? + assert_raise RuntimeError, ~r/Unable to push .* already closed/, fn -> buffer = %Buffer{dts: 10_001, payload: ""} TimestampQueue.push_buffer(closed_queue, Pad.ref(:input, 0), buffer) @@ -479,14 +520,14 @@ defmodule Membrane.TimestampQueue.UnitTest do end test "pop_chunked/1 returns properly chunked buffers from a single pad" do - overbound = Membrane.Time.seconds(10) + upperbound = Membrane.Time.seconds(10) step = Membrane.Time.millisecond() chunk_duration = Membrane.Time.second() queue = TimestampQueue.new(chunk_duration: chunk_duration) buffers = - 1..overbound//step + 1..upperbound//step |> Enum.map(&%Buffer{pts: &1, payload: ""}) {[], queue} = @@ -496,7 +537,7 @@ defmodule Membrane.TimestampQueue.UnitTest do queue end) |> TimestampQueue.push_buffer(:input, %Buffer{ - pts: overbound + Membrane.Time.nanosecond(), + pts: upperbound + Membrane.Time.nanosecond(), payload: "" })