diff --git a/integration_test/connection_pool/metrics_test.exs b/integration_test/connection_pool/metrics_test.exs new file mode 100644 index 0000000..734196d --- /dev/null +++ b/integration_test/connection_pool/metrics_test.exs @@ -0,0 +1,169 @@ +defmodule TestPoolMetrics do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "considers checkin/checkout in metrics when ready" do + stack = [ + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + query = + spawn_link(fn -> + Process.put(:agent, agent) + P.execute(pool, %Q{}, [:client]) + end) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + send(query, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "considers idle connections on idle timeout in metrics" do + stack = [ + {:ok, :state}, + {:ok, :state} + ] + + idle_interval = 100 + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + :timer.sleep(idle_interval) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + ping: [:state] + ] = A.record(agent) + end + + test "considers enqueue/dequeue in metrics when busy" do + stack = [ + {:ok, :state}, + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 2} = P.get_connection_metrics(pool) + end) + + run_query = fn -> + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client]) + end) + end + + queries = [run_query.()] + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + queries = [run_query.() | queries] + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + queries = [run_query.() | queries] + + poll(fn -> + assert %{checkout_queue_length: 1, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + [query3, query2, query1] = queries + send(query1, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 0} = P.get_connection_metrics(pool) + end) + + send(query2, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 1} = P.get_connection_metrics(pool) + end) + + send(query3, :continue) + + poll(fn -> + assert %{checkout_queue_length: 0, ready_conn_count: 2} = P.get_connection_metrics(pool) + end) + + assert [ + connect: [_], + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + def poll(fun, attempts \\ 5) do + try do + fun.() + rescue + e -> + if attempts > 0 do + :timer.sleep(50) + poll(fun, attempts - 1) + else + reraise e, __STACKTRACE__ + end + end + end +end diff --git a/integration_test/connection_pool/telemetry_test.exs b/integration_test/connection_pool/telemetry_test.exs deleted file mode 100644 index d298ce4..0000000 --- a/integration_test/connection_pool/telemetry_test.exs +++ /dev/null @@ -1,154 +0,0 @@ -defmodule TestPoolTelemetry do - use ExUnit.Case, async: true - - alias TestPool, as: P - alias TestAgent, as: A - alias TestQuery, as: Q - alias TestResult, as: R - - @events [ - [:db_connection, :enqueue], - [:db_connection, :dequeue], - [:db_connection, :checkout], - [:db_connection, :checkin], - [:db_connection, :drop_idle] - ] - - setup do - telemetry_ref = make_ref() - - :telemetry.attach_many( - telemetry_ref, - @events, - fn event, measurements, metadata, {ref, dest_pid} -> - send(dest_pid, {event, ref, measurements, metadata}) - end, - {telemetry_ref, self()} - ) - - on_exit(fn -> - :telemetry.detach(telemetry_ref) - end) - - %{telemetry_ref: telemetry_ref} - end - - test "checkin/checkout telemetry when ready", %{telemetry_ref: telemetry_ref} do - stack = [ - {:ok, :state}, - {:ok, %Q{}, %R{}, :state} - ] - - {:ok, agent} = A.start_link(stack) - {:ok, pool} = P.start_link(agent: agent, parent: self()) - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert P.execute(pool, %Q{}, [:client]) - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert [ - connect: [_], - handle_execute: [%Q{}, [:client], _, :state] - ] = A.record(agent) - end - - test "idle telemetry on idle timeout", %{telemetry_ref: telemetry_ref} do - stack = [ - {:ok, :state}, - {:ok, :state} - ] - - idle_interval = 100 - - {:ok, agent} = A.start_link(stack) - {:ok, _} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :drop_idle], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}}, - idle_interval - - assert [{:connect, [_]} | _] = A.record(agent) - end - - test "enqueue/dequeue telemetry when busy", %{telemetry_ref: telemetry_ref} do - stack = [ - {:ok, :state}, - {:ok, :state}, - fn _, _, _, _ -> - receive do - :continue -> {:ok, %Q{}, %R{}, :state} - end - end, - fn _, _, _, _ -> - receive do - :continue -> {:ok, %Q{}, %R{}, :state} - end - end, - fn _, _, _, _ -> - receive do - :continue -> {:ok, %Q{}, %R{}, :state} - end - end - ] - - {:ok, agent} = A.start_link(stack) - {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 2}} - - fn -> - spawn_link(fn -> - Process.put(:agent, agent) - assert P.execute(pool, %Q{}, [:client]) - end) - end - |> Stream.repeatedly() - |> Enum.take(3) - |> Enum.each(fn pid -> - send(pid, :continue) - end) - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :enqueue], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 1, ready_conn_count: 0}} - - assert_receive {[:db_connection, :dequeue], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 0}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 1}} - - assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, - %{checkout_queue_length: 0, ready_conn_count: 2}} - - assert [ - connect: [_], - connect: [_], - handle_execute: [%Q{}, [:client], _, :state], - handle_execute: [%Q{}, [:client], _, :state], - handle_execute: [%Q{}, [:client], _, :state] - ] = A.record(agent) - end -end diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index ac663e2..c65e1ce 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -40,6 +40,16 @@ defmodule DBConnection.ConnectionPool do GenServer.call(pool, {:disconnect_all, interval}, :infinity) end + @doc """ + Returns connection metrics in the shape of %{ + ready_conn_count: N, + checkout_queue_length: N + } + """ + def get_connection_metrics(pool) do + GenServer.call(pool, :get_connection_metrics) + end + ## GenServer api @impl GenServer @@ -73,9 +83,27 @@ defmodule DBConnection.ConnectionPool do end @impl GenServer - def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do + def handle_call(:get_connection_metrics, _from, {status, queue, _, _} = state) do + {ready_conn_count, checkout_queue_length} = + case status do + :busy -> + {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} + + :ready -> + {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} + end + + metrics = %{ + ready_conn_count: ready_conn_count, + checkout_queue_length: checkout_queue_length + } + + {:reply, metrics, state} + end + + def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do ts = {System.monotonic_time(), interval} - {:reply, :ok, {status, queue, codel, ts}} + {:reply, :ok, {type, queue, codel, ts}} end @impl GenServer @@ -86,7 +114,6 @@ defmodule DBConnection.ConnectionPool do case queue? do true -> :ets.insert(queue, {{now, System.unique_integer(), from}}) - execute_queue_telemetry(:busy, queue, :enqueue) {:noreply, busy} false -> @@ -103,11 +130,7 @@ defmodule DBConnection.ConnectionPool do ) do case :ets.first(queue) do {queued_in_native, holder} = key -> - if Holder.handle_checkout(holder, from, queue, queued_in_native) and - :ets.delete(queue, key) do - execute_queue_telemetry(:ready, queue, :checkout) - end - + Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key) {:noreply, ready} :"$end_of_table" -> @@ -204,9 +227,6 @@ defmodule DBConnection.ConnectionPool do :ets.first(queue) do :ets.delete(queue, key) Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder) - - execute_queue_telemetry(status, queue, :drop_idle) - drop_idle(past_in_native, limit - 1, status, queue, codel, ts) else _ -> @@ -237,20 +257,15 @@ defmodule DBConnection.ConnectionPool do guards = [{:<, :"$1", min_sent}] select_slow = [{match, guards, [{{:"$1", :"$2"}}]}] - slow = :ets.select(queue, select_slow) - - for {sent, from} <- slow do + for {sent, from} <- :ets.select(queue, select_slow) do drop(time - sent, from) end :ets.select_delete(queue, [{match, guards, [true]}]) - - execute_queue_telemetry(:busy, queue, :dequeue, length(slow)) end defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do :ets.insert(queue, {{now_in_native, holder}}) - execute_queue_telemetry(:ready, queue, :checkin) {:noreply, data} end @@ -263,7 +278,6 @@ defmodule DBConnection.ConnectionPool do {:ready, _, _, _} = ready -> :ets.insert(queue, {{now_in_native, holder}}) - execute_queue_telemetry(:ready, queue, :checkin) {:noreply, ready} end end @@ -288,7 +302,6 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) delay = time - sent codel = %{codel | next: next, delay: delay, slow: slow?} go(delay, from, time, holder, queue, codel, ts) @@ -303,7 +316,6 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -315,13 +327,11 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key when time - sent > timeout -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) drop(time - sent, from) dequeue_slow(time, timeout, holder, queue, codel, ts) {sent, _, from} = key -> :ets.delete(queue, key) - execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -332,11 +342,9 @@ defmodule DBConnection.ConnectionPool do defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do case Holder.handle_checkout(holder, from, queue, 0) do true when delay < min -> - execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, %{codel | delay: delay}, ts} true -> - execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, codel, ts} false -> @@ -378,23 +386,4 @@ defmodule DBConnection.ConnectionPool do idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true) %{codel | idle: idle} end - - defp execute_queue_telemetry(status, queue, event, count \\ 1) - when status in [:busy, :ready] and - event in [:checkin, :checkout, :enqueue, :dequeue, :drop_idle] do - {ready_conn_count, checkout_queue_length} = - case status do - :busy -> - {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} - - :ready -> - {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} - end - - :telemetry.execute( - [:db_connection, event], - %{count: count}, - %{ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length} - ) - end end diff --git a/test/test_support.exs b/test/test_support.exs index ecd4f16..55ba8a2 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -19,6 +19,7 @@ defmodule TestConnection do end defdelegate rollback(conn, reason), to: DBConnection + defdelegate get_connection_metrics(pool), to: DBConnection.ConnectionPool def prepare(pool, query, opts2 \\ []) do DBConnection.prepare(pool, query, opts2 ++ unquote(opts))