diff --git a/lib/membrane/clock.ex b/lib/membrane/clock.ex index 5f1097a5c..21d01351d 100644 --- a/lib/membrane/clock.ex +++ b/lib/membrane/clock.ex @@ -32,6 +32,8 @@ defmodule Membrane.Clock do alias Membrane.Core.Message alias Membrane.Time + require Membrane.Core.Utils, as: Utils + @typedoc @moduledoc @type t :: pid @@ -115,6 +117,12 @@ defmodule Membrane.Clock do @impl GenServer def init(options) do + Utils.log_on_error do + do_init(options) + end + end + + defp do_init(options) do proxy_opts = get_proxy_options(options[:proxy], options[:proxy_for]) state = @@ -131,7 +139,13 @@ defmodule Membrane.Clock do end @impl GenServer - def handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do + def handle_cast(request, state) do + Utils.log_on_error do + do_handle_cast(request, state) + end + end + + defp do_handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do if state.proxy_for, do: unsubscribe(state.proxy_for) state = %{state | proxy_for: proxy_for} @@ -147,8 +161,7 @@ defmodule Membrane.Clock do {:noreply, state} end - @impl GenServer - def handle_cast({:clock_subscribe, pid}, state) do + defp do_handle_cast({:clock_subscribe, pid}, state) do state |> update_in([:subscribers, pid], fn nil -> @@ -162,8 +175,7 @@ defmodule Membrane.Clock do ~> {:noreply, &1} end - @impl GenServer - def handle_cast({:clock_unsubscribe, pid}, state) do + defp do_handle_cast({:clock_unsubscribe, pid}, state) do if Map.has_key?(state.subscribers, pid) do {subs, state} = state |> Bunch.Access.get_updated_in([:subscribers, pid, :subscriptions], &(&1 - 1)) @@ -175,24 +187,27 @@ defmodule Membrane.Clock do ~> {:noreply, &1} end - @impl GenServer - def handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do + @impl true + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do {:noreply, handle_clock_update(till_next, state)} end - @impl GenServer - def handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do + defp do_handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do {:noreply, broadcast_and_update_ratio(ratio, state)} end - @impl GenServer # When ratio from previously proxied clock comes in after unsubscribing - def handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do + defp do_handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do {:noreply, state} end - @impl GenServer - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do {:noreply, handle_unsubscribe(pid, state)} end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 6b3f615a5..b7bf83a85 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -19,7 +19,7 @@ defmodule Membrane.Core.Bin do alias Membrane.ResourceGuard - require Membrane.Core.Macros, as: Macros + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message require Membrane.Core.Telemetry require Membrane.Logger @@ -80,7 +80,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def init(options) do - Macros.log_on_error do + Utils.log_on_error do do_init(options) end end @@ -151,7 +151,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_continue(:setup, state) do - Macros.log_on_error do + Utils.log_on_error do state = Parent.LifecycleController.handle_setup(state) {:noreply, state} end @@ -159,7 +159,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_info(message, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_info(message, state) end end @@ -262,7 +262,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_call(request, from, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_call(request, from, state) end end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 867152d7e..e5424a672 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -35,7 +35,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.{SubprocessSupervisor, TimerController} - require Membrane.Core.Macros, as: Macros + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message require Membrane.Core.Stalker, as: Stalker require Membrane.Core.Telemetry, as: Telemetry @@ -95,7 +95,7 @@ defmodule Membrane.Core.Element do @impl GenServer def init(options) do - Macros.log_on_error do + Utils.log_on_error do do_init(options) end end @@ -162,7 +162,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_continue(:setup, state) do - Macros.log_on_error do + Utils.log_on_error do state = LifecycleController.handle_setup(state) {:noreply, state} end @@ -170,7 +170,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_call(request, from, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_call(request, from, state) end end @@ -204,7 +204,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_info(message, state) do - Macros.log_on_error do + Utils.log_on_error do Telemetry.report_metric( :queue_len, :erlang.process_info(self(), :message_queue_len) |> elem(1) diff --git a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex index 5f60450d7..56a967326 100644 --- a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex +++ b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex @@ -6,6 +6,8 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do use GenServer + require Membrane.Core.Utils, as: Utils + @type t :: pid() @spec start_link(any()) :: {:ok, t} @@ -18,25 +20,33 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do @impl true def handle_call({:add_get, atomic_ref, value}, _from, _state) do - result = :atomics.add_get(atomic_ref, 1, value) - {:reply, result, nil} + Utils.log_on_error do + result = :atomics.add_get(atomic_ref, 1, value) + {:reply, result, nil} + end end @impl true def handle_call({:sub_get, atomic_ref, value}, _from, _state) do - result = :atomics.sub_get(atomic_ref, 1, value) - {:reply, result, nil} + Utils.log_on_error do + result = :atomics.sub_get(atomic_ref, 1, value) + {:reply, result, nil} + end end @impl true def handle_call({:get, atomic_ref}, _from, _state) do - result = :atomics.get(atomic_ref, 1) - {:reply, result, nil} + Utils.log_on_error do + result = :atomics.get(atomic_ref, 1) + {:reply, result, nil} + end end @impl true def handle_cast({:put, atomic_ref, value}, _state) do - :atomics.put(atomic_ref, 1, value) - {:noreply, nil} + Utils.log_on_error do + :atomics.put(atomic_ref, 1, value) + {:noreply, nil} + end end end diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 4dc4acdd5..6f9fc95db 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -9,7 +9,7 @@ defmodule Membrane.Core.Pipeline do alias Membrane.Core.Parent.{ChildLifeController, LifecycleController} alias Membrane.Core.TimerController - require Membrane.Core.Macros, as: Macros + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message require Membrane.Core.Telemetry, as: Telemetry require Membrane.Core.Component @@ -24,7 +24,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def init(params) do - Macros.log_on_error do + Utils.log_on_error do do_init(params) end end @@ -81,7 +81,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_continue(:setup, state) do - Macros.log_on_error do + Utils.log_on_error do state = LifecycleController.handle_setup(state) {:noreply, state} end @@ -89,7 +89,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_info(msg, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_info(msg, state) end end @@ -163,7 +163,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_call(msg, from, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_call(msg, from, state) end end diff --git a/lib/membrane/core/pipeline/supervisor.ex b/lib/membrane/core/pipeline/supervisor.ex index d139642a4..0f68c53b3 100644 --- a/lib/membrane/core/pipeline/supervisor.ex +++ b/lib/membrane/core/pipeline/supervisor.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Pipeline.Supervisor do alias Membrane.Core.{ProcessHelper, SubprocessSupervisor} require Membrane.Core.Message, as: Message + require Membrane.Core.Utils, as: Utils require Membrane.Logger @spec run( @@ -30,7 +31,13 @@ defmodule Membrane.Core.Pipeline.Supervisor do end @impl true - def init({start_fun, name, reply_to}) do + def init(params) do + Utils.log_on_error do + do_init(params) + end + end + + defp do_init({start_fun, name, reply_to}) do Process.flag(:trap_exit, true) subprocess_supervisor = SubprocessSupervisor.start_link!() @@ -52,18 +59,26 @@ defmodule Membrane.Core.Pipeline.Supervisor do @impl true def handle_call(:which_children, _from, state) do - reply = - [{SubprocessSupervisor, state.subprocess_supervisor, :supervisor, SubprocessSupervisor}] ++ - case state.pipeline do - {:alive, pid} -> [{:pipeline, pid, :worker, []}] - {:exited, _reason} -> [] - end - - {:reply, reply, state} + Utils.log_on_error do + reply = + [{SubprocessSupervisor, state.subprocess_supervisor, :supervisor, SubprocessSupervisor}] ++ + case state.pipeline do + {:alive, pid} -> [{:pipeline, pid, :worker, []}] + {:exited, _reason} -> [] + end + + {:reply, reply, state} + end end @impl true - def handle_info({:EXIT, pid, reason}, %{pipeline: {:alive, pid}} = state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info({:EXIT, pid, reason}, %{pipeline: {:alive, pid}} = state) do Membrane.Logger.debug( "got exit from pipeline with reason #{inspect(reason)}, stopping subprocess supervisor" ) @@ -72,32 +87,28 @@ defmodule Membrane.Core.Pipeline.Supervisor do {:noreply, %{state | pipeline: {:exited, reason}}} end - @impl true - def handle_info( - {:EXIT, pid, :normal}, - %{subprocess_supervisor: pid, pipeline: {:exited, pipeline_exit_reason}} - ) do + defp do_handle_info( + {:EXIT, pid, :normal}, + %{subprocess_supervisor: pid, pipeline: {:exited, pipeline_exit_reason}} + ) do Membrane.Logger.debug("got exit from subprocess supervisor, exiting") ProcessHelper.notoelo(pipeline_exit_reason, log?: false) end - @impl true - def handle_info({:EXIT, pid, reason}, %{ - subprocess_supervisor: pid, - pipeline: {:alive, _pipeline_pid} - }) do + defp do_handle_info({:EXIT, pid, reason}, %{ + subprocess_supervisor: pid, + pipeline: {:alive, _pipeline_pid} + }) do raise "Subprocess supervisor failure, reason: #{inspect(reason)}" end - @impl true - def handle_info({:EXIT, _pid, reason}, %{pipeline: {:alive, pipeline_pid}} = state) do + defp do_handle_info({:EXIT, _pid, reason}, %{pipeline: {:alive, pipeline_pid}} = state) do Membrane.Logger.debug("got exit from a linked process, stopping pipeline") Process.exit(pipeline_pid, reason) {:noreply, state} end - @impl true - def handle_info({:EXIT, _pid, _reason}, state) do + defp do_handle_info({:EXIT, _pid, _reason}, state) do Membrane.Logger.debug( "got exit from a linked process, pipeline already dead, waiting for subprocess supervisor to exit" ) diff --git a/lib/membrane/core/stalker.ex b/lib/membrane/core/stalker.ex index 7ab89b9b7..384459f20 100644 --- a/lib/membrane/core/stalker.ex +++ b/lib/membrane/core/stalker.ex @@ -3,6 +3,7 @@ defmodule Membrane.Core.Stalker do use GenServer alias Membrane.{ComponentPath, Pad, Time} + require Membrane.Core.Utils, as: Utils @unsafely_name_processes_for_observer Application.compile_env( :membrane_core, @@ -336,7 +337,13 @@ defmodule Membrane.Core.Stalker do end @impl true - def init(%{pipeline: pipeline}) do + def init(options) do + Utils.log_on_error do + do_init(options) + end + end + + defp do_init(%{pipeline: pipeline}) do Process.send_after(self(), :scrape_metrics, @scrape_interval) ets = create_ets() send(pipeline, {:ets, ets}) @@ -354,7 +361,13 @@ defmodule Membrane.Core.Stalker do end @impl true - def handle_info(:scrape_metrics, state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(:scrape_metrics, state) do Process.send_after(self(), :scrape_metrics, @scrape_interval) metrics = scrape_metrics(state) timestamp = Time.milliseconds(System.monotonic_time(:millisecond) - state.init_time) @@ -363,15 +376,13 @@ defmodule Membrane.Core.Stalker do {:noreply, %{state | metrics: Map.new(metrics), timestamp: timestamp}} end - @impl true - def handle_info({:graph, graph_update}, state) do + defp do_handle_info({:graph, graph_update}, state) do {action, graph_updates, state} = handle_graph_update(graph_update, state) send_to_subscribers(graph_updates, :graph, &{:graph, action, &1}, state) {:noreply, state} end - @impl true - def handle_info({:subscribe, pid, topics, opts}, state) do + defp do_handle_info({:subscribe, pid, topics, opts}, state) do _ref = unless Map.has_key?(state.subscribers, pid), do: Process.monitor(pid) opts = Keyword.validate!(opts, [:confirm]) with {:ok, id} <- Keyword.fetch(opts, :confirm), do: send(pid, {:subscribed, id}) @@ -414,8 +425,7 @@ defmodule Membrane.Core.Stalker do {:noreply, state} end - @impl true - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do cond do Map.has_key?(state.subscribers, pid) -> {:noreply, Bunch.Access.delete_in(state, [:subscribers, pid])} diff --git a/lib/membrane/core/subprocess_supervisor.ex b/lib/membrane/core/subprocess_supervisor.ex index 4639ce225..c0437d5af 100644 --- a/lib/membrane/core/subprocess_supervisor.ex +++ b/lib/membrane/core/subprocess_supervisor.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.SubprocessSupervisor do alias Membrane.Core.Stalker require Membrane.Core.Message, as: Message + require Membrane.Core.Utils, as: Utils require Membrane.Logger @spec start_link!() :: pid() @@ -93,18 +94,30 @@ defmodule Membrane.Core.SubprocessSupervisor do @impl true def init(parent_process) do - Process.flag(:trap_exit, true) - - {:ok, - %{ - parent_component: nil, - parent_process: {:alive, parent_process}, - children: %{} - }} + Utils.log_on_error do + Process.flag(:trap_exit, true) + + {:ok, + %{ + parent_component: nil, + parent_process: {:alive, parent_process}, + children: %{} + }} + end end @impl true - def handle_call(Message.new(:start_component, [name, component_module, options]), _from, state) do + def handle_call(request, from, state) do + Utils.log_on_error do + do_handle_call(request, from, state) + end + end + + defp do_handle_call( + Message.new(:start_component, [name, component_module, options]), + _from, + state + ) do subprocess_supervisor = start_link_subprocess_supervisor!(options) options = @@ -138,8 +151,7 @@ defmodule Membrane.Core.SubprocessSupervisor do end end - @impl true - def handle_call(Message.new(:start_utility, child_spec), _from, state) do + defp do_handle_call(Message.new(:start_utility, child_spec), _from, state) do try do {m, f, a} = child_spec.start apply(m, f, a) @@ -169,30 +181,33 @@ defmodule Membrane.Core.SubprocessSupervisor do end end - @impl true - def handle_call(:which_children, _from, state) do + defp do_handle_call(:which_children, _from, state) do reply = Enum.map(state.children, fn {pid, data} -> {data.name, pid, data.type, []} end) {:reply, reply, state} end @impl true - def handle_info(Message.new(:set_parent_component, [pid, observability_config]), state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(Message.new(:set_parent_component, [pid, observability_config]), state) do Membrane.Core.Stalker.setup_component_utility(observability_config, "subprocess supervisor") {:noreply, %{state | parent_component: pid}} end - @impl true - def handle_info( - {:EXIT, pid, _reason}, - %{parent_process: {:alive, pid}, children: children} = state - ) - when children == %{} do + defp do_handle_info( + {:EXIT, pid, _reason}, + %{parent_process: {:alive, pid}, children: children} = state + ) + when children == %{} do Membrane.Logger.debug("exiting") {:stop, :normal, state} end - @impl true - def handle_info({:EXIT, pid, reason}, %{parent_process: {:alive, pid}} = state) do + defp do_handle_info({:EXIT, pid, reason}, %{parent_process: {:alive, pid}} = state) do Membrane.Logger.debug( "got exit request from parent, reason: #{inspect(reason)}, shutting down children" ) @@ -204,8 +219,7 @@ defmodule Membrane.Core.SubprocessSupervisor do {:noreply, %{state | parent_process: :exit_requested}} end - @impl true - def handle_info({:EXIT, pid, reason}, state) do + defp do_handle_info({:EXIT, pid, reason}, state) do {data, state} = pop_in(state, [:children, pid]) handle_exit(data, reason, state) diff --git a/lib/membrane/core/macros.ex b/lib/membrane/core/utils.ex similarity index 62% rename from lib/membrane/core/macros.ex rename to lib/membrane/core/utils.ex index 2a206cd3f..1f21bacb1 100644 --- a/lib/membrane/core/macros.ex +++ b/lib/membrane/core/utils.ex @@ -1,9 +1,17 @@ -defmodule Membrane.Core.Macros do +defmodule Membrane.Core.Utils do @moduledoc false # For some reason GenServer processes sometimes don't print logs about crash, so # we add this macro, to ensure that error logs are always printed defmacro log_on_error(do: code) do + error_source = + case __CALLER__.module do + Membrane.Core.Element -> "Membrane Element" + Membrane.Core.Bin -> "Membrane Bin" + Membrane.Core.Pipeline -> "Membrane Pipeline" + other -> inspect(other) + end + quote do try do unquote(code) @@ -12,7 +20,7 @@ defmodule Membrane.Core.Macros do require Membrane.Logger Membrane.Logger.error(""" - Error occured in Membrane Component: + Error occured in #{unquote(error_source)}: #{inspect(e, pretty: true, limit: :infinity)} #{Exception.format_stacktrace(__STACKTRACE__)} """) diff --git a/lib/membrane/resource_guard.ex b/lib/membrane/resource_guard.ex index c443ffb39..6867be790 100644 --- a/lib/membrane/resource_guard.ex +++ b/lib/membrane/resource_guard.ex @@ -21,6 +21,7 @@ defmodule Membrane.ResourceGuard do """ use GenServer + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message require Membrane.Logger @@ -87,20 +88,27 @@ defmodule Membrane.ResourceGuard do @impl true def init(owner_pid) do - Process.flag(:trap_exit, true) - monitor = Process.monitor(owner_pid) - {:ok, %{guards: [], monitor: monitor}} + Utils.log_on_error do + Process.flag(:trap_exit, true) + monitor = Process.monitor(owner_pid) + {:ok, %{guards: [], monitor: monitor}} + end end @impl true - def handle_info(Message.new(:register, [function, opts]), state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(Message.new(:register, [function, opts]), state) do tag = Keyword.fetch!(opts, :tag) timeout = Keyword.get(opts, :timeout, 5000) {:noreply, %{state | guards: [{function, tag, timeout} | state.guards]}} end - @impl true - def handle_info(Message.new(:unregister, tag), state) do + defp do_handle_info(Message.new(:unregister, tag), state) do guards = Enum.reject(state.guards, fn {_function, ^tag, _timeout} -> true @@ -110,14 +118,12 @@ defmodule Membrane.ResourceGuard do {:noreply, %{state | guards: guards}} end - @impl true - def handle_info(Message.new(:cleanup_all), state) do + defp do_handle_info(Message.new(:cleanup_all), state) do do_cleanup_all(state.guards) {:noreply, %{state | guards: []}} end - @impl true - def handle_info(Message.new(:cleanup, tag), state) do + defp do_handle_info(Message.new(:cleanup, tag), state) do guards = Enum.reject(state.guards, fn {function, ^tag, timeout} -> @@ -131,14 +137,12 @@ defmodule Membrane.ResourceGuard do {:noreply, %{state | guards: guards}} end - @impl true - def handle_info({:DOWN, monitor, :process, _pid, _reason}, %{monitor: monitor} = state) do + defp do_handle_info({:DOWN, monitor, :process, _pid, _reason}, %{monitor: monitor} = state) do do_cleanup_all(state.guards) {:stop, :normal, state} end - @impl true - def handle_info(_message, state) do + defp do_handle_info(_message, state) do {:noreply, state} end diff --git a/lib/membrane/sync.ex b/lib/membrane/sync.ex index 56b0387c3..1577074ed 100644 --- a/lib/membrane/sync.ex +++ b/lib/membrane/sync.ex @@ -31,6 +31,8 @@ defmodule Membrane.Sync do alias Membrane.Time + require Membrane.Core.Utils, as: Utils + @no_sync :membrane_no_sync @typedoc """ @@ -92,28 +94,34 @@ defmodule Membrane.Sync do @impl true def init(opts) do - {:ok, - %{ - processes: %{}, - empty_exit?: opts |> Keyword.get(:empty_exit?, false), - active?: false - }} + Utils.log_on_error do + {:ok, + %{ + processes: %{}, + empty_exit?: opts |> Keyword.get(:empty_exit?, false), + active?: false + }} + end end @impl true - def handle_call({:sync_register, pid}, _from, %{active?: false} = state) do + def handle_call(request, from, state) do + Utils.log_on_error do + do_handle_call(request, from, state) + end + end + + defp do_handle_call({:sync_register, pid}, _from, %{active?: false} = state) do Process.monitor(pid) state = state |> put_in([:processes, pid], %{status: :registered, latency: 0, reply_to: nil}) {:reply, :ok, state} end - @impl true - def handle_call({:sync_register, _pid}, _from, state) do + defp do_handle_call({:sync_register, _pid}, _from, state) do {:reply, {:error, :bad_activity_request}, state} end - @impl true - def handle_call({:sync, options}, {pid, _ref} = from, %{active?: true} = state) do + defp do_handle_call({:sync, options}, {pid, _ref} = from, %{active?: true} = state) do latency = options |> Keyword.get(:latency, 0) case state.processes[pid] do @@ -135,19 +143,16 @@ defmodule Membrane.Sync do end end - @impl true - def handle_call({:sync, _options}, _from, %{active?: false} = state) do + defp do_handle_call({:sync, _options}, _from, %{active?: false} = state) do {:reply, :ok, state} end - @impl true - def handle_call({:sync_toggle_active, new_active?}, _from, %{active?: active?} = state) - when new_active? == active? do + defp do_handle_call({:sync_toggle_active, new_active?}, _from, %{active?: active?} = state) + when new_active? == active? do {:reply, {:error, :bad_activity_request}, state} end - @impl true - def handle_call({:sync_toggle_active, active?}, _from, state) do + defp do_handle_call({:sync_toggle_active, active?}, _from, state) do state = %{state | active?: active?} |> check_and_handle_sync() {:reply, :ok, state} end diff --git a/lib/membrane/testing/mock_resource_guard.ex b/lib/membrane/testing/mock_resource_guard.ex index f7330d97e..922aef342 100644 --- a/lib/membrane/testing/mock_resource_guard.ex +++ b/lib/membrane/testing/mock_resource_guard.ex @@ -17,6 +17,7 @@ defmodule Membrane.Testing.MockResourceGuard do use GenServer require Membrane.Core.Message, as: Message + require Membrane.Core.Utils, as: Utils @type options :: [test_process: pid] @@ -40,24 +41,30 @@ defmodule Membrane.Testing.MockResourceGuard do @impl true def init(options) do - {:ok, Map.new(options)} + Utils.log_on_error do + {:ok, Map.new(options)} + end end @impl true - def handle_info(Message.new(:register, [function, options]), state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(Message.new(:register, [function, options]), state) do tag = Keyword.fetch!(options, :tag) send_to_test_process(state, :register, {function, tag}) {:noreply, state} end - @impl true - def handle_info(Message.new(:unregister, tag), state) do + defp do_handle_info(Message.new(:unregister, tag), state) do send_to_test_process(state, :unregister, tag) {:noreply, state} end - @impl true - def handle_info(Message.new(:cleanup, tag), state) do + defp do_handle_info(Message.new(:cleanup, tag), state) do send_to_test_process(state, :cleanup, tag) {:noreply, state} end