Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add crash reason handle crash group #720

Merged
merged 12 commits into from
Feb 13, 2024
5 changes: 3 additions & 2 deletions lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule Membrane.Bin.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Bin.handle_element_end_of_stream/4`.

Fields `:members` and `:crash_initiator` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
Fields `:members`, `:crash_initiator` and `reason` and are present only in
`c:Membrane.Bin.handle_crash_group_down/3`.
"""
@type t :: %{
:clock => Membrane.Clock.t(),
Expand All @@ -27,6 +27,7 @@ defmodule Membrane.Bin.CallbackContext do
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:reason) => any(),
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
optional(:start_of_stream_received?) => boolean()
}
end
6 changes: 5 additions & 1 deletion lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ defmodule Membrane.Core.Bin.CallbackContext do

@type optional_fields ::
[pad_options: map()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [
members: [Membrane.Child.name()],
crash_initiator: Membrane.Child.name(),
reason: any()
]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
end
end

def handle_crash_group_member_death(child_name, %CrashGroup{} = group, _reason, state) do
def handle_crash_group_member_death(child_name, %CrashGroup{} = group, reason, state) do
state =
if group.detonating? do
state
else
detonate_crash_group(child_name, group, state)
detonate_crash_group(child_name, group, reason, state)
end

all_members_dead? =
Expand All @@ -72,7 +72,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
end
end

defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, state) do
defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, reason, state) do
state = ChildLifeController.remove_children_from_specs(group.members, state)
state = LinkUtils.unlink_crash_group(group, state)

Expand All @@ -88,7 +88,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
&%CrashGroup{
&1
| detonating?: true,
crash_initiator: crash_initiator
crash_initiator: crash_initiator,
reason: reason
}
)
end
Expand All @@ -108,7 +109,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
context_generator =
&Component.context_from_state(&1,
members: crash_group.members,
crash_initiator: crash_group.crash_initiator
crash_initiator: crash_group.crash_initiator,
reason: crash_group.reason
)

CallbackHandler.exec_and_handle_callback(
Expand Down
7 changes: 5 additions & 2 deletions lib/membrane/core/parent/crash_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.CrashGroup do
# * name - name that identifies the group
# * type - responsible for restart policy of members of groups
# * members - list of members of group
# * reason - reason of the crash

use Bunch.Access

Expand All @@ -15,9 +16,11 @@ defmodule Membrane.Core.Parent.CrashGroup do
mode: :temporary,
members: [Membrane.Child.name()],
detonating?: boolean(),
crash_initiator: Membrane.Child.name()
crash_initiator: Membrane.Child.name(),
reason: any()
}

@enforce_keys [:name, :mode]
defstruct @enforce_keys ++ [members: [], detonating?: false, crash_initiator: nil]
defstruct @enforce_keys ++
[members: [], detonating?: false, crash_initiator: nil, reason: nil]
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
end
6 changes: 5 additions & 1 deletion lib/membrane/core/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ defmodule Membrane.Core.Pipeline.CallbackContext do

@type optional_fields ::
[from: GenServer.from()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [
members: [Membrane.Child.name()],
crash_initiator: Membrane.Child.name(),
reason: any()
]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Pipeline.State.t(), optional_fields()) ::
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Membrane.Pipeline.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Pipeline.handle_element_end_of_stream/4`.

Fields `:members` and `:crash_initiator` are present only in
Fields `:members`, `:crash_initiator` and `:reason` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
"""
@type t :: %{
Expand All @@ -23,6 +23,7 @@ defmodule Membrane.Pipeline.CallbackContext do
optional(:from) => [GenServer.from()],
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:reason) => any(),
optional(:start_of_stream_received?) => boolean()
}
end
16 changes: 16 additions & 0 deletions test/membrane/integration/child_crash_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ defmodule Membrane.Integration.ChildCrashTest do
assert_pipeline_crash_group_down(pipeline_pid, 1)
end

test "Pipeline receive correct crash reason" do
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
Process.flag(:trap_exit, true)

pipeline_pid = Testing.Pipeline.start_link_supervised!(module: ChildCrashTest.Pipeline)

ChildCrashTest.Pipeline.add_path(pipeline_pid, [], :source, 1, :group_1)

[source_pid] = [:source] |> Enum.map(&get_pid_and_link(&1, pipeline_pid))

ChildCrashTest.Pipeline.inform_about_details_in_case_of_crash(pipeline_pid, self())

Process.exit(source_pid, :crash)

assert_receive({:crash, reason: :crash})
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
end

test "Crash group consisting of bin crashes" do
Process.flag(:trap_exit, true)

Expand Down
23 changes: 22 additions & 1 deletion test/support/child_crash_test/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,30 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do
child(:center_filter, Filter)
|> child(:sink, Testing.Sink)

{[spec: spec], %{}}
{[spec: spec], %{send_to: nil}}
end

@impl true
def handle_info({:create_path, spec}, _ctx, state) do
{[spec: spec], state}
end

@impl true
def handle_info({:inform_about_crash, send_to}, _ctx, state) do
{[], %{state | send_to: send_to}}
end

@impl true
def handle_crash_group_down(_group_name, _ctx, %{send_to: nil} = state) do
{[], state}
end

@impl true
def handle_crash_group_down(_group_name, %{reason: reason}, %{send_to: pid} = state) do
send(pid, {:crash, reason: reason})
{[], state}
end

@spec add_single_source(pid(), any(), any(), any()) :: any()
def add_single_source(pid, source_name, group \\ nil, source \\ Testing.Source) do
spec = child(source_name, source) |> get_child(:center_filter)
Expand Down Expand Up @@ -92,4 +108,9 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do

send(pid, {:create_path, spec})
end

@spec inform_about_details_in_case_of_crash(pid(), pid()) :: any()
def inform_about_details_in_case_of_crash(pid, send_to) do
send(pid, {:inform_about_crash, send_to})
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
end
end