Skip to content

Commit

Permalink
Implement CR suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jun 11, 2024
1 parent 8fad39c commit b524f70
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 132 deletions.
41 changes: 28 additions & 13 deletions lib/membrane/clock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ defmodule Membrane.Clock do
alias Membrane.Core.Message
alias Membrane.Time

require Membrane.Core.Utils, as: Utils

@typedoc @moduledoc
@type t :: pid

Expand Down Expand Up @@ -115,6 +117,12 @@ defmodule Membrane.Clock do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
proxy_opts = get_proxy_options(options[:proxy], options[:proxy_for])

state =
Expand All @@ -131,7 +139,13 @@ defmodule Membrane.Clock do
end

@impl GenServer
def handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
def handle_cast(request, state) do
Utils.log_on_error do
do_handle_cast(request, state)
end
end

defp do_handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
if state.proxy_for, do: unsubscribe(state.proxy_for)

state = %{state | proxy_for: proxy_for}
Expand All @@ -147,8 +161,7 @@ defmodule Membrane.Clock do
{:noreply, state}
end

@impl GenServer
def handle_cast({:clock_subscribe, pid}, state) do
defp do_handle_cast({:clock_subscribe, pid}, state) do
state
|> update_in([:subscribers, pid], fn
nil ->
Expand All @@ -162,8 +175,7 @@ defmodule Membrane.Clock do
~> {:noreply, &1}
end

@impl GenServer
def handle_cast({:clock_unsubscribe, pid}, state) do
defp do_handle_cast({:clock_unsubscribe, pid}, state) do
if Map.has_key?(state.subscribers, pid) do
{subs, state} =
state |> Bunch.Access.get_updated_in([:subscribers, pid, :subscriptions], &(&1 - 1))
Expand All @@ -175,24 +187,27 @@ defmodule Membrane.Clock do
~> {:noreply, &1}
end

@impl GenServer
def handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
@impl true
def handle_info(msg, state) do
Utils.log_on_error do
do_handle_info(msg, state)
end
end

defp do_handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
{:noreply, handle_clock_update(till_next, state)}
end

@impl GenServer
def handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
defp do_handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
{:noreply, broadcast_and_update_ratio(ratio, state)}
end

@impl GenServer
# When ratio from previously proxied clock comes in after unsubscribing
def handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
defp do_handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
{:noreply, state}
end

@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{:noreply, handle_unsubscribe(pid, state)}
end

Expand Down
10 changes: 5 additions & 5 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Membrane.Core.Bin do

alias Membrane.ResourceGuard

require Membrane.Core.Macros, as: Macros
require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message
require Membrane.Core.Telemetry
require Membrane.Logger
Expand Down Expand Up @@ -80,7 +80,7 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def init(options) do
Macros.log_on_error do
Utils.log_on_error do
do_init(options)
end
end
Expand Down Expand Up @@ -151,15 +151,15 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def handle_continue(:setup, state) do
Macros.log_on_error do
Utils.log_on_error do
state = Parent.LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_info(message, state) do
Macros.log_on_error do
Utils.log_on_error do
do_handle_info(message, state)
end
end
Expand Down Expand Up @@ -262,7 +262,7 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def handle_call(request, from, state) do
Macros.log_on_error do
Utils.log_on_error do
do_handle_call(request, from, state)
end
end
Expand Down
10 changes: 5 additions & 5 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Membrane.Core.Element do

alias Membrane.Core.{SubprocessSupervisor, TimerController}

require Membrane.Core.Macros, as: Macros
require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message, as: Message
require Membrane.Core.Stalker, as: Stalker
require Membrane.Core.Telemetry, as: Telemetry
Expand Down Expand Up @@ -95,7 +95,7 @@ defmodule Membrane.Core.Element do

@impl GenServer
def init(options) do
Macros.log_on_error do
Utils.log_on_error do
do_init(options)
end
end
Expand Down Expand Up @@ -162,15 +162,15 @@ defmodule Membrane.Core.Element do

@impl GenServer
def handle_continue(:setup, state) do
Macros.log_on_error do
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_call(request, from, state) do
Macros.log_on_error do
Utils.log_on_error do
do_handle_call(request, from, state)
end
end
Expand Down Expand Up @@ -204,7 +204,7 @@ defmodule Membrane.Core.Element do

@impl GenServer
def handle_info(message, state) do
Macros.log_on_error do
Utils.log_on_error do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

use GenServer

require Membrane.Core.Utils, as: Utils

@type t :: pid()

@spec start_link(any()) :: {:ok, t}
Expand All @@ -18,25 +20,33 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

@impl true
def handle_call({:add_get, atomic_ref, value}, _from, _state) do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
end
end

@impl true
def handle_call({:sub_get, atomic_ref, value}, _from, _state) do
result = :atomics.sub_get(atomic_ref, 1, value)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.sub_get(atomic_ref, 1, value)
{:reply, result, nil}
end
end

@impl true
def handle_call({:get, atomic_ref}, _from, _state) do
result = :atomics.get(atomic_ref, 1)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.get(atomic_ref, 1)
{:reply, result, nil}
end
end

@impl true
def handle_cast({:put, atomic_ref, value}, _state) do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
Utils.log_on_error do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
end
end
end
10 changes: 5 additions & 5 deletions lib/membrane/core/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Membrane.Core.Pipeline do
alias Membrane.Core.Parent.{ChildLifeController, LifecycleController}
alias Membrane.Core.TimerController

require Membrane.Core.Macros, as: Macros
require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message, as: Message
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.Component
Expand All @@ -24,7 +24,7 @@ defmodule Membrane.Core.Pipeline do

@impl GenServer
def init(params) do
Macros.log_on_error do
Utils.log_on_error do
do_init(params)
end
end
Expand Down Expand Up @@ -81,15 +81,15 @@ defmodule Membrane.Core.Pipeline do

@impl GenServer
def handle_continue(:setup, state) do
Macros.log_on_error do
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_info(msg, state) do
Macros.log_on_error do
Utils.log_on_error do
do_handle_info(msg, state)
end
end
Expand Down Expand Up @@ -163,7 +163,7 @@ defmodule Membrane.Core.Pipeline do

@impl GenServer
def handle_call(msg, from, state) do
Macros.log_on_error do
Utils.log_on_error do
do_handle_call(msg, from, state)
end
end
Expand Down
59 changes: 35 additions & 24 deletions lib/membrane/core/pipeline/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Membrane.Core.Pipeline.Supervisor do
alias Membrane.Core.{ProcessHelper, SubprocessSupervisor}

require Membrane.Core.Message, as: Message
require Membrane.Core.Utils, as: Utils
require Membrane.Logger

@spec run(
Expand All @@ -30,7 +31,13 @@ defmodule Membrane.Core.Pipeline.Supervisor do
end

@impl true
def init({start_fun, name, reply_to}) do
def init(params) do
Utils.log_on_error do
do_init(params)
end
end

defp do_init({start_fun, name, reply_to}) do
Process.flag(:trap_exit, true)
subprocess_supervisor = SubprocessSupervisor.start_link!()

Expand All @@ -52,18 +59,26 @@ defmodule Membrane.Core.Pipeline.Supervisor do

@impl true
def handle_call(:which_children, _from, state) do
reply =
[{SubprocessSupervisor, state.subprocess_supervisor, :supervisor, SubprocessSupervisor}] ++
case state.pipeline do
{:alive, pid} -> [{:pipeline, pid, :worker, []}]
{:exited, _reason} -> []
end

{:reply, reply, state}
Utils.log_on_error do
reply =
[{SubprocessSupervisor, state.subprocess_supervisor, :supervisor, SubprocessSupervisor}] ++
case state.pipeline do
{:alive, pid} -> [{:pipeline, pid, :worker, []}]
{:exited, _reason} -> []
end

{:reply, reply, state}
end
end

@impl true
def handle_info({:EXIT, pid, reason}, %{pipeline: {:alive, pid}} = state) do
def handle_info(msg, state) do
Utils.log_on_error do
do_handle_info(msg, state)
end
end

defp do_handle_info({:EXIT, pid, reason}, %{pipeline: {:alive, pid}} = state) do
Membrane.Logger.debug(
"got exit from pipeline with reason #{inspect(reason)}, stopping subprocess supervisor"
)
Expand All @@ -72,32 +87,28 @@ defmodule Membrane.Core.Pipeline.Supervisor do
{:noreply, %{state | pipeline: {:exited, reason}}}
end

@impl true
def handle_info(
{:EXIT, pid, :normal},
%{subprocess_supervisor: pid, pipeline: {:exited, pipeline_exit_reason}}
) do
defp do_handle_info(
{:EXIT, pid, :normal},
%{subprocess_supervisor: pid, pipeline: {:exited, pipeline_exit_reason}}
) do
Membrane.Logger.debug("got exit from subprocess supervisor, exiting")
ProcessHelper.notoelo(pipeline_exit_reason, log?: false)
end

@impl true
def handle_info({:EXIT, pid, reason}, %{
subprocess_supervisor: pid,
pipeline: {:alive, _pipeline_pid}
}) do
defp do_handle_info({:EXIT, pid, reason}, %{
subprocess_supervisor: pid,
pipeline: {:alive, _pipeline_pid}
}) do
raise "Subprocess supervisor failure, reason: #{inspect(reason)}"
end

@impl true
def handle_info({:EXIT, _pid, reason}, %{pipeline: {:alive, pipeline_pid}} = state) do
defp do_handle_info({:EXIT, _pid, reason}, %{pipeline: {:alive, pipeline_pid}} = state) do
Membrane.Logger.debug("got exit from a linked process, stopping pipeline")
Process.exit(pipeline_pid, reason)
{:noreply, state}
end

@impl true
def handle_info({:EXIT, _pid, _reason}, state) do
defp do_handle_info({:EXIT, _pid, _reason}, state) do
Membrane.Logger.debug(
"got exit from a linked process, pipeline already dead, waiting for subprocess supervisor to exit"
)
Expand Down
Loading

0 comments on commit b524f70

Please sign in to comment.