From ab2a75b93dc24f5e0880e5157cfdc925687f4581 Mon Sep 17 00:00:00 2001 From: realglebivanov Date: Fri, 24 May 2024 15:03:21 +0300 Subject: [PATCH] Add metrics for connection pool usage v3 (#307) l --- .../connection_pool/metrics_test.exs | 169 ++++++++++ integration_test/ownership/metrics_test.exs | 293 ++++++++++++++++++ lib/db_connection.ex | 17 + lib/db_connection/connection_pool.ex | 25 ++ lib/db_connection/ownership.ex | 4 + lib/db_connection/ownership/manager.ex | 31 ++ lib/db_connection/ownership/proxy.ex | 20 ++ lib/db_connection/pool.ex | 7 + test/test_support.exs | 17 + 9 files changed, 583 insertions(+) create mode 100644 integration_test/connection_pool/metrics_test.exs create mode 100644 integration_test/ownership/metrics_test.exs diff --git a/integration_test/connection_pool/metrics_test.exs b/integration_test/connection_pool/metrics_test.exs new file mode 100644 index 0000000..4dd15c3 --- /dev/null +++ b/integration_test/connection_pool/metrics_test.exs @@ -0,0 +1,169 @@ +defmodule TestPoolMetrics do + import TestHelpers + + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "considers checkin/checkout in metrics when ready" do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client]) + end) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}] = + DBConnection.get_connection_metrics(pool) + end) + + send(query, :continue) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "considers idle connections on idle timeout in metrics" do + stack = [ + {:ok, :state}, + {:ok, :state} + ] + + idle_interval = 100 + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool) + end) + + :timer.sleep(idle_interval) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + ping: [:state] + ] = A.record(agent) + end + + test "considers enqueue/dequeue in metrics when busy" do + stack = [ + {:ok, :state}, + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}] = + DBConnection.get_connection_metrics(pool) + end) + + run_query = fn -> + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client]) + end) + end + + queries = [run_query.()] + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool) + end) + + queries = [run_query.() | queries] + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}] = + DBConnection.get_connection_metrics(pool) + end) + + queries = [run_query.() | queries] + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 1, ready_conn_count: 0}] = + DBConnection.get_connection_metrics(pool) + end) + + [query3, query2, query1] = queries + send(query1, :continue) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}] = + DBConnection.get_connection_metrics(pool) + end) + + send(query2, :continue) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool) + end) + + send(query3, :continue) + + poll(fn -> + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}] = + DBConnection.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end +end diff --git a/integration_test/ownership/metrics_test.exs b/integration_test/ownership/metrics_test.exs new file mode 100644 index 0000000..4ce450c --- /dev/null +++ b/integration_test/ownership/metrics_test.exs @@ -0,0 +1,293 @@ +defmodule TestOwnershipMetrics do + import TestHelpers + + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "starts without proxy processes" do + stack = [ + {:ok, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + poll(fn -> + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_] + ] = A.record(agent) + end + + with "with `:ownership_mode` = :manual" do + setup do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), ownership_mode: :manual) + + {:ok, %{agent: agent, pool: pool}} + end + + test "considers a manual checkin and checkout", %{agent: agent, pool: pool} do + poll(fn -> + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkout(pool, []) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_] + ] = A.record(agent) + end + + test "considers a query in progress", %{agent: agent, pool: pool} do + poll(fn -> + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkout(pool, []) + + parent = self() + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client], caller: parent, pool: DBConnection.Ownership) + end) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query, :continue) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + end + + describe "with `:ownership_mode = :auto`" do + setup do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), ownership_mode: :auto, name: :auto) + + {:ok, %{agent: agent, pool: pool}} + end + + test "implicitly checkouts and considers a manual checkin", %{ + pool: pool, + agent: agent + } do + poll(fn -> + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client], pool: DBConnection.Ownership) + DBConnection.Ownership.ownership_checkin(pool, []) + end) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query, :continue) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "implicitly checkouts and checkins in case the proxy dies", %{ + pool: pool, + agent: agent + } do + poll(fn -> + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client], pool: DBConnection.Ownership) + end) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query, :continue) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "implicitly checkouts and enqueues in case the proxy is busy", %{ + pool: pool, + agent: agent + } do + poll(fn -> + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + parent = self() + + query_fn = fn -> + Process.put(:agent, agent) + + assert P.execute(pool, %Q{}, [:client], + name: :auto, + caller: parent, + pool: DBConnection.Ownership + ) + end + + query1 = spawn_link(query_fn) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + query2 = spawn_link(query_fn) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 1, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query1, :continue) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query2, :continue) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) + + poll(fn -> + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + end +end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 482789e..b56fb1b 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -1285,6 +1285,23 @@ defmodule DBConnection do else: (_ -> :error) end + @doc """ + Returns connection metrics as a list in the shape of: + + [%{ + source: {:pool | :proxy, pid()}, + ready_conn_count: non_neg_integer(), + checkout_queue_length: non_neg_integer() + }] + + """ + @spec get_connection_metrics(conn, Keyword.t()) :: [DBConnection.Pool.connection_metrics()] + + def get_connection_metrics(conn, opts \\ []) do + pool = Keyword.get(opts, :pool, DBConnection.ConnectionPool) + pool.get_connection_metrics(conn) + end + defp pool_pid(%DBConnection{pool_ref: Holder.pool_ref(pool: pid)}), do: pid defp pool_pid(conn), do: GenServer.whereis(conn) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 00a793a..7b15529 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -40,6 +40,12 @@ defmodule DBConnection.ConnectionPool do GenServer.call(pool, {:disconnect_all, interval}, :infinity) end + @doc false + @impl DBConnection.Pool + def get_connection_metrics(pool) do + GenServer.call(pool, :get_connection_metrics, :infinity) + end + ## GenServer api @impl GenServer @@ -73,6 +79,25 @@ defmodule DBConnection.ConnectionPool do end @impl GenServer + def handle_call(:get_connection_metrics, _from, {status, queue, _, _} = state) do + {ready_conn_count, checkout_queue_length} = + case status do + :busy -> + {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} + + :ready -> + {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} + end + + metrics = %{ + source: {:pool, self()}, + ready_conn_count: ready_conn_count, + checkout_queue_length: checkout_queue_length + } + + {:reply, [metrics], state} + end + def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do ts = {System.monotonic_time(), interval} {:reply, :ok, {type, queue, codel, ts}} diff --git a/lib/db_connection/ownership.ex b/lib/db_connection/ownership.ex index ddef239..26b52f2 100644 --- a/lib/db_connection/ownership.ex +++ b/lib/db_connection/ownership.ex @@ -85,6 +85,10 @@ defmodule DBConnection.Ownership do end end + @doc false + @impl DBConnection.Pool + defdelegate get_connection_metrics(pool), to: Manager + @doc """ Explicitly checks a connection out from the ownership manager. diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index 0703fdc..91a435c 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -59,6 +59,12 @@ defmodule DBConnection.Ownership.Manager do GenServer.call(manager, {:allow, parent, allow}, timeout) end + @spec get_connection_metrics(GenServer.server()) :: + {:ok, [DBConnection.Pool.connection_metrics()]} | :error + def get_connection_metrics(manager) do + GenServer.call(manager, :get_connection_metrics, :infinity) + end + ## Callbacks @impl true @@ -98,6 +104,31 @@ defmodule DBConnection.Ownership.Manager do end @impl true + def handle_call(:get_connection_metrics, _from, %{pool: pool, owners: owners, log: log} = state) do + pool_metrics = DBConnection.ConnectionPool.get_connection_metrics(pool) + + proxy_metrics = + owners + |> Enum.map(fn {_, {proxy, _, _}} -> + try do + GenServer.call(proxy, :get_connection_metrics) + catch + :exit, reason -> + if log do + Logger.log( + log, + "Caught :exit while calling :get_connection_metrics due to #{inspect(reason)}" + ) + end + + nil + end + end) + |> Enum.reject(&is_nil/1) + + {:reply, pool_metrics ++ proxy_metrics, state} + end + def handle_call(:pool, _from, %{pool: pool} = state) do {:reply, pool, state} end diff --git a/lib/db_connection/ownership/proxy.ex b/lib/db_connection/ownership/proxy.ex index 041ce49..0f18d73 100644 --- a/lib/db_connection/ownership/proxy.ex +++ b/lib/db_connection/ownership/proxy.ex @@ -171,6 +171,26 @@ defmodule DBConnection.Ownership.Proxy do down(message, state) end + @impl true + def handle_call( + :get_connection_metrics, + _, + %{queue: queue, holder: holder, client: client} = state + ) do + connection_metrics = %{ + source: {:proxy, self()}, + ready_conn_count: + if is_nil(holder) or not is_nil(client) do + 0 + else + 1 + end, + checkout_queue_length: :queue.len(queue) + } + + {:reply, connection_metrics, state} + end + defp checkout({pid, ref} = from, %{holder: holder} = state) do if Holder.handle_checkout(holder, from, ref, nil) do {:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}} diff --git a/lib/db_connection/pool.ex b/lib/db_connection/pool.ex index 84eb206..3f2242c 100644 --- a/lib/db_connection/pool.ex +++ b/lib/db_connection/pool.ex @@ -2,6 +2,11 @@ defmodule DBConnection.Pool do @moduledoc false @type pool :: GenServer.server() + @type connection_metrics :: %{ + source: {:pool | :proxy, pid()}, + ready_conn_count: non_neg_integer(), + checkout_queue_length: non_neg_integer() + } @callback disconnect_all(pool, interval :: term, options :: keyword) :: :ok @@ -9,4 +14,6 @@ defmodule DBConnection.Pool do {:ok, pool_ref :: term, module, checkin_time :: non_neg_integer() | nil, state :: term} | {:error, Exception.t()} + + @callback get_connection_metrics(pool :: pool()) :: [connection_metrics()] end diff --git a/test/test_support.exs b/test/test_support.exs index ecd4f16..dcbe445 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -1,3 +1,19 @@ +defmodule TestHelpers do + def poll(fun, attempts \\ 5) do + try do + fun.() + rescue + e -> + if attempts > 0 do + :timer.sleep(50) + poll(fun, attempts - 1) + else + reraise e, __STACKTRACE__ + end + end + end +end + defmodule TestConnection do defmacro __using__(opts) do quote do @@ -19,6 +35,7 @@ defmodule TestConnection do end defdelegate rollback(conn, reason), to: DBConnection + defdelegate get_connection_metrics(pool, opts \\ []), to: DBConnection def prepare(pool, query, opts2 \\ []) do DBConnection.prepare(pool, query, opts2 ++ unquote(opts))