Skip to content

Commit

Permalink
Delete handling_actions? flag
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 15, 2024
1 parent e6538fa commit a6b15cb
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 40 deletions.
2 changes: 0 additions & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
# handling_action?: boolean(),
stalker: Membrane.Core.Stalker.t()
}

Expand Down Expand Up @@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do
initialized?: false,
terminating?: false,
setup_incomplete?: false,
# handling_action?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ defmodule Membrane.Core.CallbackHandler do
end

was_delay_consuming_queues? = Map.get(state, :delay_consuming_queues?, false)
state = if Component.is_element?(state), do: %{state | delay_consuming_queues?: true}, else: state

state =
if Component.is_element?(state), do: %{state | delay_consuming_queues?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ defmodule Membrane.Core.Element do
terminating?: false,
setup_incomplete?: false,
effective_flow_control: :push,
# handling_action?: false,
popping_auto_flow_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker,
Expand Down Expand Up @@ -227,6 +226,7 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do
# pytanie: consume queues czy handle delayed demands?
state = DemandHandler.handle_delayed_demands(state)
{:noreply, state}
end
Expand Down
52 changes: 27 additions & 25 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,41 +51,43 @@ defmodule Membrane.Core.Element.ActionHandler do
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.

# with %{delay_consuming_queues?: false, handling_action?: true} <- state do
# raise "dupppppaaaaaaa"
# end

# IO.inspect({state.delay_consuming_queues?, state.handling_action?}, label: "TT FF TF")
with %{delay_consuming_queues?: false, handling_action?: true} <- state do
raise "dupppppaaaaaaa"
end

manual_demands_first? = Enum.random([1, 2]) == 1
# manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state
# state =
# if manual_demands_first?,
# do: maybe_handle_delayed_demands(state),
# else: state

state = maybe_handle_pads_to_snapshot(state)
# state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)
# state =
# if manual_demands_first?,
# do: state,
# else: maybe_handle_delayed_demands(state)

state
end
# state

defp maybe_handle_delayed_demands(state) do
with %{delay_consuming_queues?: false} <- state do
DemandHandler.handle_delayed_demands(state)
DemandController.consume_queues(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
with %{delay_consuming_queues?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end
end
# defp maybe_handle_delayed_demands(state) do
# with %{delay_consuming_queues?: false} <- state do
# DemandHandler.handle_delayed_demands(state)
# end
# end

# defp maybe_handle_pads_to_snapshot(state) do
# with %{delay_consuming_queues?: false} <- state do
# Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
# |> Map.put(:pads_to_snapshot, MapSet.new())
# end
# end

@impl CallbackHandler
def handle_action({action, _}, :handle_init, _params, _state)
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ defmodule Membrane.Core.Element.BufferController do
end
end

@spec do_handle_ingoing_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
@spec do_handle_ingoing_buffer(
Pad.ref(),
PadModel.pad_data(),
[Buffer.t()] | Buffer.t(),
State.t()
) ::
State.t()
defp do_handle_ingoing_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
Expand Down
28 changes: 28 additions & 0 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,34 @@ defmodule Membrane.Core.Element.DemandController do
require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Logger

# problem potencjalnie jest taki, ze np handlujac redemand robimy snapshot auto demandow, co moze powodowac faworyzacje auto > manual
# wiec zrobilem zawsze zaczynanie od manual, potem auto
# 1) nie wiem czy jest to problem, bo:
# - resume demand loop counter
# - zawsze ogranicza nas bedac w petli to co jest w kolejkach

@spec consume_queues(State.t()) :: State.t()
def consume_queues(state) do
# if Enum.random([1, 2]) == 1 do
# state
# |> snapshot_pads_to_snapshot()
# |> DemandHandler.handle_delayed_demands()
# else
# state
# |> DemandHandler.handle_delayed_demands()
# |> snapshot_pads_to_snapshot()
# end
state
|> DemandHandler.handle_delayed_demands()
|> snapshot_pads_to_snapshot()
end

@spec snapshot_pads_to_snapshot(State.t()) :: State.t()
def snapshot_pads_to_snapshot(state) do
Enum.reduce(state.pads_to_snapshot, state, &snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end

@spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t()
def snapshot_atomic_demand(pad_ref, state) do
with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref),
Expand Down
13 changes: 7 additions & 6 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Membrane.Core.Element.DemandHandler do

# Module handling demands requested on output pads.

alias Membrane.Core.Element.DemandController
alias Membrane.Core.CallbackHandler

alias Membrane.Core.Element.{
Expand Down Expand Up @@ -38,7 +39,7 @@ defmodule Membrane.Core.Element.DemandHandler do

def handle_redemand(pad_ref, %State{} = state) do
do_handle_redemand(pad_ref, state)
|> handle_delayed_demands()
|> DemandController.consume_queues()
end

defp do_handle_redemand(pad_ref, state) do
Expand Down Expand Up @@ -80,7 +81,8 @@ defmodule Membrane.Core.Element.DemandHandler do

def supply_demand(pad_ref, state) do
do_supply_demand(pad_ref, state)
|> handle_delayed_demands()
# |> handle_delayed_demands()
|> DemandController.consume_queues()
end

defp do_supply_demand(pad_ref, state) do
Expand Down Expand Up @@ -120,7 +122,7 @@ defmodule Membrane.Core.Element.DemandHandler do
# one pad are supplied right away while another one is waiting for buffers
# potentially for a long time.

state =
# state =
cond do
state.delay_consuming_queues? ->
raise "Cannot handle delayed demands while already supplying demand"
Expand All @@ -146,9 +148,8 @@ defmodule Membrane.Core.Element.DemandHandler do
end
end

Enum.reduce(state.pads_to_snapshot, state, &Membrane.Core.Element.DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())

# Enum.reduce(state.pads_to_snapshot, state, &Membrane.Core.Element.DemandController.snapshot_atomic_demand/2)
# |> Map.put(:pads_to_snapshot, MapSet.new())
end

@spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t()
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ defmodule Membrane.Core.Element.State do
terminating?: boolean(),
setup_incomplete?: boolean(),
effective_flow_control: EffectiveFlowController.effective_flow_control(),
handling_action?: boolean(),
popping_auto_flow_queue?: boolean(),
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t(),
Expand Down Expand Up @@ -75,7 +74,6 @@ defmodule Membrane.Core.Element.State do
:terminating?,
:setup_incomplete?,
:delay_consuming_queues?,
:handling_action?,
:popping_auto_flow_queue?,
:stalker,
:resource_guard,
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/pipeline/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ defmodule Membrane.Core.Pipeline.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
handling_action?: boolean(),
stalker: Membrane.Core.Stalker.t(),
subprocess_supervisor: pid(),
awaiting_setup_completition?: boolean()
Expand All @@ -56,7 +55,6 @@ defmodule Membrane.Core.Pipeline.State do
initialized?: false,
terminating?: false,
setup_incomplete?: false,
handling_action?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
Expand Down

0 comments on commit a6b15cb

Please sign in to comment.