Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate Funnel, Tee and Fake.Sink #922

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.2.0
* Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
12 changes: 12 additions & 0 deletions lib/membrane/fake_sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Membrane.Fake.Sink do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about

Suggested change
defmodule Membrane.Fake.Sink do
defmodule Membrane.FakeSink do

@moduledoc """
Membrane Sink that ignores incoming data.
"""

use Membrane.Sink

def_input_pad :input, accepted_format: _any

@impl true
def handle_buffer(:input, _buffer, _ctx, state), do: {[], state}
end
63 changes: 63 additions & 0 deletions lib/membrane/funnel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Membrane.Funnel do
@moduledoc """
Element that can be used for collecting data from multiple inputs and sending it through one
output. When a new input connects in the `:playing` state, the funnel sends
`#{inspect(__MODULE__)}.NewInputEvent` via output.
Comment on lines +3 to +5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Element that can be used for collecting data from multiple inputs and sending it through one
output. When a new input connects in the `:playing` state, the funnel sends
`#{inspect(__MODULE__)}.NewInputEvent` via output.
Element that can be used for collecting data from multiple inputs and sending it through one
output.
When a new input connects in the `:playing` state, the funnel sends
`#{inspect(__MODULE__)}.NewInputEvent` via output.

"""
use Membrane.Filter

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad]

@impl true
def handle_init(_ctx, opts) do
{[], %{end_of_stream: opts.end_of_stream}}
end

@impl true
def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do
{[event: {:output, %__MODULE__.NewInputEvent{}}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do
{[], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do
{[], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do
if ctx.pads.output.end_of_stream? do
{[], state}
else
{[end_of_stream: :output], state}
end
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do
if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do
{[end_of_stream: :output], state}
else
{[], state}
end
end

defp inputs_data(ctx) do
Enum.flat_map(ctx.pads, fn
{Pad.ref(:input, _id), data} -> [data]
_output -> []
end)
end
end
9 changes: 9 additions & 0 deletions lib/membrane/funnel/new_input_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Membrane.Funnel.NewInputEvent do
@moduledoc """
Event sent each time new element is linked (via funnel input pad) after playing pipeline.
"""
@derive Membrane.EventProtocol

@type t :: %__MODULE__{}
defstruct []
end
75 changes: 75 additions & 0 deletions lib/membrane/tee.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
defmodule Membrane.Tee do
@moduledoc """
Element for forwarding buffers to at least one output pad

It has one input pad `:input` and 2 output pads:
* `:output` - is a dynamic pad which is always available and works in pull mode
* `:output_copy` - is a dynamic pad that can be linked to any number of elements (including 0) and works
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While copy made (a bit of) sense along with the master pad, now it doesn't IMO. Maybe push_output?

in push mode

The `:output` pads dictate the speed of processing data and any element (or elements) connected to
`:output_copy` pad will receive the same data as all `:output` instances.
"""
use Membrane.Filter, flow_control_hints?: false

require Membrane.Logger

def_input_pad :input,
availability: :always,
flow_control: :auto,
accepted_format: _any

def_output_pad :output,
availability: :on_request,
flow_control: :auto,
accepted_format: _any

def_output_pad :output_copy,
availability: :on_request,
flow_control: :push,
accepted_format: _any

@impl true
def handle_init(_ctx, _opts) do
{[], %{stream_format: nil}}
end

@impl true
def handle_playing(ctx, state) do
if map_size(ctx.pads) < 2 do
Membrane.Logger.debug("""
#{inspect(__MODULE__)} enters :playing playback without any output (:output or :output_copy) \
pads linked.
""")
end

{[], state}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
{[forward: stream_format], %{state | stream_format: stream_format}}
end

@impl true
def handle_pad_added(Pad.ref(name, _ref) = output_pad, ctx, state)
when name in [:output, :output_copy] do
maybe_stream_format =
case state.stream_format do
nil -> []
stream_format -> [stream_format: {output_pad, stream_format}]
end

maybe_eos =
if ctx.pads.input.end_of_stream?,
do: [end_of_stream: output_pad],
else: []

{maybe_stream_format ++ maybe_eos, state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
{[forward: buffer], state}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do
alias Membrane.Testing

defmodule AutoFilter do
use Membrane.Filter
use Membrane.Filter, flow_control_hints?: false

def_input_pad :input, availability: :on_request, accepted_format: _any
def_output_pad :output, availability: :on_request, accepted_format: _any
Expand Down
33 changes: 33 additions & 0 deletions test/membrane/integration/funnel_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Membrane.Integration.FunnelTest do
use ExUnit.Case

import Membrane.Testing.Assertions

alias Membrane.{Buffer, Funnel, Testing}

test "Collects multiple inputs" do
import Membrane.ChildrenSpec
data = 1..10

{:ok, _supervisor_pid, pipeline} =
Testing.Pipeline.start_link(
spec: [
child(:funnel, Funnel),
child(:src1, %Testing.Source{output: data}) |> get_child(:funnel),
child(:src2, %Testing.Source{output: data}) |> get_child(:funnel),
get_child(:funnel) |> child(:sink, Testing.Sink)
]
)

data
|> Enum.flat_map(&[&1, &1])
|> Enum.each(fn payload ->
assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^payload})
end)

assert_end_of_stream(pipeline, :sink)
refute_sink_buffer(pipeline, :sink, _buffer, 0)

Membrane.Pipeline.terminate(pipeline)
end
end
55 changes: 55 additions & 0 deletions test/membrane/integration/tee_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Membrane.Integration.TeeTest do
@moduledoc false
use ExUnit.Case, async: true
use Bunch

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Testing.{Pipeline, Sink, Source}

test "forwards input to three all outputs" do
range = 1..100
sinks = [:sink1, :sink2, :sink3, :sink_4]

spec =
[
child(:src, %Source{output: range})
|> child(:tee, Membrane.Tee)
] ++
for sink <- sinks do
pad = if sink in [:sink1, :sink2], do: :output, else: :output_copy

get_child(:tee)
|> via_out(pad)
|> child(sink, %Sink{})
end

pipeline = Pipeline.start_link_supervised!(spec: spec)

for sink <- sinks do
assert_end_of_stream(pipeline, ^sink, :input)
end

for element <- range, sink <- sinks do
assert_sink_buffer(pipeline, sink, %Buffer{payload: ^element})
end

for {pad, sink} <- [output_copy: :sink5, output: :sink6] do
spec =
get_child(:tee)
|> via_out(pad)
|> child(sink, %Sink{})

Pipeline.execute_actions(pipeline, spec: spec)
end

for sink <- [:sink5, :sink6] do
assert_sink_stream_format(pipeline, sink, %Membrane.RemoteStream{})
assert_end_of_stream(pipeline, ^sink, :input)
end

Pipeline.terminate(pipeline)
end
end
Loading