From 415e7c5670c7d8fd8c3af8b1663ba245427e31b3 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 17:02:24 +0100 Subject: [PATCH] Bump default input queue size to 100 --- lib/membrane/core/element/action_handler.ex | 12 ++++++------ lib/membrane/core/element/buffer_controller.ex | 7 ++++++- lib/membrane/core/element/manual_flow_controller.ex | 11 ++++++----- .../element/manual_flow_controller/input_queue.ex | 6 +++--- lib/membrane/core/element/pad_controller.ex | 2 +- test/membrane/core/element/event_controller_test.exs | 2 +- test/membrane/core/element/input_queue_test.exs | 8 ++++---- .../core/element/lifecycle_controller_test.exs | 2 +- .../core/element/stream_format_controller_test.exs | 2 +- 9 files changed, 29 insertions(+), 23 deletions(-) diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 32aae1537..df8603e8d 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -245,7 +245,7 @@ defmodule Membrane.Core.Element.ActionHandler do %State{type: type} = state ) when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do - supply_demand(pad_ref, size, state) + delay_supplying_demand(pad_ref, size, state) end @impl CallbackHandler @@ -403,28 +403,28 @@ defmodule Membrane.Core.Element.ActionHandler do end end - @spec supply_demand( + @spec delay_supplying_demand( Pad.ref(), Action.demand_size(), State.t() ) :: State.t() - defp supply_demand(pad_ref, 0, state) do + defp delay_supplying_demand(pad_ref, 0, state) do Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}") state end - defp supply_demand(pad_ref, size, _state) + defp delay_supplying_demand(pad_ref, size, _state) when is_integer(size) and size < 0 do raise ElementError, "Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}" end - defp supply_demand(pad_ref, size, state) do + defp delay_supplying_demand(pad_ref, size, state) do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do # todo: get_data! above could be eradicated state = ManualFlowController.update_demand(pad_ref, size, state) - ManualFlowController.delay_demand_supply(pad_ref, state) + ManualFlowController.delay_supplying_demand(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 0398191cf..397a01be4 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -64,7 +64,12 @@ defmodule Membrane.Core.Element.BufferController do end # todo: move it to the flow controllers? - @spec do_handle_ingoing_buffers(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: + @spec do_handle_ingoing_buffers( + Pad.ref(), + PadModel.pad_data(), + [Buffer.t()] | Buffer.t(), + State.t() + ) :: State.t() defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index 39497332a..f2f2ca215 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -77,7 +77,7 @@ defmodule Membrane.Core.Element.ManualFlowController do @spec supply_demand(Pad.ref(), State.t()) :: State.t() def supply_demand(pad_ref, %State{delay_demands?: true} = state) do - delay_demand_supply(pad_ref, state) + delay_supplying_demand(pad_ref, state) end def supply_demand(pad_ref, state) do @@ -85,10 +85,6 @@ defmodule Membrane.Core.Element.ManualFlowController do |> handle_delayed_demands() end - def delay_demand_supply(pad_ref, state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) - end - defp do_supply_demand(pad_ref, state) do # marking is state that actual demand supply has been started (note changing back to false when finished) state = %State{state | delay_demands?: true} @@ -103,6 +99,11 @@ defmodule Membrane.Core.Element.ManualFlowController do %State{state | delay_demands?: false} end + @spec delay_supplying_demand(Pad.ref(), State.t()) :: State.t() + def delay_supplying_demand(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) + end + @spec update_demand( Pad.ref(), non_neg_integer() | (non_neg_integer() -> non_neg_integer()), diff --git a/lib/membrane/core/element/manual_flow_controller/input_queue.ex b/lib/membrane/core/element/manual_flow_controller/input_queue.ex index dbebea030..8eb1d4ed4 100644 --- a/lib/membrane/core/element/manual_flow_controller/input_queue.ex +++ b/lib/membrane/core/element/manual_flow_controller/input_queue.ex @@ -54,12 +54,12 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do defstruct @enforce_keys ++ [size: 0, demand: 0] - @default_target_size_factor 40 + @default_target_size_factor 100 @spec default_min_demand_factor() :: number() def default_min_demand_factor, do: 0.25 - @spec init(%{ + @spec new(%{ inbound_demand_unit: Buffer.Metric.unit(), outbound_demand_unit: Buffer.Metric.unit(), atomic_demand: AtomicDemand.t(), @@ -67,7 +67,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do log_tag: String.t(), target_size: pos_integer() | nil }) :: t() - def init(config) do + def new(config) do %{ inbound_demand_unit: inbound_demand_unit, outbound_demand_unit: outbound_demand_unit, diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index cbad131a3..68863f30a 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -376,7 +376,7 @@ defmodule Membrane.Core.Element.PadController do } = pad_data input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: other_pad_info[:demand_unit] || this_demand_unit, outbound_demand_unit: this_demand_unit, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 57e2ccfad..5b67e4853 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.EventControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, pad_ref: :some_pad, diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index 42904d7aa..0bef78f60 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -25,7 +25,7 @@ defmodule Membrane.Core.Element.InputQueueTest do end test "return InputQueue struct and send demand message", context do - assert InputQueue.init(%{ + assert InputQueue.new(%{ inbound_demand_unit: context.inbound_demand_unit, outbound_demand_unit: context.outbound_demand_unit, pad_ref: context.pad_ref, @@ -187,7 +187,7 @@ defmodule Membrane.Core.Element.InputQueueTest do describe ".take/2 should" do setup do input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, pad_ref: :output_pad_ref, @@ -302,7 +302,7 @@ defmodule Membrane.Core.Element.InputQueueTest do atomic_demand = new_atomic_demand() queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :bytes, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, @@ -339,7 +339,7 @@ defmodule Membrane.Core.Element.InputQueueTest do atomic_demand = new_atomic_demand() queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :bytes, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index de2a33d1a..61b4bece6 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 914f02c8d..1e3a2cb6e 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -26,7 +26,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, atomic_demand: atomic_demand,