diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 93e1d8f81..0812367b6 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -167,7 +167,7 @@ defmodule Membrane.Core.Element.ActionHandler do when cb in [ :handle_stream_format, :handle_event, - :handle_buffers_batch, + :handle_buffer, :handle_end_of_stream ] do dir = @@ -182,7 +182,7 @@ defmodule Membrane.Core.Element.ActionHandler do action = case cb do :handle_event -> {:event, {pad, data}} - :handle_buffers_batch -> {:buffer, {pad, data}} + :handle_buffer -> {:buffer, {pad, data}} :handle_stream_format -> {:stream_format, {pad, data}} :handle_end_of_stream -> {:end_of_stream, pad} end diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 4b0b1e1c0..97bcd959b 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -91,7 +91,7 @@ defmodule Membrane.Core.Element.BufferController do end @doc """ - Executes `handle_buffers_batch` callback. + Executes `handle_buffer` callback. """ @spec exec_buffer_callback( Pad.ref(), @@ -113,12 +113,16 @@ defmodule Membrane.Core.Element.BufferController do end defp do_exec_buffer_callback(pad_ref, buffers, state) do - CallbackHandler.exec_and_handle_callback( - :handle_buffers_batch, - ActionHandler, - %{context: &CallbackContext.from_state/1}, - [pad_ref, buffers], - state - ) + buffers + |> List.wrap() + |> Enum.reduce(state, fn buffer, state -> + CallbackHandler.exec_and_handle_callback( + :handle_buffer, + ActionHandler, + %{context: &CallbackContext.from_state/1}, + [pad_ref, buffer], + state + ) + end) end end diff --git a/lib/membrane/element/action.ex b/lib/membrane/element/action.ex index a2b82eeb4..3e4f71b8a 100644 --- a/lib/membrane/element/action.ex +++ b/lib/membrane/element/action.ex @@ -48,9 +48,7 @@ defmodule Membrane.Element.Action do sub-callback executions are finished). Useful when a long action is to be undertaken, and partial results need to - be returned before entire process finishes (e.g. default implementation of - `c:Membrane.WithInputPads.handle_buffers_batch/4` uses split action to invoke - `c:Membrane.WithInputPads.handle_buffer/4` with each buffer) + be returned before entire process finishes. """ @type split :: {:split, {callback_name :: atom, args_list :: [[any]]}} @@ -80,7 +78,7 @@ defmodule Membrane.Element.Action do of data from pad's internal queue, which _sends_ demands automatically when it runs out of data. If there is any data available at the pad, the data is passed to - `c:Membrane.WithInputPads.handle_buffers_batch/4` callback. Invoked callback is + `c:Membrane.WithInputPads.handle_buffer/4` callback. Invoked callback is guaranteed not to receive more data than demanded. Demand size can be either a non-negative integer, that overrides existing demand, @@ -161,13 +159,12 @@ defmodule Membrane.Element.Action do Allowed only when _all_ below conditions are met: - element is filter, - - callback is `c:Membrane.Element.WithInputPads.handle_buffers_batch/4`, - `c:Membrane.Element.WithInputPads.handle_buffer/4`, + - callback is `c:Membrane.Element.WithInputPads.handle_buffer/4`, `c:Membrane.Element.WithInputPads.handle_stream_format/4`, `c:Membrane.Element.Base.handle_event/4` or `c:Membrane.Element.WithInputPads.handle_end_of_stream/3` - playback is `playing` - Keep in mind that `c:Membrane.WithInputPads.handle_buffers_batch/4` can only + Keep in mind that `c:Membrane.WithInputPads.handle_buffer/4` can only forward buffers, `c:Membrane.Element.WithInputPads.handle_stream_format/4` - stream formats. `c:Membrane.Element.Base.handle_event/4` - events and `c:Membrane.Element.WithInputPads.handle_end_of_stream/3` - ends of streams. diff --git a/lib/membrane/element/with_input_pads.ex b/lib/membrane/element/with_input_pads.ex index 257bf11f9..ba5a08986 100644 --- a/lib/membrane/element/with_input_pads.ex +++ b/lib/membrane/element/with_input_pads.ex @@ -49,25 +49,10 @@ defmodule Membrane.Element.WithInputPads do @doc """ Callback that is called when buffer should be processed by the Element. - By default calls `c:handle_buffer/4` for each buffer. - - For pads in pull mode it is called when buffers have been demanded (by returning + For pads in pull mode it is called when buffer have been demanded (by returning `:demand` action from any callback). - For pads in push mode it is invoked when buffers arrive. - """ - @callback handle_buffers_batch( - pad :: Pad.ref(), - buffers :: list(Buffer.t()), - context :: CallbackContext.t(), - state :: Element.state() - ) :: Membrane.Element.Base.callback_return() - - @doc """ - Callback that is called when buffer should be processed by the Element. In contrast - to `c:handle_buffers_batch/4`, it is passed only a single buffer. - - Called by default implementation of `c:handle_buffers_batch/4`. + For pads in push mode it is invoked when buffer arrive. """ @callback handle_buffer( pad :: Pad.ref(), @@ -99,16 +84,9 @@ defmodule Membrane.Element.WithInputPads do @impl true def handle_end_of_stream(pad, _context, state), do: {[], state} - @impl true - def handle_buffers_batch(pad, buffers, _context, state) do - args_list = buffers |> Enum.map(&[pad, &1]) - {[split: {:handle_buffer, args_list}], state} - end - defoverridable handle_stream_format: 4, handle_start_of_stream: 3, - handle_end_of_stream: 3, - handle_buffers_batch: 4 + handle_end_of_stream: 3 end end end diff --git a/lib/membrane/element/with_output_pads.ex b/lib/membrane/element/with_output_pads.ex index 974be43c1..be3e5ecf7 100644 --- a/lib/membrane/element/with_output_pads.ex +++ b/lib/membrane/element/with_output_pads.ex @@ -22,7 +22,7 @@ defmodule Membrane.Element.WithOutputPads do In filters, this callback should usually return `:demand` action with size sufficient for supplying incoming demand. This will result in calling - `c:Membrane.WithInputPads.handle_buffers_batch/4`, which is to supply + `c:Membrane.WithInputPads.handle_buffer/4`, which is to supply the demand. If a source or an endpoint is unable to produce enough buffers, or a filter diff --git a/lib/membrane/filter_aggregator.ex b/lib/membrane/filter_aggregator.ex index 57d22938d..b5b2b2b0c 100644 --- a/lib/membrane/filter_aggregator.ex +++ b/lib/membrane/filter_aggregator.ex @@ -134,8 +134,8 @@ defmodule Membrane.FilterAggregator do end @impl true - def handle_buffers_batch(:input, buffers, _ctx, %{states: states}) do - {actions, states} = pipe_downstream([buffer: {:output, buffers}], states) + def handle_buffer(:input, buffer, _ctx, %{states: states}) do + {actions, states} = pipe_downstream([buffer: {:output, buffer}], states) actions = reject_internal_actions(actions) {actions, %{states: states}} @@ -191,12 +191,8 @@ defmodule Membrane.FilterAggregator do end end - defp perform_action({:buffer, {:output, []}}, _module, _context, state) do - {[], state} - end - defp perform_action({:buffer, {:output, buffer}}, module, context, state) do - module.handle_buffers_batch(:input, List.wrap(buffer), context, state) + module.handle_buffer(:input, buffer, context, state) end defp perform_action({:stream_format, {:output, stream_format}}, module, context, state) do diff --git a/test/membrane/filter_aggregator/integration_test.exs b/test/membrane/filter_aggregator/integration_test.exs index b52540265..72fcb63b8 100644 --- a/test/membrane/filter_aggregator/integration_test.exs +++ b/test/membrane/filter_aggregator/integration_test.exs @@ -29,15 +29,10 @@ defmodule Membrane.FilterAggregator.IntegrationTest do def_output_pad :output, flow_control: :auto, accepted_format: RemoteStream @impl true - def handle_buffers_batch(:input, buffers, _ctx, state) do - buffers = - buffers - |> Enum.map(fn %Buffer{payload: <>} -> - payload = for <>, into: <<>>, do: <> - %Buffer{payload: <>} - end) - - {[buffer: {:output, buffers}], state} + def handle_buffer(:input, %Buffer{payload: <>}, _ctx, state) do + payload = for <>, into: <<>>, do: <> + buffer = %Buffer{payload: <>} + {[buffer: {:output, buffer}], state} end end diff --git a/test/membrane/filter_aggregator/unit_test.exs b/test/membrane/filter_aggregator/unit_test.exs index 32a39488f..03dc24355 100644 --- a/test/membrane/filter_aggregator/unit_test.exs +++ b/test/membrane/filter_aggregator/unit_test.exs @@ -3,6 +3,7 @@ defmodule Membrane.FilterAggregator.UnitTest do import Mox + alias Membrane.FilterAggregator.IntegrationTest.FilterA alias Membrane.Buffer alias Membrane.Element.PadData alias Membrane.FilterAggregator @@ -271,17 +272,11 @@ defmodule Membrane.FilterAggregator.UnitTest do assert ctx_b.playback == :playing end - test "handle_buffers_batch splitting and mapping buffers", ctx do - test_range = 1..10 - buffers = test_range |> Enum.map(&%Buffer{payload: <<&1>>}) - buffers_count = Enum.count(test_range) + test "handle_buffer mapping buffers", ctx do + buffer = %Buffer{payload: <<1>>} FilterA - |> expect(:handle_buffers_batch, fn :input, buffers, %{}, %{module: FilterA} = state -> - args_list = buffers |> Enum.map(&[:input, &1]) - {[split: {:handle_buffer, args_list}], state} - end) - |> expect(:handle_buffer, buffers_count, fn :input, buffer, %{}, state -> + |> expect(:handle_buffer, fn :input, buffer, %{}, state -> assert state.module == FilterA assert %Buffer{payload: <>} = buffer out_payload = payload + 1 @@ -290,7 +285,7 @@ defmodule Membrane.FilterAggregator.UnitTest do end) FilterB - |> expect(:handle_buffers_batch, buffers_count, fn :input, [buffer], %{}, state -> + |> expect(:handle_buffer, fn :input, buffer, %{}, state -> assert state.module == FilterB assert %Buffer{payload: <>} = buffer out_payload = payload * 2 @@ -299,11 +294,9 @@ defmodule Membrane.FilterAggregator.UnitTest do end) assert {actions, %{states: states}} = - FilterAggregator.handle_buffers_batch(:input, buffers, %{}, %{states: ctx.states}) + FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: ctx.states}) - expected_actions = - test_range - |> Enum.map(&{:buffer, {:output, %Buffer{payload: <<(&1 + 1) * 2>>}}}) + expected_actions = [buffer: {:output, %Buffer{payload: <<(1 + 1) * 2>>}}] assert actions == expected_actions @@ -312,8 +305,8 @@ defmodule Membrane.FilterAggregator.UnitTest do {:b, FilterB, ctx_b, %{module: FilterB, state: state_b}} ] = states - assert state_a == test_range |> Enum.map(&(&1 + 1)) |> Enum.sum() - assert state_b == test_range |> Enum.map(&((&1 + 1) * 2)) |> Enum.sum() + assert state_a == 1 + 1 + assert state_b == (1 + 1) * 2 assert ctx_a == ctx.states |> Enum.at(0) |> elem(2) assert ctx_b == ctx.states |> Enum.at(1) |> elem(2) @@ -327,9 +320,9 @@ defmodule Membrane.FilterAggregator.UnitTest do assert ctx.pads.input.start_of_stream? == true {[], state} end) - |> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state -> + |> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state -> assert ctx.pads.input.start_of_stream? == true - {[forward: [buffer]], state} + {[forward: buffer], state} end) |> expect(:handle_end_of_stream, fn :input, %{} = ctx, state -> assert ctx.pads.input.end_of_stream? == true @@ -341,9 +334,9 @@ defmodule Membrane.FilterAggregator.UnitTest do assert ctx.pads.input.start_of_stream? == true {[], state} end) - |> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state -> + |> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state -> assert ctx.pads.input.start_of_stream? == true - {[buffer: {:output, [buffer]}], state} + {[buffer: {:output, buffer}], state} end) |> expect(:handle_end_of_stream, fn :input, %{} = ctx, state -> assert ctx.pads.input.end_of_stream? == true @@ -356,7 +349,7 @@ defmodule Membrane.FilterAggregator.UnitTest do }) assert {[buffer: {:output, buffers}], %{states: states}} = - FilterAggregator.handle_buffers_batch(:input, [buffer], %{}, %{states: states}) + FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: states}) assert List.wrap(buffers) == [buffer]