diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 30cb8d3bc..f300aafd4 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -7,6 +7,7 @@ defmodule Membrane.Core.CallbackHandler do use Bunch + alias Membrane.Core.Component alias Membrane.CallbackError require Membrane.Logger @@ -191,6 +192,9 @@ defmodule Membrane.Core.CallbackHandler do was_handling_action? = state.handling_action? state = %{state | handling_action?: true} + was_supplying_demand? = Map.get(state, :supplying_demand?, false) + state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state + state = Enum.reduce(actions, state, fn action, state -> try do @@ -210,6 +214,11 @@ defmodule Membrane.Core.CallbackHandler do do: state, else: %{state | handling_action?: false} + state = + if Component.is_element?(state) and not was_supplying_demand?, + do: %{state | supplying_demand?: false}, + else: state + handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0812367b6..b4c9deed7 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -47,13 +47,20 @@ defmodule Membrane.Core.Element.ActionHandler do defguardp is_demand_size(size) when is_integer(size) or is_function(size) @impl CallbackHandler - def handle_end_of_actions(state) when not state.handling_action? do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - end + def handle_end_of_actions(state) do + state = + with %{handling_action?: false} <- state do + Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) + end - @impl CallbackHandler - def handle_end_of_actions(state), do: state + state = + with %{supplying_demand?: false} <- state do + DemandHandler.handle_delayed_demands(state) + end + + state + end @impl CallbackHandler def handle_action({action, _}, :handle_init, _params, _state) @@ -467,7 +474,9 @@ defmodule Membrane.Core.Element.ActionHandler do defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do state = PadController.remove_pad_associations(pad_ref, state) - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 74c6ce05a..1d22d7eb9 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do end end + @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() + def remove_pad_from_delayed_demands(pad_ref, state) do + Map.update!(state, :delayed_demands, fn delayed_demands_set -> + delayed_demands_set + |> MapSet.delete({pad_ref, :supply}) + |> MapSet.delete({pad_ref, :redemand}) + end) + end + @spec handle_input_queue_output( Pad.ref(), [InputQueue.output_value()], diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..6d9bf1c2f 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, + DemandHandler, InputQueue, PadController, PlaybackQueue, @@ -98,6 +99,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + state = DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) state = PadController.remove_pad_associations(pad_ref, state) %{ diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 1f240dfff..37097add8 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -234,7 +234,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do ) for i <- 1..3000 do - assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <>}) + assert_sink_buffer(pipeline, :sink, buffer) + assert %Membrane.Buffer{payload: <>} = buffer assert buff_idx == i end