diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..79d1cc2 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,2 @@ +* @FelonEkonom + diff --git a/README.md b/README.md index e057e4c..e5f9b24 100644 --- a/README.md +++ b/README.md @@ -1,35 +1,29 @@ -# Membrane Template Plugin +# Membrane Timestamp Queue -[![Hex.pm](https://img.shields.io/hexpm/v/membrane_template_plugin.svg)](https://hex.pm/packages/membrane_template_plugin) -[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_template_plugin) -[![CircleCI](https://circleci.com/gh/membraneframework/membrane_template_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_template_plugin) +[![Hex.pm](https://img.shields.io/hexpm/v/membrane_timestamp_queue.svg)](https://hex.pm/packages/membrane_timestamp_queue) +[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_timestamp_queue) +[![CircleCI](https://circleci.com/gh/membraneframework/membrane_timestamp_queue.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_timestamp_queue) -This repository contains a template for new plugins. - -Check out different branches for other flavors of this template. +This repository contains implementation of `Membrane.TimestampQueue`, a helper queue that is aimed to help manage flow control in `Membrane` elements with pads with `flow_control: :auto`. It's a part of the [Membrane Framework](https://membrane.stream). ## Installation -The package can be installed by adding `membrane_template_plugin` to your list of dependencies in `mix.exs`: +The package can be installed by adding `membrane_timestamp_queue` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:membrane_template_plugin, "~> 0.1.0"} + {:membrane_timestamp_queue, "~> 0.1.0"} ] end ``` -## Usage - -TODO - ## Copyright and License -Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin) +Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_timestamp_queue) -[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin) +[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_timestamp_queue) Licensed under the [Apache License, Version 2.0](LICENSE) diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex new file mode 100644 index 0000000..30186af --- /dev/null +++ b/lib/membrane/timestamp_queue.ex @@ -0,0 +1,386 @@ +defmodule Membrane.TimestampQueue do + @moduledoc """ + Implementation of a queue, that accepts: + - Membrane buffers + - events + - stream formats + - end of streams + from various pads. Items in queue are sorted according to their timestamps. + + Moreover, #{inspect(__MODULE__)} is able to manage demand of pads, based on the amount of buffers + from each pad currently stored in the queue. + """ + + use Bunch.Access + + alias Membrane.{Buffer, Event, Pad, StreamFormat} + alias Membrane.Buffer.Metric + alias Membrane.Element.Action + + @type pad_queue :: %{ + timestamp_offset: integer(), + qex: Qex.t(), + buffers_size: non_neg_integer(), + buffers_number: non_neg_integer(), + update_heap_on_buffer?: boolean(), + paused_demand?: boolean(), + end_of_stream?: boolean(), + use_pts?: boolean() | nil, + max_timestamp_on_qex: Membrane.Time.t() | nil + } + + @typedoc """ + A queue, that accepts buffers, stream formats and events from various pads and sorts them based on + their timestamps. + """ + @opaque t :: %__MODULE__{ + current_queue_time: Membrane.Time.t(), + pause_demand_boundary: pos_integer() | :infinity, + metric: Metric.ByteSize | Metric.Count, + pad_queues: %{optional(Pad.ref()) => pad_queue()}, + pads_heap: Heap.t(), + waiting_on_buffer_from: MapSet.t() + } + + defstruct current_queue_time: Membrane.Time.seconds(0), + pause_demand_boundary: :infinity, + metric: Metric.Count, + pad_queues: %{}, + pads_heap: Heap.max(), + waiting_on_buffer_from: MapSet.new() + + @typedoc """ + Options passed to #{inspect(__MODULE__)}.new/1. + + Following options are allowed: + - `:pause_demand_boundary` - positive integer or `:infinity` (default to `:infinity`). Tells, what + amount of buffers associated with specific pad must be stored in the queue, to pause auto demand. + - `:pause_demand_boundary_unit` - `:buffers` or `:bytes` (deafult to `:buffers`). Tells, in which metric + `:pause_demand_boundary` is specified. + """ + @type options :: [ + pause_demand_boundary: pos_integer() | :infinity, + pause_demand_boundary_unit: :buffers | :bytes + ] + + @spec new(options) :: t() + def new(options \\ []) do + metric = + options + |> Keyword.get(:pause_demand_boundary_unit, :buffers) + |> Metric.from_unit() + + %__MODULE__{ + metric: metric, + pause_demand_boundary: Keyword.get(options, :pause_demand_boundary, :infinity) + } + end + + @doc """ + Registers an input pad in the queue without pushing anything on that pad. + + Once a pad is registered, the `pop_batch/3` function won't return buffers + until a `buffer` or `end_of_stream` is available on the registered pad. + + Pushing a buffer on an unregistered pad automatically registers it. + """ + @spec register_pad(t(), Pad.ref()) :: t() + def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do + timestamp_queue + |> Map.update!(:waiting_on_buffer_from, &MapSet.put(&1, pad_ref)) + end + + @doc """ + Pushes a buffer associated with a specified pad to the queue. + + Returns a suggested actions list and the updated queue. + + If amount of buffers associated with specified pad in the queue just exceded + `pause_demand_boundary`, the suggested actions list contains `t:Action.pause_auto_demand()` + action, otherwise it is equal an empty list. + + Buffers pushed to the queue must have a non-`nil` `dts` or `pts`. + """ + @spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {[Action.pause_auto_demand()], t()} + def push_buffer(_timestamp_queue, pad_ref, %Buffer{dts: nil, pts: nil} = buffer) do + raise """ + #{inspect(__MODULE__)} accepts only buffers whose dts or pts is not nil, but it received\n#{inspect(buffer, pretty: true)} + from pad #{inspect(pad_ref)} + """ + end + + def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do + buffer_size = timestamp_queue.metric.buffers_size([buffer]) + + timestamp_queue + |> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref)) + |> push_item(pad_ref, {:buffer, buffer}) + |> get_and_update_in([:pad_queues, pad_ref], fn pad_queue -> + pad_queue + |> Map.merge(%{ + buffers_size: pad_queue.buffers_size + buffer_size, + buffers_number: pad_queue.buffers_number + 1 + }) + |> Map.update!(:timestamp_offset, fn + nil -> timestamp_queue.current_queue_time - (buffer.dts || buffer.pts) + valid_offset -> valid_offset + end) + |> Map.update!(:use_pts?, fn + nil -> buffer.dts == nil + valid_boolean -> valid_boolean + end) + |> check_timestamps_consistency!(buffer, pad_ref) + |> actions_after_pushing_buffer(pad_ref, timestamp_queue.pause_demand_boundary) + end) + end + + defp check_timestamps_consistency!(pad_queue, buffer, pad_ref) do + if not pad_queue.use_pts? and buffer.dts == nil do + raise """ + Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has nil dts, while \ + the first buffer from this pad had valid integer there. If the first buffer from a pad has \ + dts different from nil, all later buffers from this pad must meet this property. + """ + end + + buffer_timestamp = if pad_queue.use_pts?, do: buffer.pts, else: buffer.dts + max_timestamp = pad_queue.max_timestamp_on_qex + + if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do + timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts" + + raise """ + Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has #{timestamp_field} equal \ + #{inspect(buffer_timestamp)}, but previous buffer pushed on queue from this pad had #{timestamp_field} \ + equal #{inspect(max_timestamp)}. Buffers from a single pad must have non-decreasing timestamps. + """ + end + + %{pad_queue | max_timestamp_on_qex: buffer_timestamp} + end + + @doc """ + Pushes stream format associated with a specified pad to the queue. + + Returns the updated queue. + """ + @spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t() + def push_stream_format(%__MODULE__{} = timestamp_queue, pad_ref, stream_format) do + push_item(timestamp_queue, pad_ref, {:stream_format, stream_format}) + end + + @doc """ + Pushes event associated with a specified pad to the queue. + + Returns the updated queue. + """ + @spec push_event(t(), Pad.ref(), Event.t()) :: t() + def push_event(%__MODULE__{} = timestamp_queue, pad_ref, event) do + push_item(timestamp_queue, pad_ref, {:event, event}) + end + + @doc """ + Pushes end of stream of the specified pad to the queue. + + Returns the updated queue. + """ + @spec push_end_of_stream(t(), Pad.ref()) :: t() + def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do + timestamp_queue + |> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref)) + |> push_item(pad_ref, :end_of_stream) + |> put_in([:pad_queues, pad_ref, :end_of_stream?], true) + end + + defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do + timestamp_queue + |> maybe_handle_item_from_new_pad(item, pad_ref) + |> update_in( + [:pad_queues, pad_ref, :qex], + &Qex.push(&1, item) + ) + end + + defp maybe_handle_item_from_new_pad( + %__MODULE__{pad_queues: pad_queues} = timestamp_queue, + _item, + pad_ref + ) + when is_map_key(pad_queues, pad_ref) do + timestamp_queue + end + + defp maybe_handle_item_from_new_pad(%__MODULE__{} = timestamp_queue, first_item, pad_ref) do + priority = + case first_item do + {:buffer, _buffer} -> -timestamp_queue.current_queue_time + _other -> :infinity + end + + timestamp_queue + |> put_in([:pad_queues, pad_ref], new_pad_queue()) + |> Map.update!(:pads_heap, &Heap.push(&1, {priority, pad_ref})) + end + + defp new_pad_queue() do + %{ + timestamp_offset: nil, + qex: Qex.new(), + buffers_size: 0, + buffers_number: 0, + update_heap_on_buffer?: true, + paused_demand?: false, + end_of_stream?: false, + use_pts?: nil, + max_timestamp_on_qex: nil, + recently_returned_timestamp: nil + } + end + + defp actions_after_pushing_buffer(pad_queue, pad_ref, pause_demand_boundary) do + if not pad_queue.paused_demand? and pad_queue.buffers_size >= pause_demand_boundary do + {[pause_auto_demand: pad_ref], %{pad_queue | paused_demand?: true}} + else + {[], pad_queue} + end + end + + defp buffer_time(%Buffer{dts: dts}, %{use_pts?: false, timestamp_offset: timestamp_offset}), + do: dts + timestamp_offset + + defp buffer_time(%Buffer{pts: pts}, %{use_pts?: true, timestamp_offset: timestamp_offset}), + do: pts + timestamp_offset + + @type item :: + {:stream_format, StreamFormat.t()} + | {:buffer, Buffer.t()} + | {:event, Event.t()} + | :end_of_stream + + @type popped_value :: {Pad.ref(), item()} + + @doc """ + Pops items from the queue while they are available. + + An item that is not a buffer is always considered available. A buffer is + available when the following conditions are met: + - There is another buffer or `end_of_stream` enqueued on the pad + - On each other pad there is either `end_of_stream` or a buffer with a lower timestamp. + + The returned value is a suggested actions list, a list of popped buffers and the updated queue. + + If the amount of buffers associated with any pad in the queue falls below the + `pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()` + actions, otherwise it is an empty list. + """ + @spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()} + def pop_batch(%__MODULE__{} = timestamp_queue) do + {batch, timestamp_queue} = do_pop_batch(timestamp_queue) + + {actions, timestamp_queue} = + batch + |> Enum.reduce(MapSet.new(), fn + {pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref) + _other, map_set -> map_set + end) + |> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} -> + {actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref) + {actions ++ actions_acc, timestamp_queue} + end) + + {actions, batch, timestamp_queue} + end + + @spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()} + defp do_pop_batch(timestamp_queue, acc \\ []) do + case do_pop(timestamp_queue) do + {:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue} + {value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc]) + end + end + + @spec do_pop(t()) :: {popped_value() | :none, t()} + defp do_pop(timestamp_queue) do + if MapSet.size(timestamp_queue.waiting_on_buffer_from) == 0 do + case Heap.root(timestamp_queue.pads_heap) do + {_priority, pad_ref} -> do_pop(timestamp_queue, pad_ref) + nil -> {:none, timestamp_queue} + end + else + case Heap.root(timestamp_queue.pads_heap) do + # priority :infinity cannot be associated with a buffer + {:infinity, pad_ref} -> do_pop(timestamp_queue, pad_ref) + _other -> {:none, timestamp_queue} + end + end + end + + @spec do_pop(t(), Pad.ref()) :: {popped_value() | :none, t()} + defp do_pop(timestamp_queue, pad_ref) do + pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref) + + case Qex.pop(pad_queue.qex) do + {{:value, {:buffer, buffer}}, qex} -> + buffer_time = buffer_time(buffer, pad_queue) + + case pad_queue do + %{update_heap_on_buffer?: true} -> + timestamp_queue + |> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref}))) + |> put_in([:pad_queues, pad_ref, :update_heap_on_buffer?], false) + |> do_pop() + + %{buffers_number: 1, end_of_stream?: false} -> + # last buffer on pad queue without end of stream + {:none, timestamp_queue} + + pad_queue -> + buffer_size = timestamp_queue.metric.buffers_size([buffer]) + + pad_queue = %{ + pad_queue + | qex: qex, + buffers_size: pad_queue.buffers_size - buffer_size, + buffers_number: pad_queue.buffers_number - 1, + update_heap_on_buffer?: true + } + + timestamp_queue = + timestamp_queue + |> Map.put(:current_queue_time, buffer_time) + |> put_in([:pad_queues, pad_ref], pad_queue) + + {{pad_ref, {:buffer, buffer}}, timestamp_queue} + end + + {{:value, item}, qex} -> + timestamp_queue = + timestamp_queue + |> put_in([:pad_queues, pad_ref, :qex], qex) + + {{pad_ref, item}, timestamp_queue} + + {:empty, _qex} -> + timestamp_queue + |> Map.update!(:pad_queues, &Map.delete(&1, pad_ref)) + |> Map.update!(:pads_heap, &Heap.pop/1) + |> do_pop() + end + end + + defp actions_after_popping_buffer( + %__MODULE__{pause_demand_boundary: boundary} = timestamp_queue, + pad_ref + ) do + with %{paused_demand?: true, buffers_size: size} when size < boundary <- + get_in(timestamp_queue, [:pad_queues, pad_ref]) do + timestamp_queue = + timestamp_queue + |> put_in([:pad_queues, pad_ref, :paused_demand?], false) + + {[resume_auto_demand: pad_ref], timestamp_queue} + else + _other -> {[], timestamp_queue} + end + end +end diff --git a/lib/membrane_template.ex b/lib/membrane_template.ex deleted file mode 100644 index c6882fb..0000000 --- a/lib/membrane_template.ex +++ /dev/null @@ -1,2 +0,0 @@ -defmodule Membrane.Template do -end diff --git a/mix.exs b/mix.exs index 3d9466b..bfb94fb 100644 --- a/mix.exs +++ b/mix.exs @@ -1,12 +1,12 @@ -defmodule Membrane.Template.Mixfile do +defmodule Membrane.TimestampQueue.Mixfile do use Mix.Project @version "0.1.0" - @github_url "https://github.com/membraneframework/membrane_template_plugin" + @github_url "https://github.com/membraneframework/membrane_timestamp_queue" def project do [ - app: :membrane_template_plugin, + app: :membrane_timestamp_queue, version: @version, elixir: "~> 1.13", elixirc_paths: elixirc_paths(Mix.env()), @@ -15,11 +15,11 @@ defmodule Membrane.Template.Mixfile do dialyzer: dialyzer(), # hex - description: "Template Plugin for Membrane Framework", + description: "Membrane Timestamp Queue", package: package(), # docs - name: "Membrane Template plugin", + name: "Membrane Timestamp Queue", source_url: @github_url, docs: docs() ] @@ -37,6 +37,7 @@ defmodule Membrane.Template.Mixfile do defp deps do [ {:membrane_core, "~> 1.0"}, + {:heap, "~> 3.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, {:credo, ">= 0.0.0", only: :dev, runtime: false} @@ -73,7 +74,7 @@ defmodule Membrane.Template.Mixfile do extras: ["README.md", "LICENSE"], formatters: ["html"], source_ref: "v#{@version}", - nest_modules_by_prefix: [Membrane.Template] + nest_modules_by_prefix: [Membrane.TimestampQueue] ] end end diff --git a/mix.lock b/mix.lock index 8d15140..8b15e82 100644 --- a/mix.lock +++ b/mix.lock @@ -8,6 +8,7 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "heap": {:hex, :heap, "3.0.0", "c6dbcd6e9a0b021432176e89cfd829dd065bd6c115981fdcd981a4251fff5fde", [:mix], [], "hexpm", "373eaca5787e2a2b009c42338e70414f590ceabcf96cfc786627ed762ad4dfc6"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, diff --git a/test/membrane_timestamp_queue/unit_test.exs b/test/membrane_timestamp_queue/unit_test.exs new file mode 100644 index 0000000..f305bb0 --- /dev/null +++ b/test/membrane_timestamp_queue/unit_test.exs @@ -0,0 +1,334 @@ +defmodule Membrane.TimestampQueue.UnitTest do + use ExUnit.Case, async: true + + require Membrane.Pad, as: Pad + + alias Membrane.Buffer + alias Membrane.TimestampQueue + + test "queue raises on buffer with nil dts" do + assert_raise(RuntimeError, fn -> + TimestampQueue.new() + |> TimestampQueue.push_buffer(:input, %Buffer{dts: nil, payload: <<>>}) + end) + end + + test "queue sorts some buffers from different pads based on buffer dts" do + input_order = [9, 4, 7, 3, 1, 8, 5, 6, 2, 10] + + pad_generator = fn i -> Pad.ref(:input, i) end + buffer_generator = fn i -> %Buffer{dts: i, payload: <<>>} end + + queue = + input_order + |> Enum.reduce(TimestampQueue.new(), fn i, queue -> + assert {[], queue} = + TimestampQueue.push_buffer(queue, pad_generator.(i), %Buffer{ + dts: 0, + payload: <<>> + }) + + queue + end) + + queue = + input_order + |> Enum.reduce(queue, fn i, queue -> + assert {[], queue} = + TimestampQueue.push_buffer(queue, pad_generator.(i), buffer_generator.(i)) + + queue + end) + + # assert that queue won't pop last buffer from pad queue, if it hasn't recevied EoS on this pad + assert {[], batch, queue} = TimestampQueue.pop_batch(queue) + + Enum.each(batch, fn item -> + assert {_pad_ref, {:buffer, %Buffer{dts: 0}}} = item + end) + + queue = + input_order + |> Enum.reduce(queue, fn i, queue -> + TimestampQueue.push_end_of_stream(queue, pad_generator.(i)) + end) + + assert {[], batch, queue} = TimestampQueue.pop_batch(queue) + + # assert batch + expected_batch = + input_order + |> Enum.sort() + |> Enum.flat_map(fn i -> + [ + {pad_generator.(i), {:buffer, buffer_generator.(i)}}, + {pad_generator.(i), :end_of_stream} + ] + end) + + assert batch == expected_batch + + # assert queue empty + assert queue.pad_queues == TimestampQueue.new().pad_queues + assert queue.pads_heap == TimestampQueue.new().pads_heap + end + + defmodule StreamFormat do + defstruct [:dts] + end + + defmodule Event do + defstruct [:dts] + end + + test "queue sorts buffers a lot of buffers from different pads based on buffer dts" do + pads_number = 100 + pad_items_number = 200 + + dts_offsets = + Map.new(1..pads_number, fn pad_idx -> + {Pad.ref(:input, pad_idx), Enum.random(1..10_000)} + end) + + pads_items = + Map.new(1..pads_number, fn pad_idx -> + pad_ref = Pad.ref(:input, pad_idx) + dts_offset = dts_offsets[pad_ref] + + {items, _last_buffer_dts} = + Enum.map_reduce(dts_offset..(dts_offset + pad_items_number - 1), dts_offset, fn idx, + last_buffer_dts -> + if idx == dts_offset do + {{:push_buffer, %Buffer{dts: idx, payload: <<>>}}, idx} + else + Enum.random([ + {{:push_buffer, %Buffer{dts: idx, payload: <<>>}}, idx}, + {{:push_event, %Event{dts: last_buffer_dts}}, last_buffer_dts}, + {{:push_stream_format, %StreamFormat{dts: last_buffer_dts}}, last_buffer_dts} + ]) + end + end) + + {pad_ref, items} + end) + + queue = TimestampQueue.new() + + {pads_items, queue} = + 1..(pads_number * pad_items_number) + |> Enum.reduce({pads_items, queue}, fn _i, {pads_items, queue} -> + {pad_ref, items} = Enum.random(pads_items) + [{fun_name, item} | items] = items + + pads_items = + case items do + [] -> Map.delete(pads_items, pad_ref) + items -> Map.put(pads_items, pad_ref, items) + end + + queue = + case apply(TimestampQueue, fun_name, [queue, pad_ref, item]) do + # if buffer + {[], queue} -> queue + # if event or stream_format + queue -> queue + end + + {pads_items, queue} + end) + + queue = + Enum.reduce(1..pads_number, queue, fn i, queue -> + TimestampQueue.push_end_of_stream(queue, Pad.ref(:input, i)) + end) + + # sanity check, that test is written correctly + assert %{} = pads_items + + assert {[], batch, _queue} = TimestampQueue.pop_batch(queue) + assert length(batch) == pads_number * pad_items_number + pads_number + + batch_without_eos = Enum.reject(batch, &match?({_pad_ref, :end_of_stream}, &1)) + + sorted_batch_without_eos = + batch_without_eos + |> Enum.sort_by(fn {pad_ref, {_type, item}} -> item.dts - dts_offsets[pad_ref] end) + + assert batch_without_eos == sorted_batch_without_eos + end + + test "queue prioritizes stream formats and buffers not preceded by a buffer" do + queue = TimestampQueue.new() + + {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 1, payload: <<>>}) + {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 2, payload: <<>>}) + + assert {[], [a: {:buffer, %Buffer{dts: 1}}], queue} = TimestampQueue.pop_batch(queue) + + {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 3, payload: <<>>}) + queue = TimestampQueue.push_end_of_stream(queue, :a) + + queue = TimestampQueue.push_stream_format(queue, :b, %StreamFormat{}) + queue = TimestampQueue.push_event(queue, :b, %Event{}) + + assert {[], batch, queue} = TimestampQueue.pop_batch(queue) + + assert batch == [ + b: {:stream_format, %StreamFormat{}}, + b: {:event, %Event{}}, + a: {:buffer, %Buffer{dts: 2, payload: <<>>}}, + a: {:buffer, %Buffer{dts: 3, payload: <<>>}}, + a: :end_of_stream + ] + + assert {[], [], ^queue} = TimestampQueue.pop_batch(queue) + end + + [ + %{unit: :buffers, buffer_size: 1, buffer: %Buffer{dts: 0, payload: <<>>}}, + %{unit: :bytes, buffer_size: 100, buffer: %Buffer{dts: 0, payload: <<1::8*100>>}} + ] + |> Enum.map(fn params -> + test "queue returns proper suggested actions when boundary unit is #{inspect(params.unit)}" do + %{unit: unit, buffer_size: buffer_size, buffer: buffer} = + unquote(Macro.escape(params)) + + boundary_in_buff_no = 100 + boundary = buffer_size * boundary_in_buff_no + + queue = + TimestampQueue.new( + pause_demand_boundary: boundary, + pause_demand_boundary_unit: unit + ) + + queue = + 1..(boundary_in_buff_no - 1) + |> Enum.reduce(queue, fn _i, queue -> + assert {[], queue} = TimestampQueue.push_buffer(queue, :input, buffer) + queue + end) + + assert {[pause_auto_demand: :input], queue} = + TimestampQueue.push_buffer(queue, :input, buffer) + + queue = + 1..(boundary_in_buff_no - 1) + |> Enum.reduce(queue, fn _i, queue -> + assert {[], queue} = TimestampQueue.push_buffer(queue, :input, buffer) + queue + end) + + pop_item = {:input, {:buffer, buffer}} + + expected_batch = for _i <- 1..(2 * boundary_in_buff_no - 2), do: pop_item + + assert {[resume_auto_demand: :input], ^expected_batch, _queue} = + TimestampQueue.pop_batch(queue) + end + end) + + test "queue sorts buffers from various pads when they aren't linked in the same moment" do + iteration_size = 100 + iterations = 100 + + 1..iterations + |> Enum.reduce(TimestampQueue.new(), fn pads_in_iteration, queue -> + pads = for i <- 1..pads_in_iteration, do: Pad.ref(:input, i) + new_pad = Pad.ref(:input, pads_in_iteration) + + queue = + Enum.reduce([0, 1], queue, fn timestamp, queue -> + timestamp_field = if div(pads_in_iteration, 2) == 1, do: :dts, else: :pts + + buffer = + %Buffer{payload: <<>>} + |> Map.put(timestamp_field, timestamp) + + {[], queue} = TimestampQueue.push_buffer(queue, new_pad, buffer) + queue + end) + + queue = + pads + |> Enum.reduce(queue, fn pad_ref, queue -> + Pad.ref(:input, pad_idx) = pad_ref + pad_offset = iteration_size * (pads_in_iteration - pad_idx) + 2 + timestamp_field = if div(pad_idx, 2) == 1, do: :dts, else: :pts + + pad_offset..(pad_offset + iteration_size - 1) + |> Enum.reduce(queue, fn timestamp, queue -> + buffer = + %Buffer{payload: <<>>} + |> Map.put(timestamp_field, timestamp) + + {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + queue + end) + end) + + {[], batch, queue} = TimestampQueue.pop_batch(queue) + + sorted_batch = + batch + |> Enum.sort_by(fn {Pad.ref(:input, pad_idx), {:buffer, buffer}} -> + (buffer.dts || buffer.pts) + pad_idx * iteration_size + end) + + assert batch == sorted_batch + + queue + end) + end + + test "queue doesn't return any buffer, if it should wait on buffer from registered pad" do + queue = + TimestampQueue.new() + |> TimestampQueue.register_pad(:a) + |> TimestampQueue.register_pad(:b) + + events = for i <- 1..1000, do: %Event{dts: i} + buffers = for i <- 1..1000, do: %Buffer{dts: i, payload: <<>>} + + queue = + events + |> Enum.reduce(queue, fn event, queue -> + queue + |> TimestampQueue.push_event(:a, event) + |> TimestampQueue.push_event(:b, event) + end) + + queue = + buffers + |> Enum.reduce(queue, fn buffer, queue -> + {[], queue} = TimestampQueue.push_buffer(queue, :a, buffer) + queue + end) + + {[], batch, queue} = TimestampQueue.pop_batch(queue) + + grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1))) + assert grouped_batch == %{a: events, b: events} + + assert {[], [], queue} = TimestampQueue.pop_batch(queue) + + queue = + buffers + |> Enum.reduce(queue, fn buffer, queue -> + {[], queue} = TimestampQueue.push_buffer(queue, :b, buffer) + queue + end) + + {[], batch, _queue} = TimestampQueue.pop_batch(queue) + + sorted_batch = Enum.sort_by(batch, fn {_pad_ref, {:buffer, buffer}} -> buffer.dts end) + assert batch == sorted_batch + + grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1))) + + assert grouped_batch == %{ + a: List.delete_at(buffers, 999), + b: List.delete_at(buffers, 999) + } + end +end