Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement basic functionalities #1

Merged
merged 7 commits into from
Apr 5, 2024
Merged

Conversation

FelonEkonom
Copy link
Member

@FelonEkonom FelonEkonom force-pushed the basic-functionalities branch from d7ef045 to 58c1df0 Compare March 26, 2024 10:19
@FelonEkonom FelonEkonom requested a review from mat-hek March 26, 2024 10:28
@FelonEkonom FelonEkonom added the enhancement New feature or request label Mar 26, 2024
@FelonEkonom FelonEkonom force-pushed the basic-functionalities branch from 5b507ea to b469dbf Compare March 26, 2024 10:38
def new(options \\ []) do
metric =
options
|> Keyword.get(:pause_demand_boundary_unit, :buffers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we spoke, the most appropriate unit seems to be time

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will implement having boundary unit expressed in time in the another PR

Comment on lines 85 to 89
Makes the queue not return any buffer in `pop_batch/3`, until a buffer or end of stream arrival
from `pad_ref`.
"""
@spec wait_on_pad(t(), Pad.ref()) :: t()
def wait_on_pad(%__MODULE__{} = timestamp_queue, pad_ref) do
Copy link
Member

@mat-hek mat-hek Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Makes the queue not return any buffer in `pop_batch/3`, until a buffer or end of stream arrival
from `pad_ref`.
"""
@spec wait_on_pad(t(), Pad.ref()) :: t()
def wait_on_pad(%__MODULE__{} = timestamp_queue, pad_ref) do
Registers an input pad in the queue without pushing anything on that pad.
Once a pad is registered, the `pop_batch/3` function won't return buffers
until a `buffer` or `end_of_stream` is available on the registered pad.
Pushing a buffer on an unregistered pad automatically registers it.
"""
@spec register_pad(t(), Pad.ref()) :: t()
def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. After calling this function, stream formats and events still can occur in the returned batch
  2. Calling push_stream_format or push_event doesn't cause waiting on the pad

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the suggestion

Comment on lines 66 to 70
@doc """
Creates and returnes new #{inspect(__MODULE__)}.

Accepts `t:options()`.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this says nothing :P

Suggested change
@doc """
Creates and returnes new #{inspect(__MODULE__)}.
Accepts `t:options()`.
"""

A queue, that accepts buffers, stream formats and events from various pads and sorts them based on
their timestamps.
"""
@type t :: %__MODULE__{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably be opaque

lib/membrane/timestamp_queue.ex Show resolved Hide resolved
Comment on lines 152 to 154
Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has timestamp equal \
#{inspect(buffer_timestamp)}, but previous buffer pushed on queue from this pad had timestamp \
equal #{inspect(max_timestamp)}. Buffers from a single pad must have non-decreasing timestamps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to change the word timestamp to PTS or DTS, whichever we rely on

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can refactor error message, but simply changing timestamp to PTS or DTS will make this log untrue, since having buffers with monotonic DTS and non-monotonic PTS is totally ok

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having buffers with monotonic DTS and non-monotonic PTS is totally ok

If the queue relies on PTS and they are not monotonic, the queue can't handle this case no matter what DTS are, so I think the log would be true. Such a situation would probably mean a bug anyway - I believe PTS and DTS should either be always set or never set.

Copy link
Member Author

@FelonEkonom FelonEkonom Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If buffers from a specific pad contain DTS, queue will look only at DTS and won't look at PTS at all

Comment on lines 262 to 277
Pops as many buffers from the queue, as it is possible.

Returns suggested actions list, list of popped buffers and the updated queue.

If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.

If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be
empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad,
to be able to work correctly.

To be able to maintain proper order of buffers from varius pads, queue won't return next buffer,
if the next buffer that should be returned:
- is the only buffer from a certain pad
- and that certain pad has not received the end of stream yet
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my try, correct me if I'm wrong:

Suggested change
Pops as many buffers from the queue, as it is possible.
Returns suggested actions list, list of popped buffers and the updated queue.
If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.
If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be
empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad,
to be able to work correctly.
To be able to maintain proper order of buffers from varius pads, queue won't return next buffer,
if the next buffer that should be returned:
- is the only buffer from a certain pad
- and that certain pad has not received the end of stream yet
Pops items from the queue while they are available.
An item that is not a buffer is always considered available. A buffer is
available when the following conditions are met:
- There is another buffer or `end_of_stream` enqueued on the pad
- On each other pad there is either `end_of_stream` or a buffer with a lower timestamp.
The returned value is a suggested actions list, a list of popped buffers and the updated queue.
If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.

lib/membrane/timestamp_queue.ex Show resolved Hide resolved
@FelonEkonom FelonEkonom requested a review from mat-hek April 4, 2024 11:48
Comment on lines 143 to 147
timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"
max_timestamp = pad_queue.max_timestamp_on_qex

if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
raise """
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"
max_timestamp = pad_queue.max_timestamp_on_qex
if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
raise """
max_timestamp = pad_queue.max_timestamp_on_qex
if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"
raise """

@FelonEkonom FelonEkonom force-pushed the basic-functionalities branch from 000fa9f to 1ac231e Compare April 5, 2024 12:42
@FelonEkonom FelonEkonom requested a review from mat-hek April 5, 2024 12:48
@FelonEkonom FelonEkonom merged commit f61b5ee into master Apr 5, 2024
3 checks passed
@FelonEkonom FelonEkonom deleted the basic-functionalities branch April 5, 2024 13:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants