Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue buffers when auto demand is low enough #693

Merged
merged 39 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
00794c3
Implement auto flow queue
FelonEkonom Oct 20, 2023
9be7be2
Fix bugs wip
FelonEkonom Oct 20, 2023
4188729
Fix bugs introduced in recent changes
FelonEkonom Oct 27, 2023
e49d57e
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Oct 27, 2023
0958822
Update changelog
FelonEkonom Oct 27, 2023
b075c70
Refactor code related to auto flow queues
FelonEkonom Oct 27, 2023
99396d4
Write tests for auto flow queue
FelonEkonom Oct 30, 2023
6c1da19
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Oct 30, 2023
b66662d
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Nov 2, 2023
2430810
wip
FelonEkonom Dec 19, 2023
6d56f0c
wip
FelonEkonom Jan 5, 2024
e32c255
Fix tests wip
FelonEkonom Jan 8, 2024
5e5f5b3
Fix tests wip
FelonEkonom Jan 11, 2024
c65b2bb
Write more tests for recent changes
FelonEkonom Jan 12, 2024
242d25f
Fix tests wip
FelonEkonom Jan 12, 2024
de2fc22
Refactor code
FelonEkonom Jan 15, 2024
16f4d4b
Refactor wip
FelonEkonom Jan 15, 2024
2391f09
Fix unit tests wip
FelonEkonom Jan 15, 2024
08a7f4d
Fix unit tests wip
FelonEkonom Jan 15, 2024
3725afe
Fix tests wip
FelonEkonom Jan 16, 2024
7a5c71a
Fix tests wip
FelonEkonom Jan 16, 2024
b7e79b4
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Jan 16, 2024
3b9fdcb
Fix tests wip
FelonEkonom Jan 16, 2024
2cdd49e
Small refactor
FelonEkonom Jan 16, 2024
379871c
Add comments describing, how auto flow queueing works
FelonEkonom Jan 22, 2024
9018e3c
Small refactor in structs fields names
FelonEkonom Jan 31, 2024
e1aadcc
Make membrane fast again wip
FelonEkonom Feb 8, 2024
9e85059
Remove leftovers
FelonEkonom Feb 9, 2024
12c1273
Remove leftovers wip
FelonEkonom Feb 9, 2024
ca1b221
Remove leftovers
FelonEkonom Feb 9, 2024
7032d9b
Fix CI
FelonEkonom Feb 9, 2024
b471d29
Remove comments
FelonEkonom Feb 9, 2024
01728df
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Feb 9, 2024
4f02735
Refactor auto flow queues mechanism description
FelonEkonom Feb 9, 2024
95b1848
wip
FelonEkonom Feb 12, 2024
6ca21dd
Revert "wip"
FelonEkonom Feb 13, 2024
2720120
Remove inspects
FelonEkonom Feb 13, 2024
c929433
Impelemnt CR
FelonEkonom Feb 26, 2024
9313f2c
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Feb 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
Expand Down
1 change: 0 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ defmodule Membrane.Core.Child.PadModel do
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
options: %{optional(atom) => any},
auto_demand_size: pos_integer() | nil,
associated_pads: [Pad.ref()] | nil,
sticky_events: [Membrane.Event.t()],
stalker_metrics: %{atom => :atomics.atomics_ref()}
}
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,11 @@ defmodule Membrane.Core.Element do
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_queue?: false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
popping_queue?: false,
popping_auto_flow_queue?: false,

pads_to_snapshot: MapSet.new(),
stalker: options.stalker
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new()
}
|> PadSpecHandler.init_pads()

Expand Down
7 changes: 4 additions & 3 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Membrane.Core.Element.ActionHandler do
alias Membrane.Core.Element.{
DemandController,
DemandHandler,
PadController,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand Down Expand Up @@ -466,8 +466,9 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
state = PadController.remove_pad_associations(pad_ref, state)
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
Comment on lines +473 to +475
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipe raw value

Suggested change
Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()

else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
23 changes: 13 additions & 10 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
:ok
end

@spec decrease(t, non_neg_integer()) :: t
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
def decrease(%__MODULE__{} = atomic_demand, value) do
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))

