Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jan 5, 2024
1 parent 2430810 commit 7be81c9
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 71 deletions.
1 change: 1 addition & 0 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ defmodule Membrane.Core.Element do
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
Expand Down
21 changes: 9 additions & 12 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do
%{
atomic_demand: atomic_demand,
associated_pads: associated_pads
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
# tutaj powinno mieć miejsce
# - usuniecie pada z mapsetu
# - sflushowanie kolejek, jesli mapset jest pusty
Expand All @@ -59,13 +54,15 @@ defmodule Membrane.Core.Element.DemandController do

# dobra, wyglada git

state = AutoFlowUtils.pop_auto_flow_queues_while_needed(state)
AutoFlowUtils.pop_queues_and_bump_demand(state)

if MapSet.size(state.satisfied_auto_output_pads) == 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
else
state
end
# state = AutoFlowUtils.pop_auto_flow_queues_while_needed(state)

# if MapSet.size(state.satisfied_auto_output_pads) == 0 do
# AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
# else
# state
# end
else
state
end
Expand Down
100 changes: 78 additions & 22 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,32 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def store_buffers_in_queue(pad_ref, buffers, state) do
state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref))
store_in_queue(pad_ref, :buffers, buffers, state)
PadModel.update_data!(state, pad_ref, :auto_flow_queue, fn queue ->
Enum.reduce(buffers, queue, fn buffer, queue -> Qex.push(queue, {:buffer, buffer}))
end)

# store_in_queue(pad_ref, :buffers, buffers, state)
end

@spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t()
def store_event_in_queue(pad_ref, event, state) do
store_in_queue(pad_ref, :event, event, state)
# store_in_queue(pad_ref, :event, event, state)
queue_item = {:event, event}
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item))
end

@spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t()
def store_stream_format_in_queue(pad_ref, stream_format, state) do
store_in_queue(pad_ref, :stream_format, stream_format, state)
# store_in_queue(pad_ref, :stream_format, stream_format, state)
queue_item = {:stream_format, stream_format}
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item))
end

defp store_in_queue(pad_ref, type, item, state) do
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item}))
end
# defp store_in_queue(pad_ref, type, item, state) do
# PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item}))
# end

# defp update_queue

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(ref_or_ref_list, state)
Expand All @@ -94,7 +104,8 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
|> Enum.reduce(state, fn pad_ref, state ->
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
|> elem(1) # todo: usun to :increased / :unchanged, ktore discardujesz w tym elem(1)
# todo: usun to :increased / :unchanged, ktore discardujesz w tym elem(1)
|> elem(1)
end)
end

Expand Down Expand Up @@ -131,6 +142,31 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
output_auto_demand_positive?(state)
end

@spec pop_queues_and_bump_demand(State.t()) :: State.t()
def pop_queues_and_bump_demand(%State{popping_queue?: false} = state) do
%{state | popping_queue?: true}
|> bump_demand()
|> pop_auto_flow_queues_while_needed()
|> bump_demand()
|> Map.put(:popping_queue?, false)
end

def pop_queues_and_bump_demand(%State{popping_queue?: true} = state), do: state

defp bump_demand(state) do
if state.effective_flow_control == :pull and
MapSet.size(state.satisfied_auto_output_pads) == 0 do
state.pads_data
|> Enum.flat_map(fn
{ref, %{direction: :input, flow_control: :auto}} -> [ref]
_other -> []
end)
|> auto_adjust_atomic_demand(state)
else
state
end
end

@spec pop_auto_flow_queues_while_needed(State.t()) :: State.t()
def pop_auto_flow_queues_while_needed(state) do
if (state.effective_flow_control == :push or
Expand All @@ -143,10 +179,37 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
end
end

# @spec pop_auto_flow_queues_while_needed(State.t()) :: State.t()
# def pop_auto_flow_queues_while_needed(state) do
# if state.name == :tee do
# {state.effective_flow_control, state.satisfied_auto_output_pads} |> IO.inspect(label: "TAKIE TAM W POP WHILE")
# end

# if (state.effective_flow_control == :push or
# MapSet.size(state.satisfied_auto_output_pads) == 0) and
# MapSet.size(state.awaiting_auto_input_pads) > 0 do

# if state.name == :tee do
# IO.puts("A")
# end

# pop_random_auto_flow_queue(state)
# |> pop_auto_flow_queues_while_needed()
# else
# if state.name == :tee do
# IO.puts("B")
# end

# state
# end
# end

defp pop_random_auto_flow_queue(state) do
pad_ref = Enum.random(state.awaiting_auto_input_pads)

PadModel.get_data!(state, pad_ref, :auto_flow_queue)
state
# pop_stream_formats_and_events(pad_ref, state)
|> PadModel.get_data!(pad_ref, :auto_flow_queue)
|> Qex.pop()
|> case do
{{:value, {:buffers, buffers}}, popped_queue} ->
Expand All @@ -163,9 +226,14 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> Qex.pop()
|> case do
{{:value, {type, item}}, popped_queue} when type in [:event, :stream_format] ->
{{:value, {:event, event}}, popped_queue} ->
state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue)
state = EventController.exec_handle_event(pad_ref, event, state)
pop_stream_formats_and_events(pad_ref, state)

