diff --git a/CHANGELOG.md b/CHANGELOG.md index 283e1bbee..752f1721c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.1.0-rc0 * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) + * Deprecate `Membrane.Testing.Pipeline.message_child/3`. Introduce `Membrane.Testing.Pipeline.notify_child/3` instead. [#779](https://github.com/membraneframework/membrane_core/pull/779) ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 218ea7300..3e0c35dcd 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -137,11 +137,7 @@ defmodule Membrane.Core.Element do module: options.module, type: options.module.membrane_element_type(), name: options.name, - internal_state: nil, parent_pid: options.parent, - delay_consuming_queues?: false, - delayed_demands: MapSet.new(), - handle_demand_loop_counter: 0, synchronization: %{ parent_clock: options.parent_clock, timers: %{}, @@ -149,20 +145,9 @@ defmodule Membrane.Core.Element do stream_sync: options.sync, latency: 0 }, - initialized?: false, - playback: :stopped, - playback_queue: [], resource_guard: resource_guard, subprocess_supervisor: options.subprocess_supervisor, - terminating?: false, - setup_incomplete?: false, - effective_flow_control: :push, - popping_auto_flow_queue?: false, - pads_to_snapshot: MapSet.new(), - stalker: options.stalker, - satisfied_auto_output_pads: MapSet.new(), - awaiting_auto_input_pads: MapSet.new(), - auto_input_pads: [] + stalker: options.stalker } |> PadSpecHandler.init_pads() @@ -225,9 +210,8 @@ defmodule Membrane.Core.Element do {:noreply, state} end - defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do - # pytanie: consume queues czy handle delayed demands? - state = DemandHandler.handle_delayed_demands(state) + defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do + state = DemandHandler.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 467bbec1a..281a9b6ff 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -115,6 +115,12 @@ defmodule Membrane.Core.Element.DemandHandler do PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) end + @spec resume_delayed_demands_loop(State.t()) :: State.t() + def resume_delayed_demands_loop(%State{} = state) do + %{state | resume_delayed_demands_loop_in_mailbox?: false} + |> handle_delayed_demands() + end + @spec handle_delayed_demands(State.t()) :: State.t() def handle_delayed_demands(%State{} = state) do # Taking random element of `:delayed_demands` is done to keep data flow @@ -128,10 +134,15 @@ defmodule Membrane.Core.Element.DemandHandler do raise "Cannot handle delayed demands while already supplying demand" state.handle_demand_loop_counter >= @handle_demand_loop_limit -> - Message.self(:resume_handle_demand_loop) + state = + with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do + Message.self(:resume_delayed_demands_loop) + %{state | resume_delayed_demands_loop_in_mailbox?: true} + end + %{state | handle_demand_loop_counter: 0} - Enum.empty?(state.delayed_demands) -> + MapSet.size(state.delayed_demands) == 0 -> %{state | handle_demand_loop_counter: 0} true -> diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index f65e1b068..8d5839229 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -46,7 +46,8 @@ defmodule Membrane.Core.Element.State do pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), satisfied_auto_output_pads: MapSet.t(), - awaiting_auto_input_pads: MapSet.t() + awaiting_auto_input_pads: MapSet.t(), + resume_delayed_demands_loop_in_mailbox?: boolean() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -58,33 +59,31 @@ defmodule Membrane.Core.Element.State do # as the last item in the list, because sometimes it is so big, that everything after it # might be truncated during the inspection. - defstruct [ - :module, - :name, - :parent_pid, - :playback, - :type, - :internal_state, - :pad_refs, - :pads_info, - :synchronization, - :delayed_demands, - :effective_flow_control, - :initialized?, - :terminating?, - :setup_incomplete?, - :delay_consuming_queues?, - :popping_auto_flow_queue?, - :stalker, - :resource_guard, - :subprocess_supervisor, - :handle_demand_loop_counter, - :demand_size, - :pads_to_snapshot, - :playback_queue, - :pads_data, - :satisfied_auto_output_pads, - :awaiting_auto_input_pads, - :auto_input_pads - ] + defstruct module: nil, + name: nil, + parent_pid: nil, + playback: :stopped, + type: nil, + internal_state: nil, + pad_refs: [], + pads_info: %{}, + synchronization: nil, + resume_delayed_demands_loop_in_mailbox?: false, + delayed_demands: MapSet.new(), + effective_flow_control: :push, + initialized?: false, + terminating?: false, + setup_incomplete?: false, + delay_consuming_queues?: false, + popping_auto_flow_queue?: false, + stalker: nil, + resource_guard: nil, + subprocess_supervisor: nil, + handle_demand_loop_counter: 0, + pads_to_snapshot: MapSet.new(), + playback_queue: [], + pads_data: %{}, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] end diff --git a/lib/membrane/element/action.ex b/lib/membrane/element/action.ex index 055821142..0d5db32a9 100644 --- a/lib/membrane/element/action.ex +++ b/lib/membrane/element/action.ex @@ -82,11 +82,12 @@ defmodule Membrane.Element.Action do guaranteed not to receive more data than demanded. Demand size can be either a non-negative integer, that overrides existing demand, - or a function that is passed current demand, and is to return the new demand. + or a function that is passed current demand, and is to return the new demand. In case only pad + is specified, the demand size defaults to 1. Allowed only when playback is playing. """ - @type demand :: {:demand, {Pad.ref(), demand_size}} + @type demand :: {:demand, {Pad.ref(), demand_size} | Pad.ref()} @type demand_size :: pos_integer | (pos_integer() -> non_neg_integer()) @typedoc """ diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 73becaa8c..f37138491 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -52,7 +52,7 @@ defmodule Membrane.Testing.Pipeline do ## Messaging children You can send messages to children using their names specified in the children - list. Please check `message_child/3` for more details. + list. Please check `notify_child/3` for more details. ## Example usage @@ -202,6 +202,23 @@ defmodule Membrane.Testing.Pipeline do defdelegate terminate(pipeline, opts \\ []), to: Pipeline @doc """ + Sends notification to a child by Element name. + + ## Example + + Knowing that `pipeline` has child named `sink`, notification can be sent as follows: + + notify_child(pipeline, :sink, {:notification, "to handle"}) + """ + @spec notify_child(pid(), Element.name(), any()) :: :ok + def notify_child(pipeline, child, notification) do + send(pipeline, {:for_element, child, notification}) + :ok + end + + @doc """ + Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead. + Sends message to a child by Element name. ## Example @@ -210,10 +227,10 @@ defmodule Membrane.Testing.Pipeline do message_child(pipeline, :sink, {:message, "to handle"}) """ + @deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead" @spec message_child(pid(), Element.name(), any()) :: :ok def message_child(pipeline, child, message) do - send(pipeline, {:for_element, child, message}) - :ok + notify_child(pipeline, child, message) end @doc """ diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 66f86267f..99565bf92 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -25,7 +25,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do playback: :stopped, synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), - handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ input: @@ -110,7 +109,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), playback: :stopped, - handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ @@ -512,7 +510,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do name: :elem_name, synchronization: %{clock: nil, parent_clock: nil}, type: :source, - handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 23518bc01..bde5d9176 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -50,7 +50,6 @@ defmodule Membrane.Core.Element.EventControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, - handling_action?: false, delay_consuming_queues?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index b5670d72c..c576213f0 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -49,7 +49,6 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, - handling_action?: false, delay_consuming_queues?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index 413582179..d6f36f8c5 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -18,7 +18,6 @@ defmodule Membrane.Core.Element.PadControllerTest do struct!(State, name: name, module: elem_module, - handling_action?: false, delay_consuming_queues?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index d4fe2fec1..b3e396fb6 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -41,7 +41,6 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, - handling_action?: false, delay_consuming_queues?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs index f54ac0d49..8d9476970 100644 --- a/test/membrane/integration/actions_handling_order_test.exs +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -176,7 +176,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do # time for pipeline to play Process.sleep(100) - Testing.Pipeline.message_child(pipeline, :sink, :start_timer) + Testing.Pipeline.notify_child(pipeline, :sink, :start_timer) assert_pipeline_notified(pipeline, :sink, :second_tick) diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 702f8b1f1..2a25604a2 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -129,7 +129,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_sink_playing(pipeline, :right_sink) - Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000}) + Pipeline.notify_child(pipeline, :right_sink, {:make_demand, 1000}) Enum.each(1..1000, fn payload -> assert_sink_buffer(pipeline, :right_sink, buffer) @@ -246,7 +246,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :source, :playing) buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1}) - Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers}) Enum.each(1..100_010, fn i -> assert_sink_buffer(pipeline, :sink, buffer) @@ -254,7 +254,7 @@ defmodule Membrane.Integration.AutoDemandsTest do if i <= 100_000 do buffer = %Membrane.Buffer{payload: i + 10} - Pipeline.message_child(pipeline, :source, buffer: {:output, buffer}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffer}) end end) @@ -276,7 +276,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :source, :playing) buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1}) - Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) + Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers}) assert_receive( {:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}} @@ -347,7 +347,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert length(buffers) == manual_flow_queue_size demand = 10_000 - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) buffers = receive_processed_buffers(pipeline, 2 * demand) buffers_number = length(buffers) @@ -379,7 +379,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_pipeline_notified(pipeline, :filter, :playing) - Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) + Pipeline.notify_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) # time for :filter to pause demand on Pad.ref(:input, 0) Process.sleep(500) @@ -388,7 +388,7 @@ defmodule Membrane.Integration.AutoDemandsTest do assert length(buffers) == manual_flow_queue_size demand = 10_000 - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) # fliter paused auto demand on Pad.ref(:input, 0), so it should receive # at most auto_flow_demand_size buffers from there and rest of the buffers @@ -410,12 +410,12 @@ defmodule Membrane.Integration.AutoDemandsTest do # rest of them came from {:source, 1} assert demand - auto_flow_demand_size <= counter_1 - Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) + Pipeline.notify_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) # time for :filter to resume demand on Pad.ref(:input, 0) Process.sleep(500) - Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + Pipeline.notify_child(pipeline, :sink, {:make_demand, demand}) buffers = receive_processed_buffers(pipeline, 2 * demand) buffers_number = length(buffers) diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs new file mode 100644 index 000000000..9295b6bbf --- /dev/null +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -0,0 +1,80 @@ +defmodule Membrane.Test.DelayedDemandsLoopTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.Debug + alias Membrane.Testing + + defmodule Source do + use Membrane.Source + + defmodule StreamFormat do + defstruct [] + end + + @sleep_time 5 + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + flow_control: :manual + + @impl true + def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do + Process.sleep(@sleep_time) + + stream_format_actions = + Enum.flat_map(pads, fn + {pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}] + _pad_entry -> [] + end) + + buffer = %Buffer{payload: "a"} + + buffer_and_redemand_actions = + Map.keys(pads) + |> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1]) + + {stream_format_actions ++ buffer_and_redemand_actions, state} + end + + @impl true + def handle_parent_notification(:request, _ctx, state) do + {[notify_parent: :response], state} + end + end + + describe "delayed demands loop pauses from time to time, when source has" do + test "1 pad", do: do_test(1) + test "2 pads", do: do_test(2) + test "10 pads", do: do_test(10) + end + + defp do_test(sinks_number) do + auto_demand_size = 15 + + spec = + [child(:source, Source)] ++ + for i <- 1..sinks_number do + get_child(:source) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child({:sink, i}, Debug.Sink) + end + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + for i <- 1..sinks_number do + assert_start_of_stream(pipeline, {:sink, ^i}) + end + + for _i <- 1..(auto_demand_size + 5) do + Testing.Pipeline.notify_child(pipeline, :source, :request) + assert_pipeline_notified(pipeline, :source, :response) + end + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index dacd0d60a..dec5d896b 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -24,14 +24,14 @@ defmodule Membrane.Integration.DemandsTest do assert_sink_playing(pid, :sink) demand = 500 - Pipeline.message_child(pid, :sink, {:make_demand, demand}) + Pipeline.notify_child(pid, :sink, {:make_demand, demand}) 0..(demand - 1) |> assert_buffers_received(pid) pattern = %Buffer{payload: <> <> <<255>>} refute_sink_buffer(pid, :sink, ^pattern, 0) - Pipeline.message_child(pid, :sink, {:make_demand, demand}) + Pipeline.notify_child(pid, :sink, {:make_demand, demand}) demand..(2 * demand - 1) |> assert_buffers_received(pid) diff --git a/test/membrane/integration/no_stream_format_crash_test.exs b/test/membrane/integration/no_stream_format_crash_test.exs index c6abad1dd..759e4262a 100644 --- a/test/membrane/integration/no_stream_format_crash_test.exs +++ b/test/membrane/integration/no_stream_format_crash_test.exs @@ -44,7 +44,7 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do ] pipeline = Pipeline.start_supervised!(options) - Pipeline.message_child(pipeline, :source, {:send_your_pid, self()}) + Pipeline.notify_child(pipeline, :source, {:send_your_pid, self()}) source_pid = receive do @@ -53,7 +53,7 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do source_ref = Process.monitor(source_pid) - Pipeline.message_child(pipeline, :source, :send_buffer) + Pipeline.notify_child(pipeline, :source, :send_buffer) assert_receive {:DOWN, ^source_ref, :process, ^source_pid, {reason, _stack_trace}} assert %Membrane.ElementError{message: action_error_msg} = reason assert action_error_msg =~ ~r/buffer.*stream.*format.*not.*sent/ diff --git a/test/membrane/resource_guard_test.exs b/test/membrane/resource_guard_test.exs index b06ff9636..b41ce3479 100644 --- a/test/membrane/resource_guard_test.exs +++ b/test/membrane/resource_guard_test.exs @@ -14,11 +14,8 @@ defmodule Membrane.ResourceGuardTest do @impl true def handle_setup(ctx, state) do - {:ok, pid} = Task.start(fn -> Process.sleep(:infinity) end) - Process.register(pid, :membrane_resource_guard_test_element_resource) - ResourceGuard.register(ctx.resource_guard, fn -> - Process.exit(pid, :shutdown) + send(:membrane_resource_guard_test_process, :element_guard_triggered) end) {[notify_parent: :ready], state} @@ -32,11 +29,8 @@ defmodule Membrane.ResourceGuardTest do @impl true def handle_setup(ctx, state) do - {:ok, pid} = Task.start(fn -> Process.sleep(:infinity) end) - Process.register(pid, :membrane_resource_guard_test_bin_resource) - ResourceGuard.register(ctx.resource_guard, fn -> - Process.exit(pid, :shutdown) + send(:membrane_resource_guard_test_process, :bin_guard_triggered) end) {[notify_parent: :ready], state} @@ -50,17 +44,16 @@ defmodule Membrane.ResourceGuardTest do @impl true def handle_call(:setup_guard, ctx, state) do - {:ok, pid} = Task.start(fn -> Process.sleep(:infinity) end) - Process.register(pid, :membrane_resource_guard_test_pipeline_resource) - ResourceGuard.register(ctx.resource_guard, fn -> - Process.exit(pid, :shutdown) + send(:membrane_resource_guard_test_process, :pipeline_guard_triggered) end) {[reply: :ready], state} end end + Process.register(self(), :membrane_resource_guard_test_process) + pipeline = Testing.Pipeline.start_link_supervised!(module: Pipeline) Testing.Pipeline.execute_actions(pipeline, @@ -68,19 +61,22 @@ defmodule Membrane.ResourceGuardTest do ) assert_pipeline_notified(pipeline, :element, :ready) - monitor_ref = Process.monitor(:membrane_resource_guard_test_element_resource) Testing.Pipeline.execute_actions(pipeline, remove_children: :element) - assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} + assert_receive :element_guard_triggered assert_pipeline_notified(pipeline, :bin, :ready) - monitor_ref = Process.monitor(:membrane_resource_guard_test_bin_resource) Testing.Pipeline.execute_actions(pipeline, remove_children: :bin) - assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} + assert_receive :bin_guard_triggered + + Testing.Pipeline.execute_actions(pipeline, + spec: [child(:element, Element), child(:bin, Bin)] + ) assert :ready = Membrane.Pipeline.call(pipeline, :setup_guard) - monitor_ref = Process.monitor(:membrane_resource_guard_test_pipeline_resource) Membrane.Pipeline.terminate(pipeline) - assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} + assert_receive :element_guard_triggered + assert_receive :bin_guard_triggered + assert_receive :pipeline_guard_triggered end test "Resources can be cleaned up manually and automatically when the owner process dies" do