Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for connection pool usage v3 #307

Merged
merged 11 commits into from
May 24, 2024
154 changes: 154 additions & 0 deletions integration_test/connection_pool/telemetry_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
defmodule TestPoolTelemetry do
use ExUnit.Case, async: true

alias TestPool, as: P
alias TestAgent, as: A
alias TestQuery, as: Q
alias TestResult, as: R

@events [
[:db_connection, :enqueue],
[:db_connection, :dequeue],
[:db_connection, :checkout],
[:db_connection, :checkin],
[:db_connection, :drop_idle]
]

setup do
telemetry_ref = make_ref()

:telemetry.attach_many(
telemetry_ref,
@events,
fn event, measurements, metadata, {ref, dest_pid} ->
send(dest_pid, {event, ref, measurements, metadata})
end,
{telemetry_ref, self()}
)

on_exit(fn ->
:telemetry.detach(telemetry_ref)
end)

%{telemetry_ref: telemetry_ref}
end

test "checkin/checkout telemetry when ready", %{telemetry_ref: telemetry_ref} do
stack = [
{:ok, :state},
{:ok, %Q{}, %R{}, :state}
]

{:ok, agent} = A.start_link(stack)
{:ok, pool} = P.start_link(agent: agent, parent: self())

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert P.execute(pool, %Q{}, [:client])

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert [
connect: [_],
handle_execute: [%Q{}, [:client], _, :state]
] = A.record(agent)
end

test "idle telemetry on idle timeout", %{telemetry_ref: telemetry_ref} do
stack = [
{:ok, :state},
{:ok, :state}
]

idle_interval = 100

{:ok, agent} = A.start_link(stack)
{:ok, _} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval)

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :drop_idle], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}},
idle_interval

assert [{:connect, [_]} | _] = A.record(agent)
end

test "enqueue/dequeue telemetry when busy", %{telemetry_ref: telemetry_ref} do
stack = [
{:ok, :state},
{:ok, :state},
fn _, _, _, _ ->
receive do
:continue -> {:ok, %Q{}, %R{}, :state}
end
end,
fn _, _, _, _ ->
receive do
:continue -> {:ok, %Q{}, %R{}, :state}
end
end,
fn _, _, _, _ ->
receive do
:continue -> {:ok, %Q{}, %R{}, :state}
end
end
]

{:ok, agent} = A.start_link(stack)
{:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2)

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 2}}

fn ->
spawn_link(fn ->
Process.put(:agent, agent)
assert P.execute(pool, %Q{}, [:client])
end)
end
|> Stream.repeatedly()
|> Enum.take(3)
|> Enum.each(fn pid ->
send(pid, :continue)
end)

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :enqueue], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 1, ready_conn_count: 0}}

assert_receive {[:db_connection, :dequeue], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 2}}

assert [
connect: [_],
connect: [_],
handle_execute: [%Q{}, [:client], _, :state],
handle_execute: [%Q{}, [:client], _, :state],
handle_execute: [%Q{}, [:client], _, :state]
] = A.record(agent)
end
end
47 changes: 43 additions & 4 deletions lib/db_connection/connection_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ defmodule DBConnection.ConnectionPool do
end

@impl GenServer
def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do
def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do
ts = {System.monotonic_time(), interval}
{:reply, :ok, {type, queue, codel, ts}}
{:reply, :ok, {status, queue, codel, ts}}
end

@impl GenServer
Expand All @@ -86,6 +86,7 @@ defmodule DBConnection.ConnectionPool do
case queue? do
true ->
:ets.insert(queue, {{now, System.unique_integer(), from}})
execute_queue_telemetry(:busy, queue, :enqueue)
{:noreply, busy}

false ->
Expand All @@ -102,7 +103,11 @@ defmodule DBConnection.ConnectionPool do
) do
case :ets.first(queue) do
{queued_in_native, holder} = key ->
Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key)
if Holder.handle_checkout(holder, from, queue, queued_in_native) and
:ets.delete(queue, key) do
execute_queue_telemetry(:ready, queue, :checkout)
end

{:noreply, ready}

:"$end_of_table" ->
Expand Down Expand Up @@ -199,6 +204,9 @@ defmodule DBConnection.ConnectionPool do
:ets.first(queue) do
:ets.delete(queue, key)
Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder)

execute_queue_telemetry(status, queue, :drop_idle)

drop_idle(past_in_native, limit - 1, status, queue, codel, ts)
else
_ ->
Expand Down Expand Up @@ -229,15 +237,20 @@ defmodule DBConnection.ConnectionPool do
guards = [{:<, :"$1", min_sent}]
select_slow = [{match, guards, [{{:"$1", :"$2"}}]}]

for {sent, from} <- :ets.select(queue, select_slow) do
slow = :ets.select(queue, select_slow)

for {sent, from} <- slow do
drop(time - sent, from)
end

:ets.select_delete(queue, [{match, guards, [true]}])

execute_queue_telemetry(:busy, queue, :dequeue, length(slow))
end

defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do
:ets.insert(queue, {{now_in_native, holder}})
execute_queue_telemetry(:ready, queue, :checkin)
{:noreply, data}
end

Expand All @@ -250,6 +263,7 @@ defmodule DBConnection.ConnectionPool do

{:ready, _, _, _} = ready ->
:ets.insert(queue, {{now_in_native, holder}})
execute_queue_telemetry(:ready, queue, :checkin)
{:noreply, ready}
end
end
Expand All @@ -274,6 +288,7 @@ defmodule DBConnection.ConnectionPool do
case :ets.first(queue) do
{sent, _, from} = key ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
delay = time - sent
codel = %{codel | next: next, delay: delay, slow: slow?}
go(delay, from, time, holder, queue, codel, ts)
Expand All @@ -288,6 +303,7 @@ defmodule DBConnection.ConnectionPool do
case :ets.first(queue) do
{sent, _, from} = key ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
go(time - sent, from, time, holder, queue, codel, ts)

:"$end_of_table" ->
Expand All @@ -299,11 +315,13 @@ defmodule DBConnection.ConnectionPool do
case :ets.first(queue) do
{sent, _, from} = key when time - sent > timeout ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
drop(time - sent, from)
dequeue_slow(time, timeout, holder, queue, codel, ts)

{sent, _, from} = key ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
go(time - sent, from, time, holder, queue, codel, ts)

:"$end_of_table" ->
Expand All @@ -314,9 +332,11 @@ defmodule DBConnection.ConnectionPool do
defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do
case Holder.handle_checkout(holder, from, queue, 0) do
true when delay < min ->
execute_queue_telemetry(:busy, queue, :checkout)
{:busy, queue, %{codel | delay: delay}, ts}

true ->
execute_queue_telemetry(:busy, queue, :checkout)
{:busy, queue, codel, ts}

false ->
Expand Down Expand Up @@ -358,4 +378,23 @@ defmodule DBConnection.ConnectionPool do
idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true)
%{codel | idle: idle}
end

defp execute_queue_telemetry(status, queue, event, count \\ 1)
realglebivanov marked this conversation as resolved.
Show resolved Hide resolved
when status in [:busy, :ready] and
event in [:checkin, :checkout, :enqueue, :dequeue, :drop_idle] do
{ready_conn_count, checkout_queue_length} =
case status do
:busy ->
{0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])}

:ready ->
{:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0}
end

:telemetry.execute(
[:db_connection, event],
%{count: count},
%{ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length}
)
end
end