Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure handle_demand busy wait infinite loop doesn't occur & deprecate message_child #779

Merged
merged 11 commits into from
Mar 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Deprecate `Membrane.Testing.Pipeline.message_child/3`. Introduce `Membrane.Testing.Pipeline.notify_child/3` instead. [#779](https://github.com/membraneframework/membrane_core/pull/779)

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
Expand Down
22 changes: 3 additions & 19 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,33 +137,17 @@ defmodule Membrane.Core.Element do
module: options.module,
type: options.module.membrane_element_type(),
name: options.name,
internal_state: nil,
parent_pid: options.parent,
supplying_demand?: false,
delayed_demands: MapSet.new(),
handle_demand_loop_counter: 0,
synchronization: %{
parent_clock: options.parent_clock,
timers: %{},
clock: nil,
stream_sync: options.sync,
latency: 0
},
initialized?: false,
playback: :stopped,
playback_queue: [],
resource_guard: resource_guard,
subprocess_supervisor: options.subprocess_supervisor,
terminating?: false,
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_auto_flow_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: []
stalker: options.stalker
}
|> PadSpecHandler.init_pads()

Expand Down Expand Up @@ -226,8 +210,8 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do
state = DemandHandler.handle_delayed_demands(state)
defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
{:noreply, state}
end

Expand Down
15 changes: 13 additions & 2 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ defmodule Membrane.Core.Element.DemandHandler do
PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size)
end

@spec resume_delayed_demands_loop(State.t()) :: State.t()
def resume_delayed_demands_loop(%State{} = state) do
%{state | resume_delayed_demands_loop_in_mailbox?: false}
|> handle_delayed_demands()
end

@spec handle_delayed_demands(State.t()) :: State.t()
def handle_delayed_demands(%State{} = state) do
# Taking random element of `:delayed_demands` is done to keep data flow
Expand All @@ -125,10 +131,15 @@ defmodule Membrane.Core.Element.DemandHandler do
raise "Cannot handle delayed demands while already supplying demand"

state.handle_demand_loop_counter >= @handle_demand_loop_limit ->
Message.self(:resume_handle_demand_loop)
state =
with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do
Message.self(:resume_delayed_demands_loop)
%{state | resume_delayed_demands_loop_in_mailbox?: true}
end

%{state | handle_demand_loop_counter: 0}

Enum.empty?(state.delayed_demands) ->
MapSet.size(state.delayed_demands) == 0 ->
%{state | handle_demand_loop_counter: 0}

true ->
Expand Down
61 changes: 30 additions & 31 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ defmodule Membrane.Core.Element.State do
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t()
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand All @@ -59,34 +60,32 @@ defmodule Membrane.Core.Element.State do
# as the last item in the list, because sometimes it is so big, that everything after it
# might be truncated during the inspection.

defstruct [
:module,
:name,
:parent_pid,
:playback,
:type,
:internal_state,
:pad_refs,
:pads_info,
:synchronization,
:delayed_demands,
:effective_flow_control,
:initialized?,
:terminating?,
:setup_incomplete?,
:supplying_demand?,
:handling_action?,
:popping_auto_flow_queue?,
:stalker,
:resource_guard,
:subprocess_supervisor,
:handle_demand_loop_counter,
:demand_size,
:pads_to_snapshot,
:playback_queue,
:pads_data,
:satisfied_auto_output_pads,
:awaiting_auto_input_pads,
:auto_input_pads
]
defstruct module: nil,
name: nil,
parent_pid: nil,
playback: :stopped,
type: nil,
internal_state: nil,
pad_refs: [],
pads_info: %{},
synchronization: nil,
delayed_demands: MapSet.new(),
effective_flow_control: :push,
initialized?: false,
terminating?: false,
setup_incomplete?: false,
supplying_demand?: false,
handling_action?: false,
popping_auto_flow_queue?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
handle_demand_loop_counter: 0,
pads_to_snapshot: MapSet.new(),
playback_queue: [],
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: [],
resume_delayed_demands_loop_in_mailbox?: false
end
23 changes: 20 additions & 3 deletions lib/membrane/testing/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.Testing.Pipeline do
## Messaging children

You can send messages to children using their names specified in the children
list. Please check `message_child/3` for more details.
list. Please check `notify_child/3` for more details.

## Example usage

Expand Down Expand Up @@ -202,6 +202,23 @@ defmodule Membrane.Testing.Pipeline do
defdelegate terminate(pipeline, opts \\ []), to: Pipeline

@doc """
Sends notification to a child by Element name.

## Example

Knowing that `pipeline` has child named `sink`, notification can be sent as follows:

notify_child(pipeline, :sink, {:notification, "to handle"})
"""
@spec notify_child(pid(), Element.name(), any()) :: :ok
def notify_child(pipeline, child, notification) do
send(pipeline, {:for_element, child, notification})
:ok
end

