diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index 954269ab5..daa762298 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -240,6 +240,8 @@ defmodule Membrane.TimestampQueue do buffer_size = timestamp_queue.metric.buffers_size([buffer]) cond do + # todo 1: consider updating heap just after popping a buffer + # todo 2: holding buffers counter in pad_queue would help with a problem with buffer with payload: <<>> and metric: Bytes pad_priority != -buffer_time -> timestamp_queue |> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref}))) diff --git a/test/membrane/timestamp_queue_test.exs b/test/membrane/timestamp_queue_test.exs index 1470a090b..e8ac40493 100644 --- a/test/membrane/timestamp_queue_test.exs +++ b/test/membrane/timestamp_queue_test.exs @@ -247,4 +247,49 @@ defmodule Membrane.TimestampQueueTest do assert {[], :none, _queue} = TimestampQueue.pop(queue) end end) + + test "queue sorts buffers from various pads when they aren't linked in the same moment" do + iteration_size = 100 + iterations = 100 + + 1..iterations + |> Enum.reduce(TimestampQueue.new(), fn pads_in_iteration, queue -> + pads = for i <- 1..pads_in_iteration, do: Pad.ref(:input, i) + + new_pad = Pad.ref(:input, pads_in_iteration) + + queue = + Enum.reduce([0, 1], queue, fn dts, queue -> + buffer = %Buffer{dts: dts, payload: <<>>} + {[], queue} = TimestampQueue.push_buffer(queue, new_pad, buffer) + queue + end) + + queue = + pads + |> Enum.reduce(queue, fn pad_ref, queue -> + Pad.ref(:input, pad_idx) = pad_ref + pad_offset = iteration_size * (pads_in_iteration - pad_idx) + 2 + + pad_offset..(pad_offset + iteration_size - 1) + |> Enum.reduce(queue, fn dts, queue -> + buffer = %Buffer{dts: dts, payload: <<>>} + {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + queue + end) + end) + + {[], batch, queue} = TimestampQueue.pop_batch(queue) + + sorted_batch = + batch + |> Enum.sort_by(fn {Pad.ref(:input, pad_idx), {:buffer, buffer}} -> + buffer.dts + pad_idx * iteration_size + end) + + assert batch == sorted_batch + + queue + end) + end end