Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue buffers when auto demand is low enough #693

Merged
merged 39 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
00794c3
Implement auto flow queue
FelonEkonom Oct 20, 2023
9be7be2
Fix bugs wip
FelonEkonom Oct 20, 2023
4188729
Fix bugs introduced in recent changes
FelonEkonom Oct 27, 2023
e49d57e
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Oct 27, 2023
0958822
Update changelog
FelonEkonom Oct 27, 2023
b075c70
Refactor code related to auto flow queues
FelonEkonom Oct 27, 2023
99396d4
Write tests for auto flow queue
FelonEkonom Oct 30, 2023
6c1da19
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Oct 30, 2023
b66662d
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Nov 2, 2023
2430810
wip
FelonEkonom Dec 19, 2023
6d56f0c
wip
FelonEkonom Jan 5, 2024
e32c255
Fix tests wip
FelonEkonom Jan 8, 2024
5e5f5b3
Fix tests wip
FelonEkonom Jan 11, 2024
c65b2bb
Write more tests for recent changes
FelonEkonom Jan 12, 2024
242d25f
Fix tests wip
FelonEkonom Jan 12, 2024
de2fc22
Refactor code
FelonEkonom Jan 15, 2024
16f4d4b
Refactor wip
FelonEkonom Jan 15, 2024
2391f09
Fix unit tests wip
FelonEkonom Jan 15, 2024
08a7f4d
Fix unit tests wip
FelonEkonom Jan 15, 2024
3725afe
Fix tests wip
FelonEkonom Jan 16, 2024
7a5c71a
Fix tests wip
FelonEkonom Jan 16, 2024
b7e79b4
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Jan 16, 2024
3b9fdcb
Fix tests wip
FelonEkonom Jan 16, 2024
2cdd49e
Small refactor
FelonEkonom Jan 16, 2024
379871c
Add comments describing, how auto flow queueing works
FelonEkonom Jan 22, 2024
9018e3c
Small refactor in structs fields names
FelonEkonom Jan 31, 2024
e1aadcc
Make membrane fast again wip
FelonEkonom Feb 8, 2024
9e85059
Remove leftovers
FelonEkonom Feb 9, 2024
12c1273
Remove leftovers wip
FelonEkonom Feb 9, 2024
ca1b221
Remove leftovers
FelonEkonom Feb 9, 2024
7032d9b
Fix CI
FelonEkonom Feb 9, 2024
b471d29
Remove comments
FelonEkonom Feb 9, 2024
01728df
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Feb 9, 2024
4f02735
Refactor auto flow queues mechanism description
FelonEkonom Feb 9, 2024
95b1848
wip
FelonEkonom Feb 12, 2024
6ca21dd
Revert "wip"
FelonEkonom Feb 13, 2024
2720120
Remove inspects
FelonEkonom Feb 13, 2024
c929433
Impelemnt CR
FelonEkonom Feb 26, 2024
9313f2c
Merge remote-tracking branch 'origin/master' into queue-buffers-when-…
FelonEkonom Feb 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/add_pr_to_smackore_board/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ runs:
export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k
export TARGET_COLUMN_ID=e6b1ee10

export AUTHOR_ORIGIN=$(curl --request GET --url "https://api.github.com/orgs/membraneframework/members" --header "Authorization: Bearer $GH_TOKEN" -s | python scripts/python/get_author_origin.py $AUTHOR_LOGIN)
export AUTHOR_ORIGIN=$(curl --request GET --url "https://api.github.com/orgs/membraneframework/members" --header "Authorization: Bearer $GH_TOKEN" -s | python scripts/python/get_author_origin.py "$AUTHOR_LOGIN")

if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ]
then
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Handle buffers, only if demand on input pad with `flow_control: :auto` is non-negative. [#654](https://github.com/membraneframework/membrane_core/pull/654)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)

## 1.0.0
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ defmodule Membrane.Core.Element do
effective_flow_control: :push,
handling_action?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new()
}
|> PadSpecHandler.init_pads()

Expand Down
23 changes: 13 additions & 10 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
:ok
end

