Skip to content

Commit

Permalink
Queue buffers when auto demand is low enough (#693)
Browse files Browse the repository at this point in the history
* Implement auto flow queue

* Fix bugs wip

* Fix bugs introduced in recent changes

* Update changelog

* Refactor code related to auto flow queues

* Write tests for auto flow queue

* wip

* wip

* Fix tests wip

* Fix tests wip

* Write more tests for recent changes

* Fix tests wip

* Refactor code

* Refactor wip

* Fix unit tests wip

* Fix unit tests wip

* Fix tests wip

* Fix tests wip

* Fix tests wip

* Small refactor

* Add comments describing, how auto flow queueing works

* Small refactor in structs fields names

* Make membrane fast again wip

* Remove leftovers

* Remove leftovers wip

* Remove leftovers

* Fix CI

* Remove comments

* Refactor auto flow queues mechanism description

* wip

* Revert "wip"

This reverts commit 95b1848.

* Remove inspects

* Impelemnt CR
  • Loading branch information
FelonEkonom authored Feb 26, 2024
1 parent 475c22c commit dc5f653
Show file tree
Hide file tree
Showing 23 changed files with 577 additions and 160 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
Expand Down
1 change: 0 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ defmodule Membrane.Core.Child.PadModel do
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
options: %{optional(atom) => any},
auto_demand_size: pos_integer() | nil,
associated_pads: [Pad.ref()] | nil,
sticky_events: [Membrane.Event.t()],
stalker_metrics: %{atom => :atomics.atomics_ref()}
}
Expand Down
6 changes: 5 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,12 @@ defmodule Membrane.Core.Element do
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_auto_flow_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: []
}
|> PadSpecHandler.init_pads()

Expand Down
13 changes: 9 additions & 4 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Membrane.Core.Element.ActionHandler do
alias Membrane.Core.Element.{
DemandController,
DemandHandler,
PadController,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand Down Expand Up @@ -176,7 +176,11 @@ defmodule Membrane.Core.Element.ActionHandler do
_other -> :output
end

pads = state |> PadModel.filter_data(%{direction: dir}) |> Map.keys()
pads =
Enum.flat_map(state.pads_data, fn
{pad_ref, %{direction: ^dir}} -> [pad_ref]
_pad_entry -> []
end)

Enum.reduce(pads, state, fn pad, state ->
action =
Expand Down Expand Up @@ -466,8 +470,9 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
state = PadController.remove_pad_associations(pad_ref, state)
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
23 changes: 13 additions & 10 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
:ok
end

@spec decrease(t, non_neg_integer()) :: t
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
def decrease(%__MODULE__{} = atomic_demand, value) do
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))

if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
flush_buffered_decrementation(atomic_demand)
else
atomic_demand
{:unchanged, atomic_demand}
end
end

Expand All @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do

atomic_demand = %{atomic_demand | buffered_decrementation: 0}

if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
atomic_demand =
if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end

{{:decreased, atomic_demand_value}, atomic_demand}
end

defp overflow(atomic_demand, atomic_demand_value) do
Expand Down
14 changes: 7 additions & 7 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ defmodule Membrane.Core.Element.BufferController do
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand All @@ -93,11 +97,7 @@ defmodule Membrane.Core.Element.BufferController do
@doc """
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback(
Pad.ref(),
[Buffer.t()] | Buffer.t(),
State.t()
) :: State.t()
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric("buffer", 1, inspect(pad_ref))

Expand Down
30 changes: 20 additions & 10 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ defmodule Membrane.Core.Element.DemandController do

@spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t()
def snapshot_atomic_demand(pad_ref, state) do
with {:ok, pad_data} <- PadModel.get_data(state, pad_ref),
with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref),
%State{playback: :playing} <- state do
if pad_data.direction == :input,
do: raise("cannot snapshot atomic counter in input pad")

do_snapshot_atomic_demand(pad_data, state)
else
{:ok, %{end_of_stream?: true}} ->
Membrane.Logger.debug_verbose(
"Skipping snapshot of pad #{inspect(pad_ref)}, because it has flag :end_of_stream? set to true"
)

state

{:error, :unknown_pad} ->
# We've got a :atomic_demand_increased message on already unlinked pad
state
Expand All @@ -43,13 +50,10 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do
%{
atomic_demand: atomic_demand,
associated_pads: associated_pads
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
state
end
Expand Down Expand Up @@ -91,9 +95,15 @@ defmodule Membrane.Core.Element.DemandController do
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)

demand = pad_data.demand - buffers_size
atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)
{decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)

PadModel.set_data!(state, pad_ref, %{
with {:decreased, new_value} when new_value <= 0 <- decrease_result,
%{flow_control: :auto} <- pad_data do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref))
else
_other -> state
end
|> PadModel.set_data!(pad_ref, %{
pad_data
| demand: demand,
atomic_demand: atomic_demand
Expand Down
Loading

0 comments on commit dc5f653

Please sign in to comment.