Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement handle_spec_setup_completed and handle_spec_playing. Release v1.1.0-rc1. #801

Merged
merged 12 commits into from
Jun 3, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.1.0-rc1
* Add new callbacks `handle_child_setup_completed/3` and `handle_child_playing/3` in Bins and Pipelines. [#801](https://github.com/membraneframework/membrane_core/pull/801)

## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
Expand Down
32 changes: 32 additions & 0 deletions lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,28 @@ defmodule Membrane.Bin do
state :: state
) :: callback_return

@doc """
Callback invoked when a child complete its setup.

By default, it does nothing.
"""
@callback handle_child_setup_completed(
child :: Child.name(),
context :: CallbackContext.t(),
state
) :: callback_return

@doc """
Callback invoked when a child enters `playing` playback.

By default, it does nothing.
"""
@callback handle_child_playing(
child :: Child.name(),
context :: CallbackContext.t(),
state
) :: callback_return

@doc """
Callback invoked upon each timer tick. A timer can be started with `t:Membrane.Bin.Action.start_timer/0`
action.
Expand Down Expand Up @@ -217,6 +239,8 @@ defmodule Membrane.Bin do
handle_playing: 2,
handle_info: 3,
handle_spec_started: 3,
handle_child_setup_completed: 3,
handle_child_playing: 3,
handle_element_start_of_stream: 4,
handle_element_end_of_stream: 4,
handle_child_notification: 4,
Expand Down Expand Up @@ -357,6 +381,12 @@ defmodule Membrane.Bin do
{[], state}
end

@impl true
def handle_child_setup_completed(_child, _ctx, state), do: {[], state}

@impl true
def handle_child_playing(_child, _ctx, state), do: {[], state}

@impl true
def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state}

Expand All @@ -381,6 +411,8 @@ defmodule Membrane.Bin do
handle_setup: 2,
handle_playing: 2,
handle_info: 3,
handle_child_setup_completed: 3,
handle_child_playing: 3,
handle_element_start_of_stream: 4,
handle_element_end_of_stream: 4,
handle_child_notification: 4,
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/child_entry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ defmodule Membrane.ChildEntry do
The public fields are:
- `name` - child name
- `module` - child module
- `group` - child group name
- `options` - options passed to the child
- `component_type` - either `:element` or `:bin`
- `playback` - either `:stopped` or `:playing`

Other fields in the struct ARE NOT PART OF THE PUBLIC API and should not be
accessed or relied on.
Expand Down
37 changes: 35 additions & 2 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do
if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
Enum.empty?(spec_data.dependent_specs) do
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")

state = handle_children_setup_completed(spec_data.children_names, state)
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state)
else
{spec_data, state}
Expand Down Expand Up @@ -401,17 +403,22 @@ defmodule Membrane.Core.Parent.ChildLifeController do
defp do_proceed_spec_startup(_spec_ref, %{status: :ready} = spec_data, state) do
state =
Enum.reduce(spec_data.children_names, state, fn child, state ->
%{pid: pid, terminating?: terminating?} = get_in(state, [:children, child])
%{pid: pid, terminating?: terminating?} = state.children[child]

cond do
terminating? -> Message.send(pid, :terminate)
state.playback == :playing -> Message.send(pid, :play)
true -> :ok
end

put_in(state, [:children, child, :ready?], true)
put_in(state.children[child].ready?, true)
end)

state =
with %{playback: :playing} <- state do
handle_children_playing(spec_data.children_names, state)
end

{spec_data, state}
end

Expand Down Expand Up @@ -610,6 +617,32 @@ defmodule Membrane.Core.Parent.ChildLifeController do
}
end

@spec handle_children_setup_completed(MapSet.t(Child.name()) | [Child.name()], Parent.state()) ::
Parent.state()
def handle_children_setup_completed(children_names, state) do
exec_child_playback_related_callbacks(:handle_child_setup_completed, children_names, state)
end

@spec handle_children_playing(MapSet.t(Child.name()) | [Child.name()], Parent.state()) ::
Parent.state()
def handle_children_playing(children_names, state) do
exec_child_playback_related_callbacks(:handle_child_playing, children_names, state)
end

defp exec_child_playback_related_callbacks(callback, children_names, state) do
action_handler = Component.action_handler(state)

Enum.reduce(children_names, state, fn child, state ->
CallbackHandler.exec_and_handle_callback(
callback,
action_handler,
%{context: &Component.context_from_state/1},
[child],
state
)
end)
end

@spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state()
def handle_child_pad_removed(child, child_pad_ref, state) do
Membrane.Logger.debug_verbose("Child #{inspect(child)} removed pad #{inspect(child_pad_ref)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do

@spec exec_handle_spec_started([Membrane.Child.name()], Parent.state()) :: Parent.state()
def exec_handle_spec_started(children_names, state) do
callback_name = :handle_spec_started

# handle_spec_started/3 callback is deprecated, so we don't require its implementation
if function_exported?(state.module, :handle_spec_started, 3) do
if function_exported?(state.module, callback_name, 3) do
action_handler = Component.action_handler(state)

CallbackHandler.exec_and_handle_callback(
:handle_spec_started,
callback_name,
action_handler,
%{context: &Component.context_from_state/1},
[children_names],
Expand Down
32 changes: 21 additions & 11 deletions lib/membrane/core/parent/lifecycle_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Membrane.Core.Parent.LifecycleController do
}

alias Membrane.Core.Events
alias Membrane.Core.Parent.{ChildLifeController}
alias Membrane.Core.Parent.ChildLifeController

require Membrane.Core.Component
require Membrane.Core.Message
Expand Down Expand Up @@ -43,19 +43,29 @@ defmodule Membrane.Core.Parent.LifecycleController do

activate_syncs(state.children)

Enum.each(state.children, fn {_name, %{pid: pid, ready?: ready?}} ->
if ready?, do: Message.send(pid, :play)
end)
pinged_children =
state.children
|> Enum.flat_map(fn
{child_name, %{ready?: true, terminating?: false, pid: pid}} ->
Message.send(pid, :play)
[child_name]

_other_entry ->
[]
end)

state = %{state | playback: :playing}

CallbackHandler.exec_and_handle_callback(
:handle_playing,
Component.action_handler(state),
%{context: &Component.context_from_state/1},
[],
state
)
state =
CallbackHandler.exec_and_handle_callback(
:handle_playing,
Component.action_handler(state),
%{context: &Component.context_from_state/1},
[],
state
)

ChildLifeController.handle_children_playing(pinged_children, state)
end

@spec handle_terminate_request(Parent.state()) :: Parent.state()
Expand Down
32 changes: 32 additions & 0 deletions lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,28 @@ defmodule Membrane.Pipeline do
state
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when a child completes its setup.

By default, it does nothing.
"""
@callback handle_child_setup_completed(
child :: Child.name(),
context :: CallbackContext.t(),
state
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when a child enters `playing` playback.

By default, it does nothing.
"""
@callback handle_child_playing(
child :: Child.name(),
context :: CallbackContext.t(),
state
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked upon each timer tick. A timer can be started with `Membrane.Pipeline.Action.start_timer`
action.
Expand Down Expand Up @@ -260,6 +282,8 @@ defmodule Membrane.Pipeline do
handle_playing: 2,
handle_info: 3,
handle_spec_started: 3,
handle_child_setup_completed: 3,
handle_child_playing: 3,
handle_element_start_of_stream: 4,
handle_element_end_of_stream: 4,
handle_child_notification: 4,
Expand Down Expand Up @@ -515,6 +539,12 @@ defmodule Membrane.Pipeline do
{[], state}
end

@impl true
def handle_child_setup_completed(_child, _ctx, state), do: {[], state}

@impl true
def handle_child_playing(_child, _ctx, state), do: {[], state}

@impl true
def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state}

Expand All @@ -538,6 +568,8 @@ defmodule Membrane.Pipeline do
handle_setup: 2,
handle_playing: 2,
handle_info: 3,
handle_child_setup_completed: 3,
handle_child_playing: 3,
handle_element_start_of_stream: 4,
handle_element_end_of_stream: 4,
handle_child_notification: 4,
Expand Down
15 changes: 15 additions & 0 deletions lib/membrane/testing/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,21 @@ defmodule Membrane.Testing.Pipeline do
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end

[:handle_child_setup_completed, :handle_child_playing]
|> Enum.map(fn callback ->
@impl true
def unquote(callback)(child, ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
unquote(callback),
[child, ctx],
state
)

{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
end)

@impl true
def handle_info({__MODULE__, :__execute_actions__, actions}, _ctx, %State{} = state) do
{actions, state}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.Mixfile do
use Mix.Project

@version "1.1.0-rc0"
@version "1.1.0-rc1"
@source_ref "v#{@version}"

def project do
Expand Down
Loading
Loading