Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure handle_demand busy wait infinite loop doesn't occur & deprecate message_child #779

Merged
merged 11 commits into from
Mar 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Deprecate `Membrane.Testing.Pipeline.message_child/3`. Introduce `Membrane.Testing.Pipeline.notify_child/3` instead. [#779](https://github.com/membraneframework/membrane_core/pull/779)

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
Expand Down
22 changes: 3 additions & 19 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,33 +137,17 @@ defmodule Membrane.Core.Element do
module: options.module,
type: options.module.membrane_element_type(),
name: options.name,
internal_state: nil,
parent_pid: options.parent,
supplying_demand?: false,
delayed_demands: MapSet.new(),
handle_demand_loop_counter: 0,
synchronization: %{
parent_clock: options.parent_clock,
timers: %{},
clock: nil,
stream_sync: options.sync,
latency: 0
},
initialized?: false,
playback: :stopped,
playback_queue: [],
resource_guard: resource_guard,
subprocess_supervisor: options.subprocess_supervisor,
terminating?: false,
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_auto_flow_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: []
stalker: options.stalker
}
|> PadSpecHandler.init_pads()

Expand Down Expand Up @@ -226,8 +210,8 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do
state = DemandHandler.handle_delayed_demands(state)
defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
{:noreply, state}
end

Expand Down
15 changes: 13 additions & 2 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ defmodule Membrane.Core.Element.DemandHandler do
PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size)
end

@spec resume_delayed_demands_loop(State.t()) :: State.t()
def resume_delayed_demands_loop(%State{} = state) do
%{state | resume_delayed_demands_loop_in_mailbox?: false}
|> handle_delayed_demands()
end

@spec handle_delayed_demands(State.t()) :: State.t()
def handle_delayed_demands(%State{} = state) do
# Taking random element of `:delayed_demands` is done to keep data flow
Expand All @@ -125,10 +131,15 @@ defmodule Membrane.Core.Element.DemandHandler do
raise "Cannot handle delayed demands while already supplying demand"

state.handle_demand_loop_counter >= @handle_demand_loop_limit ->
Message.self(:resume_handle_demand_loop)
state =
with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do
Message.self(:resume_delayed_demands_loop)
%{state | resume_delayed_demands_loop_in_mailbox?: true}
end

%{state | handle_demand_loop_counter: 0}

Enum.empty?(state.delayed_demands) ->
MapSet.size(state.delayed_demands) == 0 ->
%{state | handle_demand_loop_counter: 0}

true ->
Expand Down
61 changes: 30 additions & 31 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ defmodule Membrane.Core.Element.State do
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t()
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand All @@ -59,34 +60,32 @@ defmodule Membrane.Core.Element.State do
# as the last item in the list, because sometimes it is so big, that everything after it
# might be truncated during the inspection.

defstruct [
:module,
:name,
:parent_pid,
:playback,
:type,
:internal_state,
:pad_refs,
:pads_info,
:synchronization,
:delayed_demands,
:effective_flow_control,
:initialized?,
:terminating?,
:setup_incomplete?,
:supplying_demand?,
:handling_action?,
:popping_auto_flow_queue?,
:stalker,
:resource_guard,
:subprocess_supervisor,
:handle_demand_loop_counter,
:demand_size,
:pads_to_snapshot,
:playback_queue,
:pads_data,
:satisfied_auto_output_pads,
:awaiting_auto_input_pads,
:auto_input_pads
]
defstruct module: nil,
name: nil,
parent_pid: nil,
playback: :stopped,
type: nil,
internal_state: nil,
pad_refs: [],
pads_info: %{},
synchronization: nil,
delayed_demands: MapSet.new(),
effective_flow_control: :push,
initialized?: false,
terminating?: false,
setup_incomplete?: false,
supplying_demand?: false,
handling_action?: false,
popping_auto_flow_queue?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
handle_demand_loop_counter: 0,
pads_to_snapshot: MapSet.new(),
playback_queue: [],
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: [],
resume_delayed_demands_loop_in_mailbox?: false
end
23 changes: 20 additions & 3 deletions lib/membrane/testing/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.Testing.Pipeline do
## Messaging children

You can send messages to children using their names specified in the children
list. Please check `message_child/3` for more details.
list. Please check `notify_child/3` for more details.

## Example usage

Expand Down Expand Up @@ -202,6 +202,23 @@ defmodule Membrane.Testing.Pipeline do
defdelegate terminate(pipeline, opts \\ []), to: Pipeline

@doc """
Sends notification to a child by Element name.

## Example

Knowing that `pipeline` has child named `sink`, notification can be sent as follows:

notify_child(pipeline, :sink, {:notification, "to handle"})
"""
@spec notify_child(pid(), Element.name(), any()) :: :ok
def notify_child(pipeline, child, notification) do
send(pipeline, {:for_element, child, notification})
:ok
end

@doc """
Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead.

Sends message to a child by Element name.

## Example
Expand All @@ -210,10 +227,10 @@ defmodule Membrane.Testing.Pipeline do

message_child(pipeline, :sink, {:message, "to handle"})
"""
@deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead"
@spec message_child(pid(), Element.name(), any()) :: :ok
def message_child(pipeline, child, message) do
send(pipeline, {:for_element, child, message})
:ok
notify_child(pipeline, child, message)
end

