Skip to content

Commit

Permalink
Small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Nov 20, 2024
1 parent f4bce2d commit e9e8c77
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 35 deletions.
20 changes: 10 additions & 10 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,16 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
:ok = Parent.DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:trigger_diamond_detection), state) do
:ok = __MODULE__.DiamondDetectionController.trigger_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:child_death, [name, reason]), state) do
case Parent.ChildLifeController.handle_child_death(name, reason, state) do
{:stop, reason, _state} -> ProcessHelper.notoelo(reason)
Expand All @@ -261,16 +271,6 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
:ok = Parent.DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:trigger_diamond_detection), state) do
:ok = __MODULE__.DiamondDetectionController.trigger_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(_type, _args, _opts) = message, _state) do
raise Membrane.BinError, "Received invalid message #{inspect(message)}"
end
Expand Down
1 change: 0 additions & 1 deletion lib/membrane/core/bin/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule Membrane.Core.Bin.DiamondDetectionController do

alias Membrane.Core.Bin.State

# todo: trigger DD on spawning new child & linking
@spec trigger_diamond_detection(State.t()) :: :ok
def trigger_diamond_detection(state) do
Message.send(state.parent, :trigger_diamond_detection)
Expand Down
10 changes: 5 additions & 5 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,6 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:terminate), state) do
state = LifecycleController.handle_terminate_request(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
:ok = DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
Expand All @@ -311,6 +306,11 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:terminate), state) do
state = LifecycleController.handle_terminate_request(state)
{:noreply, state}
end

defp do_handle_info(Message.new(_type, _args, _opts) = message, _state) do
raise Membrane.ElementError, "Received invalid message #{inspect(message)}"
end
Expand Down
26 changes: 17 additions & 9 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ defmodule Membrane.Core.Element.DiamondDetectionController do

@spec continue_diamond_detection(reference(), diamond_detection_path(), State.t()) :: State.t()
def continue_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) do
delete_message = Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
send_after_time = Membrane.Time.seconds(10) |> Membrane.Time.as_milliseconds(:round)
self() |> Process.send_after(delete_message, send_after_time)
cond do
not is_map_key(state.diamond_detection_ref_to_path, diamond_detection_ref) ->
delete_message = Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
send_after_time = Membrane.Time.seconds(10) |> Membrane.Time.as_milliseconds(:round)
self() |> Process.send_after(delete_message, send_after_time)

case Map.fetch(state.diamond_detection_ref_to_path, diamond_detection_ref) do
{:ok, _old_path} ->
# todo: log diamond if not cycle
state

:error ->
:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)

state
|> put_in(
[:diamond_detection_ref_to_path, diamond_detection_ref],
diamond_detecton_path
)

has_cycle?(diamond_detecton_path) ->
state

true ->
# todo: log diamond
state
end
end

Expand Down Expand Up @@ -70,4 +73,9 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
(pad_data.flow_control == :manual or
(pad_data.flow_control == :auto and auto_pull_mode?))
end

defp has_cycle?(diamond_detection_path) do
uniq_length = diamond_detection_path |> Enum.uniq() |> length()
uniq_length < length(diamond_detection_path)
end
end
15 changes: 15 additions & 0 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ defmodule Membrane.Core.Parent.ChildLifeController do
"""
@spec handle_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() | no_return()
def handle_spec(spec, state) do



spec_ref = make_ref()
canonical_spec = make_canonical(spec)

Expand Down Expand Up @@ -162,11 +165,23 @@ defmodule Membrane.Core.Parent.ChildLifeController do
dependent_specs: dependent_specs,
awaiting_responses: MapSet.new()
})
|> trigger_diamond_detection()

state = StartupUtils.exec_handle_spec_started(all_children_names, state)
proceed_spec_startup(spec_ref, state)
end

defp trigger_diamond_detection(state) do
cond do
Component.bin?(state) ->
:ok = Bin.DiamondDetectionController.trigger_diamond_detection(state)
state

Component.pipeline?(state) ->
Pipeline.DiamondDetectionController.trigger_diamond_detection(state)
end
end

defp assert_all_static_pads_linked_in_spec!(children_definitions, links) do
pads_to_link =
links
Expand Down
20 changes: 10 additions & 10 deletions lib/membrane/core/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ defmodule Membrane.Core.Pipeline do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
state = __MODULE__.DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:trigger_diamond_detection), state) do
state = __MODULE__.DiamondDetectionController.trigger_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:initialized, child), state) do
state = ChildLifeController.handle_child_initialized(child, state)
{:noreply, state}
Expand All @@ -147,16 +157,6 @@ defmodule Membrane.Core.Pipeline do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
state = __MODULE__.DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:trigger_diamond_detection), state) do
state = __MODULE__.DiamondDetectionController.trigger_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(Message.new(_type, _args, _opts) = message, _state) do
raise Membrane.PipelineError, "Received invalid message #{inspect(message)}"
end
Expand Down

0 comments on commit e9e8c77

Please sign in to comment.