From 927285aded1051f8ee5a6253d5d16a1912943400 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 29 Jan 2024 17:05:06 +0100 Subject: [PATCH] WIP Fix bug in executing handle_buffer while handling actions from previous callback --- lib/membrane/core/callback_handler.ex | 19 +++++++++++++++++++ lib/membrane/core/element/action_handler.ex | 4 +++- lib/membrane/core/element/demand_handler.ex | 9 +++++++++ lib/membrane/core/element/event_controller.ex | 2 ++ .../integration/toilet_forwarding_test.exs | 3 ++- 5 files changed, 35 insertions(+), 2 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 30cb8d3bc..5eb287c5a 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -7,6 +7,8 @@ defmodule Membrane.Core.CallbackHandler do use Bunch + alias Membrane.Core.Element.DemandHandler + alias Membrane.Core.Component alias Membrane.CallbackError require Membrane.Logger @@ -191,6 +193,9 @@ defmodule Membrane.Core.CallbackHandler do was_handling_action? = state.handling_action? state = %{state | handling_action?: true} + was_supplying_demand? = state[:supplying_demand?] + state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state + state = Enum.reduce(actions, state, fn action, state -> try do @@ -210,6 +215,20 @@ defmodule Membrane.Core.CallbackHandler do do: state, else: %{state | handling_action?: false} + state = + cond do + was_supplying_demand? -> state + Component.is_element?(state) -> %{state | supplying_demand?: false} + true -> state + end + + state = + if Component.is_element?(state) and not was_supplying_demand? do + DemandHandler.handle_delayed_demands(state) + else + state + end + handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0812367b6..5a5bcfff3 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -467,7 +467,9 @@ defmodule Membrane.Core.Element.ActionHandler do defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do state = PadController.remove_pad_associations(pad_ref, state) - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 74c6ce05a..1d22d7eb9 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do end end + @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() + def remove_pad_from_delayed_demands(pad_ref, state) do + Map.update!(state, :delayed_demands, fn delayed_demands_set -> + delayed_demands_set + |> MapSet.delete({pad_ref, :supply}) + |> MapSet.delete({pad_ref, :redemand}) + end) + end + @spec handle_input_queue_output( Pad.ref(), [InputQueue.output_value()], diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..6d9bf1c2f 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, + DemandHandler, InputQueue, PadController, PlaybackQueue, @@ -98,6 +99,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + state = DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) state = PadController.remove_pad_associations(pad_ref, state) %{ diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 1f240dfff..37097add8 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -234,7 +234,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do ) for i <- 1..3000 do - assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <>}) + assert_sink_buffer(pipeline, :sink, buffer) + assert %Membrane.Buffer{payload: <>} = buffer assert buff_idx == i end