Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix-handling-action…
Browse files Browse the repository at this point in the history
…s-bug
  • Loading branch information
FelonEkonom committed Feb 26, 2024
2 parents 2f78509 + dc5f653 commit 02db64b
Show file tree
Hide file tree
Showing 34 changed files with 748 additions and 258 deletions.
15 changes: 9 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Changelog

## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Fix clock selection. [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation. [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor. [#681](https://github.com/membraneframework/membrane_core/pull/681)
* Improve callback return types and group actions types. [#702](https://github.com/membraneframework/membrane_core/pull/702)
* Fix bug in the order of handling actions from callbacks. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
* Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702)
* Add `crash_reason` to `handle_crash_group_down/3` callback context in bins and pipelines. [#720](https://github.com/membraneframework/membrane_core/pull/720)

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule Membrane.Bin.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Bin.handle_element_end_of_stream/4`.
Fields `:members` and `:crash_initiator` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
Fields `:members`, `:crash_initiator` and `crash_reason` and are present only in
`c:Membrane.Bin.handle_crash_group_down/3`.
"""
@type t :: %{
:clock => Membrane.Clock.t(),
Expand All @@ -27,6 +27,7 @@ defmodule Membrane.Bin.CallbackContext do
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:start_of_stream_received?) => boolean()
}
end
6 changes: 5 additions & 1 deletion lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ defmodule Membrane.Core.Bin.CallbackContext do

@type optional_fields ::
[pad_options: map()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [
members: [Membrane.Child.name()],
crash_initiator: Membrane.Child.name(),
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) ::
Expand Down
1 change: 0 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ defmodule Membrane.Core.Child.PadModel do
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
options: %{optional(atom) => any},
auto_demand_size: pos_integer() | nil,
associated_pads: [Pad.ref()] | nil,
sticky_events: [Membrane.Event.t()],
stalker_metrics: %{atom => :atomics.atomics_ref()}
}
Expand Down
6 changes: 5 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,12 @@ defmodule Membrane.Core.Element do
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_auto_flow_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: []
}
|> PadSpecHandler.init_pads()

Expand Down
12 changes: 8 additions & 4 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Membrane.Core.Element.ActionHandler do
alias Membrane.Core.Element.{
DemandController,
DemandHandler,
PadController,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand Down Expand Up @@ -201,7 +201,11 @@ defmodule Membrane.Core.Element.ActionHandler do
_other -> :output
end

pads = state |> PadModel.filter_data(%{direction: dir}) |> Map.keys()
pads =
Enum.flat_map(state.pads_data, fn
{pad_ref, %{direction: ^dir}} -> [pad_ref]
_pad_entry -> []
end)

Enum.reduce(pads, state, fn pad, state ->
action =
Expand Down Expand Up @@ -467,10 +471,10 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
state = PadController.remove_pad_associations(pad_ref, state)

DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
23 changes: 13 additions & 10 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
:ok
end

@spec decrease(t, non_neg_integer()) :: t
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
def decrease(%__MODULE__{} = atomic_demand, value) do
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))

if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
flush_buffered_decrementation(atomic_demand)
else
atomic_demand
{:unchanged, atomic_demand}
end
end

Expand All @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do

atomic_demand = %{atomic_demand | buffered_decrementation: 0}

if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
atomic_demand =
if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end

{{:decreased, atomic_demand_value}, atomic_demand}
end

defp overflow(atomic_demand, atomic_demand_value) do
Expand Down
14 changes: 7 additions & 7 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ defmodule Membrane.Core.Element.BufferController do
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand All @@ -93,11 +97,7 @@ defmodule Membrane.Core.Element.BufferController do
@doc """
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback(
Pad.ref(),
[Buffer.t()] | Buffer.t(),
State.t()
) :: State.t()
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric("buffer", 1, inspect(pad_ref))

Expand Down
30 changes: 20 additions & 10 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ defmodule Membrane.Core.Element.DemandController do

@spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t()
def snapshot_atomic_demand(pad_ref, state) do
with {:ok, pad_data} <- PadModel.get_data(state, pad_ref),
with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref),
%State{playback: :playing} <- state do
if pad_data.direction == :input,
do: raise("cannot snapshot atomic counter in input pad")

do_snapshot_atomic_demand(pad_data, state)
else
{:ok, %{end_of_stream?: true}} ->
Membrane.Logger.debug_verbose(
"Skipping snapshot of pad #{inspect(pad_ref)}, because it has flag :end_of_stream? set to true"
)

state

{:error, :unknown_pad} ->
# We've got a :atomic_demand_increased message on already unlinked pad
state
Expand All @@ -43,13 +50,10 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do
%{
atomic_demand: atomic_demand,
associated_pads: associated_pads
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
state
end
Expand Down Expand Up @@ -91,9 +95,15 @@ defmodule Membrane.Core.Element.DemandController do
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)

demand = pad_data.demand - buffers_size
atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)
{decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)

PadModel.set_data!(state, pad_ref, %{
with {:decreased, new_value} when new_value <= 0 <- decrease_result,
%{flow_control: :auto} <- pad_data do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref))
else
_other -> state
end
|> PadModel.set_data!(pad_ref, %{
pad_data
| demand: demand,
atomic_demand: atomic_demand
Expand Down
Loading

0 comments on commit 02db64b

Please sign in to comment.