Skip to content

Commit

Permalink
Delete handle_buffers_batch callback
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Sep 25, 2023
1 parent 185b5e5 commit b873b98
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 80 deletions.
4 changes: 2 additions & 2 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ defmodule Membrane.Core.Element.ActionHandler do
when cb in [
:handle_stream_format,
:handle_event,
:handle_buffers_batch,
:handle_buffer,
:handle_end_of_stream
] do
dir =
Expand All @@ -182,7 +182,7 @@ defmodule Membrane.Core.Element.ActionHandler do
action =
case cb do
:handle_event -> {:event, {pad, data}}
:handle_buffers_batch -> {:buffer, {pad, data}}
:handle_buffer -> {:buffer, {pad, data}}
:handle_stream_format -> {:stream_format, {pad, data}}
:handle_end_of_stream -> {:end_of_stream, pad}
end
Expand Down
20 changes: 12 additions & 8 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule Membrane.Core.Element.BufferController do
end

@doc """
Executes `handle_buffers_batch` callback.
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback(
Pad.ref(),
Expand All @@ -113,12 +113,16 @@ defmodule Membrane.Core.Element.BufferController do
end

defp do_exec_buffer_callback(pad_ref, buffers, state) do
CallbackHandler.exec_and_handle_callback(
:handle_buffers_batch,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffers],
state
)
buffers
|> List.wrap()
|> Enum.reduce(state, fn buffer, state ->
CallbackHandler.exec_and_handle_callback(
:handle_buffer,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffer],
state
)
end)
end
end
11 changes: 4 additions & 7 deletions lib/membrane/element/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ defmodule Membrane.Element.Action do
sub-callback executions are finished).
Useful when a long action is to be undertaken, and partial results need to
be returned before entire process finishes (e.g. default implementation of
`c:Membrane.WithInputPads.handle_buffers_batch/4` uses split action to invoke
`c:Membrane.WithInputPads.handle_buffer/4` with each buffer)
be returned before entire process finishes.
"""
@type split :: {:split, {callback_name :: atom, args_list :: [[any]]}}

Expand Down Expand Up @@ -80,7 +78,7 @@ defmodule Membrane.Element.Action do
of data from pad's internal queue, which _sends_ demands automatically when it
runs out of data.
If there is any data available at the pad, the data is passed to
`c:Membrane.WithInputPads.handle_buffers_batch/4` callback. Invoked callback is
`c:Membrane.WithInputPads.handle_buffer/4` callback. Invoked callback is
guaranteed not to receive more data than demanded.
Demand size can be either a non-negative integer, that overrides existing demand,
Expand Down Expand Up @@ -161,13 +159,12 @@ defmodule Membrane.Element.Action do
Allowed only when _all_ below conditions are met:
- element is filter,
- callback is `c:Membrane.Element.WithInputPads.handle_buffers_batch/4`,
`c:Membrane.Element.WithInputPads.handle_buffer/4`,
- callback is `c:Membrane.Element.WithInputPads.handle_buffer/4`,
`c:Membrane.Element.WithInputPads.handle_stream_format/4`,
`c:Membrane.Element.Base.handle_event/4` or `c:Membrane.Element.WithInputPads.handle_end_of_stream/3`
- playback is `playing`
Keep in mind that `c:Membrane.WithInputPads.handle_buffers_batch/4` can only
Keep in mind that `c:Membrane.WithInputPads.handle_buffer/4` can only
forward buffers, `c:Membrane.Element.WithInputPads.handle_stream_format/4` - stream formats.
`c:Membrane.Element.Base.handle_event/4` - events and
`c:Membrane.Element.WithInputPads.handle_end_of_stream/3` - ends of streams.
Expand Down
28 changes: 3 additions & 25 deletions lib/membrane/element/with_input_pads.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,10 @@ defmodule Membrane.Element.WithInputPads do
@doc """
Callback that is called when buffer should be processed by the Element.
By default calls `c:handle_buffer/4` for each buffer.
For pads in pull mode it is called when buffers have been demanded (by returning
For pads in pull mode it is called when buffer have been demanded (by returning
`:demand` action from any callback).
For pads in push mode it is invoked when buffers arrive.
"""
@callback handle_buffers_batch(
pad :: Pad.ref(),
buffers :: list(Buffer.t()),
context :: CallbackContext.t(),
state :: Element.state()
) :: Membrane.Element.Base.callback_return()

