Skip to content

Commit

Permalink
write tests wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 26, 2024
1 parent 69bf37b commit 88970bf
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
22 changes: 18 additions & 4 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Membrane.TimestampQueue do
alias Membrane.Buffer.Metric
alias Membrane.Element.Action

use Bunch.Access

@type buffer_entry :: {:buffer, Buffer.t(), buffer_time :: Membrane.Time.t()}
@type stream_format_entry :: {:stream_format, StreamFormat.t()}
@type event_entry :: {:event, Event.t()}
Expand Down Expand Up @@ -42,16 +44,28 @@ defmodule Membrane.TimestampQueue do

@spec new(options) :: t()
def new(options \\ []) do
{unit, options} = Keyword.pop(options, :pause_demand_boundary_unit, :buffers)
options = [metric: Metric.from_unit(unit)] ++ options

struct!(__MODULE__, options)
metric =
options
|> Keyword.get(:pause_demand_boundary_unit, :buffers)
|> Metric.from_unit()

%__MODULE__{
metric: metric,
pause_demand_boundary: Keyword.pop(options, :pause_demand_boundary, :infinity)
}
end

@type suggested_action :: Action.pause_auto_demand() | Action.resume_auto_demand()
@type suggested_actions :: [suggested_action()]

@spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {suggested_actions(), t()}
def push_buffer(_timestamp_queue, pad_ref, %Buffer{dts: nil} = buffer) do
raise """
#{inspect(__MODULE__)} accepts only buffers whose dts is not nil, but it received\n#{inspect(buffer, pretty: true)}
from pad #{inspect(pad_ref)}
"""
end

def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
buffer_size = timestamp_queue.metric.buffers_size([buffer])

Expand Down
52 changes: 52 additions & 0 deletions test/membrane/timestamp_queue_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Membrane.TimestampQueueTest do
alias Membrane.TimestampQueue
alias Membrane.Buffer

require Membrane.Pad, as: Pad

use ExUnit.Case, async: true

test "queue raises on buffer with nil dts" do
assert_raise(RuntimeError, fn ->
TimestampQueue.new()
|> TimestampQueue.push_buffer(:input, %Buffer{dts: nil, payload: <<>>})
end)
end

test "queue sorts buffers from different pads based on buffer dts" do
input_order = [9,4,7,3,1,8,5,6,2,0]

pad_generator = fn i -> Pad.ref(:input, i) end
buffer_generator = fn i -> %Buffer{dts: i, payload: <<>>} end

queue =
input_order
|> Enum.reduce(TimestampQueue.new(), fn i, queue ->
assert {[], queue} =
queue
|> TimestampQueue.push_buffer(pad_generator.(i), buffer_generator.(i))
|> TimestampQueue.push_end_of_stream(pad_generator.(i))

queue
end)

assert {[], batch, queue} = TimestampQueue.pop_batch(queue)

# assert queue empty
assert queue.pad_queues == TimestampQueue.new().pad_queues
assert queue.pads_heap == TimestampQueue.new().pads_heap

# assert batch
expected_batch =
input_order
|> Enum.sort()
|> Enum.flat_map(fn i ->
[
{pad_generator.(i), {:buffer, buffer_generator.(i)}},
{pad_generator.(i), :end_of_stream}
]
end)

assert batch == expected_batch
end
end

0 comments on commit 88970bf

Please sign in to comment.