diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 35a0be04e..b3966bb29 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -288,11 +288,16 @@ defmodule Membrane.Core.Element do end defp do_handle_info( - Message.new(:diamond_detection, [diamond_detection_ref, diamond_detection_path]), + Message.new(:diamond_detection, [ + input_pad_ref, + diamond_detection_ref, + diamond_detection_path + ]), state ) do state = DiamondDetectionController.continue_diamond_detection( + input_pad_ref, diamond_detection_ref, diamond_detection_path, state diff --git a/lib/membrane/core/element/diamond_detection_controller.ex b/lib/membrane/core/element/diamond_detection_controller.ex index cd2bdd227..d65880b15 100644 --- a/lib/membrane/core/element/diamond_detection_controller.ex +++ b/lib/membrane/core/element/diamond_detection_controller.ex @@ -2,24 +2,55 @@ defmodule Membrane.Core.Element.DiamondDetectionController do @moduledoc false require Membrane.Core.Message, as: Message + require Membrane.Logger require Membrane.Pad, as: Pad - alias Membrane.Child alias Membrane.Core.Element.State alias Membrane.Element.PadData - # TODO: don't forward diamond detection and triggers in endpoints + @component_path_suffix "__membrane_component_path_64_byte_suffix________________________" + @not_a_pad :__membrane_not_a_pad__ - @type diamond_detection_path :: [{pid(), Child.name(), Pad.ref()}] + @type diamond_detection_path_entry :: %{ + pid: pid(), + component_path: String.t(), + input_pad_ref: Pad.ref(), + output_pad_ref: Pad.ref() + } + + @type diamond_detection_path() :: [diamond_detection_path_entry()] @spec start_diamond_detection(State.t()) :: :ok def start_diamond_detection(state) do + diamond_detection_path = [ + %{ + pid: self(), + component_path: get_component_path(), + input_pad_ref: @not_a_pad, + output_pad_ref: nil + } + ] + make_ref() - |> forward_diamond_detection([], state) + |> forward_diamond_detection(diamond_detection_path, state) end - @spec continue_diamond_detection(reference(), diamond_detection_path(), State.t()) :: State.t() - def continue_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) do + @spec continue_diamond_detection(Pad.ref(), reference(), diamond_detection_path(), State.t()) :: State.t() + def continue_diamond_detection( + input_pad_ref, + diamond_detection_ref, + diamond_detecton_path, + state + ) do + new_path_entry = %{ + pid: self(), + component_path: get_component_path(), + input_pad_ref: input_pad_ref, + output_pad_ref: @not_a_pad + } + + diamond_detecton_path = [new_path_entry | diamond_detecton_path] + cond do not is_map_key(state.diamond_detection_ref_to_path, diamond_detection_ref) -> :ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) @@ -37,6 +68,9 @@ defmodule Membrane.Core.Element.DiamondDetectionController do has_cycle?(diamond_detecton_path) -> state + have_common_prefix?(diamond_detecton_path, state.diamond_detection_ref_to_path[diamond_detection_ref]) -> + state + true -> # todo: log diamond state @@ -55,18 +89,18 @@ defmodule Membrane.Core.Element.DiamondDetectionController do @spec forward_diamond_detection(reference(), diamond_detection_path(), State.t()) :: :ok defp forward_diamond_detection(diamond_detection_ref, diamond_detection_path, state) do auto_pull_mode? = state.effective_flow_control == :pull + [current_entry | diamond_detection_path_tail] = diamond_detection_path state.pads_data |> Enum.each(fn {pad_ref, pad_data} -> if is_output_pull_pad(pad_data, auto_pull_mode?) do - name = state.name - # name = Membrane.ComponentPath.get() - diamond_detection_path = [{self(), name, pad_ref} | diamond_detection_path] + current_entry = %{current_entry | output_pad_ref: pad_ref} + diamond_detection_path = [current_entry | diamond_detection_path_tail] Message.send( pad_data.pid, :diamond_detection, - [diamond_detection_ref, diamond_detection_path] + [pad_data.other_ref, diamond_detection_ref, diamond_detection_path] ) end end) @@ -88,7 +122,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do end defp has_cycle?(diamond_detection_path) do - uniq_length = diamond_detection_path |> Enum.uniq() |> length() + uniq_length = diamond_detection_path |> Enum.uniq_by(& &1.pid) |> length() uniq_length < length(diamond_detection_path) end @@ -153,4 +187,17 @@ defmodule Membrane.Core.Element.DiamondDetectionController do self() |> Process.send_after(message, send_after_time) :ok end + + defp get_component_path() do + # adding @component_path_suffix to component path will cause that component path will + # always have more than 64 bytes, so it won't be copied during sending a message + (Membrane.ComponentPath.get() ++ [@component_path_suffix]) + |> Enum.join() + end + + defp have_common_prefix?(path_a, path_b), do: List.last(path_a) == List.last(path_b) + + defp log_diamond(path_a, path_b) do + + end end