Skip to content

Commit

Permalink
Write test for delayed demands loop, send at most 1 resume delayed de…
Browse files Browse the repository at this point in the history
…mands loop message at the time
  • Loading branch information
FelonEkonom committed Mar 19, 2024
1 parent 4a24e93 commit 150069c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 6 deletions.
4 changes: 2 additions & 2 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do
state = DemandHandler.handle_delayed_demands(state)
defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.handle_resume_delayed_demands_loop(state)
{:noreply, state}
end

Expand Down
15 changes: 13 additions & 2 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ defmodule Membrane.Core.Element.DemandHandler do
PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size)
end

@spec handle_resume_delayed_demands_loop(State.t()) :: State.t()
def handle_resume_delayed_demands_loop(%State{} = state) do
%{state | resume_delayed_demands_loop_in_mailbox?: false}
|> handle_delayed_demands()
end

@spec handle_delayed_demands(State.t()) :: State.t()
def handle_delayed_demands(%State{} = state) do
# Taking random element of `:delayed_demands` is done to keep data flow
Expand All @@ -125,10 +131,15 @@ defmodule Membrane.Core.Element.DemandHandler do
raise "Cannot handle delayed demands while already supplying demand"

state.handle_demand_loop_counter >= @handle_demand_loop_limit ->
Message.self(:resume_handle_demand_loop)
state =
with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do
Message.self(:resume_delayed_demands_loop)
%{state | resume_delayed_demands_loop_in_mailbox?: true}
end

%{state | handle_demand_loop_counter: 0}

Enum.empty?(state.delayed_demands) ->
MapSet.size(state.delayed_demands) == 0 ->
%{state | handle_demand_loop_counter: 0}

true ->
Expand Down
6 changes: 4 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ defmodule Membrane.Core.Element.State do
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t()
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -87,6 +88,7 @@ defmodule Membrane.Core.Element.State do
:pads_data,
:satisfied_auto_output_pads,
:awaiting_auto_input_pads,
:auto_input_pads
:auto_input_pads,
resume_delayed_demands_loop_in_mailbox?: false
]
end
81 changes: 81 additions & 0 deletions test/membrane/integration/delayed_demands_loop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Membrane.Test.DelayedDemandsLoopTest do
use ExUnit.Case, async: true

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Debug
alias Membrane.Testing

defmodule Source do
use Membrane.Source

defmodule StreamFormat do
defstruct []
end

@sleep_time 5
@spec sleep_time() :: integer()
def sleep_time(), do: @sleep_time

def_output_pad :output,
accepted_format: _any,
availability: :on_request,
flow_control: :manual

@impl true
def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do
Process.sleep(@sleep_time)

stream_format_actions =
Enum.flat_map(pads, fn
{pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}]
_pad_entry -> []
end)

buffer = %Buffer{payload: "a"}

buffer_and_redemand_actions =
Map.keys(pads)
|> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1])

{stream_format_actions ++ buffer_and_redemand_actions, state}
end

@impl true
def handle_parent_notification(:request, _ctx, state) do
{[notify_parent: :response], state}
end
end

describe "atomic demand busy wait loop doesn't occur when source has" do
test "1 pad", do: do_test(1)
test "2 pads", do: do_test(2)
test "10 pads", do: do_test(10)
end

defp do_test(sinks_number) do
auto_demand_size = 20

spec =
Stream.repeatedly(fn ->
get_child(:source)
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child(Debug.Sink)
end)
|> Stream.take(sinks_number)
|> Enum.concat([child(:source, Source)])

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

Process.sleep(1_000)

for _i <- 1..(auto_demand_size + 5) do
Testing.Pipeline.notify_child(pipeline, :source, :request)
assert_pipeline_notified(pipeline, :source, :response)
end

Testing.Pipeline.terminate(pipeline)
end
end

0 comments on commit 150069c

Please sign in to comment.