@spec decrease(t, non_neg_integer()) :: t
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
def decrease(%__MODULE__{} = atomic_demand, value) do
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))

if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
flush_buffered_decrementation(atomic_demand)
else
atomic_demand
{:unchanged, atomic_demand}
end
end

Expand All @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do

atomic_demand = %{atomic_demand | buffered_decrementation: 0}

if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
atomic_demand =
if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
Comment on lines +167 to +175
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Little suggestion, to break this conditional block;

# in the function
atomic_demand = handle_atomic_demand(atomic_demand, atomic_demand_value)
{{:decreased, atomic_demand_value}, atomic_demand}

# in the code abyss...

@spec handle_atomic_demand... 
defp handle_atomic_demand(atomic_demand, atomic_demand_value) do
  if should_overflow?(atomic_demand, atomic_demand_value) do
    overflow_demand(atomic_demand, atomic_demand_value)
  else
    atomic_demand
  end
end

@spec should_overflow?... 
# maybe it would be worth adding doc here?
defp should_overflow?(demand, value) do
   not demand.overflowed? and
    resolved_pull?(demand) and
    resolved_push?(demand) and
    exceeds_capacity?(value, demand.capacity)
end

@spec ..
defp resolved_pull?(demand), do: get_receiver_status(demand) == {:resolved, :pull}
defp resolved_push?(demand), do: get_sender_status(demand) == {:resolved, :push}
defp exceeds_capacity?(value, capacity), do: -1 * value > capacity


{{:decreased, atomic_demand_value}, atomic_demand}
end

defp overflow(atomic_demand, atomic_demand_value) do
Expand Down
16 changes: 14 additions & 2 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,20 @@ defmodule Membrane.Core.Element.BufferController do
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
if pad_ref in state.awaiting_auto_input_pads do
raise "to nie powinno sie zdarzyc dupa 1"
end

if PadModel.get_data!(state, pad_ref, [:auto_flow_queue]) != Qex.new() do
raise "to nie powinno sie zdarzyc dupa 2"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it #dupa

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🇵🇱 🇵🇱 🇵🇱 This is still a draft for a reason 🇵🇱 🇵🇱 🇵🇱

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FelonEkonom out of curiosity - may I participate in the code review process? 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will mark this PR as Ready for review, when I fix bugs causing tests to fail

end

state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand Down
28 changes: 25 additions & 3 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,23 @@ defmodule Membrane.Core.Element.DemandController do
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
# tutaj powinno mieć miejsce
# - usuniecie pada z mapsetu
# - sflushowanie kolejek, jesli mapset jest pusty
# zwroc uwage, czy gdzies w czyms w stylu handle_outgoing_buffers nie wjedzie ci tutaj jakas nieprzyjemna rekurencja
# kolejna rzecz: przerwanie rekurencji moze nastąpić, nawet wtedy, gdy kolejki będą miały w sobie bufory

state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))

# dobra, wyglada git

state = AutoFlowUtils.pop_auto_flow_queues_while_needed(state)

if MapSet.size(state.satisfied_auto_output_pads) == 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
else
state
end
else
state
end
Expand Down Expand Up @@ -91,9 +107,15 @@ defmodule Membrane.Core.Element.DemandController do
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)

demand = pad_data.demand - buffers_size
atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)
{decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)

PadModel.set_data!(state, pad_ref, %{
with {:decreased, new_value} when new_value <= 0 <- decrease_result,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What 0 represent? Could it be dumped into module attr, to avoid magic numbers?

%{flow_control: :auto} <- pad_data do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref))
else
_other -> state
end
|> PadModel.set_data!(pad_ref, %{
pad_data
| demand: demand,
atomic_demand: atomic_demand
Expand Down
113 changes: 98 additions & 15 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
@moduledoc false

alias Membrane.Buffer
alias Membrane.Event
alias Membrane.StreamFormat

alias Membrane.Core.Element.{
AtomicDemand,
State
BufferController,
EventController,
State,
StreamFormatController
}
Comment on lines 8 to 14
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Are you sure you want to use {} in the alias? It will be hard for grepping specific context (definitely such as State)


require Membrane.Core.Child.PadModel, as: PadModel
Expand Down Expand Up @@ -59,14 +66,36 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do
Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2)
@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def store_buffers_in_queue(pad_ref, buffers, state) do
state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref))
store_in_queue(pad_ref, :buffers, buffers, state)
end

@spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t()
def store_event_in_queue(pad_ref, event, state) do
store_in_queue(pad_ref, :event, event, state)
end

def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
@spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t()
def store_stream_format_in_queue(pad_ref, stream_format, state) do
store_in_queue(pad_ref, :stream_format, stream_format, state)
end

defp store_in_queue(pad_ref, type, item, state) do
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item}))
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(ref_or_ref_list, state)
when Pad.is_pad_ref(ref_or_ref_list) or is_list(ref_or_ref_list) do
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not the pad_refs be descriptive enough?

Suggested change
def auto_adjust_atomic_demand(ref_or_ref_list, state)
when Pad.is_pad_ref(ref_or_ref_list) or is_list(ref_or_ref_list) do
def auto_adjust_atomic_demand(pad_refs, state)
when Pad.is_pad_ref(pad_refs) or is_list(pad_refs) do

ref_or_ref_list
|> Bunch.listify()
|> Enum.reduce(state, fn pad_ref, state ->
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
Comment on lines +169 to +170
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤭

Suggested change
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
state
|> PadModel.get_data!(pad_ref)
|> do_auto_adjust_atomic_demand(state)

|> elem(1) # todo: usun to :increased / :unchanged, ktore discardujesz w tym elem(1)
end)
end

defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing spec

Suggested change
defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do
@spec do_auto_adjust_atomic_demand([Pad.ref()], State.t()) :: State.t()
defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do

Expand All @@ -83,9 +112,11 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
:ok = AtomicDemand.increase(atomic_demand, diff)

:atomics.put(stalker_metrics.demand, 1, auto_demand_size)
PadModel.set_data!(state, ref, :demand, auto_demand_size)

state = PadModel.set_data!(state, ref, :demand, auto_demand_size)
{:increased, state}
else
state
{:unchanged, state}
end
end

Expand All @@ -97,14 +128,66 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state.effective_flow_control == :pull and
not pad_data.auto_demand_paused? and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state))
output_auto_demand_positive?(state)
end

@spec pop_auto_flow_queues_while_needed(State.t()) :: State.t()
def pop_auto_flow_queues_while_needed(state) do
if (state.effective_flow_control == :push or
MapSet.size(state.satisfied_auto_output_pads) == 0) and
MapSet.size(state.awaiting_auto_input_pads) > 0 do
pop_random_auto_flow_queue(state)
|> pop_auto_flow_queues_while_needed()
else
state
end
end

defp pop_random_auto_flow_queue(state) do
pad_ref = Enum.random(state.awaiting_auto_input_pads)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe popping more than one buffer from each queue at a time would speed it up

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to do everything in the same way as it is, but just call Enum.random less frequently?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes


PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> Qex.pop()
|> case do
{{:value, {:buffers, buffers}}, popped_queue} ->
state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue)
state = BufferController.exec_buffer_callback(pad_ref, buffers, state)
pop_stream_formats_and_events(pad_ref, state)

{:empty, _empty_queue} ->
Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
end
end

defp atomic_demand_positive?(pad_ref, state) do
atomic_demand_value =
PadModel.get_data!(state, pad_ref, :atomic_demand)
|> AtomicDemand.get()
defp pop_stream_formats_and_events(pad_ref, state) do
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> Qex.pop()
Comment on lines +280 to +281
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> Qex.pop()
state
|> PadModel.get_data!(pad_ref, :auto_flow_queue)
|> Qex.pop()

|> case do
{{:value, {type, item}}, popped_queue} when type in [:event, :stream_format] ->
state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue)
state = exec_queue_item_callback(pad_ref, {type, item}, state)
pop_stream_formats_and_events(pad_ref, state)

{{:value, {:buffers, _buffers}}, _popped_queue} ->
state

{:empty, _empty_queue} ->
Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
end
end

defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}),
do: MapSet.size(pads) == 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pattern match is faster than looping over whole map

