Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect diamonds #909

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7263c9a
Write separate module for diamonds detection WiP
FelonEkonom Nov 20, 2024
f4bce2d
Implement parallel diamond detection WiP
FelonEkonom Nov 20, 2024
e9e8c77
Small refactor
FelonEkonom Nov 20, 2024
cabbcfb
Fix bug
FelonEkonom Nov 20, 2024
a58b363
Fix typo
FelonEkonom Nov 21, 2024
5a76b10
Comment out triggering diamond detection in parents
FelonEkonom Nov 21, 2024
0be84b9
Trigger diamon detection between elements WiP
FelonEkonom Nov 21, 2024
58c5384
Trigger diamon detection between elements WiP
FelonEkonom Nov 22, 2024
7b5fdad
Fix typos
FelonEkonom Nov 22, 2024
646898c
wip
FelonEkonom Nov 22, 2024
30b757e
Log diamond wip
FelonEkonom Nov 25, 2024
39030a7
Add test wip
FelonEkonom Nov 26, 2024
a3c6bdd
Add more test cases wip
FelonEkonom Nov 26, 2024
9855e8a
Refactor diamond detection code
FelonEkonom Nov 26, 2024
f5d6941
Remove unused import
FelonEkonom Nov 26, 2024
acbe87d
Improve diamond log message, fix credo
FelonEkonom Nov 27, 2024
f02d57e
Remove leftover files
FelonEkonom Nov 27, 2024
2327b24
Merge remote-tracking branch 'origin/master' into pull-diamonds
FelonEkonom Nov 27, 2024
35cf655
Update changelog
FelonEkonom Nov 27, 2024
c8b586a
Remove leftovers
FelonEkonom Nov 27, 2024
97d15cf
Fix Connector test
FelonEkonom Nov 27, 2024
6314803
Fix typo in the comment
FelonEkonom Nov 27, 2024
1f94101
Refactor State due to CR
FelonEkonom Dec 4, 2024
090488a
Refactor diamond detection messages
FelonEkonom Dec 4, 2024
1d003ba
stop creating serialized component path every time
FelonEkonom Dec 4, 2024
501aeee
Write some comments describing how the diamond detection mechanism works
FelonEkonom Dec 5, 2024
07fcfd1
Move the comments within the file
FelonEkonom Dec 5, 2024
cc9039d
Fix typos in the algorithm description
FelonEkonom Dec 6, 2024
d364df9
Implement CR suggestions WiP
FelonEkonom Dec 16, 2024
c158ee8
Implement CR suggestions WiP
FelonEkonom Dec 16, 2024
8df71f8
Further refactor
FelonEkonom Dec 16, 2024
0bff060
Algorithm description refactor
FelonEkonom Dec 16, 2024
2dff094
Implement suggestions from CR
FelonEkonom Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.2.0
* Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Implememnt diamonds detection. [#909](https://github.com/membraneframework/membrane_core/pull/909)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
45 changes: 45 additions & 0 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Membrane.Core.Element do
alias Membrane.Core.Element.{
BufferController,
DemandController,
DiamondDetectionController,
EffectiveFlowController,
EventController,
LifecycleController,
Expand Down Expand Up @@ -281,6 +282,50 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
:ok = DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(
Message.new(:diamond_detection, [
input_pad_ref,
diamond_detection_ref,
diamond_detection_path
]),
state
) do
state =
DiamondDetectionController.continue_diamond_detection(
input_pad_ref,
diamond_detection_ref,
diamond_detection_path,
state
)

{:noreply, state}
end

defp do_handle_info(Message.new(:delete_diamond_detection_ref, diamond_detection_ref), state) do
state = DiamondDetectionController.delete_diamond_detection_ref(diamond_detection_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection_trigger, trigger_ref), state) do
state = DiamondDetectionController.start_diamond_detection_trigger(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:diamond_detection_trigger, trigger_ref), state) do
state = DiamondDetectionController.handle_diamond_detection_trigger(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:delete_diamond_detection_trigger_ref, trigger_ref), state) do
state = DiamondDetectionController.delete_diamond_detection_trigger_ref(trigger_ref, state)
{:noreply, state}
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These messages seem quite internal to the diamond detection. I'd make them a single message and dispatch within DiamondDetectionController


defp do_handle_info(Message.new(:terminate), state) do
state = LifecycleController.handle_terminate_request(state)
{:noreply, state}
Expand Down
207 changes: 207 additions & 0 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
defmodule Membrane.Core.Element.DiamondDetectionController do
@moduledoc false

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's describe how the algorithm works

alias __MODULE__.{DiamondLogger, PathInGraph}
alias __MODULE__.PathInGraph.Vertex
alias Membrane.Core.Element.State
alias Membrane.Element.PadData

