Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement basic functionalities #1

Merged
merged 7 commits into from
Apr 5, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ defmodule Membrane.TimestampQueue do
A queue, that accepts buffers, stream formats and events from various pads and sorts them based on
their timestamps.
"""
@type t :: %__MODULE__{
current_queue_time: Membrane.Time.t(),
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
waiting_on_buffer_from: MapSet.t()
}
@opaque t :: %__MODULE__{
current_queue_time: Membrane.Time.t(),
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
waiting_on_buffer_from: MapSet.t()
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
Expand All @@ -63,11 +63,6 @@ defmodule Membrane.TimestampQueue do
pause_demand_boundary_unit: :buffers | :bytes
]

@doc """
Creates and returnes new #{inspect(__MODULE__)}.

Accepts `t:options()`.
"""
@spec new(options) :: t()
def new(options \\ []) do
metric =
Expand Down Expand Up @@ -145,12 +140,13 @@ defmodule Membrane.TimestampQueue do
end

buffer_timestamp = if pad_queue.use_pts?, do: buffer.pts, else: buffer.dts
timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"
max_timestamp = pad_queue.max_timestamp_on_qex

if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
raise """
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"
max_timestamp = pad_queue.max_timestamp_on_qex
if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
raise """
max_timestamp = pad_queue.max_timestamp_on_qex
if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"
raise """

Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has timestamp equal \
#{inspect(buffer_timestamp)}, but previous buffer pushed on queue from this pad had timestamp \
Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has #{timestamp_field} equal \
#{inspect(buffer_timestamp)}, but previous buffer pushed on queue from this pad had #{timestamp_field} \
equal #{inspect(max_timestamp)}. Buffers from a single pad must have non-decreasing timestamps.
"""
end
Expand Down Expand Up @@ -259,22 +255,18 @@ defmodule Membrane.TimestampQueue do
@type popped_value :: {Pad.ref(), item()}

@doc """
Pops as many buffers from the queue, as it is possible.
Pops items from the queue while they are available.

An item that is not a buffer is always considered available. A buffer is
available when the following conditions are met:
- There is another buffer or `end_of_stream` enqueued on the pad
- On each other pad there is either `end_of_stream` or a buffer with a lower timestamp.

Returns suggested actions list, list of popped buffers and the updated queue.
The returned value is a suggested actions list, a list of popped buffers and the updated queue.

If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.

If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be
empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad,
to be able to work correctly.

To be able to maintain proper order of buffers from varius pads, queue won't return next buffer,
if the next buffer that should be returned:
- is the only buffer from a certain pad
- and that certain pad has not received the end of stream yet
"""
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
Expand Down