Skip to content

Commit

Permalink
Add a few queue telemetry calls to DBConnection.ConnectionPool
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleb Ivanov committed May 21, 2024
1 parent 286f849 commit 69fd868
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 4 deletions.
154 changes: 154 additions & 0 deletions integration_test/connection_pool/telemetry_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
defmodule TestPoolTelemetry do
use ExUnit.Case, async: true

alias TestPool, as: P
alias TestAgent, as: A
alias TestQuery, as: Q
alias TestResult, as: R

@events [
[:db_connection, :enqueue],
[:db_connection, :dequeue],
[:db_connection, :checkout],
[:db_connection, :checkin],
[:db_connection, :drop_idle]
]

setup do
telemetry_ref = make_ref()

:telemetry.attach_many(
telemetry_ref,
@events,
fn event, measurements, metadata, {ref, dest_pid} ->
send(dest_pid, {event, ref, measurements, metadata})
end,
{telemetry_ref, self()}
)

on_exit(fn ->
:telemetry.detach(telemetry_ref)
end)

%{telemetry_ref: telemetry_ref}
end

test "checkin/checkout telemetry when ready", %{telemetry_ref: telemetry_ref} do
stack = [
{:ok, :state},
{:ok, %Q{}, %R{}, :state}
]

{:ok, agent} = A.start_link(stack)
{:ok, pool} = P.start_link(agent: agent, parent: self())

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert P.execute(pool, %Q{}, [:client])

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert [
connect: [_],
handle_execute: [%Q{}, [:client], _, :state]
] = A.record(agent)
end

test "idle telemetry on idle timeout", %{telemetry_ref: telemetry_ref} do
stack = [
{:ok, :state},
{:ok, :state}
]

idle_interval = 100

{:ok, agent} = A.start_link(stack)
{:ok, _} = P.start_link(agent: agent, parent: self(), idle_interval: idle_interval)

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :drop_idle], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}},
idle_interval

assert [{:connect, [_]} | _] = A.record(agent)
end

test "enqueue/dequeue telemetry when busy", %{telemetry_ref: telemetry_ref} do
stack = [
{:ok, :state},
{:ok, :state},
fn _, _, _, _ ->
receive do
:continue -> {:ok, %Q{}, %R{}, :state}
end
end,
fn _, _, _, _ ->
receive do
:continue -> {:ok, %Q{}, %R{}, :state}
end
end,
fn _, _, _, _ ->
receive do
:continue -> {:ok, %Q{}, %R{}, :state}
end
end
]

{:ok, agent} = A.start_link(stack)
{:ok, pool} = P.start_link(agent: agent, parent: self(), pool_size: 2)

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 2}}

fn ->
spawn_link(fn ->
Process.put(:agent, agent)
assert P.execute(pool, %Q{}, [:client])
end)
end
|> Stream.repeatedly()
|> Enum.take(3)
|> Enum.each(fn pid ->
send(pid, :continue)
end)

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :enqueue], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 1, ready_conn_count: 0}}

assert_receive {[:db_connection, :dequeue], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :checkout], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 0}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 1}}

assert_receive {[:db_connection, :checkin], ^telemetry_ref, %{count: 1},
%{checkout_queue_length: 0, ready_conn_count: 2}}

assert [
connect: [_],
connect: [_],
handle_execute: [%Q{}, [:client], _, :state],
handle_execute: [%Q{}, [:client], _, :state],
handle_execute: [%Q{}, [:client], _, :state]
] = A.record(agent)
end
end
47 changes: 43 additions & 4 deletions lib/db_connection/connection_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ defmodule DBConnection.ConnectionPool do
end

@impl GenServer
def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do
def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do
ts = {System.monotonic_time(), interval}
{:reply, :ok, {type, queue, codel, ts}}
{:reply, :ok, {status, queue, codel, ts}}
end

@impl GenServer
Expand All @@ -86,6 +86,7 @@ defmodule DBConnection.ConnectionPool do
case queue? do
true ->
:ets.insert(queue, {{now, System.unique_integer(), from}})
execute_queue_telemetry(:busy, queue, :enqueue)
{:noreply, busy}

