From 7be81c928e0e08cd5b8a2ebc15ea043d5202f05f Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 5 Jan 2024 15:58:03 +0100 Subject: [PATCH] wip --- lib/membrane/core/element.ex | 1 + .../core/element/demand_controller.ex | 21 ++-- .../demand_controller/auto_flow_utils.ex | 100 ++++++++++++++---- .../core/element/effective_flow_controller.ex | 14 +-- lib/membrane/core/element/event_controller.ex | 14 ++- lib/membrane/core/element/pad_controller.ex | 34 +++--- lib/membrane/core/element/state.ex | 2 + .../integration/auto_demands_test.exs | 19 +--- 8 files changed, 134 insertions(+), 71 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 59097c79b..97ce42934 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -158,6 +158,7 @@ defmodule Membrane.Core.Element do setup_incomplete?: false, effective_flow_control: :push, handling_action?: false, + popping_queue?: false, pads_to_snapshot: MapSet.new(), stalker: options.stalker, satisfied_auto_output_pads: MapSet.new(), diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 8a81dfdca..79ce3643e 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -43,12 +43,7 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - %{ - atomic_demand: atomic_demand, - associated_pads: associated_pads - } = pad_data - - if AtomicDemand.get(atomic_demand) > 0 do + if AtomicDemand.get(pad_data.atomic_demand) > 0 do # tutaj powinno mieć miejsce # - usuniecie pada z mapsetu # - sflushowanie kolejek, jesli mapset jest pusty @@ -59,13 +54,15 @@ defmodule Membrane.Core.Element.DemandController do # dobra, wyglada git - state = AutoFlowUtils.pop_auto_flow_queues_while_needed(state) + AutoFlowUtils.pop_queues_and_bump_demand(state) - if MapSet.size(state.satisfied_auto_output_pads) == 0 do - AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state) - else - state - end + # state = AutoFlowUtils.pop_auto_flow_queues_while_needed(state) + + # if MapSet.size(state.satisfied_auto_output_pads) == 0 do + # AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state) + # else + # state + # end else state end diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index fb361cb17..b270f02e5 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -69,22 +69,32 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do @spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t() def store_buffers_in_queue(pad_ref, buffers, state) do state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref)) - store_in_queue(pad_ref, :buffers, buffers, state) + PadModel.update_data!(state, pad_ref, :auto_flow_queue, fn queue -> + Enum.reduce(buffers, queue, fn buffer, queue -> Qex.push(queue, {:buffer, buffer})) + end) + + # store_in_queue(pad_ref, :buffers, buffers, state) end @spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t() def store_event_in_queue(pad_ref, event, state) do - store_in_queue(pad_ref, :event, event, state) + # store_in_queue(pad_ref, :event, event, state) + queue_item = {:event, event} + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) end @spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t() def store_stream_format_in_queue(pad_ref, stream_format, state) do - store_in_queue(pad_ref, :stream_format, stream_format, state) + # store_in_queue(pad_ref, :stream_format, stream_format, state) + queue_item = {:stream_format, stream_format} + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) end - defp store_in_queue(pad_ref, type, item, state) do - PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item})) - end + # defp store_in_queue(pad_ref, type, item, state) do + # PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item})) + # end + + # defp update_queue @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() def auto_adjust_atomic_demand(ref_or_ref_list, state) @@ -94,7 +104,8 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do |> Enum.reduce(state, fn pad_ref, state -> PadModel.get_data!(state, pad_ref) |> do_auto_adjust_atomic_demand(state) - |> elem(1) # todo: usun to :increased / :unchanged, ktore discardujesz w tym elem(1) + # todo: usun to :increased / :unchanged, ktore discardujesz w tym elem(1) + |> elem(1) end) end @@ -131,6 +142,31 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do output_auto_demand_positive?(state) end + @spec pop_queues_and_bump_demand(State.t()) :: State.t() + def pop_queues_and_bump_demand(%State{popping_queue?: false} = state) do + %{state | popping_queue?: true} + |> bump_demand() + |> pop_auto_flow_queues_while_needed() + |> bump_demand() + |> Map.put(:popping_queue?, false) + end + + def pop_queues_and_bump_demand(%State{popping_queue?: true} = state), do: state + + defp bump_demand(state) do + if state.effective_flow_control == :pull and + MapSet.size(state.satisfied_auto_output_pads) == 0 do + state.pads_data + |> Enum.flat_map(fn + {ref, %{direction: :input, flow_control: :auto}} -> [ref] + _other -> [] + end) + |> auto_adjust_atomic_demand(state) + else + state + end + end + @spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() def pop_auto_flow_queues_while_needed(state) do if (state.effective_flow_control == :push or @@ -143,10 +179,37 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do end end + # @spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() + # def pop_auto_flow_queues_while_needed(state) do + # if state.name == :tee do + # {state.effective_flow_control, state.satisfied_auto_output_pads} |> IO.inspect(label: "TAKIE TAM W POP WHILE") + # end + + # if (state.effective_flow_control == :push or + # MapSet.size(state.satisfied_auto_output_pads) == 0) and + # MapSet.size(state.awaiting_auto_input_pads) > 0 do + + # if state.name == :tee do + # IO.puts("A") + # end + + # pop_random_auto_flow_queue(state) + # |> pop_auto_flow_queues_while_needed() + # else + # if state.name == :tee do + # IO.puts("B") + # end + + # state + # end + # end + defp pop_random_auto_flow_queue(state) do pad_ref = Enum.random(state.awaiting_auto_input_pads) - PadModel.get_data!(state, pad_ref, :auto_flow_queue) + state + # pop_stream_formats_and_events(pad_ref, state) + |> PadModel.get_data!(pad_ref, :auto_flow_queue) |> Qex.pop() |> case do {{:value, {:buffers, buffers}}, popped_queue} -> @@ -163,9 +226,14 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do PadModel.get_data!(state, pad_ref, :auto_flow_queue) |> Qex.pop() |> case do - {{:value, {type, item}}, popped_queue} when type in [:event, :stream_format] -> + {{:value, {:event, event}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = EventController.exec_handle_event(pad_ref, event, state) + pop_stream_formats_and_events(pad_ref, state) + + {{:value, {:stream_format, stream_format}}, popped_queue} -> state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) - state = exec_queue_item_callback(pad_ref, {type, item}, state) + state = StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) pop_stream_formats_and_events(pad_ref, state) {{:value, {:buffers, _buffers}}, _popped_queue} -> @@ -178,16 +246,4 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}), do: MapSet.size(pads) == 0 - - defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do - BufferController.exec_buffer_callback(pad_ref, buffers, state) - end - - defp exec_queue_item_callback(pad_ref, {:event, event}, state) do - EventController.exec_handle_event(pad_ref, event, state) - end - - defp exec_queue_item_callback(pad_ref, {:stream_format, stream_format}, state) do - StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) - end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 68ec75e63..928c77ef7 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -131,11 +131,13 @@ defmodule Membrane.Core.Element.EffectiveFlowController do end end) - state.pads_data - |> Enum.flat_map(fn - {pad_ref, %{direction: :input, flow_control: :auto}} -> [pad_ref] - _other -> [] - end) - |> AutoFlowUtils.auto_adjust_atomic_demand(state) + AutoFlowUtils.pop_queues_and_bump_demand(state) + + # state.pads_data + # |> Enum.flat_map(fn + # {pad_ref, %{direction: :input, flow_control: :auto}} -> [pad_ref] + # _other -> [] + # end) + # |> AutoFlowUtils.auto_adjust_atomic_demand(state) end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 513c71854..f6cdf8000 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -53,7 +53,19 @@ defmodule Membrane.Core.Element.EventController do # event goes to the auto flow control queue pad_ref in state.awaiting_auto_input_pads -> - AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + with %Membrane.Core.Events.EndOfStream{} <- event do + PadModel.get_data!(state, pad_ref, :auto_flow_queue) + |> IO.inspect(label: "AFQ 1 #{inspect(state.name)}", limit: :infinity) + end + + state = AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + + with %Membrane.Core.Events.EndOfStream{} <- event do + PadModel.get_data!(state, pad_ref, :auto_flow_queue) + |> IO.inspect(label: "AFQ 2 #{inspect(state.name)}", limit: :infinity) + end + + state true -> exec_handle_event(pad_ref, event, state) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 281440425..4122bd7eb 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -233,12 +233,19 @@ defmodule Membrane.Core.Element.PadController do Map.update!(state, :pad_refs, &List.delete(&1, pad_ref)) |> PadModel.pop_data!(pad_ref) + IO.inspect(pad_ref, label: "PAD REF") + IO.inspect(state, label: "PRZED", limit: :infinity) + with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <- pad_data do EffectiveFlowController.resolve_effective_flow_control(state) else _pad_data -> state end + # |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + # |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + |> IO.inspect(label: "PO", limit: :infinity) + |> AutoFlowUtils.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -484,18 +491,21 @@ defmodule Membrane.Core.Element.PadController do def remove_pad_associations(pad_ref, state) do case PadModel.get_data!(state, pad_ref) do %{flow_control: :auto} = pad_data -> - state = - Enum.reduce(pad_data.associated_pads, state, fn pad, state -> - PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref)) - end) - |> PadModel.set_data!(pad_ref, :associated_pads, []) - |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) - |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) - |> AutoFlowUtils.pop_auto_flow_queues_while_needed() - - if pad_data.direction == :output, - do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state), - else: state + # state = + Enum.reduce(pad_data.associated_pads, state, fn pad, state -> + PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref)) + end) + |> PadModel.set_data!(pad_ref, :associated_pads, []) + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + + # |> AutoFlowUtils.pop_queues_and_bump_demand() + + # |> AutoFlowUtils.pop_auto_flow_queues_while_needed() + + # if pad_data.direction == :output, + # do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state), + # else: state _pad_data -> state diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index f924cfb42..8b972c70e 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -42,6 +42,7 @@ defmodule Membrane.Core.Element.State do setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), handling_action?: boolean(), + popping_queue?: boolean(), pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), satisfied_auto_output_pads: MapSet.t(), @@ -74,6 +75,7 @@ defmodule Membrane.Core.Element.State do :setup_incomplete?, :supplying_demand?, :handling_action?, + :popping_queue?, :stalker, :resource_guard, :subprocess_supervisor, diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index f0826f165..f44e68974 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -156,24 +156,7 @@ defmodule Membrane.Integration.AutoDemandsTest do IO.puts("START OF THE ASSERTIONS") Enum.each(1..100_000, fn payload -> - IO.puts("ASSERTION NO. #{inspect(payload)}") - - if payload == 801 do - Process.sleep(500) - - for name <- [:source, :tee, :left_sink] do - Pipeline.get_child_pid!(pipeline, name) - |> :sys.get_state() - |> IO.inspect(label: "NAME OF #{inspect(name)}", limit: :infinity) - end - - Pipeline.get_child_pid!(pipeline, :left_sink) - |> :sys.get_state() - |> get_in([:pads_data, :input, :input_queue]) - |> Map.get(:atomic_demand) - |> Membrane.Core.Element.AtomicDemand.get() - |> IO.inspect(label: "ATOMIC DEMAND VALUE") - end + # IO.puts("ASSERTION NO. #{inspect(payload)}") assert_sink_buffer(pipeline, :left_sink, buffer) assert buffer.payload == payload