Skip to content

Commit

Permalink
Telemetry spans for init and setup]
Browse files Browse the repository at this point in the history
  • Loading branch information
Krzysztof Wende committed Dec 6, 2024
1 parent 3c5c695 commit f30db73
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 30 deletions.
5 changes: 3 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ if config_env() == :test do
config :junit_formatter, include_filename?: true

config :membrane_core, :telemetry_flags, [
:links,
:inits_and_terminates,
:links,
:inits_and_terminates,
:spans
# {:metrics, [:buffer, :bitrate, :queue_len, :stream_format, :event, :store, :take_and_demand]}
]
end
12 changes: 8 additions & 4 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ defmodule Membrane.Core.Element do
@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
Telemetry.report_span :element, :init do
do_init(options)
end
end
end

Expand All @@ -118,7 +120,7 @@ defmodule Membrane.Core.Element do
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:element)
path = Membrane.ComponentPath.get_formatted()
path = Membrane.ComponentPath.get_formatted()
ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element, path) end)

self_pid = self()
Expand Down Expand Up @@ -163,8 +165,10 @@ defmodule Membrane.Core.Element do
@impl GenServer
def handle_continue(:setup, state) do
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
Telemetry.report_span :element, :setup do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end
end

Expand Down
55 changes: 38 additions & 17 deletions lib/membrane/core/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Membrane.Core.Telemetry do
report_event(
event,
value,
Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == metric)) != nil
Keyword.get(@telemetry_flags, :metrics, []) |> Enum.member?(metric)
)
end

Expand Down Expand Up @@ -60,7 +60,7 @@ defmodule Membrane.Core.Telemetry do
report_event(
event,
value,
Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == :bitrate)) != nil
Keyword.get(@telemetry_flags, :metrics, []) |> Enum.member?(:bitrate)
)
end

Expand Down Expand Up @@ -96,7 +96,7 @@ defmodule Membrane.Core.Telemetry do
}
end

report_event(event, value, Enum.find(@telemetry_flags, &(&1 == :links)) != nil)
report_event(event, value, Enum.member?(@telemetry_flags, :links))
end

@doc """
Expand All @@ -115,31 +115,52 @@ defmodule Membrane.Core.Telemetry do
%{log_metadata: Logger.metadata()}
end

report_event(
event,
value,
Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil,
metadata
)
report_event(event, value, Enum.member?(@telemetry_flags, :inits_and_terminates), metadata)
end

defmacro report_span(type, subtype, do: block) do
enabled = Enum.member?(@telemetry_flags, :spans)

if enabled do
quote do
{time, val} =
:timer.tc(fn ->
unquote(block)
end)

path = ComponentPath.get()
log_metadata = Logger.metadata()

:telemetry.execute(
[:membrane, unquote(type), :span, unquote(subtype)],
%{time: time, time_unit: :microseconds, path: path},
%{log_metadata: Logger.metadata()}
)

val
end
else
block
end
end

@doc """
Reports a pipeline/bin/element termination.
"""
defmacro report_terminate(type, path) when type in [:pipeline, :bin, :element] do
event = [:membrane, type, :terminate]
value = quote do %{path: unquote(path)} end

report_event(
event,
value,
Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil
)
value =
quote do
%{path: unquote(path)}
end

report_event(event, value, Enum.member?(@telemetry_flags, :inits_and_terminates))
end

# Conditional event reporting of telemetry events
defp report_event(event_name, measurement, enable, metadata \\ nil) do
if enable do
defp report_event(event_name, measurement, enabled, metadata \\ nil) do
if enabled do
quote do
:telemetry.execute(
unquote(event_name),
Expand Down
47 changes: 40 additions & 7 deletions test/membrane/telemetry_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Membrane.TelemetryTest do
use ExUnit.Case, async: false

import Membrane.ChildrenSpec

require Logger
Expand All @@ -10,9 +9,7 @@ defmodule Membrane.TelemetryTest do
use Membrane.Filter

def_input_pad :input, flow_control: :manual, accepted_format: _any, demand_unit: :buffers

def_output_pad :output, flow_control: :manual, accepted_format: _any

def_options target: [spec: pid()]

@impl true
Expand Down Expand Up @@ -43,6 +40,8 @@ defmodule Membrane.TelemetryTest do
end

describe "Telemetry attaching" do
@tag :telemetry

test "handles init events", %{ref: ref, links: links} do
:ok =
[
Expand Down Expand Up @@ -91,11 +90,45 @@ defmodule Membrane.TelemetryTest do
end
end

describe "Telemetry reports" do
@tag :telemetry

@paths ~w[:filter :sink :source]
@steps [:init, :setup]

setup %{links: links} do
ref = make_ref()

for step <- @steps do
[:membrane, :element, :span, step]
end
|> attach_to_events(ref)

Testing.Pipeline.start_link_supervised!(spec: links)
[ref: ref]
end

for path <- @paths, step <- @steps do
test "#{path}/#{step}", %{ref: ref} do
path = unquote(path)
step = unquote(step)

assert_receive(
{^ref, :telemetry_ack,
{[:membrane, :element, :span, ^step], %{path: [_, ^path], time: time}, _}}
)

assert time > 0
end
end
end

defp attach_to_events(events, ref) do
:telemetry.attach_many(ref, events, &TelemetryListener.handle_event/4, %{
dest: self(),
ref: ref
})
:ok =
:telemetry.attach_many(ref, events, &TelemetryListener.handle_event/4, %{
dest: self(),
ref: ref
})
end

defp extract_type(%{path: pid_type}) do
Expand Down

0 comments on commit f30db73

Please sign in to comment.