Skip to content

Commit

Permalink
Support propagating OTel context
Browse files Browse the repository at this point in the history
  • Loading branch information
rewritten committed Feb 8, 2023
1 parent 1562385 commit 99bab83
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 2 deletions.
26 changes: 26 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,29 @@ After a query is executed, you'll see something like:
}
}
```

## Opentelemetry

When using Opentelemetry, one usually wants to correlate spans that are created
in spawned tasks with the main trace. For example, you might have a trace started
in a Phoenix endpoint, and then have spans around database access.

One can correlate manually by attaching the OTel context the task function:

```elixir
ctx = OpenTelemetry.Ctx.get_current()

Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)

# do stuff that might create spans
end)
```

When using the `async` and `batch` middleware, the tasks are spawned by Absinthe,
so you can't attach the context manually.

Instead, you can add the `:opentelemetry_process_propagator` package to your
dependencies, which has a `Task.async/1` wrapper that will attach the context
automatically. If the package is installed, the middleware will use it in place
of the default `Task.async/1`.
11 changes: 10 additions & 1 deletion lib/absinthe/middleware/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Absinthe.Middleware.Async do
# task so we have actual data. Thus, we prepend this module to the middleware stack.
def call(%{state: :unresolved} = res, {fun, opts}) when is_function(fun) do
task =
Task.async(fn ->
async(fn ->
:telemetry.span([:absinthe, :middleware, :async, :task], %{}, fn -> {fun.(), %{}} end)
end)

Expand Down Expand Up @@ -110,4 +110,13 @@ defmodule Absinthe.Middleware.Async do
pipeline
end
end

# Optionally use `async/1` function from `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
defp async(fun), do: OpentelemetryProcessPropagator.Task.async(fun)
else
@spec async((() -> any)) :: Task.t()
defp async(fun), do: Task.async(fun)
end
end
11 changes: 10 additions & 1 deletion lib/absinthe/middleware/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ defmodule Absinthe.Middleware.Batch do
start_time_mono = System.monotonic_time()

task =
Task.async(fn ->
async(fn ->
{batch_fun, call_batch_fun(batch_fun, batch_data)}
end)

Expand Down Expand Up @@ -206,4 +206,13 @@ defmodule Absinthe.Middleware.Batch do
pipeline
end
end

# Optionally use `async/1` function from `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
defp async(fun), do: OpentelemetryProcessPropagator.Task.async(fun)
else
@spec async((() -> any)) :: Task.t()
defp async(fun), do: Task.async(fun)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ defmodule Absinthe.Mixfile do
{:telemetry, "~> 1.0 or ~> 0.4"},
{:dataloader, "~> 1.0.0", optional: true},
{:decimal, "~> 1.0 or ~> 2.0", optional: true},
{:opentelemetry_process_propagator, "~> 0.2.1", optional: true},
{:ex_doc, "~> 0.22", only: :dev},
{:benchee, ">= 1.0.0", only: :dev},
{:dialyxir, "~> 1.1.0", only: [:dev, :test], runtime: false},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@
"makeup_graphql": {:hex, :makeup_graphql, "0.1.2", "81e2939aab6d2b81d39ee5d9e13fae02599e9ca6e1152e0eeed737a98a5f96aa", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "3390ab04ba388d52a94bbe64ef62aa4d7923ceaffac43ec948f58f631440e8fb"},
"mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.2.0", "454a35655b4c1924405ef1f3587f2c6f141bf73366b2c5e8a38dcc619b53eaa0", [:mix, :rebar3], [], "hexpm", "9e677c68243de0f70538798072e66e1fb1d4a2ca8888a6eb493c0a41e5480c35"},
"opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.2.1", "20ac37648faf7175cade16fda8d58e6f1ff1b7f2a50a8ef9d70a032c41aba315", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "f317237e39636d4f6140afa5d419e85ed3dc9e9a57072e7cd442df42af7b8aac"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
18 changes: 18 additions & 0 deletions test/absinthe/middleware/async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ defmodule Absinthe.Middleware.AsyncTest do
{:middleware, Elixir.Absinthe.Middleware.Async, task}
end
end

field :async_check_otel_ctx, :string do
resolve fn _, _, _ ->
async(fn ->
{:ok, OpenTelemetry.Ctx.get_value("stored_value", nil)}
end)
end
end
end

def cool_async(fun) do
Expand Down Expand Up @@ -125,4 +133,14 @@ defmodule Absinthe.Middleware.AsyncTest do

assert {:ok, %{data: %{"returnsNil" => nil}}} == Absinthe.run(doc, Schema)
end

test "propagates the OTel context" do
doc = """
{asyncCheckOtelCtx}
"""

OpenTelemetry.Ctx.set_value("stored_value", "some_value")

assert {:ok, %{data: %{"asyncCheckOtelCtx" => "some_value"}}} == Absinthe.run(doc, Schema)
end
end
22 changes: 22 additions & 0 deletions test/absinthe/middleware/batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,23 @@ defmodule Absinthe.Middleware.BatchTest do
end)
end
end

field :ctx, :string do
resolve fn _, _, _ ->
batch({__MODULE__, :otel_ctx}, nil, fn batch ->
{:ok, batch}
end)
end
end
end

def by_id(_, ids) do
Map.take(@organizations, ids)
end

def otel_ctx(_, _) do
OpenTelemetry.Ctx.get_value("stored_value", nil)
end
end

test "can resolve a field using the normal async helper" do
Expand Down Expand Up @@ -128,4 +140,14 @@ defmodule Absinthe.Middleware.BatchTest do
assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :stop], %{duration: _},
%{id: _, batch_fun: _, batch_opts: _, batch_data: _, result: _}}
end

test "propagates the OTel context" do
doc = """
{ctx}
"""

OpenTelemetry.Ctx.set_value("stored_value", "some_value")

assert {:ok, %{data: %{"ctx" => "some_value"}}} == Absinthe.run(doc, Schema)
end
end

0 comments on commit 99bab83

Please sign in to comment.