diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index ba8de1003..f18dc0005 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -1,5 +1,10 @@ defmodule Membrane.TimestampQueue do @moduledoc """ + A queue, that accepts buffers, stream formats and events from various pads and sorts them basing on the + timestamps. This queue is able to manage demand of a pad, basing on the amount of buffers from the + specific pad currently stored in the queue. + + Queue accepts following options: """ alias Membrane.{Buffer, Event, Pad, StreamFormat} @@ -37,6 +42,15 @@ defmodule Membrane.TimestampQueue do pad_queues: %{}, pads_heap: Heap.max() + @typedoc """ + Options passed to #{inspect(__MODULE__)}.new/1. + + Following options are allowed: + - `:pause_demand_boundary` - positive integer or `:infinity` (default to `:infinity`). Tells, what + amount of buffers from a specific pad must be stored in the queue, to pause auto demand. + - `:pause_demand_boundary_unit` - `:buffers` or `:bytes` (deafult to `:buffers`). Tells, in which metric + `:pause_demand_boundary` is specified. + """ @type options :: [ pause_demand_boundary: pos_integer() | :infinity, pause_demand_boundary_unit: :buffers | :bytes @@ -55,10 +69,17 @@ defmodule Membrane.TimestampQueue do } end - @type suggested_action :: Action.pause_auto_demand() | Action.resume_auto_demand() - @type suggested_actions :: [suggested_action()] + @doc """ + Pushes buffer from the specified pad to the queue. - @spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {suggested_actions(), t()} + Returns suggested actions list and updated queue. + + If amount of buffers from the specified pad stored just exceded `pause_demand_boundary`, suggested + actions list contains `t:Action.pause_auto_demand()` action, or equals empty list otherwise. + + Buffers pushed to the queue must have non-`nil` `pts`. + """ + @spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {[Action.pause_auto_demand()], t()} def push_buffer(_timestamp_queue, pad_ref, %Buffer{dts: nil} = buffer) do raise """ #{inspect(__MODULE__)} accepts only buffers whose dts is not nil, but it received\n#{inspect(buffer, pretty: true)} @@ -82,16 +103,33 @@ defmodule Membrane.TimestampQueue do end) end + + @doc """ + Pushes stream format from the specified pad to the queue. + + Returns updated queue. + """ @spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t() def push_stream_format(%__MODULE__{} = timestamp_queue, pad_ref, stream_format) do push_item(timestamp_queue, pad_ref, {:stream_format, stream_format}) end + + @doc """ + Pushes event from the specified pad to the queue. + + Returns updated queue. + """ @spec push_event(t(), Pad.ref(), Event.t()) :: t() def push_event(%__MODULE__{} = timestamp_queue, pad_ref, event) do push_item(timestamp_queue, pad_ref, {:event, event}) end + @doc """ + Pushes end of stream of the specified pad to the queue. + + Returns updated queue. + """ @spec push_end_of_stream(t(), Pad.ref()) :: t() def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do push_item(timestamp_queue, pad_ref, :end_of_stream) @@ -157,7 +195,20 @@ defmodule Membrane.TimestampQueue do @type popped_value :: {Pad.ref(), item()} - @spec pop(t()) :: {suggested_actions(), popped_value() | :none, t()} + @doc""" + Pops up to 1 buffer from the queue. + + Returns suggested actions list, popped buffer and the updated queue. + + If amount of buffers from pad related to popped buffer just felt below `pause_demand_boundary`, + suggested actions list contains `t:Action.resume_auto_demand()` action, or equals empty list + otherwise. + + If queue cannot return any buffer, returns `:none` in it's place instead (note, that queue doesn't + have to be empty, to not be able to return buffer - sometimes queue has to hold up to 1 buffer for + each pad, to be able to work correctly). + """ + @spec pop(t()) :: {[Action.resume_auto_demand()], popped_value() | :none, t()} def pop(%__MODULE__{} = timestamp_queue) do {value, timestamp_queue} = do_pop(timestamp_queue) @@ -229,7 +280,20 @@ defmodule Membrane.TimestampQueue do end end - @spec pop_batch(t()) :: {suggested_actions(), [popped_value() | :none], t()} + @doc """ + Pops as many buffer from the queue, as it is possible. + + Returns suggested actions list, list of popped buffer and the updated queue. + + If amount of buffers related to any of pad in queue just felt below `pause_demand_boundary`, + suggested actions list contains `t:Action.resume_auto_demand()` actions , or equals empty list + otherwise. + + If queue cannot return any buffer, returns an empty list (note, that queue doesn't have to be + empty, to not be able to return buffer - sometimes queue has to hold up to 1 buffer for each pad, + to be able to work correctly). + """ + @spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()} def pop_batch(%__MODULE__{} = timestamp_queue) do {batch, timestamp_queue} = do_pop_batch(timestamp_queue) diff --git a/test/membrane/timestamp_queue_test.exs b/test/membrane/timestamp_queue_test.exs index ccfa488c4..80d772b70 100644 --- a/test/membrane/timestamp_queue_test.exs +++ b/test/membrane/timestamp_queue_test.exs @@ -142,7 +142,7 @@ defmodule Membrane.TimestampQueueTest do TimestampQueue.push_end_of_stream(queue, Pad.ref(:input, i)) end) - # sanity check, if the test is written correctly + # sanity check, that test is written correctly assert %{} = pads_items assert {[], batch, _queue} = TimestampQueue.pop_batch(queue) @@ -247,6 +247,4 @@ defmodule Membrane.TimestampQueueTest do assert {[], :none, _queue} = TimestampQueue.pop(queue) end end) - - # todo: suggested actions test end