diff --git a/CHANGELOG.md b/CHANGELOG.md index ca1f9203c..ccfe885ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ ## 1.1.0-rc0 * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) - * Bump dependency to `:ratio` to at least `v4.0.1`. [#780](https://github.com/membraneframework/membrane_core/pull/780) + * Set `:ratio` dependency version to `"~> 3.0 or ~> 4.0"`. [#780](https://github.com/membraneframework/membrane_core/pull/780) + * Deprecate `Membrane.Testing.Pipeline.message_child/3`. Introduce `Membrane.Testing.Pipeline.notify_child/3` instead. [#779](https://github.com/membraneframework/membrane_core/pull/779) ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 8f673455f..d76497770 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -137,11 +137,7 @@ defmodule Membrane.Core.Element do module: options.module, type: options.module.membrane_element_type(), name: options.name, - internal_state: nil, parent_pid: options.parent, - supplying_demand?: false, - delayed_demands: MapSet.new(), - handle_demand_loop_counter: 0, synchronization: %{ parent_clock: options.parent_clock, timers: %{}, @@ -149,21 +145,9 @@ defmodule Membrane.Core.Element do stream_sync: options.sync, latency: 0 }, - initialized?: false, - playback: :stopped, - playback_queue: [], resource_guard: resource_guard, subprocess_supervisor: options.subprocess_supervisor, - terminating?: false, - setup_incomplete?: false, - effective_flow_control: :push, - handling_action?: false, - popping_auto_flow_queue?: false, - pads_to_snapshot: MapSet.new(), - stalker: options.stalker, - satisfied_auto_output_pads: MapSet.new(), - awaiting_auto_input_pads: MapSet.new(), - auto_input_pads: [] + stalker: options.stalker } |> PadSpecHandler.init_pads() @@ -226,8 +210,8 @@ defmodule Membrane.Core.Element do {:noreply, state} end - defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do - state = DemandHandler.handle_delayed_demands(state) + defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do + state = DemandHandler.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 1d22d7eb9..d7d9ec8e6 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -113,6 +113,12 @@ defmodule Membrane.Core.Element.DemandHandler do PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) end + @spec resume_delayed_demands_loop(State.t()) :: State.t() + def resume_delayed_demands_loop(%State{} = state) do + %{state | resume_delayed_demands_loop_in_mailbox?: false} + |> handle_delayed_demands() + end + @spec handle_delayed_demands(State.t()) :: State.t() def handle_delayed_demands(%State{} = state) do # Taking random element of `:delayed_demands` is done to keep data flow @@ -125,10 +131,15 @@ defmodule Membrane.Core.Element.DemandHandler do raise "Cannot handle delayed demands while already supplying demand" state.handle_demand_loop_counter >= @handle_demand_loop_limit -> - Message.self(:resume_handle_demand_loop) + state = + with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do + Message.self(:resume_delayed_demands_loop) + %{state | resume_delayed_demands_loop_in_mailbox?: true} + end + %{state | handle_demand_loop_counter: 0} - Enum.empty?(state.delayed_demands) -> + MapSet.size(state.delayed_demands) == 0 -> %{state | handle_demand_loop_counter: 0} true -> diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 9f990e68f..102b98a40 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -47,7 +47,8 @@ defmodule Membrane.Core.Element.State do pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), satisfied_auto_output_pads: MapSet.t(), - awaiting_auto_input_pads: MapSet.t() + awaiting_auto_input_pads: MapSet.t(), + resume_delayed_demands_loop_in_mailbox?: boolean() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -59,34 +60,32 @@ defmodule Membrane.Core.Element.State do # as the last item in the list, because sometimes it is so big, that everything after it # might be truncated during the inspection. - defstruct [ - :module, - :name, - :parent_pid, - :playback, - :type, - :internal_state, - :pad_refs, - :pads_info, - :synchronization, - :delayed_demands, - :effective_flow_control, - :initialized?, - :terminating?, - :setup_incomplete?, - :supplying_demand?, - :handling_action?, - :popping_auto_flow_queue?, - :stalker, - :resource_guard, - :subprocess_supervisor, - :handle_demand_loop_counter, - :demand_size, - :pads_to_snapshot, - :playback_queue, - :pads_data, - :satisfied_auto_output_pads, - :awaiting_auto_input_pads, - :auto_input_pads - ] + defstruct module: nil, + name: nil, + parent_pid: nil, + playback: :stopped, + type: nil, + internal_state: nil, + pad_refs: [], + pads_info: %{}, + synchronization: nil, + delayed_demands: MapSet.new(), + effective_flow_control: :push, + initialized?: false, + terminating?: false, + setup_incomplete?: false, + supplying_demand?: false, + handling_action?: false, + popping_auto_flow_queue?: false, + stalker: nil, + resource_guard: nil, + subprocess_supervisor: nil, + handle_demand_loop_counter: 0, + pads_to_snapshot: MapSet.new(), + playback_queue: [], + pads_data: %{}, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [], + resume_delayed_demands_loop_in_mailbox?: false end diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 73becaa8c..f37138491 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -52,7 +52,7 @@ defmodule Membrane.Testing.Pipeline do ## Messaging children You can send messages to children using their names specified in the children - list. Please check `message_child/3` for more details. + list. Please check `notify_child/3` for more details. ## Example usage @@ -202,6 +202,23 @@ defmodule Membrane.Testing.Pipeline do defdelegate terminate(pipeline, opts \\ []), to: Pipeline @doc """ + Sends notification to a child by Element name. + + ## Example + + Knowing that `pipeline` has child named `sink`, notification can be sent as follows: + + notify_child(pipeline, :sink, {:notification, "to handle"}) + """ + @spec notify_child(pid(), Element.name(), any()) :: :ok + def notify_child(pipeline, child, notification) do + send(pipeline, {:for_element, child, notification}) + :ok + end + + @doc """ + Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead. + Sends message to a child by Element name. ## Example @@ -210,10 +227,10 @@ defmodule Membrane.Testing.Pipeline do message_child(pipeline, :sink, {:message, "to handle"}) """ + @deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead" @spec message_child(pid(), Element.name(), any()) :: :ok def message_child(pipeline, child, message) do - send(pipeline, {:for_element, child, message}) - :ok + notify_child(pipeline, child, message) end @doc """ diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs index f54ac0d49..8d9476970 100644 --- a/test/membrane/integration/actions_handling_order_test.exs +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -176,7 +176,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do # time for pipeline to play Process.sleep(100) - Testing.Pipeline.message_child(pipeline, :sink, :start_timer) + Testing.Pipeline.notify_child(pipeline, :sink, :start_timer) assert_pipeline_notified(pipeline, :sink, :second_tick) diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 702f8b1f1..2a25604a2 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -129,7 +129,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_sink_playing(pipeline, :right_sink) - Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000}) + Pipeline.notify_child(pipeline, :right_sink, {:make_demand, 1000}) Enum.each(1..1000, fn payload -> assert_sink_buffer(pipeline, :right_sink, buffer) @@ -246,7 +246,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :source, :playing) buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1}) - Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers}) Enum.each(1..100_010, fn i -> assert_sink_buffer(pipeline, :sink, buffer) @@ -254,7 +254,7 @@ defmodule Membrane.Integration.AutoDemandsTest do if i <= 100_000 do buffer = %Membrane.Buffer{payload: i + 10} - Pipeline.message_child(pipeline, :source, buffer: {:output, buffer}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffer}) end end) @@ -276,7 +276,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :source, :playing) buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1}) - Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers}) assert_receive( {:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}} @@ -347,7 +347,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert length(buffers) == manual_flow_queue_size demand = 10_000 - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) buffers = receive_processed_buffers(pipeline, 2 * demand) buffers_number = length(buffers) @@ -379,7 +379,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :filter, :playing) - Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) + Pipeline.notify_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) # time for :filter to pause demand on Pad.ref(:input, 0) Process.sleep(500) @@ -388,7 +388,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert length(buffers) == manual_flow_queue_size demand = 10_000 - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) # fliter paused auto demand on Pad.ref(:input, 0), so it should receive # at most auto_flow_demand_size buffers from there and rest of the buffers @@ -410,12 +410,12 @@ defmodule Membrane.Integration.AutoDemandsTest do # rest of them came from {:source, 1} assert demand - auto_flow_demand_size <= counter_1 - Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) + Pipeline.notify_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) # time for :filter to resume demand on Pad.ref(:input, 0) Process.sleep(500) - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) buffers = receive_processed_buffers(pipeline, 2 * demand) buffers_number = length(buffers) diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs new file mode 100644 index 000000000..7688cc744 --- /dev/null +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -0,0 +1,83 @@ +defmodule Membrane.Test.DelayedDemandsLoopTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.Debug + alias Membrane.Testing + + defmodule Source do + use Membrane.Source + + defmodule StreamFormat do + defstruct [] + end + + @sleep_time 5 + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + flow_control: :manual + + @impl true + def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do + Process.sleep(@sleep_time) + + stream_format_actions = + Enum.flat_map(pads, fn + {pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}] + _pad_entry -> [] + end) + + buffer = %Buffer{payload: "a"} + + buffer_and_redemand_actions = + Map.keys(pads) + |> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1]) + + {stream_format_actions ++ buffer_and_redemand_actions, state} + end + + @impl true + def handle_parent_notification(:request, _ctx, state) do + {[notify_parent: :response], state} + end + end + + describe "delayed demands loop pauses from time to time, when source has" do + test "1 pad", do: do_test(1) + test "2 pads", do: do_test(2) + test "10 pads", do: do_test(10) + end + + defp do_test(sinks_number) do + # auto_demand_size is smaller than delayed_demands_loop_counter_limit, to ensure that + # after a snapshot, the counter is not reset + auto_demand_size = 15 + requests_number = 20 + + spec = + [child(:source, Source)] ++ + for i <- 1..sinks_number do + get_child(:source) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child({:sink, i}, Debug.Sink) + end + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + for i <- 1..sinks_number do + assert_start_of_stream(pipeline, {:sink, ^i}) + end + + for _i <- 1..requests_number do + Testing.Pipeline.notify_child(pipeline, :source, :request) + assert_pipeline_notified(pipeline, :source, :response) + end + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index dacd0d60a..dec5d896b 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -24,14 +24,14 @@ defmodule Membrane.Integration.DemandsTest do assert_sink_playing(pid, :sink) demand = 500 - Pipeline.message_child(pid, :sink, {:make_demand, demand}) + Pipeline.notify_child(pid, :sink, {:make_demand, demand}) 0..(demand - 1) |> assert_buffers_received(pid) pattern = %Buffer{payload: <> <> <<255>>} refute_sink_buffer(pid, :sink, ^pattern, 0) - Pipeline.message_child(pid, :sink, {:make_demand, demand}) + Pipeline.notify_child(pid, :sink, {:make_demand, demand}) demand..(2 * demand - 1) |> assert_buffers_received(pid) diff --git a/test/membrane/integration/no_stream_format_crash_test.exs b/test/membrane/integration/no_stream_format_crash_test.exs index c6abad1dd..759e4262a 100644 --- a/test/membrane/integration/no_stream_format_crash_test.exs +++ b/test/membrane/integration/no_stream_format_crash_test.exs @@ -44,7 +44,7 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do ] pipeline = Pipeline.start_supervised!(options) - Pipeline.message_child(pipeline, :source, {:send_your_pid, self()}) + Pipeline.notify_child(pipeline, :source, {:send_your_pid, self()}) source_pid = receive do @@ -53,7 +53,7 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do source_ref = Process.monitor(source_pid) - Pipeline.message_child(pipeline, :source, :send_buffer) + Pipeline.notify_child(pipeline, :source, :send_buffer) assert_receive {:DOWN, ^source_ref, :process, ^source_pid, {reason, _stack_trace}} assert %Membrane.ElementError{message: action_error_msg} = reason assert action_error_msg =~ ~r/buffer.*stream.*format.*not.*sent/ diff --git a/test/membrane/resource_guard_test.exs b/test/membrane/resource_guard_test.exs index b41ce3479..de70f1aae 100644 --- a/test/membrane/resource_guard_test.exs +++ b/test/membrane/resource_guard_test.exs @@ -69,9 +69,11 @@ defmodule Membrane.ResourceGuardTest do assert_receive :bin_guard_triggered Testing.Pipeline.execute_actions(pipeline, - spec: [child(:element, Element), child(:bin, Bin)] + spec: [child(:element2, Element), child(:bin2, Bin)] ) + assert_pipeline_notified(pipeline, :element2, :ready) + assert_pipeline_notified(pipeline, :bin2, :ready) assert :ready = Membrane.Pipeline.call(pipeline, :setup_guard) Membrane.Pipeline.terminate(pipeline) assert_receive :element_guard_triggered