From 15bce16c259cfb65af516ce26c411894313fdecd Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 10 Jun 2024 16:41:03 +0200 Subject: [PATCH 1/8] Add macro ensuring errors are logged --- .formatter.exs | 3 +- lib/membrane/core/bin.ex | 32 ++++++++++++----- lib/membrane/core/element.ex | 37 +++++++++++++------- lib/membrane/core/macros.ex | 22 ++++++++++++ lib/membrane/core/pipeline.ex | 65 ++++++++++++++++++++--------------- 5 files changed, 109 insertions(+), 50 deletions(-) create mode 100644 lib/membrane/core/macros.ex diff --git a/.formatter.exs b/.formatter.exs index 9cf8cc2e6..3434dbcd1 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -5,7 +5,8 @@ locals_without_parens = def_options: 1, def_clock: 1, def_type_from_list: 1, - assert_receive_message: 3 + assert_receive_message: 3, + wrap_with_try: 1 ] [ diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 157cf4cf2..0f8091eaa 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -19,6 +19,7 @@ defmodule Membrane.Core.Bin do alias Membrane.ResourceGuard + require Membrane.Core.Macros, as: Macros require Membrane.Core.Message require Membrane.Core.Telemetry require Membrane.Logger @@ -79,6 +80,12 @@ defmodule Membrane.Core.Bin do @impl GenServer def init(options) do + Macros.log_on_error do + do_init(options) + end + end + + defp do_init(options) do Process.link(options.parent_supervisor) %{name: name, module: module} = options @@ -144,13 +151,17 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_continue(:setup, state) do - state = Parent.LifecycleController.handle_setup(state) - {:noreply, state} + Macros.log_on_error do + state = Parent.LifecycleController.handle_setup(state) + {:noreply, state} + end end @impl GenServer def handle_info(message, state) do - do_handle_info(message, state) + Macros.log_on_error do + do_handle_info(message, state) + end end @compile {:inline, do_handle_info: 2} @@ -162,7 +173,6 @@ defmodule Membrane.Core.Bin do defp do_handle_info(Message.new(:parent_notification, notification), state) do state = Child.LifecycleController.handle_parent_notification(notification, state) - {:noreply, state} end @@ -251,7 +261,13 @@ defmodule Membrane.Core.Bin do end @impl GenServer - def handle_call( + def handle_call(request, from, state) do + Macros.log_on_error do + do_handle_call(request, from, state) + end + end + + defp do_handle_call( Message.new(:handle_link, [direction, this, other, params]), _from, state @@ -260,13 +276,11 @@ defmodule Membrane.Core.Bin do {:reply, reply, state} end - @impl GenServer - def handle_call(Message.new(:get_clock), _from, state) do + defp do_handle_call(Message.new(:get_clock), _from, state) do {:reply, state.synchronization.clock, state} end - @impl GenServer - def handle_call(Message.new(:get_child_pid, child_name), _from, state) do + defp do_handle_call(Message.new(:get_child_pid, child_name), _from, state) do reply = with %State{children: %{^child_name => %{pid: child_pid}}} <- state do {:ok, child_pid} diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index bf9ab243e..b3b758d39 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -35,6 +35,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.{SubprocessSupervisor, TimerController} + require Membrane.Core.Macros, as: Macros require Membrane.Core.Message, as: Message require Membrane.Core.Stalker, as: Stalker require Membrane.Core.Telemetry, as: Telemetry @@ -94,6 +95,12 @@ defmodule Membrane.Core.Element do @impl GenServer def init(options) do + Macros.log_on_error do + do_init(options) + end + end + + defp do_init(options) do Process.link(options.parent_supervisor) observability_config = %{ @@ -155,17 +162,24 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_continue(:setup, state) do - state = LifecycleController.handle_setup(state) - {:noreply, state} + Macros.log_on_error do + state = LifecycleController.handle_setup(state) + {:noreply, state} + end end @impl GenServer - def handle_call(Message.new(:get_clock), _from, state) do + def handle_call(request, from, state) do + Macros.log_on_error do + do_handle_call(request, from, state) + end + end + + defp do_handle_call(Message.new(:get_clock), _from, state) do {:reply, state.synchronization.clock, state} end - @impl GenServer - def handle_call( + defp do_handle_call( Message.new(:handle_link, [direction, this, other, params]), _from, state @@ -174,19 +188,16 @@ defmodule Membrane.Core.Element do {:reply, reply, state} end - @impl GenServer - def handle_call(Message.new(:set_stream_sync, sync), _from, state) do + defp do_handle_call(Message.new(:set_stream_sync, sync), _from, state) do state = put_in(state.synchronization.stream_sync, sync) {:reply, :ok, state} end - @impl GenServer - def handle_call(Message.new(:get_child_pid, _child_name), _from, state) do + defp do_handle_call(Message.new(:get_child_pid, _child_name), _from, state) do {:reply, {:error, :element_cannot_have_children}, state} end - @impl GenServer - def handle_call(message, {pid, _tag}, _state) do + defp do_handle_call(message, {pid, _tag}, _state) do raise Membrane.ElementError, "Received invalid message #{inspect(message)} from #{inspect(pid)}" end @@ -198,7 +209,9 @@ defmodule Membrane.Core.Element do :erlang.process_info(self(), :message_queue_len) |> elem(1) ) - do_handle_info(message, state) + Macros.log_on_error do + do_handle_info(message, state) + end end @compile {:inline, do_handle_info: 2} diff --git a/lib/membrane/core/macros.ex b/lib/membrane/core/macros.ex new file mode 100644 index 000000000..4c8bd6c3e --- /dev/null +++ b/lib/membrane/core/macros.ex @@ -0,0 +1,22 @@ +defmodule Membrane.Core.Macros do + @moduledoc false + + defmacro log_on_error(do: code) do + quote do + try do + unquote(code) + rescue + e -> + require Membrane.Logger + + Membrane.Logger.error(""" + Error occured in Membrane Component: + #{inspect(e, pretty: true, limit: :infinity)} + #{Exception.format_stacktrace(__STACKTRACE__)} + """) + + reraise e, __STACKTRACE__ + end + end + end +end diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 3e50c0848..60cfb28b5 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -9,6 +9,7 @@ defmodule Membrane.Core.Pipeline do alias Membrane.Core.Parent.{ChildLifeController, LifecycleController} alias Membrane.Core.TimerController + require Membrane.Core.Macros, as: Macros require Membrane.Core.Message, as: Message require Membrane.Core.Telemetry, as: Telemetry require Membrane.Core.Component @@ -23,6 +24,12 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def init(params) do + Macros.log_on_error do + do_init(params) + end + end + + defp do_init(params) do observability_config = %{ name: params.name, component_type: :pipeline, @@ -74,12 +81,20 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_continue(:setup, state) do - state = LifecycleController.handle_setup(state) - {:noreply, state} + Macros.log_on_error do + state = LifecycleController.handle_setup(state) + {:noreply, state} + end end @impl GenServer - def handle_info( + def handle_info(msg, state) do + Macros.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info( Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]), state ) do @@ -95,74 +110,69 @@ defmodule Membrane.Core.Pipeline do {:noreply, state} end - @impl GenServer - def handle_info(Message.new(:child_pad_removed, [child, pad]), state) do + defp do_handle_info(Message.new(:child_pad_removed, [child, pad]), state) do state = ChildLifeController.handle_child_pad_removed(child, pad, state) {:noreply, state} end - @impl GenServer - def handle_info(Message.new(:child_notification, [from, notification]), state) do + defp do_handle_info(Message.new(:child_notification, [from, notification]), state) do state = LifecycleController.handle_child_notification(from, notification, state) {:noreply, state} end - @impl GenServer - def handle_info(Message.new(:timer_tick, timer_id), state) do + defp do_handle_info(Message.new(:timer_tick, timer_id), state) do state = TimerController.handle_tick(timer_id, state) {:noreply, state} end - @impl GenServer - def handle_info(Message.new(:link_response, [link_id, direction]), state) do + defp do_handle_info(Message.new(:link_response, [link_id, direction]), state) do state = ChildLifeController.handle_link_response(link_id, direction, state) {:noreply, state} end - @impl GenServer - def handle_info(Message.new(:initialized, child), state) do + defp do_handle_info(Message.new(:initialized, child), state) do state = ChildLifeController.handle_child_initialized(child, state) {:noreply, state} end - @impl GenServer - def handle_info(Message.new(:child_death, [name, reason]), state) do + defp do_handle_info(Message.new(:child_death, [name, reason]), state) do case ChildLifeController.handle_child_death(name, reason, state) do {:stop, reason, _state} -> ProcessHelper.notoelo(reason) {:continue, state} -> {:noreply, state} end end - @impl GenServer - def handle_info(Message.new(:terminate), state) do + defp do_handle_info(Message.new(:terminate), state) do state = LifecycleController.handle_terminate_request(state) {:noreply, state} end - @impl GenServer - def handle_info(Message.new(_type, _args, _opts) = message, _state) do + defp do_handle_info(Message.new(_type, _args, _opts) = message, _state) do raise Membrane.PipelineError, "Received invalid message #{inspect(message)}" end - @impl GenServer - def handle_info({:membrane_clock_ratio, clock, ratio}, state) do + defp do_handle_info({:membrane_clock_ratio, clock, ratio}, state) do state = TimerController.handle_clock_update(clock, ratio, state) {:noreply, state} end - @impl GenServer - def handle_info(message, state) do + defp do_handle_info(message, state) do state = LifecycleController.handle_info(message, state) {:noreply, state} end @impl GenServer - def handle_call(Message.new(:get_stalker), _from, state) do + def handle_call(msg, from, state) do + Macros.log_on_error do + do_handle_call(msg, from, state) + end + end + + defp do_handle_call(Message.new(:get_stalker), _from, state) do {:reply, {:ok, state.stalker}, state} end - @impl GenServer - def handle_call(Message.new(:get_child_pid, child_name), _from, state) do + defp do_handle_call(Message.new(:get_child_pid, child_name), _from, state) do reply = with %State{children: %{^child_name => %{pid: child_pid}}} <- state do {:ok, child_pid} @@ -173,8 +183,7 @@ defmodule Membrane.Core.Pipeline do {:reply, reply, state} end - @impl GenServer - def handle_call(message, from, state) do + defp do_handle_call(message, from, state) do context = &CallbackContext.from_state(&1, from: from) state = From 82d8c0490adc61d7fb23f893525fcd447df57768 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 10 Jun 2024 16:42:11 +0200 Subject: [PATCH 2/8] mix format --- lib/membrane/core/bin.ex | 8 ++++---- lib/membrane/core/element.ex | 8 ++++---- lib/membrane/core/pipeline.ex | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 0f8091eaa..6b3f615a5 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -268,10 +268,10 @@ defmodule Membrane.Core.Bin do end defp do_handle_call( - Message.new(:handle_link, [direction, this, other, params]), - _from, - state - ) do + Message.new(:handle_link, [direction, this, other, params]), + _from, + state + ) do {reply, state} = PadController.handle_link(direction, this, other, params, state) {:reply, reply, state} end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index b3b758d39..009f2e16d 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -180,10 +180,10 @@ defmodule Membrane.Core.Element do end defp do_handle_call( - Message.new(:handle_link, [direction, this, other, params]), - _from, - state - ) do + Message.new(:handle_link, [direction, this, other, params]), + _from, + state + ) do {reply, state} = PadController.handle_link(direction, this, other, params, state) {:reply, reply, state} end diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 60cfb28b5..4dc4acdd5 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -95,9 +95,9 @@ defmodule Membrane.Core.Pipeline do end defp do_handle_info( - Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]), - state - ) do + Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]), + state + ) do state = LifecycleController.handle_stream_management_event( event, From 267f08fc31cc8e6105e69ac63c7a7e155202a97f Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 10 Jun 2024 16:47:03 +0200 Subject: [PATCH 3/8] Refactor logs --- lib/membrane/core/callback_handler.ex | 24 ++---------------------- lib/membrane/core/macros.ex | 2 ++ 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 186e50e04..017d819b9 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -185,8 +185,6 @@ defmodule Membrane.Core.CallbackHandler do Error handling actions returned by callback #{inspect(state.module)}.#{callback} """) - log_debug_orginal_error(actions, e, __STACKTRACE__) - reraise e, __STACKTRACE__ end @@ -197,32 +195,14 @@ defmodule Membrane.Core.CallbackHandler do rescue e -> Membrane.Logger.error(""" - Error handling action #{inspect(action)} returned by callback #{inspect(state.module)}.#{callback} + Error handling action returned by callback #{inspect(state.module)}.#{callback}. + Action: #{inspect(action, pretty: true)} """) - log_debug_orginal_error(action, e, __STACKTRACE__) - reraise e, __STACKTRACE__ end end) handler_module.handle_end_of_actions(state) end - - # We log it, because sometimes, for some reason, crashing process doesn't cause - # printing error logs on stderr, so this debug log allows us to get some info - # about what happened in case of process crash - defp log_debug_orginal_error(action_or_actions, error, stacktrace) do - action_or_actions = - if(is_list(action_or_actions), do: "actions ", else: "action ") <> - inspect(action_or_actions, limit: :infinity) - - Membrane.Logger.debug(""" - Error while handling #{action_or_actions} - - Orginal error: - #{inspect(error, pretty: true, limit: :infinity)} - #{Exception.format_stacktrace(stacktrace)} - """) - end end diff --git a/lib/membrane/core/macros.ex b/lib/membrane/core/macros.ex index 4c8bd6c3e..1e81e9bd2 100644 --- a/lib/membrane/core/macros.ex +++ b/lib/membrane/core/macros.ex @@ -1,6 +1,8 @@ defmodule Membrane.Core.Macros do @moduledoc false + # For some reason GenServer processes sometimes don't print logs about crash, so + # we add this macro, to ensure that error logs wil be always logged defmacro log_on_error(do: code) do quote do try do From 69d9e806615e8743449efcda1ba75479f1ec8d1c Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 10 Jun 2024 16:54:24 +0200 Subject: [PATCH 4/8] Fix small bug --- lib/membrane/core/element.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 009f2e16d..867152d7e 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -204,12 +204,12 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_info(message, state) do - Telemetry.report_metric( - :queue_len, - :erlang.process_info(self(), :message_queue_len) |> elem(1) - ) - Macros.log_on_error do + Telemetry.report_metric( + :queue_len, + :erlang.process_info(self(), :message_queue_len) |> elem(1) + ) + do_handle_info(message, state) end end From 8fad39cf7c85038b9c04f066ce04a04692224241 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Feliks=20Pobiedzi=C5=84ski?= <38541925+FelonEkonom@users.noreply.github.com> Date: Mon, 10 Jun 2024 17:09:36 +0200 Subject: [PATCH 5/8] Update lib/membrane/core/macros.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ɓukasz Kita --- lib/membrane/core/macros.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane/core/macros.ex b/lib/membrane/core/macros.ex index 1e81e9bd2..2a206cd3f 100644 --- a/lib/membrane/core/macros.ex +++ b/lib/membrane/core/macros.ex @@ -2,7 +2,7 @@ defmodule Membrane.Core.Macros do @moduledoc false # For some reason GenServer processes sometimes don't print logs about crash, so - # we add this macro, to ensure that error logs wil be always logged + # we add this macro, to ensure that error logs are always printed defmacro log_on_error(do: code) do quote do try do From b524f7001396a2047b8b20ff33469cb0fdf7eeba Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 11 Jun 2024 11:59:50 +0200 Subject: [PATCH 6/8] Implement CR suggestions --- lib/membrane/clock.ex | 41 ++++++++---- lib/membrane/core/bin.ex | 10 +-- lib/membrane/core/element.ex | 10 +-- .../distributed_atomic/worker.ex | 26 +++++--- lib/membrane/core/pipeline.ex | 10 +-- lib/membrane/core/pipeline/supervisor.ex | 59 +++++++++++------- lib/membrane/core/stalker.ex | 26 +++++--- lib/membrane/core/subprocess_supervisor.ex | 62 ++++++++++++------- lib/membrane/core/{macros.ex => utils.ex} | 12 +++- lib/membrane/resource_guard.ex | 32 +++++----- lib/membrane/sync.ex | 41 ++++++------ lib/membrane/testing/mock_resource_guard.ex | 19 ++++-- 12 files changed, 216 insertions(+), 132 deletions(-) rename lib/membrane/core/{macros.ex => utils.ex} (62%) diff --git a/lib/membrane/clock.ex b/lib/membrane/clock.ex index 5f1097a5c..21d01351d 100644 --- a/lib/membrane/clock.ex +++ b/lib/membrane/clock.ex @@ -32,6 +32,8 @@ defmodule Membrane.Clock do alias Membrane.Core.Message alias Membrane.Time + require Membrane.Core.Utils, as: Utils + @typedoc @moduledoc @type t :: pid @@ -115,6 +117,12 @@ defmodule Membrane.Clock do @impl GenServer def init(options) do + Utils.log_on_error do + do_init(options) + end + end + + defp do_init(options) do proxy_opts = get_proxy_options(options[:proxy], options[:proxy_for]) state = @@ -131,7 +139,13 @@ defmodule Membrane.Clock do end @impl GenServer - def handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do + def handle_cast(request, state) do + Utils.log_on_error do + do_handle_cast(request, state) + end + end + + defp do_handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do if state.proxy_for, do: unsubscribe(state.proxy_for) state = %{state | proxy_for: proxy_for} @@ -147,8 +161,7 @@ defmodule Membrane.Clock do {:noreply, state} end - @impl GenServer - def handle_cast({:clock_subscribe, pid}, state) do + defp do_handle_cast({:clock_subscribe, pid}, state) do state |> update_in([:subscribers, pid], fn nil -> @@ -162,8 +175,7 @@ defmodule Membrane.Clock do ~> {:noreply, &1} end - @impl GenServer - def handle_cast({:clock_unsubscribe, pid}, state) do + defp do_handle_cast({:clock_unsubscribe, pid}, state) do if Map.has_key?(state.subscribers, pid) do {subs, state} = state |> Bunch.Access.get_updated_in([:subscribers, pid, :subscriptions], &(&1 - 1)) @@ -175,24 +187,27 @@ defmodule Membrane.Clock do ~> {:noreply, &1} end - @impl GenServer - def handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do + @impl true + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do {:noreply, handle_clock_update(till_next, state)} end - @impl GenServer - def handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do + defp do_handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do {:noreply, broadcast_and_update_ratio(ratio, state)} end - @impl GenServer # When ratio from previously proxied clock comes in after unsubscribing - def handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do + defp do_handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do {:noreply, state} end - @impl GenServer - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do {:noreply, handle_unsubscribe(pid, state)} end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 6b3f615a5..b7bf83a85 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -19,7 +19,7 @@ defmodule Membrane.Core.Bin do alias Membrane.ResourceGuard - require Membrane.Core.Macros, as: Macros + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message require Membrane.Core.Telemetry require Membrane.Logger @@ -80,7 +80,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def init(options) do - Macros.log_on_error do + Utils.log_on_error do do_init(options) end end @@ -151,7 +151,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_continue(:setup, state) do - Macros.log_on_error do + Utils.log_on_error do state = Parent.LifecycleController.handle_setup(state) {:noreply, state} end @@ -159,7 +159,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_info(message, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_info(message, state) end end @@ -262,7 +262,7 @@ defmodule Membrane.Core.Bin do @impl GenServer def handle_call(request, from, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_call(request, from, state) end end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 867152d7e..e5424a672 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -35,7 +35,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.{SubprocessSupervisor, TimerController} - require Membrane.Core.Macros, as: Macros + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message require Membrane.Core.Stalker, as: Stalker require Membrane.Core.Telemetry, as: Telemetry @@ -95,7 +95,7 @@ defmodule Membrane.Core.Element do @impl GenServer def init(options) do - Macros.log_on_error do + Utils.log_on_error do do_init(options) end end @@ -162,7 +162,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_continue(:setup, state) do - Macros.log_on_error do + Utils.log_on_error do state = LifecycleController.handle_setup(state) {:noreply, state} end @@ -170,7 +170,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_call(request, from, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_call(request, from, state) end end @@ -204,7 +204,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_info(message, state) do - Macros.log_on_error do + Utils.log_on_error do Telemetry.report_metric( :queue_len, :erlang.process_info(self(), :message_queue_len) |> elem(1) diff --git a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex index 5f60450d7..56a967326 100644 --- a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex +++ b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex @@ -6,6 +6,8 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do use GenServer + require Membrane.Core.Utils, as: Utils + @type t :: pid() @spec start_link(any()) :: {:ok, t} @@ -18,25 +20,33 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do @impl true def handle_call({:add_get, atomic_ref, value}, _from, _state) do - result = :atomics.add_get(atomic_ref, 1, value) - {:reply, result, nil} + Utils.log_on_error do + result = :atomics.add_get(atomic_ref, 1, value) + {:reply, result, nil} + end end @impl true def handle_call({:sub_get, atomic_ref, value}, _from, _state) do - result = :atomics.sub_get(atomic_ref, 1, value) - {:reply, result, nil} + Utils.log_on_error do + result = :atomics.sub_get(atomic_ref, 1, value) + {:reply, result, nil} + end end @impl true def handle_call({:get, atomic_ref}, _from, _state) do - result = :atomics.get(atomic_ref, 1) - {:reply, result, nil} + Utils.log_on_error do + result = :atomics.get(atomic_ref, 1) + {:reply, result, nil} + end end @impl true def handle_cast({:put, atomic_ref, value}, _state) do - :atomics.put(atomic_ref, 1, value) - {:noreply, nil} + Utils.log_on_error do + :atomics.put(atomic_ref, 1, value) + {:noreply, nil} + end end end diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 4dc4acdd5..6f9fc95db 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -9,7 +9,7 @@ defmodule Membrane.Core.Pipeline do alias Membrane.Core.Parent.{ChildLifeController, LifecycleController} alias Membrane.Core.TimerController - require Membrane.Core.Macros, as: Macros + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message require Membrane.Core.Telemetry, as: Telemetry require Membrane.Core.Component @@ -24,7 +24,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def init(params) do - Macros.log_on_error do + Utils.log_on_error do do_init(params) end end @@ -81,7 +81,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_continue(:setup, state) do - Macros.log_on_error do + Utils.log_on_error do state = LifecycleController.handle_setup(state) {:noreply, state} end @@ -89,7 +89,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_info(msg, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_info(msg, state) end end @@ -163,7 +163,7 @@ defmodule Membrane.Core.Pipeline do @impl GenServer def handle_call(msg, from, state) do - Macros.log_on_error do + Utils.log_on_error do do_handle_call(msg, from, state) end end diff --git a/lib/membrane/core/pipeline/supervisor.ex b/lib/membrane/core/pipeline/supervisor.ex index d139642a4..0f68c53b3 100644 --- a/lib/membrane/core/pipeline/supervisor.ex +++ b/lib/membrane/core/pipeline/supervisor.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Pipeline.Supervisor do alias Membrane.Core.{ProcessHelper, SubprocessSupervisor} require Membrane.Core.Message, as: Message + require Membrane.Core.Utils, as: Utils require Membrane.Logger @spec run( @@ -30,7 +31,13 @@ defmodule Membrane.Core.Pipeline.Supervisor do end @impl true - def init({start_fun, name, reply_to}) do + def init(params) do + Utils.log_on_error do + do_init(params) + end + end + + defp do_init({start_fun, name, reply_to}) do Process.flag(:trap_exit, true) subprocess_supervisor = SubprocessSupervisor.start_link!() @@ -52,18 +59,26 @@ defmodule Membrane.Core.Pipeline.Supervisor do @impl true def handle_call(:which_children, _from, state) do - reply = - [{SubprocessSupervisor, state.subprocess_supervisor, :supervisor, SubprocessSupervisor}] ++ - case state.pipeline do - {:alive, pid} -> [{:pipeline, pid, :worker, []}] - {:exited, _reason} -> [] - end - - {:reply, reply, state} + Utils.log_on_error do + reply = + [{SubprocessSupervisor, state.subprocess_supervisor, :supervisor, SubprocessSupervisor}] ++ + case state.pipeline do + {:alive, pid} -> [{:pipeline, pid, :worker, []}] + {:exited, _reason} -> [] + end + + {:reply, reply, state} + end end @impl true - def handle_info({:EXIT, pid, reason}, %{pipeline: {:alive, pid}} = state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info({:EXIT, pid, reason}, %{pipeline: {:alive, pid}} = state) do Membrane.Logger.debug( "got exit from pipeline with reason #{inspect(reason)}, stopping subprocess supervisor" ) @@ -72,32 +87,28 @@ defmodule Membrane.Core.Pipeline.Supervisor do {:noreply, %{state | pipeline: {:exited, reason}}} end - @impl true - def handle_info( - {:EXIT, pid, :normal}, - %{subprocess_supervisor: pid, pipeline: {:exited, pipeline_exit_reason}} - ) do + defp do_handle_info( + {:EXIT, pid, :normal}, + %{subprocess_supervisor: pid, pipeline: {:exited, pipeline_exit_reason}} + ) do Membrane.Logger.debug("got exit from subprocess supervisor, exiting") ProcessHelper.notoelo(pipeline_exit_reason, log?: false) end - @impl true - def handle_info({:EXIT, pid, reason}, %{ - subprocess_supervisor: pid, - pipeline: {:alive, _pipeline_pid} - }) do + defp do_handle_info({:EXIT, pid, reason}, %{ + subprocess_supervisor: pid, + pipeline: {:alive, _pipeline_pid} + }) do raise "Subprocess supervisor failure, reason: #{inspect(reason)}" end - @impl true - def handle_info({:EXIT, _pid, reason}, %{pipeline: {:alive, pipeline_pid}} = state) do + defp do_handle_info({:EXIT, _pid, reason}, %{pipeline: {:alive, pipeline_pid}} = state) do Membrane.Logger.debug("got exit from a linked process, stopping pipeline") Process.exit(pipeline_pid, reason) {:noreply, state} end - @impl true - def handle_info({:EXIT, _pid, _reason}, state) do + defp do_handle_info({:EXIT, _pid, _reason}, state) do Membrane.Logger.debug( "got exit from a linked process, pipeline already dead, waiting for subprocess supervisor to exit" ) diff --git a/lib/membrane/core/stalker.ex b/lib/membrane/core/stalker.ex index 7ab89b9b7..384459f20 100644 --- a/lib/membrane/core/stalker.ex +++ b/lib/membrane/core/stalker.ex @@ -3,6 +3,7 @@ defmodule Membrane.Core.Stalker do use GenServer alias Membrane.{ComponentPath, Pad, Time} + require Membrane.Core.Utils, as: Utils @unsafely_name_processes_for_observer Application.compile_env( :membrane_core, @@ -336,7 +337,13 @@ defmodule Membrane.Core.Stalker do end @impl true - def init(%{pipeline: pipeline}) do + def init(options) do + Utils.log_on_error do + do_init(options) + end + end + + defp do_init(%{pipeline: pipeline}) do Process.send_after(self(), :scrape_metrics, @scrape_interval) ets = create_ets() send(pipeline, {:ets, ets}) @@ -354,7 +361,13 @@ defmodule Membrane.Core.Stalker do end @impl true - def handle_info(:scrape_metrics, state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(:scrape_metrics, state) do Process.send_after(self(), :scrape_metrics, @scrape_interval) metrics = scrape_metrics(state) timestamp = Time.milliseconds(System.monotonic_time(:millisecond) - state.init_time) @@ -363,15 +376,13 @@ defmodule Membrane.Core.Stalker do {:noreply, %{state | metrics: Map.new(metrics), timestamp: timestamp}} end - @impl true - def handle_info({:graph, graph_update}, state) do + defp do_handle_info({:graph, graph_update}, state) do {action, graph_updates, state} = handle_graph_update(graph_update, state) send_to_subscribers(graph_updates, :graph, &{:graph, action, &1}, state) {:noreply, state} end - @impl true - def handle_info({:subscribe, pid, topics, opts}, state) do + defp do_handle_info({:subscribe, pid, topics, opts}, state) do _ref = unless Map.has_key?(state.subscribers, pid), do: Process.monitor(pid) opts = Keyword.validate!(opts, [:confirm]) with {:ok, id} <- Keyword.fetch(opts, :confirm), do: send(pid, {:subscribed, id}) @@ -414,8 +425,7 @@ defmodule Membrane.Core.Stalker do {:noreply, state} end - @impl true - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do cond do Map.has_key?(state.subscribers, pid) -> {:noreply, Bunch.Access.delete_in(state, [:subscribers, pid])} diff --git a/lib/membrane/core/subprocess_supervisor.ex b/lib/membrane/core/subprocess_supervisor.ex index 4639ce225..c0437d5af 100644 --- a/lib/membrane/core/subprocess_supervisor.ex +++ b/lib/membrane/core/subprocess_supervisor.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.SubprocessSupervisor do alias Membrane.Core.Stalker require Membrane.Core.Message, as: Message + require Membrane.Core.Utils, as: Utils require Membrane.Logger @spec start_link!() :: pid() @@ -93,18 +94,30 @@ defmodule Membrane.Core.SubprocessSupervisor do @impl true def init(parent_process) do - Process.flag(:trap_exit, true) - - {:ok, - %{ - parent_component: nil, - parent_process: {:alive, parent_process}, - children: %{} - }} + Utils.log_on_error do + Process.flag(:trap_exit, true) + + {:ok, + %{ + parent_component: nil, + parent_process: {:alive, parent_process}, + children: %{} + }} + end end @impl true - def handle_call(Message.new(:start_component, [name, component_module, options]), _from, state) do + def handle_call(request, from, state) do + Utils.log_on_error do + do_handle_call(request, from, state) + end + end + + defp do_handle_call( + Message.new(:start_component, [name, component_module, options]), + _from, + state + ) do subprocess_supervisor = start_link_subprocess_supervisor!(options) options = @@ -138,8 +151,7 @@ defmodule Membrane.Core.SubprocessSupervisor do end end - @impl true - def handle_call(Message.new(:start_utility, child_spec), _from, state) do + defp do_handle_call(Message.new(:start_utility, child_spec), _from, state) do try do {m, f, a} = child_spec.start apply(m, f, a) @@ -169,30 +181,33 @@ defmodule Membrane.Core.SubprocessSupervisor do end end - @impl true - def handle_call(:which_children, _from, state) do + defp do_handle_call(:which_children, _from, state) do reply = Enum.map(state.children, fn {pid, data} -> {data.name, pid, data.type, []} end) {:reply, reply, state} end @impl true - def handle_info(Message.new(:set_parent_component, [pid, observability_config]), state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(Message.new(:set_parent_component, [pid, observability_config]), state) do Membrane.Core.Stalker.setup_component_utility(observability_config, "subprocess supervisor") {:noreply, %{state | parent_component: pid}} end - @impl true - def handle_info( - {:EXIT, pid, _reason}, - %{parent_process: {:alive, pid}, children: children} = state - ) - when children == %{} do + defp do_handle_info( + {:EXIT, pid, _reason}, + %{parent_process: {:alive, pid}, children: children} = state + ) + when children == %{} do Membrane.Logger.debug("exiting") {:stop, :normal, state} end - @impl true - def handle_info({:EXIT, pid, reason}, %{parent_process: {:alive, pid}} = state) do + defp do_handle_info({:EXIT, pid, reason}, %{parent_process: {:alive, pid}} = state) do Membrane.Logger.debug( "got exit request from parent, reason: #{inspect(reason)}, shutting down children" ) @@ -204,8 +219,7 @@ defmodule Membrane.Core.SubprocessSupervisor do {:noreply, %{state | parent_process: :exit_requested}} end - @impl true - def handle_info({:EXIT, pid, reason}, state) do + defp do_handle_info({:EXIT, pid, reason}, state) do {data, state} = pop_in(state, [:children, pid]) handle_exit(data, reason, state) diff --git a/lib/membrane/core/macros.ex b/lib/membrane/core/utils.ex similarity index 62% rename from lib/membrane/core/macros.ex rename to lib/membrane/core/utils.ex index 2a206cd3f..1f21bacb1 100644 --- a/lib/membrane/core/macros.ex +++ b/lib/membrane/core/utils.ex @@ -1,9 +1,17 @@ -defmodule Membrane.Core.Macros do +defmodule Membrane.Core.Utils do @moduledoc false # For some reason GenServer processes sometimes don't print logs about crash, so # we add this macro, to ensure that error logs are always printed defmacro log_on_error(do: code) do + error_source = + case __CALLER__.module do + Membrane.Core.Element -> "Membrane Element" + Membrane.Core.Bin -> "Membrane Bin" + Membrane.Core.Pipeline -> "Membrane Pipeline" + other -> inspect(other) + end + quote do try do unquote(code) @@ -12,7 +20,7 @@ defmodule Membrane.Core.Macros do require Membrane.Logger Membrane.Logger.error(""" - Error occured in Membrane Component: + Error occured in #{unquote(error_source)}: #{inspect(e, pretty: true, limit: :infinity)} #{Exception.format_stacktrace(__STACKTRACE__)} """) diff --git a/lib/membrane/resource_guard.ex b/lib/membrane/resource_guard.ex index c443ffb39..6867be790 100644 --- a/lib/membrane/resource_guard.ex +++ b/lib/membrane/resource_guard.ex @@ -21,6 +21,7 @@ defmodule Membrane.ResourceGuard do """ use GenServer + require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message require Membrane.Logger @@ -87,20 +88,27 @@ defmodule Membrane.ResourceGuard do @impl true def init(owner_pid) do - Process.flag(:trap_exit, true) - monitor = Process.monitor(owner_pid) - {:ok, %{guards: [], monitor: monitor}} + Utils.log_on_error do + Process.flag(:trap_exit, true) + monitor = Process.monitor(owner_pid) + {:ok, %{guards: [], monitor: monitor}} + end end @impl true - def handle_info(Message.new(:register, [function, opts]), state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(Message.new(:register, [function, opts]), state) do tag = Keyword.fetch!(opts, :tag) timeout = Keyword.get(opts, :timeout, 5000) {:noreply, %{state | guards: [{function, tag, timeout} | state.guards]}} end - @impl true - def handle_info(Message.new(:unregister, tag), state) do + defp do_handle_info(Message.new(:unregister, tag), state) do guards = Enum.reject(state.guards, fn {_function, ^tag, _timeout} -> true @@ -110,14 +118,12 @@ defmodule Membrane.ResourceGuard do {:noreply, %{state | guards: guards}} end - @impl true - def handle_info(Message.new(:cleanup_all), state) do + defp do_handle_info(Message.new(:cleanup_all), state) do do_cleanup_all(state.guards) {:noreply, %{state | guards: []}} end - @impl true - def handle_info(Message.new(:cleanup, tag), state) do + defp do_handle_info(Message.new(:cleanup, tag), state) do guards = Enum.reject(state.guards, fn {function, ^tag, timeout} -> @@ -131,14 +137,12 @@ defmodule Membrane.ResourceGuard do {:noreply, %{state | guards: guards}} end - @impl true - def handle_info({:DOWN, monitor, :process, _pid, _reason}, %{monitor: monitor} = state) do + defp do_handle_info({:DOWN, monitor, :process, _pid, _reason}, %{monitor: monitor} = state) do do_cleanup_all(state.guards) {:stop, :normal, state} end - @impl true - def handle_info(_message, state) do + defp do_handle_info(_message, state) do {:noreply, state} end diff --git a/lib/membrane/sync.ex b/lib/membrane/sync.ex index 56b0387c3..1577074ed 100644 --- a/lib/membrane/sync.ex +++ b/lib/membrane/sync.ex @@ -31,6 +31,8 @@ defmodule Membrane.Sync do alias Membrane.Time + require Membrane.Core.Utils, as: Utils + @no_sync :membrane_no_sync @typedoc """ @@ -92,28 +94,34 @@ defmodule Membrane.Sync do @impl true def init(opts) do - {:ok, - %{ - processes: %{}, - empty_exit?: opts |> Keyword.get(:empty_exit?, false), - active?: false - }} + Utils.log_on_error do + {:ok, + %{ + processes: %{}, + empty_exit?: opts |> Keyword.get(:empty_exit?, false), + active?: false + }} + end end @impl true - def handle_call({:sync_register, pid}, _from, %{active?: false} = state) do + def handle_call(request, from, state) do + Utils.log_on_error do + do_handle_call(request, from, state) + end + end + + defp do_handle_call({:sync_register, pid}, _from, %{active?: false} = state) do Process.monitor(pid) state = state |> put_in([:processes, pid], %{status: :registered, latency: 0, reply_to: nil}) {:reply, :ok, state} end - @impl true - def handle_call({:sync_register, _pid}, _from, state) do + defp do_handle_call({:sync_register, _pid}, _from, state) do {:reply, {:error, :bad_activity_request}, state} end - @impl true - def handle_call({:sync, options}, {pid, _ref} = from, %{active?: true} = state) do + defp do_handle_call({:sync, options}, {pid, _ref} = from, %{active?: true} = state) do latency = options |> Keyword.get(:latency, 0) case state.processes[pid] do @@ -135,19 +143,16 @@ defmodule Membrane.Sync do end end - @impl true - def handle_call({:sync, _options}, _from, %{active?: false} = state) do + defp do_handle_call({:sync, _options}, _from, %{active?: false} = state) do {:reply, :ok, state} end - @impl true - def handle_call({:sync_toggle_active, new_active?}, _from, %{active?: active?} = state) - when new_active? == active? do + defp do_handle_call({:sync_toggle_active, new_active?}, _from, %{active?: active?} = state) + when new_active? == active? do {:reply, {:error, :bad_activity_request}, state} end - @impl true - def handle_call({:sync_toggle_active, active?}, _from, state) do + defp do_handle_call({:sync_toggle_active, active?}, _from, state) do state = %{state | active?: active?} |> check_and_handle_sync() {:reply, :ok, state} end diff --git a/lib/membrane/testing/mock_resource_guard.ex b/lib/membrane/testing/mock_resource_guard.ex index f7330d97e..922aef342 100644 --- a/lib/membrane/testing/mock_resource_guard.ex +++ b/lib/membrane/testing/mock_resource_guard.ex @@ -17,6 +17,7 @@ defmodule Membrane.Testing.MockResourceGuard do use GenServer require Membrane.Core.Message, as: Message + require Membrane.Core.Utils, as: Utils @type options :: [test_process: pid] @@ -40,24 +41,30 @@ defmodule Membrane.Testing.MockResourceGuard do @impl true def init(options) do - {:ok, Map.new(options)} + Utils.log_on_error do + {:ok, Map.new(options)} + end end @impl true - def handle_info(Message.new(:register, [function, options]), state) do + def handle_info(msg, state) do + Utils.log_on_error do + do_handle_info(msg, state) + end + end + + defp do_handle_info(Message.new(:register, [function, options]), state) do tag = Keyword.fetch!(options, :tag) send_to_test_process(state, :register, {function, tag}) {:noreply, state} end - @impl true - def handle_info(Message.new(:unregister, tag), state) do + defp do_handle_info(Message.new(:unregister, tag), state) do send_to_test_process(state, :unregister, tag) {:noreply, state} end - @impl true - def handle_info(Message.new(:cleanup, tag), state) do + defp do_handle_info(Message.new(:cleanup, tag), state) do send_to_test_process(state, :cleanup, tag) {:noreply, state} end From 83c7e805968e5af702d4dc041b0f1f34b4420095 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 11 Jun 2024 12:01:21 +0200 Subject: [PATCH 7/8] Remove leftover --- .formatter.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.formatter.exs b/.formatter.exs index 3434dbcd1..9cf8cc2e6 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -5,8 +5,7 @@ locals_without_parens = def_options: 1, def_clock: 1, def_type_from_list: 1, - assert_receive_message: 3, - wrap_with_try: 1 + assert_receive_message: 3 ] [ From 8230dcb8c66500c68dd1b53667d901933329d586 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 11 Jun 2024 16:54:52 +0200 Subject: [PATCH 8/8] Release v1.1.0 --- CHANGELOG.md | 4 +--- README.md | 2 +- mix.exs | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89c44f407..becaeca20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,7 @@ # Changelog -## 1.1.0-rc1 +## 1.1.0 * Add new callbacks `handle_child_setup_completed/3` and `handle_child_playing/3` in Bins and Pipelines. [#801](https://github.com/membraneframework/membrane_core/pull/801) - -## 1.1.0-rc0 * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) * Set `:ratio` dependency version to `"~> 3.0 or ~> 4.0"`. [#780](https://github.com/membraneframework/membrane_core/pull/780) diff --git a/README.md b/README.md index 15ef1f57e..b98b20316 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get` ```elixir -{:membrane_core, "~> 1.0"} +{:membrane_core, "~> 1.1"} ``` **Standalone libraries** diff --git a/mix.exs b/mix.exs index 5b5023fea..53c0ff36b 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.1.0-rc1" + @version "1.1.0" @source_ref "v#{@version}" def project do