require Membrane.Core.Message, as: Message
require Membrane.Logger
require Membrane.Pad, as: Pad

@component_path_prefix "__membrane_component_path_64_byte_prefix________________________"

@spec start_diamond_detection(State.t()) :: :ok
def start_diamond_detection(state) do
diamond_detection_path = [
%PathInGraph.Vertex{pid: self(), component_path: get_component_path()}
]

make_ref()
|> forward_diamond_detection(diamond_detection_path, state)
end

@spec continue_diamond_detection(Pad.ref(), reference(), PathInGraph.t(), State.t()) ::
State.t()
def continue_diamond_detection(
input_pad_ref,
diamond_detection_ref,
diamond_detecton_path,
state
) do
new_path_vertex = %PathInGraph.Vertex{
pid: self(),
component_path: get_component_path(),
input_pad_ref: input_pad_ref
}

diamond_detecton_path = [new_path_vertex | diamond_detecton_path]

cond do
not is_map_key(state.diamond_detection_ref_to_path, diamond_detection_ref) ->
:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)

:ok =
Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
|> send_after_to_self()

state
|> put_in(
[:diamond_detection_ref_to_path, diamond_detection_ref],
diamond_detecton_path
)

has_cycle?(diamond_detecton_path) ->
state

have_common_prefix?(
diamond_detecton_path,
state.diamond_detection_ref_to_path[diamond_detection_ref]
) ->
state

true ->
old_diamond_detection_path =
state.diamond_detection_ref_to_path[diamond_detection_ref]
|> remove_component_path_prefix()

:ok =
diamond_detecton_path
|> remove_component_path_prefix()
|> DiamondLogger.log_diamond(old_diamond_detection_path)

state
end
end

@spec delete_diamond_detection_ref(reference(), State.t()) :: State.t()
def delete_diamond_detection_ref(diamond_detection_ref, state) do
state
|> Map.update!(
:diamond_detection_ref_to_path,
&Map.delete(&1, diamond_detection_ref)
)
end

@spec forward_diamond_detection(reference(), PathInGraph.t(), State.t()) :: :ok
defp forward_diamond_detection(diamond_detection_ref, diamond_detection_path, state) do
auto_pull_mode? = state.effective_flow_control == :pull
[current_entry | diamond_detection_path_tail] = diamond_detection_path

state.pads_data
|> Enum.each(fn {pad_ref, pad_data} ->
if output_pull_pad?(pad_data, auto_pull_mode?) do
current_entry = %{current_entry | output_pad_ref: pad_ref}
diamond_detection_path = [current_entry | diamond_detection_path_tail]

Message.send(
pad_data.pid,
:diamond_detection,
[pad_data.other_ref, diamond_detection_ref, diamond_detection_path]
)
end
end)
end

defp forward_diamond_detection_trigger(trigger_ref, state) do
state.pads_data
|> Enum.each(fn {_pad_ref, %PadData{} = pad_data} ->
if pad_data.direction == :input and pad_data.flow_control != :push do
Message.send(pad_data.pid, :diamond_detection_trigger, trigger_ref)
end
end)
end

defp output_pull_pad?(%PadData{} = pad_data, auto_pull_mode?) do
pad_data.direction == :output and
(pad_data.flow_control == :manual or
(pad_data.flow_control == :auto and auto_pull_mode?))
end

defp has_cycle?(diamond_detection_path) do
uniq_length = diamond_detection_path |> Enum.uniq_by(& &1.pid) |> length()
uniq_length < length(diamond_detection_path)
end

@spec start_diamond_detection_trigger(reference(), State.t()) :: State.t()
def start_diamond_detection_trigger(spec_ref, state) do
if map_size(state.pads_data) < 2 or
MapSet.member?(state.diamond_detection_trigger_refs, spec_ref) do
state
else
do_handle_diamond_detection_trigger(spec_ref, state)
end
end

@spec handle_diamond_detection_trigger(reference(), State.t()) :: State.t()
def handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
if state.type == :endpoint or
MapSet.member?(state.diamond_detection_trigger_refs, trigger_ref),
do: state,
else: do_handle_diamond_detection_trigger(trigger_ref, state)
end

defp do_handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
state =
state
|> Map.update!(:diamond_detection_trigger_refs, &MapSet.put(&1, trigger_ref))

:ok =
Message.new(:delete_diamond_detection_trigger_ref, trigger_ref)
|> send_after_to_self()

:ok = forward_diamond_detection_trigger(trigger_ref, state)

if output_pull_arity(state) >= 2,
do: postpone_diamond_detection(state),
else: state
end

defp postpone_diamond_detection(%State{} = state) when state.diamond_detection_postponed? do
state
end

defp postpone_diamond_detection(%State{} = state) do
:ok =
Message.new(:start_diamond_detection)
|> send_after_to_self(1)

