Skip to content

Commit

Permalink
Adjust to incorporating Funnel into membrane_core (#17)
Browse files Browse the repository at this point in the history
* Adjust to incorporating Funnel into membrane_core

* Implement review comments
  • Loading branch information
FelonEkonom authored Dec 23, 2024
1 parent 5e97caf commit 5fad92d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 59 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The package can be installed by adding `membrane_funnel_plugin` to your list of
```elixir
def deps do
[
{:membrane_funnel_plugin, "~> 0.9.1"}
{:membrane_funnel_plugin, "~> 0.9.2"}
]
end
```
Expand Down
105 changes: 55 additions & 50 deletions lib/membrane_funnel.ex
Original file line number Diff line number Diff line change
@@ -1,65 +1,70 @@
defmodule Membrane.Funnel do
@moduledoc """
Element that can be used for collecting data from multiple inputs and sending it through one
output. When a new input connects in the `:playing` state, the funnel sends
`Membrane.Funnel.NewInputEvent` via output.
"""
use Membrane.Filter
module_name = Membrane.Funnel

alias Membrane.Funnel
if not Code.ensure_loaded?(module_name) do
defmodule module_name do
@moduledoc """
Element that can be used for collecting data from multiple inputs and sending it through one
output. When a new input connects in the `:playing` state, the funnel sends
`Membrane.Funnel.NewInputEvent` via output.
"""
use Membrane.Filter

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto
def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad]
def_options end_of_stream: [
spec: :on_last_pad | :on_first_pad | :never,
default: :on_last_pad
]

@impl true
def handle_init(_ctx, opts) do
{[], %{end_of_stream: opts.end_of_stream}}
end

@impl true
def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do
{[event: {:output, %Funnel.NewInputEvent{}}], state}
end
@impl true
def handle_init(_ctx, opts) do
{[], %{end_of_stream: opts.end_of_stream}}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do
{[], state}
end
@impl true
def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do
{[], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do
{[event: {:output, %__MODULE__.NewInputEvent{}}], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do
if ctx.pads.output.end_of_stream? do
@impl true
def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do
{[], state}
else
{[end_of_stream: :output], state}
end
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do
if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do
{[end_of_stream: :output], state}
else
@impl true
def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do
{[], state}
end
end

defp inputs_data(ctx) do
Enum.flat_map(ctx.pads, fn
{Pad.ref(:input, _id), data} -> [data]
_output -> []
end)
@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do
if ctx.pads.output.end_of_stream? do
{[], state}
else
{[end_of_stream: :output], state}
end
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do
if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do
{[end_of_stream: :output], state}
else
{[], state}
end
end

defp inputs_data(ctx) do
Enum.flat_map(ctx.pads, fn
{Pad.ref(:input, _id), data} -> [data]
_output -> []
end)
end
end
end
18 changes: 11 additions & 7 deletions lib/membrane_funnel/new_input_event.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
defmodule Membrane.Funnel.NewInputEvent do
@moduledoc """
Event sent each time new element is linked (via funnel input pad) after playing pipeline.
"""
@derive Membrane.EventProtocol
module_name = Membrane.Funnel.NewInputEvent

@type t :: %__MODULE__{}
defstruct []
if not Code.ensure_loaded?(module_name) do
defmodule module_name do
@moduledoc """
Event sent each time new element is linked (via funnel input pad) after playing pipeline.
"""
@derive Membrane.EventProtocol

@type t :: %__MODULE__{}
defstruct []
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.Funnel.Plugin.Mixfile do
use Mix.Project

@version "0.9.1"
@version "0.9.2"
@github_url "https://github.com/membraneframework/membrane_funnel_plugin"

def project do
Expand Down

0 comments on commit 5fad92d

Please sign in to comment.