Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamp queue POC #756

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 338 additions & 0 deletions lib/membrane/timestamp_queue.ex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we spoke, the following features would be useful

  • adding pad before pushing anything on it
  • specifying the minimal or average chunk size
  • flushing everything from the queue
  • queue live mode (separate PR)

Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
defmodule Membrane.TimestampQueue do
@moduledoc """
Implementation of a queue, that accepts:
- Membrane buffers
- events
- stream formats
- end of streams
from various pads. Items in queue are sorted according to their timestamps.

Moreover, #{inspect(__MODULE__)} is able to manage demand of pads, based on the amount of buffers
from each pad currently stored in the queue.
"""

use Bunch.Access

alias Membrane.{Buffer, Event, Pad, StreamFormat}
alias Membrane.Buffer.Metric
alias Membrane.Element.Action

@type pad_queue :: %{
pad_ref: Pad.ref(),
dts_offset: integer(),
qex: Qex.t(),
buffers_size: non_neg_integer(),
paused_demand?: boolean(),
end_of_stream?: boolean()
}

@typedoc """
A queue, that accepts buffers, stream formats and events from various pads and sorts them based on
their timestamps.
"""
@type t :: %__MODULE__{
current_queue_time: Membrane.Time.t(),
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t()
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
metric: Metric.Count,
pad_queues: %{},
pads_heap: Heap.max()

@typedoc """
Options passed to #{inspect(__MODULE__)}.new/1.

Following options are allowed:
- `:pause_demand_boundary` - positive integer or `:infinity` (default to `:infinity`). Tells, what
amount of buffers associated with specific pad must be stored in the queue, to pause auto demand.
- `:pause_demand_boundary_unit` - `:buffers` or `:bytes` (deafult to `:buffers`). Tells, in which metric
`:pause_demand_boundary` is specified.
Comment on lines +56 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the boundary unit should be time. Count and byte_size can be supported too, but not necessarily

"""
@type options :: [
pause_demand_boundary: pos_integer() | :infinity,
pause_demand_boundary_unit: :buffers | :bytes
]

@spec new(options) :: t()
def new(options \\ []) do
metric =
options
|> Keyword.get(:pause_demand_boundary_unit, :buffers)
|> Metric.from_unit()

%__MODULE__{
metric: metric,
pause_demand_boundary: Keyword.get(options, :pause_demand_boundary, :infinity)
}
end

@doc """
Pushes a buffer associated with a specified pad to the queue.

Returns a suggested actions list and the updated queue.

If amount of buffers associated with specified pad in the queue just exceded
`pause_demand_boundary`, the suggested actions list contains `t:Action.pause_auto_demand()`
action, otherwise it is equal an empty list.

Buffers pushed to the queue must have non-`nil` `dts`.
"""
@spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {[Action.pause_auto_demand()], t()}
def push_buffer(_timestamp_queue, pad_ref, %Buffer{dts: nil} = buffer) do
raise """
#{inspect(__MODULE__)} accepts only buffers whose dts is not nil, but it received\n#{inspect(buffer, pretty: true)}
from pad #{inspect(pad_ref)}
"""
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd try PTS in this case. Audio streams usually don't have DTS.


def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
buffer_size = timestamp_queue.metric.buffers_size([buffer])

timestamp_queue
|> push_item(pad_ref, {:buffer, buffer})
|> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
pad_queue
|> Map.update!(:buffers_size, &(&1 + buffer_size))
|> Map.update!(:dts_offset, fn
nil -> timestamp_queue.current_queue_time - buffer.dts
valid_offset -> valid_offset
end)
|> actions_after_pushing_buffer(timestamp_queue.pause_demand_boundary)
end)
end

@doc """
Pushes stream format associated with a specified pad to the queue.

Returns the updated queue.
"""
@spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t()
def push_stream_format(%__MODULE__{} = timestamp_queue, pad_ref, stream_format) do
push_item(timestamp_queue, pad_ref, {:stream_format, stream_format})
end

@doc """
Pushes event associated with a specified pad to the queue.

Returns the updated queue.
"""
@spec push_event(t(), Pad.ref(), Event.t()) :: t()
def push_event(%__MODULE__{} = timestamp_queue, pad_ref, event) do
push_item(timestamp_queue, pad_ref, {:event, event})
end

@doc """
Pushes end of stream of the specified pad to the queue.

Returns the updated queue.
"""
@spec push_end_of_stream(t(), Pad.ref()) :: t()
def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
push_item(timestamp_queue, pad_ref, :end_of_stream)
|> put_in([:pad_queues, pad_ref, :end_of_stream?], true)
end

defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> maybe_handle_item_from_new_pad(item, pad_ref)
|> update_in(
[:pad_queues, pad_ref, :qex],
&Qex.push(&1, item)
)
end

defp maybe_handle_item_from_new_pad(
%__MODULE__{pad_queues: pad_queues} = timestamp_queue,
_item,
pad_ref
)
when is_map_key(pad_queues, pad_ref) do
timestamp_queue
end

defp maybe_handle_item_from_new_pad(%__MODULE__{} = timestamp_queue, first_item, pad_ref) do
priority =
case first_item do
{:buffer, _buffer} -> -timestamp_queue.current_queue_time
_other -> :infinity
end

timestamp_queue
|> put_in([:pad_queues, pad_ref], new_pad_queue(pad_ref))
|> Map.update!(:pads_heap, &Heap.push(&1, {priority, pad_ref}))
end

