diff --git a/CHANGELOG.md b/CHANGELOG.md index 04d410f1e..52447447e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) * Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904) + * Implememnt diamonds detection. [#909](https://github.com/membraneframework/membrane_core/pull/909) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 82d10c479..0c639dacc 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -24,6 +24,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, DemandController, + DiamondDetectionController, EffectiveFlowController, EventController, LifecycleController, @@ -281,6 +282,11 @@ defmodule Membrane.Core.Element do {:noreply, state} end + defp do_handle_info(Message.new(:diamond_detection, message), state) do + state = DiamondDetectionController.handle_diamond_detection_message(message, state) + {:noreply, state} + end + defp do_handle_info(Message.new(:terminate), state) do state = LifecycleController.handle_terminate_request(state) {:noreply, state} diff --git a/lib/membrane/core/element/diamond_detection_controller.ex b/lib/membrane/core/element/diamond_detection_controller.ex new file mode 100644 index 000000000..7e4b5650a --- /dev/null +++ b/lib/membrane/core/element/diamond_detection_controller.ex @@ -0,0 +1,312 @@ +defmodule Membrane.Core.Element.DiamondDetectionController do + @moduledoc false + + # DESCRIPTION OF THE ALGORITHM OF FINDING DIAMONDS IN THE PIPELINE + + # Definitions: + + # diamond - directed graph that has at least two distinct elements (sink and source) and + # has two vertex-disjoint paths from the source to the sink. + + # This algorithm takes the directed graph made by all elements within a single pipeline and + # finds some diamond-subgraphs where all the edges (links) work in the :pull mode, + # that is they're either in the :manual flow control or in the :auto flow control and the + # effective flow control is set to :pull. + + # These diamonds can be dangerous when used with pull flow control, e.g. let's consider + # a pipeline that contains: + # * MP4 demuxer that has two output pads + # * MKV muxer that is linked to both of them + # and let's assume that the MP4 file that is consumed by the MP4 demuxer is unbalanced + # (audio and video streams are not interleaved, for example audio comes first and then video) + # If the MKV muxer has pads working in pull mode, then demand on one pad will be satisfied, + # but on the other won't, because the source MP4 file is unbalanced. Then, if the MP4 demuxer + # has pads in auto flow control and its effective flow control is set to :pull, it won't + # demand on the input, because one of the pads output with :auto flow control doesn't + # have positive demand, so the whole pipeline will get stuck and won't process more data. + + # The algorithm is made of two phases: (1) triggering and (2) proper searching. + + # (1) Triggering + + # Let's notice that: + # * a new diamond can be created only after linking a new spec + # * if the new spec created a new diamond, this diamond will contain some of + # the links spawned in this spec + # If the diamond contains a link, it must also contain an element whose output pad + # is part of this link. + + # After the spec status is set to :done, the parent component that returned the spec will + # triggerall elements whose output pads have been linked in this spec (`type: :start_trigger`). + # The reference of the trigger is always set to the spec reference. + + # If the element is triggered with a specific reference (`type: :trigger`) for the first time, + # it does two things: + # * the element forwards the trigger with the same reference via all input pads working + # in the pull mode (`type: :trigger`) + # * if the element has at least two output pads working in the pull mode, it postpones + # the proper searching that will be spawned from itself (`type: :start_search`). The time + # between postponing and the proper searching is one second. If during this time an element + # is triggered once again with a different reference, it won't cause another postponement + # of the proper searching, this means that at the time there is at most one proper searching + # postponed in the single element + + # (2) Proper searching: + + # Proper searching is started only in elements that have at least two output pads + # working in the pull mode. When an element starts proper searching, it assigns + # a new reference to it, different from the reference of the related trigger. + + # When proper searching enters the element (no matter if it is the element that has + # just started the proper searching, or maybe it was forwarded to it via a link): + # * if the element sees the proper searching reference for the first time, then: + # - it forwards proper searching via all output pads working in the pull mode + # (`type: :search`) + # - when proper searching is forwarded, it remembers the path in the graph through + # the elements that it has already passed + # * if the element has already seen the reference of proper searching, but there is + # a repeated element on the path that proper searching traversed to this element, + # the element does nothing + # * if the element has already seen the reference of proper searching and the traversed + # path doesn't contain any repeated elements, it means that the current traversed path + # and the path that the proper searching traversed when it entered the element + # the previous time together make a diamond. Then, the element logs the found diamond + # and doesn't forward proper searching further. + + alias __MODULE__.{DiamondLogger, PathInGraph} + alias Membrane.Core.Element.State + alias Membrane.Element.PadData + + require Membrane.Core.Message, as: Message + require Membrane.Logger + require Membrane.Pad, as: Pad + + @type diamond_detection_message() :: %{ + :type => + :start_search + | :search + | :delete_search_ref + | :start_trigger + | :trigger + | :delete_trigger_ref, + optional(:ref) => reference(), + optional(:path) => PathInGraph.t(), + optional(:pad_ref) => Pad.ref() + } + + @spec handle_diamond_detection_message(diamond_detection_message(), State.t()) :: State.t() + def handle_diamond_detection_message(%{type: type} = message, state) do + case type do + :start_search -> + :ok = start_search(state) + state + + :search -> + handle_and_forward_search(message.pad_ref, message.ref, message.path, state) + + :delete_search_ref -> + delete_search_ref(message.ref, state) + + :start_trigger -> + start_trigger(message.ref, state) + + :trigger -> + handle_and_forward_trigger(message.ref, state) + + :delete_trigger_ref -> + delete_trigger_ref(message.ref, state) + end + end + + @spec start_search(State.t()) :: :ok + defp start_search(state) do + component_path = Membrane.ComponentPath.get_formatted() + + diamond_detection_path = [ + %PathInGraph.Vertex{pid: self(), component_path: component_path} + ] + + :ok = + make_ref() + |> forward_search(diamond_detection_path, state) + + :ok + end + + @spec handle_and_forward_search(Pad.ref(), reference(), PathInGraph.t(), State.t()) :: + State.t() + defp handle_and_forward_search( + input_pad_ref, + diamond_detection_ref, + diamond_detecton_path, + state + ) do + component_path = Membrane.ComponentPath.get_formatted() + + new_path_vertex = %PathInGraph.Vertex{ + pid: self(), + component_path: component_path, + input_pad_ref: input_pad_ref + } + + diamond_detecton_path = [new_path_vertex | diamond_detecton_path] + + cond do + not is_map_key(state.diamond_detection_state.ref_to_path, diamond_detection_ref) -> + :ok = forward_search(diamond_detection_ref, diamond_detecton_path, state) + + :ok = + %{type: :delete_search_ref, ref: diamond_detection_ref} + |> send_after_to_self() + + state + |> put_in( + [:diamond_detection_state, :ref_to_path, diamond_detection_ref], + diamond_detecton_path + ) + + has_cycle?(diamond_detecton_path) -> + state + + have_common_prefix?( + diamond_detecton_path, + state.diamond_detection_state.ref_to_path[diamond_detection_ref] + ) -> + state + + true -> + :ok = + state.diamond_detection_state.ref_to_path[diamond_detection_ref] + |> DiamondLogger.log_diamond(diamond_detecton_path) + + state + end + end + + @spec delete_search_ref(reference(), State.t()) :: State.t() + defp delete_search_ref(diamond_detection_ref, state) do + {_path, %State{} = state} = + state + |> pop_in([:diamond_detection_state, :ref_to_path, diamond_detection_ref]) + + state + end + + @spec forward_search(reference(), PathInGraph.t(), State.t()) :: :ok + defp forward_search(diamond_detection_ref, diamond_detection_path, state) do + auto_pull_mode? = state.effective_flow_control == :pull + [current_entry | diamond_detection_path_tail] = diamond_detection_path + + state.pads_data + |> Enum.each(fn {pad_ref, pad_data} -> + if output_pull_pad?(pad_data, auto_pull_mode?) do + current_entry = %{current_entry | output_pad_ref: pad_ref} + diamond_detection_path = [current_entry | diamond_detection_path_tail] + + message = %{ + type: :search, + pad_ref: pad_data.other_ref, + ref: diamond_detection_ref, + path: diamond_detection_path + } + + Message.send(pad_data.pid, :diamond_detection, message) + end + end) + end + + defp forward_diamond_detection_trigger(trigger_ref, state) do + state.pads_data + |> Enum.each(fn {_pad_ref, %PadData{} = pad_data} -> + if pad_data.direction == :input and pad_data.flow_control != :push do + message = %{type: :trigger, ref: trigger_ref} + Message.send(pad_data.pid, :diamond_detection, message) + end + end) + end + + defp output_pull_pad?(%PadData{} = pad_data, auto_pull_mode?) do + pad_data.direction == :output and + (pad_data.flow_control == :manual or + (pad_data.flow_control == :auto and auto_pull_mode?)) + end + + defp has_cycle?(diamond_detection_path) do + uniq_length = diamond_detection_path |> Enum.uniq_by(& &1.pid) |> length() + uniq_length < length(diamond_detection_path) + end + + @spec start_trigger(reference(), State.t()) :: State.t() + defp start_trigger(spec_ref, state) do + if map_size(state.pads_data) < 2 or + MapSet.member?(state.diamond_detection_state.trigger_refs, spec_ref) do + state + else + do_handle_and_forward_trigger(spec_ref, state) + end + end + + @spec handle_and_forward_trigger(reference(), State.t()) :: State.t() + defp handle_and_forward_trigger(trigger_ref, %State{} = state) do + if state.type == :endpoint or + MapSet.member?(state.diamond_detection_state.trigger_refs, trigger_ref), + do: state, + else: do_handle_and_forward_trigger(trigger_ref, state) + end + + defp do_handle_and_forward_trigger(trigger_ref, %State{} = state) do + state = + state + |> update_in( + [:diamond_detection_state, :trigger_refs], + &MapSet.put(&1, trigger_ref) + ) + + :ok = + %{type: :delete_trigger_ref, ref: trigger_ref} + |> send_after_to_self() + + :ok = forward_diamond_detection_trigger(trigger_ref, state) + + if output_pull_arity(state) >= 2, + do: postpone_diamond_detection(state), + else: state + end + + defp postpone_diamond_detection(%State{} = state) + when state.diamond_detection_state.postponed? do + state + end + + defp postpone_diamond_detection(%State{} = state) do + :ok = %{type: :start_search} |> send_after_to_self(1) + + state + |> put_in([:diamond_detection_state, :postponed?], true) + end + + @spec delete_trigger_ref(reference(), State.t()) :: State.t() + defp delete_trigger_ref(trigger_ref, state) do + state + |> update_in( + [:diamond_detection_state, :trigger_refs], + &MapSet.delete(&1, trigger_ref) + ) + end + + defp output_pull_arity(state) do + auto_pull_mode? = state.effective_flow_control == :pull + + state.pads_data + |> Enum.count(fn {_pad_ref, pad_data} -> output_pull_pad?(pad_data, auto_pull_mode?) end) + end + + defp send_after_to_self(%{type: _type} = message, seconds \\ 10) do + send_after_time = Membrane.Time.seconds(seconds) |> Membrane.Time.as_milliseconds(:round) + message = Message.new(:diamond_detection, message) + self() |> Process.send_after(message, send_after_time) + :ok + end + + defp have_common_prefix?(path_a, path_b), do: List.last(path_a) == List.last(path_b) +end diff --git a/lib/membrane/core/element/diamond_detection_controller/diamond_detection_state.ex b/lib/membrane/core/element/diamond_detection_controller/diamond_detection_state.ex new file mode 100644 index 000000000..da2545177 --- /dev/null +++ b/lib/membrane/core/element/diamond_detection_controller/diamond_detection_state.ex @@ -0,0 +1,17 @@ +defmodule Membrane.Core.Element.DiamondDetectionController.DiamondDatectionState do + @moduledoc false + + use Bunch.Access + + alias Membrane.Core.Element.DiamondDetectionController.PathInGraph + + defstruct ref_to_path: %{}, + trigger_refs: MapSet.new(), + postponed?: false + + @type t :: %__MODULE__{ + ref_to_path: %{optional(reference()) => PathInGraph.t()}, + trigger_refs: MapSet.t(reference()), + postponed?: boolean() + } +end diff --git a/lib/membrane/core/element/diamond_detection_controller/diamond_logger.ex b/lib/membrane/core/element/diamond_detection_controller/diamond_logger.ex new file mode 100644 index 000000000..14eeaa8c7 --- /dev/null +++ b/lib/membrane/core/element/diamond_detection_controller/diamond_logger.ex @@ -0,0 +1,43 @@ +defmodule Membrane.Core.Element.DiamondDetectionController.DiamondLogger do + @moduledoc false + + alias Membrane.Core.Element.DiamondDetectionController.PathInGraph + alias Membrane.Core.Element.DiamondDetectionController.PathInGraph.Vertex + + require Membrane.Logger + + # logging a diamond is moved to the separate module due to testing + + @spec log_diamond(PathInGraph.t(), PathInGraph.t()) :: :ok + def log_diamond(path_a, path_b) do + from = List.last(path_a) |> Map.fetch!(:component_path) + to = List.first(path_a) |> Map.fetch!(:component_path) + + Membrane.Logger.debug(""" + Two paths from element #{from} to #{to} were detected, in which all pads are operating in pull \ + mode. With such a pipeline configuration, the membrane flow control mechanism may stop demanding \ + buffers. If you are debugging such an issue, keep in mind that input pads with `flow_control: :auto` \ + demand data when there is a demand for data on ALL output pads with `flow_control: :auto`. + + The first path from #{from} to #{to} leads: + #{inspect_path(path_a)} + + The second path from #{from} to #{to} leads: + #{inspect_path(path_b)} + """) + + :ok + end + + defp inspect_path(path) do + path + |> Enum.reverse() + |> Enum.chunk_every(2, 1, :discard) + |> Enum.map_join("\n", fn [%Vertex{} = from, %Vertex{} = to] -> + """ + * from #{from.component_path} via output pad #{inspect(from.output_pad_ref)} to \ + #{to.component_path} via input pad #{inspect(to.input_pad_ref)} + """ + end) + end +end diff --git a/lib/membrane/core/element/diamond_detection_controller/path_in_grapth.ex b/lib/membrane/core/element/diamond_detection_controller/path_in_grapth.ex new file mode 100644 index 000000000..bf0b0c63c --- /dev/null +++ b/lib/membrane/core/element/diamond_detection_controller/path_in_grapth.ex @@ -0,0 +1,19 @@ +defmodule Membrane.Core.Element.DiamondDetectionController.PathInGraph do + @moduledoc false + + defmodule Vertex do + @moduledoc false + require Membrane.Pad, as: Pad + + defstruct [:pid, :component_path, :input_pad_ref, :output_pad_ref] + + @type t :: %__MODULE__{ + pid: pid(), + component_path: String.t(), + input_pad_ref: Pad.ref() | nil, + output_pad_ref: Pad.ref() | nil + } + end + + @type t :: [Vertex.t()] +end diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index e7e6392f4..1c4d2d898 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.State do alias Membrane.{Clock, Element, Pad, Sync} alias Membrane.Core.Child.PadModel + alias Membrane.Core.Element.DiamondDetectionController.DiamondDatectionState alias Membrane.Core.Element.EffectiveFlowController alias Membrane.Core.Timer @@ -46,7 +47,8 @@ defmodule Membrane.Core.Element.State do stalker: Membrane.Core.Stalker.t(), satisfied_auto_output_pads: MapSet.t(), awaiting_auto_input_pads: MapSet.t(), - resume_delayed_demands_loop_in_mailbox?: boolean() + resume_delayed_demands_loop_in_mailbox?: boolean(), + diamond_detection_state: DiamondDatectionState.t() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -79,6 +81,7 @@ defmodule Membrane.Core.Element.State do handle_demand_loop_counter: 0, pads_to_snapshot: MapSet.new(), playback_queue: [], + diamond_detection_state: %DiamondDatectionState{}, pads_data: %{}, satisfied_auto_output_pads: MapSet.new(), awaiting_auto_input_pads: MapSet.new(), diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index c9936ba8d..13014be02 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -11,6 +11,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do ChildEntryParser, ChildrenModel, ClockHandler, + DiamondDetectionController, Link, SpecificationParser } @@ -425,7 +426,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end - defp do_proceed_spec_startup(_spec_ref, %{status: :ready} = spec_data, state) do + defp do_proceed_spec_startup(spec_ref, %{status: :ready} = spec_data, state) do state = Enum.reduce(spec_data.children_names, state, fn child, state -> %{pid: pid, terminating?: terminating?} = state.children[child] @@ -439,6 +440,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do put_in(state.children[child].ready?, true) end) + spec_data.links_ids + |> Enum.map(&state.links[&1].from.child) + |> Enum.uniq() + |> Enum.each(fn child_name -> + DiamondDetectionController.start_diamond_detection_trigger(child_name, spec_ref, state) + end) + state = with %{playback: :playing} <- state do handle_children_playing(spec_data.children_names, state) diff --git a/lib/membrane/core/parent/diamond_detection_controller.ex b/lib/membrane/core/parent/diamond_detection_controller.ex new file mode 100644 index 000000000..dd3488f3a --- /dev/null +++ b/lib/membrane/core/parent/diamond_detection_controller.ex @@ -0,0 +1,18 @@ +defmodule Membrane.Core.Parent.DiamondDetectionController do + @moduledoc false + + alias Membrane.Child + alias Membrane.Core.Parent + + require Membrane.Core.Message, as: Message + + @spec start_diamond_detection_trigger(Child.name(), reference(), Parent.state()) :: :ok + def start_diamond_detection_trigger(child_name, trigger_ref, state) do + with %{component_type: :element, pid: pid} <- state.children[child_name] do + message = %{type: :start_trigger, ref: trigger_ref} + Message.send(pid, :diamond_detection, message) + end + + :ok + end +end diff --git a/lib/membrane/core/parent/link_endpoint.ex b/lib/membrane/core/parent/link_endpoint.ex index c828e8f8d..ca5cb1f27 100644 --- a/lib/membrane/core/parent/link_endpoint.ex +++ b/lib/membrane/core/parent/link_endpoint.ex @@ -3,14 +3,14 @@ defmodule Membrane.Core.Parent.Link.Endpoint do use Bunch.Access - alias Membrane.{Element, Pad} + alias Membrane.{Child, Pad} @enforce_keys [:child, :pad_spec] defstruct @enforce_keys ++ [pad_ref: nil, pid: nil, pad_props: [], pad_info: %{}, child_spec_ref: nil] @type t() :: %__MODULE__{ - child: Element.name() | {Membrane.Bin, :itself}, + child: Child.name() | {Membrane.Bin, :itself}, pad_spec: Pad.name() | Pad.ref(), pad_ref: Pad.ref(), pid: pid(), diff --git a/mix.exs b/mix.exs index c1fab8be5..77b38ce84 100644 --- a/mix.exs +++ b/mix.exs @@ -152,6 +152,7 @@ defmodule Membrane.Mixfile do {:credo, "~> 1.7", only: :dev, runtime: false}, # Testing {:mox, "~> 1.0", only: :test}, + {:mock, "~> 0.3.8", only: :test}, {:junit_formatter, "~> 3.1", only: :test}, {:excoveralls, "~> 0.14", only: :test} ] diff --git a/mix.lock b/mix.lock index d7ed3e717..c59ec0d56 100644 --- a/mix.lock +++ b/mix.lock @@ -15,6 +15,8 @@ "makeup_diff": {:hex, :makeup_diff, "0.1.1", "01498f8c95970081297837eaf4686b6f3813e535795b8421f15ace17a59aea37", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "fadb0bf014bd328badb7be986eadbce1a29955dd51c27a9e401c3045cf24184e"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, + "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, + "mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"}, "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, "nimble_ownership": {:hex, :nimble_ownership, "1.0.0", "3f87744d42c21b2042a0aa1d48c83c77e6dd9dd357e425a038dd4b49ba8b79a1", [:mix], [], "hexpm", "7c16cc74f4e952464220a73055b557a273e8b1b7ace8489ec9d86e9ad56cb2cc"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, diff --git a/test/membrane/diamond_detection_test.exs b/test/membrane/diamond_detection_test.exs new file mode 100644 index 000000000..7f4c007fe --- /dev/null +++ b/test/membrane/diamond_detection_test.exs @@ -0,0 +1,129 @@ +defmodule Membrane.DiamondDetectionTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Mock + + alias Membrane.Core.Element.DiamondDetectionController.DiamondLogger + alias Membrane.Core.Element.DiamondDetectionController.PathInGraph.Vertex + alias Membrane.Testing + + require Membrane.Pad, as: Pad + + defmodule Node do + use Membrane.Filter + + def_input_pad :input, + accepted_format: _any, + availability: :on_request, + flow_control: :manual, + demand_unit: :buffers + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + flow_control: :manual + + @impl true + def handle_demand(_pad, _demand, _unit, _ctx, state), do: {[], state} + end + + test "diamond detection algorithm" do + with_mock DiamondLogger, log_diamond: fn _path_a, _path_b -> :ok end do + # a -> b -> c and a -> c + spec = [ + child(:a, Node) + |> via_out(Pad.ref(:output, 1)) + |> via_in(Pad.ref(:input, 1)) + |> child(:b, Node) + |> via_out(Pad.ref(:output, 1)) + |> via_in(Pad.ref(:input, 1)) + |> child(:c, Node), + get_child(:a) + |> via_out(Pad.ref(:output, 2)) + |> via_in(Pad.ref(:input, 2)) + |> get_child(:c) + ] + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(1500) + + assert [{_element_pid, {DiamondLogger, :log_diamond, [path_a, path_b]}, :ok}] = + call_history(DiamondLogger) + + "#PID" <> pipeline_id = inspect(pipeline) + + [shorter_path, longer_path] = Enum.sort_by([path_a, path_b], &length/1) + + # assert a -> b -> c + assert [ + %Vertex{ + component_path: ^pipeline_id <> "/:c", + input_pad_ref: {Membrane.Pad, :input, 1} + }, + %Vertex{ + component_path: ^pipeline_id <> "/:b", + input_pad_ref: {Membrane.Pad, :input, 1}, + output_pad_ref: {Membrane.Pad, :output, 1} + }, + %Vertex{ + component_path: ^pipeline_id <> "/:a", + output_pad_ref: {Membrane.Pad, :output, 1} + } + ] = longer_path + + # assert a -> c + assert [ + %Vertex{ + component_path: ^pipeline_id <> "/:c", + input_pad_ref: {Membrane.Pad, :input, 2} + }, + %Vertex{ + component_path: ^pipeline_id <> "/:a", + output_pad_ref: {Membrane.Pad, :output, 2} + } + ] = shorter_path + + # d -> a -> b -> c and a -> c + spec = + child(:d, Node) + |> via_out(Pad.ref(:output, 3)) + |> via_in(Pad.ref(:input, 3)) + |> get_child(:a) + + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + Process.sleep(1500) + + # adding this link shouldn't trigger logging a diamond + assert call_history(DiamondLogger) |> length() == 1 + + # d -> a -> b -> c and a -> c and d -> c + spec = + get_child(:d) + |> via_out(Pad.ref(:output, 4)) + |> via_in(Pad.ref(:input, 4)) + |> get_child(:c) + + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + Process.sleep(1500) + + # there should be one or two new logged diamonds, depending on the race condition + assert [_old_entry | new_entries] = call_history(DiamondLogger) + assert length(new_entries) in [1, 2] + + new_entries + |> Enum.flat_map(fn {_element_pid, {DiamondLogger, :log_diamond, [path_a, path_b]}, :ok} -> + [path_a, path_b] + end) + |> Enum.each(fn path -> + assert %{component_path: ^pipeline_id <> "/:c"} = List.first(path) + assert %{component_path: ^pipeline_id <> "/:d"} = List.last(path) + end) + + Testing.Pipeline.terminate(pipeline) + end + end +end diff --git a/test/membrane/integration/connector_test.exs b/test/membrane/integration/connector_test.exs index cc6b7102e..59c8a683e 100644 --- a/test/membrane/integration/connector_test.exs +++ b/test/membrane/integration/connector_test.exs @@ -34,6 +34,8 @@ defmodule Membrane.Integration.ConnectorTest do }) ) + assert_child_playing(pipeline, :source) + data = generate_data(100, [:stream_format, :buffer, :event]) data diff --git a/test/membrane/integration/effective_flow_control_resolution_test.exs b/test/membrane/integration/effective_flow_control_resolution_test.exs index 894207fef..ca95853ed 100644 --- a/test/membrane/integration/effective_flow_control_resolution_test.exs +++ b/test/membrane/integration/effective_flow_control_resolution_test.exs @@ -7,7 +7,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do alias Membrane.Testing defmodule AutoFilter do - use Membrane.Filter + use Membrane.Filter, flow_control_hints?: false def_input_pad :input, availability: :on_request, accepted_format: _any def_output_pad :output, availability: :on_request, accepted_format: _any diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 37097add8..580823a2a 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -13,7 +13,7 @@ defmodule Membrane.Integration.ToiletForwardingTest do end defmodule AutoFilter do - use Membrane.Filter + use Membrane.Filter, flow_control_hints?: false def_input_pad :input, availability: :on_request, accepted_format: _any, flow_control: :auto def_output_pad :output, availability: :on_request, accepted_format: _any, flow_control: :auto