@doc """
Callback that is called when buffer should be processed by the Element. In contrast
to `c:handle_buffers_batch/4`, it is passed only a single buffer.
Called by default implementation of `c:handle_buffers_batch/4`.
For pads in push mode it is invoked when buffer arrive.
"""
@callback handle_buffer(
pad :: Pad.ref(),
Expand Down Expand Up @@ -99,16 +84,9 @@ defmodule Membrane.Element.WithInputPads do
@impl true
def handle_end_of_stream(pad, _context, state), do: {[], state}

@impl true
def handle_buffers_batch(pad, buffers, _context, state) do
args_list = buffers |> Enum.map(&[pad, &1])
{[split: {:handle_buffer, args_list}], state}
end

defoverridable handle_stream_format: 4,
handle_start_of_stream: 3,
handle_end_of_stream: 3,
handle_buffers_batch: 4
handle_end_of_stream: 3
end
end
end
2 changes: 1 addition & 1 deletion lib/membrane/element/with_output_pads.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Membrane.Element.WithOutputPads do
In filters, this callback should usually return `:demand` action with
size sufficient for supplying incoming demand. This will result in calling
`c:Membrane.WithInputPads.handle_buffers_batch/4`, which is to supply
`c:Membrane.WithInputPads.handle_buffer/4`, which is to supply
the demand.
If a source or an endpoint is unable to produce enough buffers, or a filter
Expand Down
10 changes: 3 additions & 7 deletions lib/membrane/filter_aggregator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ defmodule Membrane.FilterAggregator do
end

@impl true
def handle_buffers_batch(:input, buffers, _ctx, %{states: states}) do
{actions, states} = pipe_downstream([buffer: {:output, buffers}], states)
def handle_buffer(:input, buffer, _ctx, %{states: states}) do
{actions, states} = pipe_downstream([buffer: {:output, buffer}], states)
actions = reject_internal_actions(actions)

{actions, %{states: states}}
Expand Down Expand Up @@ -191,12 +191,8 @@ defmodule Membrane.FilterAggregator do
end
end

defp perform_action({:buffer, {:output, []}}, _module, _context, state) do
{[], state}
end

defp perform_action({:buffer, {:output, buffer}}, module, context, state) do
module.handle_buffers_batch(:input, List.wrap(buffer), context, state)
module.handle_buffer(:input, buffer, context, state)
end

defp perform_action({:stream_format, {:output, stream_format}}, module, context, state) do
Expand Down
13 changes: 4 additions & 9 deletions test/membrane/filter_aggregator/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ defmodule Membrane.FilterAggregator.IntegrationTest do
def_output_pad :output, flow_control: :auto, accepted_format: RemoteStream

@impl true
def handle_buffers_batch(:input, buffers, _ctx, state) do
buffers =
buffers
|> Enum.map(fn %Buffer{payload: <<idx, payload::binary>>} ->
payload = for <<i <- payload>>, into: <<>>, do: <<i - 1>>
%Buffer{payload: <<idx, payload::binary>>}
end)

{[buffer: {:output, buffers}], state}
def handle_buffer(:input, %Buffer{payload: <<idx, payload::binary>>}, _ctx, state) do
payload = for <<i <- payload>>, into: <<>>, do: <<i - 1>>
buffer = %Buffer{payload: <<idx, payload::binary>>}
{[buffer: {:output, buffer}], state}
end
end

Expand Down
35 changes: 14 additions & 21 deletions test/membrane/filter_aggregator/unit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Membrane.FilterAggregator.UnitTest do

import Mox

alias Membrane.FilterAggregator.IntegrationTest.FilterA
alias Membrane.Buffer
alias Membrane.Element.PadData
alias Membrane.FilterAggregator
Expand Down Expand Up @@ -271,17 +272,11 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx_b.playback == :playing
end

test "handle_buffers_batch splitting and mapping buffers", ctx do
test_range = 1..10
buffers = test_range |> Enum.map(&%Buffer{payload: <<&1>>})
buffers_count = Enum.count(test_range)
test "handle_buffer mapping buffers", ctx do
buffer = %Buffer{payload: <<1>>}

FilterA
|> expect(:handle_buffers_batch, fn :input, buffers, %{}, %{module: FilterA} = state ->
args_list = buffers |> Enum.map(&[:input, &1])
{[split: {:handle_buffer, args_list}], state}
end)
|> expect(:handle_buffer, buffers_count, fn :input, buffer, %{}, state ->
|> expect(:handle_buffer, fn :input, buffer, %{}, state ->
assert state.module == FilterA
assert %Buffer{payload: <<payload>>} = buffer
out_payload = payload + 1
Expand All @@ -290,7 +285,7 @@ defmodule Membrane.FilterAggregator.UnitTest do
end)

FilterB
|> expect(:handle_buffers_batch, buffers_count, fn :input, [buffer], %{}, state ->
|> expect(:handle_buffer, fn :input, buffer, %{}, state ->
assert state.module == FilterB
assert %Buffer{payload: <<payload>>} = buffer
out_payload = payload * 2
Expand All @@ -299,11 +294,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
end)

assert {actions, %{states: states}} =
FilterAggregator.handle_buffers_batch(:input, buffers, %{}, %{states: ctx.states})
FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: ctx.states})

expected_actions =
test_range
|> Enum.map(&{:buffer, {:output, %Buffer{payload: <<(&1 + 1) * 2>>}}})
expected_actions = [buffer: {:output, %Buffer{payload: <<(1 + 1) * 2>>}}]

assert actions == expected_actions

Expand All @@ -312,8 +305,8 @@ defmodule Membrane.FilterAggregator.UnitTest do
{:b, FilterB, ctx_b, %{module: FilterB, state: state_b}}
] = states

