Skip to content

Commit

Permalink
Emit telemetry events on connect/disconnect, move listeners to teleme…
Browse files Browse the repository at this point in the history
…try handler
  • Loading branch information
v0idpwn committed Jun 23, 2024
1 parent 4256384 commit f8b4160
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 22 deletions.
1 change: 1 addition & 0 deletions lib/db_connection/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule DBConnection.App do
def start(_type, _args) do
children = [
{Task.Supervisor, name: DBConnection.Task},
DBConnection.NotifyListeners,
dynamic_supervisor(DBConnection.Ownership.Supervisor),
dynamic_supervisor(DBConnection.ConnectionPool.Supervisor),
DBConnection.Watcher
Expand Down
40 changes: 18 additions & 22 deletions lib/db_connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ defmodule DBConnection.Connection do
@doc false
@impl :gen_statem
def init({mod, opts, pool, tag}) do
connection_listeners = Keyword.get(opts, :connection_listeners, [])

s = %{
mod: mod,
opts: opts,
Expand All @@ -56,11 +58,13 @@ defmodule DBConnection.Connection do
tag: tag,
timer: nil,
backoff: Backoff.new(opts),
connection_listeners: Keyword.get(opts, :connection_listeners, []),
connection_listeners: connection_listeners,
after_connect: Keyword.get(opts, :after_connect),
after_connect_timeout: Keyword.get(opts, :after_connect_timeout, @timeout)
}

DBConnection.NotifyListeners.add_listeners(self(), connection_listeners)

{:ok, :no_state, s, {:next_event, :internal, {:connect, :init}}}
end

Expand Down Expand Up @@ -145,7 +149,7 @@ defmodule DBConnection.Connection do
end

:telemetry.execute(
[:db_connection, :disconnect],
[:db_connection, :disconnected],
%{count: 1},
%{mod: mod, opts: s.opts, pool: s.pool}
)
Expand All @@ -156,8 +160,6 @@ defmodule DBConnection.Connection do
:ok = apply(mod, :disconnect, [err, state])
s = %{s | state: nil, client: :closed, timer: nil}

notify_connection_listeners(:disconnected, s)

case client do
_ when backoff == nil ->
{:stop, {:shutdown, err}, s}
Expand Down Expand Up @@ -227,10 +229,15 @@ defmodule DBConnection.Connection do
state: state,
after_connect: after_connect,
after_connect_timeout: timeout,
opts: opts
opts: opts,
pool: pool
} = s

notify_connection_listeners(:connected, s)
:telemetry.execute(
[:db_connection, :connected],
%{count: 1},
%{mod: mod, opts: opts, pool: pool}
)

case apply(mod, :checkout, [state]) do
{:ok, state} ->
Expand All @@ -252,7 +259,11 @@ defmodule DBConnection.Connection do
def handle_event(:cast, {:connected, ref}, :no_state, %{client: {ref, :connect}} = s) do
%{mod: mod, state: state} = s

notify_connection_listeners(:connected, s)
:telemetry.execute(
[:db_connection, :connected],
%{count: 1},
%{mod: mod, opts: s.opts, pool: s.pool}
)

case apply(mod, :checkout, [state]) do
{:ok, state} ->
Expand Down Expand Up @@ -513,19 +524,4 @@ defmodule DBConnection.Connection do
[{mod, fun, length(args), info} | rest]
end
end

defp notify_connection_listeners(action, %{} = state) do
%{connection_listeners: connection_listeners} = state

{listeners, message} =
case connection_listeners do
listeners when is_list(listeners) ->
{listeners, {action, self()}}

{listeners, tag} when is_list(listeners) ->
{listeners, {action, self(), tag}}
end

Enum.each(listeners, &send(&1, message))
end
end
109 changes: 109 additions & 0 deletions lib/db_connection/notify_listeners.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
defmodule DBConnection.NotifyListeners do
@moduledoc false
# This server is responsible for:
# - Handling requests to add connection listeners to a connection
# - Attaching a telemetry handler to forward messages to connection
# listeners (but the forwarding is performed at the connection process)
# - Automatically removing connection listeners for connections that were removed,
# and detaching the telemetry handler if it's not needed anymore

use GenServer

def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: server_name())
end

def add_listeners(_, []) do
:ok
end

def add_listeners(connection_pid, listeners) do
GenServer.call(server_name(), {:add_listeners, connection_pid, listeners})
end

@impl GenServer
def init(_opts) do
start_ets()
{:ok, %{attached?: false}}
end

@impl GenServer
def handle_info({:DOWN, _, :process, pid, _}, state) do
:ets.delete(ets_name(), pid)

case maybe_detach_handlers() do
:detached -> {:noreply, %{state | attached?: false}}
:attached -> {:noreply, state}
end
end

@impl GenServer
def handle_call({:add_listeners, connection_pid, listeners}, _from, state) do
unless state.attached? do
attach_handlers()
end

:ets.insert(ets_name(), {connection_pid, listeners})
Process.monitor(connection_pid)

{:reply, :ok, %{state | attached?: true}}
end

defp maybe_detach_handlers() do
case :ets.info(ets_name()) do
%{size: 0} ->
detach_handlers()
:detached

_ ->
:attached
end
end

defp start_ets() do
:ets.new(ets_name(), [:public, :named_table])
end

defp attach_handlers() do
:telemetry.attach_many(
handler_name(),
[
[:db_connection, :connected],
[:db_connection, :disconnected]
],
&__MODULE__.notify_listeners/4,
%{}
)
end

defp detach_handlers() do
:telemetry.detach(handler_name())
end

def notify_listeners([:db_connection, :connected], _, _, _) do
do_notify_listeners(:connected, self())
end

def notify_listeners([:db_connection, :disconnected], _, _, _) do
do_notify_listeners(:disconnected, self())
end

defp do_notify_listeners(action, conn_pid) do
[{_, connection_listeners}] = :ets.lookup(ets_name(), conn_pid)

{listeners, message} =
case connection_listeners do
listeners when is_list(listeners) ->
{listeners, {action, conn_pid}}

{listeners, tag} when is_list(listeners) ->
{listeners, {action, conn_pid, tag}}
end

Enum.each(listeners, &send(&1, message))
end

defp ets_name(), do: __MODULE__.Ets
defp handler_name(), do: inspect(__MODULE__.TelemetryHandler)
defp server_name(), do: __MODULE__
end

0 comments on commit f8b4160

Please sign in to comment.