Skip to content

Commit

Permalink
Trigger diamon detection between elements WiP
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Nov 21, 2024
1 parent 5a76b10 commit 0be84b9
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 5 deletions.
75 changes: 71 additions & 4 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
def continue_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) do
cond do
not is_map_key(state.diamond_detection_ref_to_path, diamond_detection_ref) ->
delete_message = Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
send_after_time = Membrane.Time.seconds(10) |> Membrane.Time.as_milliseconds(:round)
self() |> Process.send_after(delete_message, send_after_time)

:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)

:ok =
Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
|> send_after_to_self()

state
|> put_in(
[:diamond_detection_ref_to_path, diamond_detection_ref],
Expand Down Expand Up @@ -68,6 +68,15 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end)
end

defp forward_diamond_detection_trigger(trigger_ref, state) do
state.pads_data
|> Enum.each(fn {_pad_ref, %PadData{} = pad_data} ->
if pad_data.direction == :output and pad_data.flow_control != :push do
Message.send(pad_data.pid, :diamond_detection_trigger, trigger_ref)
end
end)
end

defp is_output_pull_pad(%PadData{} = pad_data, auto_pull_mode?) do
pad_data.direction == :output and
(pad_data.flow_control == :manual or
Expand All @@ -78,4 +87,62 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
uniq_length = diamond_detection_path |> Enum.uniq() |> length()
uniq_length < length(diamond_detection_path)
end

def start_diamond_detection_trigger(spec_ref, state) when map_size(state.pads_data) >= 2 do
handle_diamond_detection_trigger(spec_ref, state)
end

def start_diamond_detection_trigger(_spec_ref, state), do: state

def handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
if MapSet.member?(state.diamond_detection_trigger_refs, trigger_ref),
do: state,
else: do_handle_diamond_detection_trigger(trigger_ref, state)
end

defp do_handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
state =
state
|> Map.update!(:diamond_detection_trigger_refs, &MapSet.put(&1, trigger_ref))

:ok =
Message.new(:delete_diamond_detection_trigger_ref, trigger_ref)
|> send_after_to_self()

:ok = forward_diamond_detection_trigger(trigger_ref, state)

if output_pull_arity(state) >= 2,
do: postpone_diamond_detection(state),
else: state
end

defp postpone_diamond_detection(%State{} = state) when state.diamond_detection_postponed? do
state
end

defp postpone_diamond_detection(%State{} = state) do
:ok =
Message.new(:start_diamond_detection)
|> send_after_to_self(1)

%{state | diamond_detection_postponed?: true}
end

def delete_diamond_detection_trigger_ref(trigger_ref, state) do
state
|> Map.update!(:diamond_detection_trigger_refs, &MapSet.delete(&1, trigger_ref))
end

defp output_pull_arity(state) do
auto_pull_mode? = state.effective_flow_control == :pull

state.pads_data
|> Enum.count(fn {_pad_ref, pad_data} -> is_output_pull_pad(pad_data, auto_pull_mode?) end)
end

defp send_after_to_self(message, seconds \\ 10) do
send_after_time = Membrane.Time.seconds(seconds) |> Membrane.Time.as_milliseconds(:round)
self() |> Process.send_after(message, send_after_time)
:ok
end
end
6 changes: 5 additions & 1 deletion lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ defmodule Membrane.Core.Element.State do
resume_delayed_demands_loop_in_mailbox?: boolean(),
diamond_detection_ref_to_path: %{
optional(reference()) => DiamondDetectionController.diamond_detection_path()
}
},
diamond_detection_trigger_refs: MapSet.t(reference()),
diamond_detection_postponed?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -83,6 +85,8 @@ defmodule Membrane.Core.Element.State do
pads_to_snapshot: MapSet.new(),
playback_queue: [],
diamond_detection_ref_to_path: %{},
diamond_detection_trigger_refs: MapSet.new(),
diamond_detection_postponed?: false,
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do
withl spec_data: {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref),
do: {spec_data, state} = do_proceed_spec_startup(spec_ref, spec_data, state),
status: :ready <- spec_data.status do


cleanup_spec_startup(spec_ref, state)
else
spec_data: :error -> state
Expand Down

0 comments on commit 0be84b9

Please sign in to comment.