Skip to content

Commit

Permalink
Delete unncessary flags, rename some modules
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 26, 2024
1 parent 27a3b01 commit 9c309b0
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 72 deletions.
23 changes: 5 additions & 18 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.Core.Component

require Membrane.Logger

Expand Down Expand Up @@ -189,15 +188,8 @@ defmodule Membrane.Core.CallbackHandler do
reraise e, __STACKTRACE__
end

# was_handling_action? = state.handling_action?
# state = %{state | handling_action?: true}

# Updating :delay_demands? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_delay_demands? = Map.get(state, :delay_demands?, false)
state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state
# was_delay_demands? = Map.get(state, :delay_demands?, false)
# state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
Expand All @@ -214,14 +206,9 @@ defmodule Membrane.Core.CallbackHandler do
end)

# state =
# if was_handling_action?,
# do: state,
# else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_delay_demands?,
do: %{state | delay_demands?: false},
else: state
# if Component.is_element?(state) and not was_delay_demands?,
# do: %{state | delay_demands?: false},
# else: state

handler_module.handle_end_of_actions(callback, state)
end
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ defmodule Membrane.Core.Element do
alias Membrane.Core.Element.{
BufferController,
DemandController,
DemandHandler,
EffectiveFlowController,
EventController,
LifecycleController,
Expand Down Expand Up @@ -211,7 +210,7 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
state = DemandController.Manual.resume_delayed_demands_loop(state)
{:noreply, state}
end

Expand Down
40 changes: 23 additions & 17 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ defmodule Membrane.Core.Element.ActionHandler do

alias Membrane.Core.Element.{
DemandController,
DemandHandler,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand All @@ -51,26 +49,31 @@ defmodule Membrane.Core.Element.ActionHandler do
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.

if Enum.random([1, 2]) == 1 do
snapshot(callback, state)
|> hdd()
# Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might
# be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.

if callback != :handle_spec_started do
if Enum.random([1, 2]) == 1 do
snapshot(callback, state)
|> hdd()
else
state
|> hdd()
|> then(&snapshot(callback, &1))
end
else
state
|> hdd()
|> then(&snapshot(callback, &1))
end
end

defp hdd(state) do
with %{delay_demands?: false} <- state do
DemandHandler.handle_delayed_demands(state)
DemandController.Manual.handle_delayed_demands(state)
end
end

defp snapshot(callback, state) do
# Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might
# be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.
if callback != :handle_spec_started do
state.pads_to_snapshot
|> Enum.shuffle()
Expand Down Expand Up @@ -179,13 +182,13 @@ defmodule Membrane.Core.Element.ActionHandler do
@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
DemandController.Auto.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
DemandController.Auto.resume_demands(in_ref, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -417,7 +420,9 @@ defmodule Membrane.Core.Element.ActionHandler do
defp supply_demand(pad_ref, size, state) do
with %{direction: :input, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.supply_demand(pad_ref, size, state)
# todo: get_data! above could be eradicated
state = DemandController.Manual.update_demand(pad_ref, size, state)
DemandController.Manual.delay_demand_supply(pad_ref, state)
else
%{direction: :output} ->
raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref
Expand All @@ -437,7 +442,8 @@ defmodule Membrane.Core.Element.ActionHandler do
when type in [:source, :filter, :endpoint] do
with %{direction: :output, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.handle_redemand(pad_ref, state)
# todo: get_data! above could be eradicated
DemandController.Manual.delay_redemand(pad_ref, state)
else
%{direction: :input} ->
raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}"
Expand Down Expand Up @@ -473,10 +479,10 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> DemandController.Auto.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
11 changes: 5 additions & 6 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,21 @@ defmodule Membrane.Core.Element.BufferController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandHandler,
DemandController,
EventController,
InputQueue,
PlaybackQueue,
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry

@doc """
Handles incoming buffer: either stores it in InputQueue, or executes element's
callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2`
callback. Also calls `Membrane.Core.Element.DemandController.Manual.supply_demand/2`
to check if there are any unsupplied demands.
"""
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
Expand Down Expand Up @@ -70,10 +69,10 @@ defmodule Membrane.Core.Element.BufferController do
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state)
end
end

Expand All @@ -84,7 +83,7 @@ defmodule Membrane.Core.Element.BufferController do
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)

if old_input_queue |> InputQueue.empty?() do
DemandHandler.supply_demand(pad_ref, state)
DemandController.Manual.supply_demand(pad_ref, state)
else
state
end
Expand Down
7 changes: 3 additions & 4 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ defmodule Membrane.Core.Element.DemandController do

use Bunch

alias __MODULE__.AutoFlowUtils
alias __MODULE__.{Auto, Manual}

alias Membrane.Buffer

alias Membrane.Core.Element.{
AtomicDemand,
DemandHandler,
PlaybackQueue,
State
}
Expand Down Expand Up @@ -56,7 +55,7 @@ defmodule Membrane.Core.Element.DemandController do
if atomic_value > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> Auto.pop_queues_and_bump_demand()
else
state
end
Expand All @@ -79,7 +78,7 @@ defmodule Membrane.Core.Element.DemandController do
}
)

