Skip to content

Commit

Permalink
Timestamp Queue wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 22, 2024
1 parent d62f6a1 commit 54101da
Showing 1 changed file with 63 additions and 12 deletions.
75 changes: 63 additions & 12 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,23 @@ defmodule Membrane.TimestampQueue do

@spec pop(t()) :: {suggested_actions(), popped_value() | :none, t()}
def pop(%__MODULE__{} = timestamp_queue) do
{value, timestamp_queue} = do_pop(timestamp_queue)

case value do
{pad_ref, {:buffer, _buffer}} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions, value, timestamp_queue}

value ->
{[], value, timestamp_queue}
end
end

@spec do_pop(t()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue) do
case Heap.root(timestamp_queue.pads_heap) do
{priority, pad_ref} -> do_pop(timestamp_queue, pad_ref, priority)
nil -> {[], :none, timestamp_queue}
nil -> {:none, timestamp_queue}
end
end

Expand All @@ -162,11 +176,11 @@ defmodule Membrane.TimestampQueue do
buffer_time != -pad_priority ->
timestamp_queue
|> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref})))
|> pop()
|> do_pop()

buffer_size == pad_queue.buffers_size and not pad_queue.end_of_stream? ->
# last buffer on pad queue without end of stream
{[], :none, timestamp_queue}
{:none, timestamp_queue}

true ->
# this must be recursive call of pop()
Expand All @@ -179,27 +193,64 @@ defmodule Membrane.TimestampQueue do

timestamp_queue = timestamp_queue |> put_in([:pad_queues, pad_ref], pad_queue)

suggested_actions =
if pad_queue.demand_paused? and
pad_queue.buffers_size < timestamp_queue.pause_demand_boundary,
do: [resume_auto_demand: pad_ref],
else: []

{suggested_actions, {pad_ref, {:buffer, buffer}}, timestamp_queue}
{{pad_ref, {:buffer, buffer}}, timestamp_queue}
end

{{:value, item}, qex} ->
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :qex], qex)

{[], {pad_ref, item}, timestamp_queue}
{{pad_ref, item}, timestamp_queue}

{:empty, _qex} ->
timestamp_queue
|> Map.update!(:pad_queues, &Map.delete(&1, pad_ref))
|> Map.update!(:pads_heap, &Heap.pop/1)
|> pop()
|> do_pop()
end
end

@spec pop_batch(t()) :: {suggested_actions(), [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)

{actions, timestamp_queue} =
batch
|> Enum.reduce(MapSet.new(), fn
{pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref)
_other, map_set -> map_set
end)
|> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions ++ actions_acc, timestamp_queue}
end)

{actions, batch, timestamp_queue}
end

@spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()}
defp do_pop_batch(timestamp_queue, acc \\ []) do
case do_pop(timestamp_queue) do
{:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue}
{value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc])
end
end

defp actions_after_popping_buffer(
%__MODULE__{pause_demand_boundary: boundary} = timestamp_queue,
pad_ref
) do
pad_queue = get_in(timestamp_queue, [:pad_queues, pad_ref])

if pad_queue.demand_paused? and pad_queue.buffers_size < boundary do
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :demand_paused?], false)

{[resume_auto_demand: pad_ref], timestamp_queue}
else
{[], timestamp_queue}
end
end
end

0 comments on commit 54101da

Please sign in to comment.