%{state | diamond_detection_postponed?: true}
end

@spec delete_diamond_detection_trigger_ref(reference(), State.t()) :: State.t()
def delete_diamond_detection_trigger_ref(trigger_ref, state) do
state
|> Map.update!(:diamond_detection_trigger_refs, &MapSet.delete(&1, trigger_ref))
end

defp output_pull_arity(state) do
auto_pull_mode? = state.effective_flow_control == :pull

state.pads_data
|> Enum.count(fn {_pad_ref, pad_data} -> output_pull_pad?(pad_data, auto_pull_mode?) end)
end

defp send_after_to_self(message, seconds \\ 10) do
send_after_time = Membrane.Time.seconds(seconds) |> Membrane.Time.as_milliseconds(:round)
self() |> Process.send_after(message, send_after_time)
:ok
end

defp get_component_path() do
# adding @component_path_prefix to component path causes that component path
# always has more than 64 bytes, so it won't be copied during sending a message
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced it's worth the hassle. Did you do any benchmarks? BTW, it seems we construct this 'serialized' component path every time we need it - I'd do that once and keep it in state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it's now constructed once, but I'm still unconvinced about prepending the prefix. Handling ref-counted binaries strains GC, and copying may be more efficient if the binary is small, so I wouldn't force either way, at least without benchmarks.

[@component_path_prefix | Membrane.ComponentPath.get()]
|> Enum.join()
end

defp have_common_prefix?(path_a, path_b), do: List.last(path_a) == List.last(path_b)

defp remove_component_path_prefix(path_in_graph) do
path_in_graph
|> Enum.map(fn %Vertex{component_path: @component_path_prefix <> component_path} = vertex ->
%{vertex | component_path: component_path}
end)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Membrane.Core.Element.DiamondDetectionController.DiamondLogger do
@moduledoc false

alias Membrane.Core.Element.DiamondDetectionController.PathInGraph
alias Membrane.Core.Element.DiamondDetectionController.PathInGraph.Vertex

require Membrane.Logger

# logging a diamond is moved to the separate module due to testing

@spec log_diamond(PathInGraph.t(), PathInGraph.t()) :: :ok
def log_diamond(path_a, path_b) do
from = List.last(path_a) |> Map.fetch!(:component_path)
to = List.first(path_a) |> Map.fetch!(:component_path)

Membrane.Logger.debug("""
Two paths from element #{from} to #{to} were detected, in which all pads are operating in pull \
mode. With such a pipeline configuration, the membrane flow control mechanism may stop demanding \
buffers. If you are debugging such an issue, keep in mind that input pads with `flow_control: :auto` \
demand data when there is a demand for data on ALL output pads with `flow_control: :auto`.

The first path from #{from} to #{to} leads:
#{inspect_path(path_a)}

The second path from #{from} to #{to} leads:
#{inspect_path(path_b)}
""")

:ok
end

defp inspect_path(path) do
path
|> Enum.reverse()
|> Enum.chunk_every(2, 1, :discard)
|> Enum.map_join("\n", fn [%Vertex{} = from, %Vertex{} = to] ->
"""
From #{from.component_path} via output pad #{inspect(from.output_pad_ref)} \
to #{to.component_path} via input pad #{inspect(to.input_pad_ref)}.
"""
end)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Membrane.Core.Element.DiamondDetectionController.PathInGraph do
@moduledoc false

defmodule Vertex do
@moduledoc false
require Membrane.Pad, as: Pad

defstruct [:pid, :component_path, :input_pad_ref, :output_pad_ref]

@type t :: %__MODULE__{
pid: pid(),
component_path: String.t(),
input_pad_ref: Pad.ref() | nil,
output_pad_ref: Pad.ref() | nil
}
end

@type t :: [Vertex.t()]
end
11 changes: 10 additions & 1 deletion lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.State do

alias Membrane.{Clock, Element, Pad, Sync}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.DiamondDetectionController.PathInGraph
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.Core.Timer

Expand Down Expand Up @@ -46,7 +47,12 @@ defmodule Membrane.Core.Element.State do
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean()
resume_delayed_demands_loop_in_mailbox?: boolean(),
diamond_detection_ref_to_path: %{
optional(reference()) => PathInGraph.t()
},
diamond_detection_trigger_refs: MapSet.t(reference()),
diamond_detection_postponed?: boolean()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put these into a single diamond_detection map/struct

}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -79,6 +85,9 @@ defmodule Membrane.Core.Element.State do
handle_demand_loop_counter: 0,
pads_to_snapshot: MapSet.new(),
playback_queue: [],
diamond_detection_ref_to_path: %{},
diamond_detection_trigger_refs: MapSet.new(),
diamond_detection_postponed?: false,
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
Expand Down
Loading
Loading