DemandHandler.handle_redemand(pad_data.ref, state)
Manual.handle_redemand(pad_data.ref, state)
else
_other -> state
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
defmodule Membrane.Core.Element.DemandController.Auto do
@moduledoc false

alias Membrane.Buffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.DemandHandler do
defmodule Membrane.Core.Element.DemandController.Manual do
@moduledoc false

# Module handling demands requested on output pads.
Expand All @@ -24,6 +24,10 @@ defmodule Membrane.Core.Element.DemandHandler do

@handle_demand_loop_limit 20

def delay_redemand(pad_ref, state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand}))
end

@doc """
Called when redemand action was returned.
* If element is currently supplying demand, it means that after finishing `supply_demand` it will call
Expand All @@ -33,7 +37,7 @@ defmodule Membrane.Core.Element.DemandHandler do
"""
@spec handle_redemand(Pad.ref(), State.t()) :: State.t()
def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand}))
delay_redemand(pad_ref, state)
end

def handle_redemand(pad_ref, %State{} = state) do
Expand Down Expand Up @@ -65,24 +69,23 @@ defmodule Membrane.Core.Element.DemandHandler do
"""
@spec supply_demand(
Pad.ref(),
size :: non_neg_integer | (non_neg_integer() -> non_neg_integer()),
State.t()
) :: State.t()
def supply_demand(pad_ref, size, state) do
state = update_demand(pad_ref, size, state)
supply_demand(pad_ref, state)
end

@spec supply_demand(Pad.ref(), State.t()) :: State.t()
def supply_demand(pad_ref, %State{delay_demands?: true} = state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply}))
delay_demand_supply(pad_ref, state)
end

def supply_demand(pad_ref, state) do
do_supply_demand(pad_ref, state)
|> handle_delayed_demands()
end

def delay_demand_supply(pad_ref, state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply}))
end

defp do_supply_demand(pad_ref, state) do
# marking is state that actual demand supply has been started (note changing back to false when finished)
state = %State{state | delay_demands?: true}
Expand All @@ -97,11 +100,16 @@ defmodule Membrane.Core.Element.DemandHandler do
%State{state | delay_demands?: false}
end

defp update_demand(pad_ref, size, state) when is_integer(size) do
@spec update_demand(
Pad.ref(),
non_neg_integer() | (non_neg_integer() -> non_neg_integer()),
State.t()
) :: State.t()
def update_demand(pad_ref, size, state) when is_integer(size) do
PadModel.set_data!(state, pad_ref, :manual_demand_size, size)
end

defp update_demand(pad_ref, size_fun, state) when is_function(size_fun) do
def update_demand(pad_ref, size_fun, state) when is_function(size_fun) do
manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size)
new_manual_demand_size = size_fun.(manual_demand_size)

Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
# Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime.

alias Membrane.Core.Element.DemandController
alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.Element.{AtomicDemand, State}

require Membrane.Core.Child.PadModel, as: PadModel
Expand Down Expand Up @@ -141,6 +140,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
state
end)
end
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> DemandController.Auto.pop_queues_and_bump_demand()
end
end
8 changes: 3 additions & 5 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ defmodule Membrane.Core.Element.EventController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandHandler,
DemandController,
InputQueue,
PlaybackQueue,
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils

require Membrane.Core.Child.PadModel
require Membrane.Core.Message
require Membrane.Core.Telemetry
Expand Down Expand Up @@ -55,7 +53,7 @@ defmodule Membrane.Core.Element.EventController do

# event goes to the auto flow control queue
not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) ->
AutoFlowUtils.store_event_in_queue(pad_ref, event, state)
DemandController.Auto.store_event_in_queue(pad_ref, event, state)

true ->
exec_handle_event(pad_ref, event, state)
Expand Down Expand Up @@ -109,7 +107,7 @@ defmodule Membrane.Core.Element.EventController do
Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}")

state =
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state)
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref))
Expand Down
Loading

0 comments on commit 9c309b0

Please sign in to comment.