Skip to content

Commit

Permalink
Improve Pipeline docs (#744)
Browse files Browse the repository at this point in the history
  • Loading branch information
bradhanks authored Feb 23, 2024
1 parent 4c37a06 commit 475c22c
Showing 1 changed file with 79 additions and 69 deletions.
148 changes: 79 additions & 69 deletions lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
defmodule Membrane.Pipeline do
@moduledoc """
Module containing functions for constructing and supervising pipelines.
A behaviour module for implementing pipelines.
Pipelines are units that make it possible to instantiate, link and manage
elements and bins in convenient way (actually they should always be used inside
a pipeline). Linking pipeline children together enables them to pass data to one
another, and process it in different ways.
To create a pipeline, use the `__using__/1` macro and implement callbacks
of `Membrane.Pipeline` behaviour. For details on instantiating and linking
children, see `Membrane.ChildrenSpec`.
`Membrane.Pipeline` contains the callbacks and functions for constructing and supervising pipelines.
Pipelines facilitate the convenient instantiation, linking, and management of elements and bins.\\
Linking pipeline children together enables them to pass and process data.
To create a pipeline, use `use Membrane.Pipeline` and implement callbacks of `Membrane.Pipeline` behaviour.
See `Membrane.ChildrenSpec` for details on instantiating and linking children.
## Starting and supervision
Pipeline can be started with `start_link/2` or `start/2` functions. They both
return `{:ok, supervisor_pid, pipeline_pid}` in case of success, because the pipeline
is always spawned under a dedicated supervisor. The supervisor never restarts the
pipeline, but it makes sure that the pipeline and its children terminate properly.
If the pipeline needs to be restarted in case of failure, it should be spawned under
another supervisor with a proper strategy.
Start a pipeline with `start_link/2` or `start/2`. Pipelines always spawn under a dedicated supervisor, so
in the case of success, either function will return `{:ok, supervisor_pid, pipeline_pid}` .
The supervisor never restarts the pipeline, but it does ensure that the pipeline and its children terminate properly.
If the pipeline needs to be restarted, it should be spawned under a different supervisor with the appropriate strategy.
### Starting under a supervision tree
The pipeline can be spawned under a supervision tree like any `GenServer`. Also,
`__using__/1` macro injects a `child_spec/1` function. A simple scenario can look like:
A pipeline can be spawned under a supervision tree like any other `GenServer`.\\
`use Membrane.Pipeline` injects a `child_spec/1` function. A simple scenario could look like this:
defmodule MyPipeline do
use Membrane.Pipeline
Expand All @@ -40,22 +36,22 @@ defmodule Membrane.Pipeline do
### Starting outside of a supervision tree
When starting a pipeline outside a supervision tree and willing to interact with
the pipeline by pid, `pipeline_pid` returned from `start_link` can be used, for example
When starting a pipeline outside a supervision tree, use the `pipeline_pid` pid to interact with the pipeline.
A simple scenario could look like this:
{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start_link(MyPipeline, option: :value)
send(pipeline_pid, :message)
### Visualizing pipeline's supervision tree
### Visualizing the supervision tree
Pipeline's internal supervision tree can be looked up with Applications tab of Erlang's Stalker
or with Livebook's `Kino` library.
For debugging (and ONLY for debugging) purposes, you may use the following configuration:
Use the [Applications tab](https://www.erlang.org/doc/apps/observer/observer_ug#applications-tab) in Erlang's Observer GUI
(or the `Kino` library in Livebook) to visualize a pipeline's internal supervision tree. Use the following configuration for debugging purposes only:
config :membrane_core, unsafely_name_processes_for_stalker: [:components]
that makes the stalker's process tree graph more readable by naming pipeline's descendants, for example:
![Stalker graph](assets/images/stalker_graph.png).
This improves the readability of the Observer's process tree graph by naming the pipeline descendants, as demonstrated here:
![Observer graph](assets/images/observer_graph.png).
"""

use Bunch
Expand All @@ -67,50 +63,58 @@ defmodule Membrane.Pipeline do
require Membrane.Core.Message, as: Message

@typedoc """
Defines options that can be passed to `start/3` / `start_link/3` and received
in `c:handle_init/2` callback.
Defines options passed to the `start/3` and `start_link/3` and subsequently received
in the `c:handle_init/2` callback.
"""
@type pipeline_options :: any

@typedoc "The Pipeline name"
@type name :: GenServer.name()

@typedoc "List of configurations used by `start/3` and `start_link/3`."
@type config :: [config_entry()]

@typedoc "Defines configuration value used by the `start/3` and `start_link/3`."
@type config_entry :: {:name, name()}

@typedoc """
Defines the return value of the `start/3` and `start_link/3`."
"""
@type on_start ::
{:ok, supervisor_pid :: pid, pipeline_pid :: pid}
| {:error, {:already_started, pid()} | term()}

@typedoc """
The pipeline state.
"""
@type state :: any()

@typedoc """
Defines return values from Pipeline callback functions.
## Return values
* `{[action], state}` - Return a list of actions that will be performed within the
pipeline. This can be used to start new children, or to send messages to specific children,
for example. Actions are a tuple of `{type, arguments}`, so may be written in the
form a keyword list. See `Membrane.Pipeline.Action` for more info.
* `{[action], state}` - Returns a list of actions that will be performed within the
pipeline, e.g., starting new children, sending messages to specific children, etc.
Actions are tuples of `{type, arguments}`, so they can be expressed as a keyword list.
See `Membrane.Pipeline.Action` for more info.
"""
@type callback_return ::
{[Action.t()], state}

@doc """
Callback invoked on initialization of pipeline.
Callback invoked on initialization of the pipeline.
This callback is synchronous: the process which started the pipeline waits until `handle_init`
finishes. For that reason, it's important to do any long-lasting or complex work in `c:handle_setup/2`,
while `handle_init` should be used for things like parsing options, initializing state or spawning
children.
By default, it converts the `opts` to a map if they're a struct and sets them as the pipeline state.
This callback is synchronous: the process that started the pipeline waits until `handle_init`
finishes, so it's important to do any long-lasting or complex work in `c:handle_setup/2`.
`handle_init` should be used for things, like parsing options, initializing state, or spawning
children. By default, `handle_init` converts `opts` to a map if they're a struct and sets them as the pipeline state.
"""
@callback handle_init(context :: CallbackContext.t(), options :: pipeline_options) ::
{[Action.common_actions()], state()}

@doc """
Callback invoked when pipeline is requested to terminate with `terminate/2`.
Callback invoked when the pipeline is requested to terminate with `terminate/2`.
By default, it returns `t:Membrane.Pipeline.Action.terminate/0` with reason `:normal`.
"""
@callback handle_terminate_request(context :: CallbackContext.t(), state) ::
Expand All @@ -129,7 +133,7 @@ defmodule Membrane.Pipeline do
{[Action.common_actions()], state()}

@doc """
Callback invoked when pipeline switches the playback to `:playing`.
Callback invoked when the pipeline switches the playback to `:playing`.
By default, it does nothing.
"""
@callback handle_playing(
Expand Down Expand Up @@ -166,10 +170,10 @@ defmodule Membrane.Pipeline do
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when pipeline receives a message that is not recognized
as an internal membrane message.
Callback invoked when the pipeline receives a message that is not recognized
as an internal Membrane message.
Useful for receiving data sent from NIFs or other stuff.
Useful for receiving data sent from NIFs or other external sources.
By default, it logs and ignores the received message.
"""
@callback handle_info(
Expand All @@ -180,7 +184,7 @@ defmodule Membrane.Pipeline do
{[Action.common_actions()], state()}

@doc """
Callback invoked when a child element starts processing stream via given pad.
Callback invoked when a child element starts processing a stream via the given pad.
By default, it does nothing.
"""
Expand All @@ -192,7 +196,7 @@ defmodule Membrane.Pipeline do
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when a child element finishes processing stream via given pad.
Callback invoked when a child element finishes processing a stream via the given pad.
By default, it does nothing.
"""
Expand Down Expand Up @@ -225,7 +229,7 @@ defmodule Membrane.Pipeline do
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when crash of the crash group happens.
Callback invoked when a crash group crashes.
Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`.
By default, it does nothing.
Expand All @@ -237,9 +241,9 @@ defmodule Membrane.Pipeline do
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when pipeline is called using a synchronous call.
Callback invoked when the pipeline is called using a synchronous call.
Context passed to this callback contains additional field `:from`.
Context passed to this callback contains an additional field `:from`.
By default, it does nothing.
"""
@callback handle_call(
Expand All @@ -264,10 +268,10 @@ defmodule Membrane.Pipeline do
handle_child_pad_removed: 4

@doc """
Starts the Pipeline based on given module and links it to the current
process.
Starts the pipeline based on the given module and links it to the current process.
Pipeline options are passed to module's `c:handle_init/2` callback.
Pipeline options are passed to the `c:handle_init/2` callback.
Note that this function returns `{:ok, supervisor_pid, pipeline_pid}` in case of
success. Check the 'Starting and supervision' section of the moduledoc for details.
"""
Expand All @@ -276,7 +280,7 @@ defmodule Membrane.Pipeline do
do: do_start(:start_link, module, pipeline_options, process_options)

@doc """
Does the same as `start_link/3` but starts process outside of supervision tree.
Starts the pipeline outside a supervision tree. Compare to `start_link/3`.
"""
@spec start(module, pipeline_options, config) :: on_start
def start(module, pipeline_options \\ nil, process_options \\ []),
Expand Down Expand Up @@ -328,21 +332,21 @@ defmodule Membrane.Pipeline do
Terminates the pipeline.
Accepts three options:
* `asynchronous?` - if set to `true`, pipline termination won't be blocking and
will be executed in the process, which pid is returned as function result. If
set to `false`, pipeline termination will be blocking and will be executed in
the process that called this function. Defaults to `false`.
* `timeout` - tells how much time (ms) to wait for pipeline to get gracefully
terminated. Defaults to 5000.
* `force?` - if set to `true` and pipeline is still alive after `timeout`,
pipeline will be killed using `Process.exit/2` with reason `:kill`, and function
will return `{:error, :timeout}`. If set to `false` and pipeline is still alive
after `timeout`, function will raise an error. Defaults to `false`.
* `asynchronous?` - if set to `true`, pipeline termination won't be blocking and
will be executed in the process whose pid is returned as a function result.
If set to `false`, pipeline termination will be blocking and will be executed in
the process that called this function. Defaults to `false`.
* `timeout` - specifies how much time (ms) to wait for the pipeline to gracefully
terminate. Defaults to 5000.
* `force?` - determines how to handle a pipeline still alive after `timeout`.
If set to `true`, `Process.exit/2` kills the pipeline with reason `:kill` and returns
`{:error, :timeout}`.
If set to `false`, it raises an error. Defaults to `false`.
Returns:
* `{:ok, pid}` - if option `asynchronous?: true` was passed.
* `:ok` - if pipeline was gracefully terminated within `timeout`.
* `{:error, :timeout}` - if pipeline was killed after a `timeout`.
* `{:ok, pid}` - option `asynchronous?: true` was passed.
* `:ok` - pipeline gracefully terminated within `timeout`.
* `{:error, :timeout}` - pipeline was killed after `timeout`.
"""
@spec terminate(pipeline :: pid,
timeout: timeout(),
Expand Down Expand Up @@ -393,23 +397,28 @@ defmodule Membrane.Pipeline do
end
end

@doc """
Calls the pipeline with a message.
Returns the result of the pipeline call.
"""
@spec call(pid, any, timeout()) :: term()
def call(pipeline, message, timeout \\ 5000) do
GenServer.call(pipeline, message, timeout)
end

@doc """
Checks whether module is a pipeline.
Checks whether the module is a pipeline.
"""
@spec pipeline?(module) :: boolean
def pipeline?(module) do
module |> Bunch.Module.check_behaviour(:membrane_pipeline?)
end

@doc """
Lists PIDs of all the pipelines currently running on the current node.
Returns list of pipeline PIDs currently running on the current node.
Use only for debugging purposes.
Use for debugging only.
"""
@spec list_pipelines() :: [pid]
def list_pipelines() do
Expand All @@ -423,7 +432,8 @@ defmodule Membrane.Pipeline do
end

@doc """
Like `list_pipelines/0`, but allows to pass a node.
Returns list of pipeline PIDs currently running on the passed node. \\
Compare to `list_pipelines/0`.
"""
@spec list_pipelines(node()) :: [pid]
def list_pipelines(node) do
Expand Down

0 comments on commit 475c22c

Please sign in to comment.