Skip to content

Commit

Permalink
Introduce AutoFlowController
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 27, 2024
1 parent 9b05f29 commit bf9aee0
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 23 deletions.
2 changes: 1 addition & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element do

alias Membrane.Core.Element.{
BufferController,
DemandController,
DemandController, AutoFlowController,
EffectiveFlowController,
EventController,
ManualFlowController,
Expand Down
9 changes: 5 additions & 4 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule Membrane.Core.Element.ActionHandler do
}

alias Membrane.Core.Element.{
DemandController,
AutoFlowController,
DemandController, AutoFlowController,
State,
StreamFormatController,
ManualFlowController
Expand Down Expand Up @@ -183,13 +184,13 @@ defmodule Membrane.Core.Element.ActionHandler do
@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.Auto.pause_demands(in_ref, state)
AutoFlowController.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.Auto.resume_demands(in_ref, state)
AutoFlowController.resume_demands(in_ref, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -483,7 +484,7 @@ defmodule Membrane.Core.Element.ActionHandler do
ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> DemandController.Auto.pop_queues_and_bump_demand()
|> AutoFlowController.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.DemandController.Auto do
defmodule Membrane.Core.Element.AutoFlowController do
@moduledoc false

alias Membrane.Buffer
Expand Down
7 changes: 4 additions & 3 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ defmodule Membrane.Core.Element.BufferController do

alias Membrane.Core.Element.{
ActionHandler,
AutoFlowController,
CallbackContext,
DemandController,
DemandController, AutoFlowController,
EventController,
InputQueue,
ManualFlowController,
Expand Down Expand Up @@ -70,10 +71,10 @@ defmodule Membrane.Core.Element.BufferController do
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state)
AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state)
AutoFlowController.auto_adjust_atomic_demand(pad_ref, state)
end
end

Expand Down
5 changes: 1 addition & 4 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ defmodule Membrane.Core.Element.DemandController do

use Bunch

alias Membrane.Core.Element.ManualFlowController
alias __MODULE__.{Auto, Manual}

alias Membrane.Buffer

alias Membrane.Core.CallbackHandler
Expand Down Expand Up @@ -63,7 +60,7 @@ defmodule Membrane.Core.Element.DemandController do
if atomic_value > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> Auto.pop_queues_and_bump_demand()
|> AutoFlowController.pop_queues_and_bump_demand()
else
state
end
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
# Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime.

alias Membrane.Core.Element.DemandController
alias Membrane.Core.Element.AutoFlowController
alias Membrane.Core.Element.{AtomicDemand, State}

require Membrane.Core.Child.PadModel, as: PadModel
Expand Down Expand Up @@ -140,6 +141,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
state
end)
end
|> DemandController.Auto.pop_queues_and_bump_demand()
|> AutoFlowController.pop_queues_and_bump_demand()
end
end
4 changes: 2 additions & 2 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Membrane.Core.Element.EventController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandController,
DemandController, AutoFlowController,
InputQueue,
ManualFlowController,
AutoFlowController,
Expand Down Expand Up @@ -55,7 +55,7 @@ defmodule Membrane.Core.Element.EventController do

# event goes to the auto flow control queue
not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) ->
DemandController.Auto.store_event_in_queue(pad_ref, event, state)
AutoFlowController.store_event_in_queue(pad_ref, event, state)

true ->
exec_handle_event(pad_ref, event, state)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/manual_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Membrane.Core.Element.ManualFlowController do
ActionHandler,
BufferController,
CallbackContext,
DemandController,
DemandController, AutoFlowController,
EventController,
InputQueue,
State,
Expand Down
7 changes: 4 additions & 3 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ defmodule Membrane.Core.Element.PadController do
alias Membrane.Core.Element.{
ActionHandler,
AtomicDemand,
AutoFlowController,
CallbackContext,
DemandController,
DemandController, AutoFlowController,
EffectiveFlowController,
EventController,
InputQueue,
Expand Down Expand Up @@ -241,7 +242,7 @@ defmodule Membrane.Core.Element.PadController do
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref))
|> DemandController.Auto.pop_queues_and_bump_demand()
|> AutoFlowController.pop_queues_and_bump_demand()
else
{:ok, %{availability: :always}} when state.terminating? ->
state
Expand Down Expand Up @@ -335,7 +336,7 @@ defmodule Membrane.Core.Element.PadController do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref))

%{direction: :input, flow_control: :auto} ->
DemandController.Auto.auto_adjust_atomic_demand(endpoint.pad_ref, state)
AutoFlowController.auto_adjust_atomic_demand(endpoint.pad_ref, state)
|> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1])

_pad_data ->
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Membrane.Core.Element.StreamFormatController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandController,
DemandController, AutoFlowController,
InputQueue,
PlaybackQueue,
State
Expand Down Expand Up @@ -48,7 +48,7 @@ defmodule Membrane.Core.Element.StreamFormatController do

# stream format goes to the auto flow control queue
pad_ref in state.awaiting_auto_input_pads ->
DemandController.Auto.store_stream_format_in_queue(pad_ref, stream_format, state)
AutoFlowController.store_stream_format_in_queue(pad_ref, stream_format, state)

true ->
exec_handle_stream_format(pad_ref, stream_format, state)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/element/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do
# with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well.
atomic_demand: private_field,

# Field used in DemandController.Auto and InputQueue, to caluclate, how much AtomicDemand should be increased.
# Field used in AutoFlowController and InputQueue, to caluclate, how much AtomicDemand should be increased.
# Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but
# hasn't arrived yet. Unused for output pads.
manual_demand_size: private_field,
Expand Down

0 comments on commit bf9aee0

Please sign in to comment.