diff --git a/config/config.exs b/config/config.exs index 3f582015e..480a70af5 100644 --- a/config/config.exs +++ b/config/config.exs @@ -4,8 +4,9 @@ if config_env() == :test do config :junit_formatter, include_filename?: true config :membrane_core, :telemetry_flags, [ - :links, - :inits_and_terminates, + :links, + :inits_and_terminates, + :spans # {:metrics, [:buffer, :bitrate, :queue_len, :stream_format, :event, :store, :take_and_demand]} ] end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 682b55c97..5fb081a03 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -96,7 +96,9 @@ defmodule Membrane.Core.Element do @impl GenServer def init(options) do Utils.log_on_error do - do_init(options) + Telemetry.report_span :element, :init do + do_init(options) + end end end @@ -118,7 +120,7 @@ defmodule Membrane.Core.Element do SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()}) Telemetry.report_init(:element) - path = Membrane.ComponentPath.get_formatted() + path = Membrane.ComponentPath.get_formatted() ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element, path) end) self_pid = self() @@ -163,8 +165,10 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_continue(:setup, state) do Utils.log_on_error do - state = LifecycleController.handle_setup(state) - {:noreply, state} + Telemetry.report_span :element, :setup do + state = LifecycleController.handle_setup(state) + {:noreply, state} + end end end diff --git a/lib/membrane/core/telemetry.ex b/lib/membrane/core/telemetry.ex index c39fbad5e..6e67e9ab7 100644 --- a/lib/membrane/core/telemetry.ex +++ b/lib/membrane/core/telemetry.ex @@ -28,7 +28,7 @@ defmodule Membrane.Core.Telemetry do report_event( event, value, - Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == metric)) != nil + Keyword.get(@telemetry_flags, :metrics, []) |> Enum.member?(metric) ) end @@ -60,7 +60,7 @@ defmodule Membrane.Core.Telemetry do report_event( event, value, - Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == :bitrate)) != nil + Keyword.get(@telemetry_flags, :metrics, []) |> Enum.member?(:bitrate) ) end @@ -96,7 +96,7 @@ defmodule Membrane.Core.Telemetry do } end - report_event(event, value, Enum.find(@telemetry_flags, &(&1 == :links)) != nil) + report_event(event, value, Enum.member?(@telemetry_flags, :links)) end @doc """ @@ -115,12 +115,33 @@ defmodule Membrane.Core.Telemetry do %{log_metadata: Logger.metadata()} end - report_event( - event, - value, - Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil, - metadata - ) + report_event(event, value, Enum.member?(@telemetry_flags, :inits_and_terminates), metadata) + end + + defmacro report_span(type, subtype, do: block) do + enabled = Enum.member?(@telemetry_flags, :spans) + + if enabled do + quote do + {time, val} = + :timer.tc(fn -> + unquote(block) + end) + + path = ComponentPath.get() + log_metadata = Logger.metadata() + + :telemetry.execute( + [:membrane, unquote(type), :span, unquote(subtype)], + %{time: time, time_unit: :microseconds, path: path}, + %{log_metadata: Logger.metadata()} + ) + + val + end + else + block + end end @doc """ @@ -128,18 +149,18 @@ defmodule Membrane.Core.Telemetry do """ defmacro report_terminate(type, path) when type in [:pipeline, :bin, :element] do event = [:membrane, type, :terminate] - value = quote do %{path: unquote(path)} end - report_event( - event, - value, - Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil - ) + value = + quote do + %{path: unquote(path)} + end + + report_event(event, value, Enum.member?(@telemetry_flags, :inits_and_terminates)) end # Conditional event reporting of telemetry events - defp report_event(event_name, measurement, enable, metadata \\ nil) do - if enable do + defp report_event(event_name, measurement, enabled, metadata \\ nil) do + if enabled do quote do :telemetry.execute( unquote(event_name), diff --git a/test/membrane/telemetry_test.exs b/test/membrane/telemetry_test.exs index 587bf6d38..a65a1b826 100644 --- a/test/membrane/telemetry_test.exs +++ b/test/membrane/telemetry_test.exs @@ -1,6 +1,5 @@ defmodule Membrane.TelemetryTest do use ExUnit.Case, async: false - import Membrane.ChildrenSpec require Logger @@ -10,9 +9,7 @@ defmodule Membrane.TelemetryTest do use Membrane.Filter def_input_pad :input, flow_control: :manual, accepted_format: _any, demand_unit: :buffers - def_output_pad :output, flow_control: :manual, accepted_format: _any - def_options target: [spec: pid()] @impl true @@ -43,6 +40,8 @@ defmodule Membrane.TelemetryTest do end describe "Telemetry attaching" do + @tag :telemetry + test "handles init events", %{ref: ref, links: links} do :ok = [ @@ -91,11 +90,45 @@ defmodule Membrane.TelemetryTest do end end + describe "Telemetry reports" do + @tag :telemetry + + @paths ~w[:filter :sink :source] + @steps [:init, :setup] + + setup %{links: links} do + ref = make_ref() + + for step <- @steps do + [:membrane, :element, :span, step] + end + |> attach_to_events(ref) + + Testing.Pipeline.start_link_supervised!(spec: links) + [ref: ref] + end + + for path <- @paths, step <- @steps do + test "#{path}/#{step}", %{ref: ref} do + path = unquote(path) + step = unquote(step) + + assert_receive( + {^ref, :telemetry_ack, + {[:membrane, :element, :span, ^step], %{path: [_, ^path], time: time}, _}} + ) + + assert time > 0 + end + end + end + defp attach_to_events(events, ref) do - :telemetry.attach_many(ref, events, &TelemetryListener.handle_event/4, %{ - dest: self(), - ref: ref - }) + :ok = + :telemetry.attach_many(ref, events, &TelemetryListener.handle_event/4, %{ + dest: self(), + ref: ref + }) end defp extract_type(%{path: pid_type}) do