defp new_pad_queue(pad_ref) do
%{
pad_ref: pad_ref,
dts_offset: nil,
qex: Qex.new(),
buffers_size: 0,
paused_demand?: false,
end_of_stream?: false
}
end

defp actions_after_pushing_buffer(pad_queue, pause_demand_boundary) do
if not pad_queue.paused_demand? and pad_queue.buffers_size >= pause_demand_boundary do
{[pause_auto_demand: pad_queue.pad_ref], %{pad_queue | paused_demand?: true}}
else
{[], pad_queue}
end
end

defp get_buffer_time(buffer_dts, dts_offset), do: buffer_dts + dts_offset

@type item ::
{:stream_format, StreamFormat.t()}
| {:buffer, Buffer.t()}
| {:event, Event.t()}
| :end_of_stream

@type popped_value :: {Pad.ref(), item()}

@doc """
Pops up to 1 buffer from the queue.

Returns a suggested actions list, popped buffer and the updated queue.

If amount of buffers from pad associated with popped buffer just dropped below
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
action, otherwise it is an empty list.

If the queue cannot return any buffer, returns `:none` in it's place instead. Note, that
the queue doesn't have to be empty to be unable to return a buffer - sometimes queue has to
keep up to 1 buffer for each pad, to be able to work correctly.
"""
@spec pop(t()) :: {[Action.resume_auto_demand()], popped_value() | :none, t()}
def pop(%__MODULE__{} = timestamp_queue) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we spoke, let's remove this function, at least for now

{value, timestamp_queue} = do_pop(timestamp_queue)

case value do
{pad_ref, {:buffer, _buffer}} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions, value, timestamp_queue}

value ->
{[], value, timestamp_queue}
end
end

@spec do_pop(t()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue) do
case Heap.root(timestamp_queue.pads_heap) do
{priority, pad_ref} -> do_pop(timestamp_queue, pad_ref, priority)
nil -> {:none, timestamp_queue}
end
end

defp do_pop(timestamp_queue, pad_ref, pad_priority) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)

case Qex.pop(pad_queue.qex) do
{{:value, {:buffer, buffer}}, qex} ->
buffer_time = get_buffer_time(buffer.dts, pad_queue.dts_offset)
buffer_size = timestamp_queue.metric.buffers_size([buffer])

cond do
pad_priority != -buffer_time ->
timestamp_queue
|> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref})))
|> do_pop()

buffer_size == pad_queue.buffers_size and not pad_queue.end_of_stream? ->
# last buffer on pad queue without end of stream
{:none, timestamp_queue}

true ->
# this must be recursive call of pop()

pad_queue = %{
pad_queue
| qex: qex,
buffers_size: pad_queue.buffers_size - buffer_size
}

timestamp_queue =
timestamp_queue
|> Map.put(:current_queue_time, buffer_time)
|> put_in([:pad_queues, pad_ref], pad_queue)

{{pad_ref, {:buffer, buffer}}, timestamp_queue}
end

{{:value, item}, qex} ->
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :qex], qex)

{{pad_ref, item}, timestamp_queue}

{:empty, _qex} ->
timestamp_queue
|> Map.update!(:pad_queues, &Map.delete(&1, pad_ref))
|> Map.update!(:pads_heap, &Heap.pop/1)
|> do_pop()
end
end

@doc """
Pops as many buffers from the queue, as it is possible.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean 'as many buffers as possible'? The docs should explain the conditions


Returns suggested actions list, list of popped buffers and the updated queue.

If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.

If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be
empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad,
to be able to work correctly.
"""
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)

{actions, timestamp_queue} =
batch
|> Enum.reduce(MapSet.new(), fn
{pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref)
_other, map_set -> map_set
end)
|> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions ++ actions_acc, timestamp_queue}
end)

{actions, batch, timestamp_queue}
end

@spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()}
defp do_pop_batch(timestamp_queue, acc \\ []) do
case do_pop(timestamp_queue) do
{:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue}
{value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc])
end
end

defp actions_after_popping_buffer(
%__MODULE__{pause_demand_boundary: boundary} = timestamp_queue,
pad_ref
) do
with %{paused_demand?: true, buffers_size: size} when size < boundary <-
get_in(timestamp_queue, [:pad_queues, pad_ref]) do
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :paused_demand?], false)

{[resume_auto_demand: pad_ref], timestamp_queue}
else
_other -> {[], timestamp_queue}
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ defmodule Membrane.Mixfile do
defp deps do
[
{:qex, "~> 0.3"},
{:heap, "~> 3.0"},
{:telemetry, "~> 1.0"},
{:bunch, "~> 1.6"},
{:ratio, "~> 3.0"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"},
"excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"heap": {:hex, :heap, "3.0.0", "c6dbcd6e9a0b021432176e89cfd829dd065bd6c115981fdcd981a4251fff5fde", [:mix], [], "hexpm", "373eaca5787e2a2b009c42338e70414f590ceabcf96cfc786627ed762ad4dfc6"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"junit_formatter": {:hex, :junit_formatter, "3.3.1", "c729befb848f1b9571f317d2fefa648e9d4869befc4b2980daca7c1edc468e40", [:mix], [], "hexpm", "761fc5be4b4c15d8ba91a6dafde0b2c2ae6db9da7b8832a55b5a1deb524da72b"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
Expand Down
Loading
Loading