@doc """
Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead.

Sends message to a child by Element name.

## Example
Expand All @@ -210,10 +227,10 @@ defmodule Membrane.Testing.Pipeline do

message_child(pipeline, :sink, {:message, "to handle"})
"""
@deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead"
@spec message_child(pid(), Element.name(), any()) :: :ok
def message_child(pipeline, child, message) do
send(pipeline, {:for_element, child, message})
:ok
notify_child(pipeline, child, message)
end

@doc """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do
# time for pipeline to play
Process.sleep(100)

Testing.Pipeline.message_child(pipeline, :sink, :start_timer)
Testing.Pipeline.notify_child(pipeline, :sink, :start_timer)

assert_pipeline_notified(pipeline, :sink, :second_tick)

Expand Down
18 changes: 9 additions & 9 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_sink_playing(pipeline, :right_sink)

Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000})
Pipeline.notify_child(pipeline, :right_sink, {:make_demand, 1000})

Enum.each(1..1000, fn payload ->
assert_sink_buffer(pipeline, :right_sink, buffer)
Expand Down Expand Up @@ -246,15 +246,15 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers})

Enum.each(1..100_010, fn i ->
assert_sink_buffer(pipeline, :sink, buffer)
assert buffer.payload == i

if i <= 100_000 do
buffer = %Membrane.Buffer{payload: i + 10}
Pipeline.message_child(pipeline, :source, buffer: {:output, buffer})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffer})
end
end)

Expand All @@ -276,7 +276,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers})

assert_receive(
{:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}}
Expand Down Expand Up @@ -347,7 +347,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert length(buffers) == manual_flow_queue_size

demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)
Expand Down Expand Up @@ -379,7 +379,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_pipeline_notified(pipeline, :filter, :playing)

Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0))
Pipeline.notify_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0))

# time for :filter to pause demand on Pad.ref(:input, 0)
Process.sleep(500)
Expand All @@ -388,7 +388,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert length(buffers) == manual_flow_queue_size

demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

# fliter paused auto demand on Pad.ref(:input, 0), so it should receive
# at most auto_flow_demand_size buffers from there and rest of the buffers
Expand All @@ -410,12 +410,12 @@ defmodule Membrane.Integration.AutoDemandsTest do
# rest of them came from {:source, 1}
assert demand - auto_flow_demand_size <= counter_1

Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0))
Pipeline.notify_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0))

# time for :filter to resume demand on Pad.ref(:input, 0)
Process.sleep(500)

Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)
Expand Down
83 changes: 83 additions & 0 deletions test/membrane/integration/delayed_demands_loop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
defmodule Membrane.Test.DelayedDemandsLoopTest do
use ExUnit.Case, async: true

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Debug
alias Membrane.Testing

defmodule Source do
use Membrane.Source

defmodule StreamFormat do
defstruct []
end

@sleep_time 5

def_output_pad :output,
accepted_format: _any,
availability: :on_request,
flow_control: :manual

@impl true
def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do
Process.sleep(@sleep_time)

stream_format_actions =
Enum.flat_map(pads, fn
{pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}]
_pad_entry -> []
end)

buffer = %Buffer{payload: "a"}

buffer_and_redemand_actions =
Map.keys(pads)
|> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1])

{stream_format_actions ++ buffer_and_redemand_actions, state}
end

@impl true
def handle_parent_notification(:request, _ctx, state) do
{[notify_parent: :response], state}
end
end

describe "delayed demands loop pauses from time to time, when source has" do
test "1 pad", do: do_test(1)
test "2 pads", do: do_test(2)
test "10 pads", do: do_test(10)
end

defp do_test(sinks_number) do
# auto_demand_size is smaller than delayed_demands_loop_counter_limit, to ensure that
# after a snapshot, the counter is not reset
auto_demand_size = 15
requests_number = 20

spec =
[child(:source, Source)] ++
for i <- 1..sinks_number do
get_child(:source)
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child({:sink, i}, Debug.Sink)
end

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

for i <- 1..sinks_number do
assert_start_of_stream(pipeline, {:sink, ^i})
end

for _i <- 1..requests_number do
Testing.Pipeline.notify_child(pipeline, :source, :request)
assert_pipeline_notified(pipeline, :source, :response)
end

Testing.Pipeline.terminate(pipeline)
end
end
4 changes: 2 additions & 2 deletions test/membrane/integration/demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ defmodule Membrane.Integration.DemandsTest do
assert_sink_playing(pid, :sink)

demand = 500
Pipeline.message_child(pid, :sink, {:make_demand, demand})
Pipeline.notify_child(pid, :sink, {:make_demand, demand})

0..(demand - 1)
|> assert_buffers_received(pid)

pattern = %Buffer{payload: <<demand::16>> <> <<255>>}
refute_sink_buffer(pid, :sink, ^pattern, 0)
Pipeline.message_child(pid, :sink, {:make_demand, demand})
Pipeline.notify_child(pid, :sink, {:make_demand, demand})

demand..(2 * demand - 1)
|> assert_buffers_received(pid)
Expand Down
Loading
Loading