From 6c585e4787982e39274a4885cbb8d2fb91855a72 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 16:42:46 +0100 Subject: [PATCH] InputQueue -> ManualFlowController.InputQueue --- lib/membrane/children_spec.ex | 2 +- lib/membrane/core/child/pad_model.ex | 3 ++- lib/membrane/core/element.ex | 2 +- .../core/element/buffer_controller.ex | 22 ++++++++++--------- .../core/element/demand_controller.ex | 6 +++-- lib/membrane/core/element/event_controller.ex | 3 ++- .../core/element/manual_flow_controller.ex | 3 ++- .../input_queue.ex | 5 +++-- lib/membrane/core/element/pad_controller.ex | 3 ++- .../core/element/stream_format_controller.ex | 3 ++- .../core/element/event_controller_test.exs | 3 ++- .../core/element/input_queue_test.exs | 3 ++- .../element/lifecycle_controller_test.exs | 3 ++- .../element/stream_format_controller_test.exs | 3 ++- 14 files changed, 39 insertions(+), 25 deletions(-) rename lib/membrane/core/element/{ => manual_flow_controller}/input_queue.ex (98%) diff --git a/lib/membrane/children_spec.ex b/lib/membrane/children_spec.ex index cd810096d..7507da384 100644 --- a/lib/membrane/children_spec.ex +++ b/lib/membrane/children_spec.ex @@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive as many buffers, as demanded, all excess buffers will be queued internally. Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0` - for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future). + for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future). - `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode. See `t:Membrane.Pad.flow_control/0` for more info. - `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1, diff --git a/lib/membrane/core/child/pad_model.ex b/lib/membrane/core/child/pad_model.ex index 9fd1c4ace..cfd7a97a9 100644 --- a/lib/membrane/core/child/pad_model.ex +++ b/lib/membrane/core/child/pad_model.ex @@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do alias Membrane.Core.Child alias Membrane.Core.Element.EffectiveFlowController + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.{Pad, UnknownPadError} @type bin_pad_data :: %Membrane.Bin.PadData{ @@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do pid: pid, other_ref: Pad.ref(), sticky_messages: [Membrane.Event.t()], - input_queue: Membrane.Core.Element.InputQueue.t() | nil, + input_queue: InputQueue.t() | nil, options: %{optional(atom) => any}, auto_demand_size: pos_integer() | nil, sticky_events: [Membrane.Event.t()], diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 26d482833..c4776867f 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -218,7 +218,7 @@ defmodule Membrane.Core.Element do defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do pad_ref = Message.for_pad(msg) - state = BufferController.handle_buffer(pad_ref, buffers, state) + state = BufferController.handle_ingoing_buffers(pad_ref, buffers, state) {:noreply, state} end diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 99d7173df..0398191cf 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -16,12 +16,13 @@ defmodule Membrane.Core.Element.BufferController do DemandController, AutoFlowController, EventController, - InputQueue, ManualFlowController, PlaybackQueue, State } + alias Membrane.Core.Element.ManualFlowController.InputQueue + alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel @@ -32,8 +33,8 @@ defmodule Membrane.Core.Element.BufferController do callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2` to check if there are any unsupplied demands. """ - @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() - def handle_buffer(pad_ref, buffers, state) do + @spec handle_ingoing_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() + def handle_ingoing_buffers(pad_ref, buffers, state) do withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), playback: %State{playback: :playing} <- state do %{ @@ -51,20 +52,21 @@ defmodule Membrane.Core.Element.BufferController do EventController.handle_start_of_stream(pad_ref, state) end - do_handle_buffer(pad_ref, data, buffers, state) + do_handle_ingoing_buffers(pad_ref, data, buffers, state) else pad: {:error, :unknown_pad} -> # We've got a buffer from already unlinked pad state playback: _playback -> - PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state) + PlaybackQueue.store(&handle_ingoing_buffers(pad_ref, buffers, &1), state) end end - @spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: + # todo: move it to the flow controllers? + @spec do_handle_ingoing_buffers(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() - defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do + defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) @@ -79,20 +81,20 @@ defmodule Membrane.Core.Element.BufferController do end end - defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do + defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do %{input_queue: old_input_queue} = data input_queue = InputQueue.store(old_input_queue, buffers) state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) - if old_input_queue |> InputQueue.empty?() do + if InputQueue.empty?(old_input_queue) do ManualFlowController.supply_demand(pad_ref, state) else state end end - defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do + defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :push}, buffers, state) do exec_buffer_callback(pad_ref, buffers, state) end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index c85940228..bbfc109c4 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Element.DemandController do use Bunch alias Membrane.Buffer + alias Membrane.Element.PadData alias Membrane.Core.CallbackHandler alias Membrane.Core.Element.CallbackContext @@ -28,8 +29,9 @@ defmodule Membrane.Core.Element.DemandController do def snapshot_atomic_demand(pad_ref, state) do with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do - if pad_data.direction == :input, - do: raise("cannot snapshot atomic counter in input pad") + if pad_data.direction == :input do + raise("cannot snapshot atomic counter in input pad") + end do_snapshot_atomic_demand(pad_data, state) else diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index f86c887ff..771ddae06 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -14,13 +14,14 @@ defmodule Membrane.Core.Element.EventController do CallbackContext, DemandController, AutoFlowController, - InputQueue, ManualFlowController, AutoFlowController, PlaybackQueue, State } + alias Membrane.Core.Element.ManualFlowController.InputQueue + require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index a1440bd52..39497332a 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -12,11 +12,12 @@ defmodule Membrane.Core.Element.ManualFlowController do DemandController, AutoFlowController, EventController, - InputQueue, State, StreamFormatController } + alias __MODULE__.InputQueue + alias Membrane.Element.PadData alias Membrane.Pad diff --git a/lib/membrane/core/element/input_queue.ex b/lib/membrane/core/element/manual_flow_controller/input_queue.ex similarity index 98% rename from lib/membrane/core/element/input_queue.ex rename to lib/membrane/core/element/manual_flow_controller/input_queue.ex index a3b3a2e15..dbebea030 100644 --- a/lib/membrane/core/element/input_queue.ex +++ b/lib/membrane/core/element/manual_flow_controller/input_queue.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.InputQueue do +defmodule Membrane.Core.Element.ManualFlowController.InputQueue do @moduledoc false # Queue that is attached to the `:input` pad when working in a `:manual` flow control mode. @@ -105,7 +105,8 @@ defmodule Membrane.Core.Element.InputQueue do |> maybe_increase_atomic_demand() end - @spec store(t(), atom(), queue_item() | [queue_item()]) :: t() + @spec store(t(), :buffer | :buffers | :event | :stream_format, queue_item() | [queue_item()]) :: + t() def store(input_queue, type \\ :buffers, v) def store(input_queue, :buffers, v) when is_list(v) do diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index f51a03252..cbad131a3 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -15,11 +15,12 @@ defmodule Membrane.Core.Element.PadController do AutoFlowController, EffectiveFlowController, EventController, - InputQueue, State, StreamFormatController } + alias Membrane.Core.Element.ManualFlowController.InputQueue + alias Membrane.Core.Parent.Link.Endpoint alias Membrane.LinkError diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index 4a20c6775..a713b0749 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -14,11 +14,12 @@ defmodule Membrane.Core.Element.StreamFormatController do CallbackContext, DemandController, AutoFlowController, - InputQueue, PlaybackQueue, State } + alias Membrane.Core.Element.ManualFlowController.InputQueue + require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index e9ea8f70a..57e2ccfad 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -1,7 +1,8 @@ defmodule Membrane.Core.Element.EventControllerTest do use ExUnit.Case, async: true - alias Membrane.Core.Element.{AtomicDemand, EventController, InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, EventController, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.Events alias Membrane.Core.SubprocessSupervisor alias Membrane.Event diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index efa95c490..42904d7aa 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -2,7 +2,8 @@ defmodule Membrane.Core.Element.InputQueueTest do use ExUnit.Case, async: true alias Membrane.Buffer - alias Membrane.Core.Element.{AtomicDemand, InputQueue} + alias Membrane.Core.Element.AtomicDemand + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.Message alias Membrane.Core.SubprocessSupervisor alias Membrane.Testing.Event diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 74131182c..de2a33d1a 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -1,7 +1,8 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do use ExUnit.Case, async: true - alias Membrane.Core.Element.{AtomicDemand, InputQueue, LifecycleController, State} + alias Membrane.Core.Element.{AtomicDemand, LifecycleController, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.{ Message, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 5b68ad29a..914f02c8d 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -3,7 +3,8 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do alias Membrane.Buffer alias Membrane.Core.Message - alias Membrane.Core.Element.{AtomicDemand, InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.SubprocessSupervisor alias Membrane.StreamFormat.Mock, as: MockStreamFormat alias Membrane.Support.DemandsTest.Filter