diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index ead730ecf..806e74aed 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -26,6 +26,7 @@ defmodule Membrane.Core.Element do DemandController, EffectiveFlowController, EventController, + ManualFlowController, LifecycleController, PadController, State, @@ -210,7 +211,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do - state = DemandController.Manual.resume_delayed_demands_loop(state) + state = ManualFlowController.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index ac0beb6cc..72585b796 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -24,7 +24,8 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ DemandController, State, - StreamFormatController + StreamFormatController, + ManualFlowController } alias Membrane.Core.{Events, TimerController} @@ -69,7 +70,7 @@ defmodule Membrane.Core.Element.ActionHandler do defp hdd(state) do with %{delay_demands?: false} <- state do - DemandController.Manual.handle_delayed_demands(state) + ManualFlowController.handle_delayed_demands(state) end end @@ -421,8 +422,8 @@ defmodule Membrane.Core.Element.ActionHandler do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do # todo: get_data! above could be eradicated - state = DemandController.Manual.update_demand(pad_ref, size, state) - DemandController.Manual.delay_demand_supply(pad_ref, state) + state = ManualFlowController.update_demand(pad_ref, size, state) + ManualFlowController.delay_demand_supply(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref @@ -443,7 +444,7 @@ defmodule Membrane.Core.Element.ActionHandler do with %{direction: :output, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do # todo: get_data! above could be eradicated - DemandController.Manual.delay_redemand(pad_ref, state) + ManualFlowController.delay_redemand(pad_ref, state) else %{direction: :input} -> raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}" @@ -479,7 +480,7 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) + ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> DemandController.Auto.pop_queues_and_bump_demand() diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index a40800cd0..162f0d6a7 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -15,6 +15,7 @@ defmodule Membrane.Core.Element.BufferController do DemandController, EventController, InputQueue, + ManualFlowController, PlaybackQueue, State } @@ -26,7 +27,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Handles incoming buffer: either stores it in InputQueue, or executes element's - callback. Also calls `Membrane.Core.Element.DemandController.Manual.supply_demand/2` + callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2` to check if there are any unsupplied demands. """ @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() @@ -83,7 +84,7 @@ defmodule Membrane.Core.Element.BufferController do state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) if old_input_queue |> InputQueue.empty?() do - DemandController.Manual.supply_demand(pad_ref, state) + ManualFlowController.supply_demand(pad_ref, state) else state end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 6cb50d739..84cb2b37a 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,12 +5,20 @@ defmodule Membrane.Core.Element.DemandController do use Bunch + alias Membrane.Core.Element.ManualFlowController alias __MODULE__.{Auto, Manual} alias Membrane.Buffer + alias Membrane.Core.CallbackHandler + alias Membrane.Core.Element.CallbackContext + + alias Membrane.Core.Element.{ + ActionHandler, AtomicDemand, + ManualFlowController, + AutoFlowController, PlaybackQueue, State } @@ -78,7 +86,7 @@ defmodule Membrane.Core.Element.DemandController do } ) - Manual.handle_redemand(pad_data.ref, state) + ManualFlowController.handle_redemand(pad_data.ref, state) else _other -> state end @@ -111,4 +119,52 @@ defmodule Membrane.Core.Element.DemandController do atomic_demand: atomic_demand }) end + + + @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() + def exec_handle_demand(pad_ref, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + true <- exec_handle_demand?(pad_data) do + do_exec_handle_demand(pad_data, state) + else + _other -> state + end + end + + @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() + defp do_exec_handle_demand(pad_data, state) do + context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) + + CallbackHandler.exec_and_handle_callback( + :handle_demand, + ActionHandler, + %{ + split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), + context: context + }, + [pad_data.ref, pad_data.demand, pad_data.demand_unit], + state + ) + end + + defp exec_handle_demand?(%{end_of_stream?: true}) do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as :end_of_stream action has already been returned + """) + + false + end + + defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as demand is not greater than 0, + demand: #{inspect(demand)} + """) + + false + end + + defp exec_handle_demand?(_pad_data) do + true + end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 66a3d41e5..e79394213 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -14,6 +14,8 @@ defmodule Membrane.Core.Element.EventController do CallbackContext, DemandController, InputQueue, + ManualFlowController, + AutoFlowController, PlaybackQueue, State } @@ -107,7 +109,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) + ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/element/demand_controller/manual.ex b/lib/membrane/core/element/manual_flow_controller.ex similarity index 81% rename from lib/membrane/core/element/demand_controller/manual.ex rename to lib/membrane/core/element/manual_flow_controller.ex index 2d6d11f21..04ec59b2e 100644 --- a/lib/membrane/core/element/demand_controller/manual.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.Manual do +defmodule Membrane.Core.Element.ManualFlowController do @moduledoc false # Module handling demands requested on output pads. @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.DemandController.Manual do ActionHandler, BufferController, CallbackContext, + DemandController, EventController, InputQueue, State, @@ -47,7 +48,7 @@ defmodule Membrane.Core.Element.DemandController.Manual do defp do_handle_redemand(pad_ref, state) do state = %{state | delay_demands?: true} - state = exec_handle_demand(pad_ref, state) + state = DemandController.exec_handle_demand(pad_ref, state) %{state | delay_demands?: false} end @@ -100,11 +101,7 @@ defmodule Membrane.Core.Element.DemandController.Manual do %State{state | delay_demands?: false} end - @spec update_demand( - Pad.ref(), - non_neg_integer() | (non_neg_integer() -> non_neg_integer()), - State.t() - ) :: State.t() + @spec update_demand(Pad.ref(), non_neg_integer() | (non_neg_integer() -> non_neg_integer()), State.t()) :: State.t() def update_demand(pad_ref, size, state) when is_integer(size) do PadModel.set_data!(state, pad_ref, :manual_demand_size, size) end @@ -206,51 +203,4 @@ defmodule Membrane.Core.Element.DemandController.Manual do BufferController.exec_buffer_callback(pad_ref, buffers, state) end - - @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() - defp exec_handle_demand(pad_ref, state) do - with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), - true <- exec_handle_demand?(pad_data) do - do_exec_handle_demand(pad_data, state) - else - _other -> state - end - end - - @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() - defp do_exec_handle_demand(pad_data, state) do - context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) - - CallbackHandler.exec_and_handle_callback( - :handle_demand, - ActionHandler, - %{ - split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), - context: context - }, - [pad_data.ref, pad_data.demand, pad_data.demand_unit], - state - ) - end - - defp exec_handle_demand?(%{end_of_stream?: true}) do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as :end_of_stream action has already been returned - """) - - false - end - - defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as demand is not greater than 0, - demand: #{inspect(demand)} - """) - - false - end - - defp exec_handle_demand?(_pad_data) do - true - end end