From 2ed8f4d319a114c014f53cdd0a6522c09d46e2a6 Mon Sep 17 00:00:00 2001 From: Jakub Pryc <94321002+Noarkhh@users.noreply.github.com> Date: Thu, 14 Mar 2024 13:28:45 +0100 Subject: [PATCH 01/10] Update demand type to allow for the default demand size syntax (#768) * Update demand type to allow for the default demand size syntax --- lib/membrane/element/action.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/membrane/element/action.ex b/lib/membrane/element/action.ex index 055821142..0d5db32a9 100644 --- a/lib/membrane/element/action.ex +++ b/lib/membrane/element/action.ex @@ -82,11 +82,12 @@ defmodule Membrane.Element.Action do guaranteed not to receive more data than demanded. Demand size can be either a non-negative integer, that overrides existing demand, - or a function that is passed current demand, and is to return the new demand. + or a function that is passed current demand, and is to return the new demand. In case only pad + is specified, the demand size defaults to 1. Allowed only when playback is playing. """ - @type demand :: {:demand, {Pad.ref(), demand_size}} + @type demand :: {:demand, {Pad.ref(), demand_size} | Pad.ref()} @type demand_size :: pos_integer | (pos_integer() -> non_neg_integer()) @typedoc """ From aef442d6cb1e95a0921f3485ee6b2b5b246ee3ca Mon Sep 17 00:00:00 2001 From: Mateusz Front Date: Mon, 18 Mar 2024 14:21:33 +0100 Subject: [PATCH 02/10] improve resource guard test (#777) --- test/membrane/resource_guard_test.exs | 32 ++++++++++++--------------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/test/membrane/resource_guard_test.exs b/test/membrane/resource_guard_test.exs index b06ff9636..b41ce3479 100644 --- a/test/membrane/resource_guard_test.exs +++ b/test/membrane/resource_guard_test.exs @@ -14,11 +14,8 @@ defmodule Membrane.ResourceGuardTest do @impl true def handle_setup(ctx, state) do - {:ok, pid} = Task.start(fn -> Process.sleep(:infinity) end) - Process.register(pid, :membrane_resource_guard_test_element_resource) - ResourceGuard.register(ctx.resource_guard, fn -> - Process.exit(pid, :shutdown) + send(:membrane_resource_guard_test_process, :element_guard_triggered) end) {[notify_parent: :ready], state} @@ -32,11 +29,8 @@ defmodule Membrane.ResourceGuardTest do @impl true def handle_setup(ctx, state) do - {:ok, pid} = Task.start(fn -> Process.sleep(:infinity) end) - Process.register(pid, :membrane_resource_guard_test_bin_resource) - ResourceGuard.register(ctx.resource_guard, fn -> - Process.exit(pid, :shutdown) + send(:membrane_resource_guard_test_process, :bin_guard_triggered) end) {[notify_parent: :ready], state} @@ -50,17 +44,16 @@ defmodule Membrane.ResourceGuardTest do @impl true def handle_call(:setup_guard, ctx, state) do - {:ok, pid} = Task.start(fn -> Process.sleep(:infinity) end) - Process.register(pid, :membrane_resource_guard_test_pipeline_resource) - ResourceGuard.register(ctx.resource_guard, fn -> - Process.exit(pid, :shutdown) + send(:membrane_resource_guard_test_process, :pipeline_guard_triggered) end) {[reply: :ready], state} end end + Process.register(self(), :membrane_resource_guard_test_process) + pipeline = Testing.Pipeline.start_link_supervised!(module: Pipeline) Testing.Pipeline.execute_actions(pipeline, @@ -68,19 +61,22 @@ defmodule Membrane.ResourceGuardTest do ) assert_pipeline_notified(pipeline, :element, :ready) - monitor_ref = Process.monitor(:membrane_resource_guard_test_element_resource) Testing.Pipeline.execute_actions(pipeline, remove_children: :element) - assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} + assert_receive :element_guard_triggered assert_pipeline_notified(pipeline, :bin, :ready) - monitor_ref = Process.monitor(:membrane_resource_guard_test_bin_resource) Testing.Pipeline.execute_actions(pipeline, remove_children: :bin) - assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} + assert_receive :bin_guard_triggered + + Testing.Pipeline.execute_actions(pipeline, + spec: [child(:element, Element), child(:bin, Bin)] + ) assert :ready = Membrane.Pipeline.call(pipeline, :setup_guard) - monitor_ref = Process.monitor(:membrane_resource_guard_test_pipeline_resource) Membrane.Pipeline.terminate(pipeline) - assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} + assert_receive :element_guard_triggered + assert_receive :bin_guard_triggered + assert_receive :pipeline_guard_triggered end test "Resources can be cleaned up manually and automatically when the owner process dies" do From 4a24e93a47bfe828ff139eb4185362d018ddceb5 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 18 Mar 2024 15:06:43 +0100 Subject: [PATCH 03/10] Membrane.Testing.Pipeline.message_child/3 -> notify_child/3 --- lib/membrane/testing/pipeline.ex | 23 ++++++++++++++++--- .../actions_handling_order_test.exs | 2 +- .../integration/auto_demands_test.exs | 18 +++++++-------- test/membrane/integration/demands_test.exs | 4 ++-- .../no_stream_format_crash_test.exs | 4 ++-- 5 files changed, 34 insertions(+), 17 deletions(-) diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 73becaa8c..f37138491 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -52,7 +52,7 @@ defmodule Membrane.Testing.Pipeline do ## Messaging children You can send messages to children using their names specified in the children - list. Please check `message_child/3` for more details. + list. Please check `notify_child/3` for more details. ## Example usage @@ -202,6 +202,23 @@ defmodule Membrane.Testing.Pipeline do defdelegate terminate(pipeline, opts \\ []), to: Pipeline @doc """ + Sends notification to a child by Element name. + + ## Example + + Knowing that `pipeline` has child named `sink`, notification can be sent as follows: + + notify_child(pipeline, :sink, {:notification, "to handle"}) + """ + @spec notify_child(pid(), Element.name(), any()) :: :ok + def notify_child(pipeline, child, notification) do + send(pipeline, {:for_element, child, notification}) + :ok + end + + @doc """ + Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead. + Sends message to a child by Element name. ## Example @@ -210,10 +227,10 @@ defmodule Membrane.Testing.Pipeline do message_child(pipeline, :sink, {:message, "to handle"}) """ + @deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead" @spec message_child(pid(), Element.name(), any()) :: :ok def message_child(pipeline, child, message) do - send(pipeline, {:for_element, child, message}) - :ok + notify_child(pipeline, child, message) end @doc """ diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs index f54ac0d49..8d9476970 100644 --- a/test/membrane/integration/actions_handling_order_test.exs +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -176,7 +176,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do # time for pipeline to play Process.sleep(100) - Testing.Pipeline.message_child(pipeline, :sink, :start_timer) + Testing.Pipeline.notify_child(pipeline, :sink, :start_timer) assert_pipeline_notified(pipeline, :sink, :second_tick) diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 702f8b1f1..2a25604a2 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -129,7 +129,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_sink_playing(pipeline, :right_sink) - Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000}) + Pipeline.notify_child(pipeline, :right_sink, {:make_demand, 1000}) Enum.each(1..1000, fn payload -> assert_sink_buffer(pipeline, :right_sink, buffer) @@ -246,7 +246,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :source, :playing) buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1}) - Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers}) Enum.each(1..100_010, fn i -> assert_sink_buffer(pipeline, :sink, buffer) @@ -254,7 +254,7 @@ defmodule Membrane.Integration.AutoDemandsTest do if i <= 100_000 do buffer = %Membrane.Buffer{payload: i + 10} - Pipeline.message_child(pipeline, :source, buffer: {:output, buffer}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffer}) end end) @@ -276,7 +276,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :source, :playing) buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1}) - Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers}) assert_receive( {:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}} @@ -347,7 +347,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert length(buffers) == manual_flow_queue_size demand = 10_000 - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) buffers = receive_processed_buffers(pipeline, 2 * demand) buffers_number = length(buffers) @@ -379,7 +379,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :filter, :playing) - Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) + Pipeline.notify_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) # time for :filter to pause demand on Pad.ref(:input, 0) Process.sleep(500) @@ -388,7 +388,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert length(buffers) == manual_flow_queue_size demand = 10_000 - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) # fliter paused auto demand on Pad.ref(:input, 0), so it should receive # at most auto_flow_demand_size buffers from there and rest of the buffers @@ -410,12 +410,12 @@ defmodule Membrane.Integration.AutoDemandsTest do # rest of them came from {:source, 1} assert demand - auto_flow_demand_size <= counter_1 - Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) + Pipeline.notify_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) # time for :filter to resume demand on Pad.ref(:input, 0) Process.sleep(500) - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) buffers = receive_processed_buffers(pipeline, 2 * demand) buffers_number = length(buffers) diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index dacd0d60a..dec5d896b 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -24,14 +24,14 @@ defmodule Membrane.Integration.DemandsTest do assert_sink_playing(pid, :sink) demand = 500 - Pipeline.message_child(pid, :sink, {:make_demand, demand}) + Pipeline.notify_child(pid, :sink, {:make_demand, demand}) 0..(demand - 1) |> assert_buffers_received(pid) pattern = %Buffer{payload: <> <> <<255>>} refute_sink_buffer(pid, :sink, ^pattern, 0) - Pipeline.message_child(pid, :sink, {:make_demand, demand}) + Pipeline.notify_child(pid, :sink, {:make_demand, demand}) demand..(2 * demand - 1) |> assert_buffers_received(pid) diff --git a/test/membrane/integration/no_stream_format_crash_test.exs b/test/membrane/integration/no_stream_format_crash_test.exs index c6abad1dd..759e4262a 100644 --- a/test/membrane/integration/no_stream_format_crash_test.exs +++ b/test/membrane/integration/no_stream_format_crash_test.exs @@ -44,7 +44,7 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do ] pipeline = Pipeline.start_supervised!(options) - Pipeline.message_child(pipeline, :source, {:send_your_pid, self()}) + Pipeline.notify_child(pipeline, :source, {:send_your_pid, self()}) source_pid = receive do @@ -53,7 +53,7 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do source_ref = Process.monitor(source_pid) - Pipeline.message_child(pipeline, :source, :send_buffer) + Pipeline.notify_child(pipeline, :source, :send_buffer) assert_receive {:DOWN, ^source_ref, :process, ^source_pid, {reason, _stack_trace}} assert %Membrane.ElementError{message: action_error_msg} = reason assert action_error_msg =~ ~r/buffer.*stream.*format.*not.*sent/ From 150069cd536ac73c8c468f4d5cd98ff2f07078d3 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 11:56:19 +0100 Subject: [PATCH 04/10] Write test for delayed demands loop, send at most 1 resume delayed demands loop message at the time --- lib/membrane/core/element.ex | 4 +- lib/membrane/core/element/demand_handler.ex | 15 +++- lib/membrane/core/element/state.ex | 6 +- .../integration/delayed_demands_loop_test.exs | 81 +++++++++++++++++++ 4 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 test/membrane/integration/delayed_demands_loop_test.exs diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 8f673455f..7cec63529 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -226,8 +226,8 @@ defmodule Membrane.Core.Element do {:noreply, state} end - defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do - state = DemandHandler.handle_delayed_demands(state) + defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do + state = DemandHandler.handle_resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 1d22d7eb9..029bc2206 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -113,6 +113,12 @@ defmodule Membrane.Core.Element.DemandHandler do PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) end + @spec handle_resume_delayed_demands_loop(State.t()) :: State.t() + def handle_resume_delayed_demands_loop(%State{} = state) do + %{state | resume_delayed_demands_loop_in_mailbox?: false} + |> handle_delayed_demands() + end + @spec handle_delayed_demands(State.t()) :: State.t() def handle_delayed_demands(%State{} = state) do # Taking random element of `:delayed_demands` is done to keep data flow @@ -125,10 +131,15 @@ defmodule Membrane.Core.Element.DemandHandler do raise "Cannot handle delayed demands while already supplying demand" state.handle_demand_loop_counter >= @handle_demand_loop_limit -> - Message.self(:resume_handle_demand_loop) + state = + with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do + Message.self(:resume_delayed_demands_loop) + %{state | resume_delayed_demands_loop_in_mailbox?: true} + end + %{state | handle_demand_loop_counter: 0} - Enum.empty?(state.delayed_demands) -> + MapSet.size(state.delayed_demands) == 0 -> %{state | handle_demand_loop_counter: 0} true -> diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 9f990e68f..b805d572c 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -47,7 +47,8 @@ defmodule Membrane.Core.Element.State do pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), satisfied_auto_output_pads: MapSet.t(), - awaiting_auto_input_pads: MapSet.t() + awaiting_auto_input_pads: MapSet.t(), + resume_delayed_demands_loop_in_mailbox?: boolean() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -87,6 +88,7 @@ defmodule Membrane.Core.Element.State do :pads_data, :satisfied_auto_output_pads, :awaiting_auto_input_pads, - :auto_input_pads + :auto_input_pads, + resume_delayed_demands_loop_in_mailbox?: false ] end diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs new file mode 100644 index 000000000..1bc0ed82d --- /dev/null +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -0,0 +1,81 @@ +defmodule Membrane.Test.DelayedDemandsLoopTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.Debug + alias Membrane.Testing + + defmodule Source do + use Membrane.Source + + defmodule StreamFormat do + defstruct [] + end + + @sleep_time 5 + @spec sleep_time() :: integer() + def sleep_time(), do: @sleep_time + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + flow_control: :manual + + @impl true + def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do + Process.sleep(@sleep_time) + + stream_format_actions = + Enum.flat_map(pads, fn + {pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}] + _pad_entry -> [] + end) + + buffer = %Buffer{payload: "a"} + + buffer_and_redemand_actions = + Map.keys(pads) + |> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1]) + + {stream_format_actions ++ buffer_and_redemand_actions, state} + end + + @impl true + def handle_parent_notification(:request, _ctx, state) do + {[notify_parent: :response], state} + end + end + + describe "atomic demand busy wait loop doesn't occur when source has" do + test "1 pad", do: do_test(1) + test "2 pads", do: do_test(2) + test "10 pads", do: do_test(10) + end + + defp do_test(sinks_number) do + auto_demand_size = 20 + + spec = + Stream.repeatedly(fn -> + get_child(:source) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child(Debug.Sink) + end) + |> Stream.take(sinks_number) + |> Enum.concat([child(:source, Source)]) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(1_000) + + for _i <- 1..(auto_demand_size + 5) do + Testing.Pipeline.notify_child(pipeline, :source, :request) + assert_pipeline_notified(pipeline, :source, :response) + end + + Testing.Pipeline.terminate(pipeline) + end +end From f4be83da277f68931ee4c8ad55893423d5030918 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 12:06:37 +0100 Subject: [PATCH 05/10] Set default fields values in Element state --- lib/membrane/core/element.ex | 18 +-------- lib/membrane/core/element/state.ex | 59 ++++++++++++++---------------- 2 files changed, 29 insertions(+), 48 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 7cec63529..b4f293e76 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -137,11 +137,7 @@ defmodule Membrane.Core.Element do module: options.module, type: options.module.membrane_element_type(), name: options.name, - internal_state: nil, parent_pid: options.parent, - supplying_demand?: false, - delayed_demands: MapSet.new(), - handle_demand_loop_counter: 0, synchronization: %{ parent_clock: options.parent_clock, timers: %{}, @@ -149,21 +145,9 @@ defmodule Membrane.Core.Element do stream_sync: options.sync, latency: 0 }, - initialized?: false, - playback: :stopped, - playback_queue: [], resource_guard: resource_guard, subprocess_supervisor: options.subprocess_supervisor, - terminating?: false, - setup_incomplete?: false, - effective_flow_control: :push, - handling_action?: false, - popping_auto_flow_queue?: false, - pads_to_snapshot: MapSet.new(), - stalker: options.stalker, - satisfied_auto_output_pads: MapSet.new(), - awaiting_auto_input_pads: MapSet.new(), - auto_input_pads: [] + stalker: options.stalker } |> PadSpecHandler.init_pads() diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index b805d572c..102b98a40 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -60,35 +60,32 @@ defmodule Membrane.Core.Element.State do # as the last item in the list, because sometimes it is so big, that everything after it # might be truncated during the inspection. - defstruct [ - :module, - :name, - :parent_pid, - :playback, - :type, - :internal_state, - :pad_refs, - :pads_info, - :synchronization, - :delayed_demands, - :effective_flow_control, - :initialized?, - :terminating?, - :setup_incomplete?, - :supplying_demand?, - :handling_action?, - :popping_auto_flow_queue?, - :stalker, - :resource_guard, - :subprocess_supervisor, - :handle_demand_loop_counter, - :demand_size, - :pads_to_snapshot, - :playback_queue, - :pads_data, - :satisfied_auto_output_pads, - :awaiting_auto_input_pads, - :auto_input_pads, - resume_delayed_demands_loop_in_mailbox?: false - ] + defstruct module: nil, + name: nil, + parent_pid: nil, + playback: :stopped, + type: nil, + internal_state: nil, + pad_refs: [], + pads_info: %{}, + synchronization: nil, + delayed_demands: MapSet.new(), + effective_flow_control: :push, + initialized?: false, + terminating?: false, + setup_incomplete?: false, + supplying_demand?: false, + handling_action?: false, + popping_auto_flow_queue?: false, + stalker: nil, + resource_guard: nil, + subprocess_supervisor: nil, + handle_demand_loop_counter: 0, + pads_to_snapshot: MapSet.new(), + playback_queue: [], + pads_data: %{}, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [], + resume_delayed_demands_loop_in_mailbox?: false end From 430933d62eb69b9b37ed738fdbc596e4495fae8d Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 12:14:15 +0100 Subject: [PATCH 06/10] Add line to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 283e1bbee..752f1721c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.1.0-rc0 * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) + * Deprecate `Membrane.Testing.Pipeline.message_child/3`. Introduce `Membrane.Testing.Pipeline.notify_child/3` instead. [#779](https://github.com/membraneframework/membrane_core/pull/779) ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) From ee7cf48f56386298f8460f8dc0f96e0637dbe6a2 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 12:16:50 +0100 Subject: [PATCH 07/10] Rename test --- test/membrane/integration/delayed_demands_loop_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs index 1bc0ed82d..e2dd3f91c 100644 --- a/test/membrane/integration/delayed_demands_loop_test.exs +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -49,7 +49,7 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do end end - describe "atomic demand busy wait loop doesn't occur when source has" do + describe "delayed demands loop pauses from time to time, when source has" do test "1 pad", do: do_test(1) test "2 pads", do: do_test(2) test "10 pads", do: do_test(10) From b40b1e4dc065966970ca9eeffaae5d3c67c849d7 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 14:16:18 +0100 Subject: [PATCH 08/10] Rename function --- lib/membrane/core/element.ex | 2 +- lib/membrane/core/element/demand_handler.ex | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index b4f293e76..d76497770 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -211,7 +211,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do - state = DemandHandler.handle_resume_delayed_demands_loop(state) + state = DemandHandler.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 029bc2206..d7d9ec8e6 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -113,8 +113,8 @@ defmodule Membrane.Core.Element.DemandHandler do PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) end - @spec handle_resume_delayed_demands_loop(State.t()) :: State.t() - def handle_resume_delayed_demands_loop(%State{} = state) do + @spec resume_delayed_demands_loop(State.t()) :: State.t() + def resume_delayed_demands_loop(%State{} = state) do %{state | resume_delayed_demands_loop_in_mailbox?: false} |> handle_delayed_demands() end From d1068aa67c75f9f8892b6b30e5b9637c05cb06e5 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 14:18:14 +0100 Subject: [PATCH 09/10] Refactor test --- test/membrane/integration/delayed_demands_loop_test.exs | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs index e2dd3f91c..aae82e772 100644 --- a/test/membrane/integration/delayed_demands_loop_test.exs +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -16,8 +16,6 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do end @sleep_time 5 - @spec sleep_time() :: integer() - def sleep_time(), do: @sleep_time def_output_pad :output, accepted_format: _any, From bd5f8f32a6c285176151313490bc30e1db7fc34e Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 19 Mar 2024 17:04:41 +0100 Subject: [PATCH 10/10] Refactor due to CR --- .../integration/delayed_demands_loop_test.exs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs index aae82e772..9295b6bbf 100644 --- a/test/membrane/integration/delayed_demands_loop_test.exs +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -54,20 +54,21 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do end defp do_test(sinks_number) do - auto_demand_size = 20 + auto_demand_size = 15 spec = - Stream.repeatedly(fn -> - get_child(:source) - |> via_in(:input, auto_demand_size: auto_demand_size) - |> child(Debug.Sink) - end) - |> Stream.take(sinks_number) - |> Enum.concat([child(:source, Source)]) + [child(:source, Source)] ++ + for i <- 1..sinks_number do + get_child(:source) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child({:sink, i}, Debug.Sink) + end pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) - Process.sleep(1_000) + for i <- 1..sinks_number do + assert_start_of_stream(pipeline, {:sink, ^i}) + end for _i <- 1..(auto_demand_size + 5) do Testing.Pipeline.notify_child(pipeline, :source, :request)