From a6b15cb2ec48371c4a7ccb6342f165dcb5c9517e Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 15 Mar 2024 13:55:14 +0200 Subject: [PATCH] Delete handling_actions? flag --- lib/membrane/core/bin/state.ex | 2 - lib/membrane/core/callback_handler.ex | 4 +- lib/membrane/core/element.ex | 2 +- lib/membrane/core/element/action_handler.ex | 52 ++++++++++--------- .../core/element/buffer_controller.ex | 7 ++- .../core/element/demand_controller.ex | 28 ++++++++++ lib/membrane/core/element/demand_handler.ex | 13 ++--- lib/membrane/core/element/state.ex | 2 - lib/membrane/core/pipeline/state.ex | 2 - 9 files changed, 72 insertions(+), 40 deletions(-) diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index c917513a5..e9605a1cf 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - # handling_action?: boolean(), stalker: Membrane.Core.Stalker.t() } @@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do initialized?: false, terminating?: false, setup_incomplete?: false, - # handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 2c8e535d8..9c3e3099f 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -190,7 +190,9 @@ defmodule Membrane.Core.CallbackHandler do end was_delay_consuming_queues? = Map.get(state, :delay_consuming_queues?, false) - state = if Component.is_element?(state), do: %{state | delay_consuming_queues?: true}, else: state + + state = + if Component.is_element?(state), do: %{state | delay_consuming_queues?: true}, else: state state = Enum.reduce(actions, state, fn action, state -> diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index f4f22384d..218ea7300 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -157,7 +157,6 @@ defmodule Membrane.Core.Element do terminating?: false, setup_incomplete?: false, effective_flow_control: :push, - # handling_action?: false, popping_auto_flow_queue?: false, pads_to_snapshot: MapSet.new(), stalker: options.stalker, @@ -227,6 +226,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do + # pytanie: consume queues czy handle delayed demands? state = DemandHandler.handle_delayed_demands(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 98bbe381a..3c1e611e0 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -51,41 +51,43 @@ defmodule Membrane.Core.Element.ActionHandler do # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. + # with %{delay_consuming_queues?: false, handling_action?: true} <- state do + # raise "dupppppaaaaaaa" + # end - # IO.inspect({state.delay_consuming_queues?, state.handling_action?}, label: "TT FF TF") - with %{delay_consuming_queues?: false, handling_action?: true} <- state do - raise "dupppppaaaaaaa" - end - - manual_demands_first? = Enum.random([1, 2]) == 1 + # manual_demands_first? = Enum.random([1, 2]) == 1 - state = - if manual_demands_first?, - do: maybe_handle_delayed_demands(state), - else: state + # state = + # if manual_demands_first?, + # do: maybe_handle_delayed_demands(state), + # else: state - state = maybe_handle_pads_to_snapshot(state) + # state = maybe_handle_pads_to_snapshot(state) - state = - if manual_demands_first?, - do: state, - else: maybe_handle_delayed_demands(state) + # state = + # if manual_demands_first?, + # do: state, + # else: maybe_handle_delayed_demands(state) - state - end + # state - defp maybe_handle_delayed_demands(state) do with %{delay_consuming_queues?: false} <- state do - DemandHandler.handle_delayed_demands(state) + DemandController.consume_queues(state) end end - defp maybe_handle_pads_to_snapshot(state) do - with %{delay_consuming_queues?: false} <- state do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - end - end + # defp maybe_handle_delayed_demands(state) do + # with %{delay_consuming_queues?: false} <- state do + # DemandHandler.handle_delayed_demands(state) + # end + # end + + # defp maybe_handle_pads_to_snapshot(state) do + # with %{delay_consuming_queues?: false} <- state do + # Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + # |> Map.put(:pads_to_snapshot, MapSet.new()) + # end + # end @impl CallbackHandler def handle_action({action, _}, :handle_init, _params, _state) diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 973f99519..ccc875a4f 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -60,7 +60,12 @@ defmodule Membrane.Core.Element.BufferController do end end - @spec do_handle_ingoing_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: + @spec do_handle_ingoing_buffer( + Pad.ref(), + PadModel.pad_data(), + [Buffer.t()] | Buffer.t(), + State.t() + ) :: State.t() defp do_handle_ingoing_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index daa5fb784..1d517d61d 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -21,6 +21,34 @@ defmodule Membrane.Core.Element.DemandController do require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Logger + # problem potencjalnie jest taki, ze np handlujac redemand robimy snapshot auto demandow, co moze powodowac faworyzacje auto > manual + # wiec zrobilem zawsze zaczynanie od manual, potem auto + # 1) nie wiem czy jest to problem, bo: + # - resume demand loop counter + # - zawsze ogranicza nas bedac w petli to co jest w kolejkach + + @spec consume_queues(State.t()) :: State.t() + def consume_queues(state) do + # if Enum.random([1, 2]) == 1 do + # state + # |> snapshot_pads_to_snapshot() + # |> DemandHandler.handle_delayed_demands() + # else + # state + # |> DemandHandler.handle_delayed_demands() + # |> snapshot_pads_to_snapshot() + # end + state + |> DemandHandler.handle_delayed_demands() + |> snapshot_pads_to_snapshot() + end + + @spec snapshot_pads_to_snapshot(State.t()) :: State.t() + def snapshot_pads_to_snapshot(state) do + Enum.reduce(state.pads_to_snapshot, state, &snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) + end + @spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t() def snapshot_atomic_demand(pad_ref, state) do with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 72f783fdf..467bbec1a 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -3,6 +3,7 @@ defmodule Membrane.Core.Element.DemandHandler do # Module handling demands requested on output pads. + alias Membrane.Core.Element.DemandController alias Membrane.Core.CallbackHandler alias Membrane.Core.Element.{ @@ -38,7 +39,7 @@ defmodule Membrane.Core.Element.DemandHandler do def handle_redemand(pad_ref, %State{} = state) do do_handle_redemand(pad_ref, state) - |> handle_delayed_demands() + |> DemandController.consume_queues() end defp do_handle_redemand(pad_ref, state) do @@ -80,7 +81,8 @@ defmodule Membrane.Core.Element.DemandHandler do def supply_demand(pad_ref, state) do do_supply_demand(pad_ref, state) - |> handle_delayed_demands() + # |> handle_delayed_demands() + |> DemandController.consume_queues() end defp do_supply_demand(pad_ref, state) do @@ -120,7 +122,7 @@ defmodule Membrane.Core.Element.DemandHandler do # one pad are supplied right away while another one is waiting for buffers # potentially for a long time. - state = + # state = cond do state.delay_consuming_queues? -> raise "Cannot handle delayed demands while already supplying demand" @@ -146,9 +148,8 @@ defmodule Membrane.Core.Element.DemandHandler do end end - Enum.reduce(state.pads_to_snapshot, state, &Membrane.Core.Element.DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - + # Enum.reduce(state.pads_to_snapshot, state, &Membrane.Core.Element.DemandController.snapshot_atomic_demand/2) + # |> Map.put(:pads_to_snapshot, MapSet.new()) end @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index cf0656b68..f65e1b068 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -42,7 +42,6 @@ defmodule Membrane.Core.Element.State do terminating?: boolean(), setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), - handling_action?: boolean(), popping_auto_flow_queue?: boolean(), pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), @@ -75,7 +74,6 @@ defmodule Membrane.Core.Element.State do :terminating?, :setup_incomplete?, :delay_consuming_queues?, - :handling_action?, :popping_auto_flow_queue?, :stalker, :resource_guard, diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 6f644947b..37f24ea2d 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -32,7 +32,6 @@ defmodule Membrane.Core.Pipeline.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), subprocess_supervisor: pid(), awaiting_setup_completition?: boolean() @@ -56,7 +55,6 @@ defmodule Membrane.Core.Pipeline.State do initialized?: false, terminating?: false, setup_incomplete?: false, - handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil,