diff --git a/integration_test/connection_pool/telemetry_test.exs b/integration_test/connection_pool/telemetry_test.exs new file mode 100644 index 0000000..d298ce4 --- /dev/null +++ b/integration_test/connection_pool/telemetry_test.exs @@ -0,0 +1,154 @@ +defmodule TestPoolTelemetry do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + @events [ + [:db_connection, :enqueue], + [:db_connection, :dequeue], + [:db_connection, :checkout], + [:db_connection, :checkin], + [:db_connection, :drop_idle] + ] + + setup do + telemetry_ref = make_ref() + + :telemetry.attach_many( + telemetry_ref, + @events, + fn event, measurements, metadata, {ref, dest_pid} -> + send(dest_pid, {event, ref, measurements, metadata}) + end, + {telemetry_ref, self()} + ) + + on_exit(fn -> + :telemetry.detach(telemetry_ref) + end) + + %{telemetry_ref: telemetry_ref} + end + + test "checkin/checkout telemetry when ready", %{telemetry_ref: telemetry_ref} do + stack = [ + {:ok, :state}, + {:ok, %Q{}, %R{}, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert P.execute(pool, %Q{}, [:client]) + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert [ + connect: [_], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end + + test "idle telemetry on idle timeout", %{telemetry_ref: telemetry_ref} do + stack = [ + {:ok, :state}, + {:ok, :state} + ] + + idle_interval = 100 + + {:ok, agent} = A.start_link(stack) + {:ok, _} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval) + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :drop_idle], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}}, + idle_interval + + assert [{:connect, [_]} | _] = A.record(agent) + end + + test "enqueue/dequeue telemetry when busy", %{telemetry_ref: telemetry_ref} do + stack = [ + {:ok, :state}, + {:ok, :state}, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end, + fn _, _, _, _ -> + receive do + :continue -> {:ok, %Q{}, %R{}, :state} + end + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2) + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 2}} + + fn -> + spawn_link(fn -> + Process.put(:agent, agent) + assert P.execute(pool, %Q{}, [:client]) + end) + end + |> Stream.repeatedly() + |> Enum.take(3) + |> Enum.each(fn pid -> + send(pid, :continue) + end) + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :enqueue], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 1, ready_conn_count: 0}} + + assert_receive {[:db_connection, :dequeue], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 0}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 1}} + + assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1}, + %{checkout_queue_length: 0, ready_conn_count: 2}} + + assert [ + connect: [_], + connect: [_], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state], + handle_execute: [%Q{}, [:client], _, :state] + ] = A.record(agent) + end +end diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 00a793a..ac663e2 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -73,9 +73,9 @@ defmodule DBConnection.ConnectionPool do end @impl GenServer - def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do + def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do ts = {System.monotonic_time(), interval} - {:reply, :ok, {type, queue, codel, ts}} + {:reply, :ok, {status, queue, codel, ts}} end @impl GenServer @@ -86,6 +86,7 @@ defmodule DBConnection.ConnectionPool do case queue? do true -> :ets.insert(queue, {{now, System.unique_integer(), from}}) + execute_queue_telemetry(:busy, queue, :enqueue) {:noreply, busy} false -> @@ -102,7 +103,11 @@ defmodule DBConnection.ConnectionPool do ) do case :ets.first(queue) do {queued_in_native, holder} = key -> - Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key) + if Holder.handle_checkout(holder, from, queue, queued_in_native) and + :ets.delete(queue, key) do + execute_queue_telemetry(:ready, queue, :checkout) + end + {:noreply, ready} :"$end_of_table" -> @@ -199,6 +204,9 @@ defmodule DBConnection.ConnectionPool do :ets.first(queue) do :ets.delete(queue, key) Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder) + + execute_queue_telemetry(status, queue, :drop_idle) + drop_idle(past_in_native, limit - 1, status, queue, codel, ts) else _ -> @@ -229,15 +237,20 @@ defmodule DBConnection.ConnectionPool do guards = [{:<, :"$1", min_sent}] select_slow = [{match, guards, [{{:"$1", :"$2"}}]}] - for {sent, from} <- :ets.select(queue, select_slow) do + slow = :ets.select(queue, select_slow) + + for {sent, from} <- slow do drop(time - sent, from) end :ets.select_delete(queue, [{match, guards, [true]}]) + + execute_queue_telemetry(:busy, queue, :dequeue, length(slow)) end defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do :ets.insert(queue, {{now_in_native, holder}}) + execute_queue_telemetry(:ready, queue, :checkin) {:noreply, data} end @@ -250,6 +263,7 @@ defmodule DBConnection.ConnectionPool do {:ready, _, _, _} = ready -> :ets.insert(queue, {{now_in_native, holder}}) + execute_queue_telemetry(:ready, queue, :checkin) {:noreply, ready} end end @@ -274,6 +288,7 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) delay = time - sent codel = %{codel | next: next, delay: delay, slow: slow?} go(delay, from, time, holder, queue, codel, ts) @@ -288,6 +303,7 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -299,11 +315,13 @@ defmodule DBConnection.ConnectionPool do case :ets.first(queue) do {sent, _, from} = key when time - sent > timeout -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) drop(time - sent, from) dequeue_slow(time, timeout, holder, queue, codel, ts) {sent, _, from} = key -> :ets.delete(queue, key) + execute_queue_telemetry(:busy, queue, :dequeue) go(time - sent, from, time, holder, queue, codel, ts) :"$end_of_table" -> @@ -314,9 +332,11 @@ defmodule DBConnection.ConnectionPool do defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do case Holder.handle_checkout(holder, from, queue, 0) do true when delay < min -> + execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, %{codel | delay: delay}, ts} true -> + execute_queue_telemetry(:busy, queue, :checkout) {:busy, queue, codel, ts} false -> @@ -358,4 +378,23 @@ defmodule DBConnection.ConnectionPool do idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true) %{codel | idle: idle} end + + defp execute_queue_telemetry(status, queue, event, count \\ 1) + when status in [:busy, :ready] and + event in [:checkin, :checkout, :enqueue, :dequeue, :drop_idle] do + {ready_conn_count, checkout_queue_length} = + case status do + :busy -> + {0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])} + + :ready -> + {:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0} + end + + :telemetry.execute( + [:db_connection, event], + %{count: count}, + %{ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length} + ) + end end