Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete handle_buffers_batch callback #601

Merged
merged 5 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594)
* Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595)
* Remove default implementation of `start_/2`, `start_link/2` and `terminate/2` in modules using `Membrane.Pipeline`. [#598](https://github.com/membraneframework/membrane_core/pull/598)
* Remove callback _Membrane.Element.WithInputPads.handle_buffers_batch/4_. [#601](https://github.com/membraneframework/membrane_core/pull/601)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand Down
2 changes: 1 addition & 1 deletion guides/upgrading/v1.0.0-rc0.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Names of the callbacks that are used to process buffers have been unified. This
* _Membrane.Endpoint.handle_write/4_ and _Membrane.Endpoint.handle_write_list/4_
* _Membrane.Sink.handle_write/4_ and _Membrane.Sink.handle_write_list/4_

and they became `c:Membrane.Element.WithInputPads.handle_buffer/4` and `c:Membrane.Element.WithInputPads.handle_buffers_batch/4`, respectively:
and they became `c:Membrane.Element.WithInputPads.handle_buffer/4` and _Membrane.Element.WithInputPads.handle_buffers_batch/4_, respectively:

```diff
@impl true
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ defmodule Membrane.Core.Element.ActionHandler do
when cb in [
:handle_stream_format,
:handle_event,
:handle_buffers_batch,
:handle_buffer,
:handle_end_of_stream
] do
dir =
Expand All @@ -182,7 +182,7 @@ defmodule Membrane.Core.Element.ActionHandler do
action =
case cb do
:handle_event -> {:event, {pad, data}}
:handle_buffers_batch -> {:buffer, {pad, data}}
:handle_buffer -> {:buffer, {pad, data}}
:handle_stream_format -> {:stream_format, {pad, data}}
:handle_end_of_stream -> {:end_of_stream, pad}
end
Expand Down
18 changes: 10 additions & 8 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule Membrane.Core.Element.BufferController do
end

@doc """
Executes `handle_buffers_batch` callback.
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback(
Pad.ref(),
Expand All @@ -113,12 +113,14 @@ defmodule Membrane.Core.Element.BufferController do
end

defp do_exec_buffer_callback(pad_ref, buffers, state) do
CallbackHandler.exec_and_handle_callback(
:handle_buffers_batch,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffers],
state
)
Enum.reduce(buffers, state, fn buffer, state ->
CallbackHandler.exec_and_handle_callback(
:handle_buffer,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffer],
state
)
end)
end
end
11 changes: 4 additions & 7 deletions lib/membrane/element/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ defmodule Membrane.Element.Action do
sub-callback executions are finished).

Useful when a long action is to be undertaken, and partial results need to
be returned before entire process finishes (e.g. default implementation of
`c:Membrane.WithInputPads.handle_buffers_batch/4` uses split action to invoke
`c:Membrane.WithInputPads.handle_buffer/4` with each buffer)
be returned before entire process finishes.
"""
@type split :: {:split, {callback_name :: atom, args_list :: [[any]]}}

Expand Down Expand Up @@ -80,7 +78,7 @@ defmodule Membrane.Element.Action do
of data from pad's internal queue, which _sends_ demands automatically when it
runs out of data.
If there is any data available at the pad, the data is passed to
`c:Membrane.WithInputPads.handle_buffers_batch/4` callback. Invoked callback is
`c:Membrane.WithInputPads.handle_buffer/4` callback. Invoked callback is
guaranteed not to receive more data than demanded.

Demand size can be either a non-negative integer, that overrides existing demand,
Expand Down Expand Up @@ -161,13 +159,12 @@ defmodule Membrane.Element.Action do

Allowed only when _all_ below conditions are met:
- element is filter,
- callback is `c:Membrane.Element.WithInputPads.handle_buffers_batch/4`,
`c:Membrane.Element.WithInputPads.handle_buffer/4`,
- callback is `c:Membrane.Element.WithInputPads.handle_buffer/4`,
`c:Membrane.Element.WithInputPads.handle_stream_format/4`,
`c:Membrane.Element.Base.handle_event/4` or `c:Membrane.Element.WithInputPads.handle_end_of_stream/3`
- playback is `playing`

Keep in mind that `c:Membrane.WithInputPads.handle_buffers_batch/4` can only
Keep in mind that `c:Membrane.WithInputPads.handle_buffer/4` can only
forward buffers, `c:Membrane.Element.WithInputPads.handle_stream_format/4` - stream formats.
`c:Membrane.Element.Base.handle_event/4` - events and
`c:Membrane.Element.WithInputPads.handle_end_of_stream/3` - ends of streams.
Expand Down
28 changes: 3 additions & 25 deletions lib/membrane/element/with_input_pads.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,10 @@ defmodule Membrane.Element.WithInputPads do
@doc """
Callback that is called when buffer should be processed by the Element.

By default calls `c:handle_buffer/4` for each buffer.

For pads in pull mode it is called when buffers have been demanded (by returning
For pads in pull mode it is called when buffer have been demanded (by returning
`:demand` action from any callback).

For pads in push mode it is invoked when buffers arrive.
"""
@callback handle_buffers_batch(
pad :: Pad.ref(),
buffers :: list(Buffer.t()),
context :: CallbackContext.t(),
state :: Element.state()
) :: Membrane.Element.Base.callback_return()

@doc """
Callback that is called when buffer should be processed by the Element. In contrast
to `c:handle_buffers_batch/4`, it is passed only a single buffer.

Called by default implementation of `c:handle_buffers_batch/4`.
For pads in push mode it is invoked when buffer arrive.
"""
@callback handle_buffer(
pad :: Pad.ref(),
Expand Down Expand Up @@ -99,16 +84,9 @@ defmodule Membrane.Element.WithInputPads do
@impl true
def handle_end_of_stream(pad, _context, state), do: {[], state}

@impl true
def handle_buffers_batch(pad, buffers, _context, state) do
args_list = buffers |> Enum.map(&[pad, &1])
{[split: {:handle_buffer, args_list}], state}
end

defoverridable handle_stream_format: 4,
handle_start_of_stream: 3,
handle_end_of_stream: 3,
handle_buffers_batch: 4
handle_end_of_stream: 3
end
end
end
2 changes: 1 addition & 1 deletion lib/membrane/element/with_output_pads.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Membrane.Element.WithOutputPads do

In filters, this callback should usually return `:demand` action with
size sufficient for supplying incoming demand. This will result in calling
`c:Membrane.WithInputPads.handle_buffers_batch/4`, which is to supply
`c:Membrane.WithInputPads.handle_buffer/4`, which is to supply
the demand.

If a source or an endpoint is unable to produce enough buffers, or a filter
Expand Down
10 changes: 3 additions & 7 deletions lib/membrane/filter_aggregator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ defmodule Membrane.FilterAggregator do
end

@impl true
def handle_buffers_batch(:input, buffers, _ctx, %{states: states}) do
{actions, states} = pipe_downstream([buffer: {:output, buffers}], states)
def handle_buffer(:input, buffer, _ctx, %{states: states}) do
{actions, states} = pipe_downstream([buffer: {:output, buffer}], states)
actions = reject_internal_actions(actions)

{actions, %{states: states}}
Expand Down Expand Up @@ -191,12 +191,8 @@ defmodule Membrane.FilterAggregator do
end
end

defp perform_action({:buffer, {:output, []}}, _module, _context, state) do
{[], state}
end

defp perform_action({:buffer, {:output, buffer}}, module, context, state) do
module.handle_buffers_batch(:input, List.wrap(buffer), context, state)
module.handle_buffer(:input, buffer, context, state)
end

defp perform_action({:stream_format, {:output, stream_format}}, module, context, state) do
Expand Down
13 changes: 4 additions & 9 deletions test/membrane/filter_aggregator/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ defmodule Membrane.FilterAggregator.IntegrationTest do
def_output_pad :output, flow_control: :auto, accepted_format: RemoteStream

@impl true
def handle_buffers_batch(:input, buffers, _ctx, state) do
buffers =
buffers
|> Enum.map(fn %Buffer{payload: <<idx, payload::binary>>} ->
payload = for <<i <- payload>>, into: <<>>, do: <<i - 1>>
%Buffer{payload: <<idx, payload::binary>>}
end)

{[buffer: {:output, buffers}], state}
def handle_buffer(:input, %Buffer{payload: <<idx, payload::binary>>}, _ctx, state) do
payload = for <<i <- payload>>, into: <<>>, do: <<i - 1>>
buffer = %Buffer{payload: <<idx, payload::binary>>}
{[buffer: {:output, buffer}], state}
end
end

Expand Down
34 changes: 13 additions & 21 deletions test/membrane/filter_aggregator/unit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,11 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx_b.playback == :playing
end

test "handle_buffers_batch splitting and mapping buffers", ctx do
test_range = 1..10
buffers = test_range |> Enum.map(&%Buffer{payload: <<&1>>})
buffers_count = Enum.count(test_range)
test "handle_buffer mapping buffers", ctx do
buffer = %Buffer{payload: <<1>>}

FilterA
|> expect(:handle_buffers_batch, fn :input, buffers, %{}, %{module: FilterA} = state ->
args_list = buffers |> Enum.map(&[:input, &1])
{[split: {:handle_buffer, args_list}], state}
end)
|> expect(:handle_buffer, buffers_count, fn :input, buffer, %{}, state ->
|> expect(:handle_buffer, fn :input, buffer, %{}, state ->
assert state.module == FilterA
assert %Buffer{payload: <<payload>>} = buffer
out_payload = payload + 1
Expand All @@ -290,7 +284,7 @@ defmodule Membrane.FilterAggregator.UnitTest do
end)

FilterB
|> expect(:handle_buffers_batch, buffers_count, fn :input, [buffer], %{}, state ->
|> expect(:handle_buffer, fn :input, buffer, %{}, state ->
assert state.module == FilterB
assert %Buffer{payload: <<payload>>} = buffer
out_payload = payload * 2
Expand All @@ -299,11 +293,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
end)

assert {actions, %{states: states}} =
FilterAggregator.handle_buffers_batch(:input, buffers, %{}, %{states: ctx.states})
FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: ctx.states})

expected_actions =
test_range
|> Enum.map(&{:buffer, {:output, %Buffer{payload: <<(&1 + 1) * 2>>}}})
expected_actions = [buffer: {:output, %Buffer{payload: <<(1 + 1) * 2>>}}]

assert actions == expected_actions

Expand All @@ -312,8 +304,8 @@ defmodule Membrane.FilterAggregator.UnitTest do
{:b, FilterB, ctx_b, %{module: FilterB, state: state_b}}
] = states

assert state_a == test_range |> Enum.map(&(&1 + 1)) |> Enum.sum()
assert state_b == test_range |> Enum.map(&((&1 + 1) * 2)) |> Enum.sum()
assert state_a == 1 + 1
assert state_b == (1 + 1) * 2

assert ctx_a == ctx.states |> Enum.at(0) |> elem(2)
assert ctx_b == ctx.states |> Enum.at(1) |> elem(2)
Expand All @@ -327,9 +319,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx.pads.input.start_of_stream? == true
{[], state}
end)
|> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state ->
|> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state ->
assert ctx.pads.input.start_of_stream? == true
{[forward: [buffer]], state}
{[forward: buffer], state}
end)
|> expect(:handle_end_of_stream, fn :input, %{} = ctx, state ->
assert ctx.pads.input.end_of_stream? == true
Expand All @@ -341,9 +333,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx.pads.input.start_of_stream? == true
{[], state}
end)
|> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state ->
|> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state ->
assert ctx.pads.input.start_of_stream? == true
{[buffer: {:output, [buffer]}], state}
{[buffer: {:output, buffer}], state}
end)
|> expect(:handle_end_of_stream, fn :input, %{} = ctx, state ->
assert ctx.pads.input.end_of_stream? == true
Expand All @@ -356,7 +348,7 @@ defmodule Membrane.FilterAggregator.UnitTest do
})

assert {[buffer: {:output, buffers}], %{states: states}} =
FilterAggregator.handle_buffers_batch(:input, [buffer], %{}, %{states: states})
FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: states})

assert List.wrap(buffers) == [buffer]

Expand Down
Loading