Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support propagating OpenTelemetry context #1227

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,29 @@ After a query is executed, you'll see something like:
}
}
```

## Opentelemetry

When using Opentelemetry, one usually wants to correlate spans that are created
in spawned tasks with the main trace. For example, you might have a trace started
in a Phoenix endpoint, and then have spans around database access.

One can correlate manually by attaching the OTel context the task function:

```elixir
ctx = OpenTelemetry.Ctx.get_current()

Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)

# do stuff that might create spans
end)
```

When using the `async` and `batch` middleware, the tasks are spawned by Absinthe,
so you can't attach the context manually.

Instead, you can add the `:opentelemetry_process_propagator` package to your
dependencies, which has a `Task.async/1` wrapper that will attach the context
automatically. If the package is installed, the middleware will use it in place
of the default `Task.async/1`.
11 changes: 10 additions & 1 deletion lib/absinthe/middleware/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Absinthe.Middleware.Async do
# task so we have actual data. Thus, we prepend this module to the middleware stack.
def call(%{state: :unresolved} = res, {fun, opts}) when is_function(fun) do
task =
Task.async(fn ->
async(fn ->
:telemetry.span([:absinthe, :middleware, :async, :task], %{}, fn -> {fun.(), %{}} end)
end)

Expand Down Expand Up @@ -110,4 +110,13 @@ defmodule Absinthe.Middleware.Async do
pipeline
end
end

# Optionally use `async/1` function from `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: OpentelemetryProcessPropagator.Task
else
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: Task
end
end
11 changes: 10 additions & 1 deletion lib/absinthe/middleware/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ defmodule Absinthe.Middleware.Batch do
start_time_mono = System.monotonic_time()

task =
Task.async(fn ->
async(fn ->
{batch_fun, call_batch_fun(batch_fun, batch_data)}
end)

Expand Down Expand Up @@ -206,4 +206,13 @@ defmodule Absinthe.Middleware.Batch do
pipeline
end
end

# Optionally use `async/1` function from `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: OpentelemetryProcessPropagator.Task
else
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: Task
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ defmodule Absinthe.Mixfile do
{:telemetry, "~> 1.0 or ~> 0.4"},
{:dataloader, "~> 1.0.0", optional: true},
{:decimal, "~> 1.0 or ~> 2.0", optional: true},
{:opentelemetry_process_propagator, "~> 0.2.1", optional: true},
{:ex_doc, "~> 0.22", only: :dev},
{:benchee, ">= 1.0.0", only: :dev},
{:dialyxir, "~> 1.1.0", only: [:dev, :test], runtime: false},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@
"makeup_graphql": {:hex, :makeup_graphql, "0.1.2", "81e2939aab6d2b81d39ee5d9e13fae02599e9ca6e1152e0eeed737a98a5f96aa", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "3390ab04ba388d52a94bbe64ef62aa4d7923ceaffac43ec948f58f631440e8fb"},
"mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.2.0", "454a35655b4c1924405ef1f3587f2c6f141bf73366b2c5e8a38dcc619b53eaa0", [:mix, :rebar3], [], "hexpm", "9e677c68243de0f70538798072e66e1fb1d4a2ca8888a6eb493c0a41e5480c35"},
"opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.2.1", "20ac37648faf7175cade16fda8d58e6f1ff1b7f2a50a8ef9d70a032c41aba315", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "f317237e39636d4f6140afa5d419e85ed3dc9e9a57072e7cd442df42af7b8aac"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
18 changes: 18 additions & 0 deletions test/absinthe/middleware/async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ defmodule Absinthe.Middleware.AsyncTest do
{:middleware, Elixir.Absinthe.Middleware.Async, task}
end
end

field :async_check_otel_ctx, :string do
resolve fn _, _, _ ->
async(fn ->
{:ok, OpenTelemetry.Ctx.get_value("stored_value", nil)}
end)
end
end
end

def cool_async(fun) do
Expand Down Expand Up @@ -125,4 +133,14 @@ defmodule Absinthe.Middleware.AsyncTest do

assert {:ok, %{data: %{"returnsNil" => nil}}} == Absinthe.run(doc, Schema)
end

test "propagates the OTel context" do
doc = """
{asyncCheckOtelCtx}
"""

OpenTelemetry.Ctx.set_value("stored_value", "some_value")

assert {:ok, %{data: %{"asyncCheckOtelCtx" => "some_value"}}} == Absinthe.run(doc, Schema)
end
end
22 changes: 22 additions & 0 deletions test/absinthe/middleware/batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,23 @@ defmodule Absinthe.Middleware.BatchTest do
end)
end
end

field :ctx, :string do
resolve fn _, _, _ ->
batch({__MODULE__, :otel_ctx}, nil, fn batch ->
{:ok, batch}
end)
end
end
end

def by_id(_, ids) do
Map.take(@organizations, ids)
end

def otel_ctx(_, _) do
OpenTelemetry.Ctx.get_value("stored_value", nil)
end
end

test "can resolve a field using the normal async helper" do
Expand Down Expand Up @@ -128,4 +140,14 @@ defmodule Absinthe.Middleware.BatchTest do
assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :stop], %{duration: _},
%{id: _, batch_fun: _, batch_opts: _, batch_data: _, result: _}}
end

test "propagates the OTel context" do
doc = """
{ctx}
"""

OpenTelemetry.Ctx.set_value("stored_value", "some_value")

assert {:ok, %{data: %{"ctx" => "some_value"}}} == Absinthe.run(doc, Schema)
end
end