Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure error logs are always printed #822

Merged
merged 8 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# Changelog

## 1.1.0-rc1
## 1.1.0
* Add new callbacks `handle_child_setup_completed/3` and `handle_child_playing/3` in Bins and Pipelines. [#801](https://github.com/membraneframework/membrane_core/pull/801)

## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Set `:ratio` dependency version to `"~> 3.0 or ~> 4.0"`. [#780](https://github.com/membraneframework/membrane_core/pull/780)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma
The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get`

```elixir
{:membrane_core, "~> 1.0"}
{:membrane_core, "~> 1.1"}
```

**Standalone libraries**
Expand Down
41 changes: 28 additions & 13 deletions lib/membrane/clock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ defmodule Membrane.Clock do
alias Membrane.Core.Message
alias Membrane.Time

require Membrane.Core.Utils, as: Utils

@typedoc @moduledoc
@type t :: pid

Expand Down Expand Up @@ -115,6 +117,12 @@ defmodule Membrane.Clock do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
proxy_opts = get_proxy_options(options[:proxy], options[:proxy_for])

state =
Expand All @@ -131,7 +139,13 @@ defmodule Membrane.Clock do
end

@impl GenServer
def handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
def handle_cast(request, state) do
Utils.log_on_error do
do_handle_cast(request, state)
end
end

defp do_handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
if state.proxy_for, do: unsubscribe(state.proxy_for)

state = %{state | proxy_for: proxy_for}
Expand All @@ -147,8 +161,7 @@ defmodule Membrane.Clock do
{:noreply, state}
end

@impl GenServer
def handle_cast({:clock_subscribe, pid}, state) do
defp do_handle_cast({:clock_subscribe, pid}, state) do
state
|> update_in([:subscribers, pid], fn
nil ->
Expand All @@ -162,8 +175,7 @@ defmodule Membrane.Clock do
~> {:noreply, &1}
end

@impl GenServer
def handle_cast({:clock_unsubscribe, pid}, state) do
defp do_handle_cast({:clock_unsubscribe, pid}, state) do
if Map.has_key?(state.subscribers, pid) do
{subs, state} =
state |> Bunch.Access.get_updated_in([:subscribers, pid, :subscriptions], &(&1 - 1))
Expand All @@ -175,24 +187,27 @@ defmodule Membrane.Clock do
~> {:noreply, &1}
end

@impl GenServer
def handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
@impl true
def handle_info(msg, state) do
Utils.log_on_error do
do_handle_info(msg, state)
end
end

defp do_handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
{:noreply, handle_clock_update(till_next, state)}
end

@impl GenServer
def handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
defp do_handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
{:noreply, broadcast_and_update_ratio(ratio, state)}
end

@impl GenServer
# When ratio from previously proxied clock comes in after unsubscribing
def handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
defp do_handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
{:noreply, state}
end

@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{:noreply, handle_unsubscribe(pid, state)}
end

Expand Down
40 changes: 27 additions & 13 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Membrane.Core.Bin do

alias Membrane.ResourceGuard

require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message
require Membrane.Core.Telemetry
require Membrane.Logger
Expand Down Expand Up @@ -79,6 +80,12 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
Process.link(options.parent_supervisor)
%{name: name, module: module} = options

Expand Down Expand Up @@ -144,13 +151,17 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def handle_continue(:setup, state) do
state = Parent.LifecycleController.handle_setup(state)
{:noreply, state}
Utils.log_on_error do
state = Parent.LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_info(message, state) do
do_handle_info(message, state)
Utils.log_on_error do
do_handle_info(message, state)
end
end

@compile {:inline, do_handle_info: 2}
Expand All @@ -162,7 +173,6 @@ defmodule Membrane.Core.Bin do

defp do_handle_info(Message.new(:parent_notification, notification), state) do
state = Child.LifecycleController.handle_parent_notification(notification, state)

{:noreply, state}
end

Expand Down Expand Up @@ -251,22 +261,26 @@ defmodule Membrane.Core.Bin do
end

@impl GenServer
def handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
def handle_call(request, from, state) do
Utils.log_on_error do
do_handle_call(request, from, state)
end
end

defp do_handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
{reply, state} = PadController.handle_link(direction, this, other, params, state)
{:reply, reply, state}
end

@impl GenServer
def handle_call(Message.new(:get_clock), _from, state) do
defp do_handle_call(Message.new(:get_clock), _from, state) do
{:reply, state.synchronization.clock, state}
end

@impl GenServer
def handle_call(Message.new(:get_child_pid, child_name), _from, state) do
defp do_handle_call(Message.new(:get_child_pid, child_name), _from, state) do
reply =
with %State{children: %{^child_name => %{pid: child_pid}}} <- state do
{:ok, child_pid}
Expand Down
24 changes: 2 additions & 22 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ defmodule Membrane.Core.CallbackHandler do
Error handling actions returned by callback #{inspect(state.module)}.#{callback}
""")

log_debug_orginal_error(actions, e, __STACKTRACE__)

reraise e, __STACKTRACE__
end

Expand All @@ -197,32 +195,14 @@ defmodule Membrane.Core.CallbackHandler do
rescue
e ->
Membrane.Logger.error("""
Error handling action #{inspect(action)} returned by callback #{inspect(state.module)}.#{callback}
Error handling action returned by callback #{inspect(state.module)}.#{callback}.
Action: #{inspect(action, pretty: true)}
""")

log_debug_orginal_error(action, e, __STACKTRACE__)

reraise e, __STACKTRACE__
end
end)

handler_module.handle_end_of_actions(state)
end

# We log it, because sometimes, for some reason, crashing process doesn't cause
# printing error logs on stderr, so this debug log allows us to get some info
# about what happened in case of process crash
defp log_debug_orginal_error(action_or_actions, error, stacktrace) do
action_or_actions =
if(is_list(action_or_actions), do: "actions ", else: "action ") <>
inspect(action_or_actions, limit: :infinity)

Membrane.Logger.debug("""
Error while handling #{action_or_actions}

Orginal error:
#{inspect(error, pretty: true, limit: :infinity)}
#{Exception.format_stacktrace(stacktrace)}
""")
end
end
53 changes: 33 additions & 20 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Membrane.Core.Element do

alias Membrane.Core.{SubprocessSupervisor, TimerController}

require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message, as: Message
require Membrane.Core.Stalker, as: Stalker
require Membrane.Core.Telemetry, as: Telemetry
Expand Down Expand Up @@ -94,6 +95,12 @@ defmodule Membrane.Core.Element do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
Process.link(options.parent_supervisor)

observability_config = %{
Expand Down Expand Up @@ -155,50 +162,56 @@ defmodule Membrane.Core.Element do

@impl GenServer
def handle_continue(:setup, state) do
state = LifecycleController.handle_setup(state)
{:noreply, state}
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_call(Message.new(:get_clock), _from, state) do
def handle_call(request, from, state) do
Utils.log_on_error do
do_handle_call(request, from, state)
end
end

defp do_handle_call(Message.new(:get_clock), _from, state) do
{:reply, state.synchronization.clock, state}
end

@impl GenServer
def handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
defp do_handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
{reply, state} = PadController.handle_link(direction, this, other, params, state)
{:reply, reply, state}
end

@impl GenServer
def handle_call(Message.new(:set_stream_sync, sync), _from, state) do
defp do_handle_call(Message.new(:set_stream_sync, sync), _from, state) do
state = put_in(state.synchronization.stream_sync, sync)
{:reply, :ok, state}
end

@impl GenServer
def handle_call(Message.new(:get_child_pid, _child_name), _from, state) do
defp do_handle_call(Message.new(:get_child_pid, _child_name), _from, state) do
{:reply, {:error, :element_cannot_have_children}, state}
end

@impl GenServer
def handle_call(message, {pid, _tag}, _state) do
defp do_handle_call(message, {pid, _tag}, _state) do
raise Membrane.ElementError,
"Received invalid message #{inspect(message)} from #{inspect(pid)}"
end

@impl GenServer
def handle_info(message, state) do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
)
Utils.log_on_error do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
)

do_handle_info(message, state)
do_handle_info(message, state)
end
end

@compile {:inline, do_handle_info: 2}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

use GenServer

require Membrane.Core.Utils, as: Utils

@type t :: pid()

@spec start_link(any()) :: {:ok, t}
Expand All @@ -18,25 +20,33 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

@impl true
def handle_call({:add_get, atomic_ref, value}, _from, _state) do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
end
end

@impl true
def handle_call({:sub_get, atomic_ref, value}, _from, _state) do
result = :atomics.sub_get(atomic_ref, 1, value)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.sub_get(atomic_ref, 1, value)
{:reply, result, nil}
end
end

@impl true
def handle_call({:get, atomic_ref}, _from, _state) do
result = :atomics.get(atomic_ref, 1)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.get(atomic_ref, 1)
{:reply, result, nil}
end
end

@impl true
def handle_cast({:put, atomic_ref, value}, _state) do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
Utils.log_on_error do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
end
end
end
Loading