false ->
Expand All @@ -102,7 +103,11 @@ defmodule DBConnection.ConnectionPool do
) do
case :ets.first(queue) do
{queued_in_native, holder} = key ->
Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key)
if Holder.handle_checkout(holder, from, queue, queued_in_native) and
:ets.delete(queue, key) do
execute_queue_telemetry(:ready, queue, :checkout)
end

{:noreply, ready}

:"$end_of_table" ->
Expand Down Expand Up @@ -199,6 +204,9 @@ defmodule DBConnection.ConnectionPool do
:ets.first(queue) do
:ets.delete(queue, key)
Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder)

execute_queue_telemetry(status, queue, :drop_idle)

drop_idle(past_in_native, limit - 1, status, queue, codel, ts)
else
_ ->
Expand Down Expand Up @@ -229,15 +237,20 @@ defmodule DBConnection.ConnectionPool do
guards = [{:<, :"$1", min_sent}]
select_slow = [{match, guards, [{{:"$1", :"$2"}}]}]

for {sent, from} <- :ets.select(queue, select_slow) do
slow = :ets.select(queue, select_slow)

for {sent, from} <- slow do
drop(time - sent, from)
end

:ets.select_delete(queue, [{match, guards, [true]}])

execute_queue_telemetry(:busy, queue, :dequeue, length(slow))
end

defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do
:ets.insert(queue, {{now_in_native, holder}})
execute_queue_telemetry(:ready, queue, :checkin)
{:noreply, data}
end

Expand All @@ -250,6 +263,7 @@ defmodule DBConnection.ConnectionPool do

{:ready, _, _, _} = ready ->
:ets.insert(queue, {{now_in_native, holder}})
execute_queue_telemetry(:ready, queue, :checkin)
{:noreply, ready}
end
end
Expand All @@ -274,6 +288,7 @@ defmodule DBConnection.ConnectionPool do
case :ets.first(queue) do
{sent, _, from} = key ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
delay = time - sent
codel = %{codel | next: next, delay: delay, slow: slow?}
go(delay, from, time, holder, queue, codel, ts)
Expand All @@ -288,6 +303,7 @@ defmodule DBConnection.ConnectionPool do
case :ets.first(queue) do
{sent, _, from} = key ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
go(time - sent, from, time, holder, queue, codel, ts)

:"$end_of_table" ->
Expand All @@ -299,11 +315,13 @@ defmodule DBConnection.ConnectionPool do
case :ets.first(queue) do
{sent, _, from} = key when time - sent > timeout ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
drop(time - sent, from)
dequeue_slow(time, timeout, holder, queue, codel, ts)

{sent, _, from} = key ->
:ets.delete(queue, key)
execute_queue_telemetry(:busy, queue, :dequeue)
go(time - sent, from, time, holder, queue, codel, ts)

:"$end_of_table" ->
Expand All @@ -314,9 +332,11 @@ defmodule DBConnection.ConnectionPool do
defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do
case Holder.handle_checkout(holder, from, queue, 0) do
true when delay < min ->
execute_queue_telemetry(:busy, queue, :checkout)
{:busy, queue, %{codel | delay: delay}, ts}

true ->
execute_queue_telemetry(:busy, queue, :checkout)
{:busy, queue, codel, ts}

false ->
Expand Down Expand Up @@ -358,4 +378,23 @@ defmodule DBConnection.ConnectionPool do
idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true)
%{codel | idle: idle}
end

defp execute_queue_telemetry(status, queue, event, count \\ 1)
when status in [:busy, :ready] and
event in [:checkin, :checkout, :enqueue, :dequeue, :drop_idle] do
{ready_conn_count, checkout_queue_length} =
case status do
:busy ->
{0, :ets.select_count(queue, [{{{:_, :_, :_}}, [], [true]}])}

:ready ->
{:ets.select_count(queue, [{{{:_, :_}}, [], [true]}]), 0}
end

:telemetry.execute(
[:db_connection, event],
%{count: count},
%{ready_conn_count: ready_conn_count, checkout_queue_length: checkout_queue_length}
)
end
end

0 comments on commit 69fd868

Please sign in to comment.