@doc """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do
# time for pipeline to play
Process.sleep(100)

Testing.Pipeline.message_child(pipeline, :sink, :start_timer)
Testing.Pipeline.notify_child(pipeline, :sink, :start_timer)

assert_pipeline_notified(pipeline, :sink, :second_tick)

Expand Down
18 changes: 9 additions & 9 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_sink_playing(pipeline, :right_sink)

Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000})
Pipeline.notify_child(pipeline, :right_sink, {:make_demand, 1000})

Enum.each(1..1000, fn payload ->
assert_sink_buffer(pipeline, :right_sink, buffer)
Expand Down Expand Up @@ -246,15 +246,15 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers})

Enum.each(1..100_010, fn i ->
assert_sink_buffer(pipeline, :sink, buffer)
assert buffer.payload == i

if i <= 100_000 do
buffer = %Membrane.Buffer{payload: i + 10}
Pipeline.message_child(pipeline, :source, buffer: {:output, buffer})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffer})
end
end)

Expand All @@ -276,7 +276,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers})

assert_receive(
{:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}}
Expand Down Expand Up @@ -347,7 +347,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert length(buffers) == manual_flow_queue_size

demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)
Expand Down Expand Up @@ -379,7 +379,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_pipeline_notified(pipeline, :filter, :playing)

Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0))
Pipeline.notify_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0))

# time for :filter to pause demand on Pad.ref(:input, 0)
Process.sleep(500)
Expand All @@ -388,7 +388,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert length(buffers) == manual_flow_queue_size

demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

# fliter paused auto demand on Pad.ref(:input, 0), so it should receive
# at most auto_flow_demand_size buffers from there and rest of the buffers
Expand All @@ -410,12 +410,12 @@ defmodule Membrane.Integration.AutoDemandsTest do
# rest of them came from {:source, 1}
assert demand - auto_flow_demand_size <= counter_1

Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0))
Pipeline.notify_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0))

# time for :filter to resume demand on Pad.ref(:input, 0)
Process.sleep(500)

Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)
Expand Down
79 changes: 79 additions & 0 deletions test/membrane/integration/delayed_demands_loop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule Membrane.Test.DelayedDemandsLoopTest do
use ExUnit.Case, async: true

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Debug
alias Membrane.Testing

defmodule Source do
use Membrane.Source

defmodule StreamFormat do
defstruct []
end

@sleep_time 5

def_output_pad :output,
accepted_format: _any,
availability: :on_request,
flow_control: :manual

@impl true
def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do
Process.sleep(@sleep_time)

stream_format_actions =
Enum.flat_map(pads, fn
{pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}]
_pad_entry -> []
end)

buffer = %Buffer{payload: "a"}

buffer_and_redemand_actions =
Map.keys(pads)
|> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1])

{stream_format_actions ++ buffer_and_redemand_actions, state}
end

@impl true
def handle_parent_notification(:request, _ctx, state) do
{[notify_parent: :response], state}
end
end

describe "delayed demands loop pauses from time to time, when source has" do
test "1 pad", do: do_test(1)
test "2 pads", do: do_test(2)
test "10 pads", do: do_test(10)
end

defp do_test(sinks_number) do
auto_demand_size = 20

spec =
Stream.repeatedly(fn ->
get_child(:source)
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child(Debug.Sink)
end)
|> Stream.take(sinks_number)
|> Enum.concat([child(:source, Source)])

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

Process.sleep(1_000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can assert_start_of_stream on each sink instead


for _i <- 1..(auto_demand_size + 5) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this number?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure, that this line

DemandHandler.handle_redemand(pad_data.ref, state)

will be executed

Copy link
Member Author

@FelonEkonom FelonEkonom Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I think, that it would be good to have auto_demand_size < delayed_demand_loop_counter_limit, to ensure, that counter won't reset after reaching linked code. I will change this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so let’s change the naming or add a comment indicating it’s to exceed the loop counter limit

Testing.Pipeline.notify_child(pipeline, :source, :request)
assert_pipeline_notified(pipeline, :source, :response)
end

Testing.Pipeline.terminate(pipeline)
end
end
4 changes: 2 additions & 2 deletions test/membrane/integration/demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ defmodule Membrane.Integration.DemandsTest do
assert_sink_playing(pid, :sink)

demand = 500
Pipeline.message_child(pid, :sink, {:make_demand, demand})
Pipeline.notify_child(pid, :sink, {:make_demand, demand})

0..(demand - 1)
|> assert_buffers_received(pid)

pattern = %Buffer{payload: <<demand::16>> <> <<255>>}
refute_sink_buffer(pid, :sink, ^pattern, 0)
Pipeline.message_child(pid, :sink, {:make_demand, demand})
Pipeline.notify_child(pid, :sink, {:make_demand, demand})

demand..(2 * demand - 1)
|> assert_buffers_received(pid)
Expand Down
Loading
Loading