diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index f259aa57d..944a938d7 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -63,6 +63,11 @@ defmodule Membrane.TimestampQueue do pause_demand_boundary_unit: :buffers | :bytes ] + @doc """ + Creates and returnes new #{inspect(__MODULE__)}. + + Accepts `t:options()`. + """ @spec new(options) :: t() def new(options \\ []) do metric = @@ -76,8 +81,12 @@ defmodule Membrane.TimestampQueue do } end - @spec register_pad(t(), Pad.ref()) :: t() - def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do + @doc """ + Makes the queue not return any buffer in `pop_batch/3`, until a buffer or end of stream arrival + from `pad_ref`. + """ + @spec wait_on_pad(t(), Pad.ref()) :: t() + def wait_on_pad(%__MODULE__{} = timestamp_queue, pad_ref) do timestamp_queue |> Map.update!(:waiting_on_buffer_from, &MapSet.put(&1, pad_ref)) end diff --git a/test/membrane/timestamp_queue_test.exs b/test/membrane/timestamp_queue_test.exs index f860f7bd2..9b5f860d1 100644 --- a/test/membrane/timestamp_queue_test.exs +++ b/test/membrane/timestamp_queue_test.exs @@ -305,8 +305,8 @@ defmodule Membrane.TimestampQueueTest do test "registering pads" do queue = TimestampQueue.new() - |> TimestampQueue.register_pad(:a) - |> TimestampQueue.register_pad(:b) + |> TimestampQueue.wait_on_pad(:a) + |> TimestampQueue.wait_on_pad(:b) events = for i <- 1..1000, do: %Event{dts: i} buffers = for i <- 1..1000, do: %Buffer{dts: i, payload: <<>>} @@ -346,9 +346,10 @@ defmodule Membrane.TimestampQueueTest do assert batch == sorted_batch grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1))) + assert grouped_batch == %{ - a: List.delete_at(buffers, 999), - b: List.delete_at(buffers, 999) - } + a: List.delete_at(buffers, 999), + b: List.delete_at(buffers, 999) + } end end