diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index daa762298..eb8176822 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -22,6 +22,7 @@ defmodule Membrane.TimestampQueue do dts_offset: integer(), qex: Qex.t(), buffers_size: non_neg_integer(), + buffers_number: non_neg_integer(), paused_demand?: boolean(), end_of_stream?: boolean() } @@ -97,11 +98,14 @@ defmodule Membrane.TimestampQueue do |> push_item(pad_ref, {:buffer, buffer}) |> get_and_update_in([:pad_queues, pad_ref], fn pad_queue -> pad_queue - |> Map.update!(:buffers_size, &(&1 + buffer_size)) |> Map.update!(:dts_offset, fn nil -> timestamp_queue.current_queue_time - buffer.dts valid_offset -> valid_offset end) + |> Map.merge(%{ + buffers_size: pad_queue.buffers_size + buffer_size, + buffers_number: pad_queue.buffers_number + 1 + }) |> actions_after_pushing_buffer(timestamp_queue.pause_demand_boundary) end) end @@ -173,6 +177,7 @@ defmodule Membrane.TimestampQueue do dts_offset: nil, qex: Qex.new(), buffers_size: 0, + buffers_number: 0, paused_demand?: false, end_of_stream?: false } @@ -231,6 +236,7 @@ defmodule Membrane.TimestampQueue do end end + # todo 1: consider updating heap just after popping a buffer defp do_pop(timestamp_queue, pad_ref, pad_priority) do pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref) @@ -240,14 +246,12 @@ defmodule Membrane.TimestampQueue do buffer_size = timestamp_queue.metric.buffers_size([buffer]) cond do - # todo 1: consider updating heap just after popping a buffer - # todo 2: holding buffers counter in pad_queue would help with a problem with buffer with payload: <<>> and metric: Bytes pad_priority != -buffer_time -> timestamp_queue |> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref}))) |> do_pop() - buffer_size == pad_queue.buffers_size and not pad_queue.end_of_stream? -> + pad_queue.buffers_number == 1 and not pad_queue.end_of_stream? -> # last buffer on pad queue without end of stream {:none, timestamp_queue} @@ -257,7 +261,8 @@ defmodule Membrane.TimestampQueue do pad_queue = %{ pad_queue | qex: qex, - buffers_size: pad_queue.buffers_size - buffer_size + buffers_size: pad_queue.buffers_size - buffer_size, + buffers_number: pad_queue.buffers_number - 1 } timestamp_queue =