assert state_a == test_range |> Enum.map(&(&1 + 1)) |> Enum.sum()
assert state_b == test_range |> Enum.map(&((&1 + 1) * 2)) |> Enum.sum()
assert state_a == 1 + 1
assert state_b == (1 + 1) * 2

assert ctx_a == ctx.states |> Enum.at(0) |> elem(2)
assert ctx_b == ctx.states |> Enum.at(1) |> elem(2)
Expand All @@ -327,9 +320,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx.pads.input.start_of_stream? == true
{[], state}
end)
|> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state ->
|> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state ->
assert ctx.pads.input.start_of_stream? == true
{[forward: [buffer]], state}
{[forward: buffer], state}
end)
|> expect(:handle_end_of_stream, fn :input, %{} = ctx, state ->
assert ctx.pads.input.end_of_stream? == true
Expand All @@ -341,9 +334,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx.pads.input.start_of_stream? == true
{[], state}
end)
|> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state ->
|> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state ->
assert ctx.pads.input.start_of_stream? == true
{[buffer: {:output, [buffer]}], state}
{[buffer: {:output, buffer}], state}
end)
|> expect(:handle_end_of_stream, fn :input, %{} = ctx, state ->
assert ctx.pads.input.end_of_stream? == true
Expand All @@ -356,7 +349,7 @@ defmodule Membrane.FilterAggregator.UnitTest do
})

assert {[buffer: {:output, buffers}], %{states: states}} =
FilterAggregator.handle_buffers_batch(:input, [buffer], %{}, %{states: states})
FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: states})

assert List.wrap(buffers) == [buffer]

Expand Down

0 comments on commit b873b98

Please sign in to comment.