{{:value, {:stream_format, stream_format}}, popped_queue} ->
state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, popped_queue)
state = exec_queue_item_callback(pad_ref, {type, item}, state)
state = StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state)
pop_stream_formats_and_events(pad_ref, state)

{{:value, {:buffers, _buffers}}, _popped_queue} ->
Expand All @@ -178,16 +246,4 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}),
do: MapSet.size(pads) == 0

defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do
BufferController.exec_buffer_callback(pad_ref, buffers, state)
end

defp exec_queue_item_callback(pad_ref, {:event, event}, state) do
EventController.exec_handle_event(pad_ref, event, state)
end

defp exec_queue_item_callback(pad_ref, {:stream_format, stream_format}, state) do
StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state)
end
end
14 changes: 8 additions & 6 deletions lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
end
end)

state.pads_data
|> Enum.flat_map(fn
{pad_ref, %{direction: :input, flow_control: :auto}} -> [pad_ref]
_other -> []
end)
|> AutoFlowUtils.auto_adjust_atomic_demand(state)
AutoFlowUtils.pop_queues_and_bump_demand(state)

# state.pads_data
# |> Enum.flat_map(fn
# {pad_ref, %{direction: :input, flow_control: :auto}} -> [pad_ref]
# _other -> []
# end)
# |> AutoFlowUtils.auto_adjust_atomic_demand(state)
end
end
14 changes: 13 additions & 1 deletion lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,19 @@ defmodule Membrane.Core.Element.EventController do

# event goes to the auto flow control queue
pad_ref in state.awaiting_auto_input_pads ->
AutoFlowUtils.store_event_in_queue(pad_ref, event, state)
with %Membrane.Core.Events.EndOfStream{} <- event do
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> IO.inspect(label: "AFQ 1 #{inspect(state.name)}", limit: :infinity)
end

state = AutoFlowUtils.store_event_in_queue(pad_ref, event, state)

with %Membrane.Core.Events.EndOfStream{} <- event do
PadModel.get_data!(state, pad_ref, :auto_flow_queue)
|> IO.inspect(label: "AFQ 2 #{inspect(state.name)}", limit: :infinity)
end

state

true ->
exec_handle_event(pad_ref, event, state)
Expand Down
34 changes: 22 additions & 12 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,19 @@ defmodule Membrane.Core.Element.PadController do
Map.update!(state, :pad_refs, &List.delete(&1, pad_ref))
|> PadModel.pop_data!(pad_ref)

IO.inspect(pad_ref, label: "PAD REF")
IO.inspect(state, label: "PRZED", limit: :infinity)

with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <-
pad_data do
EffectiveFlowController.resolve_effective_flow_control(state)
else
_pad_data -> state
end
# |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
# |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> IO.inspect(label: "PO", limit: :infinity)
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
{:ok, %{availability: :always}} when state.terminating? ->
state
Expand Down Expand Up @@ -484,18 +491,21 @@ defmodule Membrane.Core.Element.PadController do
def remove_pad_associations(pad_ref, state) do
case PadModel.get_data!(state, pad_ref) do
%{flow_control: :auto} = pad_data ->
state =
Enum.reduce(pad_data.associated_pads, state, fn pad, state ->
PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref))
end)
|> PadModel.set_data!(pad_ref, :associated_pads, [])
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> AutoFlowUtils.pop_auto_flow_queues_while_needed()

if pad_data.direction == :output,
do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state),
else: state
# state =
Enum.reduce(pad_data.associated_pads, state, fn pad, state ->
PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref))
end)
|> PadModel.set_data!(pad_ref, :associated_pads, [])
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))

# |> AutoFlowUtils.pop_queues_and_bump_demand()

# |> AutoFlowUtils.pop_auto_flow_queues_while_needed()

# if pad_data.direction == :output,
# do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state),
# else: state

_pad_data ->
state
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ defmodule Membrane.Core.Element.State do
setup_incomplete?: boolean(),
effective_flow_control: EffectiveFlowController.effective_flow_control(),
handling_action?: boolean(),
popping_queue?: boolean(),
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
Expand Down Expand Up @@ -74,6 +75,7 @@ defmodule Membrane.Core.Element.State do
:setup_incomplete?,
:supplying_demand?,
:handling_action?,
:popping_queue?,
:stalker,
:resource_guard,
:subprocess_supervisor,
Expand Down
19 changes: 1 addition & 18 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
IO.puts("START OF THE ASSERTIONS")

Enum.each(1..100_000, fn payload ->
IO.puts("ASSERTION NO. #{inspect(payload)}")

if payload == 801 do
Process.sleep(500)

for name <- [:source, :tee, :left_sink] do
Pipeline.get_child_pid!(pipeline, name)
|> :sys.get_state()
|> IO.inspect(label: "NAME OF #{inspect(name)}", limit: :infinity)
end

Pipeline.get_child_pid!(pipeline, :left_sink)
|> :sys.get_state()
|> get_in([:pads_data, :input, :input_queue])
|> Map.get(:atomic_demand)
|> Membrane.Core.Element.AtomicDemand.get()
|> IO.inspect(label: "ATOMIC DEMAND VALUE")
end
# IO.puts("ASSERTION NO. #{inspect(payload)}")

assert_sink_buffer(pipeline, :left_sink, buffer)
assert buffer.payload == payload
Expand Down

0 comments on commit 7be81c9

Please sign in to comment.