From 99bab83f3656bccd286d5b0214a424619a0eeb06 Mon Sep 17 00:00:00 2001 From: Saverio Date: Wed, 8 Feb 2023 11:30:13 +0100 Subject: [PATCH] Support propagating OTel context --- guides/telemetry.md | 26 +++++++++++++++++++++++++ lib/absinthe/middleware/async.ex | 11 ++++++++++- lib/absinthe/middleware/batch.ex | 11 ++++++++++- mix.exs | 1 + mix.lock | 2 ++ test/absinthe/middleware/async_test.exs | 18 +++++++++++++++++ test/absinthe/middleware/batch_test.exs | 22 +++++++++++++++++++++ 7 files changed, 89 insertions(+), 2 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index 1a1b62bb4c..304691ef3e 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -62,3 +62,29 @@ After a query is executed, you'll see something like: } } ``` + +## Opentelemetry + +When using Opentelemetry, one usually wants to correlate spans that are created +in spawned tasks with the main trace. For example, you might have a trace started +in a Phoenix endpoint, and then have spans around database access. + +One can correlate manually by attaching the OTel context the task function: + +```elixir +ctx = OpenTelemetry.Ctx.get_current() + +Task.async(fn -> + OpenTelemetry.Ctx.attach(ctx) + + # do stuff that might create spans +end) +``` + +When using the `async` and `batch` middleware, the tasks are spawned by Absinthe, +so you can't attach the context manually. + +Instead, you can add the `:opentelemetry_process_propagator` package to your +dependencies, which has a `Task.async/1` wrapper that will attach the context +automatically. If the package is installed, the middleware will use it in place +of the default `Task.async/1`. diff --git a/lib/absinthe/middleware/async.ex b/lib/absinthe/middleware/async.ex index b56c95d57c..553e895d81 100644 --- a/lib/absinthe/middleware/async.ex +++ b/lib/absinthe/middleware/async.ex @@ -52,7 +52,7 @@ defmodule Absinthe.Middleware.Async do # task so we have actual data. Thus, we prepend this module to the middleware stack. def call(%{state: :unresolved} = res, {fun, opts}) when is_function(fun) do task = - Task.async(fn -> + async(fn -> :telemetry.span([:absinthe, :middleware, :async, :task], %{}, fn -> {fun.(), %{}} end) end) @@ -110,4 +110,13 @@ defmodule Absinthe.Middleware.Async do pipeline end end + + # Optionally use `async/1` function from `opentelemetry_process_propagator` if available + if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do + @spec async((() -> any)) :: Task.t() + defp async(fun), do: OpentelemetryProcessPropagator.Task.async(fun) + else + @spec async((() -> any)) :: Task.t() + defp async(fun), do: Task.async(fun) + end end diff --git a/lib/absinthe/middleware/batch.ex b/lib/absinthe/middleware/batch.ex index e6334feafc..2f582278e6 100644 --- a/lib/absinthe/middleware/batch.ex +++ b/lib/absinthe/middleware/batch.ex @@ -137,7 +137,7 @@ defmodule Absinthe.Middleware.Batch do start_time_mono = System.monotonic_time() task = - Task.async(fn -> + async(fn -> {batch_fun, call_batch_fun(batch_fun, batch_data)} end) @@ -206,4 +206,13 @@ defmodule Absinthe.Middleware.Batch do pipeline end end + + # Optionally use `async/1` function from `opentelemetry_process_propagator` if available + if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do + @spec async((() -> any)) :: Task.t() + defp async(fun), do: OpentelemetryProcessPropagator.Task.async(fun) + else + @spec async((() -> any)) :: Task.t() + defp async(fun), do: Task.async(fun) + end end diff --git a/mix.exs b/mix.exs index 2309e8d1c7..7b2361679f 100644 --- a/mix.exs +++ b/mix.exs @@ -75,6 +75,7 @@ defmodule Absinthe.Mixfile do {:telemetry, "~> 1.0 or ~> 0.4"}, {:dataloader, "~> 1.0.0", optional: true}, {:decimal, "~> 1.0 or ~> 2.0", optional: true}, + {:opentelemetry_process_propagator, "~> 0.2.1", optional: true}, {:ex_doc, "~> 0.22", only: :dev}, {:benchee, ">= 1.0.0", only: :dev}, {:dialyxir, "~> 1.1.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 3845cebc01..a956f2cf5c 100644 --- a/mix.lock +++ b/mix.lock @@ -14,5 +14,7 @@ "makeup_graphql": {:hex, :makeup_graphql, "0.1.2", "81e2939aab6d2b81d39ee5d9e13fae02599e9ca6e1152e0eeed737a98a5f96aa", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "3390ab04ba388d52a94bbe64ef62aa4d7923ceaffac43ec948f58f631440e8fb"}, "mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.2.0", "454a35655b4c1924405ef1f3587f2c6f141bf73366b2c5e8a38dcc619b53eaa0", [:mix, :rebar3], [], "hexpm", "9e677c68243de0f70538798072e66e1fb1d4a2ca8888a6eb493c0a41e5480c35"}, + "opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.2.1", "20ac37648faf7175cade16fda8d58e6f1ff1b7f2a50a8ef9d70a032c41aba315", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "f317237e39636d4f6140afa5d419e85ed3dc9e9a57072e7cd442df42af7b8aac"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, } diff --git a/test/absinthe/middleware/async_test.exs b/test/absinthe/middleware/async_test.exs index 49e39a196e..e797c90838 100644 --- a/test/absinthe/middleware/async_test.exs +++ b/test/absinthe/middleware/async_test.exs @@ -48,6 +48,14 @@ defmodule Absinthe.Middleware.AsyncTest do {:middleware, Elixir.Absinthe.Middleware.Async, task} end end + + field :async_check_otel_ctx, :string do + resolve fn _, _, _ -> + async(fn -> + {:ok, OpenTelemetry.Ctx.get_value("stored_value", nil)} + end) + end + end end def cool_async(fun) do @@ -125,4 +133,14 @@ defmodule Absinthe.Middleware.AsyncTest do assert {:ok, %{data: %{"returnsNil" => nil}}} == Absinthe.run(doc, Schema) end + + test "propagates the OTel context" do + doc = """ + {asyncCheckOtelCtx} + """ + + OpenTelemetry.Ctx.set_value("stored_value", "some_value") + + assert {:ok, %{data: %{"asyncCheckOtelCtx" => "some_value"}}} == Absinthe.run(doc, Schema) + end end diff --git a/test/absinthe/middleware/batch_test.exs b/test/absinthe/middleware/batch_test.exs index d099055649..11b9654ddb 100644 --- a/test/absinthe/middleware/batch_test.exs +++ b/test/absinthe/middleware/batch_test.exs @@ -52,11 +52,23 @@ defmodule Absinthe.Middleware.BatchTest do end) end end + + field :ctx, :string do + resolve fn _, _, _ -> + batch({__MODULE__, :otel_ctx}, nil, fn batch -> + {:ok, batch} + end) + end + end end def by_id(_, ids) do Map.take(@organizations, ids) end + + def otel_ctx(_, _) do + OpenTelemetry.Ctx.get_value("stored_value", nil) + end end test "can resolve a field using the normal async helper" do @@ -128,4 +140,14 @@ defmodule Absinthe.Middleware.BatchTest do assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :stop], %{duration: _}, %{id: _, batch_fun: _, batch_opts: _, batch_data: _, result: _}} end + + test "propagates the OTel context" do + doc = """ + {ctx} + """ + + OpenTelemetry.Ctx.set_value("stored_value", "some_value") + + assert {:ok, %{data: %{"ctx" => "some_value"}}} == Absinthe.run(doc, Schema) + end end