Skip to content

Commit

Permalink
Add registering pads
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 6, 2024
1 parent 8ce06f2 commit 1d4a9f2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 8 deletions.
33 changes: 26 additions & 7 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ defmodule Membrane.TimestampQueue do
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t()
pads_heap: Heap.t(),
waiting_on_buffer_from: MapSet.t()
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
metric: Metric.Count,
pad_queues: %{},
pads_heap: Heap.max()
pads_heap: Heap.max(),
waiting_on_buffer_from: MapSet.new()

@typedoc """
Options passed to #{inspect(__MODULE__)}.new/1.
Expand Down Expand Up @@ -74,6 +76,12 @@ defmodule Membrane.TimestampQueue do
}
end

@spec register_pad(t(), Pad.ref()) :: t()
def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.put(&1, pad_ref))
end

@doc """
Pushes a buffer associated with a specified pad to the queue.
Expand All @@ -97,6 +105,7 @@ defmodule Membrane.TimestampQueue do
buffer_size = timestamp_queue.metric.buffers_size([buffer])

timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref))
|> push_item(pad_ref, {:buffer, buffer})
|> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
pad_queue
Expand Down Expand Up @@ -167,7 +176,9 @@ defmodule Membrane.TimestampQueue do
"""
@spec push_end_of_stream(t(), Pad.ref()) :: t()
def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
push_item(timestamp_queue, pad_ref, :end_of_stream)
timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref))
|> push_item(pad_ref, :end_of_stream)
|> put_in([:pad_queues, pad_ref, :end_of_stream?], true)
end

Expand Down Expand Up @@ -207,7 +218,7 @@ defmodule Membrane.TimestampQueue do
qex: Qex.new(),
buffers_size: 0,
buffers_number: 0,
update_heap_on_buffer?: false,
update_heap_on_buffer?: true,
paused_demand?: false,
end_of_stream?: false,
use_pts?: nil,
Expand Down Expand Up @@ -266,9 +277,17 @@ defmodule Membrane.TimestampQueue do

@spec do_pop(t()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue) do
case Heap.root(timestamp_queue.pads_heap) do
{_priority, pad_ref} -> do_pop(timestamp_queue, pad_ref)
nil -> {:none, timestamp_queue}
if MapSet.size(timestamp_queue.waiting_on_buffer_from) == 0 do
case Heap.root(timestamp_queue.pads_heap) do
{_priority, pad_ref} -> do_pop(timestamp_queue, pad_ref)
nil -> {:none, timestamp_queue}
end
else
case Heap.root(timestamp_queue.pads_heap) do
# priority :infinity cannot be associated with a buffer
{:infinity, pad_ref} -> do_pop(timestamp_queue, pad_ref)
_other -> {:none, timestamp_queue}
end
end
end

Expand Down
51 changes: 50 additions & 1 deletion test/membrane/timestamp_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ defmodule Membrane.TimestampQueueTest do
1..iterations
|> Enum.reduce(TimestampQueue.new(), fn pads_in_iteration, queue ->
pads = for i <- 1..pads_in_iteration, do: Pad.ref(:input, i)

new_pad = Pad.ref(:input, pads_in_iteration)

queue =
Expand Down Expand Up @@ -301,4 +300,54 @@ defmodule Membrane.TimestampQueueTest do
queue
end)
end

# todo: unify tests naming convention
test "registering pads" do
queue =
TimestampQueue.new()
|> TimestampQueue.register_pad(:a)
|> TimestampQueue.register_pad(:b)

events = for i <- 1..1000, do: %Event{dts: i}
buffers = for i <- 1..1000, do: %Buffer{dts: i, payload: <<>>}

queue =
events
|> Enum.reduce(queue, fn event, queue ->
queue
|> TimestampQueue.push_event(:a, event)
|> TimestampQueue.push_event(:b, event)
end)

queue =
buffers
|> Enum.reduce(queue, fn buffer, queue ->
{[], queue} = TimestampQueue.push_buffer(queue, :a, buffer)
queue
end)

{[], batch, queue} = TimestampQueue.pop_batch(queue)

grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1)))

assert grouped_batch == %{a: events, b: events}

queue =
buffers
|> Enum.reduce(queue, fn buffer, queue ->
{[], queue} = TimestampQueue.push_buffer(queue, :b, buffer)
queue
end)

{[], batch, _queue} = TimestampQueue.pop_batch(queue)

sorted_batch = Enum.sort_by(batch, fn {_pad_ref, {:buffer, buffer}} -> buffer.dts end)
assert batch == sorted_batch

grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1)))
assert grouped_batch == %{
a: List.delete_at(buffers, 999),
b: List.delete_at(buffers, 999)
}
end
end

0 comments on commit 1d4a9f2

Please sign in to comment.