diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index 8a7400a98..ba8de1003 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -259,11 +259,11 @@ defmodule Membrane.TimestampQueue do %__MODULE__{pause_demand_boundary: boundary} = timestamp_queue, pad_ref ) do - with %{demand_paused?: true, buffers_size: size} when size < boundary <- + with %{paused_demand?: true, buffers_size: size} when size < boundary <- get_in(timestamp_queue, [:pad_queues, pad_ref]) do timestamp_queue = timestamp_queue - |> put_in([:pad_queues, pad_ref, :demand_paused?], false) + |> put_in([:pad_queues, pad_ref, :paused_demand?], false) {[resume_auto_demand: pad_ref], timestamp_queue} else diff --git a/test/membrane/timestamp_queue_test.exs b/test/membrane/timestamp_queue_test.exs index 3306dab8c..ccfa488c4 100644 --- a/test/membrane/timestamp_queue_test.exs +++ b/test/membrane/timestamp_queue_test.exs @@ -13,7 +13,7 @@ defmodule Membrane.TimestampQueueTest do end) end - test "queue sorts buffers some buffers from different pads based on buffer dts" do + test "queue sorts some buffers from different pads based on buffer dts" do input_order = [9, 4, 7, 3, 1, 8, 5, 6, 2, 10] pad_generator = fn i -> Pad.ref(:input, i) end @@ -151,9 +151,8 @@ defmodule Membrane.TimestampQueueTest do batch_without_eos = Enum.reject(batch, &match?({_pad_ref, :end_of_stream}, &1)) sorted_batch_without_eos = - Enum.sort_by(batch_without_eos, fn {pad_ref, {_type, item}} -> - item.dts - dts_offsets[pad_ref] - end) + batch_without_eos + |> Enum.sort_by(fn {pad_ref, {_type, item}} -> item.dts - dts_offsets[pad_ref] end) assert batch_without_eos == sorted_batch_without_eos end @@ -166,7 +165,7 @@ defmodule Membrane.TimestampQueueTest do {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 3, payload: <<>>}) queue = TimestampQueue.push_end_of_stream(queue, :a) - assert {[], {pad_ref, {:buffer, %Buffer{dts: 1}}}, queue} = TimestampQueue.pop(queue) + assert {[], {:a, {:buffer, %Buffer{dts: 1}}}, queue} = TimestampQueue.pop(queue) queue = TimestampQueue.push_stream_format(queue, :b, %StreamFormat{}) queue = TimestampQueue.push_event(queue, :b, %Event{}) @@ -184,5 +183,70 @@ defmodule Membrane.TimestampQueueTest do assert {[], :none, ^queue} = TimestampQueue.pop(queue) end + [ + %{unit: :buffers, buffer_size: 1, buffer: %Buffer{dts: 0, payload: <<>>}}, + %{unit: :bytes, buffer_size: 100, buffer: %Buffer{dts: 0, payload: <<1::8*100>>}} + ] + |> Enum.map(fn params -> + test "queue returns proper suggested actions when boundary unit is #{inspect(params.unit)}" do + %{unit: unit, buffer_size: buffer_size, buffer: buffer} = + unquote(Macro.escape(params)) + + boundary_in_buff_no = 100 + boundary = buffer_size * boundary_in_buff_no + + queue = + TimestampQueue.new( + pause_demand_boundary: boundary, + pause_demand_boundary_unit: unit + ) + + queue = + 1..(boundary_in_buff_no - 1) + |> Enum.reduce(queue, fn _i, queue -> + assert {[], queue} = TimestampQueue.push_buffer(queue, :input, buffer) + queue + end) + + assert {[pause_auto_demand: :input], queue} = + TimestampQueue.push_buffer(queue, :input, buffer) + + queue = + 1..(boundary_in_buff_no - 1) + |> Enum.reduce(queue, fn _i, queue -> + assert {[], queue} = TimestampQueue.push_buffer(queue, :input, buffer) + queue + end) + + pop_item = {:input, {:buffer, buffer}} + + # testing pop_batch/1 + expected_batch = for _i <- 1..(2 * boundary_in_buff_no - 2), do: pop_item + + # note, that variable _queue is ignored + assert {[resume_auto_demand: :input], ^expected_batch, _queue} = + TimestampQueue.pop_batch(queue) + + # testing pop/1 + queue = + 1..(boundary_in_buff_no - 1) + |> Enum.reduce(queue, fn _i, queue -> + assert {[], ^pop_item, queue} = TimestampQueue.pop(queue) + queue + end) + + assert {[resume_auto_demand: :input], ^pop_item, queue} = TimestampQueue.pop(queue) + + queue = + 1..(boundary_in_buff_no - 2) + |> Enum.reduce(queue, fn _i, queue -> + assert {[], ^pop_item, queue} = TimestampQueue.pop(queue) + queue + end) + + assert {[], :none, _queue} = TimestampQueue.pop(queue) + end + end) + # todo: suggested actions test end