if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
flush_buffered_decrementation(atomic_demand)
else
atomic_demand
{:unchanged, atomic_demand}
end
end

Expand All @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do

atomic_demand = %{atomic_demand | buffered_decrementation: 0}

if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
atomic_demand =
if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
Comment on lines +167 to +175
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Little suggestion, to break this conditional block;

# in the function
atomic_demand = handle_atomic_demand(atomic_demand, atomic_demand_value)
{{:decreased, atomic_demand_value}, atomic_demand}

# in the code abyss...

@spec handle_atomic_demand... 
defp handle_atomic_demand(atomic_demand, atomic_demand_value) do
  if should_overflow?(atomic_demand, atomic_demand_value) do
    overflow_demand(atomic_demand, atomic_demand_value)
  else
    atomic_demand
  end
end

@spec should_overflow?... 
# maybe it would be worth adding doc here?
defp should_overflow?(demand, value) do
   not demand.overflowed? and
    resolved_pull?(demand) and
    resolved_push?(demand) and
    exceeds_capacity?(value, demand.capacity)
end

@spec ..
defp resolved_pull?(demand), do: get_receiver_status(demand) == {:resolved, :pull}
defp resolved_push?(demand), do: get_sender_status(demand) == {:resolved, :push}
defp exceeds_capacity?(value, capacity), do: -1 * value > capacity


{{:decreased, atomic_demand_value}, atomic_demand}
end

defp overflow(atomic_demand, atomic_demand_value) do
Expand Down
19 changes: 12 additions & 7 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,17 @@ defmodule Membrane.Core.Element.BufferController do
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
if MapSet.member?(state.awaiting_auto_input_pads, pad_ref) or
PadModel.get_data!(state, pad_ref, [:auto_flow_queue]) != Qex.new() do
raise "cannot execute handle_buffer callback for an awaiting input pad"
end
mat-hek marked this conversation as resolved.
Show resolved Hide resolved

state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand All @@ -93,11 +102,7 @@ defmodule Membrane.Core.Element.BufferController do
@doc """
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback(
Pad.ref(),
[Buffer.t()] | Buffer.t(),
State.t()
) :: State.t()
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric("buffer", 1, inspect(pad_ref))

Expand Down
30 changes: 20 additions & 10 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ defmodule Membrane.Core.Element.DemandController do

@spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t()
def snapshot_atomic_demand(pad_ref, state) do
with {:ok, pad_data} <- PadModel.get_data(state, pad_ref),
with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref),
%State{playback: :playing} <- state do
if pad_data.direction == :input,
do: raise("cannot snapshot atomic counter in input pad")

do_snapshot_atomic_demand(pad_data, state)
else
{:ok, %{end_of_stream?: true}} ->
Membrane.Logger.debug_verbose(
"Skipping snapshot of pad #{inspect(pad_ref)}, because it has flag :end_of_stream? set to true"
)

state

{:error, :unknown_pad} ->
# We've got a :atomic_demand_increased message on already unlinked pad
state
Expand All @@ -43,13 +50,10 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do
%{
atomic_demand: atomic_demand,
associated_pads: associated_pads
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
state
end
Expand Down Expand Up @@ -91,9 +95,15 @@ defmodule Membrane.Core.Element.DemandController do
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)

demand = pad_data.demand - buffers_size
atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)
{decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)

PadModel.set_data!(state, pad_ref, %{
with {:decreased, new_value} when new_value <= 0 <- decrease_result,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What 0 represent? Could it be dumped into module attr, to avoid magic numbers?

%{flow_control: :auto} <- pad_data do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref))
else
_other -> state
end
|> PadModel.set_data!(pad_ref, %{
pad_data
| demand: demand,
atomic_demand: atomic_demand
Expand Down
Loading
Loading