Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry (#905) #918

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,13 @@ import Config

if config_env() == :test do
config :junit_formatter, include_filename?: true

config :membrane_core, :telemetry_flags,
include: [
:init,
:setup,
:terminate,
:playing,
:link
]
end
7 changes: 3 additions & 4 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ defmodule Membrane.Core.Bin do
{:continue, :setup}}
def init(options) do
Utils.log_on_error do
do_init(options)
Telemetry.report_span :bin, :init do
do_init(options)
end
Comment on lines +99 to +101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we will have an appropriate span for handle_init in CallbackHandler, there is no need for span for :init here

end
end

Expand Down Expand Up @@ -127,9 +129,6 @@ defmodule Membrane.Core.Bin do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:bin)
ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:bin) end)

state =
%State{
module: module,
Expand Down
6 changes: 5 additions & 1 deletion lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Membrane.Core.CallbackHandler do

use Bunch

require Membrane.Core.Telemetry
alias Membrane.Core.Telemetry
alias Membrane.CallbackError

require Membrane.Logger
Expand Down Expand Up @@ -136,7 +138,9 @@ defmodule Membrane.Core.CallbackHandler do

callback_result =
try do
apply(module, callback, args)
Telemetry.report_span :element, :terminate do
apply(module, callback, args)
end
Comment on lines +141 to +143
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This apply executes all custom callbacks implemented in the membrane components, not only terminate (but handle_terminate is executed here as well as the rest of the handle_* callbacks) and not only in Elements, but also in Bins and Pipelines. Take a look at the path to the file or module name: there is no element in them, so it suggests that this code is used not only in elements.

Suggested change
Telemetry.report_span :element, :terminate do
apply(module, callback, args)
end
component_type =
case state do
%Membrane.Core.Element.State{} -> :element
# the same for bin and pipeline
end
Telemetry.report_span component_type, callback do
apply(module, callback, args)
end

Doing it this way will emit event for ALL callbacks and I think this is the thing we want to do

rescue
e in UndefinedFunctionError ->
_ignored =
Expand Down
19 changes: 12 additions & 7 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule Membrane.Core.Element do

alias Membrane.{Clock, Core, ResourceGuard, Sync}
alias Membrane.Core.Child.PadSpecHandler
alias Membrane.Core.Telemetry

alias Membrane.Core.Element.{
BufferController,
Expand Down Expand Up @@ -117,10 +118,6 @@ defmodule Membrane.Core.Element do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:element)

ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element) end)

self_pid = self()

Stalker.register_metric_function(:message_queue_length, fn ->
Expand Down Expand Up @@ -156,15 +153,21 @@ defmodule Membrane.Core.Element do
}
|> PadSpecHandler.init_pads()

state = LifecycleController.handle_init(options.user_options, state)
state =
Telemetry.report_span :element, :init do
LifecycleController.handle_init(options.user_options, state)
end

{:ok, state, {:continue, :setup}}
end

@impl GenServer
def handle_continue(:setup, state) do
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
Telemetry.report_span :element, :setup do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end
end

Expand All @@ -184,6 +187,7 @@ defmodule Membrane.Core.Element do
_from,
state
) do
Telemetry.report_link(this, other)
{reply, state} = PadController.handle_link(direction, this, other, params, state)
{:reply, reply, state}
end
Expand Down Expand Up @@ -246,6 +250,7 @@ defmodule Membrane.Core.Element do

defp do_handle_info(Message.new(:play), state) do
state = LifecycleController.handle_playing(state)
Telemetry.report_playing(:element)
{:noreply, state}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Membrane.Core.Element.BufferController do
"""
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric("buffer", 1, inspect(pad_ref))
Telemetry.report_metric(:buffer, 1, inspect(pad_ref))

do_exec_buffer_callback(pad_ref, buffers, state)
end
Expand Down
18 changes: 11 additions & 7 deletions lib/membrane/core/element/lifecycle_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Membrane.Core.Element.LifecycleController do

use Bunch

require Membrane.Core.Telemetry
alias Membrane.Core.Telemetry
alias Membrane.{Clock, Element, Sync}
alias Membrane.Core.{CallbackHandler, Element, Message, SubprocessSupervisor}

Expand Down Expand Up @@ -81,13 +83,15 @@ defmodule Membrane.Core.Element.LifecycleController do
|> EffectiveFlowController.resolve_effective_flow_control()

state =
CallbackHandler.exec_and_handle_callback(
:handle_playing,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[],
state
)
Telemetry.report_span :element, :playing do
CallbackHandler.exec_and_handle_callback(
:handle_playing,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[],
state
)
end

PlaybackQueue.eval(state)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do

def store(input_queue, :buffer, v), do: store(input_queue, :buffers, [v])

def store(%__MODULE__{q: q, size: size} = input_queue, type, v)
def store(%__MODULE__{q: q} = input_queue, type, v)
when type in @non_buf_types do
"Storing #{type}" |> mk_log(input_queue) |> Membrane.Logger.debug_verbose()

Telemetry.report_metric(:store, size, input_queue.log_tag)
Telemetry.report_metric(:store, input_queue.size, input_queue.log_tag)

%__MODULE__{input_queue | q: q |> @qe.push({:non_buffer, type, v})}
end
Expand Down
17 changes: 7 additions & 10 deletions lib/membrane/core/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ defmodule Membrane.Core.Pipeline do
require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message, as: Message
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.Component

@spec get_stalker(pipeline :: pid()) :: Membrane.Core.Stalker.t()
def get_stalker(pipeline) do
Expand All @@ -25,7 +24,9 @@ defmodule Membrane.Core.Pipeline do
@impl GenServer
def init(params) do
Utils.log_on_error do
do_init(params)
Telemetry.report_span :pipeline, :init do
do_init(params)
end
end
end

Expand All @@ -43,12 +44,6 @@ defmodule Membrane.Core.Pipeline do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:pipeline)

ResourceGuard.register(resource_guard, fn ->
Telemetry.report_terminate(:pipeline)
end)

{:ok, clock_proxy} =
SubprocessSupervisor.start_utility(
params.subprocess_supervisor,
Expand Down Expand Up @@ -82,8 +77,10 @@ defmodule Membrane.Core.Pipeline do
@impl GenServer
def handle_continue(:setup, state) do
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
Telemetry.report_span :pipeline, :setup do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end
end

Expand Down
Loading
Loading