From 9a1783b7edf27a0bec695ecc9e22c4fce81752ab Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Mon, 24 Jun 2024 20:07:57 -0300 Subject: [PATCH 1/6] Add a connection listener that emits telemetry events --- .../cases/connection_listeners_test.exs | 112 ++++++++++++++++++ lib/db_connection/telemetry_listener.ex | 61 ++++++++++ 2 files changed, 173 insertions(+) create mode 100644 lib/db_connection/telemetry_listener.ex diff --git a/integration_test/cases/connection_listeners_test.exs b/integration_test/cases/connection_listeners_test.exs index d2d0f9a..7486092 100644 --- a/integration_test/cases/connection_listeners_test.exs +++ b/integration_test/cases/connection_listeners_test.exs @@ -203,4 +203,116 @@ defmodule ConnectionListenersTest do assert is_pid(conn3) refute conn1 == conn2 == conn3 end + + describe "telemetry listener" do + test "emits events with no tag" do + attach_telemetry_forwarding_handler() + err = RuntimeError.exception("oops") + + stack = [ + {:ok, :state}, + {:disconnect, err, :discon}, + :ok, + {:error, err} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, telemetry_listener} = DbConnection.TelemetryListener.start_link() + + {:ok, pool} = + P.start_link( + agent: agent, + parent: self(), + connection_listeners: [telemetry_listener], + backoff_min: 1_000 + ) + + assert_receive {:telemetry, :connected, %{tag: nil}} + assert P.close(pool, %Q{}) + assert_receive {:telemetry, :disconnected, %{tag: nil}} + after + detach_telemetry_forwarding_handler() + end + + test "emits events with tag" do + attach_telemetry_forwarding_handler() + err = RuntimeError.exception("oops") + + stack = [ + {:ok, :state}, + {:disconnect, err, :discon}, + :ok, + {:error, err} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, telemetry_listener} = DbConnection.TelemetryListener.start_link() + + tag = make_ref() + + {:ok, pool} = + P.start_link( + agent: agent, + parent: self(), + connection_listeners: {[telemetry_listener], tag}, + backoff_min: 1_000 + ) + + assert_receive {:telemetry, :connected, %{tag: ^tag}} + assert P.close(pool, %Q{}) + assert_receive {:telemetry, :disconnected, %{tag: ^tag}} + after + detach_telemetry_forwarding_handler() + end + + test "handles non-graceful disconnects" do + attach_telemetry_forwarding_handler() + + stack = [ + fn opts -> + send(opts[:parent], {:hi, self()}) + {:ok, :state} + end, + {:ok, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, telemetry_listener} = DbConnection.TelemetryListener.start_link() + + {:ok, _pool} = + P.start_link( + agent: agent, + parent: self(), + connection_listeners: [telemetry_listener], + backoff_min: 1_000 + ) + + assert_receive {:hi, pid} + Process.exit(pid, :kill) + + assert_receive {:telemetry, :disconnected, %{pid: ^pid}} + after + detach_telemetry_forwarding_handler() + end + end + + defp attach_telemetry_forwarding_handler() do + test_pid = self() + + :telemetry.attach_many( + "TestHandler", + [ + [:db_connection, :connected], + [:db_connection, :disconnected] + ], + fn [:db_connection, action], _, metadata, _ -> + send(test_pid, {:telemetry, action, metadata}) + end, + %{} + ) + end + + defp detach_telemetry_forwarding_handler() do + :telemetry.detach("TestHandler") + end end diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex new file mode 100644 index 0000000..ebf7ba8 --- /dev/null +++ b/lib/db_connection/telemetry_listener.ex @@ -0,0 +1,61 @@ +defmodule DbConnection.TelemetryListener do + @moduledoc """ + A connection listener that emits telemetry events for connection and disconnection + """ + + use GenServer + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, nil, opts) + end + + @impl GenServer + def init(_) do + {:ok, %{monitoring: %{}}} + end + + @impl GenServer + def handle_info({:connected, pid, tag}, state) do + handle_connected(pid, tag, state) + end + + def handle_info({:connected, pid}, state) do + handle_connected(pid, nil, state) + end + + def handle_info({:disconnected, pid, _}, state) do + handle_disconnected(pid, state) + end + + def handle_info({:disconnected, pid}, state) do + handle_disconnected(pid, state) + end + + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + handle_disconnected(pid, state) + end + + defp handle_connected(pid, tag, state) do + :telemetry.execute([:db_connection, :connected], %{count: 1}, %{tag: tag, pid: pid}) + + ref = Process.monitor(pid) + monitoring = Map.put(state.monitoring, pid, {ref, tag}) + + {:noreply, %{state | monitoring: monitoring}} + end + + def handle_disconnected(pid, state) do + case state.monitoring[pid] do + # Already handled. We may receive two messages: one from monitor and one + # from listener. For this reason, we need to handle both. + nil -> + {:noreply, state} + + {ref, tag} -> + Process.demonitor(ref, [:flush]) + :telemetry.execute([:db_connection, :disconnected], %{count: 1}, %{tag: tag, pid: pid}) + monitoring = Map.delete(state.monitoring, pid) + {:noreply, %{state | monitoring: monitoring}} + end + end +end From 17b76d2e85631f78be9db425b60868655acdfe12 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Thu, 27 Jun 2024 08:22:52 -0300 Subject: [PATCH 2/6] Add documentation --- lib/db_connection.ex | 6 ++++ lib/db_connection/telemetry_listener.ex | 37 +++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index c863819..ec343f3 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -524,6 +524,12 @@ defmodule DBConnection do This feature is available since v2.6.0. Before this version `:connection_listeners` only accepted a list of listener processes. + ## Telemetry listener + + DBConnection provides a connection listener that emits telemetry events upon + connection and disconnection, see the `DBConnection.TelemetryListener` module + for more info. + ## Connection Configuration Callback The `:configure` function will be called before each individual connection to the diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex index ebf7ba8..1ed8de9 100644 --- a/lib/db_connection/telemetry_listener.ex +++ b/lib/db_connection/telemetry_listener.ex @@ -1,16 +1,49 @@ defmodule DbConnection.TelemetryListener do @moduledoc """ A connection listener that emits telemetry events for connection and disconnection + + It monitors connection processes and ensures that disconnection events are + always emitted. + + ## Telemetry events + + ### Connected + + `[:db_connection, :connected]` - Executed after a connection is established. + + #### Measurements + + * `:count` - Always 1 + + #### Metadata + + * `:pid` - The connection pid + * `:tag` - The connection pool tag + + ### Disconnected + + `[:db_connection, :disconnected]` - Executed after a disconnect. + + #### Measurements + + * `:count` - Always 1 + + #### Metadata + + * `:pid` - The connection pid + * `:tag` - The connection pool tag """ use GenServer + @doc "Starts a telemetry listener" + @spec start_link(GenServer.options()) :: {:ok, pid()} def start_link(opts \\ []) do GenServer.start_link(__MODULE__, nil, opts) end @impl GenServer - def init(_) do + def init(nil) do {:ok, %{monitoring: %{}}} end @@ -44,7 +77,7 @@ defmodule DbConnection.TelemetryListener do {:noreply, %{state | monitoring: monitoring}} end - def handle_disconnected(pid, state) do + defp handle_disconnected(pid, state) do case state.monitoring[pid] do # Already handled. We may receive two messages: one from monitor and one # from listener. For this reason, we need to handle both. From 7a96b512aa07975688c89b1d27b69a4852c5657b Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Thu, 27 Jun 2024 08:25:44 -0300 Subject: [PATCH 3/6] Address review comment --- lib/db_connection/telemetry_listener.ex | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex index 1ed8de9..9674649 100644 --- a/lib/db_connection/telemetry_listener.ex +++ b/lib/db_connection/telemetry_listener.ex @@ -70,11 +70,9 @@ defmodule DbConnection.TelemetryListener do defp handle_connected(pid, tag, state) do :telemetry.execute([:db_connection, :connected], %{count: 1}, %{tag: tag, pid: pid}) - ref = Process.monitor(pid) - monitoring = Map.put(state.monitoring, pid, {ref, tag}) - {:noreply, %{state | monitoring: monitoring}} + {:noreply, put_in(state.monitoring[pid], {ref, tag})} end defp handle_disconnected(pid, state) do @@ -87,8 +85,7 @@ defmodule DbConnection.TelemetryListener do {ref, tag} -> Process.demonitor(ref, [:flush]) :telemetry.execute([:db_connection, :disconnected], %{count: 1}, %{tag: tag, pid: pid}) - monitoring = Map.delete(state.monitoring, pid) - {:noreply, %{state | monitoring: monitoring}} + {:noreply, %{state | monitoring: Map.delete(state.monitoring, pid)}} end end end From d18b7b4d82c5c0e7bca224a336b9934f2e10266f Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Thu, 27 Jun 2024 08:32:07 -0300 Subject: [PATCH 4/6] Add usage instructions --- lib/db_connection/telemetry_listener.ex | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex index 9674649..b1aa574 100644 --- a/lib/db_connection/telemetry_listener.ex +++ b/lib/db_connection/telemetry_listener.ex @@ -5,6 +5,16 @@ defmodule DbConnection.TelemetryListener do It monitors connection processes and ensures that disconnection events are always emitted. + ## Usage + + Start the listener, optionally using a name, and pass it under the + `:connection_listeners` option when starting DbConnection: + + {:ok, pid} = TelemetryListener.start_link([name: MyListener]) + {:ok, _conn} = DBConnection.start_link(SomeModule, [connection_listeners: [MyListener]]) + # Using a tag, which will be sent in telemetry metadata + {:ok, _conn} = DBConnection.start_link(SomeModule, [connection_listeners: {[pid], :my_tag}]) + ## Telemetry events ### Connected From 7bfe5fbb7b9cabe55edffcc34905d0760b2899a9 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Mon, 1 Jul 2024 11:33:32 -0300 Subject: [PATCH 5/6] Fix wrong module name --- integration_test/cases/connection_listeners_test.exs | 6 +++--- lib/db_connection/telemetry_listener.ex | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_test/cases/connection_listeners_test.exs b/integration_test/cases/connection_listeners_test.exs index 7486092..d208574 100644 --- a/integration_test/cases/connection_listeners_test.exs +++ b/integration_test/cases/connection_listeners_test.exs @@ -217,7 +217,7 @@ defmodule ConnectionListenersTest do ] {:ok, agent} = A.start_link(stack) - {:ok, telemetry_listener} = DbConnection.TelemetryListener.start_link() + {:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link() {:ok, pool} = P.start_link( @@ -246,7 +246,7 @@ defmodule ConnectionListenersTest do ] {:ok, agent} = A.start_link(stack) - {:ok, telemetry_listener} = DbConnection.TelemetryListener.start_link() + {:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link() tag = make_ref() @@ -277,7 +277,7 @@ defmodule ConnectionListenersTest do ] {:ok, agent} = A.start_link(stack) - {:ok, telemetry_listener} = DbConnection.TelemetryListener.start_link() + {:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link() {:ok, _pool} = P.start_link( diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex index b1aa574..66f4e40 100644 --- a/lib/db_connection/telemetry_listener.ex +++ b/lib/db_connection/telemetry_listener.ex @@ -1,4 +1,4 @@ -defmodule DbConnection.TelemetryListener do +defmodule DBConnection.TelemetryListener do @moduledoc """ A connection listener that emits telemetry events for connection and disconnection From 09ae2de4ebb7471d616b3407d507400005f605b1 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Mon, 1 Jul 2024 11:33:40 -0300 Subject: [PATCH 6/6] Documentation improvements --- lib/db_connection.ex | 2 ++ lib/db_connection/telemetry_listener.ex | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index ec343f3..188ec48 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -566,6 +566,8 @@ defmodule DBConnection do * `:opts` - All options given to the pool operation + See `DBConnection.TelemetryListener` for enabling `[:db_connection, :connected]` + and `[:db_connection, :disconnected]` events. """ @spec start_link(module, [start_option()] | Keyword.t()) :: GenServer.on_start() def start_link(conn_mod, opts) do diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex index 66f4e40..5e1f16e 100644 --- a/lib/db_connection/telemetry_listener.ex +++ b/lib/db_connection/telemetry_listener.ex @@ -7,13 +7,21 @@ defmodule DBConnection.TelemetryListener do ## Usage - Start the listener, optionally using a name, and pass it under the - `:connection_listeners` option when starting DbConnection: + Start the listener, and pass it under the `:connection_listeners` option when + starting DBConnection: + + {:ok, pid} = TelemetryListener.start_link() + {:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: [pid]) - {:ok, pid} = TelemetryListener.start_link([name: MyListener]) - {:ok, _conn} = DBConnection.start_link(SomeModule, [connection_listeners: [MyListener]]) # Using a tag, which will be sent in telemetry metadata - {:ok, _conn} = DBConnection.start_link(SomeModule, [connection_listeners: {[pid], :my_tag}]) + {:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: {[pid], :my_tag}) + + # Or, with a Supervisor: + Supervisor.start_link([ + {TelemetryListener, [name: MyListener]}, + DBConnection.child_spec(SomeModule, connection_listeners: {[MyListener], :my_tag}) + ]) + ## Telemetry events