Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Dec 19, 2023
1 parent b66662d commit 2430810
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 189 deletions.
2 changes: 1 addition & 1 deletion .github/actions/add_pr_to_smackore_board/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ runs:
export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k
export TARGET_COLUMN_ID=e6b1ee10
export AUTHOR_ORIGIN=$(curl --request GET --url "https://api.github.com/orgs/membraneframework/members" --header "Authorization: Bearer $GH_TOKEN" -s | python scripts/python/get_author_origin.py $AUTHOR_LOGIN)
export AUTHOR_ORIGIN=$(curl --request GET --url "https://api.github.com/orgs/membraneframework/members" --header "Authorization: Bearer $GH_TOKEN" -s | python scripts/python/get_author_origin.py "$AUTHOR_LOGIN")
if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ]
then
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ defmodule Membrane.Core.Element do
effective_flow_control: :push,
handling_action?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new()
}
|> PadSpecHandler.init_pads()

Expand Down
23 changes: 13 additions & 10 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
:ok
end

@spec decrease(t, non_neg_integer()) :: t
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
def decrease(%__MODULE__{} = atomic_demand, value) do
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))

if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
flush_buffered_decrementation(atomic_demand)
else
atomic_demand
{:unchanged, atomic_demand}
end
end

Expand All @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do

atomic_demand = %{atomic_demand | buffered_decrementation: 0}

if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
atomic_demand =
if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end

{{:decreased, atomic_demand_value}, atomic_demand}
end

defp overflow(atomic_demand, atomic_demand_value) do
Expand Down
24 changes: 12 additions & 12 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ defmodule Membrane.Core.Element.BufferController do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

# we check if pad should be corcked before decrementing :demand field in PadData
# 1) to avoid situation, when big chunk of data is stored in the queue only because it
# exceeds auto_demand_size
# 2) to handle start of stream caused by first buffer arrival possibly early
hard_corcked? = AutoFlowUtils.hard_corcked?(pad_ref, state)

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state =
if hard_corcked? do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
exec_buffer_callback(pad_ref, buffers, state)
if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
if pad_ref in state.awaiting_auto_input_pads do
raise "to nie powinno sie zdarzyc dupa 1"
end

AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
if PadModel.get_data!(state, pad_ref, [:auto_flow_queue]) != Qex.new() do
raise "to nie powinno sie zdarzyc dupa 2"
end

state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand Down
28 changes: 25 additions & 3 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,23 @@ defmodule Membrane.Core.Element.DemandController do
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
# tutaj powinno mieć miejsce
# - usuniecie pada z mapsetu
# - sflushowanie kolejek, jesli mapset jest pusty
# zwroc uwage, czy gdzies w czyms w stylu handle_outgoing_buffers nie wjedzie ci tutaj jakas nieprzyjemna rekurencja
# kolejna rzecz: przerwanie rekurencji moze nastąpić, nawet wtedy, gdy kolejki będą miały w sobie bufory

state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))

# dobra, wyglada git

state = AutoFlowUtils.pop_auto_flow_queues_while_needed(state)

if MapSet.size(state.satisfied_auto_output_pads) == 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
else
state
end
else
state
end
Expand Down Expand Up @@ -91,9 +107,15 @@ defmodule Membrane.Core.Element.DemandController do
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)

demand = pad_data.demand - buffers_size
atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)
{decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)

PadModel.set_data!(state, pad_ref, %{
with {:decreased, new_value} when new_value <= 0 <- decrease_result,
%{flow_control: :auto} <- pad_data do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref))
else
_other -> state
end
|> PadModel.set_data!(pad_ref, %{
pad_data
| demand: demand,
atomic_demand: atomic_demand
Expand Down
89 changes: 46 additions & 43 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,9 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state
end

@spec hard_corcked?(Pad.ref(), State.t()) :: boolean()
def hard_corcked?(pad_ref, state) do
pad_data = PadModel.get_data!(state, pad_ref)

state.effective_flow_control == :pull and pad_data.direction == :input and
pad_data.flow_control == :auto and pad_data.demand < 0
end

@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def store_buffers_in_queue(pad_ref, buffers, state) do
state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref))
store_in_queue(pad_ref, :buffers, buffers, state)
end

Expand All @@ -96,19 +89,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(ref_or_ref_list, state)
when Pad.is_pad_ref(ref_or_ref_list) or is_list(ref_or_ref_list) do
{bumped_pads, state} =
ref_or_ref_list
|> Bunch.listify()
|> Enum.flat_map_reduce(state, fn pad_ref, state ->
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
|> case do
{:increased, state} -> {[pad_ref], state}
{:unchanged, state} -> {[], state}
end
end)

