Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue buffers when auto demand is low enough #693

Merged
merged 39 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
00794c3
Implement auto flow queue
FelonEkonom Oct 20, 2023
9be7be2
Fix bugs wip
FelonEkonom Oct 20, 2023
4188729
Fix bugs introduced in recent changes
FelonEkonom Oct 27, 2023
e49d57e
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Oct 27, 2023
0958822
Update changelog
FelonEkonom Oct 27, 2023
b075c70
Refactor code related to auto flow queues
FelonEkonom Oct 27, 2023
99396d4
Write tests for auto flow queue
FelonEkonom Oct 30, 2023
6c1da19
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Oct 30, 2023
b66662d
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Nov 2, 2023
2430810
wip
FelonEkonom Dec 19, 2023
6d56f0c
wip
FelonEkonom Jan 5, 2024
e32c255
Fix tests wip
FelonEkonom Jan 8, 2024
5e5f5b3
Fix tests wip
FelonEkonom Jan 11, 2024
c65b2bb
Write more tests for recent changes
FelonEkonom Jan 12, 2024
242d25f
Fix tests wip
FelonEkonom Jan 12, 2024
de2fc22
Refactor code
FelonEkonom Jan 15, 2024
16f4d4b
Refactor wip
FelonEkonom Jan 15, 2024
2391f09
Fix unit tests wip
FelonEkonom Jan 15, 2024
08a7f4d
Fix unit tests wip
FelonEkonom Jan 15, 2024
3725afe
Fix tests wip
FelonEkonom Jan 16, 2024
7a5c71a
Fix tests wip
FelonEkonom Jan 16, 2024
b7e79b4
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Jan 16, 2024
3b9fdcb
Fix tests wip
FelonEkonom Jan 16, 2024
2cdd49e
Small refactor
FelonEkonom Jan 16, 2024
379871c
Add comments describing, how auto flow queueing works
FelonEkonom Jan 22, 2024
9018e3c
Small refactor in structs fields names
FelonEkonom Jan 31, 2024
e1aadcc
Make membrane fast again wip
FelonEkonom Feb 8, 2024
9e85059
Remove leftovers
FelonEkonom Feb 9, 2024
12c1273
Remove leftovers wip
FelonEkonom Feb 9, 2024
ca1b221
Remove leftovers
FelonEkonom Feb 9, 2024
7032d9b
Fix CI
FelonEkonom Feb 9, 2024
b471d29
Remove comments
FelonEkonom Feb 9, 2024
01728df
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Feb 9, 2024
4f02735
Refactor auto flow queues mechanism description
FelonEkonom Feb 9, 2024
95b1848
wip
FelonEkonom Feb 12, 2024
6ca21dd
Revert "wip"
FelonEkonom Feb 13, 2024
2720120
Remove inspects
FelonEkonom Feb 13, 2024
c929433
Impelemnt CR
FelonEkonom Feb 26, 2024
9313f2c
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Feb 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ defmodule Membrane.Core.Element.DemandController do
%{effective_flow_control: :pull} = state
) do
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
AutoFlowUtils.pop_queues_and_bump_demand(state)
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
state
end
Expand Down
72 changes: 69 additions & 3 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,72 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
require Membrane.Logger
require Membrane.Pad, as: Pad

# Description of the auto flow control queueing mechanism

# General concept: Buffers coming to auto input pads should be handled only if
# all auto output pads have positive demand. Buffers arriving when any of the auto
# output pads has negative demand should be queued and only processed when the
# demand everywhere is positive

# Fields in Element state, that take important part in this mechanism:
# - satisfied_auto_output_pads - MapSet of auto output pads, whose demand is less than or equal to 0.
# We consider only pads with the end_of_stream? flag set to false
# - awaiting_auto_input_pads - MapSet of auto input pads, which have a non-empty auto_flow_queue
# - popping_queue? - a flag determining whether we are on the stack somewhere above popping a queue.
# It's used to avoid situations where the function that pops from the queue calls itself multiple times,
# what could potentially lead to things like altering the order of sent buffers.

# Each auto input pad in PadData contains a queue in the :auto_flow_queue field, in which it stores queued
# buffers, events and stream formats. If queue is non-empty, corresponding pad_ref should be
# in the Mapset awaiting_auto_input_pads in element state

# The introduced mechanism consists of two parts, the pseudocode for which is included below

# def onBufferArrivedInMessage() do
FelonEkonom marked this conversation as resolved.
Show resolved Hide resolved
# if element uncorked do
# exec handle_buffer
# else
# store buffer in queue
# end
# end

# def onUncorck() do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it called pop_queues_and_bump_demand in the real code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is quite close, but to be precise, it works like:

def pop_queues_and_bump_demand() do
  if uncorcked do
    onUncorck()
  end
end

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'd adjust it to reflect the names from the code, it would be easier to follow

# # EFC means `effective flow control`

# if EFC == pull do
# bump demand on auto input pads with an empty queue
# end

# while (output demand positive or EFC == push) and some queues are not empty do
# pop random queue and handle its head
# end

# if EFC == pull do
# bump demand on auto input pads with an empty queue
# end
# end

# An Element is `corked` when its effective flow control is :pull and it has an auto output pad,
# who's demand is non-positive

# The following events can make the element shift from `corked` state to `uncorked` state:
# - change of effective flow control from :pull to :push
# - increase in the value of auto output pad demand. We check the demand value:
# - after sending the buffer to a given output pad
# - after receiving a message :atomic_demand_increased from the next element
# - unlinking the auto output pad
# - sending an EOS to the auto output pad
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this part higher, maybe just after the general concept part. Also, for completeness, I'd mention what makes the element corked. It seems it's not taken into account in the pseudocode as well.

Copy link
Member Author

@FelonEkonom FelonEkonom Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because on transition from corcked to uncorcked nothing special happens, until the buffer arrival - then the element processes it immediately, omitting auto_flow_queue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd mention it anyway


# In addition, an invariant is maintained, which is that the head of all non-empty
# auto_flow_queue queues contains a buffer (the queue can also contain events and
# stream formats). After popping a queue
# of a given pad, if it has an event or stream format in its head, we pop it further,
# until it becomes empty or a buffer is encountered.

# auto_flow_queues hold single buffers, event if they arrive to the element in batch, because if we
# done otherwise, we would have to handle whole batch after popping it from the queue, even if demand
# of all output pads would be satisfied after handling first buffer

defguardp is_input_auto_pad_data(pad_data)
when is_map(pad_data) and is_map_key(pad_data, :flow_control) and
pad_data.flow_control == :auto and is_map_key(pad_data, :direction) and
Expand Down Expand Up @@ -68,9 +134,9 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def store_buffers_in_queue(pad_ref, buffers, state) do
state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref))

PadModel.update_data!(state, pad_ref, :auto_flow_queue, fn queue ->
state
|> Map.update!(:awaiting_auto_input_pads, &MapSet.put(&1, pad_ref))
|> PadModel.update_data!(pad_ref, :auto_flow_queue, fn queue ->
Enum.reduce(buffers, queue, fn buffer, queue ->
Qex.push(queue, {:buffer, buffer})
end)
Expand Down
Loading