diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index 9801683..77b630a 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -6,6 +6,7 @@ defmodule Membrane.TimestampQueue do - stream formats - end of streams from various pads. Items in queue are sorted according to their timestamps. + Moreover, #{inspect(__MODULE__)} is able to manage demand of pads, based on the amount of buffers from each pad currently stored in the queue. """ @@ -50,6 +51,7 @@ defmodule Membrane.TimestampQueue do @typedoc """ Options passed to #{inspect(__MODULE__)}.new/1. + Following options are allowed: - `:pause_demand_boundary` - positive integer or `:infinity` (default to `:infinity`). Tells, what amount of buffers associated with specific pad must be stored in the queue, to pause auto demand. @@ -63,6 +65,7 @@ defmodule Membrane.TimestampQueue do @doc """ Creates and returnes new #{inspect(__MODULE__)}. + Accepts `t:options()`. """ @spec new(options) :: t() @@ -90,10 +93,13 @@ defmodule Membrane.TimestampQueue do @doc """ Pushes a buffer associated with a specified pad to the queue. + Returns a suggested actions list and the updated queue. + If amount of buffers associated with specified pad in the queue just exceded `pause_demand_boundary`, the suggested actions list contains `t:Action.pause_auto_demand()` action, otherwise it is equal an empty list. + Buffers pushed to the queue must have a non-`nil` `dts` or `pts`. """ @spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {[Action.pause_auto_demand()], t()} @@ -154,6 +160,7 @@ defmodule Membrane.TimestampQueue do @doc """ Pushes stream format associated with a specified pad to the queue. + Returns the updated queue. """ @spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t() @@ -163,6 +170,7 @@ defmodule Membrane.TimestampQueue do @doc """ Pushes event associated with a specified pad to the queue. + Returns the updated queue. """ @spec push_event(t(), Pad.ref(), Event.t()) :: t() @@ -172,6 +180,7 @@ defmodule Membrane.TimestampQueue do @doc """ Pushes end of stream of the specified pad to the queue. + Returns the updated queue. """ @spec push_end_of_stream(t(), Pad.ref()) :: t() @@ -251,13 +260,21 @@ defmodule Membrane.TimestampQueue do @doc """ Pops as many buffers from the queue, as it is possible. + Returns suggested actions list, list of popped buffers and the updated queue. + If the amount of buffers associated with any pad in the queue falls below the `pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()` actions, otherwise it is an empty list. + If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad, to be able to work correctly. + + To be able to maintain proper order of buffers from varius pads, queue won't return next buffer, + if the next buffer that should be returned: + - is the only buffer from a certain pad + - and that certain pad has not received the end of stream yet """ @spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()} def pop_batch(%__MODULE__{} = timestamp_queue) do @@ -369,14 +386,4 @@ defmodule Membrane.TimestampQueue do _other -> {[], timestamp_queue} end end - - # TODO: - # 1) specify average chunk size, expresesed in time duration - # - sorted queue - # - change in pause/resume demand mechanism - # - - # 2) specify pause demand boundary in time duration - # - hold last returned buffer timestamp and last buffer on queue timestamp to pad_queue - # - - # 3) write benchmarks and optimize the queue end diff --git a/test/membrane_timestamp_queue/unit_test.exs b/test/membrane_timestamp_queue/unit_test.exs index 9093aaa..f4a0e84 100644 --- a/test/membrane_timestamp_queue/unit_test.exs +++ b/test/membrane_timestamp_queue/unit_test.exs @@ -1,11 +1,11 @@ defmodule Membrane.TimestampQueue.UnitTest do use ExUnit.Case, async: true + require Membrane.Pad, as: Pad + alias Membrane.Buffer alias Membrane.TimestampQueue - require Membrane.Pad, as: Pad - test "queue raises on buffer with nil dts" do assert_raise(RuntimeError, fn -> TimestampQueue.new() @@ -281,8 +281,7 @@ defmodule Membrane.TimestampQueue.UnitTest do end) end - # todo: unify tests naming convention - test "waiting on pads" do + test "queue doesn't return any buffer, if it should wait on pad" do queue = TimestampQueue.new() |> TimestampQueue.wait_on_pad(:a)