Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Oct 19, 2023
1 parent 9269612 commit 3cfac50
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 3 deletions.
14 changes: 13 additions & 1 deletion lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,23 @@ defmodule Membrane.Core.Element.BufferController do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

# We check if pad should be corcked before decrementing :demand field in PadData
hard_corcked? = AutoFlowUtils.hard_corcked?(pad_ref, state)

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)

# jest szansa ze jak przyjdzie duzy chunk buforow, to dounter poleci leb na szyje i wtedy
# calosc pojdzie do kolejki, w momencie, gdy czesc z nich chcielibysmy jednak przeprocesowac
# napraw ten case

if hard_corcked? do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
exec_buffer_callback(pad_ref, buffers, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand Down
1 change: 1 addition & 0 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ defmodule Membrane.Core.Element.DemandController do

if AtomicDemand.get(atomic_demand) > 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
# flush queues
else
state
end
Expand Down
58 changes: 57 additions & 1 deletion lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
@moduledoc false

alias Membrane.Buffer

alias Membrane.Core.Element.{
AtomicDemand,
State
Expand Down Expand Up @@ -59,12 +61,33 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state
end

@spec hard_corcked?(Pad.ref(), State.t()) :: boolean()
def hard_corcked?(pad_ref, state) do
pad_data = PadModel.get_data!(state, pad_ref)

state.effective_flow_control == :pull && pad_data.demand < -1 * pad_data.auto_demand_size / 2
end

@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: boolean()
def store_buffers_in_queue(pad_ref, buffers, state) do
PadModel.update_data!(state, pad_ref, [:auto_flow_queue], fn queue ->
Enum.reduce(buffers, queue, &Qex.push(queue, &1))
end)
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do
Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2)
state = Enum.reduce(pad_ref_list, state, &do_auto_adjust_atomic_demand/2)
flush_auto_flow_queues(pad_ref_list, state)
# flush queues ?
end

def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do
state = do_auto_adjust_atomic_demand(pad_ref, state)
flush_auto_flow_queues([pad_ref], state)
end

defp do_auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
end
Expand Down Expand Up @@ -107,4 +130,37 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

atomic_demand_value > 0
end

@spec flush_auto_flow_queues([Pad.ref()], State.t()) :: State.t()
def flush_auto_flow_queues(pad_ref_list, state) do
pad_to_queue_map =
pad_ref_list
|> Enum.filter(&hard_corcked?(&1, state))
|> Map.new(&PadModel.get_data!(state, &1, [:auto_flow_queue]))

state = handle_queued_buffers(pad_to_queue_map, state)

Enum.reduce(pad_ref_list, state, fn pad_ref, state ->
PadModel.set_data!(state, pad_ref, [:auto_flow_queue], Qex.new())
end)
end

defp handle_queued_buffers(%{}, state), do: state

defp handle_queued_buffers(pad_to_queue_map, state) do
{pad_ref, queue} = Enum.random(pad_to_queue_map)

case Qex.pop(queue) do
{{:value, queue_item}, popped_queue} ->
# handle queue_item
pad_to_queue_map
|> Map.put(pad_ref, popped_queue)
|> handle_queued_buffers(state)

{:empty, _empty_queue} ->
pad_to_queue_map
|> Map.pop(pad_ref)
|> handle_queued_buffers(state)
end
end
end
13 changes: 12 additions & 1 deletion lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
"Transiting `flow_control: :auto` pads to #{inspect(new_effective_flow_control)} effective flow control"
)

old_effective_flow_control = state.effective_flow_control
state = %{state | effective_flow_control: new_effective_flow_control}

state.pads_data
Expand Down Expand Up @@ -132,7 +133,17 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
)
end

AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
# AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
state
end)

# flush queues

state.pads_data
|> Enum.filter(fn
{_pad_ref, %{direction: :input, flow_contorl: :auto}} -> true
_other -> false
end)
|> AutoFlowUtils.auto_adjust_atomic_demand(state)
end
end
1 change: 1 addition & 0 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ defmodule Membrane.Core.Element.PadController do

if pad_data.direction == :output,
do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state),
# flush queues
else: state

_pad_data ->
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/element/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ defmodule Membrane.Element.PadData do
pid: private_field,
other_ref: private_field,
input_queue: private_field,
auto_flow_queue: private_field,
incoming_demand: integer() | nil,
demand_unit: private_field,
other_demand_unit: private_field,
Expand Down Expand Up @@ -80,6 +81,7 @@ defmodule Membrane.Element.PadData do
defstruct @enforce_keys ++
[
input_queue: nil,
auto_flow_queue: Qex.new(),
demand: 0,
incoming_demand: nil,
demand_unit: nil,
Expand Down

0 comments on commit 3cfac50

Please sign in to comment.