diff --git a/CHANGELOG.md b/CHANGELOG.md index a75560ee0..da6ceab62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) + * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) * Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626) * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) diff --git a/lib/membrane/core/child/pad_model.ex b/lib/membrane/core/child/pad_model.ex index c545a7421..9fd1c4ace 100644 --- a/lib/membrane/core/child/pad_model.ex +++ b/lib/membrane/core/child/pad_model.ex @@ -42,7 +42,6 @@ defmodule Membrane.Core.Child.PadModel do input_queue: Membrane.Core.Element.InputQueue.t() | nil, options: %{optional(atom) => any}, auto_demand_size: pos_integer() | nil, - associated_pads: [Pad.ref()] | nil, sticky_events: [Membrane.Event.t()], stalker_metrics: %{atom => :atomics.atomics_ref()} } diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 8bd3d91c7..b6500a593 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -158,8 +158,12 @@ defmodule Membrane.Core.Element do setup_incomplete?: false, effective_flow_control: :push, handling_action?: false, + popping_auto_flow_queue?: false, pads_to_snapshot: MapSet.new(), - stalker: options.stalker + stalker: options.stalker, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] } |> PadSpecHandler.init_pads() diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0812367b6..759237203 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -24,11 +24,11 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ DemandController, DemandHandler, - PadController, State, StreamFormatController } + alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.{Events, TimerController} alias Membrane.Element.Action @@ -176,7 +176,11 @@ defmodule Membrane.Core.Element.ActionHandler do _other -> :output end - pads = state |> PadModel.filter_data(%{direction: dir}) |> Map.keys() + pads = + Enum.flat_map(state.pads_data, fn + {pad_ref, %{direction: ^dir}} -> [pad_ref] + _pad_entry -> [] + end) Enum.reduce(pads, state, fn pad, state -> action = @@ -466,8 +470,9 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - state = PadController.remove_pad_associations(pad_ref, state) - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) + |> AutoFlowUtils.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/atomic_demand.ex b/lib/membrane/core/element/atomic_demand.ex index 1b387254e..4b7a9a020 100644 --- a/lib/membrane/core/element/atomic_demand.ex +++ b/lib/membrane/core/element/atomic_demand.ex @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do :ok end - @spec decrease(t, non_neg_integer()) :: t + @spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t} def decrease(%__MODULE__{} = atomic_demand, value) do atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value)) if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do flush_buffered_decrementation(atomic_demand) else - atomic_demand + {:unchanged, atomic_demand} end end @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do atomic_demand = %{atomic_demand | buffered_decrementation: 0} - if not atomic_demand.toilet_overflowed? and - get_receiver_status(atomic_demand) == {:resolved, :pull} and - get_sender_status(atomic_demand) == {:resolved, :push} and - -1 * atomic_demand_value > atomic_demand.toilet_capacity do - overflow(atomic_demand, atomic_demand_value) - else - atomic_demand - end + atomic_demand = + if not atomic_demand.toilet_overflowed? and + get_receiver_status(atomic_demand) == {:resolved, :pull} and + get_sender_status(atomic_demand) == {:resolved, :push} and + -1 * atomic_demand_value > atomic_demand.toilet_capacity do + overflow(atomic_demand, atomic_demand_value) + else + atomic_demand + end + + {{:decreased, atomic_demand_value}, atomic_demand} end defp overflow(atomic_demand, atomic_demand_value) do diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index ada22f144..986a204cf 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -69,8 +69,12 @@ defmodule Membrane.Core.Element.BufferController do state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) :atomics.put(stalker_metrics.demand, 1, demand - buf_size) - state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) - exec_buffer_callback(pad_ref, buffers, state) + if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do + AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + else + state = exec_buffer_callback(pad_ref, buffers, state) + AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + end end defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do @@ -93,11 +97,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Executes `handle_buffer` callback. """ - @spec exec_buffer_callback( - Pad.ref(), - [Buffer.t()] | Buffer.t(), - State.t() - ) :: State.t() + @spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t() def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do Telemetry.report_metric("buffer", 1, inspect(pad_ref)) diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 20d164dc6..daa5fb784 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -23,13 +23,20 @@ defmodule Membrane.Core.Element.DemandController do @spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t() def snapshot_atomic_demand(pad_ref, state) do - with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do if pad_data.direction == :input, do: raise("cannot snapshot atomic counter in input pad") do_snapshot_atomic_demand(pad_data, state) else + {:ok, %{end_of_stream?: true}} -> + Membrane.Logger.debug_verbose( + "Skipping snapshot of pad #{inspect(pad_ref)}, because it has flag :end_of_stream? set to true" + ) + + state + {:error, :unknown_pad} -> # We've got a :atomic_demand_increased message on already unlinked pad state @@ -43,13 +50,10 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - %{ - atomic_demand: atomic_demand, - associated_pads: associated_pads - } = pad_data - - if AtomicDemand.get(atomic_demand) > 0 do - AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state) + if AtomicDemand.get(pad_data.atomic_demand) > 0 do + state + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) + |> AutoFlowUtils.pop_queues_and_bump_demand() else state end @@ -91,9 +95,15 @@ defmodule Membrane.Core.Element.DemandController do buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers) demand = pad_data.demand - buffers_size - atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size) + {decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size) - PadModel.set_data!(state, pad_ref, %{ + with {:decreased, new_value} when new_value <= 0 <- decrease_result, + %{flow_control: :auto} <- pad_data do + Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref)) + else + _other -> state + end + |> PadModel.set_data!(pad_ref, %{ pad_data | demand: demand, atomic_demand: atomic_demand diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index a76a7441f..62d0b2077 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -1,15 +1,95 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do @moduledoc false + alias Membrane.Buffer + alias Membrane.Event + alias Membrane.StreamFormat + alias Membrane.Core.Element.{ AtomicDemand, - State + BufferController, + EventController, + State, + StreamFormatController } require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Logger require Membrane.Pad, as: Pad + @empty_map_set MapSet.new() + + # Description of the auto flow control queueing mechanism + + # General concept: Buffers coming to auto input pads should be handled only if + # all auto output pads have positive demand. Buffers arriving when any of the auto + # output pads has negative demand should be queued and only processed when the + # demand everywhere is positive + + # An Element is `corked` when its effective flow control is :pull and it has an auto output pad, + # who's demand is non-positive + + # The following events can make the element shift from `corked` state to `uncorked` state: + # - change of effective flow control from :pull to :push + # - increase in the value of auto output pad demand. We check the demand value: + # - after sending the buffer to a given output pad + # - after receiving a message :atomic_demand_increased from the next element + # - unlinking an auto output pad + # - sending EOS to an auto output pad + + # Analogically, transition from `uncorcked` to `corcked` might be caused by: + # - change of effective flow control from :push to :pull + # - sending a buffer through an output pad + # - linking an output pad + + # In addition, an invariant is maintained, which is that the head of all non-empty + # auto_flow_queue queues contains a buffer (the queue can also contain events and + # stream formats). After popping a queue + # of a given pad, if it has an event or stream format in its head, we pop it further, + # until it becomes empty or a buffer is encountered. + + # auto_flow_queues hold single buffers, event if they arrive to the element in batch, because if we + # done otherwise, we would have to handle whole batch after popping it from the queue, even if demand + # of all output pads would be satisfied after handling first buffer + + # Fields in Element state, that take important part in this mechanism: + # - satisfied_auto_output_pads - MapSet of auto output pads, whose demand is less than or equal to 0. + # We consider only pads with the end_of_stream? flag set to false + # - awaiting_auto_input_pads - MapSet of auto input pads, which have a non-empty auto_flow_queue + # - popping_auto_flow_queue? - a flag determining whether we are on the stack somewhere above popping a queue. + # It's used to avoid situations where the function that pops from the queue calls itself multiple times, + # what could potentially lead to things like altering the order of sent buffers. + + # Each auto input pad in PadData contains a queue in the :auto_flow_queue field, in which it stores queued + # buffers, events and stream formats. If queue is non-empty, corresponding pad_ref should be + # in the Mapset awaiting_auto_input_pads in element state + + # The introduced mechanism consists of two parts, the pseudocode for which is included below + + # def onBufferArrived() do + # if element uncorked do + # exec handle_buffer + # else + # store buffer in queue + # end + # end + + # def onUncorck() do + # # EFC means `effective flow control` + + # if EFC == pull do + # bump demand on auto input pads with an empty queue + # end + + # while (output demand positive or EFC == push) and some queues are not empty do + # pop random queue and handle its head + # end + + # if EFC == pull do + # bump demand on auto input pads with an empty queue + # end + # end + defguardp is_input_auto_pad_data(pad_data) when is_map(pad_data) and is_map_key(pad_data, :flow_control) and pad_data.flow_control == :auto and is_map_key(pad_data, :direction) and @@ -59,12 +139,39 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do state end + @spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t() + def store_buffers_in_queue(pad_ref, buffers, state) do + state + |> Map.update!(:awaiting_auto_input_pads, &MapSet.put(&1, pad_ref)) + |> PadModel.update_data!(pad_ref, :auto_flow_queue, fn queue -> + Enum.reduce(buffers, queue, fn buffer, queue -> + Qex.push(queue, {:buffer, buffer}) + end) + end) + end + + @spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t() + def store_event_in_queue(pad_ref, event, state) do + queue_item = {:event, event} + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) + end + + @spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t() + def store_stream_format_in_queue(pad_ref, stream_format, state) do + queue_item = {:stream_format, stream_format} + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) + end + @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do - Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2) + pad_ref_list + |> Enum.reduce(state, fn pad_ref, state -> + PadModel.get_data!(state, pad_ref) + |> do_auto_adjust_atomic_demand(state) + end) end - def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do + def auto_adjust_atomic_demand(pad_ref, state) do PadModel.get_data!(state, pad_ref) |> do_auto_adjust_atomic_demand(state) end @@ -83,6 +190,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do :ok = AtomicDemand.increase(atomic_demand, diff) :atomics.put(stalker_metrics.demand, 1, auto_demand_size) + PadModel.set_data!(state, ref, :demand, auto_demand_size) else state @@ -97,14 +205,96 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do state.effective_flow_control == :pull and not pad_data.auto_demand_paused? and pad_data.demand < pad_data.auto_demand_size / 2 and - Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state)) + state.satisfied_auto_output_pads == @empty_map_set + end + + @spec pop_queues_and_bump_demand(State.t()) :: State.t() + def pop_queues_and_bump_demand(%State{popping_auto_flow_queue?: true} = state), do: state + + def pop_queues_and_bump_demand(%State{} = state) do + %{state | popping_auto_flow_queue?: true} + |> pop_auto_flow_queues_while_needed() + |> bump_demand() + |> Map.put(:popping_auto_flow_queue?, false) + end + + defp bump_demand(state) do + if state.effective_flow_control == :pull and + state.satisfied_auto_output_pads == @empty_map_set do + do_bump_demand(state) + else + state + end + end + + defp do_bump_demand(state) do + state.auto_input_pads + |> Enum.reject(&MapSet.member?(state.awaiting_auto_input_pads, &1)) + |> Enum.reduce(state, fn pad_ref, state -> + pad_data = PadModel.get_data!(state, pad_ref) + + if not pad_data.auto_demand_paused? and + pad_data.demand < pad_data.auto_demand_size / 2 do + diff = pad_data.auto_demand_size - pad_data.demand + :ok = AtomicDemand.increase(pad_data.atomic_demand, diff) + + :atomics.put(pad_data.stalker_metrics.demand, 1, pad_data.auto_demand_size) + + PadModel.set_data!(state, pad_ref, :demand, pad_data.auto_demand_size) + else + state + end + end) + end + + @spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() + def pop_auto_flow_queues_while_needed(state) do + if (state.effective_flow_control == :push or + MapSet.size(state.satisfied_auto_output_pads) == 0) and + MapSet.size(state.awaiting_auto_input_pads) > 0 do + pop_random_auto_flow_queue(state) + |> pop_auto_flow_queues_while_needed() + else + state + end end - defp atomic_demand_positive?(pad_ref, state) do - atomic_demand_value = - PadModel.get_data!(state, pad_ref, :atomic_demand) - |> AtomicDemand.get() + defp pop_random_auto_flow_queue(state) do + pad_ref = Enum.random(state.awaiting_auto_input_pads) - atomic_demand_value > 0 + state + |> PadModel.get_data!(pad_ref, :auto_flow_queue) + |> Qex.pop() + |> case do + {{:value, {:buffer, buffer}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = BufferController.exec_buffer_callback(pad_ref, [buffer], state) + pop_stream_formats_and_events(pad_ref, state) + + {:empty, _empty_queue} -> + Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + end + end + + defp pop_stream_formats_and_events(pad_ref, state) do + PadModel.get_data!(state, pad_ref, :auto_flow_queue) + |> Qex.pop() + |> case do + {{:value, {:event, event}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = EventController.exec_handle_event(pad_ref, event, state) + pop_stream_formats_and_events(pad_ref, state) + + {{:value, {:stream_format, stream_format}}, popped_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue) + state = StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) + pop_stream_formats_and_events(pad_ref, state) + + {{:value, {:buffer, _buffer}}, _popped_queue} -> + state + + {:empty, _empty_queue} -> + Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + end end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 8db26fb79..6d0849439 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -18,6 +18,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. + alias Membrane.Core.Element.DemandController alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Element.{AtomicDemand, State} @@ -104,8 +105,8 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state.pads_data |> Enum.filter(fn {_ref, %{flow_control: flow_control}} -> flow_control == :auto end) - |> Enum.reduce(state, fn - {_ref, %{direction: :output} = pad_data}, state -> + |> Enum.each(fn + {_ref, %{direction: :output} = pad_data} -> :ok = AtomicDemand.set_sender_status( pad_data.atomic_demand, @@ -120,9 +121,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do [pad_data.other_ref, new_effective_flow_control] ) - state - - {pad_ref, %{direction: :input} = pad_data}, state -> + {pad_ref, %{direction: :input} = pad_data} -> if triggering_pad in [pad_ref, nil] or AtomicDemand.get_receiver_status(pad_data.atomic_demand) != :to_be_resolved do :ok = @@ -131,8 +130,17 @@ defmodule Membrane.Core.Element.EffectiveFlowController do {:resolved, new_effective_flow_control} ) end - - AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) end) + + with %{effective_flow_control: :pull} <- state do + Enum.reduce(state.pads_data, state, fn + {pad_ref, %{direction: :output, flow_control: :auto, end_of_stream?: false}}, state -> + DemandController.snapshot_atomic_demand(pad_ref, state) + + _pad_entry, state -> + state + end) + end + |> AutoFlowUtils.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..e4aff679e 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -13,11 +13,12 @@ defmodule Membrane.Core.Element.EventController do ActionHandler, CallbackContext, InputQueue, - PadController, PlaybackQueue, State } + alias Membrane.Core.Element.DemandController.AutoFlowUtils + require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry @@ -39,15 +40,24 @@ defmodule Membrane.Core.Element.EventController do playback: %State{playback: :playing} <- state do Telemetry.report_metric(:event, 1, inspect(pad_ref)) - if not Event.async?(event) and buffers_before_event_present?(data) do - PadModel.update_data!( - state, - pad_ref, - :input_queue, - &InputQueue.store(&1, :event, event) - ) - else - exec_handle_event(pad_ref, event, state) + async? = Event.async?(event) + + cond do + # events goes to the manual flow control input queue + not async? and buffers_before_event_present?(data) -> + PadModel.update_data!( + state, + pad_ref, + :input_queue, + &InputQueue.store(&1, :event, event) + ) + + # event goes to the auto flow control queue + not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> + AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + + true -> + exec_handle_event(pad_ref, event, state) end else pad: {:error, :unknown_pad} -> @@ -97,8 +107,10 @@ defmodule Membrane.Core.Element.EventController do else Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") - state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) - state = PadController.remove_pad_associations(pad_ref, state) + state = + PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) %{ start_of_stream?: start_of_stream?, diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 6b4d73ff9..61530f83f 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -227,7 +227,6 @@ defmodule Membrane.Core.Element.PadController do Stalker.unregister_link(state.stalker, pad_ref) state = generate_eos_if_needed(pad_ref, state) state = maybe_handle_pad_removed(pad_ref, state) - state = remove_pad_associations(pad_ref, state) {pad_data, state} = Map.update!(state, :pad_refs, &List.delete(&1, pad_ref)) @@ -239,6 +238,10 @@ defmodule Membrane.Core.Element.PadController do else _pad_data -> state end + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) + |> AutoFlowUtils.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -307,7 +310,6 @@ defmodule Membrane.Core.Element.PadController do stream_format: nil, start_of_stream?: false, end_of_stream?: false, - associated_pads: [], atomic_demand: metadata.atomic_demand, stalker_metrics: %{ total_buffers: total_buffers_metric @@ -328,31 +330,19 @@ defmodule Membrane.Core.Element.PadController do {:resolved, EffectiveFlowController.get_pad_effective_flow_control(pad_data.ref, state)} ) - state = update_associated_pads(pad_data, state) + case pad_data do + %{direction: :output, flow_control: :auto} -> + Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) - if pad_data.direction == :input and pad_data.flow_control == :auto do - AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) - else - state - end - end + %{direction: :input, flow_control: :auto} -> + AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) + |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) - defp update_associated_pads(%{flow_control: :auto} = pad_data, state) do - state.pads_data - |> Map.values() - |> Enum.filter(&(&1.direction != pad_data.direction and &1.flow_control == :auto)) - |> Enum.reduce(state, fn associated_pad_data, state -> - PadModel.update_data!( - state, - associated_pad_data.ref, - :associated_pads, - &[pad_data.ref | &1] - ) - end) + _pad_data -> + state + end end - defp update_associated_pads(_pad_data, state), do: state - defp merge_pad_direction_data(%{direction: :input} = pad_data, metadata, _state) do pad_data |> Map.merge(%{ @@ -412,14 +402,8 @@ defmodule Membrane.Core.Element.PadController do %{flow_control: :auto, direction: direction} = pad_data, pad_props, _other_info, - %State{} = state + _state ) do - associated_pads = - state.pads_data - |> Map.values() - |> Enum.filter(&(&1.direction != direction and &1.flow_control == :auto)) - |> Enum.map(& &1.ref) - auto_demand_size = cond do direction == :output -> @@ -449,7 +433,6 @@ defmodule Membrane.Core.Element.PadController do pad_data |> Map.merge(%{ demand: 0, - associated_pads: associated_pads, auto_demand_size: auto_demand_size }) |> put_in([:stalker_metrics, :demand], demand_metric) @@ -472,28 +455,6 @@ defmodule Membrane.Core.Element.PadController do end end - @doc """ - Removes all associations between the given pad and any other_endpoint pads. - """ - @spec remove_pad_associations(Pad.ref(), State.t()) :: State.t() - def remove_pad_associations(pad_ref, state) do - case PadModel.get_data!(state, pad_ref) do - %{flow_control: :auto} = pad_data -> - state = - Enum.reduce(pad_data.associated_pads, state, fn pad, state -> - PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref)) - end) - |> PadModel.set_data!(pad_ref, :associated_pads, []) - - if pad_data.direction == :output, - do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state), - else: state - - _pad_data -> - state - end - end - @spec maybe_handle_pad_added(Pad.ref(), State.t()) :: State.t() defp maybe_handle_pad_added(ref, state) do %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 141b53afa..9f990e68f 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -33,6 +33,7 @@ defmodule Membrane.Core.Element.State do stream_sync: Sync.t(), clock: Clock.t() | nil }, + auto_input_pads: [Pad.ref()], initialized?: boolean(), playback: Membrane.Playback.t(), playback_queue: Membrane.Core.Element.PlaybackQueue.t(), @@ -42,8 +43,11 @@ defmodule Membrane.Core.Element.State do setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), handling_action?: boolean(), + popping_auto_flow_queue?: boolean(), pads_to_snapshot: MapSet.t(), - stalker: Membrane.Core.Stalker.t() + stalker: Membrane.Core.Stalker.t(), + satisfied_auto_output_pads: MapSet.t(), + awaiting_auto_input_pads: MapSet.t() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -72,6 +76,7 @@ defmodule Membrane.Core.Element.State do :setup_incomplete?, :supplying_demand?, :handling_action?, + :popping_auto_flow_queue?, :stalker, :resource_guard, :subprocess_supervisor, @@ -79,6 +84,9 @@ defmodule Membrane.Core.Element.State do :demand_size, :pads_to_snapshot, :playback_queue, - :pads_data + :pads_data, + :satisfied_auto_output_pads, + :awaiting_auto_input_pads, + :auto_input_pads ] end diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index 1c03bce9e..b22637cc7 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.{CallbackHandler, Telemetry} alias Membrane.Core.Child.PadModel alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State} + alias Membrane.Core.Element.DemandController.AutoFlowUtils require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry @@ -28,15 +29,22 @@ defmodule Membrane.Core.Element.StreamFormatController do queue = data.input_queue - if queue && not InputQueue.empty?(queue) do - PadModel.set_data!( - state, - pad_ref, - :input_queue, - InputQueue.store(queue, :stream_format, stream_format) - ) - else - exec_handle_stream_format(pad_ref, stream_format, state) + cond do + # stream format goes to the manual flow control input queue + queue && not InputQueue.empty?(queue) -> + PadModel.set_data!( + state, + pad_ref, + :input_queue, + InputQueue.store(queue, :stream_format, stream_format) + ) + + # stream format goes to the auto flow control queue + pad_ref in state.awaiting_auto_input_pads -> + AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state) + + true -> + exec_handle_stream_format(pad_ref, stream_format, state) end else pad: {:error, :unknown_pad} -> diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index f427edfac..3be09d473 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -40,6 +40,7 @@ defmodule Membrane.Element.PadData do pid: private_field, other_ref: private_field, input_queue: private_field, + auto_flow_queue: private_field, incoming_demand: integer() | nil, demand_unit: private_field, other_demand_unit: private_field, @@ -59,7 +60,6 @@ defmodule Membrane.Element.PadData do # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field, - associated_pads: private_field, sticky_events: private_field, other_effective_flow_control: private_field, stalker_metrics: private_field @@ -80,6 +80,7 @@ defmodule Membrane.Element.PadData do defstruct @enforce_keys ++ [ input_queue: nil, + auto_flow_queue: Qex.new(), demand: 0, incoming_demand: nil, demand_unit: nil, @@ -89,7 +90,6 @@ defmodule Membrane.Element.PadData do sticky_messages: [], atomic_demand: nil, manual_demand_size: 0, - associated_pads: [], sticky_events: [], stream_format_validation_params: [], other_demand_unit: nil, diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 4c505230d..5a82d92cd 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -18,7 +18,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do defp demand_test_filter(_context) do state = - struct(State, + struct!(State, module: Filter, name: :test_name, type: :filter, @@ -47,7 +47,10 @@ defmodule Membrane.Core.Element.ActionHandlerTest do pads_info: %{ input: %{flow_control: :manual}, input_push: %{flow_control: :push} - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + popping_auto_flow_queue?: false ) [state: state] @@ -100,7 +103,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do }) state = - struct(State, + struct!(State, module: TrivialFilter, name: :elem_name, type: :filter, @@ -140,7 +143,10 @@ defmodule Membrane.Core.Element.ActionHandlerTest do pads_info: %{ output: %{flow_control: :push}, input: %{flow_control: :push} - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + popping_auto_flow_queue?: false ) [state: state] diff --git a/test/membrane/core/element/atomic_demand_test.exs b/test/membrane/core/element/atomic_demand_test.exs index 0cbd513fd..edf425ecf 100644 --- a/test/membrane/core/element/atomic_demand_test.exs +++ b/test/membrane/core/element/atomic_demand_test.exs @@ -10,7 +10,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do assert get_atomic_value(atomic_demand) == 10 - atomic_demand = AtomicDemand.decrease(atomic_demand, 15) + assert {{:decreased, -5}, atomic_demand} = AtomicDemand.decrease(atomic_demand, 15) assert atomic_demand.buffered_decrementation == 0 assert get_atomic_value(atomic_demand) == -5 @@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.AtomicDemandTest do atomic_demand = new_atomic_demand(:pull, sleeping_process, self()) :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) - atomic_demand = AtomicDemand.decrease(atomic_demand, 100) + {{:decreased, -100}, atomic_demand} = AtomicDemand.decrease(atomic_demand, 100) refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} @@ -88,8 +88,9 @@ defmodule Membrane.Core.Element.AtomicDemandTest do |> Enum.reduce(atomic_demand, fn {sender_status, receiver_status}, atomic_demand -> :ok = AtomicDemand.set_sender_status(atomic_demand, sender_status) :ok = AtomicDemand.set_receiver_status(atomic_demand, receiver_status) - atomic_demand = AtomicDemand.decrease(atomic_demand, 1000) + {_status_update, atomic_demand} = AtomicDemand.decrease(atomic_demand, 1000) + refute atomic_demand.toilet_overflowed? refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} atomic_demand @@ -97,8 +98,9 @@ defmodule Membrane.Core.Element.AtomicDemandTest do :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) :ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :pull}) - _atomic_demand = AtomicDemand.decrease(atomic_demand, 1000) + {{:decreased, _atomic_value}, atomic_demand} = AtomicDemand.decrease(atomic_demand, 1000) + assert atomic_demand.toilet_overflowed? assert_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} end @@ -109,19 +111,19 @@ defmodule Membrane.Core.Element.AtomicDemandTest do assert %AtomicDemand{throttling_factor: 150} = atomic_demand - atomic_demand = AtomicDemand.decrease(atomic_demand, 100) + assert {:unchanged, %AtomicDemand{buffered_decrementation: 100} = atomic_demand} = + AtomicDemand.decrease(atomic_demand, 100) - assert %AtomicDemand{buffered_decrementation: 100} = atomic_demand assert get_atomic_value(atomic_demand) == 0 - atomic_demand = AtomicDemand.decrease(atomic_demand, 49) + assert {:unchanged, %AtomicDemand{buffered_decrementation: 149} = atomic_demand} = + AtomicDemand.decrease(atomic_demand, 49) - assert %AtomicDemand{buffered_decrementation: 149} = atomic_demand assert get_atomic_value(atomic_demand) == 0 - atomic_demand = AtomicDemand.decrease(atomic_demand, 51) + assert {{:decreased, -200}, %AtomicDemand{buffered_decrementation: 0} = atomic_demand} = + AtomicDemand.decrease(atomic_demand, 51) - assert %AtomicDemand{buffered_decrementation: 0} = atomic_demand assert get_atomic_value(atomic_demand) == -200 end diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 198d13e4e..356501e6d 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -43,7 +43,7 @@ defmodule Membrane.Core.Element.EventControllerTest do }) state = - struct(State, + struct!(State, module: MockEventHandlingElement, name: :test_name, type: :filter, @@ -64,7 +64,10 @@ defmodule Membrane.Core.Element.EventControllerTest do input_queue: input_queue, demand: 0 ) - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] ) assert AtomicDemand.get(atomic_demand) > 0 diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index 116143637..efa95c490 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -317,7 +317,7 @@ defmodule Membrane.Core.Element.InputQueueTest do queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) - queue = Map.update!(queue, :atomic_demand, &AtomicDemand.decrease(&1, 16)) + queue = Map.update!(queue, :atomic_demand, &elem(AtomicDemand.decrease(&1, 16), 1)) assert queue.size == 16 assert queue.demand == -6 {out, queue} = InputQueue.take(queue, 2) @@ -354,7 +354,7 @@ defmodule Membrane.Core.Element.InputQueueTest do queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) - queue = Map.update!(queue, :atomic_demand, &AtomicDemand.decrease(&1, 4)) + queue = Map.update!(queue, :atomic_demand, &elem(AtomicDemand.decrease(&1, 4), 1)) assert queue.size == 4 assert queue.demand == -1 {out, queue} = InputQueue.take(queue, 2) diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index e2d68faa2..53d06f2cd 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -42,7 +42,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do }) state = - struct(State, + struct!(State, module: DummyElement, name: :test_name, type: :filter, @@ -63,7 +63,10 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do input_queue: input_queue, demand: 0 ) - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] ) assert_received Message.new(:atomic_demand_increased, :some_pad) diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index ad17195e1..e52ea842b 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -15,16 +15,18 @@ defmodule Membrane.Core.Element.PadControllerTest do @module Membrane.Core.Element.PadController defp prepare_state(elem_module, name \\ :element) do - struct(State, + struct!(State, name: name, module: elem_module, - callback_depth_counter: 0, pads_to_snapshot: MapSet.new(), parent_pid: self(), internal_state: %{}, synchronization: %{clock: nil, parent_clock: nil}, subprocess_supervisor: SubprocessSupervisor.start_link!(), - stalker: %Membrane.Core.Stalker{pid: spawn(fn -> :ok end), ets: nil} + stalker: %Membrane.Core.Stalker{pid: spawn(fn -> :ok end), ets: nil}, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new(), + auto_input_pads: [] ) |> PadSpecHandler.init_pads() end diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 309313887..1235e26a3 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -35,10 +35,9 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do }) state = - struct(State, + struct!(State, module: Filter, name: :test_name, - parent: self(), type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, @@ -54,7 +53,9 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do input_queue: input_queue, demand: 0 ) - } + }, + satisfied_auto_output_pads: MapSet.new(), + awaiting_auto_input_pads: MapSet.new() ) assert_received Message.new(:atomic_demand_increased, :some_pad) diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 18bdd7b71..702f8b1f1 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -4,9 +4,12 @@ defmodule Membrane.Integration.AutoDemandsTest do import Membrane.ChildrenSpec import Membrane.Testing.Assertions + alias Membrane.Buffer alias Membrane.Testing.{Pipeline, Sink, Source} - defmodule AutoDemandFilter do + require Membrane.Pad, as: Pad + + defmodule ExponentialAutoFilter do use Membrane.Filter def_input_pad :input, accepted_format: _any @@ -35,6 +38,32 @@ defmodule Membrane.Integration.AutoDemandsTest do end end + defmodule NotifyingAutoFilter do + use Membrane.Filter + + def_input_pad :input, accepted_format: _any, availability: :on_request + def_output_pad :output, accepted_format: _any + + @impl true + def handle_playing(_ctx, state), do: {[notify_parent: :playing], state} + + @impl true + def handle_parent_notification(actions, _ctx, state), do: {actions, state} + + @impl true + def handle_buffer(pad, buffer, _ctx, state) do + actions = [ + notify_parent: {:handling_buffer, pad, buffer}, + buffer: {:output, buffer} + ] + + {actions, state} + end + + @impl true + def handle_end_of_stream(_pad, _ctx, state), do: {[], state} + end + defmodule AutoDemandTee do use Membrane.Filter @@ -64,7 +93,7 @@ defmodule Membrane.Integration.AutoDemandsTest do :down -> {mult_payloads, payloads} end - filter = %AutoDemandFilter{factor: factor, direction: direction} + filter = %ExponentialAutoFilter{factor: factor, direction: direction} pipeline = Pipeline.start_link_supervised!( @@ -81,6 +110,8 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_end_of_stream(pipeline, :sink) refute_sink_buffer(pipeline, :sink, _buffer, 0) + + Pipeline.terminate(pipeline) end end) @@ -108,6 +139,8 @@ defmodule Membrane.Integration.AutoDemandsTest do end) refute_sink_buffer(pipeline, :left_sink, %{payload: 25_000}) + + Pipeline.terminate(pipeline) end test "handle removed branch" do @@ -127,6 +160,8 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_sink_buffer(pipeline, :left_sink, buffer) assert buffer.payload == payload end) + + Pipeline.terminate(pipeline) end defmodule NotifyingSink do @@ -174,6 +209,8 @@ defmodule Membrane.Integration.AutoDemandsTest do {:buffer_arrived, %Membrane.Buffer{payload: ^payload}} ) end + + Pipeline.terminate(pipeline) end end) @@ -202,7 +239,7 @@ defmodule Membrane.Integration.AutoDemandsTest do Pipeline.start_link_supervised!( spec: child(:source, PushSource) - |> child(:filter, AutoDemandFilter) + |> child(:filter, ExponentialAutoFilter) |> child(:sink, Sink) ) @@ -230,7 +267,7 @@ defmodule Membrane.Integration.AutoDemandsTest do Pipeline.start_supervised!( spec: child(:source, PushSource) - |> child(:filter, AutoDemandFilter) + |> child(:filter, ExponentialAutoFilter) |> child(:sink, %Sink{autodemand: false}) ) @@ -246,6 +283,163 @@ defmodule Membrane.Integration.AutoDemandsTest do ) end + defp source_definiton(name) do + # Testing.Source fed with such actions generator will produce buffers with incremenal + # sequence of numbers as payloads + actions_generator = + fn counter, _size -> + Process.sleep(1) + + buffer = %Buffer{ + metadata: %{creator: name}, + payload: counter + } + + actions = [buffer: {:output, buffer}, redemand: :output] + {actions, counter + 1} + end + + %Source{output: {1, actions_generator}} + end + + defp setup_pipeline_with_notifying_auto_filter(_context) do + pipeline = + Pipeline.start_link_supervised!( + spec: [ + child({:source, 0}, source_definiton({:source, 0})) + |> via_in(Pad.ref(:input, 0)) + |> child(:filter, NotifyingAutoFilter) + |> child(:sink, %Sink{autodemand: false}), + child({:source, 1}, source_definiton({:source, 1})) + |> via_in(Pad.ref(:input, 1)) + |> get_child(:filter) + ] + ) + + [pipeline: pipeline] + end + + describe "auto flow queue" do + setup :setup_pipeline_with_notifying_auto_filter + + defp receive_processed_buffers(pipeline, limit, acc \\ []) + + defp receive_processed_buffers(_pipeline, limit, acc) when limit <= 0 do + Enum.reverse(acc) + end + + defp receive_processed_buffers(pipeline, limit, acc) do + receive do + {Pipeline, ^pipeline, + {:handle_child_notification, {{:handling_buffer, _pad, buffer}, :filter}}} -> + receive_processed_buffers(pipeline, limit - 1, [buffer | acc]) + after + 500 -> Enum.reverse(acc) + end + end + + test "when there is no demand on the output pad", %{pipeline: pipeline} do + manual_flow_queue_size = 40 + + assert_pipeline_notified(pipeline, :filter, :playing) + + buffers = receive_processed_buffers(pipeline, 100) + assert length(buffers) == manual_flow_queue_size + + demand = 10_000 + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) + + # check if filter processed proper number of buffers + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size + + # check if filter processed buffers from both sources + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + assert Enum.count(buffers_by_creator) == 2 + + # check if filter balanced procesesd buffers by their origin - numbers of + # buffers coming from each source should be similar + counter_0 = Map.fetch!(buffers_by_creator, {:source, 0}) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() + sources_ratio = counter_0 / counter_1 + + assert 0.8 < sources_ratio and sources_ratio < 1.2 + + Pipeline.terminate(pipeline) + end + + test "when an element returns :pause_auto_demand and :resume_auto_demand action", %{ + pipeline: pipeline + } do + manual_flow_queue_size = 40 + auto_flow_demand_size = 400 + + assert_pipeline_notified(pipeline, :filter, :playing) + + Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0)) + + # time for :filter to pause demand on Pad.ref(:input, 0) + Process.sleep(500) + + buffers = receive_processed_buffers(pipeline, 100) + assert length(buffers) == manual_flow_queue_size + + demand = 10_000 + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + + # fliter paused auto demand on Pad.ref(:input, 0), so it should receive + # at most auto_flow_demand_size buffers from there and rest of the buffers + # from Pad.ref(:input, 1) + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) + + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size + + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + counter_0 = Map.get(buffers_by_creator, {:source, 0}, []) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() + + # at most auto_flow_demand_size buffers came from {:source, 0} + assert auto_flow_demand_size - manual_flow_queue_size <= counter_0 + assert counter_0 <= auto_flow_demand_size + + # rest of them came from {:source, 1} + assert demand - auto_flow_demand_size <= counter_1 + + Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0)) + + # time for :filter to resume demand on Pad.ref(:input, 0) + Process.sleep(500) + + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) + + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) + + # check if filter processed proper number of buffers + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size + + # check if filter processed buffers from both sources + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + assert Enum.count(buffers_by_creator) == 2 + + # check if filter balanced procesesd buffers by their origin - numbers of + # buffers coming from each source should be similar + counter_0 = Map.fetch!(buffers_by_creator, {:source, 0}) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() + sources_ratio = counter_0 / counter_1 + + assert 0.8 < sources_ratio and sources_ratio < 1.2 + + Pipeline.terminate(pipeline) + end + end + defp reduce_link(link, enum, fun) do Enum.reduce(enum, link, &fun.(&2, &1)) end diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index ea9880c56..3f3b30b2c 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -280,9 +280,6 @@ defmodule Membrane.Integration.DemandsTest do assert_sink_playing(pipeline, :sink) - # time for pipeline to start playing - Process.sleep(1000) - for i <- 1..10 do # during sleep below source should send around 100 buffers Process.sleep(100 * RedemandingSource.sleep_time())