flush_auto_flow_queues(bumped_pads, state)
ref_or_ref_list
|> Bunch.listify()
|> Enum.reduce(state, fn pad_ref, state ->
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
|> elem(1) # todo: usun to :increased / :unchanged, ktore discardujesz w tym elem(1)
end)
end

defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do
Expand Down Expand Up @@ -141,41 +128,57 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state.effective_flow_control == :pull and
not pad_data.auto_demand_paused? and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state))
output_auto_demand_positive?(state)
end

defp atomic_demand_positive?(pad_ref, state) do
atomic_demand_value =
PadModel.get_data!(state, pad_ref, :atomic_demand)
|> AtomicDemand.get()

atomic_demand_value > 0
@spec pop_auto_flow_queues_while_needed(State.t()) :: State.t()
def pop_auto_flow_queues_while_needed(state) do
if (state.effective_flow_control == :push or
MapSet.size(state.satisfied_auto_output_pads) == 0) and
MapSet.size(state.awaiting_auto_input_pads) > 0 do
pop_random_auto_flow_queue(state)
|> pop_auto_flow_queues_while_needed()
else
state
end
end

defp flush_auto_flow_queues([], state), do: state
defp pop_random_auto_flow_queue(state) do
pad_ref = Enum.random(state.awaiting_auto_input_pads)

defp flush_auto_flow_queues(pads_to_flush, state) do
selected_pad = Enum.random(pads_to_flush)

PadModel.get_data!(state, selected_pad, :auto_flow_queue)
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> Qex.pop()
|> case do
{{:value, queue_item}, popped_queue} ->
state =
exec_queue_item_callback(selected_pad, queue_item, state)
|> PadModel.set_data!(selected_pad, :auto_flow_queue, popped_queue)
{{:value, {:buffers, buffers}}, popped_queue} ->
state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue)
state = BufferController.exec_buffer_callback(pad_ref, buffers, state)
pop_stream_formats_and_events(pad_ref, state)

flush_auto_flow_queues(pads_to_flush, state)
{:empty, _empty_queue} ->
Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
end
end

{:empty, empty_queue} ->
state = PadModel.set_data!(state, selected_pad, :auto_flow_queue, empty_queue)
defp pop_stream_formats_and_events(pad_ref, state) do
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> Qex.pop()
|> case do
{{:value, {type, item}}, popped_queue} when type in [:event, :stream_format] ->
state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue)
state = exec_queue_item_callback(pad_ref, {type, item}, state)
pop_stream_formats_and_events(pad_ref, state)

{{:value, {:buffers, _buffers}}, _popped_queue} ->
state

pads_to_flush
|> List.delete(selected_pad)
|> flush_auto_flow_queues(state)
{:empty, _empty_queue} ->
Map.update!(state, :awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
end
end

defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}),
do: MapSet.size(pads) == 0

defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do
BufferController.exec_buffer_callback(pad_ref, buffers, state)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.Core.Element.EventController do
)

# event goes to the auto flow control queue
AutoFlowUtils.hard_corcked?(pad_ref, state) ->
pad_ref in state.awaiting_auto_input_pads ->
AutoFlowUtils.store_event_in_queue(pad_ref, event, state)

true ->
Expand Down
16 changes: 12 additions & 4 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,15 @@ defmodule Membrane.Core.Element.PadController do

state = update_associated_pads(pad_data, state)

if pad_data.direction == :input and pad_data.flow_control == :auto do
AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state)
else
state
case pad_data do
%{direction: :output, flow_control: :auto} ->
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref))

%{direction: :input, flow_control: :auto} ->
AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state)

_pad_data ->
state
end
end

Expand Down Expand Up @@ -484,6 +489,9 @@ defmodule Membrane.Core.Element.PadController do
PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref))
end)
|> PadModel.set_data!(pad_ref, :associated_pads, [])
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> AutoFlowUtils.pop_auto_flow_queues_while_needed()

if pad_data.direction == :output,
do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state),
Expand Down
8 changes: 6 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ defmodule Membrane.Core.Element.State do
effective_flow_control: EffectiveFlowController.effective_flow_control(),
handling_action?: boolean(),
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t()
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -79,6 +81,8 @@ defmodule Membrane.Core.Element.State do
:demand_size,
:pads_to_snapshot,
:playback_queue,
:pads_data
:pads_data,
:satisfied_auto_output_pads,
:awaiting_auto_input_pads
]
end
2 changes: 1 addition & 1 deletion lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Membrane.Core.Element.StreamFormatController do
)

# stream format goes to the auto flow control queue
AutoFlowUtils.hard_corcked?(pad_ref, state) ->
pad_ref in state.awaiting_auto_input_pads ->
AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state)

true ->
Expand Down
Loading

0 comments on commit 2430810

Please sign in to comment.