diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 806e74aed..119e4f3dd 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, - DemandController, + DemandController, AutoFlowController, EffectiveFlowController, EventController, ManualFlowController, diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 72585b796..8e7d89ddb 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -22,7 +22,8 @@ defmodule Membrane.Core.Element.ActionHandler do } alias Membrane.Core.Element.{ - DemandController, + AutoFlowController, + DemandController, AutoFlowController, State, StreamFormatController, ManualFlowController @@ -183,13 +184,13 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.Auto.pause_demands(in_ref, state) + AutoFlowController.pause_demands(in_ref, state) end @impl CallbackHandler def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.Auto.resume_demands(in_ref, state) + AutoFlowController.resume_demands(in_ref, state) end @impl CallbackHandler @@ -483,7 +484,7 @@ defmodule Membrane.Core.Element.ActionHandler do ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) - |> DemandController.Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/demand_controller/auto.ex b/lib/membrane/core/element/auto_flow_controller.ex similarity index 99% rename from lib/membrane/core/element/demand_controller/auto.ex rename to lib/membrane/core/element/auto_flow_controller.ex index a71602c22..36c6fc3e1 100644 --- a/lib/membrane/core/element/demand_controller/auto.ex +++ b/lib/membrane/core/element/auto_flow_controller.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.Auto do +defmodule Membrane.Core.Element.AutoFlowController do @moduledoc false alias Membrane.Buffer diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 162f0d6a7..bb0fb3055 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -11,8 +11,9 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Element.{ ActionHandler, + AutoFlowController, CallbackContext, - DemandController, + DemandController, AutoFlowController, EventController, InputQueue, ManualFlowController, @@ -70,10 +71,10 @@ defmodule Membrane.Core.Element.BufferController do :atomics.put(stalker_metrics.demand, 1, demand - buf_size) if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do - DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state) + AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state) else state = exec_buffer_callback(pad_ref, buffers, state) - DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state) + AutoFlowController.auto_adjust_atomic_demand(pad_ref, state) end end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 84cb2b37a..905876ea2 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,9 +5,6 @@ defmodule Membrane.Core.Element.DemandController do use Bunch - alias Membrane.Core.Element.ManualFlowController - alias __MODULE__.{Auto, Manual} - alias Membrane.Buffer alias Membrane.Core.CallbackHandler @@ -63,7 +60,7 @@ defmodule Membrane.Core.Element.DemandController do if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) - |> Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else state end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index e07dcedb5..385c2e999 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -19,6 +19,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. alias Membrane.Core.Element.DemandController + alias Membrane.Core.Element.AutoFlowController alias Membrane.Core.Element.{AtomicDemand, State} require Membrane.Core.Child.PadModel, as: PadModel @@ -140,6 +141,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state end) end - |> DemandController.Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index e79394213..246eaef16 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,7 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, + DemandController, AutoFlowController, InputQueue, ManualFlowController, AutoFlowController, @@ -55,7 +55,7 @@ defmodule Membrane.Core.Element.EventController do # event goes to the auto flow control queue not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> - DemandController.Auto.store_event_in_queue(pad_ref, event, state) + AutoFlowController.store_event_in_queue(pad_ref, event, state) true -> exec_handle_event(pad_ref, event, state) diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index 04ec59b2e..83793d283 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -9,7 +9,7 @@ defmodule Membrane.Core.Element.ManualFlowController do ActionHandler, BufferController, CallbackContext, - DemandController, + DemandController, AutoFlowController, EventController, InputQueue, State, diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index dfbb52f8f..85479f9a1 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -9,8 +9,9 @@ defmodule Membrane.Core.Element.PadController do alias Membrane.Core.Element.{ ActionHandler, AtomicDemand, + AutoFlowController, CallbackContext, - DemandController, + DemandController, AutoFlowController, EffectiveFlowController, EventController, InputQueue, @@ -241,7 +242,7 @@ defmodule Membrane.Core.Element.PadController do |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) - |> DemandController.Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -335,7 +336,7 @@ defmodule Membrane.Core.Element.PadController do Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) %{direction: :input, flow_control: :auto} -> - DemandController.Auto.auto_adjust_atomic_demand(endpoint.pad_ref, state) + AutoFlowController.auto_adjust_atomic_demand(endpoint.pad_ref, state) |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) _pad_data -> diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index f28368a3f..e70949fa0 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -12,7 +12,7 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, + DemandController, AutoFlowController, InputQueue, PlaybackQueue, State @@ -48,7 +48,7 @@ defmodule Membrane.Core.Element.StreamFormatController do # stream format goes to the auto flow control queue pad_ref in state.awaiting_auto_input_pads -> - DemandController.Auto.store_stream_format_in_queue(pad_ref, stream_format, state) + AutoFlowController.store_stream_format_in_queue(pad_ref, stream_format, state) true -> exec_handle_stream_format(pad_ref, stream_format, state) diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index f6f9054b9..98022cef1 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do # with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well. atomic_demand: private_field, - # Field used in DemandController.Auto and InputQueue, to caluclate, how much AtomicDemand should be increased. + # Field used in AutoFlowController and InputQueue, to caluclate, how much AtomicDemand should be increased. # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field,