From 1b9fe94151af37f170ed5e37012fa87554dcd51c Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 22 May 2024 13:22:36 +0200 Subject: [PATCH] Implement CR --- CHANGELOG.md | 2 +- lib/membrane/bin.ex | 24 +++++------ .../core/parent/child_life_controller.ex | 34 ++++++++++++--- .../child_life_controller/startup_utils.ex | 33 ++++---------- .../core/parent/lifecycle_controller.ex | 9 +--- lib/membrane/pipeline.ex | 20 ++++----- lib/membrane/testing/pipeline.ex | 22 +++++++--- .../integration/spec_callbacks_test.exs | 43 +++++++++---------- 8 files changed, 98 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b13b6351..89c44f407 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## 1.1.0-rc1 - * Add new callbacks `handle_spec_setup_completed/3` and `handle_spec_playing/3` in Bins and Pipelines. [#801](https://github.com/membraneframework/membrane_core/pull/801) + * Add new callbacks `handle_child_setup_completed/3` and `handle_child_playing/3` in Bins and Pipelines. [#801](https://github.com/membraneframework/membrane_core/pull/801) ## 1.1.0-rc0 * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index c11347261..767e6744b 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -180,23 +180,23 @@ defmodule Membrane.Bin do ) :: callback_return @doc """ - Callback invoked when all children of `Membrane.ChildrenSpec` complete their setup. + Callback invoked when a child complete its setup. By default, it does nothing. """ - @callback handle_spec_setup_completed( - children :: [Child.name()], + @callback handle_child_setup_completed( + child :: Child.name(), context :: CallbackContext.t(), state ) :: callback_return @doc """ - Callback invoked when children of `Membrane.ChildrenSpec` enter `playing` playback. + Callback invoked when a child enters `playing` playback. By default, it does nothing. """ - @callback handle_spec_playing( - children :: [Child.name()], + @callback handle_child_playing( + child :: Child.name(), context :: CallbackContext.t(), state ) :: callback_return @@ -239,8 +239,8 @@ defmodule Membrane.Bin do handle_playing: 2, handle_info: 3, handle_spec_started: 3, - handle_spec_setup_completed: 3, - handle_spec_playing: 3, + handle_child_setup_completed: 3, + handle_child_playing: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, @@ -382,10 +382,10 @@ defmodule Membrane.Bin do end @impl true - def handle_spec_setup_completed(_children_names, _ctx, state), do: {[], state} + def handle_child_setup_completed(_child, _ctx, state), do: {[], state} @impl true - def handle_spec_playing(_children_names, _ctx, state), do: {[], state} + def handle_child_playing(_child, _ctx, state), do: {[], state} @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -411,8 +411,8 @@ defmodule Membrane.Bin do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_setup_completed: 3, - handle_spec_playing: 3, + handle_child_setup_completed: 3, + handle_child_playing: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 267be1d8c..e77308769 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -320,10 +320,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do Enum.empty?(spec_data.dependent_specs) do Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized") - state = - MapSet.to_list(spec_data.children_names) - |> StartupUtils.exec_handle_spec_setup_completed(state) - + state = handle_children_setup_completed(spec_data.children_names, state) do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state) else {spec_data, state} @@ -428,8 +425,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do state = with %{playback: :playing} <- state do - MapSet.to_list(spec_data.children_names) - |> StartupUtils.exec_handle_spec_playing(state) + handle_children_playing(spec_data.children_names, state) end {spec_data, state} @@ -630,6 +626,32 @@ defmodule Membrane.Core.Parent.ChildLifeController do } end + @spec handle_children_setup_completed(MapSet.t(Child.name()) | [Child.name()], Parent.state()) :: + Parent.state() + def handle_children_setup_completed(children_names, state) do + exec_child_playback_related_callbacks(:handle_child_setup_completed, children_names, state) + end + + @spec handle_children_playing(MapSet.t(Child.name()) | [Child.name()], Parent.state()) :: + Parent.state() + def handle_children_playing(children_names, state) do + exec_child_playback_related_callbacks(:handle_child_playing, children_names, state) + end + + defp exec_child_playback_related_callbacks(callback, children_names, state) do + action_handler = Component.action_handler(state) + + Enum.reduce(children_names, state, fn child, state -> + CallbackHandler.exec_and_handle_callback( + callback, + action_handler, + %{context: &Component.context_from_state/1}, + [child], + state + ) + end) + end + @spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state() def handle_child_pad_removed(child, child_pad_ref, state) do Membrane.Logger.debug_verbose("Child #{inspect(child)} removed pad #{inspect(child_pad_ref)}") diff --git a/lib/membrane/core/parent/child_life_controller/startup_utils.ex b/lib/membrane/core/parent/child_life_controller/startup_utils.ex index 9123233bc..622273adc 100644 --- a/lib/membrane/core/parent/child_life_controller/startup_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/startup_utils.ex @@ -108,35 +108,20 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do # handle_spec_started/3 callback is deprecated, so we don't require its implementation if function_exported?(state.module, callback_name, 3) do - exec_spec_related_callback(callback_name, children_names, state) + action_handler = Component.action_handler(state) + + CallbackHandler.exec_and_handle_callback( + callback_name, + action_handler, + %{context: &Component.context_from_state/1}, + [children_names], + state + ) else state end end - @spec exec_handle_spec_setup_completed([Membrane.Child.name()], Parent.state()) :: - Parent.state() - def exec_handle_spec_setup_completed(children_names, state) do - exec_spec_related_callback(:handle_spec_setup_completed, children_names, state) - end - - @spec exec_handle_spec_playing([Membrane.Child.name()], Parent.state()) :: Parent.state() - def exec_handle_spec_playing(children_names, state) do - exec_spec_related_callback(:handle_spec_playing, children_names, state) - end - - defp exec_spec_related_callback(callback, children_names, state) do - action_handler = Component.action_handler(state) - - CallbackHandler.exec_and_handle_callback( - callback, - action_handler, - %{context: &Component.context_from_state/1}, - [children_names], - state - ) - end - @spec check_if_children_names_and_children_groups_ids_are_unique( ChildLifeController.children_spec_canonical_form(), Parent.state() diff --git a/lib/membrane/core/parent/lifecycle_controller.ex b/lib/membrane/core/parent/lifecycle_controller.ex index 5404d574d..21a3f16ba 100644 --- a/lib/membrane/core/parent/lifecycle_controller.ex +++ b/lib/membrane/core/parent/lifecycle_controller.ex @@ -2,7 +2,6 @@ defmodule Membrane.Core.Parent.LifecycleController do @moduledoc false use Bunch - alias Membrane.Core.Parent.ChildLifeController.StartupUtils alias Membrane.{Child, ChildNotification, Core, Pad, Sync} alias Membrane.Core.{ @@ -14,7 +13,7 @@ defmodule Membrane.Core.Parent.LifecycleController do } alias Membrane.Core.Events - alias Membrane.Core.Parent.{ChildLifeController} + alias Membrane.Core.Parent.ChildLifeController require Membrane.Core.Component require Membrane.Core.Message @@ -66,11 +65,7 @@ defmodule Membrane.Core.Parent.LifecycleController do state ) - pinged_children - |> Enum.group_by(&state.children[&1].spec_ref) - |> Enum.reduce(state, fn {_spec_ref, children}, state -> - StartupUtils.exec_handle_spec_playing(children, state) - end) + ChildLifeController.handle_children_playing(pinged_children, state) end @spec handle_terminate_request(Parent.state()) :: Parent.state() diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 06837debf..0b2869d2a 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -225,8 +225,8 @@ defmodule Membrane.Pipeline do By default, it does nothing. """ - @callback handle_spec_setup_completed( - children :: [Child.name()], + @callback handle_child_setup_completed( + child :: Child.name(), context :: CallbackContext.t(), state ) :: {[Action.common_actions()], state()} @@ -236,8 +236,8 @@ defmodule Membrane.Pipeline do By default, it does nothing. """ - @callback handle_spec_playing( - children :: [Child.name()], + @callback handle_child_playing( + child :: Child.name(), context :: CallbackContext.t(), state ) :: {[Action.common_actions()], state()} @@ -282,8 +282,8 @@ defmodule Membrane.Pipeline do handle_playing: 2, handle_info: 3, handle_spec_started: 3, - handle_spec_setup_completed: 3, - handle_spec_playing: 3, + handle_child_setup_completed: 3, + handle_child_playing: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, @@ -540,10 +540,10 @@ defmodule Membrane.Pipeline do end @impl true - def handle_spec_setup_completed(_children_names, _ctx, state), do: {[], state} + def handle_child_setup_completed(_child, _ctx, state), do: {[], state} @impl true - def handle_spec_playing(_children_names, _ctx, state), do: {[], state} + def handle_child_playing(_child, _ctx, state), do: {[], state} @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -568,8 +568,8 @@ defmodule Membrane.Pipeline do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_setup_completed: 3, - handle_spec_playing: 3, + handle_child_setup_completed: 3, + handle_child_playing: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 687db890c..eea46c3d2 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -396,18 +396,26 @@ defmodule Membrane.Testing.Pipeline do {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} end - [ - :handle_spec_started, - :handle_spec_setup_completed, - :handle_spec_playing - ] + @impl true + def handle_spec_started(children, ctx, %State{} = state) do + {custom_actions, custom_state} = + eval_injected_module_callback( + :handle_spec_started, + [children, ctx], + state + ) + + {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} + end + + [:handle_child_setup_completed, :handle_child_playing] |> Enum.map(fn callback -> @impl true - def unquote(callback)(children, ctx, %State{} = state) do + def unquote(callback)(child, ctx, %State{} = state) do {custom_actions, custom_state} = eval_injected_module_callback( unquote(callback), - [children, ctx], + [child, ctx], state ) diff --git a/test/membrane/integration/spec_callbacks_test.exs b/test/membrane/integration/spec_callbacks_test.exs index 886d369e1..3349f0fee 100644 --- a/test/membrane/integration/spec_callbacks_test.exs +++ b/test/membrane/integration/spec_callbacks_test.exs @@ -20,14 +20,14 @@ defmodule Membrane.SpecCallbacksTest do end @impl true - def handle_spec_setup_completed(children, _ctx, state) do - send(state.test_pid, {:setup_completed, children}) + def handle_child_setup_completed(child, _ctx, state) do + send(state.test_pid, {:setup_completed, child}) {[], state} end @impl true - def handle_spec_playing(children, _ctx, state) do - send(state.test_pid, {:playing, children}) + def handle_child_playing(child, _ctx, state) do + send(state.test_pid, {:playing, child}) {[], state} end end @@ -62,7 +62,7 @@ defmodule Membrane.SpecCallbacksTest do ] end - test "handle_spec_setup_completed and handle_spec_playing when children complete setup after parent" do + test "handle_child_setup_completed and handle_child_playing when a child completes setup after parent" do pipeline = Testing.Pipeline.start_link_supervised!( module: Pipeline, @@ -71,24 +71,24 @@ defmodule Membrane.SpecCallbacksTest do Testing.Pipeline.execute_actions(pipeline, pipeline_spec_actions()) - assert_receive {:setup_completed, [:a]} - assert_receive {:playing, [:a]} + assert_receive {:setup_completed, :a} + assert_receive {:playing, :a} refute_receive {:setup_completed, _any} refute_receive {:playing, _any} Testing.Pipeline.notify_child(pipeline, :b, :complete_setup) - assert_receive {:setup_completed, [:b]} - assert_receive {:playing, [:b]} + assert_receive {:setup_completed, :b} + assert_receive {:playing, :b} refute_receive {:setup_completed, _any} refute_receive {:playing, _any} Testing.Pipeline.notify_child(pipeline, :d, :complete_setup) - assert_receive {:setup_completed, children} - assert Enum.sort(children) == [:c, :d] - assert_receive {:playing, children} - assert Enum.sort(children) == [:c, :d] + assert_receive {:setup_completed, :c} + assert_receive {:setup_completed, :d} + assert_receive {:playing, :c} + assert_receive {:playing, :d} refute_receive {:setup_completed, _any} refute_receive {:playing, _any} @@ -96,7 +96,7 @@ defmodule Membrane.SpecCallbacksTest do :ok = Testing.Pipeline.terminate(pipeline) end - test "handle_spec_setup_completed and handle_spec_playing when children complete setup before parent" do + test "handle_child_setup_completed and handle_child_playing when a child completes setup before parent" do pipeline = Testing.Pipeline.start_link_supervised!( module: Pipeline, @@ -105,29 +105,28 @@ defmodule Membrane.SpecCallbacksTest do Testing.Pipeline.execute_actions(pipeline, pipeline_spec_actions()) - assert_receive {:setup_completed, [:a]} + assert_receive {:setup_completed, :a} refute_receive {:setup_completed, _any} refute_receive {:playing, _any} Testing.Pipeline.notify_child(pipeline, :b, :complete_setup) - assert_receive {:setup_completed, [:b]} + assert_receive {:setup_completed, :b} refute_receive {:setup_completed, _any} refute_receive {:playing, _any} Testing.Pipeline.notify_child(pipeline, :d, :complete_setup) - assert_receive {:setup_completed, children} - assert Enum.sort(children) == [:c, :d] + assert_receive {:setup_completed, :c} + assert_receive {:setup_completed, :d} refute_receive {:setup_completed, _any} refute_receive {:playing, _any} Testing.Pipeline.execute_actions(pipeline, setup: :complete) - assert_receive {:playing, [:a]} - assert_receive {:playing, [:b]} - assert_receive {:playing, children} - assert Enum.sort(children) == [:c, :d] + for child <- [:a, :b, :c, :d] do + assert_receive {:playing, ^child} + end refute_receive {:setup_completed, _any} refute_receive {:playing, _any}