From 69fd8684504dcc9d2b1f0ca60f58aec5b45faa73 Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Tue, 21 May 2024 18:43:14 +0300 Subject: [PATCH 01/11] Add a few queue telemetry calls to `DBConnection.ConnectionPool` --- .../connection_pool/telemetry_test.exs | 154 ++++++++++++++++++ lib/db_connection/connection_pool.ex | 47 +++++- 2 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 integration_test/connection_pool/telemetry_test.exs diff --git a/integration_test/connection_pool/telemetry_test.exs b/integration_test/connection_pool/telemetry_test.exs new file mode 100644 index 0000000..d298ce4 --- /dev/null +++ b/integration_test/connection_pool/telemetry_test.exs @@ -0,0 +1,154 @@ +defmodule TestPoolTelemetry do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + @events [ + [:db_connection, :enqueue], + [:db_connection, :dequeue], + [:db_connection, :checkout], + [:db_connection, :checkin], + [:db_connection, :drop_idle] + ] + + setup do + telemetry_ref = make_ref() + + :telemetry.attach_many( + telemetry_ref, + @events, + fn event, measurements, metadata, {ref, dest_pid} -> + send(dest_pid, {event, ref, measurements, metadata}) + end, + {telemetry_ref, self()} + ) + + on_exit(fn -> + :telemetry.detach(telemetry_ref) + end) + + %{telemetry_ref: telemetry_ref} + end + + test "checkin/checkout telemetry when ready", %{telemetry_ref: telemetry_ref} do + stack = [ + {:ok, :state}, + {:ok, %Q{}, %R{}, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert P.execute(pool, %Q{}, [:client]) + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "idle telemetry on idle timeout", %{telemetry_ref: telemetry_ref} do + stack = [ + {:ok, :state}, + {:ok, :state} + ] + + idle_interval = 100 + + {:ok, agent} = A.start_link(stack) + {:ok, _} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :drop_idle], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}}, + idle_interval + + assert [{:connect, [_]} | _] = A.record(agent) + end + + test "enqueue/dequeue telemetry when busy", %{telemetry_ref: telemetry_ref} do + stack = [ + {:ok, :state}, + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 2}} + + fn -> + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client]) + end) + end + |> Stream.repeatedly() + |> Enum.take(3) + |> Enum.each(fn pid -> + send(pid, :continue) + end) + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :enqueue], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 1, ready_conn_count: 0}} + + assert_receive {[:db_connection, :dequeue], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 2}} + + assert [ + connect: [_], + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end +end diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 00a793a..ac663e2 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -73,9 +73,9 @@ defmodule DBConnection.ConnectionPool do end @impl GenServer - def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do + def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do ts = {System.monotonic_time(), interval} - {:reply, :ok, {type, queue, codel, ts}} + {:reply, :ok, {status, queue, codel, ts}} end @impl GenServer @@ -86,6 +86,7 @@ defmodule DBConnection.ConnectionPool do case queue? do true -> :ets.insert(queue, {{now, System.unique_integer(), from}}) + execute_queue_telemetry(:busy, queue, :enqueue) {:noreply, busy} false -> @@ -102,7 +103,11 @@ defmodule DBConnection.ConnectionPool do ) do case :ets.first(queue) do {queued_in_native, holder} = key -> - Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key) + if Holder.handle_checkout(holder, from, queue, queued_in_native) and + :ets.delete(queue, key) do + execute_queue_telemetry(:ready, queue, :checkout) + end + {:noreply, ready} :"$end_of_table" -> @@ -199,6 +204,9 @@ defmodule DBConnection.ConnectionPool do :ets.first(queue) do :ets.delete(queue, key) Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder) + + execute_queue_telemetry(status, queue, :drop_idle) + drop_idle(past_in_native, limit - 1, status, queue, codel, ts) else _ -> @@ -229,15 +237,20 @@ defmodule DBConnection.ConnectionPool do guards = [{:<, :"$1", min_sent}] select_slow = [{match, guards, [{{:"$1", :"$2"}}]}] - for {sent, from} <- :ets.select(queue, select_slow) do + slow = :ets.select(queue, select_slow) + + for {sent, from} <- slow do drop(time - sent, from) end :ets.select_delete(queue, [{match, guards, [true]}]) + + execute_queue_telemetry(:busy, queue, :dequeue, length(slow)) end defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do :ets.insert(queue, {{now_in_native, holder}}) + execute_queue_telemetry(:ready, queue, :checkin) {:noreply, data} end @@ -250,6 +263,7 @@ defmodule DBConnection.ConnectionPool do {:ready, _, _, _} = ready -> :ets.insert(queue, {{now_in_native, holder}}) + execute_queue_telemetry(:ready, queue, :checkin) {:noreply, ready} end end @@ -274,6 +288,7 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) delay = time - sent codel = %{codel | next: next, delay: delay, slow: slow?} go(delay, from, time, holder, queue, codel, ts) @@ -288,6 +303,7 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -299,11 +315,13 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key when time - sent > timeout -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) drop(time - sent, from) dequeue_slow(time, timeout, holder, queue, codel, ts) {sent, _, from} = key -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -314,9 +332,11 @@ defmodule DBConnection.ConnectionPool do defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do case Holder.handle_checkout(holder, from, queue, 0) do true when delay < min -> + execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, %{codel | delay: delay}, ts} true -> + execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, codel, ts} false -> @@ -358,4 +378,23 @@ defmodule DBConnection.ConnectionPool do idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true) %{codel | idle: idle} end + + defp execute_queue_telemetry(status, queue, event, count \\ 1) + when status in [:busy, :ready] and + event in [:checkin, :checkout, :enqueue, :dequeue, :drop_idle] do + {ready_conn_count, checkout_queue_length} = + case status do + :busy -> + {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} + + :ready -> + {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} + end + + :telemetry.execute( + [:db_connection, event], + %{count: count}, + %{ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length} + ) + end end From 5c7808b33ccb0a50e936c4f4cce4ab1f66497dff Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Wed, 22 May 2024 13:35:35 +0300 Subject: [PATCH 02/11] Switch to poll-based ConnectionPool.get_connection_metrics implementation --- .../connection_pool/metrics_test.exs | 169 ++++++++++++++++++ .../connection_pool/telemetry_test.exs | 154 ---------------- lib/db_connection/connection_pool.ex | 75 ++++---- test/test_support.exs | 1 + 4 files changed, 202 insertions(+), 197 deletions(-) create mode 100644 integration_test/connection_pool/metrics_test.exs delete mode 100644 integration_test/connection_pool/telemetry_test.exs diff --git a/integration_test/connection_pool/metrics_test.exs b/integration_test/connection_pool/metrics_test.exs new file mode 100644 index 0000000..734196d --- /dev/null +++ b/integration_test/connection_pool/metrics_test.exs @@ -0,0 +1,169 @@ +defmodule TestPoolMetrics do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "considers checkin/checkout in metrics when ready" do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + P.execute(pool, %Q{}, [:client]) + end) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + send(query, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "considers idle connections on idle timeout in metrics" do + stack = [ + {:ok, :state}, + {:ok, :state} + ] + + idle_interval = 100 + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + :timer.sleep(idle_interval) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + ping: [:state] + ] = A.record(agent) + end + + test "considers enqueue/dequeue in metrics when busy" do + stack = [ + {:ok, :state}, + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 2} = P.get_connection_metrics(pool) + end) + + run_query = fn -> + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client]) + end) + end + + queries = [run_query.()] + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + queries = [run_query.() | queries] + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + queries = [run_query.() | queries] + + poll(fn -> + assert %{checkout_queue_length: 1, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + [query3, query2, query1] = queries + send(query1, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + send(query2, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + send(query3, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 2} = P.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + def poll(fun, attempts \\ 5) do + try do + fun.() + rescue + e -> + if attempts > 0 do + :timer.sleep(50) + poll(fun, attempts - 1) + else + reraise e, __STACKTRACE__ + end + end + end +end diff --git a/integration_test/connection_pool/telemetry_test.exs b/integration_test/connection_pool/telemetry_test.exs deleted file mode 100644 index d298ce4..0000000 --- a/integration_test/connection_pool/telemetry_test.exs +++ /dev/null @@ -1,154 +0,0 @@ -defmodule TestPoolTelemetry do - use ExUnit.Case, async: true - - alias TestPool, as: P - alias TestAgent, as: A - alias TestQuery, as: Q - alias TestResult, as: R - - @events [ - [:db_connection, :enqueue], - [:db_connection, :dequeue], - [:db_connection, :checkout], - [:db_connection, :checkin], - [:db_connection, :drop_idle] - ] - - setup do - telemetry_ref = make_ref() - - :telemetry.attach_many( - telemetry_ref, - @events, - fn event, measurements, metadata, {ref, dest_pid} -> - send(dest_pid, {event, ref, measurements, metadata}) - end, - {telemetry_ref, self()} - ) - - on_exit(fn -> - :telemetry.detach(telemetry_ref) - end) - - %{telemetry_ref: telemetry_ref} - end - - test "checkin/checkout telemetry when ready", %{telemetry_ref: telemetry_ref} do - stack = [ - {:ok, :state}, - {:ok, %Q{}, %R{}, :state} - ] - - {:ok, agent} = A.start_link(stack) - {:ok, pool} = P.start_link(agent: agent, parent: self()) - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert P.execute(pool, %Q{}, [:client]) - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert [ - connect: [_], - handle_execute: [%Q{}, [:client], _, :state] - ] = A.record(agent) - end - - test "idle telemetry on idle timeout", %{telemetry_ref: telemetry_ref} do - stack = [ - {:ok, :state}, - {:ok, :state} - ] - - idle_interval = 100 - - {:ok, agent} = A.start_link(stack) - {:ok, _} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :drop_idle], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}}, - idle_interval - - assert [{:connect, [_]} | _] = A.record(agent) - end - - test "enqueue/dequeue telemetry when busy", %{telemetry_ref: telemetry_ref} do - stack = [ - {:ok, :state}, - {:ok, :state}, - fn _, _, _, _ -> - receive do - :continue -> {:ok, %Q{}, %R{}, :state} - end - end, - fn _, _, _, _ -> - receive do - :continue -> {:ok, %Q{}, %R{}, :state} - end - end, - fn _, _, _, _ -> - receive do - :continue -> {:ok, %Q{}, %R{}, :state} - end - end - ] - - {:ok, agent} = A.start_link(stack) - {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 2}} - - fn -> - spawn_link(fn -> - Process.put(:agent, agent) - assert P.execute(pool, %Q{}, [:client]) - end) - end - |> Stream.repeatedly() - |> Enum.take(3) - |> Enum.each(fn pid -> - send(pid, :continue) - end) - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :enqueue], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 1, ready_conn_count: 0}} - - assert_receive {[:db_connection, :dequeue], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 2}} - - assert [ - connect: [_], - connect: [_], - handle_execute: [%Q{}, [:client], _, :state], - handle_execute: [%Q{}, [:client], _, :state], - handle_execute: [%Q{}, [:client], _, :state] - ] = A.record(agent) - end -end diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index ac663e2..c65e1ce 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -40,6 +40,16 @@ defmodule DBConnection.ConnectionPool do GenServer.call(pool, {:disconnect_all, interval}, :infinity) end + @doc """ + Returns connection metrics in the shape of %{ + ready_conn_count: N, + checkout_queue_length: N + } + """ + def get_connection_metrics(pool) do + GenServer.call(pool, :get_connection_metrics) + end + ## GenServer api @impl GenServer @@ -73,9 +83,27 @@ defmodule DBConnection.ConnectionPool do end @impl GenServer - def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do + def handle_call(:get_connection_metrics, _from, {status, queue, _, _} = state) do + {ready_conn_count, checkout_queue_length} = + case status do + :busy -> + {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} + + :ready -> + {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} + end + + metrics = %{ + ready_conn_count: ready_conn_count, + checkout_queue_length: checkout_queue_length + } + + {:reply, metrics, state} + end + + def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do ts = {System.monotonic_time(), interval} - {:reply, :ok, {status, queue, codel, ts}} + {:reply, :ok, {type, queue, codel, ts}} end @impl GenServer @@ -86,7 +114,6 @@ defmodule DBConnection.ConnectionPool do case queue? do true -> :ets.insert(queue, {{now, System.unique_integer(), from}}) - execute_queue_telemetry(:busy, queue, :enqueue) {:noreply, busy} false -> @@ -103,11 +130,7 @@ defmodule DBConnection.ConnectionPool do ) do case :ets.first(queue) do {queued_in_native, holder} = key -> - if Holder.handle_checkout(holder, from, queue, queued_in_native) and - :ets.delete(queue, key) do - execute_queue_telemetry(:ready, queue, :checkout) - end - + Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key) {:noreply, ready} :"$end_of_table" -> @@ -204,9 +227,6 @@ defmodule DBConnection.ConnectionPool do :ets.first(queue) do :ets.delete(queue, key) Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder) - - execute_queue_telemetry(status, queue, :drop_idle) - drop_idle(past_in_native, limit - 1, status, queue, codel, ts) else _ -> @@ -237,20 +257,15 @@ defmodule DBConnection.ConnectionPool do guards = [{:<, :"$1", min_sent}] select_slow = [{match, guards, [{{:"$1", :"$2"}}]}] - slow = :ets.select(queue, select_slow) - - for {sent, from} <- slow do + for {sent, from} <- :ets.select(queue, select_slow) do drop(time - sent, from) end :ets.select_delete(queue, [{match, guards, [true]}]) - - execute_queue_telemetry(:busy, queue, :dequeue, length(slow)) end defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do :ets.insert(queue, {{now_in_native, holder}}) - execute_queue_telemetry(:ready, queue, :checkin) {:noreply, data} end @@ -263,7 +278,6 @@ defmodule DBConnection.ConnectionPool do {:ready, _, _, _} = ready -> :ets.insert(queue, {{now_in_native, holder}}) - execute_queue_telemetry(:ready, queue, :checkin) {:noreply, ready} end end @@ -288,7 +302,6 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) delay = time - sent codel = %{codel | next: next, delay: delay, slow: slow?} go(delay, from, time, holder, queue, codel, ts) @@ -303,7 +316,6 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -315,13 +327,11 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key when time - sent > timeout -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) drop(time - sent, from) dequeue_slow(time, timeout, holder, queue, codel, ts) {sent, _, from} = key -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -332,11 +342,9 @@ defmodule DBConnection.ConnectionPool do defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do case Holder.handle_checkout(holder, from, queue, 0) do true when delay < min -> - execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, %{codel | delay: delay}, ts} true -> - execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, codel, ts} false -> @@ -378,23 +386,4 @@ defmodule DBConnection.ConnectionPool do idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true) %{codel | idle: idle} end - - defp execute_queue_telemetry(status, queue, event, count \\ 1) - when status in [:busy, :ready] and - event in [:checkin, :checkout, :enqueue, :dequeue, :drop_idle] do - {ready_conn_count, checkout_queue_length} = - case status do - :busy -> - {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} - - :ready -> - {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} - end - - :telemetry.execute( - [:db_connection, event], - %{count: count}, - %{ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length} - ) - end end diff --git a/test/test_support.exs b/test/test_support.exs index ecd4f16..55ba8a2 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -19,6 +19,7 @@ defmodule TestConnection do end defdelegate rollback(conn, reason), to: DBConnection + defdelegate get_connection_metrics(pool), to: DBConnection.ConnectionPool def prepare(pool, query, opts2 \\ []) do DBConnection.prepare(pool, query, opts2 ++ unquote(opts)) From 1d219197dc298a2f7a33a8878c842b4ac646a4e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Wed, 22 May 2024 12:54:59 +0200 Subject: [PATCH 03/11] Update connection_pool.ex --- lib/db_connection/connection_pool.ex | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index c65e1ce..8871575 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -41,10 +41,13 @@ defmodule DBConnection.ConnectionPool do end @doc """ - Returns connection metrics in the shape of %{ - ready_conn_count: N, - checkout_queue_length: N - } + Returns connection metrics as a map in the shape of: + + %{ + ready_conn_count: integer(), + checkout_queue_length: integer() + } + """ def get_connection_metrics(pool) do GenServer.call(pool, :get_connection_metrics) From f15524428811246b0c56a6ff8a778d32b37bcf89 Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Thu, 23 May 2024 17:30:00 +0300 Subject: [PATCH 04/11] Implement get_connection_metrics for Ownership pool and make get_connection_metrics a part of DBConnection public API --- .../connection_pool/metrics_test.exs | 54 +-- integration_test/ownership/metrics_test.exs | 321 ++++++++++++++++++ lib/db_connection.ex | 15 + lib/db_connection/connection_pool.ex | 14 +- lib/db_connection/ownership.ex | 4 + lib/db_connection/ownership/manager.ex | 38 +++ lib/db_connection/ownership/proxy.ex | 20 ++ lib/db_connection/pool.ex | 7 + test/test_support.exs | 18 +- 9 files changed, 453 insertions(+), 38 deletions(-) create mode 100644 integration_test/ownership/metrics_test.exs diff --git a/integration_test/connection_pool/metrics_test.exs b/integration_test/connection_pool/metrics_test.exs index 734196d..93519da 100644 --- a/integration_test/connection_pool/metrics_test.exs +++ b/integration_test/connection_pool/metrics_test.exs @@ -1,4 +1,6 @@ defmodule TestPoolMetrics do + import TestHelpers + use ExUnit.Case, async: true alias TestPool, as: P @@ -20,23 +22,26 @@ defmodule TestPoolMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self()) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool) end) query = spawn_link(fn -> Process.put(:agent, agent) - P.execute(pool, %Q{}, [:client]) + assert P.execute(pool, %Q{}, [:client]) end) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}]} = + DBConnection.get_connection_metrics(pool) end) send(query, :continue) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool) end) assert [ @@ -57,13 +62,15 @@ defmodule TestPoolMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool) end) :timer.sleep(idle_interval) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool) end) assert [ @@ -97,7 +104,8 @@ defmodule TestPoolMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 2} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}]} = + DBConnection.get_connection_metrics(pool) end) run_query = fn -> @@ -110,38 +118,44 @@ defmodule TestPoolMetrics do queries = [run_query.()] poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool) end) queries = [run_query.() | queries] poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}]} = + DBConnection.get_connection_metrics(pool) end) queries = [run_query.() | queries] poll(fn -> - assert %{checkout_queue_length: 1, ready_conn_count: 0} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 1, ready_conn_count: 0}]} = + DBConnection.get_connection_metrics(pool) end) [query3, query2, query1] = queries send(query1, :continue) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}]} = + DBConnection.get_connection_metrics(pool) end) send(query2, :continue) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool) end) send(query3, :continue) poll(fn -> - assert %{checkout_queue_length: 0, ready_conn_count: 2} = P.get_connection_metrics(pool) + assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}]} = + DBConnection.get_connection_metrics(pool) end) assert [ @@ -152,18 +166,4 @@ defmodule TestPoolMetrics do handle_execute: [%Q{}, [:client], _, :state] ] = A.record(agent) end - - def poll(fun, attempts \\ 5) do - try do - fun.() - rescue - e -> - if attempts > 0 do - :timer.sleep(50) - poll(fun, attempts - 1) - else - reraise e, __STACKTRACE__ - end - end - end end diff --git a/integration_test/ownership/metrics_test.exs b/integration_test/ownership/metrics_test.exs new file mode 100644 index 0000000..649a9fa --- /dev/null +++ b/integration_test/ownership/metrics_test.exs @@ -0,0 +1,321 @@ +defmodule TestOwnershipMetrics do + import TestHelpers + + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "starts without proxy processes" do + stack = [ + {:ok, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + poll(fn -> + assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_] + ] = A.record(agent) + end + + with "with `:ownership_mode` = :manual" do + setup do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), ownership_mode: :manual) + + {:ok, %{agent: agent, pool: pool}} + end + + test "considers a manual checkin and checkout", %{agent: agent, pool: pool} do + poll(fn -> + assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkout(pool, []) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_] + ] = A.record(agent) + end + + test "considers a query in progress", %{agent: agent, pool: pool} do + poll(fn -> + assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkout(pool, []) + + parent = self() + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client], caller: parent, pool: DBConnection.Ownership) + end) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query, :continue) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + end + + describe "with `:ownership_mode = :auto`" do + setup do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), ownership_mode: :auto, name: :auto) + + {:ok, %{agent: agent, pool: pool}} + end + + test "implicitly checkouts and considers a manual checkin", %{ + pool: pool, + agent: agent + } do + poll(fn -> + assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client], pool: DBConnection.Ownership) + DBConnection.Ownership.ownership_checkin(pool, []) + end) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query, :continue) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "implicitly checkouts and checkins in case the proxy dies", %{ + pool: pool, + agent: agent + } do + poll(fn -> + assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client], pool: DBConnection.Ownership) + end) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query, :continue) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "implicitly checkouts and enqueues in case the proxy is busy", %{ + pool: pool, + agent: agent + } do + poll(fn -> + assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + parent = self() + + query_fn = fn -> + Process.put(:agent, agent) + + assert P.execute(pool, %Q{}, [:client], + name: :auto, + caller: parent, + pool: DBConnection.Ownership + ) + end + + query1 = spawn_link(query_fn) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + query2 = spawn_link(query_fn) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 1, ready_conn_count: 0} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query1, :continue) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + send(query2, :continue) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) + + poll(fn -> + assert {:ok, + [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ]} = + DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + end +end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 482789e..633c178 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -1285,6 +1285,21 @@ defmodule DBConnection do else: (_ -> :error) end + @doc """ + Returns connection metrics in the shape of `%{ + source: {:pool | :proxy, pid()}, + ready_conn_count: non_neg_integer(), + checkout_queue_length: non_neg_integer() + }` + """ + @spec get_connection_metrics(conn, Keyword.t()) :: + {:ok, [DBConnection.Pool.connection_metrics()]} | :error + + def get_connection_metrics(conn, opts \\ []) do + pool = Keyword.get(opts, :pool, DBConnection.ConnectionPool) + pool.get_connection_metrics(conn) + end + defp pool_pid(%DBConnection{pool_ref: Holder.pool_ref(pool: pid)}), do: pid defp pool_pid(conn), do: GenServer.whereis(conn) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 8871575..c9569af 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -40,15 +40,8 @@ defmodule DBConnection.ConnectionPool do GenServer.call(pool, {:disconnect_all, interval}, :infinity) end - @doc """ - Returns connection metrics as a map in the shape of: - - %{ - ready_conn_count: integer(), - checkout_queue_length: integer() - } - - """ + @doc false + @impl DBConnection.Pool def get_connection_metrics(pool) do GenServer.call(pool, :get_connection_metrics) end @@ -97,11 +90,12 @@ defmodule DBConnection.ConnectionPool do end metrics = %{ + source: {:pool, self()}, ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length } - {:reply, metrics, state} + {:reply, {:ok, [metrics]}, state} end def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do diff --git a/lib/db_connection/ownership.ex b/lib/db_connection/ownership.ex index ddef239..26b52f2 100644 --- a/lib/db_connection/ownership.ex +++ b/lib/db_connection/ownership.ex @@ -85,6 +85,10 @@ defmodule DBConnection.Ownership do end end + @doc false + @impl DBConnection.Pool + defdelegate get_connection_metrics(pool), to: Manager + @doc """ Explicitly checks a connection out from the ownership manager. diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index 0703fdc..154957f 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -59,6 +59,12 @@ defmodule DBConnection.Ownership.Manager do GenServer.call(manager, {:allow, parent, allow}, timeout) end + @spec get_connection_metrics(GenServer.server()) :: + {:ok, [DBConnection.Pool.connection_metrics()]} | :error + def get_connection_metrics(manager) do + GenServer.call(manager, :get_connection_metrics) + end + ## Callbacks @impl true @@ -98,6 +104,38 @@ defmodule DBConnection.Ownership.Manager do end @impl true + def handle_call(:get_connection_metrics, _from, %{pool: pool, owners: owners, log: log} = state) do + {:ok, pool_metrics} = DBConnection.ConnectionPool.get_connection_metrics(pool) + + proxy_metrics = + owners + |> Enum.map(fn {_, {proxy, _, _}} -> + if Process.alive?(proxy) do + GenServer.call(proxy, :get_connection_metrics) + end + end) + |> Enum.reject(&is_nil/1) + + {:reply, {:ok, pool_metrics ++ proxy_metrics}, state} + rescue + error in MatchError -> + if log do + Logger.log(log, "Caught while calling :get_connection_metrics: #{inspect(error)}") + end + + {:reply, :error, state} + catch + :exit, reason -> + if log do + Logger.log( + log, + "Caught :exit while calling :get_connection_metrics due to #{inspect(reason)}" + ) + end + + {:reply, :error, state} + end + def handle_call(:pool, _from, %{pool: pool} = state) do {:reply, pool, state} end diff --git a/lib/db_connection/ownership/proxy.ex b/lib/db_connection/ownership/proxy.ex index 041ce49..0f18d73 100644 --- a/lib/db_connection/ownership/proxy.ex +++ b/lib/db_connection/ownership/proxy.ex @@ -171,6 +171,26 @@ defmodule DBConnection.Ownership.Proxy do down(message, state) end + @impl true + def handle_call( + :get_connection_metrics, + _, + %{queue: queue, holder: holder, client: client} = state + ) do + connection_metrics = %{ + source: {:proxy, self()}, + ready_conn_count: + if is_nil(holder) or not is_nil(client) do + 0 + else + 1 + end, + checkout_queue_length: :queue.len(queue) + } + + {:reply, connection_metrics, state} + end + defp checkout({pid, ref} = from, %{holder: holder} = state) do if Holder.handle_checkout(holder, from, ref, nil) do {:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}} diff --git a/lib/db_connection/pool.ex b/lib/db_connection/pool.ex index 84eb206..bccb770 100644 --- a/lib/db_connection/pool.ex +++ b/lib/db_connection/pool.ex @@ -2,6 +2,11 @@ defmodule DBConnection.Pool do @moduledoc false @type pool :: GenServer.server() + @type connection_metrics :: %{ + source: {:pool | :proxy, pid()}, + ready_conn_count: non_neg_integer(), + checkout_queue_length: non_neg_integer() + } @callback disconnect_all(pool, interval :: term, options :: keyword) :: :ok @@ -9,4 +14,6 @@ defmodule DBConnection.Pool do {:ok, pool_ref :: term, module, checkin_time :: non_neg_integer() | nil, state :: term} | {:error, Exception.t()} + + @callback get_connection_metrics(pool :: pool()) :: {:ok, [connection_metrics()]} | :error end diff --git a/test/test_support.exs b/test/test_support.exs index 55ba8a2..dcbe445 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -1,3 +1,19 @@ +defmodule TestHelpers do + def poll(fun, attempts \\ 5) do + try do + fun.() + rescue + e -> + if attempts > 0 do + :timer.sleep(50) + poll(fun, attempts - 1) + else + reraise e, __STACKTRACE__ + end + end + end +end + defmodule TestConnection do defmacro __using__(opts) do quote do @@ -19,7 +35,7 @@ defmodule TestConnection do end defdelegate rollback(conn, reason), to: DBConnection - defdelegate get_connection_metrics(pool), to: DBConnection.ConnectionPool + defdelegate get_connection_metrics(pool, opts \\ []), to: DBConnection def prepare(pool, query, opts2 \\ []) do DBConnection.prepare(pool, query, opts2 ++ unquote(opts)) From 9543ec95cb5654ba6b4ac7fa6293cbc6320b478d Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Thu, 23 May 2024 17:31:44 +0300 Subject: [PATCH 05/11] Update @doc for DBConnection.get_connection_metrics --- lib/db_connection.ex | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 633c178..456785b 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -1286,11 +1286,14 @@ defmodule DBConnection do end @doc """ - Returns connection metrics in the shape of `%{ - source: {:pool | :proxy, pid()}, - ready_conn_count: non_neg_integer(), - checkout_queue_length: non_neg_integer() - }` + Returns connection metrics as a map in the shape of: + + %{ + source: {:pool | :proxy, pid()}, + ready_conn_count: non_neg_integer(), + checkout_queue_length: non_neg_integer() + } + """ @spec get_connection_metrics(conn, Keyword.t()) :: {:ok, [DBConnection.Pool.connection_metrics()]} | :error From b177d36342811830f96e01add629eaed5d0e7b4d Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Fri, 24 May 2024 11:45:22 +0300 Subject: [PATCH 06/11] Get rid of an unnecessary rescue MatchError clause --- lib/db_connection/ownership/manager.ex | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index 154957f..b23edf1 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -117,13 +117,6 @@ defmodule DBConnection.Ownership.Manager do |> Enum.reject(&is_nil/1) {:reply, {:ok, pool_metrics ++ proxy_metrics}, state} - rescue - error in MatchError -> - if log do - Logger.log(log, "Caught while calling :get_connection_metrics: #{inspect(error)}") - end - - {:reply, :error, state} catch :exit, reason -> if log do From 876cca3ae4d1917e7c00a7bd635be2d5326f9f23 Mon Sep 17 00:00:00 2001 From: realglebivanov Date: Fri, 24 May 2024 12:43:55 +0300 Subject: [PATCH 07/11] Update lib/db_connection/ownership/manager.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: José Valim --- lib/db_connection/ownership/manager.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index b23edf1..5922ff2 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -62,7 +62,7 @@ defmodule DBConnection.Ownership.Manager do @spec get_connection_metrics(GenServer.server()) :: {:ok, [DBConnection.Pool.connection_metrics()]} | :error def get_connection_metrics(manager) do - GenServer.call(manager, :get_connection_metrics) + GenServer.call(manager, :get_connection_metrics, :infinity) end ## Callbacks From aca0a658525083fefa6e9076a2b6e8a363b5175f Mon Sep 17 00:00:00 2001 From: realglebivanov Date: Fri, 24 May 2024 12:46:45 +0300 Subject: [PATCH 08/11] Update lib/db_connection/connection_pool.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: José Valim --- lib/db_connection/connection_pool.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index c9569af..9a6dd79 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -43,7 +43,7 @@ defmodule DBConnection.ConnectionPool do @doc false @impl DBConnection.Pool def get_connection_metrics(pool) do - GenServer.call(pool, :get_connection_metrics) + GenServer.call(pool, :get_connection_metrics, :infinity) end ## GenServer api From 0c50b5ece06973b25fc8fda245ffff585dca3185 Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Fri, 24 May 2024 13:02:19 +0300 Subject: [PATCH 09/11] Get rid of a maybe wrapper for DBConnection.get_connection_metrics --- .../connection_pool/metrics_test.exs | 24 ++-- integration_test/ownership/metrics_test.exs | 128 ++++++++---------- lib/db_connection.ex | 9 +- lib/db_connection/connection_pool.ex | 2 +- lib/db_connection/ownership/manager.ex | 26 ++-- lib/db_connection/pool.ex | 2 +- 6 files changed, 88 insertions(+), 103 deletions(-) diff --git a/integration_test/connection_pool/metrics_test.exs b/integration_test/connection_pool/metrics_test.exs index 93519da..4dd15c3 100644 --- a/integration_test/connection_pool/metrics_test.exs +++ b/integration_test/connection_pool/metrics_test.exs @@ -22,7 +22,7 @@ defmodule TestPoolMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self()) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool) end) @@ -33,14 +33,14 @@ defmodule TestPoolMetrics do end) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}] = DBConnection.get_connection_metrics(pool) end) send(query, :continue) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool) end) @@ -62,14 +62,14 @@ defmodule TestPoolMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool) end) :timer.sleep(idle_interval) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool) end) @@ -104,7 +104,7 @@ defmodule TestPoolMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}] = DBConnection.get_connection_metrics(pool) end) @@ -118,21 +118,21 @@ defmodule TestPoolMetrics do queries = [run_query.()] poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool) end) queries = [run_query.() | queries] poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}] = DBConnection.get_connection_metrics(pool) end) queries = [run_query.() | queries] poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 1, ready_conn_count: 0}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 1, ready_conn_count: 0}] = DBConnection.get_connection_metrics(pool) end) @@ -140,21 +140,21 @@ defmodule TestPoolMetrics do send(query1, :continue) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 0}] = DBConnection.get_connection_metrics(pool) end) send(query2, :continue) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool) end) send(query3, :continue) poll(fn -> - assert {:ok, [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}]} = + assert [%{source: {:pool, ^pool}, checkout_queue_length: 0, ready_conn_count: 2}] = DBConnection.get_connection_metrics(pool) end) diff --git a/integration_test/ownership/metrics_test.exs b/integration_test/ownership/metrics_test.exs index 649a9fa..d58abfd 100644 --- a/integration_test/ownership/metrics_test.exs +++ b/integration_test/ownership/metrics_test.exs @@ -17,7 +17,7 @@ defmodule TestOwnershipMetrics do {:ok, pool} = P.start_link(agent: agent, parent: self()) poll(fn -> - assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -50,28 +50,26 @@ defmodule TestOwnershipMetrics do test "considers a manual checkin and checkout", %{agent: agent, pool: pool} do poll(fn -> - assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkout(pool, []) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -82,7 +80,7 @@ defmodule TestOwnershipMetrics do test "considers a query in progress", %{agent: agent, pool: pool} do poll(fn -> - assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -97,32 +95,29 @@ defmodule TestOwnershipMetrics do end) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query, :continue) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -160,7 +155,7 @@ defmodule TestOwnershipMetrics do agent: agent } do poll(fn -> - assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -172,21 +167,19 @@ defmodule TestOwnershipMetrics do end) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query, :continue) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -201,7 +194,7 @@ defmodule TestOwnershipMetrics do agent: agent } do poll(fn -> - assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -212,21 +205,19 @@ defmodule TestOwnershipMetrics do end) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query, :continue) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -241,7 +232,7 @@ defmodule TestOwnershipMetrics do agent: agent } do poll(fn -> - assert {:ok, [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}]} = + assert [%{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1}] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) @@ -260,54 +251,49 @@ defmodule TestOwnershipMetrics do query1 = spawn_link(query_fn) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) query2 = spawn_link(query_fn) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 1, ready_conn_count: 0} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 1, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query1, :continue) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query2, :continue) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, - %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, + %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) poll(fn -> - assert {:ok, - [ - %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ]} = + assert [ + %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 456785b..1338029 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -1286,17 +1286,16 @@ defmodule DBConnection do end @doc """ - Returns connection metrics as a map in the shape of: + Returns connection metrics as a list in the shape of: - %{ + [%{ source: {:pool | :proxy, pid()}, ready_conn_count: non_neg_integer(), checkout_queue_length: non_neg_integer() - } + }] """ - @spec get_connection_metrics(conn, Keyword.t()) :: - {:ok, [DBConnection.Pool.connection_metrics()]} | :error + @spec get_connection_metrics(conn, Keyword.t()) :: [DBConnection.Pool.connection_metrics()] def get_connection_metrics(conn, opts \\ []) do pool = Keyword.get(opts, :pool, DBConnection.ConnectionPool) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 9a6dd79..7b15529 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -95,7 +95,7 @@ defmodule DBConnection.ConnectionPool do checkout_queue_length: checkout_queue_length } - {:reply, {:ok, [metrics]}, state} + {:reply, [metrics], state} end def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index 5922ff2..91a435c 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -105,28 +105,28 @@ defmodule DBConnection.Ownership.Manager do @impl true def handle_call(:get_connection_metrics, _from, %{pool: pool, owners: owners, log: log} = state) do - {:ok, pool_metrics} = DBConnection.ConnectionPool.get_connection_metrics(pool) + pool_metrics = DBConnection.ConnectionPool.get_connection_metrics(pool) proxy_metrics = owners |> Enum.map(fn {_, {proxy, _, _}} -> - if Process.alive?(proxy) do + try do GenServer.call(proxy, :get_connection_metrics) + catch + :exit, reason -> + if log do + Logger.log( + log, + "Caught :exit while calling :get_connection_metrics due to #{inspect(reason)}" + ) + end + + nil end end) |> Enum.reject(&is_nil/1) - {:reply, {:ok, pool_metrics ++ proxy_metrics}, state} - catch - :exit, reason -> - if log do - Logger.log( - log, - "Caught :exit while calling :get_connection_metrics due to #{inspect(reason)}" - ) - end - - {:reply, :error, state} + {:reply, pool_metrics ++ proxy_metrics, state} end def handle_call(:pool, _from, %{pool: pool} = state) do diff --git a/lib/db_connection/pool.ex b/lib/db_connection/pool.ex index bccb770..3f2242c 100644 --- a/lib/db_connection/pool.ex +++ b/lib/db_connection/pool.ex @@ -15,5 +15,5 @@ defmodule DBConnection.Pool do state :: term} | {:error, Exception.t()} - @callback get_connection_metrics(pool :: pool()) :: {:ok, [connection_metrics()]} | :error + @callback get_connection_metrics(pool :: pool()) :: [connection_metrics()] end From 076ce8e835131b1eb523b807715e2dda77f2930c Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Fri, 24 May 2024 13:27:04 +0300 Subject: [PATCH 10/11] Run mix format --- integration_test/ownership/metrics_test.exs | 42 +++++++-------------- 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/integration_test/ownership/metrics_test.exs b/integration_test/ownership/metrics_test.exs index d58abfd..4ce450c 100644 --- a/integration_test/ownership/metrics_test.exs +++ b/integration_test/ownership/metrics_test.exs @@ -60,8 +60,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) @@ -69,8 +68,7 @@ defmodule TestOwnershipMetrics do poll(fn -> assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert [ @@ -98,8 +96,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query, :continue) @@ -108,8 +105,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) @@ -117,8 +113,7 @@ defmodule TestOwnershipMetrics do poll(fn -> assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert [ @@ -170,8 +165,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query, :continue) @@ -179,8 +173,7 @@ defmodule TestOwnershipMetrics do poll(fn -> assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert [ @@ -208,8 +201,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query, :continue) @@ -217,8 +209,7 @@ defmodule TestOwnershipMetrics do poll(fn -> assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert [ @@ -254,8 +245,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) query2 = spawn_link(query_fn) @@ -264,8 +254,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 1, ready_conn_count: 0} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query1, :continue) @@ -274,8 +263,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 0} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) send(query2, :continue) @@ -284,8 +272,7 @@ defmodule TestOwnershipMetrics do assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 0}, %{source: {:proxy, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert :ok = DBConnection.Ownership.ownership_checkin(pool, []) @@ -293,8 +280,7 @@ defmodule TestOwnershipMetrics do poll(fn -> assert [ %{source: {:pool, _}, checkout_queue_length: 0, ready_conn_count: 1} - ] = - DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) + ] = DBConnection.get_connection_metrics(pool, pool: DBConnection.Ownership) end) assert [ From 7c68eeb2e416dea96324d336602e4caf4026f012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Fri, 24 May 2024 14:02:30 +0200 Subject: [PATCH 11/11] Update lib/db_connection.ex --- lib/db_connection.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 1338029..b56fb1b 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -1286,7 +1286,7 @@ defmodule DBConnection do end @doc """ - Returns connection metrics as a list in the shape of: + Returns connection metrics as a list in the shape of: [%{ source: {:pool | :proxy, pid()},