Skip to content

Commit

Permalink
Timestamp Queue wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 16, 2024
1 parent f112f94 commit d62f6a1
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
205 changes: 205 additions & 0 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
defmodule Membrane.TimestampQueue do
@moduledoc """
"""

alias Membrane.{Buffer, Event, Pad, StreamFormat}
alias Membrane.Buffer.Metric
alias Membrane.Element.Action

@type buffer_entry :: {:buffer, Buffer.t(), buffer_time :: Membrane.Time.t()}
@type stream_format_entry :: {:stream_format, StreamFormat.t()}
@type event_entry :: {:event, Event.t()}

@type queue_entry :: buffer_entry() | stream_format_entry() | event_entry()

@type pad_queue :: %{
pad_ref: Pad.ref(),
dts_offset: integer(),
qex: Qex.t(),
buffers_size: non_neg_integer(),
paused_demand?: boolean(),
end_of_stream?: boolean()
}

@type t :: %__MODULE__{
current_queue_time: Membrane.Time.t(),
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t()
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
metric: Metric.Count,
pad_queues: %{},
pads_heap: Heap.min()

@type option ::
{:pause_demand_boundary, pos_integer() | :infinity}
| {:pause_demand_boundary_unit, :buffers | :bytes}
@type options :: [option]

@spec new(options) :: t()
def new(options \\ []) do
{unit, options} = Keyword.pop(options, :pause_demand_boundary_unit, :buffers)
options = [metric: Metric.from_unit(unit)] ++ options

struct!(__MODULE__, options)
end

@type suggested_action :: Action.pause_auto_demand() | Action.resume_auto_demand()
@type suggested_actions :: [suggested_action()]

@spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {suggested_actions(), t()}
def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
buffer_size = timestamp_queue.metric.buffers_size([buffer])

timestamp_queue
|> push_item(pad_ref, {:buffer, buffer})
|> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
pad_queue
|> Map.update!(:buffers_size, &(&1 + buffer_size))
|> Map.update!(:dts_offset, fn
nil -> timestamp_queue.current_queue_time - buffer.dts
valid_offset -> valid_offset
end)
|> actions_after_pushing_buffer(timestamp_queue.pause_demand_boundary)
end)
end

@spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t()
def push_stream_format(%__MODULE__{} = timestamp_queue, pad_ref, stream_format) do
push_item(timestamp_queue, pad_ref, {:stream_format, stream_format})
end

@spec push_event(t(), Pad.ref(), Event.t()) :: t()
def push_event(%__MODULE__{} = timestamp_queue, pad_ref, event) do
push_item(timestamp_queue, pad_ref, {:event, event})
end

@spec push_end_of_stream(t(), Pad.ref()) :: t()
def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
push_item(timestamp_queue, pad_ref, :end_of_stream)
end

defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> maybe_handle_item_from_new_pad(item, pad_ref)
|> update_in(
[:pads_queue, pad_ref, :qex],
&Qex.push(&1, item)
)
end

defp maybe_handle_item_from_new_pad(
%__MODULE__{pad_queues: pad_queues} = timestamp_queue,
_item,
pad_ref
)
when is_map_key(pad_queues, pad_ref) do
timestamp_queue
end

defp maybe_handle_item_from_new_pad(%__MODULE__{} = timestamp_queue, first_item, pad_ref) do
priority =
case first_item do
{:buffer, %Buffer{dts: dts}} -> -get_buffer_time(dts, timestamp_queue.current_queue_time)
_other -> :infinity
end

timestamp_queue
|> put_in([:pad_queues, pad_ref], new_pad_queue(pad_ref))
|> Map.update!(:pads_heap, &Heap.push(&1, {priority, pad_ref}))
end

defp new_pad_queue(pad_ref) do
%{
pad_ref: pad_ref,
dts_offset: nil,
qex: Qex.new(),
buffers_size: 0,
paused_demand?: false,
end_of_stream?: false
}
end

defp actions_after_pushing_buffer(pad_queue, pause_demand_boundary) do
if not pad_queue.paused_demand? and pad_queue.buffers_size >= pause_demand_boundary do
{[pause_auto_demand: pad_queue.pad_ref], %{pad_queue | paused_demand?: true}}
else
{[], pad_queue}
end
end

defp get_buffer_time(buffer_dts, dts_offset), do: buffer_dts + dts_offset

@type item ::
{:stream_format, StreamFormat.t()}
| {:buffer, Buffer.t()}
| {:event, Event.t()}
| :end_of_stream

@type popped_value :: {Pad.ref(), item()}

@spec pop(t()) :: {suggested_actions(), popped_value() | :none, t()}
def pop(%__MODULE__{} = timestamp_queue) do
case Heap.root(timestamp_queue.pads_heap) do
{priority, pad_ref} -> do_pop(timestamp_queue, pad_ref, priority)
nil -> {[], :none, timestamp_queue}
end
end

defp do_pop(timestamp_queue, pad_ref, pad_priority) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)

case Qex.pop(pad_queue.qex) do
{{:value, {:buffer, buffer}}, qex} ->
buffer_time = get_buffer_time(buffer.dts, pad_queue.dts_offset)
buffer_size = timestamp_queue.metric.buffers_size([buffer])

cond do
buffer_time != -pad_priority ->
timestamp_queue
|> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref})))
|> pop()

buffer_size == pad_queue.buffers_size and not pad_queue.end_of_stream? ->
# last buffer on pad queue without end of stream
{[], :none, timestamp_queue}

true ->
# this must be recursive call of pop()

pad_queue = %{
pad_queue
| qex: qex,
buffers_size: pad_queue.buffers_size - buffer_size
}

timestamp_queue = timestamp_queue |> put_in([:pad_queues, pad_ref], pad_queue)

suggested_actions =
if pad_queue.demand_paused? and
pad_queue.buffers_size < timestamp_queue.pause_demand_boundary,
do: [resume_auto_demand: pad_ref],
else: []

{suggested_actions, {pad_ref, {:buffer, buffer}}, timestamp_queue}
end

{{:value, item}, qex} ->
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :qex], qex)

{[], {pad_ref, item}, timestamp_queue}

{:empty, _qex} ->
timestamp_queue
|> Map.update!(:pad_queues, &Map.delete(&1, pad_ref))
|> Map.update!(:pads_heap, &Heap.pop/1)
|> pop()
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ defmodule Membrane.Mixfile do
defp deps do
[
{:qex, "~> 0.3"},
{:heap, "~> 3.0"},
{:telemetry, "~> 1.0"},
{:bunch, "~> 1.6"},
{:ratio, "~> 3.0"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"},
"excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"heap": {:hex, :heap, "3.0.0", "c6dbcd6e9a0b021432176e89cfd829dd065bd6c115981fdcd981a4251fff5fde", [:mix], [], "hexpm", "373eaca5787e2a2b009c42338e70414f590ceabcf96cfc786627ed762ad4dfc6"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"junit_formatter": {:hex, :junit_formatter, "3.3.1", "c729befb848f1b9571f317d2fefa648e9d4869befc4b2980daca7c1edc468e40", [:mix], [], "hexpm", "761fc5be4b4c15d8ba91a6dafde0b2c2ae6db9da7b8832a55b5a1deb524da72b"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
Expand Down

0 comments on commit d62f6a1

Please sign in to comment.