Suggested change
defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}),
do: MapSet.size(pads) == 0
defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: %{}}), do: true
defp output_auto_demand_positive?(_state), do: false


defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do
BufferController.exec_buffer_callback(pad_ref, buffers, state)
end

defp exec_queue_item_callback(pad_ref, {:event, event}, state) do
EventController.exec_handle_event(pad_ref, event, state)
end

atomic_demand_value > 0
defp exec_queue_item_callback(pad_ref, {:stream_format, stream_format}, state) do
StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state)
end
end
15 changes: 9 additions & 6 deletions lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ defmodule Membrane.Core.Element.EffectiveFlowController do

state.pads_data
|> Enum.filter(fn {_ref, %{flow_control: flow_control}} -> flow_control == :auto end)
|> Enum.reduce(state, fn
{_ref, %{direction: :output} = pad_data}, state ->
|> Enum.each(fn
{_ref, %{direction: :output} = pad_data} ->
:ok =
AtomicDemand.set_sender_status(
pad_data.atomic_demand,
Expand All @@ -120,9 +120,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
[pad_data.other_ref, new_effective_flow_control]
)

state

{pad_ref, %{direction: :input} = pad_data}, state ->
{pad_ref, %{direction: :input} = pad_data} ->
if triggering_pad in [pad_ref, nil] or
AtomicDemand.get_receiver_status(pad_data.atomic_demand) != :to_be_resolved do
:ok =
Expand All @@ -131,8 +129,13 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
{:resolved, new_effective_flow_control}
)
end
end)

AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
state.pads_data
|> Enum.flat_map(fn
{pad_ref, %{direction: :input, flow_control: :auto}} -> [pad_ref]
_other -> []
end)
|> AutoFlowUtils.auto_adjust_atomic_demand(state)
end
end
27 changes: 18 additions & 9 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Membrane.Core.Element.EventController do
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils

require Membrane.Core.Child.PadModel
require Membrane.Core.Message
require Membrane.Core.Telemetry
Expand All @@ -39,15 +41,22 @@ defmodule Membrane.Core.Element.EventController do
playback: %State{playback: :playing} <- state do
Telemetry.report_metric(:event, 1, inspect(pad_ref))

if not Event.async?(event) and buffers_before_event_present?(data) do
PadModel.update_data!(
state,
pad_ref,
:input_queue,
&InputQueue.store(&1, :event, event)
)
else
exec_handle_event(pad_ref, event, state)
cond do
# events goes to the manual flow control input queue
not Event.async?(event) and buffers_before_event_present?(data) ->
PadModel.update_data!(
state,
pad_ref,
:input_queue,
&InputQueue.store(&1, :event, event)
)

# event goes to the auto flow control queue
pad_ref in state.awaiting_auto_input_pads ->
AutoFlowUtils.store_event_in_queue(pad_ref, event, state)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we can potentially store async events in the queue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the first case of cond statement? The same condition as there is currently on master, I haven't changed it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On master there's an if, not cond

if not Event.async?(event) and buffers_before_event_present?(data) do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just transformed if into cond, the logic of the code is the same

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not :P

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1st condition in cond is true <=> condition in if is satisfied

true ->
exec_handle_event(pad_ref, event, state)
end
else
pad: {:error, :unknown_pad} ->
Expand Down
16 changes: 12 additions & 4 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,15 @@ defmodule Membrane.Core.Element.PadController do

state = update_associated_pads(pad_data, state)

if pad_data.direction == :input and pad_data.flow_control == :auto do
AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state)
else
state
case pad_data do
%{direction: :output, flow_control: :auto} ->
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref))

%{direction: :input, flow_control: :auto} ->
AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state)

_pad_data ->
state
end
end

Expand Down Expand Up @@ -484,6 +489,9 @@ defmodule Membrane.Core.Element.PadController do
PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref))
end)
|> PadModel.set_data!(pad_ref, :associated_pads, [])
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> AutoFlowUtils.pop_auto_flow_queues_while_needed()

if pad_data.direction == :output,
do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state),
Expand Down
Loading