Skip to content

Commit

Permalink
Write docs wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 29, 2024
1 parent da3e658 commit 4a9acf4
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 8 deletions.
74 changes: 69 additions & 5 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
defmodule Membrane.TimestampQueue do
@moduledoc """
A queue, that accepts buffers, stream formats and events from various pads and sorts them basing on the
timestamps. This queue is able to manage demand of a pad, basing on the amount of buffers from the
specific pad currently stored in the queue.
Queue accepts following options:
"""

alias Membrane.{Buffer, Event, Pad, StreamFormat}
Expand Down Expand Up @@ -37,6 +42,15 @@ defmodule Membrane.TimestampQueue do
pad_queues: %{},
pads_heap: Heap.max()

@typedoc """
Options passed to #{inspect(__MODULE__)}.new/1.
Following options are allowed:
- `:pause_demand_boundary` - positive integer or `:infinity` (default to `:infinity`). Tells, what
amount of buffers from a specific pad must be stored in the queue, to pause auto demand.
- `:pause_demand_boundary_unit` - `:buffers` or `:bytes` (deafult to `:buffers`). Tells, in which metric
`:pause_demand_boundary` is specified.
"""
@type options :: [
pause_demand_boundary: pos_integer() | :infinity,
pause_demand_boundary_unit: :buffers | :bytes
Expand All @@ -55,10 +69,17 @@ defmodule Membrane.TimestampQueue do
}
end

@type suggested_action :: Action.pause_auto_demand() | Action.resume_auto_demand()
@type suggested_actions :: [suggested_action()]
@doc """
Pushes buffer from the specified pad to the queue.
@spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {suggested_actions(), t()}
Returns suggested actions list and updated queue.
If amount of buffers from the specified pad stored just exceded `pause_demand_boundary`, suggested
actions list contains `t:Action.pause_auto_demand()` action, or equals empty list otherwise.
Buffers pushed to the queue must have non-`nil` `pts`.
"""
@spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {[Action.pause_auto_demand()], t()}
def push_buffer(_timestamp_queue, pad_ref, %Buffer{dts: nil} = buffer) do
raise """
#{inspect(__MODULE__)} accepts only buffers whose dts is not nil, but it received\n#{inspect(buffer, pretty: true)}
Expand All @@ -82,16 +103,33 @@ defmodule Membrane.TimestampQueue do
end)
end


@doc """
Pushes stream format from the specified pad to the queue.
Returns updated queue.
"""
@spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t()
def push_stream_format(%__MODULE__{} = timestamp_queue, pad_ref, stream_format) do
push_item(timestamp_queue, pad_ref, {:stream_format, stream_format})
end


@doc """
Pushes event from the specified pad to the queue.
Returns updated queue.
"""
@spec push_event(t(), Pad.ref(), Event.t()) :: t()
def push_event(%__MODULE__{} = timestamp_queue, pad_ref, event) do
push_item(timestamp_queue, pad_ref, {:event, event})
end

@doc """
Pushes end of stream of the specified pad to the queue.
Returns updated queue.
"""
@spec push_end_of_stream(t(), Pad.ref()) :: t()
def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
push_item(timestamp_queue, pad_ref, :end_of_stream)
Expand Down Expand Up @@ -157,7 +195,20 @@ defmodule Membrane.TimestampQueue do

@type popped_value :: {Pad.ref(), item()}

@spec pop(t()) :: {suggested_actions(), popped_value() | :none, t()}
@doc"""
Pops up to 1 buffer from the queue.
Returns suggested actions list, popped buffer and the updated queue.
If amount of buffers from pad related to popped buffer just felt below `pause_demand_boundary`,
suggested actions list contains `t:Action.resume_auto_demand()` action, or equals empty list
otherwise.
If queue cannot return any buffer, returns `:none` in it's place instead (note, that queue doesn't
have to be empty, to not be able to return buffer - sometimes queue has to hold up to 1 buffer for
each pad, to be able to work correctly).
"""
@spec pop(t()) :: {[Action.resume_auto_demand()], popped_value() | :none, t()}
def pop(%__MODULE__{} = timestamp_queue) do
{value, timestamp_queue} = do_pop(timestamp_queue)

Expand Down Expand Up @@ -229,7 +280,20 @@ defmodule Membrane.TimestampQueue do
end
end

@spec pop_batch(t()) :: {suggested_actions(), [popped_value() | :none], t()}
@doc """
Pops as many buffer from the queue, as it is possible.
Returns suggested actions list, list of popped buffer and the updated queue.
If amount of buffers related to any of pad in queue just felt below `pause_demand_boundary`,
suggested actions list contains `t:Action.resume_auto_demand()` actions , or equals empty list
otherwise.
If queue cannot return any buffer, returns an empty list (note, that queue doesn't have to be
empty, to not be able to return buffer - sometimes queue has to hold up to 1 buffer for each pad,
to be able to work correctly).
"""
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)

Expand Down
4 changes: 1 addition & 3 deletions test/membrane/timestamp_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ defmodule Membrane.TimestampQueueTest do
TimestampQueue.push_end_of_stream(queue, Pad.ref(:input, i))
end)

# sanity check, if the test is written correctly
# sanity check, that test is written correctly
assert %{} = pads_items

assert {[], batch, _queue} = TimestampQueue.pop_batch(queue)
Expand Down Expand Up @@ -247,6 +247,4 @@ defmodule Membrane.TimestampQueueTest do
assert {[], :none, _queue} = TimestampQueue.pop(queue)
end
end)

# todo: suggested actions test
end

0 comments on commit 4a9acf4

Please sign in to comment.