Skip to content

Commit

Permalink
Log diamond wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Nov 25, 2024
1 parent 646898c commit 30b757e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 12 deletions.
7 changes: 6 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,16 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(
Message.new(:diamond_detection, [diamond_detection_ref, diamond_detection_path]),
Message.new(:diamond_detection, [
input_pad_ref,
diamond_detection_ref,
diamond_detection_path
]),
state
) do
state =
DiamondDetectionController.continue_diamond_detection(
input_pad_ref,
diamond_detection_ref,
diamond_detection_path,
state
Expand Down
69 changes: 58 additions & 11 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,55 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
@moduledoc false

require Membrane.Core.Message, as: Message
require Membrane.Logger
require Membrane.Pad, as: Pad

alias Membrane.Child
alias Membrane.Core.Element.State
alias Membrane.Element.PadData

# TODO: don't forward diamond detection and triggers in endpoints
@component_path_suffix "__membrane_component_path_64_byte_suffix________________________"
@not_a_pad :__membrane_not_a_pad__

@type diamond_detection_path :: [{pid(), Child.name(), Pad.ref()}]
@type diamond_detection_path_entry :: %{
pid: pid(),
component_path: String.t(),
input_pad_ref: Pad.ref(),
output_pad_ref: Pad.ref()
}

@type diamond_detection_path() :: [diamond_detection_path_entry()]

@spec start_diamond_detection(State.t()) :: :ok
def start_diamond_detection(state) do
diamond_detection_path = [
%{
pid: self(),
component_path: get_component_path(),
input_pad_ref: @not_a_pad,
output_pad_ref: nil
}
]

make_ref()
|> forward_diamond_detection([], state)
|> forward_diamond_detection(diamond_detection_path, state)
end

@spec continue_diamond_detection(reference(), diamond_detection_path(), State.t()) :: State.t()
def continue_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) do
@spec continue_diamond_detection(Pad.ref(), reference(), diamond_detection_path(), State.t()) :: State.t()
def continue_diamond_detection(
input_pad_ref,
diamond_detection_ref,
diamond_detecton_path,
state
) do
new_path_entry = %{
pid: self(),
component_path: get_component_path(),
input_pad_ref: input_pad_ref,
output_pad_ref: @not_a_pad
}

diamond_detecton_path = [new_path_entry | diamond_detecton_path]

cond do
not is_map_key(state.diamond_detection_ref_to_path, diamond_detection_ref) ->
:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)
Expand All @@ -37,6 +68,9 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
has_cycle?(diamond_detecton_path) ->
state

have_common_prefix?(diamond_detecton_path, state.diamond_detection_ref_to_path[diamond_detection_ref]) ->
state

true ->
# todo: log diamond
state
Expand All @@ -55,18 +89,18 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
@spec forward_diamond_detection(reference(), diamond_detection_path(), State.t()) :: :ok
defp forward_diamond_detection(diamond_detection_ref, diamond_detection_path, state) do
auto_pull_mode? = state.effective_flow_control == :pull
[current_entry | diamond_detection_path_tail] = diamond_detection_path

state.pads_data
|> Enum.each(fn {pad_ref, pad_data} ->
if is_output_pull_pad(pad_data, auto_pull_mode?) do
name = state.name
# name = Membrane.ComponentPath.get()
diamond_detection_path = [{self(), name, pad_ref} | diamond_detection_path]
current_entry = %{current_entry | output_pad_ref: pad_ref}
diamond_detection_path = [current_entry | diamond_detection_path_tail]

Message.send(
pad_data.pid,
:diamond_detection,
[diamond_detection_ref, diamond_detection_path]
[pad_data.other_ref, diamond_detection_ref, diamond_detection_path]
)
end
end)
Expand All @@ -88,7 +122,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end

defp has_cycle?(diamond_detection_path) do
uniq_length = diamond_detection_path |> Enum.uniq() |> length()
uniq_length = diamond_detection_path |> Enum.uniq_by(& &1.pid) |> length()
uniq_length < length(diamond_detection_path)
end

Expand Down Expand Up @@ -153,4 +187,17 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
self() |> Process.send_after(message, send_after_time)
:ok
end

defp get_component_path() do
# adding @component_path_suffix to component path will cause that component path will
# always have more than 64 bytes, so it won't be copied during sending a message
(Membrane.ComponentPath.get() ++ [@component_path_suffix])
|> Enum.join()
end

defp have_common_prefix?(path_a, path_b), do: List.last(path_a) == List.last(path_b)

defp log_diamond(path_a, path_b) do

end
end

0 comments on commit 30b757e

Please sign in to comment.