From 6c268121908c42d73e1fe9e6f9390ed7f00e8f01 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 14:06:00 +0100 Subject: [PATCH 1/9] Incorporate Funnel --- lib/membrane/funnel.ex | 65 ++++++++++++++++++++++++++ lib/membrane/funnel/new_input_event.ex | 9 ++++ 2 files changed, 74 insertions(+) create mode 100644 lib/membrane/funnel.ex create mode 100644 lib/membrane/funnel/new_input_event.ex diff --git a/lib/membrane/funnel.ex b/lib/membrane/funnel.ex new file mode 100644 index 000000000..bc90e6e78 --- /dev/null +++ b/lib/membrane/funnel.ex @@ -0,0 +1,65 @@ +defmodule Membrane.Funnel do + @moduledoc """ + Element that can be used for collecting data from multiple inputs and sending it through one + output. When a new input connects in the `:playing` state, the funnel sends + `Membrane.Funnel.NewInputEvent` via output. + """ + use Membrane.Filter + + alias Membrane.Funnel + + def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request + def_output_pad :output, accepted_format: _any, flow_control: :auto + + def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad] + + @impl true + def handle_init(_ctx, opts) do + {[], %{end_of_stream: opts.end_of_stream}} + end + + @impl true + def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do + {[buffer: {:output, buffer}], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do + {[event: {:output, %Funnel.NewInputEvent{}}], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do + if ctx.pads.output.end_of_stream? do + {[], state} + else + {[end_of_stream: :output], state} + end + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do + if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do + {[end_of_stream: :output], state} + else + {[], state} + end + end + + defp inputs_data(ctx) do + Enum.flat_map(ctx.pads, fn + {Pad.ref(:input, _id), data} -> [data] + _output -> [] + end) + end +end diff --git a/lib/membrane/funnel/new_input_event.ex b/lib/membrane/funnel/new_input_event.ex new file mode 100644 index 000000000..22f6455e0 --- /dev/null +++ b/lib/membrane/funnel/new_input_event.ex @@ -0,0 +1,9 @@ +defmodule Membrane.Funnel.NewInputEvent do + @moduledoc """ + Event sent each time new element is linked (via funnel input pad) after playing pipeline. + """ + @derive Membrane.EventProtocol + + @type t :: %__MODULE__{} + defstruct [] +end From 16c8b7512900ab36d8ac19e64a7789aa0ad65daa Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 14:13:55 +0100 Subject: [PATCH 2/9] Incorporate Tees --- lib/membrane/tee/master.ex | 57 +++++++++++++++++++++++++++++++++ lib/membrane/tee/parallel.ex | 48 +++++++++++++++++++++++++++ lib/membrane/tee/push_output.ex | 45 ++++++++++++++++++++++++++ 3 files changed, 150 insertions(+) create mode 100644 lib/membrane/tee/master.ex create mode 100644 lib/membrane/tee/parallel.ex create mode 100644 lib/membrane/tee/push_output.ex diff --git a/lib/membrane/tee/master.ex b/lib/membrane/tee/master.ex new file mode 100644 index 000000000..08d953fa2 --- /dev/null +++ b/lib/membrane/tee/master.ex @@ -0,0 +1,57 @@ +defmodule Membrane.Tee.Master do + @moduledoc """ + Element for forwarding buffers to at least one output pad + + It has one input pad `:input` and 2 output pads: + * `:master` - is a static pad which is always available and works in pull mode + * `:copy` - is a dynamic pad that can be linked to any number of elements (including 0) and works in push mode + + The `:master` pad dictates the speed of processing data and any element (or elements) connected to `:copy` pad + will receive the same data as `:master` + """ + use Membrane.Filter + + def_input_pad :input, + availability: :always, + flow_control: :auto, + accepted_format: _any + + def_output_pad :master, + availability: :always, + flow_control: :auto, + accepted_format: _any + + def_output_pad :copy, + availability: :on_request, + flow_control: :push, + accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{accepted_format: nil}} + end + + @impl true + def handle_stream_format(_pad, accepted_format, _ctx, state) do + {[forward: accepted_format], %{state | accepted_format: accepted_format}} + end + + @impl true + def handle_pad_added(Pad.ref(:copy, _ref), _ctx, %{accepted_format: nil} = state) do + {[], state} + end + + @impl true + def handle_pad_added( + Pad.ref(:copy, _ref) = pad, + _ctx, + %{accepted_format: accepted_format} = state + ) do + {[stream_format: {pad, accepted_format}], state} + end + + @impl true + def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do + {[forward: buffer], state} + end +end diff --git a/lib/membrane/tee/parallel.ex b/lib/membrane/tee/parallel.ex new file mode 100644 index 000000000..67ac91047 --- /dev/null +++ b/lib/membrane/tee/parallel.ex @@ -0,0 +1,48 @@ +defmodule Membrane.Tee.Parallel do + @moduledoc """ + Element for forwarding packets to multiple outputs. + + The processing speed is limited by the slowest consuming output. + + To use, link this element to one preceding element via `input` pad and multiple + succesive elements via `output` pads. Each buffer is forwarded only when demand for + it comes in via each output. If there are no outputs, buffers are dropped. + """ + + use Membrane.Filter + + def_input_pad :input, + availability: :always, + flow_control: :auto, + accepted_format: _any + + def_output_pad :output, + availability: :on_request, + flow_control: :auto, + accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{accepted_format: nil}} + end + + @impl true + def handle_stream_format(_pad, accepted_format, _ctx, state) do + {[forward: accepted_format], %{state | accepted_format: accepted_format}} + end + + @impl true + def handle_pad_added(Pad.ref(:output, _ref), _ctx, %{accepted_format: nil} = state) do + {[], state} + end + + @impl true + def handle_pad_added(pad, _ctx, %{accepted_format: accepted_format} = state) do + {[stream_format: {pad, accepted_format}], state} + end + + @impl true + def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do + {[forward: buffer], state} + end +end diff --git a/lib/membrane/tee/push_output.ex b/lib/membrane/tee/push_output.ex new file mode 100644 index 000000000..61f602204 --- /dev/null +++ b/lib/membrane/tee/push_output.ex @@ -0,0 +1,45 @@ +defmodule Membrane.Tee.PushOutput do + @moduledoc """ + Element forwarding packets to multiple push outputs. + """ + use Membrane.Filter + + def_input_pad :input, + availability: :always, + flow_control: :auto, + accepted_format: _any + + def_output_pad :output, + availability: :on_request, + flow_control: :push, + accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{accepted_format: nil}} + end + + @impl true + def handle_stream_format(_pad, accepted_format, _ctx, state) do + {[forward: accepted_format], %{state | accepted_format: accepted_format}} + end + + @impl true + def handle_pad_added(Pad.ref(:output, _ref), _ctx, %{accepted_format: nil} = state) do + {[], state} + end + + @impl true + def handle_pad_added( + Pad.ref(:output, _ref) = pad, + _ctx, + %{accepted_format: accepted_format} = state + ) do + {[stream_format: {pad, accepted_format}], state} + end + + @impl true + def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do + {[forward: buffer], state} + end +end From 92c908072b785e304bdb271888fa2d1acfcc0790 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 14:18:29 +0100 Subject: [PATCH 3/9] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 04d410f1e..d1d7b9e83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) * Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904) + * Incorporate `Membrane.Funnel` and Tees. [#922](https://github.com/membraneframework/membrane_core/issues/922) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) From f8f6b369b44dde6a6abaa16e16168662d31bb803 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 15:05:56 +0100 Subject: [PATCH 4/9] Merge all Tees into one --- lib/membrane/tee.ex | 67 +++++++++++++++++++++++++++++++++ lib/membrane/tee/master.ex | 57 ---------------------------- lib/membrane/tee/parallel.ex | 48 ----------------------- lib/membrane/tee/push_output.ex | 45 ---------------------- 4 files changed, 67 insertions(+), 150 deletions(-) create mode 100644 lib/membrane/tee.ex delete mode 100644 lib/membrane/tee/master.ex delete mode 100644 lib/membrane/tee/parallel.ex delete mode 100644 lib/membrane/tee/push_output.ex diff --git a/lib/membrane/tee.ex b/lib/membrane/tee.ex new file mode 100644 index 000000000..42f45b143 --- /dev/null +++ b/lib/membrane/tee.ex @@ -0,0 +1,67 @@ +defmodule Membrane.Tee do + @moduledoc """ + Element for forwarding buffers to at least one output pad + + It has one input pad `:input` and 2 output pads: + * `:output` - is a dynamic pad which is always available and works in pull mode + * `:output_copy` - is a dynamic pad that can be linked to any number of elements (including 0) and works + in push mode + + The `:output` pads dictate the speed of processing data and any element (or elements) connected to + `:output_copy` pad will receive the same data as all `:output` instances. + """ + use Membrane.Filter + + require Membrane.Logger + + def_input_pad :input, + availability: :always, + flow_control: :auto, + accepted_format: _any + + def_output_pad :output, + availability: :on_request, + flow_control: :auto, + accepted_format: _any + + def_output_pad :output_copy, + availability: :on_request, + flow_control: :push, + accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{stream_format: nil}} + end + + @impl true + def handle_playing(ctx, state) do + if map_size(ctx.pads) < 2 do + Membrane.Logger.debug(""" + #{inspect(__MODULE__)} enters :playing playback without any output (:output or :output_copy) \ + pads linked. + """) + end + + {[], state} + end + + @impl true + def handle_stream_format(:input, stream_format, _ctx, state) do + {[forward: stream_format], %{state | stream_format: stream_format}} + end + + @impl true + def handle_pad_added(Pad.ref(name, _ref) = output_pad, _ctx, state) + when name in [:output, :output_copy] do + case state.stream_format do + nil -> {[], state} + stream_format -> {[stream_format: {output_pad, stream_format}], state} + end + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + {[forward: buffer], state} + end +end diff --git a/lib/membrane/tee/master.ex b/lib/membrane/tee/master.ex deleted file mode 100644 index 08d953fa2..000000000 --- a/lib/membrane/tee/master.ex +++ /dev/null @@ -1,57 +0,0 @@ -defmodule Membrane.Tee.Master do - @moduledoc """ - Element for forwarding buffers to at least one output pad - - It has one input pad `:input` and 2 output pads: - * `:master` - is a static pad which is always available and works in pull mode - * `:copy` - is a dynamic pad that can be linked to any number of elements (including 0) and works in push mode - - The `:master` pad dictates the speed of processing data and any element (or elements) connected to `:copy` pad - will receive the same data as `:master` - """ - use Membrane.Filter - - def_input_pad :input, - availability: :always, - flow_control: :auto, - accepted_format: _any - - def_output_pad :master, - availability: :always, - flow_control: :auto, - accepted_format: _any - - def_output_pad :copy, - availability: :on_request, - flow_control: :push, - accepted_format: _any - - @impl true - def handle_init(_ctx, _opts) do - {[], %{accepted_format: nil}} - end - - @impl true - def handle_stream_format(_pad, accepted_format, _ctx, state) do - {[forward: accepted_format], %{state | accepted_format: accepted_format}} - end - - @impl true - def handle_pad_added(Pad.ref(:copy, _ref), _ctx, %{accepted_format: nil} = state) do - {[], state} - end - - @impl true - def handle_pad_added( - Pad.ref(:copy, _ref) = pad, - _ctx, - %{accepted_format: accepted_format} = state - ) do - {[stream_format: {pad, accepted_format}], state} - end - - @impl true - def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do - {[forward: buffer], state} - end -end diff --git a/lib/membrane/tee/parallel.ex b/lib/membrane/tee/parallel.ex deleted file mode 100644 index 67ac91047..000000000 --- a/lib/membrane/tee/parallel.ex +++ /dev/null @@ -1,48 +0,0 @@ -defmodule Membrane.Tee.Parallel do - @moduledoc """ - Element for forwarding packets to multiple outputs. - - The processing speed is limited by the slowest consuming output. - - To use, link this element to one preceding element via `input` pad and multiple - succesive elements via `output` pads. Each buffer is forwarded only when demand for - it comes in via each output. If there are no outputs, buffers are dropped. - """ - - use Membrane.Filter - - def_input_pad :input, - availability: :always, - flow_control: :auto, - accepted_format: _any - - def_output_pad :output, - availability: :on_request, - flow_control: :auto, - accepted_format: _any - - @impl true - def handle_init(_ctx, _opts) do - {[], %{accepted_format: nil}} - end - - @impl true - def handle_stream_format(_pad, accepted_format, _ctx, state) do - {[forward: accepted_format], %{state | accepted_format: accepted_format}} - end - - @impl true - def handle_pad_added(Pad.ref(:output, _ref), _ctx, %{accepted_format: nil} = state) do - {[], state} - end - - @impl true - def handle_pad_added(pad, _ctx, %{accepted_format: accepted_format} = state) do - {[stream_format: {pad, accepted_format}], state} - end - - @impl true - def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do - {[forward: buffer], state} - end -end diff --git a/lib/membrane/tee/push_output.ex b/lib/membrane/tee/push_output.ex deleted file mode 100644 index 61f602204..000000000 --- a/lib/membrane/tee/push_output.ex +++ /dev/null @@ -1,45 +0,0 @@ -defmodule Membrane.Tee.PushOutput do - @moduledoc """ - Element forwarding packets to multiple push outputs. - """ - use Membrane.Filter - - def_input_pad :input, - availability: :always, - flow_control: :auto, - accepted_format: _any - - def_output_pad :output, - availability: :on_request, - flow_control: :push, - accepted_format: _any - - @impl true - def handle_init(_ctx, _opts) do - {[], %{accepted_format: nil}} - end - - @impl true - def handle_stream_format(_pad, accepted_format, _ctx, state) do - {[forward: accepted_format], %{state | accepted_format: accepted_format}} - end - - @impl true - def handle_pad_added(Pad.ref(:output, _ref), _ctx, %{accepted_format: nil} = state) do - {[], state} - end - - @impl true - def handle_pad_added( - Pad.ref(:output, _ref) = pad, - _ctx, - %{accepted_format: accepted_format} = state - ) do - {[stream_format: {pad, accepted_format}], state} - end - - @impl true - def handle_buffer(:input, %Membrane.Buffer{} = buffer, _ctx, state) do - {[forward: buffer], state} - end -end From 59e1fca5928330e4ee566eb8533ab45cc49681c4 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 16:02:11 +0100 Subject: [PATCH 5/9] Add funnel and tee tests --- CHANGELOG.md | 2 +- lib/membrane/tee.ex | 18 +++++--- test/membrane/integration/funnel_test.exs | 33 ++++++++++++++ test/membrane/integration/tee_test.exs | 55 +++++++++++++++++++++++ 4 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 test/membrane/integration/funnel_test.exs create mode 100644 test/membrane/integration/tee_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index d1d7b9e83..e23bf0274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) * Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904) - * Incorporate `Membrane.Funnel` and Tees. [#922](https://github.com/membraneframework/membrane_core/issues/922) + * Incorporate `Membrane.Funnel` and `Membrane.Tee`. [#922](https://github.com/membraneframework/membrane_core/issues/922) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) diff --git a/lib/membrane/tee.ex b/lib/membrane/tee.ex index 42f45b143..c43c902b8 100644 --- a/lib/membrane/tee.ex +++ b/lib/membrane/tee.ex @@ -52,12 +52,20 @@ defmodule Membrane.Tee do end @impl true - def handle_pad_added(Pad.ref(name, _ref) = output_pad, _ctx, state) + def handle_pad_added(Pad.ref(name, _ref) = output_pad, ctx, state) when name in [:output, :output_copy] do - case state.stream_format do - nil -> {[], state} - stream_format -> {[stream_format: {output_pad, stream_format}], state} - end + maybe_stream_format = + case state.stream_format do + nil -> [] + stream_format -> [stream_format: {output_pad, stream_format}] + end + + maybe_eos = + if ctx.pads.input.end_of_stream?, + do: [end_of_stream: output_pad], + else: [] + + {maybe_stream_format ++ maybe_eos, state} end @impl true diff --git a/test/membrane/integration/funnel_test.exs b/test/membrane/integration/funnel_test.exs new file mode 100644 index 000000000..fb9ca5739 --- /dev/null +++ b/test/membrane/integration/funnel_test.exs @@ -0,0 +1,33 @@ +defmodule Membrane.Integration.FunnelTest do + use ExUnit.Case + + import Membrane.Testing.Assertions + + alias Membrane.{Buffer, Funnel, Testing} + + test "Collects multiple inputs" do + import Membrane.ChildrenSpec + data = 1..10 + + {:ok, _supervisor_pid, pipeline} = + Testing.Pipeline.start_link( + spec: [ + child(:funnel, Funnel), + child(:src1, %Testing.Source{output: data}) |> get_child(:funnel), + child(:src2, %Testing.Source{output: data}) |> get_child(:funnel), + get_child(:funnel) |> child(:sink, Testing.Sink) + ] + ) + + data + |> Enum.flat_map(&[&1, &1]) + |> Enum.each(fn payload -> + assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^payload}) + end) + + assert_end_of_stream(pipeline, :sink) + refute_sink_buffer(pipeline, :sink, _buffer, 0) + + Membrane.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/tee_test.exs b/test/membrane/integration/tee_test.exs new file mode 100644 index 000000000..d0e1e8784 --- /dev/null +++ b/test/membrane/integration/tee_test.exs @@ -0,0 +1,55 @@ +defmodule Membrane.Integration.TeeTest do + @moduledoc false + use ExUnit.Case, async: true + use Bunch + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.Testing.{Pipeline, Sink, Source} + + test "forwards input to three all outputs" do + range = 1..100 + sinks = [:sink1, :sink2, :sink3, :sink_4] + + spec = + [ + child(:src, %Source{output: range}) + |> child(:tee, Membrane.Tee) + ] ++ + for sink <- sinks do + pad = if sink in [:sink1, :sink2], do: :output, else: :output_copy + + get_child(:tee) + |> via_out(pad) + |> child(sink, %Sink{}) + end + + pipeline = Pipeline.start_link_supervised!(spec: spec) + + for sink <- sinks do + assert_end_of_stream(pipeline, ^sink, :input) + end + + for element <- range, sink <- sinks do + assert_sink_buffer(pipeline, sink, %Buffer{payload: ^element}) + end + + for {pad, sink} <- [output_copy: :sink5, output: :sink6] do + spec = + get_child(:tee) + |> via_out(pad) + |> child(sink, %Sink{}) + + Pipeline.execute_actions(pipeline, spec: spec) + end + + for sink <- [:sink5, :sink6] do + assert_sink_stream_format(pipeline, sink, %Membrane.RemoteStream{}) + assert_end_of_stream(pipeline, ^sink, :input) + end + + Pipeline.terminate(pipeline) + end +end From e0d45ce2fd9cf0c5c7574c7381a7ebb1dc125361 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 16:07:16 +0100 Subject: [PATCH 6/9] Fix warning --- lib/membrane/tee.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane/tee.ex b/lib/membrane/tee.ex index c43c902b8..5b7a229de 100644 --- a/lib/membrane/tee.ex +++ b/lib/membrane/tee.ex @@ -10,7 +10,7 @@ defmodule Membrane.Tee do The `:output` pads dictate the speed of processing data and any element (or elements) connected to `:output_copy` pad will receive the same data as all `:output` instances. """ - use Membrane.Filter + use Membrane.Filter, flow_control_hints?: false require Membrane.Logger From 523cf679ca3d1576a0ae37e151824c7afb75a58a Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 16:12:01 +0100 Subject: [PATCH 7/9] Incorporate Membrane.Fake.Sink --- lib/membrane/fake_sink.ex | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 lib/membrane/fake_sink.ex diff --git a/lib/membrane/fake_sink.ex b/lib/membrane/fake_sink.ex new file mode 100644 index 000000000..add0ba38b --- /dev/null +++ b/lib/membrane/fake_sink.ex @@ -0,0 +1,12 @@ +defmodule Membrane.Fake.Sink do + @moduledoc """ + Membrane Sink that ignores incoming data. + """ + + use Membrane.Sink + + def_input_pad :input, accepted_format: _any + + @impl true + def handle_buffer(:input, _buffer, _ctx, state), do: {[], state} +end From cd1fa25bd03b9967186c8d878967fad59392fe4d Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 16:13:26 +0100 Subject: [PATCH 8/9] Upgrade changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e23bf0274..6ae09419e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) * Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904) - * Incorporate `Membrane.Funnel` and `Membrane.Tee`. [#922](https://github.com/membraneframework/membrane_core/issues/922) + * Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) From 87ae47b4d60e49d51bd92f4c26540b4aedff0912 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 16:19:24 +0100 Subject: [PATCH 9/9] Refactor Funnel --- lib/membrane/funnel.ex | 6 ++---- .../integration/effective_flow_control_resolution_test.exs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/membrane/funnel.ex b/lib/membrane/funnel.ex index bc90e6e78..3930c3588 100644 --- a/lib/membrane/funnel.ex +++ b/lib/membrane/funnel.ex @@ -2,12 +2,10 @@ defmodule Membrane.Funnel do @moduledoc """ Element that can be used for collecting data from multiple inputs and sending it through one output. When a new input connects in the `:playing` state, the funnel sends - `Membrane.Funnel.NewInputEvent` via output. + `#{inspect(__MODULE__)}.NewInputEvent` via output. """ use Membrane.Filter - alias Membrane.Funnel - def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request def_output_pad :output, accepted_format: _any, flow_control: :auto @@ -25,7 +23,7 @@ defmodule Membrane.Funnel do @impl true def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do - {[event: {:output, %Funnel.NewInputEvent{}}], state} + {[event: {:output, %__MODULE__.NewInputEvent{}}], state} end @impl true diff --git a/test/membrane/integration/effective_flow_control_resolution_test.exs b/test/membrane/integration/effective_flow_control_resolution_test.exs index 894207fef..ca95853ed 100644 --- a/test/membrane/integration/effective_flow_control_resolution_test.exs +++ b/test/membrane/integration/effective_flow_control_resolution_test.exs @@ -7,7 +7,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do alias Membrane.Testing defmodule AutoFilter do - use Membrane.Filter + use Membrane.Filter, flow_control_hints?: false def_input_pad :input, availability: :on_request, accepted_format: _any def_output_pad :output, availability: :on_request, accepted_format: _any