Skip to content

Commit

Permalink
Bump default input queue size to 100
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 27, 2024
1 parent 6c585e4 commit 415e7c5
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 23 deletions.
12 changes: 6 additions & 6 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ defmodule Membrane.Core.Element.ActionHandler do
%State{type: type} = state
)
when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do
supply_demand(pad_ref, size, state)
delay_supplying_demand(pad_ref, size, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -403,28 +403,28 @@ defmodule Membrane.Core.Element.ActionHandler do
end
end

@spec supply_demand(
@spec delay_supplying_demand(
Pad.ref(),
Action.demand_size(),
State.t()
) :: State.t()
defp supply_demand(pad_ref, 0, state) do
defp delay_supplying_demand(pad_ref, 0, state) do
Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}")
state
end

defp supply_demand(pad_ref, size, _state)
defp delay_supplying_demand(pad_ref, size, _state)
when is_integer(size) and size < 0 do
raise ElementError,
"Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}"
end

defp supply_demand(pad_ref, size, state) do
defp delay_supplying_demand(pad_ref, size, state) do
with %{direction: :input, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
# todo: get_data! above could be eradicated
state = ManualFlowController.update_demand(pad_ref, size, state)
ManualFlowController.delay_demand_supply(pad_ref, state)
ManualFlowController.delay_supplying_demand(pad_ref, state)
else
%{direction: :output} ->
raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ defmodule Membrane.Core.Element.BufferController do
end

# todo: move it to the flow controllers?
@spec do_handle_ingoing_buffers(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
@spec do_handle_ingoing_buffers(
Pad.ref(),
PadModel.pad_data(),
[Buffer.t()] | Buffer.t(),
State.t()
) ::
State.t()
defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
Expand Down
11 changes: 6 additions & 5 deletions lib/membrane/core/element/manual_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,14 @@ defmodule Membrane.Core.Element.ManualFlowController do

@spec supply_demand(Pad.ref(), State.t()) :: State.t()
def supply_demand(pad_ref, %State{delay_demands?: true} = state) do
delay_demand_supply(pad_ref, state)
delay_supplying_demand(pad_ref, state)
end

def supply_demand(pad_ref, state) do
do_supply_demand(pad_ref, state)
|> handle_delayed_demands()
end

def delay_demand_supply(pad_ref, state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply}))
end

defp do_supply_demand(pad_ref, state) do
# marking is state that actual demand supply has been started (note changing back to false when finished)
state = %State{state | delay_demands?: true}
Expand All @@ -103,6 +99,11 @@ defmodule Membrane.Core.Element.ManualFlowController do
%State{state | delay_demands?: false}
end

@spec delay_supplying_demand(Pad.ref(), State.t()) :: State.t()
def delay_supplying_demand(pad_ref, state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply}))
end

@spec update_demand(
Pad.ref(),
non_neg_integer() | (non_neg_integer() -> non_neg_integer()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do

defstruct @enforce_keys ++ [size: 0, demand: 0]

@default_target_size_factor 40
@default_target_size_factor 100

@spec default_min_demand_factor() :: number()
def default_min_demand_factor, do: 0.25

@spec init(%{
@spec new(%{
inbound_demand_unit: Buffer.Metric.unit(),
outbound_demand_unit: Buffer.Metric.unit(),
atomic_demand: AtomicDemand.t(),
pad_ref: Pad.ref(),
log_tag: String.t(),
target_size: pos_integer() | nil
}) :: t()
def init(config) do
def new(config) do
%{
inbound_demand_unit: inbound_demand_unit,
outbound_demand_unit: outbound_demand_unit,
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ defmodule Membrane.Core.Element.PadController do
} = pad_data

input_queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: other_pad_info[:demand_unit] || this_demand_unit,
outbound_demand_unit: this_demand_unit,
atomic_demand: atomic_demand,
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/core/element/event_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.EventControllerTest do
})

input_queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: :buffers,
outbound_demand_unit: :buffers,
pad_ref: :some_pad,
Expand Down
8 changes: 4 additions & 4 deletions test/membrane/core/element/input_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
end

test "return InputQueue struct and send demand message", context do
assert InputQueue.init(%{
assert InputQueue.new(%{
inbound_demand_unit: context.inbound_demand_unit,
outbound_demand_unit: context.outbound_demand_unit,
pad_ref: context.pad_ref,
Expand Down Expand Up @@ -187,7 +187,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
describe ".take/2 should" do
setup do
input_queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: :buffers,
outbound_demand_unit: :buffers,
pad_ref: :output_pad_ref,
Expand Down Expand Up @@ -302,7 +302,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
atomic_demand = new_atomic_demand()

queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: :bytes,
outbound_demand_unit: :buffers,
atomic_demand: atomic_demand,
Expand Down Expand Up @@ -339,7 +339,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
atomic_demand = new_atomic_demand()

queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: :buffers,
outbound_demand_unit: :bytes,
atomic_demand: atomic_demand,
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/core/element/lifecycle_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do
})

input_queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: :buffers,
outbound_demand_unit: :buffers,
atomic_demand: atomic_demand,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do
})

input_queue =
InputQueue.init(%{
InputQueue.new(%{
inbound_demand_unit: :buffers,
outbound_demand_unit: :buffers,
atomic_demand: atomic_demand,
Expand Down

0 comments on commit 415e7c5

Please sign in to comment.