Skip to content

Commit

Permalink
Remove pop/1
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 7, 2024
1 parent 393d2ba commit 42ec024
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 78 deletions.
82 changes: 28 additions & 54 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -258,29 +258,41 @@ defmodule Membrane.TimestampQueue do
@type popped_value :: {Pad.ref(), item()}

@doc """
Pops up to 1 buffer from the queue.
Pops as many buffers from the queue, as it is possible.
Returns a suggested actions list, popped buffer and the updated queue.
Returns suggested actions list, list of popped buffers and the updated queue.
If amount of buffers from pad associated with popped buffer just dropped below
If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
action, otherwise it is an empty list.
actions, otherwise it is an empty list.
If the queue cannot return any buffer, returns `:none` in it's place instead. Note, that
the queue doesn't have to be empty to be unable to return a buffer - sometimes queue has to
keep up to 1 buffer for each pad, to be able to work correctly.
If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be
empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad,
to be able to work correctly.
"""
@spec pop(t()) :: {[Action.resume_auto_demand()], popped_value() | :none, t()}
def pop(%__MODULE__{} = timestamp_queue) do
{value, timestamp_queue} = do_pop(timestamp_queue)
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)

case value do
{pad_ref, {:buffer, _buffer}} ->
{actions, timestamp_queue} =
batch
|> Enum.reduce(MapSet.new(), fn
{pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref)
_other, map_set -> map_set
end)
|> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions, value, timestamp_queue}
{actions ++ actions_acc, timestamp_queue}
end)

{actions, batch, timestamp_queue}
end

value ->
{[], value, timestamp_queue}
@spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()}
defp do_pop_batch(timestamp_queue, acc \\ []) do
case do_pop(timestamp_queue) do
{:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue}
{value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc])
end
end

Expand All @@ -300,6 +312,7 @@ defmodule Membrane.TimestampQueue do
end
end

@spec do_pop(t(), Pad.ref()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue, pad_ref) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)

Expand Down Expand Up @@ -352,45 +365,6 @@ defmodule Membrane.TimestampQueue do
end
end

@doc """
Pops as many buffers from the queue, as it is possible.
Returns suggested actions list, list of popped buffers and the updated queue.
If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.
If the queue cannot return any buffer, empty list is returned. Note, that queue doesn't have to be
empty to be unable to return a buffer - sometimes queue must keep up to 1 buffer for each pad,
to be able to work correctly.
"""
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)

{actions, timestamp_queue} =
batch
|> Enum.reduce(MapSet.new(), fn
{pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref)
_other, map_set -> map_set
end)
|> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions ++ actions_acc, timestamp_queue}
end)

{actions, batch, timestamp_queue}
end

@spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()}
defp do_pop_batch(timestamp_queue, acc \\ []) do
case do_pop(timestamp_queue) do
{:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue}
{value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc])
end
end

defp actions_after_popping_buffer(
%__MODULE__{pause_demand_boundary: boundary} = timestamp_queue,
pad_ref
Expand Down
28 changes: 4 additions & 24 deletions test/membrane/timestamp_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ defmodule Membrane.TimestampQueueTest do

{[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 1, payload: <<>>})
{[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 2, payload: <<>>})

assert {[], [a: {:buffer, %Buffer{dts: 1}}], queue} = TimestampQueue.pop_batch(queue)

{[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 3, payload: <<>>})
queue = TimestampQueue.push_end_of_stream(queue, :a)

assert {[], {:a, {:buffer, %Buffer{dts: 1}}}, queue} = TimestampQueue.pop(queue)

queue = TimestampQueue.push_stream_format(queue, :b, %StreamFormat{})
queue = TimestampQueue.push_event(queue, :b, %Event{})

Expand All @@ -180,7 +181,7 @@ defmodule Membrane.TimestampQueueTest do
a: :end_of_stream
]

assert {[], :none, ^queue} = TimestampQueue.pop(queue)
assert {[], [], ^queue} = TimestampQueue.pop_batch(queue)
end

[
Expand Down Expand Up @@ -220,31 +221,10 @@ defmodule Membrane.TimestampQueueTest do

pop_item = {:input, {:buffer, buffer}}

# testing pop_batch/1
expected_batch = for _i <- 1..(2 * boundary_in_buff_no - 2), do: pop_item

# note, that variable _queue is ignored
assert {[resume_auto_demand: :input], ^expected_batch, _queue} =
TimestampQueue.pop_batch(queue)

# testing pop/1
queue =
1..(boundary_in_buff_no - 1)
|> Enum.reduce(queue, fn _i, queue ->
assert {[], ^pop_item, queue} = TimestampQueue.pop(queue)
queue
end)

assert {[resume_auto_demand: :input], ^pop_item, queue} = TimestampQueue.pop(queue)

queue =
1..(boundary_in_buff_no - 2)
|> Enum.reduce(queue, fn _i, queue ->
assert {[], ^pop_item, queue} = TimestampQueue.pop(queue)
queue
end)

assert {[], :none, _queue} = TimestampQueue.pop(queue)
end
end)

Expand Down

0 comments on commit 42ec024

Please sign in to comment.