From 3ea52d2c4deb0b68169ddcbf2fb8aa200a5c352d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Chali=C5=84ski?= Date: Fri, 2 Feb 2024 17:46:55 +0100 Subject: [PATCH 01/11] check if pr author is membrane admin (#741) --- scripts/python/get_author_origin.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/python/get_author_origin.py b/scripts/python/get_author_origin.py index 0ff857386..7a6e7804a 100644 --- a/scripts/python/get_author_origin.py +++ b/scripts/python/get_author_origin.py @@ -3,6 +3,10 @@ membrane_team = json.load(sys.stdin) pr_author = sys.argv[1] +if pr_author == "membraneframeworkadmin": + print("MEMBRANE") + sys.exit(0) + try: for person in membrane_team: if person["login"] == pr_author: From 28ff5b3aaea745338f7229bcb4d7290d5cf56aac Mon Sep 17 00:00:00 2001 From: Mateusz Front Date: Fri, 9 Feb 2024 17:11:40 +0100 Subject: [PATCH 02/11] Remove portaudio and mad installation instructions from readme (#748) --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index a7d250fc5..caed0cbdc 100644 --- a/README.md +++ b/README.md @@ -53,10 +53,6 @@ Membrane.Pipeline.start_link(MyPipeline, mp3_url) ``` This is an [Elixir](elixir-lang.org) snippet, that streams an mp3 via HTTP and plays it on your speaker. Here's how to run it: -- Install [libmad](https://github.com/markjeee/libmad) and [portaudio](https://github.com/PortAudio/portaudio). Membrane uses these libs to decode the mp3 and to access your speaker, respectively. You can use these commands: - - On Mac OS: `brew install libmad portaudio pkg-config` - - On Debian: `apt install libmad0-dev portaudio19-dev` - - Option 1: Click the button below: [![Run in Livebook](https://livebook.dev/badge/v1/blue.svg)](https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2Fmembraneframework%2Fmembrane_core%2Fblob%2Fmaster%2Fexample.livemd) From 12f1e8ed006b39571d4892aaf64b82b859880591 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Mon, 12 Feb 2024 08:24:24 -0700 Subject: [PATCH 03/11] lib/membrane/utility_supervisor.ex doc edits (#746) --- lib/membrane/utility_supervisor.ex | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/membrane/utility_supervisor.ex b/lib/membrane/utility_supervisor.ex index a46bbbfd3..b60d556e2 100644 --- a/lib/membrane/utility_supervisor.ex +++ b/lib/membrane/utility_supervisor.ex @@ -1,14 +1,16 @@ defmodule Membrane.UtilitySupervisor do @moduledoc """ - A supervisor that allows to start utility processes under the pipeline's + A supervisor responsible for managing utility processes under the pipeline's supervision tree. - The supervisor is spawned with each component and can be obtained from - callback contexts. + The supervisor is spawned with each component and can be accessed from callback contexts. - The supervisor never restarts any processes, it just makes sure they - terminate when the component that started them terminates. If restarting - is needed, a dedicated supervisor should be spawned under this supervisor, like + `Membrane.UtilitySupervisor` does not restart processes. Rather, it ensures that these utility processes + terminate gracefully when the component that initiated them terminates. + + If a process needs to be able to restart, spawn a dedicated supervisor under this supervisor. + + ## Example def handle_setup(ctx, state) do Membrane.UtilitySupervisor.start_link_child( @@ -26,7 +28,7 @@ defmodule Membrane.UtilitySupervisor do @doc """ Starts a process under the utility supervisor. - Semantics of the `child_spec` argument is the same as in `Supervisor.child_spec/2`. + Semantics of the `child_spec` argument are the same as in `Supervisor.child_spec/2`. """ @spec start_child(t, Supervisor.child_spec() | {module(), term()} | module()) :: Supervisor.on_start_child() @@ -37,7 +39,7 @@ defmodule Membrane.UtilitySupervisor do @doc """ Starts a process under the utility supervisor and links it to the current process. - Semantics of the `child_spec` argument is the same as in `Supervisor.child_spec/2`. + Semantics of the `child_spec` argument are the same as in `Supervisor.child_spec/2`. """ @spec start_link_child(t, Supervisor.child_spec() | {module(), term()} | module()) :: Supervisor.on_start_child() From f112f9473a8110ede0d88e5f240969e28b4220eb Mon Sep 17 00:00:00 2001 From: DominikWolek Date: Tue, 13 Feb 2024 14:50:29 +0100 Subject: [PATCH 04/11] Add crash reason handle crash group (#720) * Add :reason to the ctx in handle_crash_group_down * Remove type reason as it is private * Add :reason to CallbackContexts * Add test for crashing reason * Apply revier's suggestions * Rename :reason to :crash_reason Specify crash_reason type to more descriptive type * Apply revier suggestions * Unify field name --------- Co-authored-by: feliks.pobiedzinski@swmansion.com --- CHANGELOG.md | 1 + lib/membrane/bin/callback_context.ex | 5 ++-- lib/membrane/core/bin/callback_context.ex | 6 ++++- .../crash_group_utils.ex | 12 ++++++---- lib/membrane/core/parent/crash_group.ex | 7 ++++-- .../core/pipeline/callback_context.ex | 6 ++++- lib/membrane/pipeline/callback_context.ex | 3 ++- .../membrane/integration/child_crash_test.exs | 17 ++++++++++++++ test/support/child_crash_test/pipeline.ex | 23 ++++++++++++++++++- 9 files changed, 67 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 000523f6c..a75560ee0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) * Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702) + * Add `crash_reason` to `handle_crash_group_down/3` callback context in bins and pipelines. [#720](https://github.com/membraneframework/membrane_core/pull/720) ## 1.0.0 * Introduce `:remove_link` action in pipelines and bins. diff --git a/lib/membrane/bin/callback_context.ex b/lib/membrane/bin/callback_context.ex index 9608bb3c5..c6aa09527 100644 --- a/lib/membrane/bin/callback_context.ex +++ b/lib/membrane/bin/callback_context.ex @@ -12,8 +12,8 @@ defmodule Membrane.Bin.CallbackContext do Field `:start_of_stream_received?` is present only in `c:Membrane.Bin.handle_element_end_of_stream/4`. - Fields `:members` and `:crash_initiator` are present only in - `c:Membrane.Pipeline.handle_crash_group_down/3`. + Fields `:members`, `:crash_initiator` and `crash_reason` and are present only in + `c:Membrane.Bin.handle_crash_group_down/3`. """ @type t :: %{ :clock => Membrane.Clock.t(), @@ -27,6 +27,7 @@ defmodule Membrane.Bin.CallbackContext do optional(:pad_options) => map(), optional(:members) => [Membrane.Child.name()], optional(:crash_initiator) => Membrane.Child.name(), + optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(), optional(:start_of_stream_received?) => boolean() } end diff --git a/lib/membrane/core/bin/callback_context.ex b/lib/membrane/core/bin/callback_context.ex index 704822720..205e90c2a 100644 --- a/lib/membrane/core/bin/callback_context.ex +++ b/lib/membrane/core/bin/callback_context.ex @@ -3,7 +3,11 @@ defmodule Membrane.Core.Bin.CallbackContext do @type optional_fields :: [pad_options: map()] - | [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()] + | [ + members: [Membrane.Child.name()], + crash_initiator: Membrane.Child.name(), + crash_reason: :normal | :shutdown | {:shutdown, term()} | term() + ] | [start_of_stream_received?: boolean()] @spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) :: diff --git a/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex b/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex index 6bcb0037e..8537f7d5c 100644 --- a/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex @@ -53,12 +53,12 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do end end - def handle_crash_group_member_death(child_name, %CrashGroup{} = group, _reason, state) do + def handle_crash_group_member_death(child_name, %CrashGroup{} = group, crash_reason, state) do state = if group.detonating? do state else - detonate_crash_group(child_name, group, state) + detonate_crash_group(child_name, group, crash_reason, state) end all_members_dead? = @@ -72,7 +72,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do end end - defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, state) do + defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, crash_reason, state) do state = ChildLifeController.remove_children_from_specs(group.members, state) state = LinkUtils.unlink_crash_group(group, state) @@ -88,7 +88,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do &%CrashGroup{ &1 | detonating?: true, - crash_initiator: crash_initiator + crash_initiator: crash_initiator, + crash_reason: crash_reason } ) end @@ -108,7 +109,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do context_generator = &Component.context_from_state(&1, members: crash_group.members, - crash_initiator: crash_group.crash_initiator + crash_initiator: crash_group.crash_initiator, + crash_reason: crash_group.crash_reason ) CallbackHandler.exec_and_handle_callback( diff --git a/lib/membrane/core/parent/crash_group.ex b/lib/membrane/core/parent/crash_group.ex index 1b8327ca8..62d1631c6 100644 --- a/lib/membrane/core/parent/crash_group.ex +++ b/lib/membrane/core/parent/crash_group.ex @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.CrashGroup do # * name - name that identifies the group # * type - responsible for restart policy of members of groups # * members - list of members of group + # * reason - reason of the crash use Bunch.Access @@ -15,9 +16,11 @@ defmodule Membrane.Core.Parent.CrashGroup do mode: :temporary, members: [Membrane.Child.name()], detonating?: boolean(), - crash_initiator: Membrane.Child.name() + crash_initiator: Membrane.Child.name(), + crash_reason: :normal | :shutdown | {:shutdown, term()} | term() } @enforce_keys [:name, :mode] - defstruct @enforce_keys ++ [members: [], detonating?: false, crash_initiator: nil] + defstruct @enforce_keys ++ + [members: [], detonating?: false, crash_initiator: nil, crash_reason: nil] end diff --git a/lib/membrane/core/pipeline/callback_context.ex b/lib/membrane/core/pipeline/callback_context.ex index d2537b760..0414147a6 100644 --- a/lib/membrane/core/pipeline/callback_context.ex +++ b/lib/membrane/core/pipeline/callback_context.ex @@ -3,7 +3,11 @@ defmodule Membrane.Core.Pipeline.CallbackContext do @type optional_fields :: [from: GenServer.from()] - | [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()] + | [ + members: [Membrane.Child.name()], + crash_initiator: Membrane.Child.name(), + crash_reason: :normal | :shutdown | {:shutdown, term()} | term() + ] | [start_of_stream_received?: boolean()] @spec from_state(Membrane.Core.Pipeline.State.t(), optional_fields()) :: diff --git a/lib/membrane/pipeline/callback_context.ex b/lib/membrane/pipeline/callback_context.ex index b352c4896..3c5a6fa6d 100644 --- a/lib/membrane/pipeline/callback_context.ex +++ b/lib/membrane/pipeline/callback_context.ex @@ -11,7 +11,7 @@ defmodule Membrane.Pipeline.CallbackContext do Field `:start_of_stream_received?` is present only in `c:Membrane.Pipeline.handle_element_end_of_stream/4`. - Fields `:members` and `:crash_initiator` are present only in + Fields `:members`, `:crash_initiator` and `:crash_reason` are present only in `c:Membrane.Pipeline.handle_crash_group_down/3`. """ @type t :: %{ @@ -23,6 +23,7 @@ defmodule Membrane.Pipeline.CallbackContext do optional(:from) => [GenServer.from()], optional(:members) => [Membrane.Child.name()], optional(:crash_initiator) => Membrane.Child.name(), + optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(), optional(:start_of_stream_received?) => boolean() } end diff --git a/test/membrane/integration/child_crash_test.exs b/test/membrane/integration/child_crash_test.exs index f90ac6942..da423a5ca 100644 --- a/test/membrane/integration/child_crash_test.exs +++ b/test/membrane/integration/child_crash_test.exs @@ -67,6 +67,23 @@ defmodule Membrane.Integration.ChildCrashTest do assert_pipeline_crash_group_down(pipeline_pid, 1) end + test "Pipeline receives correct crash reason" do + pipeline_pid = Testing.Pipeline.start_supervised!(module: ChildCrashTest.Pipeline) + ChildCrashTest.Pipeline.add_path(pipeline_pid, [], :source, 1, :group_1) + + # time for pipeline to start :source + Process.sleep(100) + + ChildCrashTest.Pipeline.inform_about_details_in_case_of_crash(pipeline_pid) + + Testing.Pipeline.get_child_pid!(pipeline_pid, :source) + |> Process.exit(:custom_crash_reason) + + assert_receive {:crash, crash_reason: :custom_crash_reason} + + Testing.Pipeline.terminate(pipeline_pid) + end + test "Crash group consisting of bin crashes" do Process.flag(:trap_exit, true) diff --git a/test/support/child_crash_test/pipeline.ex b/test/support/child_crash_test/pipeline.ex index f23be8c0e..6761a67c1 100644 --- a/test/support/child_crash_test/pipeline.ex +++ b/test/support/child_crash_test/pipeline.ex @@ -21,7 +21,7 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do child(:center_filter, Filter) |> child(:sink, Testing.Sink) - {[spec: spec], %{}} + {[spec: spec], %{send_to: nil}} end @impl true @@ -29,6 +29,22 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do {[spec: spec], state} end + @impl true + def handle_info({:inform_about_crash, send_to}, _ctx, state) do + {[], %{state | send_to: send_to}} + end + + @impl true + def handle_crash_group_down(_group_name, _ctx, %{send_to: nil} = state) do + {[], state} + end + + @impl true + def handle_crash_group_down(_group_name, %{crash_reason: crash_reason}, %{send_to: pid} = state) do + send(pid, {:crash, crash_reason: crash_reason}) + {[], state} + end + @spec add_single_source(pid(), any(), any(), any()) :: any() def add_single_source(pid, source_name, group \\ nil, source \\ Testing.Source) do spec = child(source_name, source) |> get_child(:center_filter) @@ -92,4 +108,9 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do send(pid, {:create_path, spec}) end + + @spec inform_about_details_in_case_of_crash(pid()) :: any() + def inform_about_details_in_case_of_crash(pid) do + send(pid, {:inform_about_crash, self()}) + end end From 2eacd1a79d69e28ba1d54768e07899d21df779cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Feliks=20Pobiedzi=C5=84ski?= <38541925+FelonEkonom@users.noreply.github.com> Date: Thu, 22 Feb 2024 15:42:21 +0100 Subject: [PATCH 05/11] Bump version to v1.0.1 (#755) --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index caed0cbdc..8c3f44399 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get` ```elixir -{:membrane_core, "~> 1.0.0"} +{:membrane_core, "~> 1.0"} ``` **Standalone libraries** diff --git a/mix.exs b/mix.exs index 4c96bd981..ac1df050b 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.0" + @version "1.0.1" @source_ref "v#{@version}" def project do From 4c37a06e47b96447e79ea90d7afd878f463cca36 Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Fri, 23 Feb 2024 02:28:02 -0700 Subject: [PATCH 06/11] Next (#750) --- lib/membrane/event.ex | 18 ++++++++++++------ lib/membrane/stream_format.ex | 14 ++++++++------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/membrane/event.ex b/lib/membrane/event.ex index 2ed7405b6..c342a52f3 100644 --- a/lib/membrane/event.ex +++ b/lib/membrane/event.ex @@ -1,18 +1,24 @@ defmodule Membrane.Event do @moduledoc """ - Event is an entity that can be sent between elements. + Represents a communication event, capable of flowing both downstream and upstream. - Events can flow either downstream or upstream - they can be sent with - `t:Membrane.Element.Action.event/0`, and can be handled in - `c:Membrane.Element.Base.handle_event/4`. Each event is - to implement `Membrane.EventProtocol`, which allows to configure its behaviour. + Events are dispatched using `t:Membrane.Element.Action.event/0` and are handled via the + `c:Membrane.Element.Base.handle_event/4` callback. Each event must conform to the + `Membrane.EventProtocol` to ensure the proper configuration of its behaviour. """ alias Membrane.EventProtocol - @typedoc @moduledoc + @typedoc """ + The Membrane event, based on the `Membrane.EventProtocol`. + """ @type t :: EventProtocol.t() + @doc """ + Checks if the given argument is a Membrane event. + + Returns `true` if the `event` implements the `Membrane.EventProtocol`, otherwise `false`. + """ @spec event?(t()) :: boolean def event?(event) do EventProtocol.impl_for(event) != nil diff --git a/lib/membrane/stream_format.ex b/lib/membrane/stream_format.ex index f003f2448..9f63c9590 100644 --- a/lib/membrane/stream_format.ex +++ b/lib/membrane/stream_format.ex @@ -1,14 +1,16 @@ defmodule Membrane.StreamFormat do @moduledoc """ - Describes capabilities of some pad. + Defines the capabilities of a pad within the Membrane framework. - Every pad has some capabilities, which define a type of data that pad is - expecting. This format can be, for example, raw audio with specific sample - rate or encoded audio in given format. + Each pad in a multimedia pipeline has specific capabilities, determining the type and format + of data it can handle. For example, a pad's capabilities might include handling raw audio + with a specific sample rate or managing encoded audio in a specified format. - To link two pads together, their capabilities have to be compatible. + To successfully link two pads together, their capabilities must be compatible. """ - @typedoc @moduledoc + @typedoc """ + Represents a pad's capabilities. For more information, see: `Membrane.StreamFormat`. + """ @type t :: struct end From 475c22c045c050721cd3ffa19ebd7e73e5eff19a Mon Sep 17 00:00:00 2001 From: Brad Hanks Date: Fri, 23 Feb 2024 08:20:49 -0700 Subject: [PATCH 07/11] Improve Pipeline docs (#744) --- lib/membrane/pipeline.ex | 148 +++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 69 deletions(-) diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index f78be3d60..e3da30050 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -1,29 +1,25 @@ defmodule Membrane.Pipeline do @moduledoc """ - Module containing functions for constructing and supervising pipelines. + A behaviour module for implementing pipelines. - Pipelines are units that make it possible to instantiate, link and manage - elements and bins in convenient way (actually they should always be used inside - a pipeline). Linking pipeline children together enables them to pass data to one - another, and process it in different ways. - - To create a pipeline, use the `__using__/1` macro and implement callbacks - of `Membrane.Pipeline` behaviour. For details on instantiating and linking - children, see `Membrane.ChildrenSpec`. + `Membrane.Pipeline` contains the callbacks and functions for constructing and supervising pipelines. + Pipelines facilitate the convenient instantiation, linking, and management of elements and bins.\\ + Linking pipeline children together enables them to pass and process data. + To create a pipeline, use `use Membrane.Pipeline` and implement callbacks of `Membrane.Pipeline` behaviour. + See `Membrane.ChildrenSpec` for details on instantiating and linking children. ## Starting and supervision - Pipeline can be started with `start_link/2` or `start/2` functions. They both - return `{:ok, supervisor_pid, pipeline_pid}` in case of success, because the pipeline - is always spawned under a dedicated supervisor. The supervisor never restarts the - pipeline, but it makes sure that the pipeline and its children terminate properly. - If the pipeline needs to be restarted in case of failure, it should be spawned under - another supervisor with a proper strategy. + Start a pipeline with `start_link/2` or `start/2`. Pipelines always spawn under a dedicated supervisor, so + in the case of success, either function will return `{:ok, supervisor_pid, pipeline_pid}` . + + The supervisor never restarts the pipeline, but it does ensure that the pipeline and its children terminate properly. + If the pipeline needs to be restarted, it should be spawned under a different supervisor with the appropriate strategy. ### Starting under a supervision tree - The pipeline can be spawned under a supervision tree like any `GenServer`. Also, - `__using__/1` macro injects a `child_spec/1` function. A simple scenario can look like: + A pipeline can be spawned under a supervision tree like any other `GenServer`.\\ + `use Membrane.Pipeline` injects a `child_spec/1` function. A simple scenario could look like this: defmodule MyPipeline do use Membrane.Pipeline @@ -40,22 +36,22 @@ defmodule Membrane.Pipeline do ### Starting outside of a supervision tree - When starting a pipeline outside a supervision tree and willing to interact with - the pipeline by pid, `pipeline_pid` returned from `start_link` can be used, for example + When starting a pipeline outside a supervision tree, use the `pipeline_pid` pid to interact with the pipeline. + A simple scenario could look like this: {:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(MyPipeline, option: :value) send(pipeline_pid, :message) - ### Visualizing pipeline's supervision tree + ### Visualizing the supervision tree - Pipeline's internal supervision tree can be looked up with Applications tab of Erlang's Stalker - or with Livebook's `Kino` library. - For debugging (and ONLY for debugging) purposes, you may use the following configuration: + Use the [Applications tab](https://www.erlang.org/doc/apps/observer/observer_ug#applications-tab) in Erlang's Observer GUI + (or the `Kino` library in Livebook) to visualize a pipeline's internal supervision tree. Use the following configuration for debugging purposes only: config :membrane_core, unsafely_name_processes_for_stalker: [:components] - that makes the stalker's process tree graph more readable by naming pipeline's descendants, for example: - ![Stalker graph](assets/images/stalker_graph.png). + This improves the readability of the Observer's process tree graph by naming the pipeline descendants, as demonstrated here: + + ![Observer graph](assets/images/observer_graph.png). """ use Bunch @@ -67,20 +63,30 @@ defmodule Membrane.Pipeline do require Membrane.Core.Message, as: Message @typedoc """ - Defines options that can be passed to `start/3` / `start_link/3` and received - in `c:handle_init/2` callback. + Defines options passed to the `start/3` and `start_link/3` and subsequently received + in the `c:handle_init/2` callback. """ @type pipeline_options :: any + @typedoc "The Pipeline name" @type name :: GenServer.name() + @typedoc "List of configurations used by `start/3` and `start_link/3`." @type config :: [config_entry()] + + @typedoc "Defines configuration value used by the `start/3` and `start_link/3`." @type config_entry :: {:name, name()} + @typedoc """ + Defines the return value of the `start/3` and `start_link/3`." + """ @type on_start :: {:ok, supervisor_pid :: pid, pipeline_pid :: pid} | {:error, {:already_started, pid()} | term()} + @typedoc """ + The pipeline state. + """ @type state :: any() @typedoc """ @@ -88,29 +94,27 @@ defmodule Membrane.Pipeline do ## Return values - * `{[action], state}` - Return a list of actions that will be performed within the - pipeline. This can be used to start new children, or to send messages to specific children, - for example. Actions are a tuple of `{type, arguments}`, so may be written in the - form a keyword list. See `Membrane.Pipeline.Action` for more info. + * `{[action], state}` - Returns a list of actions that will be performed within the + pipeline, e.g., starting new children, sending messages to specific children, etc. + Actions are tuples of `{type, arguments}`, so they can be expressed as a keyword list. + See `Membrane.Pipeline.Action` for more info. """ @type callback_return :: {[Action.t()], state} @doc """ - Callback invoked on initialization of pipeline. + Callback invoked on initialization of the pipeline. - This callback is synchronous: the process which started the pipeline waits until `handle_init` - finishes. For that reason, it's important to do any long-lasting or complex work in `c:handle_setup/2`, - while `handle_init` should be used for things like parsing options, initializing state or spawning - children. - By default, it converts the `opts` to a map if they're a struct and sets them as the pipeline state. + This callback is synchronous: the process that started the pipeline waits until `handle_init` + finishes, so it's important to do any long-lasting or complex work in `c:handle_setup/2`. + `handle_init` should be used for things, like parsing options, initializing state, or spawning + children. By default, `handle_init` converts `opts` to a map if they're a struct and sets them as the pipeline state. """ @callback handle_init(context :: CallbackContext.t(), options :: pipeline_options) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline is requested to terminate with `terminate/2`. - + Callback invoked when the pipeline is requested to terminate with `terminate/2`. By default, it returns `t:Membrane.Pipeline.Action.terminate/0` with reason `:normal`. """ @callback handle_terminate_request(context :: CallbackContext.t(), state) :: @@ -129,7 +133,7 @@ defmodule Membrane.Pipeline do {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline switches the playback to `:playing`. + Callback invoked when the pipeline switches the playback to `:playing`. By default, it does nothing. """ @callback handle_playing( @@ -166,10 +170,10 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline receives a message that is not recognized - as an internal membrane message. + Callback invoked when the pipeline receives a message that is not recognized + as an internal Membrane message. - Useful for receiving data sent from NIFs or other stuff. + Useful for receiving data sent from NIFs or other external sources. By default, it logs and ignores the received message. """ @callback handle_info( @@ -180,7 +184,7 @@ defmodule Membrane.Pipeline do {[Action.common_actions()], state()} @doc """ - Callback invoked when a child element starts processing stream via given pad. + Callback invoked when a child element starts processing a stream via the given pad. By default, it does nothing. """ @@ -192,7 +196,7 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when a child element finishes processing stream via given pad. + Callback invoked when a child element finishes processing a stream via the given pad. By default, it does nothing. """ @@ -225,7 +229,7 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when crash of the crash group happens. + Callback invoked when a crash group crashes. Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`. By default, it does nothing. @@ -237,9 +241,9 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - Callback invoked when pipeline is called using a synchronous call. + Callback invoked when the pipeline is called using a synchronous call. - Context passed to this callback contains additional field `:from`. + Context passed to this callback contains an additional field `:from`. By default, it does nothing. """ @callback handle_call( @@ -264,10 +268,10 @@ defmodule Membrane.Pipeline do handle_child_pad_removed: 4 @doc """ - Starts the Pipeline based on given module and links it to the current - process. + Starts the pipeline based on the given module and links it to the current process. + - Pipeline options are passed to module's `c:handle_init/2` callback. + Pipeline options are passed to the `c:handle_init/2` callback. Note that this function returns `{:ok, supervisor_pid, pipeline_pid}` in case of success. Check the 'Starting and supervision' section of the moduledoc for details. """ @@ -276,7 +280,7 @@ defmodule Membrane.Pipeline do do: do_start(:start_link, module, pipeline_options, process_options) @doc """ - Does the same as `start_link/3` but starts process outside of supervision tree. + Starts the pipeline outside a supervision tree. Compare to `start_link/3`. """ @spec start(module, pipeline_options, config) :: on_start def start(module, pipeline_options \\ nil, process_options \\ []), @@ -328,21 +332,21 @@ defmodule Membrane.Pipeline do Terminates the pipeline. Accepts three options: - * `asynchronous?` - if set to `true`, pipline termination won't be blocking and - will be executed in the process, which pid is returned as function result. If - set to `false`, pipeline termination will be blocking and will be executed in - the process that called this function. Defaults to `false`. - * `timeout` - tells how much time (ms) to wait for pipeline to get gracefully - terminated. Defaults to 5000. - * `force?` - if set to `true` and pipeline is still alive after `timeout`, - pipeline will be killed using `Process.exit/2` with reason `:kill`, and function - will return `{:error, :timeout}`. If set to `false` and pipeline is still alive - after `timeout`, function will raise an error. Defaults to `false`. + * `asynchronous?` - if set to `true`, pipeline termination won't be blocking and + will be executed in the process whose pid is returned as a function result. + If set to `false`, pipeline termination will be blocking and will be executed in + the process that called this function. Defaults to `false`. + * `timeout` - specifies how much time (ms) to wait for the pipeline to gracefully + terminate. Defaults to 5000. + * `force?` - determines how to handle a pipeline still alive after `timeout`. + If set to `true`, `Process.exit/2` kills the pipeline with reason `:kill` and returns + `{:error, :timeout}`. + If set to `false`, it raises an error. Defaults to `false`. Returns: - * `{:ok, pid}` - if option `asynchronous?: true` was passed. - * `:ok` - if pipeline was gracefully terminated within `timeout`. - * `{:error, :timeout}` - if pipeline was killed after a `timeout`. + * `{:ok, pid}` - option `asynchronous?: true` was passed. + * `:ok` - pipeline gracefully terminated within `timeout`. + * `{:error, :timeout}` - pipeline was killed after `timeout`. """ @spec terminate(pipeline :: pid, timeout: timeout(), @@ -393,13 +397,18 @@ defmodule Membrane.Pipeline do end end + @doc """ + Calls the pipeline with a message. + + Returns the result of the pipeline call. + """ @spec call(pid, any, timeout()) :: term() def call(pipeline, message, timeout \\ 5000) do GenServer.call(pipeline, message, timeout) end @doc """ - Checks whether module is a pipeline. + Checks whether the module is a pipeline. """ @spec pipeline?(module) :: boolean def pipeline?(module) do @@ -407,9 +416,9 @@ defmodule Membrane.Pipeline do end @doc """ - Lists PIDs of all the pipelines currently running on the current node. + Returns list of pipeline PIDs currently running on the current node. - Use only for debugging purposes. + Use for debugging only. """ @spec list_pipelines() :: [pid] def list_pipelines() do @@ -423,7 +432,8 @@ defmodule Membrane.Pipeline do end @doc """ - Like `list_pipelines/0`, but allows to pass a node. + Returns list of pipeline PIDs currently running on the passed node. \\ + Compare to `list_pipelines/0`. """ @spec list_pipelines(node()) :: [pid] def list_pipelines(node) do From dc5f6530f5ef1b7405a5e8148abd8705ce838d57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Feliks=20Pobiedzi=C5=84ski?= <38541925+FelonEkonom@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:47:48 +0100 Subject: [PATCH 08/11] Queue buffers when auto demand is low enough (#693) * Implement auto flow queue * Fix bugs wip * Fix bugs introduced in recent changes * Update changelog * Refactor code related to auto flow queues * Write tests for auto flow queue * wip * wip * Fix tests wip * Fix tests wip * Write more tests for recent changes * Fix tests wip * Refactor code * Refactor wip * Fix unit tests wip * Fix unit tests wip * Fix tests wip * Fix tests wip * Fix tests wip * Small refactor * Add comments describing, how auto flow queueing works * Small refactor in structs fields names * Make membrane fast again wip * Remove leftovers * Remove leftovers wip * Remove leftovers * Fix CI * Remove comments * Refactor auto flow queues mechanism description * wip * Revert "wip" This reverts commit 95b184840ab6eb589c4b5544b7a278d46567c59a. * Remove inspects * Impelemnt CR --- CHANGELOG.md | 1 + lib/membrane/core/child/pad_model.ex | 1 - lib/membrane/core/element.ex | 6 +- lib/membrane/core/element/action_handler.ex | 13 +- lib/membrane/core/element/atomic_demand.ex | 23 +- .../core/element/buffer_controller.ex | 14 +- .../core/element/demand_controller.ex | 30 ++- .../demand_controller/auto_flow_utils.ex | 208 +++++++++++++++++- .../core/element/effective_flow_controller.ex | 22 +- lib/membrane/core/element/event_controller.ex | 36 ++- lib/membrane/core/element/pad_controller.ex | 67 ++---- lib/membrane/core/element/state.ex | 12 +- .../core/element/stream_format_controller.ex | 26 ++- lib/membrane/element/pad_data.ex | 4 +- .../core/element/action_handler_test.exs | 14 +- .../core/element/atomic_demand_test.exs | 22 +- .../core/element/event_controller_test.exs | 7 +- .../core/element/input_queue_test.exs | 4 +- .../element/lifecycle_controller_test.exs | 7 +- .../core/element/pad_controller_test.exs | 8 +- .../element/stream_format_controller_test.exs | 7 +- .../integration/auto_demands_test.exs | 202 ++++++++++++++++- test/membrane/integration/demands_test.exs | 3 - 23 files changed, 577 insertions(+), 160 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a75560ee0..da6ceab62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) + * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) * Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626) * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) diff --git a/lib/membrane/core/child/pad_model.ex b/lib/membrane/core/child/pad_model.ex index c545a7421..9fd1c4ace 100644 --- a/lib/membrane/core/child/pad_model.ex +++ b/lib/membrane/core/child/pad_model.ex @@ -42,7 +42,6 @@ defmodule Membrane.Core.Child.PadModel do input_queue: Membrane.Core.Element.InputQueue.t() | nil, options: %{optional(atom) => any}, auto_demand_size: pos_integer() | nil, - associated_pads: [Pad.ref()] | nil, sticky_events: [Membrane.Event.t()], stalker_metrics: %{atom => :atomics.atomics_ref()} } diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 8bd3d91c7..b6500a593 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -158,8 +158,12 @@ defmodule Membrane.Core.Element do setup_incomplete?: false, effective_flow_control: :push, handling_action?: false, + popping_auto_flow_queue?: false, pads_to_snapshot: MapSet.new(), - stalker: options.stalker + stalker: options.stalker, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] } |> PadSpecHandler.init_pads() diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0812367b6..759237203 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -24,11 +24,11 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ DemandController, DemandHandler, - PadController, State, StreamFormatController } + alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.{Events, TimerController} alias Membrane.Element.Action @@ -176,7 +176,11 @@ defmodule Membrane.Core.Element.ActionHandler do _other -> :output end - pads = state |> PadModel.filter_data(%{direction: dir}) |> Map.keys() + pads = + Enum.flat_map(state.pads_data, fn + {pad_ref, %{direction: ^dir}} -> [pad_ref] + _pad_entry -> [] + end) Enum.reduce(pads, state, fn pad, state -> action = @@ -466,8 +470,9 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - state = PadController.remove_pad_associations(pad_ref, state) - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) + |> AutoFlowUtils.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/atomic_demand.ex b/lib/membrane/core/element/atomic_demand.ex index 1b387254e..4b7a9a020 100644 --- a/lib/membrane/core/element/atomic_demand.ex +++ b/lib/membrane/core/element/atomic_demand.ex @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do :ok end - @spec decrease(t, non_neg_integer()) :: t + @spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t} def decrease(%__MODULE__{} = atomic_demand, value) do atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value)) if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do flush_buffered_decrementation(atomic_demand) else - atomic_demand + {:unchanged, atomic_demand} end end @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do atomic_demand = %{atomic_demand | buffered_decrementation: 0} - if not atomic_demand.toilet_overflowed? and - get_receiver_status(atomic_demand) == {:resolved, :pull} and - get_sender_status(atomic_demand) == {:resolved, :push} and - -1 * atomic_demand_value > atomic_demand.toilet_capacity do - overflow(atomic_demand, atomic_demand_value) - else - atomic_demand - end + atomic_demand = + if not atomic_demand.toilet_overflowed? and + get_receiver_status(atomic_demand) == {:resolved, :pull} and + get_sender_status(atomic_demand) == {:resolved, :push} and + -1 * atomic_demand_value > atomic_demand.toilet_capacity do + overflow(atomic_demand, atomic_demand_value) + else + atomic_demand + end + + {{:decreased, atomic_demand_value}, atomic_demand} end defp overflow(atomic_demand, atomic_demand_value) do diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index ada22f144..986a204cf 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -69,8 +69,12 @@ defmodule Membrane.Core.Element.BufferController do state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) :atomics.put(stalker_metrics.demand, 1, demand - buf_size) - state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) - exec_buffer_callback(pad_ref, buffers, state) + if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do + AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + else + state = exec_buffer_callback(pad_ref, buffers, state) + AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + end end defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do @@ -93,11 +97,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Executes `handle_buffer` callback. """ - @spec exec_buffer_callback( - Pad.ref(), - [Buffer.t()] | Buffer.t(), - State.t() - ) :: State.t() + @spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t() def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do Telemetry.report_metric("buffer", 1, inspect(pad_ref)) diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 20d164dc6..daa5fb784 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -23,13 +23,20 @@ defmodule Membrane.Core.Element.DemandController do @spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t() def snapshot_atomic_demand(pad_ref, state) do - with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do if pad_data.direction == :input, do: raise("cannot snapshot atomic counter in input pad") do_snapshot_atomic_demand(pad_data, state) else + {:ok, %{end_of_stream?: true}} -> + Membrane.Logger.debug_verbose( + "Skipping snapshot of pad #{inspect(pad_ref)}, because it has flag :end_of_stream? set to true" + ) + + state + {:error, :unknown_pad} -> # We've got a :atomic_demand_increased message on already unlinked pad state @@ -43,13 +50,10 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - %{ - atomic_demand: atomic_demand, - associated_pads: associated_pads - } = pad_data - - if AtomicDemand.get(atomic_demand) > 0 do - AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state) + if AtomicDemand.get(pad_data.atomic_demand) > 0 do + state + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) + |> AutoFlowUtils.pop_queues_and_bump_demand() else state end @@ -91,9 +95,15 @@ defmodule Membrane.Core.Element.DemandController do buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers) demand = pad_data.demand - buffers_size - atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size) + {decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size) - PadModel.set_data!(state, pad_ref, %{ + with {:decreased, new_value} when new_value <= 0 <- decrease_result, + %{flow_control: :auto} <- pad_data do + Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref)) + else + _other -> state + end + |> PadModel.set_data!(pad_ref, %{ pad_data | demand: demand, atomic_demand: atomic_demand diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index a76a7441f..62d0b2077 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -1,15 +1,95 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do @moduledoc false + alias Membrane.Buffer + alias Membrane.Event + alias Membrane.StreamFormat + alias Membrane.Core.Element.{ AtomicDemand, - State + BufferController, + EventController, + State, + StreamFormatController } require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Logger require Membrane.Pad, as: Pad + @empty_map_set MapSet.new() + + # Description of the auto flow control queueing mechanism + + # General concept: Buffers coming to auto input pads should be handled only if + # all auto output pads have positive demand. Buffers arriving when any of the auto + # output pads has negative demand should be queued and only processed when the + # demand everywhere is positive + + # An Element is `corked` when its effective flow control is :pull and it has an auto output pad, + # who's demand is non-positive + + # The following events can make the element shift from `corked` state to `uncorked` state: + # - change of effective flow control from :pull to :push + # - increase in the value of auto output pad demand. We check the demand value: + # - after sending the buffer to a given output pad + # - after receiving a message :atomic_demand_increased from the next element + # - unlinking an auto output pad + # - sending EOS to an auto output pad + + # Analogically, transition from `uncorcked` to `corcked` might be caused by: + # - change of effective flow control from :push to :pull + # - sending a buffer through an output pad + # - linking an output pad + + # In addition, an invariant is maintained, which is that the head of all non-empty + # auto_flow_queue queues contains a buffer (the queue can also contain events and + # stream formats). After popping a queue + # of a given pad, if it has an event or stream format in its head, we pop it further, + # until it becomes empty or a buffer is encountered. + + # auto_flow_queues hold single buffers, event if they arrive to the element in batch, because if we + # done otherwise, we would have to handle whole batch after popping it from the queue, even if demand + # of all output pads would be satisfied after handling first buffer + + # Fields in Element state, that take important part in this mechanism: + # - satisfied_auto_output_pads - MapSet of auto output pads, whose demand is less than or equal to 0. + # We consider only pads with the end_of_stream? flag set to false + # - awaiting_auto_input_pads - MapSet of auto input pads, which have a non-empty auto_flow_queue + # - popping_auto_flow_queue? - a flag determining whether we are on the stack somewhere above popping a queue. + # It's used to avoid situations where the function that pops from the queue calls itself multiple times, + # what could potentially lead to things like altering the order of sent buffers. + + # Each auto input pad in PadData contains a queue in the :auto_flow_queue field, in which it stores queued + # buffers, events and stream formats. If queue is non-empty, corresponding pad_ref should be + # in the Mapset awaiting_auto_input_pads in element state + + # The introduced mechanism consists of two parts, the pseudocode for which is included below + + # def onBufferArrived() do + # if element uncorked do + # exec handle_buffer + # else + # store buffer in queue + # end + # end + + # def onUncorck() do + # # EFC means `effective flow control` + + # if EFC == pull do + # bump demand on auto input pads with an empty queue + # end + + # while (output demand positive or EFC == push) and some queues are not empty do + # pop random queue and handle its head + # end + + # if EFC == pull do + # bump demand on auto input pads with an empty queue + # end + # end + defguardp is_input_auto_pad_data(pad_data) when is_map(pad_data) and is_map_key(pad_data, :flow_control) and pad_data.flow_control == :auto and is_map_key(pad_data, :direction) and @@ -59,12 +139,39 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do state end + @spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t() + def store_buffers_in_queue(pad_ref, buffers, state) do + state + |> Map.update!(:awaiting_auto_input_pads, &MapSet.put(&1, pad_ref)) + |> PadModel.update_data!(pad_ref, :auto_flow_queue, fn queue -> + Enum.reduce(buffers, queue, fn buffer, queue -> + Qex.push(queue, {:buffer, buffer}) + end) + end) + end + + @spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t() + def store_event_in_queue(pad_ref, event, state) do + queue_item = {:event, event} + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) + end + + @spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t() + def store_stream_format_in_queue(pad_ref, stream_format, state) do + queue_item = {:stream_format, stream_format} + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) + end + @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do - Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2) + pad_ref_list + |> Enum.reduce(state, fn pad_ref, state -> + PadModel.get_data!(state, pad_ref) + |> do_auto_adjust_atomic_demand(state) + end) end - def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do + def auto_adjust_atomic_demand(pad_ref, state) do PadModel.get_data!(state, pad_ref) |> do_auto_adjust_atomic_demand(state) end @@ -83,6 +190,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do :ok = AtomicDemand.increase(atomic_demand, diff) :atomics.put(stalker_metrics.demand, 1, auto_demand_size) + PadModel.set_data!(state, ref, :demand, auto_demand_size) else state @@ -97,14 +205,96 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do state.effective_flow_control == :pull and not pad_data.auto_demand_paused? and pad_data.demand < pad_data.auto_demand_size / 2 and - Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state)) + state.satisfied_auto_output_pads == @empty_map_set + end + + @spec pop_queues_and_bump_demand(State.t()) :: State.t() + def pop_queues_and_bump_demand(%State{popping_auto_flow_queue?: true} = state), do: state + + def pop_queues_and_bump_demand(%State{} = state) do + %{state | popping_auto_flow_queue?: true} + |> pop_auto_flow_queues_while_needed() + |> bump_demand() + |> Map.put(:popping_auto_flow_queue?, false) + end + + defp bump_demand(state) do + if state.effective_flow_control == :pull and + state.satisfied_auto_output_pads == @empty_map_set do + do_bump_demand(state) + else + state + end + end + + defp do_bump_demand(state) do + state.auto_input_pads + |> Enum.reject(&MapSet.member?(state.awaiting_auto_input_pads, &1)) + |> Enum.reduce(state, fn pad_ref, state -> + pad_data = PadModel.get_data!(state, pad_ref) + + if not pad_data.auto_demand_paused? and + pad_data.demand < pad_data.auto_demand_size / 2 do + diff = pad_data.auto_demand_size - pad_data.demand + :ok = AtomicDemand.increase(pad_data.atomic_demand, diff) + + :atomics.put(pad_data.stalker_metrics.demand, 1, pad_data.auto_demand_size) + + PadModel.set_data!(state, pad_ref, :demand, pad_data.auto_demand_size) + else + state + end + end) + end + + @spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() + def pop_auto_flow_queues_while_needed(state) do + if (state.effective_flow_control == :push or + MapSet.size(state.satisfied_auto_output_pads) == 0) and + MapSet.size(state.awaiting_auto_input_pads) > 0 do + pop_random_auto_flow_queue(state) + |> pop_auto_flow_queues_while_needed() + else + state + end end - defp atomic_demand_positive?(pad_ref, state) do - atomic_demand_value = - PadModel.get_data!(state, pad_ref, :atomic_demand) - |> AtomicDemand.get() + defp pop_random_auto_flow_queue(state) do + pad_ref = Enum.random(state.awaiting_auto_input_pads) - atomic_demand_value > 0 + state + |> PadModel.get_data!(pad_ref, :auto_flow_queue) + |> Qex.pop() + |> case do + {{:value, {:buffer, buffer}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = BufferController.exec_buffer_callback(pad_ref, [buffer], state) + pop_stream_formats_and_events(pad_ref, state) + + {:empty, _empty_queue} -> + Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + end + end + + defp pop_stream_formats_and_events(pad_ref, state) do + PadModel.get_data!(state, pad_ref, :auto_flow_queue) + |> Qex.pop() + |> case do + {{:value, {:event, event}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = EventController.exec_handle_event(pad_ref, event, state) + pop_stream_formats_and_events(pad_ref, state) + + {{:value, {:stream_format, stream_format}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) + pop_stream_formats_and_events(pad_ref, state) + + {{:value, {:buffer, _buffer}}, _popped_queue} -> + state + + {:empty, _empty_queue} -> + Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + end end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 8db26fb79..6d0849439 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -18,6 +18,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. + alias Membrane.Core.Element.DemandController alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Element.{AtomicDemand, State} @@ -104,8 +105,8 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state.pads_data |> Enum.filter(fn {_ref, %{flow_control: flow_control}} -> flow_control == :auto end) - |> Enum.reduce(state, fn - {_ref, %{direction: :output} = pad_data}, state -> + |> Enum.each(fn + {_ref, %{direction: :output} = pad_data} -> :ok = AtomicDemand.set_sender_status( pad_data.atomic_demand, @@ -120,9 +121,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do [pad_data.other_ref, new_effective_flow_control] ) - state - - {pad_ref, %{direction: :input} = pad_data}, state -> + {pad_ref, %{direction: :input} = pad_data} -> if triggering_pad in [pad_ref, nil] or AtomicDemand.get_receiver_status(pad_data.atomic_demand) != :to_be_resolved do :ok = @@ -131,8 +130,17 @@ defmodule Membrane.Core.Element.EffectiveFlowController do {:resolved, new_effective_flow_control} ) end - - AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) end) + + with %{effective_flow_control: :pull} <- state do + Enum.reduce(state.pads_data, state, fn + {pad_ref, %{direction: :output, flow_control: :auto, end_of_stream?: false}}, state -> + DemandController.snapshot_atomic_demand(pad_ref, state) + + _pad_entry, state -> + state + end) + end + |> AutoFlowUtils.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..e4aff679e 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -13,11 +13,12 @@ defmodule Membrane.Core.Element.EventController do ActionHandler, CallbackContext, InputQueue, - PadController, PlaybackQueue, State } + alias Membrane.Core.Element.DemandController.AutoFlowUtils + require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry @@ -39,15 +40,24 @@ defmodule Membrane.Core.Element.EventController do playback: %State{playback: :playing} <- state do Telemetry.report_metric(:event, 1, inspect(pad_ref)) - if not Event.async?(event) and buffers_before_event_present?(data) do - PadModel.update_data!( - state, - pad_ref, - :input_queue, - &InputQueue.store(&1, :event, event) - ) - else - exec_handle_event(pad_ref, event, state) + async? = Event.async?(event) + + cond do + # events goes to the manual flow control input queue + not async? and buffers_before_event_present?(data) -> + PadModel.update_data!( + state, + pad_ref, + :input_queue, + &InputQueue.store(&1, :event, event) + ) + + # event goes to the auto flow control queue + not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> + AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + + true -> + exec_handle_event(pad_ref, event, state) end else pad: {:error, :unknown_pad} -> @@ -97,8 +107,10 @@ defmodule Membrane.Core.Element.EventController do else Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") - state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) - state = PadController.remove_pad_associations(pad_ref, state) + state = + PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) %{ start_of_stream?: start_of_stream?, diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 6b4d73ff9..61530f83f 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -227,7 +227,6 @@ defmodule Membrane.Core.Element.PadController do Stalker.unregister_link(state.stalker, pad_ref) state = generate_eos_if_needed(pad_ref, state) state = maybe_handle_pad_removed(pad_ref, state) - state = remove_pad_associations(pad_ref, state) {pad_data, state} = Map.update!(state, :pad_refs, &List.delete(&1, pad_ref)) @@ -239,6 +238,10 @@ defmodule Membrane.Core.Element.PadController do else _pad_data -> state end + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) + |> AutoFlowUtils.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -307,7 +310,6 @@ defmodule Membrane.Core.Element.PadController do stream_format: nil, start_of_stream?: false, end_of_stream?: false, - associated_pads: [], atomic_demand: metadata.atomic_demand, stalker_metrics: %{ total_buffers: total_buffers_metric @@ -328,31 +330,19 @@ defmodule Membrane.Core.Element.PadController do {:resolved, EffectiveFlowController.get_pad_effective_flow_control(pad_data.ref, state)} ) - state = update_associated_pads(pad_data, state) + case pad_data do + %{direction: :output, flow_control: :auto} -> + Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) - if pad_data.direction == :input and pad_data.flow_control == :auto do - AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) - else - state - end - end + %{direction: :input, flow_control: :auto} -> + AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) + |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) - defp update_associated_pads(%{flow_control: :auto} = pad_data, state) do - state.pads_data - |> Map.values() - |> Enum.filter(&(&1.direction != pad_data.direction and &1.flow_control == :auto)) - |> Enum.reduce(state, fn associated_pad_data, state -> - PadModel.update_data!( - state, - associated_pad_data.ref, - :associated_pads, - &[pad_data.ref | &1] - ) - end) + _pad_data -> + state + end end - defp update_associated_pads(_pad_data, state), do: state - defp merge_pad_direction_data(%{direction: :input} = pad_data, metadata, _state) do pad_data |> Map.merge(%{ @@ -412,14 +402,8 @@ defmodule Membrane.Core.Element.PadController do %{flow_control: :auto, direction: direction} = pad_data, pad_props, _other_info, - %State{} = state + _state ) do - associated_pads = - state.pads_data - |> Map.values() - |> Enum.filter(&(&1.direction != direction and &1.flow_control == :auto)) - |> Enum.map(& &1.ref) - auto_demand_size = cond do direction == :output -> @@ -449,7 +433,6 @@ defmodule Membrane.Core.Element.PadController do pad_data |> Map.merge(%{ demand: 0, - associated_pads: associated_pads, auto_demand_size: auto_demand_size }) |> put_in([:stalker_metrics, :demand], demand_metric) @@ -472,28 +455,6 @@ defmodule Membrane.Core.Element.PadController do end end - @doc """ - Removes all associations between the given pad and any other_endpoint pads. - """ - @spec remove_pad_associations(Pad.ref(), State.t()) :: State.t() - def remove_pad_associations(pad_ref, state) do - case PadModel.get_data!(state, pad_ref) do - %{flow_control: :auto} = pad_data -> - state = - Enum.reduce(pad_data.associated_pads, state, fn pad, state -> - PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref)) - end) - |> PadModel.set_data!(pad_ref, :associated_pads, []) - - if pad_data.direction == :output, - do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state), - else: state - - _pad_data -> - state - end - end - @spec maybe_handle_pad_added(Pad.ref(), State.t()) :: State.t() defp maybe_handle_pad_added(ref, state) do %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 141b53afa..9f990e68f 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -33,6 +33,7 @@ defmodule Membrane.Core.Element.State do stream_sync: Sync.t(), clock: Clock.t() | nil }, + auto_input_pads: [Pad.ref()], initialized?: boolean(), playback: Membrane.Playback.t(), playback_queue: Membrane.Core.Element.PlaybackQueue.t(), @@ -42,8 +43,11 @@ defmodule Membrane.Core.Element.State do setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), handling_action?: boolean(), + popping_auto_flow_queue?: boolean(), pads_to_snapshot: MapSet.t(), - stalker: Membrane.Core.Stalker.t() + stalker: Membrane.Core.Stalker.t(), + satisfied_auto_output_pads: MapSet.t(), + awaiting_auto_input_pads: MapSet.t() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -72,6 +76,7 @@ defmodule Membrane.Core.Element.State do :setup_incomplete?, :supplying_demand?, :handling_action?, + :popping_auto_flow_queue?, :stalker, :resource_guard, :subprocess_supervisor, @@ -79,6 +84,9 @@ defmodule Membrane.Core.Element.State do :demand_size, :pads_to_snapshot, :playback_queue, - :pads_data + :pads_data, + :satisfied_auto_output_pads, + :awaiting_auto_input_pads, + :auto_input_pads ] end diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index 1c03bce9e..b22637cc7 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.{CallbackHandler, Telemetry} alias Membrane.Core.Child.PadModel alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State} + alias Membrane.Core.Element.DemandController.AutoFlowUtils require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry @@ -28,15 +29,22 @@ defmodule Membrane.Core.Element.StreamFormatController do queue = data.input_queue - if queue && not InputQueue.empty?(queue) do - PadModel.set_data!( - state, - pad_ref, - :input_queue, - InputQueue.store(queue, :stream_format, stream_format) - ) - else - exec_handle_stream_format(pad_ref, stream_format, state) + cond do + # stream format goes to the manual flow control input queue + queue && not InputQueue.empty?(queue) -> + PadModel.set_data!( + state, + pad_ref, + :input_queue, + InputQueue.store(queue, :stream_format, stream_format) + ) + + # stream format goes to the auto flow control queue + pad_ref in state.awaiting_auto_input_pads -> + AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state) + + true -> + exec_handle_stream_format(pad_ref, stream_format, state) end else pad: {:error, :unknown_pad} -> diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index f427edfac..3be09d473 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -40,6 +40,7 @@ defmodule Membrane.Element.PadData do pid: private_field, other_ref: private_field, input_queue: private_field, + auto_flow_queue: private_field, incoming_demand: integer() | nil, demand_unit: private_field, other_demand_unit: private_field, @@ -59,7 +60,6 @@ defmodule Membrane.Element.PadData do # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field, - associated_pads: private_field, sticky_events: private_field, other_effective_flow_control: private_field, stalker_metrics: private_field @@ -80,6 +80,7 @@ defmodule Membrane.Element.PadData do defstruct @enforce_keys ++ [ input_queue: nil, + auto_flow_queue: Qex.new(), demand: 0, incoming_demand: nil, demand_unit: nil, @@ -89,7 +90,6 @@ defmodule Membrane.Element.PadData do sticky_messages: [], atomic_demand: nil, manual_demand_size: 0, - associated_pads: [], sticky_events: [], stream_format_validation_params: [], other_demand_unit: nil, diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 4c505230d..5a82d92cd 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -18,7 +18,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do defp demand_test_filter(_context) do state = - struct(State, + struct!(State, module: Filter, name: :test_name, type: :filter, @@ -47,7 +47,10 @@ defmodule Membrane.Core.Element.ActionHandlerTest do pads_info: %{ input: %{flow_control: :manual}, input_push: %{flow_control: :push} - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + popping_auto_flow_queue?: false ) [state: state] @@ -100,7 +103,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do }) state = - struct(State, + struct!(State, module: TrivialFilter, name: :elem_name, type: :filter, @@ -140,7 +143,10 @@ defmodule Membrane.Core.Element.ActionHandlerTest do pads_info: %{ output: %{flow_control: :push}, input: %{flow_control: :push} - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + popping_auto_flow_queue?: false ) [state: state] diff --git a/test/membrane/core/element/atomic_demand_test.exs b/test/membrane/core/element/atomic_demand_test.exs index 0cbd513fd..edf425ecf 100644 --- a/test/membrane/core/element/atomic_demand_test.exs +++ b/test/membrane/core/element/atomic_demand_test.exs @@ -10,7 +10,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do assert get_atomic_value(atomic_demand) == 10 - atomic_demand = AtomicDemand.decrease(atomic_demand, 15) + assert {{:decreased, -5}, atomic_demand} = AtomicDemand.decrease(atomic_demand, 15) assert atomic_demand.buffered_decrementation == 0 assert get_atomic_value(atomic_demand) == -5 @@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do atomic_demand = new_atomic_demand(:pull, sleeping_process, self()) :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) - atomic_demand = AtomicDemand.decrease(atomic_demand, 100) + {{:decreased, -100}, atomic_demand} = AtomicDemand.decrease(atomic_demand, 100) refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} @@ -88,8 +88,9 @@ defmodule Membrane.Core.Element.AtomicDemandTest do |> Enum.reduce(atomic_demand, fn {sender_status, receiver_status}, atomic_demand -> :ok = AtomicDemand.set_sender_status(atomic_demand, sender_status) :ok = AtomicDemand.set_receiver_status(atomic_demand, receiver_status) - atomic_demand = AtomicDemand.decrease(atomic_demand, 1000) + {_status_update, atomic_demand} = AtomicDemand.decrease(atomic_demand, 1000) + refute atomic_demand.toilet_overflowed? refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} atomic_demand @@ -97,8 +98,9 @@ defmodule Membrane.Core.Element.AtomicDemandTest do :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) :ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :pull}) - _atomic_demand = AtomicDemand.decrease(atomic_demand, 1000) + {{:decreased, _atomic_value}, atomic_demand} = AtomicDemand.decrease(atomic_demand, 1000) + assert atomic_demand.toilet_overflowed? assert_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} end @@ -109,19 +111,19 @@ defmodule Membrane.Core.Element.AtomicDemandTest do assert %AtomicDemand{throttling_factor: 150} = atomic_demand - atomic_demand = AtomicDemand.decrease(atomic_demand, 100) + assert {:unchanged, %AtomicDemand{buffered_decrementation: 100} = atomic_demand} = + AtomicDemand.decrease(atomic_demand, 100) - assert %AtomicDemand{buffered_decrementation: 100} = atomic_demand assert get_atomic_value(atomic_demand) == 0 - atomic_demand = AtomicDemand.decrease(atomic_demand, 49) + assert {:unchanged, %AtomicDemand{buffered_decrementation: 149} = atomic_demand} = + AtomicDemand.decrease(atomic_demand, 49) - assert %AtomicDemand{buffered_decrementation: 149} = atomic_demand assert get_atomic_value(atomic_demand) == 0 - atomic_demand = AtomicDemand.decrease(atomic_demand, 51) + assert {{:decreased, -200}, %AtomicDemand{buffered_decrementation: 0} = atomic_demand} = + AtomicDemand.decrease(atomic_demand, 51) - assert %AtomicDemand{buffered_decrementation: 0} = atomic_demand assert get_atomic_value(atomic_demand) == -200 end diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 198d13e4e..356501e6d 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -43,7 +43,7 @@ defmodule Membrane.Core.Element.EventControllerTest do }) state = - struct(State, + struct!(State, module: MockEventHandlingElement, name: :test_name, type: :filter, @@ -64,7 +64,10 @@ defmodule Membrane.Core.Element.EventControllerTest do input_queue: input_queue, demand: 0 ) - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] ) assert AtomicDemand.get(atomic_demand) > 0 diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index 116143637..efa95c490 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -317,7 +317,7 @@ defmodule Membrane.Core.Element.InputQueueTest do queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) - queue = Map.update!(queue, :atomic_demand, &AtomicDemand.decrease(&1, 16)) + queue = Map.update!(queue, :atomic_demand, &elem(AtomicDemand.decrease(&1, 16), 1)) assert queue.size == 16 assert queue.demand == -6 {out, queue} = InputQueue.take(queue, 2) @@ -354,7 +354,7 @@ defmodule Membrane.Core.Element.InputQueueTest do queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) - queue = Map.update!(queue, :atomic_demand, &AtomicDemand.decrease(&1, 4)) + queue = Map.update!(queue, :atomic_demand, &elem(AtomicDemand.decrease(&1, 4), 1)) assert queue.size == 4 assert queue.demand == -1 {out, queue} = InputQueue.take(queue, 2) diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index e2d68faa2..53d06f2cd 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -42,7 +42,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do }) state = - struct(State, + struct!(State, module: DummyElement, name: :test_name, type: :filter, @@ -63,7 +63,10 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do input_queue: input_queue, demand: 0 ) - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] ) assert_received Message.new(:atomic_demand_increased, :some_pad) diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index ad17195e1..e52ea842b 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -15,16 +15,18 @@ defmodule Membrane.Core.Element.PadControllerTest do @module Membrane.Core.Element.PadController defp prepare_state(elem_module, name \\ :element) do - struct(State, + struct!(State, name: name, module: elem_module, - callback_depth_counter: 0, pads_to_snapshot: MapSet.new(), parent_pid: self(), internal_state: %{}, synchronization: %{clock: nil, parent_clock: nil}, subprocess_supervisor: SubprocessSupervisor.start_link!(), - stalker: %Membrane.Core.Stalker{pid: spawn(fn -> :ok end), ets: nil} + stalker: %Membrane.Core.Stalker{pid: spawn(fn -> :ok end), ets: nil}, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] ) |> PadSpecHandler.init_pads() end diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 309313887..1235e26a3 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -35,10 +35,9 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do }) state = - struct(State, + struct!(State, module: Filter, name: :test_name, - parent: self(), type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, @@ -54,7 +53,9 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do input_queue: input_queue, demand: 0 ) - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new() ) assert_received Message.new(:atomic_demand_increased, :some_pad) diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 18bdd7b71..702f8b1f1 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -4,9 +4,12 @@ defmodule Membrane.Integration.AutoDemandsTest do import Membrane.ChildrenSpec import Membrane.Testing.Assertions + alias Membrane.Buffer alias Membrane.Testing.{Pipeline, Sink, Source} - defmodule AutoDemandFilter do + require Membrane.Pad, as: Pad + + defmodule ExponentialAutoFilter do use Membrane.Filter def_input_pad :input, accepted_format: _any @@ -35,6 +38,32 @@ defmodule Membrane.Integration.AutoDemandsTest do end end + defmodule NotifyingAutoFilter do + use Membrane.Filter + + def_input_pad :input, accepted_format: _any, availability: :on_request + def_output_pad :output, accepted_format: _any + + @impl true + def handle_playing(_ctx, state), do: {[notify_parent: :playing], state} + + @impl true + def handle_parent_notification(actions, _ctx, state), do: {actions, state} + + @impl true + def handle_buffer(pad, buffer, _ctx, state) do + actions = [ + notify_parent: {:handling_buffer, pad, buffer}, + buffer: {:output, buffer} + ] + + {actions, state} + end + + @impl true + def handle_end_of_stream(_pad, _ctx, state), do: {[], state} + end + defmodule AutoDemandTee do use Membrane.Filter @@ -64,7 +93,7 @@ defmodule Membrane.Integration.AutoDemandsTest do :down -> {mult_payloads, payloads} end - filter = %AutoDemandFilter{factor: factor, direction: direction} + filter = %ExponentialAutoFilter{factor: factor, direction: direction} pipeline = Pipeline.start_link_supervised!( @@ -81,6 +110,8 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_end_of_stream(pipeline, :sink) refute_sink_buffer(pipeline, :sink, _buffer, 0) + + Pipeline.terminate(pipeline) end end) @@ -108,6 +139,8 @@ defmodule Membrane.Integration.AutoDemandsTest do end) refute_sink_buffer(pipeline, :left_sink, %{payload: 25_000}) + + Pipeline.terminate(pipeline) end test "handle removed branch" do @@ -127,6 +160,8 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_sink_buffer(pipeline, :left_sink, buffer) assert buffer.payload == payload end) + + Pipeline.terminate(pipeline) end defmodule NotifyingSink do @@ -174,6 +209,8 @@ defmodule Membrane.Integration.AutoDemandsTest do {:buffer_arrived, %Membrane.Buffer{payload: ^payload}} ) end + + Pipeline.terminate(pipeline) end end) @@ -202,7 +239,7 @@ defmodule Membrane.Integration.AutoDemandsTest do Pipeline.start_link_supervised!( spec: child(:source, PushSource) - |> child(:filter, AutoDemandFilter) + |> child(:filter, ExponentialAutoFilter) |> child(:sink, Sink) ) @@ -230,7 +267,7 @@ defmodule Membrane.Integration.AutoDemandsTest do Pipeline.start_supervised!( spec: child(:source, PushSource) - |> child(:filter, AutoDemandFilter) + |> child(:filter, ExponentialAutoFilter) |> child(:sink, %Sink{autodemand: false}) ) @@ -246,6 +283,163 @@ defmodule Membrane.Integration.AutoDemandsTest do ) end + defp source_definiton(name) do + # Testing.Source fed with such actions generator will produce buffers with incremenal + # sequence of numbers as payloads + actions_generator = + fn counter, _size -> + Process.sleep(1) + + buffer = %Buffer{ + metadata: %{creator: name}, + payload: counter + } + + actions = [buffer: {:output, buffer}, redemand: :output] + {actions, counter + 1} + end + + %Source{output: {1, actions_generator}} + end + + defp setup_pipeline_with_notifying_auto_filter(_context) do + pipeline = + Pipeline.start_link_supervised!( + spec: [ + child({:source, 0}, source_definiton({:source, 0})) + |> via_in(Pad.ref(:input, 0)) + |> child(:filter, NotifyingAutoFilter) + |> child(:sink, %Sink{autodemand: false}), + child({:source, 1}, source_definiton({:source, 1})) + |> via_in(Pad.ref(:input, 1)) + |> get_child(:filter) + ] + ) + + [pipeline: pipeline] + end + + describe "auto flow queue" do + setup :setup_pipeline_with_notifying_auto_filter + + defp receive_processed_buffers(pipeline, limit, acc \\ []) + + defp receive_processed_buffers(_pipeline, limit, acc) when limit <= 0 do + Enum.reverse(acc) + end + + defp receive_processed_buffers(pipeline, limit, acc) do + receive do + {Pipeline, ^pipeline, + {:handle_child_notification, {{:handling_buffer, _pad, buffer}, :filter}}} -> + receive_processed_buffers(pipeline, limit - 1, [buffer | acc]) + after + 500 -> Enum.reverse(acc) + end + end + + test "when there is no demand on the output pad", %{pipeline: pipeline} do + manual_flow_queue_size = 40 + + assert_pipeline_notified(pipeline, :filter, :playing) + + buffers = receive_processed_buffers(pipeline, 100) + assert length(buffers) == manual_flow_queue_size + + demand = 10_000 + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) + + # check if filter processed proper number of buffers + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size + + # check if filter processed buffers from both sources + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + assert Enum.count(buffers_by_creator) == 2 + + # check if filter balanced procesesd buffers by their origin - numbers of + # buffers coming from each source should be similar + counter_0 = Map.fetch!(buffers_by_creator, {:source, 0}) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() + sources_ratio = counter_0 / counter_1 + + assert 0.8 < sources_ratio and sources_ratio < 1.2 + + Pipeline.terminate(pipeline) + end + + test "when an element returns :pause_auto_demand and :resume_auto_demand action", %{ + pipeline: pipeline + } do + manual_flow_queue_size = 40 + auto_flow_demand_size = 400 + + assert_pipeline_notified(pipeline, :filter, :playing) + + Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) + + # time for :filter to pause demand on Pad.ref(:input, 0) + Process.sleep(500) + + buffers = receive_processed_buffers(pipeline, 100) + assert length(buffers) == manual_flow_queue_size + + demand = 10_000 + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + + # fliter paused auto demand on Pad.ref(:input, 0), so it should receive + # at most auto_flow_demand_size buffers from there and rest of the buffers + # from Pad.ref(:input, 1) + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) + + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size + + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + counter_0 = Map.get(buffers_by_creator, {:source, 0}, []) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() + + # at most auto_flow_demand_size buffers came from {:source, 0} + assert auto_flow_demand_size - manual_flow_queue_size <= counter_0 + assert counter_0 <= auto_flow_demand_size + + # rest of them came from {:source, 1} + assert demand - auto_flow_demand_size <= counter_1 + + Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) + + # time for :filter to resume demand on Pad.ref(:input, 0) + Process.sleep(500) + + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) + + # check if filter processed proper number of buffers + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size + + # check if filter processed buffers from both sources + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + assert Enum.count(buffers_by_creator) == 2 + + # check if filter balanced procesesd buffers by their origin - numbers of + # buffers coming from each source should be similar + counter_0 = Map.fetch!(buffers_by_creator, {:source, 0}) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() + sources_ratio = counter_0 / counter_1 + + assert 0.8 < sources_ratio and sources_ratio < 1.2 + + Pipeline.terminate(pipeline) + end + end + defp reduce_link(link, enum, fun) do Enum.reduce(enum, link, &fun.(&2, &1)) end diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index ea9880c56..3f3b30b2c 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -280,9 +280,6 @@ defmodule Membrane.Integration.DemandsTest do assert_sink_playing(pipeline, :sink) - # time for pipeline to start playing - Process.sleep(1000) - for i <- 1..10 do # during sleep below source should send around 100 buffers Process.sleep(100 * RedemandingSource.sleep_time()) From facc587e170996f6bb5872dbcf231ab4dfd7174c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Chali=C5=84ski?= Date: Mon, 26 Feb 2024 16:14:56 +0100 Subject: [PATCH 09/11] commented out action that fails due to gh actions bug --- .../add_pr_to_smackore_board/action.yml | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/.github/actions/add_pr_to_smackore_board/action.yml b/.github/actions/add_pr_to_smackore_board/action.yml index d4e708f02..27ce92b46 100644 --- a/.github/actions/add_pr_to_smackore_board/action.yml +++ b/.github/actions/add_pr_to_smackore_board/action.yml @@ -1,5 +1,5 @@ name: 'Add PR to Smackore board, if author is from community' -description: 'Adds PR to "New issues by community" column in Smackore project board, if PR author is from outside Membrane Team.' +description: '(disabled due to github-side bug) Adds PR to "New issues by community" column in Smackore project board, if PR author is from outside Membrane Team.' inputs: GITHUB_TOKEN: description: 'GitHub token' @@ -19,21 +19,23 @@ runs: repository: membraneframework/membrane_core - name: Maybe add PR to board and set ticket status run: | - export PROJECT_NUMBER=19 - export PROJECT_ID=PVT_kwDOAYE_z84AWEIB - export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k - export TARGET_COLUMN_ID=e6b1ee10 + # currently this causes github action crash, more info here: https://github.com/membraneframework/membrane_core/issues/749 + + # export PROJECT_NUMBER=19 + # export PROJECT_ID=PVT_kwDOAYE_z84AWEIB + # export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k + # export TARGET_COLUMN_ID=e6b1ee10 - export AUTHOR_ORIGIN=$(gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" /orgs/membraneframework/teams/membraneteam/members | python scripts/python/get_author_origin.py $AUTHOR_LOGIN) + # export AUTHOR_ORIGIN=$(gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" /orgs/membraneframework/teams/membraneteam/members | python scripts/python/get_author_origin.py $AUTHOR_LOGIN) - if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ] - then - gh pr edit "$PR_URL" --add-project Smackore - sleep 10 + # if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ] + # then + # gh pr edit "$PR_URL" --add-project Smackore + # sleep 10 - export TICKET_ID=$(gh project item-list $PROJECT_NUMBER --owner membraneframework --format json --limit 10000000 | python scripts/python/get_ticket_id.py "$PR_URL") - gh project item-edit --id $TICKET_ID --field-id $STATUS_FIELD_ID --project-id $PROJECT_ID --single-select-option-id $TARGET_COLUMN_ID - fi + # export TICKET_ID=$(gh project item-list $PROJECT_NUMBER --owner membraneframework --format json --limit 10000000 | python scripts/python/get_ticket_id.py "$PR_URL") + # gh project item-edit --id $TICKET_ID --field-id $STATUS_FIELD_ID --project-id $PROJECT_ID --single-select-option-id $TARGET_COLUMN_ID + # fi env: GH_TOKEN: ${{ inputs.GITHUB_TOKEN }} From e4305da67493219a224ccb2ccadd75360f81cbee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Feliks=20Pobiedzi=C5=84ski?= <38541925+FelonEkonom@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:42:31 +0100 Subject: [PATCH 10/11] Fix bug in handling actions returned from handle_tick. Deprecate handle_spec_started. (#708) * wip * Write tests wip * Write tests wip * Revert "Fix timer running late (#685)" This reverts commit 168f57e12bf3280cda82b1116796151d4b1cb486. * Fix actions handling order bug related to Pipeline.handle_playing * Add assertion on value passed with :setup action * WIP Fix bug in executing handle_buffer while handling actions from previous callback * Fix tests wip * Fix CI * Update changelog * Stopt calling handle_spec_started in between handling actions * Make demands test more strict * Add dots to changelog * Fix double tick bug * wip * Deprecate handle_spec_started/3 * Remove unused aliases * Remove unnecessary warning * Bump version to 1.0.1 * Remove leftovers * Fix docs * Implement suggestions from CR, bump version to 1.1.0-rc --- CHANGELOG.md | 5 +- lib/membrane/bin.ex | 9 +- lib/membrane/core/callback_handler.ex | 13 ++ lib/membrane/core/element.ex | 13 -- lib/membrane/core/element/action_handler.ex | 66 +++--- lib/membrane/core/element/demand_handler.ex | 9 + lib/membrane/core/element/event_controller.ex | 4 +- lib/membrane/core/lifecycle_controller.ex | 18 +- lib/membrane/core/parent.ex | 17 ++ .../child_life_controller/startup_utils.ex | 21 +- lib/membrane/core/pipeline/action_handler.ex | 10 + lib/membrane/core/pipeline/state.ex | 6 +- lib/membrane/core/timer.ex | 21 +- lib/membrane/core/timer_controller.ex | 7 + lib/membrane/pipeline.ex | 9 +- lib/membrane/testing/pipeline.ex | 7 +- mix.exs | 2 +- .../core/element/action_handler_test.exs | 11 - .../core/element/event_controller_test.exs | 3 + .../element/lifecycle_controller_test.exs | 2 + .../core/element/pad_controller_test.exs | 3 + .../element/stream_format_controller_test.exs | 3 + test/membrane/core/pipeline_test.exs | 2 + .../actions_handling_order_test.exs | 191 ++++++++++++++++++ test/membrane/integration/demands_test.exs | 3 +- test/membrane/integration/linking_test.exs | 48 ++--- .../integration/toilet_forwarding_test.exs | 3 +- 27 files changed, 382 insertions(+), 124 deletions(-) create mode 100644 test/membrane/integration/actions_handling_order_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index da6ceab62..283e1bbee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,11 @@ # Changelog +## 1.1.0-rc0 + * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) + * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) + ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) - * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) * Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626) * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index d13151fd4..b2968073e 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -167,9 +167,11 @@ defmodule Membrane.Bin do ) :: callback_return @doc """ + This callback is deprecated since v1.1.0-rc0 + Callback invoked when children of `Membrane.ChildrenSpec` are started. - By default, it does nothing. + It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens. """ @callback handle_spec_started( children :: [Child.name()], @@ -309,6 +311,7 @@ defmodule Membrane.Bin do alias unquote(__MODULE__) @behaviour unquote(__MODULE__) @before_compile unquote(__MODULE__) + @after_compile {Membrane.Core.Parent, :check_deprecated_callbacks} unquote(bring_spec) unquote(bring_pad) @@ -354,9 +357,6 @@ defmodule Membrane.Bin do {[], state} end - @impl true - def handle_spec_started(new_children, _ctx, state), do: {[], state} - @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -381,7 +381,6 @@ defmodule Membrane.Bin do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 30cb8d3bc..91729aef0 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -8,6 +8,7 @@ defmodule Membrane.Core.CallbackHandler do use Bunch alias Membrane.CallbackError + alias Membrane.Core.Component require Membrane.Logger @@ -191,6 +192,13 @@ defmodule Membrane.Core.CallbackHandler do was_handling_action? = state.handling_action? state = %{state | handling_action?: true} + # Updating :supplying_demand? flag value here is a temporal fix. + # Setting it to `true` while handling actions causes postponing calls + # of handle_redemand/2 and supply_demand/2 until a moment, when all + # actions returned from the callback are handled + was_supplying_demand? = Map.get(state, :supplying_demand?, false) + state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state + state = Enum.reduce(actions, state, fn action, state -> try do @@ -210,6 +218,11 @@ defmodule Membrane.Core.CallbackHandler do do: state, else: %{state | handling_action?: false} + state = + if Component.is_element?(state) and not was_supplying_demand?, + do: %{state | supplying_demand?: false}, + else: state + handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index b6500a593..8f673455f 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -260,20 +260,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:timer_tick, timer_id), state) do - # Guarding the `TimerController.handle_tick/2` invocation is - # required since there might be a case in which `handle_tick` - # callback's implementation returns demand action. - # In this scenario, without this guard, there would a possibility that - # the `handle_buffer` would be called immediately, returning - # some action that would affect the timer and the original state - # of the timer, set with actions returned from `handle_tick`, - # would be overwritten with that action. - # - # For more information see: https://github.com/membraneframework/membrane_core/issues/670 - state = %{state | supplying_demand?: true} state = TimerController.handle_tick(timer_id, state) - state = %{state | supplying_demand?: false} - state = Membrane.Core.Element.DemandHandler.handle_delayed_demands(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 759237203..3650b58ad 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -38,22 +38,47 @@ defmodule Membrane.Core.Element.ActionHandler do require Membrane.Logger @impl CallbackHandler - def transform_actions(actions, callback, _handler_params, state) do + def transform_actions(actions, _callback, _handler_params, state) do actions = join_buffers(actions) - ensure_nothing_after_redemand(actions, callback, state) {actions, state} end defguardp is_demand_size(size) when is_integer(size) or is_function(size) @impl CallbackHandler - def handle_end_of_actions(state) when not state.handling_action? do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) + def handle_end_of_actions(state) do + # Fixed order of handling demand of manual and auto pads would lead to + # favoring manual pads over auto pads (or vice versa), especially after + # introducting auto flow queues. + manual_demands_first? = Enum.random([1, 2]) == 1 + + state = + if manual_demands_first?, + do: maybe_handle_delayed_demands(state), + else: state + + state = maybe_handle_pads_to_snapshot(state) + + state = + if manual_demands_first?, + do: state, + else: maybe_handle_delayed_demands(state) + + state end - @impl CallbackHandler - def handle_end_of_actions(state), do: state + defp maybe_handle_delayed_demands(state) do + with %{supplying_demand?: false} <- state do + DemandHandler.handle_delayed_demands(state) + end + end + + defp maybe_handle_pads_to_snapshot(state) do + with %{handling_action?: false} <- state do + Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) + end + end @impl CallbackHandler def handle_action({action, _}, :handle_init, _params, _state) @@ -284,30 +309,6 @@ defmodule Membrane.Core.Element.ActionHandler do ) end - defp ensure_nothing_after_redemand(actions, callback, state) do - {redemands, actions_after_redemands} = - actions - |> Enum.drop_while(fn - {:redemand, _args} -> false - _other_action -> true - end) - |> Enum.split_while(fn - {:redemand, _args} -> true - _other_action -> false - end) - - case {redemands, actions_after_redemands} do - {_redemands, []} -> - :ok - - {[redemand | _redemands], _actions_after_redemands} -> - raise ActionError, - reason: :actions_after_redemand, - action: redemand, - callback: {state.module, callback} - end - end - @spec send_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() defp send_buffer(_pad_ref, [], state) do state @@ -470,7 +471,8 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> AutoFlowUtils.pop_queues_and_bump_demand() else diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 74c6ce05a..1d22d7eb9 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do end end + @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() + def remove_pad_from_delayed_demands(pad_ref, state) do + Map.update!(state, :delayed_demands, fn delayed_demands_set -> + delayed_demands_set + |> MapSet.delete({pad_ref, :supply}) + |> MapSet.delete({pad_ref, :redemand}) + end) + end + @spec handle_input_queue_output( Pad.ref(), [InputQueue.output_value()], diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index e4aff679e..f6b59a34b 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, + DemandHandler, InputQueue, PlaybackQueue, State @@ -108,7 +109,8 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/lifecycle_controller.ex b/lib/membrane/core/lifecycle_controller.ex index cbfc310cd..5e4e03f72 100644 --- a/lib/membrane/core/lifecycle_controller.ex +++ b/lib/membrane/core/lifecycle_controller.ex @@ -14,12 +14,16 @@ defmodule Membrane.Core.LifecycleController do def handle_setup_operation(operation, state) do :ok = assert_operation_allowed!(operation, state.setup_incomplete?) - case operation do - :incomplete -> + cond do + operation == :incomplete -> Membrane.Logger.debug("Component deferred initialization") %{state | setup_incomplete?: true} - :complete -> + Component.is_pipeline?(state) -> + # complete_setup/1 will be called in Membrane.Core.Pipeline.ActionHandler.handle_end_of_actions/1 + %{state | awaiting_setup_completition?: true} + + Component.is_child?(state) -> complete_setup(state) end end @@ -52,5 +56,13 @@ defmodule Membrane.Core.LifecycleController do """ end + defp assert_operation_allowed!(operation, _status) + when operation not in [:incomplete, :complete] do + raise SetupError, """ + Action {:setup, #{inspect(operation)}} was returned, but second element in the tuple must + be :complete or :incomplete + """ + end + defp assert_operation_allowed!(_operation, _status), do: :ok end diff --git a/lib/membrane/core/parent.ex b/lib/membrane/core/parent.ex index 4f0fb5f8a..f4471b697 100644 --- a/lib/membrane/core/parent.ex +++ b/lib/membrane/core/parent.ex @@ -2,4 +2,21 @@ defmodule Membrane.Core.Parent do @moduledoc false @type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t() + + @spec check_deprecated_callbacks(Macro.Env.t(), binary) :: :ok + def check_deprecated_callbacks(env, _bytecode) do + modules_whitelist = [Membrane.Testing.Pipeline] + + if env.module not in modules_whitelist and + Module.defines?(env.module, {:handle_spec_started, 3}, :def) do + warn_message = """ + Callback handle_spec_started/3 has been deprecated since \ + :membrane_core v1.1.0-rc0, but it is implemented in #{inspect(env.module)} + """ + + IO.warn(warn_message, []) + end + + :ok + end end diff --git a/lib/membrane/core/parent/child_life_controller/startup_utils.ex b/lib/membrane/core/parent/child_life_controller/startup_utils.ex index a4d4d8884..70cc96de8 100644 --- a/lib/membrane/core/parent/child_life_controller/startup_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/startup_utils.ex @@ -104,15 +104,20 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do @spec exec_handle_spec_started([Membrane.Child.name()], Parent.state()) :: Parent.state() def exec_handle_spec_started(children_names, state) do - action_handler = Component.action_handler(state) - - CallbackHandler.exec_and_handle_callback( - :handle_spec_started, - action_handler, - %{context: &Component.context_from_state/1}, - [children_names], + # handle_spec_started/3 callback is deprecated, so we don't require its implementation + if function_exported?(state.module, :handle_spec_started, 3) do + action_handler = Component.action_handler(state) + + CallbackHandler.exec_and_handle_callback( + :handle_spec_started, + action_handler, + %{context: &Component.context_from_state/1}, + [children_names], + state + ) + else state - ) + end end @spec check_if_children_names_and_children_groups_ids_are_unique( diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 92065ed43..382a4f5a8 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -8,6 +8,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.Core.Parent.LifecycleController alias Membrane.Core.Pipeline.State + require Membrane.Logger + @impl CallbackHandler def handle_action({:spec, args}, _cb, _params, %State{terminating?: true}) do raise Membrane.ParentError, @@ -103,4 +105,12 @@ defmodule Membrane.Core.Pipeline.ActionHandler do def handle_action(action, _callback, _params, _state) do raise ActionError, action: action, reason: {:unknown_action, Membrane.Pipeline.Action} end + + @impl CallbackHandler + def handle_end_of_actions(state) do + with %{awaiting_setup_completition?: true} <- state do + %{state | awaiting_setup_completition?: false} + |> Membrane.Core.LifecycleController.complete_setup() + end + end end diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 59ecaded7..6f644947b 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -34,7 +34,8 @@ defmodule Membrane.Core.Pipeline.State do setup_incomplete?: boolean(), handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), - subprocess_supervisor: pid() + subprocess_supervisor: pid(), + awaiting_setup_completition?: boolean() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -58,5 +59,6 @@ defmodule Membrane.Core.Pipeline.State do handling_action?: false, stalker: nil, resource_guard: nil, - subprocess_supervisor: nil + subprocess_supervisor: nil, + awaiting_setup_completition?: false end diff --git a/lib/membrane/core/timer.ex b/lib/membrane/core/timer.ex index 5a23c957c..bd4eab23f 100644 --- a/lib/membrane/core/timer.ex +++ b/lib/membrane/core/timer.ex @@ -15,11 +15,13 @@ defmodule Membrane.Core.Timer do clock: Clock.t(), next_tick_time: Time.t(), ratio: Clock.ratio(), - timer_ref: reference() | nil + timer_ref: reference() | nil, + awaiting_message?: boolean() } @enforce_keys [:interval, :clock, :init_time, :id] - defstruct @enforce_keys ++ [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil] + defstruct @enforce_keys ++ + [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil, awaiting_message?: false] @spec start(id, interval, Clock.t()) :: t def start(id, interval, clock) do @@ -42,8 +44,14 @@ defmodule Membrane.Core.Timer do %__MODULE__{timer | ratio: ratio} end + @spec handle_message_arrived(t) :: t + def handle_message_arrived(%__MODULE__{awaiting_message?: true} = timer) do + %{timer | awaiting_message?: false} + end + @spec tick(t) :: t - def tick(%__MODULE__{interval: :no_interval} = timer) do + def tick(%__MODULE__{} = timer) + when timer.awaiting_message? or timer.interval == :no_interval do timer end @@ -67,7 +75,12 @@ defmodule Membrane.Core.Timer do timer_ref = Process.send_after(self(), Message.new(:timer_tick, id), beam_next_tick_time, abs: true) - %__MODULE__{timer | next_tick_time: next_tick_time |> Ratio.floor(), timer_ref: timer_ref} + %__MODULE__{ + timer + | next_tick_time: next_tick_time |> Ratio.floor(), + timer_ref: timer_ref, + awaiting_message?: true + } end @spec set_interval(t, interval) :: t diff --git a/lib/membrane/core/timer_controller.ex b/lib/membrane/core/timer_controller.ex index 85e154fcb..a5c5faa64 100644 --- a/lib/membrane/core/timer_controller.ex +++ b/lib/membrane/core/timer_controller.ex @@ -58,6 +58,13 @@ defmodule Membrane.Core.TimerController do @spec handle_tick(Timer.id(), Component.state()) :: Component.state() def handle_tick(timer_id, state) when is_timer_present(timer_id, state) do + state = + update_in( + state, + [:synchronization, :timers, timer_id], + &Timer.handle_message_arrived/1 + ) + state = CallbackHandler.exec_and_handle_callback( :handle_tick, diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index e3da30050..491c76289 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -208,9 +208,11 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ + This callback is deprecated since v1.1.0-rc0. + Callback invoked when children of `Membrane.ChildrenSpec` are started. - By default, it does nothing. + It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens. """ @callback handle_spec_started( children :: [Child.name()], @@ -469,6 +471,7 @@ defmodule Membrane.Pipeline do alias unquote(__MODULE__) require Membrane.Logger @behaviour unquote(__MODULE__) + @after_compile {Membrane.Core.Parent, :check_deprecated_callbacks} unquote(bring_spec) unquote(bring_pad) @@ -512,9 +515,6 @@ defmodule Membrane.Pipeline do {[], state} end - @impl true - def handle_spec_started(new_children, _ctx, state), do: {[], state} - @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -538,7 +538,6 @@ defmodule Membrane.Pipeline do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 7d4084ef5..73becaa8c 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -522,7 +522,12 @@ defmodule Membrane.Testing.Pipeline do do: {[], nil} defp eval_injected_module_callback(callback, args, state) do - apply(state.module, callback, args ++ [state.custom_pipeline_state]) + if callback != :handle_spec_started or + function_exported?(state.module, :handle_spec_started, 3) do + apply(state.module, callback, args ++ [state.custom_pipeline_state]) + else + {[], state.custom_pipeline_state} + end end defp notify_test_process(test_process, message) do diff --git a/mix.exs b/mix.exs index ac1df050b..7f7033497 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.1" + @version "1.1.0-rc0" @source_ref "v#{@version}" def project do diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 5a82d92cd..06b1913ae 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -546,16 +546,5 @@ defmodule Membrane.Core.Element.ActionHandlerTest do end ) end - - test "when :redemand is not the last action", %{state: state} do - assert_raise ActionError, ~r/redemand.*last/i, fn -> - @module.transform_actions( - [redemand: :output, notify_parent: :a, notify_parent: :b], - :handle_other, - %{}, - state - ) - end - end end end diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 356501e6d..27928d271 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -51,7 +51,10 @@ defmodule Membrane.Core.Element.EventControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), + handle_demand_loop_counter: 0, pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 53d06f2cd..3a9be602b 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -50,7 +50,9 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index e52ea842b..82b6a5926 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -18,7 +18,10 @@ defmodule Membrane.Core.Element.PadControllerTest do struct!(State, name: name, module: elem_module, + handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), parent_pid: self(), internal_state: %{}, synchronization: %{clock: nil, parent_clock: nil}, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 1235e26a3..d94e93030 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -42,7 +42,10 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), + handle_demand_loop_counter: 0, pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/pipeline_test.exs b/test/membrane/core/pipeline_test.exs index 5ef8f2f7d..d59fc2b2c 100644 --- a/test/membrane/core/pipeline_test.exs +++ b/test/membrane/core/pipeline_test.exs @@ -79,6 +79,7 @@ defmodule Membrane.Core.PipelineTest do [], state ) + |> ActionHandler.handle_end_of_actions() end end @@ -92,6 +93,7 @@ defmodule Membrane.Core.PipelineTest do [], state ) + |> ActionHandler.handle_end_of_actions() end end end diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs new file mode 100644 index 000000000..f54ac0d49 --- /dev/null +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -0,0 +1,191 @@ +defmodule Membrane.Integration.ActionsHandlingOrderTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Testing + + defmodule TickingPipeline do + use Membrane.Pipeline + + @tick_time Membrane.Time.milliseconds(100) + + @impl true + def handle_init(_ctx, test_process: test_process), + do: {[], %{ticked?: false, test_process: test_process}} + + @impl true + def handle_setup(_ctx, state) do + {[setup: :incomplete, start_timer: {:one, @tick_time}], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[timer_interval: {:one, @tick_time}], state} + end + + @impl true + def handle_tick(:one, _ctx, %{ticked?: false} = state) do + {[setup: :complete, timer_interval: {:one, :no_interval}], %{state | ticked?: true}} + end + + @impl true + def handle_tick(:one, _ctx, state) do + send(state.test_process, :ticked_two_times) + {[timer_interval: {:one, :no_interval}], state} + end + end + + defmodule NotifyingPipeline do + use Membrane.Pipeline + + alias Membrane.Integration.ActionsHandlingOrderTest.NotifyingPipelineChild + + @impl true + def handle_init(_ctx, _opts) do + spec = child(:child, NotifyingPipelineChild) + {[spec: spec], %{}} + end + + @impl true + def handle_setup(_ctx, state) do + self() |> send(:time_to_play) + {[setup: :incomplete], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[notify_child: {:child, :second_notification}], state} + end + + @impl true + def handle_info(:time_to_play, _ctx, state) do + {[setup: :complete, notify_child: {:child, :first_notification}], state} + end + + @impl true + def handle_info({:get_notifications, test_process}, _ctx, state) do + actions = [notify_child: {:child, :get_notifications}] + state = Map.put(state, :test_process, test_process) + + {actions, state} + end + + @impl true + def handle_child_notification(notifications, :child, _ctx, state) do + send(state.test_process, {:notifications, notifications}) + {[], state} + end + end + + defmodule NotifyingPipelineChild do + use Membrane.Filter + + @impl true + def handle_init(_ctx, _opts), do: {[], %{}} + + @impl true + def handle_parent_notification(:get_notifications, _ctx, state) do + {[notify_parent: state.notifications], state} + end + + @impl true + def handle_parent_notification(notification, _ctx, state) do + state = Map.update(state, :notifications, [notification], &(&1 ++ [notification])) + {[], state} + end + end + + defmodule TickingSink do + use Membrane.Sink + + @short_tick_time Membrane.Time.milliseconds(100) + @long_tick_time Membrane.Time.seconds(2) + + def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any + + @impl true + def handle_init(_ctx, _opts), do: {[], %{tick_counter: 0}} + + @impl true + def handle_parent_notification(:start_timer, _ctx, state) do + {[start_timer: {:timer, @short_tick_time}], state} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 0} = state) do + actions = [ + demand: {:input, 1}, + timer_interval: {:timer, :no_interval} + ] + + {actions, %{state | tick_counter: 1}} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 1} = state) do + actions = [ + notify_parent: :second_tick, + timer_interval: {:timer, @long_tick_time} + ] + + {actions, %{state | tick_counter: 2}} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 2} = state) do + {[notify_parent: :third_tick], %{state | tick_counter: 3}} + end + + @impl true + def handle_buffer(:input, _buffer, _ctx, state) do + {[timer_interval: {:timer, @short_tick_time}], state} + end + end + + test "order of handling :tick action" do + {:ok, _supervisor, pipeline} = + Membrane.Pipeline.start_link(TickingPipeline, test_process: self()) + + assert_receive :ticked_two_times + + Membrane.Pipeline.terminate(pipeline) + end + + test "order of handling :notify_child action" do + {:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(NotifyingPipeline) + + # time for pipeline to play + Process.sleep(500) + + send(pipeline, {:get_notifications, self()}) + + assert_receive {:notifications, [:first_notification, :second_notification]} + + Membrane.Pipeline.terminate(pipeline) + end + + test ":demand and :timer_interval actions don't interact with each other" do + spec = + child(:source, %Testing.Source{output: [<<>>]}) + |> child(:sink, TickingSink) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + # time for pipeline to play + Process.sleep(100) + + Testing.Pipeline.message_child(pipeline, :sink, :start_timer) + + assert_pipeline_notified(pipeline, :sink, :second_tick) + + # third tick should arrive after two seconds, not ealier + refute_pipeline_notified(pipeline, :sink, :third_tick, 1_500) + assert_pipeline_notified(pipeline, :sink, :third_tick) + + assert Testing.Pipeline.get_child_pid!(pipeline, :source) |> Process.alive?() + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index 3f3b30b2c..dacd0d60a 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -15,7 +15,8 @@ defmodule Membrane.Integration.DemandsTest do defp assert_buffers_received(range, pid) do Enum.each(range, fn i -> - assert_sink_buffer(pid, :sink, %Buffer{payload: <<^i::16>> <> <<255>>}) + assert_sink_buffer(pid, :sink, buffer) + assert %Buffer{payload: <<^i::16>> <> <<255>>} = buffer end) end diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index b23cde06c..a2a9e5081 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -110,12 +110,6 @@ defmodule Membrane.Integration.LinkingTest do def handle_info(_msg, _ctx, state) do {[], state} end - - @impl true - def handle_spec_started(_children, _ctx, state) do - send(state.testing_pid, :spec_started) - {[], state} - end end setup do @@ -139,12 +133,13 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"a"}) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"b"}) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"c"}) send(pipeline, {:remove_children, :sink}) assert_pipeline_notified(pipeline, :bin, :handle_pad_removed) + + Membrane.Pipeline.terminate(pipeline) end test "and element crashes, bin forwards the unlink message to child", %{pipeline: pipeline} do @@ -166,10 +161,7 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: bin_spec}}) - assert_receive(:spec_started) - send(pipeline, {:start_spec, %{spec: sink_spec}}) - assert_receive(:spec_started) sink_pid = get_child_pid(:sink, pipeline) bin_pid = get_child_pid(:bin, pipeline) @@ -188,6 +180,8 @@ defmodule Membrane.Integration.LinkingTest do match?(%Membrane.PadError{}, error) assert error.message =~ ~r/static.*pad.*unlink/u + + Membrane.Pipeline.terminate(pipeline) end end @@ -208,13 +202,12 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: spec}}) - send(pipeline, {:kill, [:sink]}) - assert_receive(:spec_started) - assert_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) + + Membrane.Pipeline.terminate(pipeline) end test "element shouldn't crash when its neighbor connected via dynamic pad crashes", %{ @@ -237,12 +230,12 @@ defmodule Membrane.Integration.LinkingTest do spec = [spec_1, spec_2, links_spec] send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) - send(pipeline, {:kill, [:sink]}) refute_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) + + Membrane.Pipeline.terminate(pipeline) end test "element shouldn't crash when its neighbor connected via dynamic pad crashes and the crash groups are set within nested spec", @@ -264,31 +257,12 @@ defmodule Membrane.Integration.LinkingTest do |> get_child(:sink) send(pipeline, {:start_spec, %{spec: [spec, links_spec]}}) - assert_receive(:spec_started) - send(pipeline, {:kill, [:sink]}) refute_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) - end - - test "pipeline playback should change successfully after spec with links has been returned", - %{pipeline: pipeline} do - bin_spec = { - child(:bin, %Bin{child: %Testing.Source{output: [~c"a", ~c"b", ~c"c"]}}), - group: :group_1, crash_group_mode: :temporary - } - - sink_spec = { - child(:sink, Testing.Sink), - group: :group_1, crash_group_mode: :temporary - } - links_spec = get_child(:bin) |> get_child(:sink) - - spec = [bin_spec, sink_spec, links_spec] - send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) + Membrane.Pipeline.terminate(pipeline) end defmodule SlowSetupSink do @@ -349,6 +323,8 @@ defmodule Membrane.Integration.LinkingTest do {element, pad} end) + + Membrane.Pipeline.terminate(pipeline) end test "Elements and bins can be spawned, linked and removed" do @@ -507,6 +483,8 @@ defmodule Membrane.Integration.LinkingTest do refute_link_removed(pipeline, i) end end + + Membrane.Pipeline.terminate(pipeline) end describe "Spec shouldn't wait on links with" do diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 1f240dfff..37097add8 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -234,7 +234,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do ) for i <- 1..3000 do - assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <>}) + assert_sink_buffer(pipeline, :sink, buffer) + assert %Membrane.Buffer{payload: <>} = buffer assert buff_idx == i end From 2ed8f4d319a114c014f53cdd0a6522c09d46e2a6 Mon Sep 17 00:00:00 2001 From: Jakub Pryc <94321002+Noarkhh@users.noreply.github.com> Date: Thu, 14 Mar 2024 13:28:45 +0100 Subject: [PATCH 11/11] Update demand type to allow for the default demand size syntax (#768) * Update demand type to allow for the default demand size syntax --- lib/membrane/element/action.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/membrane/element/action.ex b/lib/membrane/element/action.ex index 055821142..0d5db32a9 100644 --- a/lib/membrane/element/action.ex +++ b/lib/membrane/element/action.ex @@ -82,11 +82,12 @@ defmodule Membrane.Element.Action do guaranteed not to receive more data than demanded. Demand size can be either a non-negative integer, that overrides existing demand, - or a function that is passed current demand, and is to return the new demand. + or a function that is passed current demand, and is to return the new demand. In case only pad + is specified, the demand size defaults to 1. Allowed only when playback is playing. """ - @type demand :: {:demand, {Pad.ref(), demand_size}} + @type demand :: {:demand, {Pad.ref(), demand_size} | Pad.ref()} @type demand_size :: pos_integer | (pos_integer() -> non_neg_integer()) @typedoc """