Skip to content

Commit

Permalink
WIP Fix bug in executing handle_buffer while handling actions from pr…
Browse files Browse the repository at this point in the history
…evious callback
  • Loading branch information
FelonEkonom committed Jan 29, 2024
1 parent 0e40638 commit 927285a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
19 changes: 19 additions & 0 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Membrane.Core.CallbackHandler do

use Bunch

alias Membrane.Core.Element.DemandHandler
alias Membrane.Core.Component
alias Membrane.CallbackError

require Membrane.Logger
Expand Down Expand Up @@ -191,6 +193,9 @@ defmodule Membrane.Core.CallbackHandler do
was_handling_action? = state.handling_action?
state = %{state | handling_action?: true}

was_supplying_demand? = state[:supplying_demand?]
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
try do
Expand All @@ -210,6 +215,20 @@ defmodule Membrane.Core.CallbackHandler do
do: state,
else: %{state | handling_action?: false}

state =
cond do
was_supplying_demand? -> state
Component.is_element?(state) -> %{state | supplying_demand?: false}
true -> state
end

state =
if Component.is_element?(state) and not was_supplying_demand? do
DemandHandler.handle_delayed_demands(state)
else
state
end

handler_module.handle_end_of_actions(state)
end
end
4 changes: 3 additions & 1 deletion lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ defmodule Membrane.Core.Element.ActionHandler do
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
state = PadController.remove_pad_associations(pad_ref, state)
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)

DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
9 changes: 9 additions & 0 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do
end
end

@spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t()
def remove_pad_from_delayed_demands(pad_ref, state) do
Map.update!(state, :delayed_demands, fn delayed_demands_set ->
delayed_demands_set
|> MapSet.delete({pad_ref, :supply})
|> MapSet.delete({pad_ref, :redemand})
end)
end

@spec handle_input_queue_output(
Pad.ref(),
[InputQueue.output_value()],
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandHandler,
InputQueue,
PadController,
PlaybackQueue,
Expand Down Expand Up @@ -98,6 +99,7 @@ defmodule Membrane.Core.Element.EventController do
Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}")

state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
state = DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
state = PadController.remove_pad_associations(pad_ref, state)

%{
Expand Down
3 changes: 2 additions & 1 deletion test/membrane/integration/toilet_forwarding_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do
)

for i <- 1..3000 do
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <<buff_idx::64>>})
assert_sink_buffer(pipeline, :sink, buffer)
assert %Membrane.Buffer{payload: <<buff_idx::64>>} = buffer
assert buff_idx == i
end

Expand Down

0 comments on commit 927285a

Please sign in to comment.