From 5d088f7262b43e442c13c9f880c8fff7dfe92277 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Tue, 12 Nov 2024 21:27:09 +0700 Subject: [PATCH] Proxy info messages to the adapter (#316) --- .formatter.exs | 2 +- examples/tcp_connection/lib/tcp_connection.ex | 17 +++ integration_test/cases/info_test.exs | 100 ++++++++++++++++++ lib/db_connection/connection.ex | 20 +++- test/test_support.exs | 4 + 5 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 integration_test/cases/info_test.exs diff --git a/.formatter.exs b/.formatter.exs index be68757e..ab786a03 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,4 @@ # Used by "mix format" [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test,integration_test}/**/*.{ex,exs}"] + inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples,integration_test}/**/*.{ex,exs}"] ] diff --git a/examples/tcp_connection/lib/tcp_connection.ex b/examples/tcp_connection/lib/tcp_connection.ex index efb68352..ebc1b7b3 100644 --- a/examples/tcp_connection/lib/tcp_connection.ex +++ b/examples/tcp_connection/lib/tcp_connection.ex @@ -64,6 +64,8 @@ defmodule TCPConnection do case :gen_tcp.connect(host, port, socket_opts, timeout) do {:ok, sock} -> + # Monitor the socket so we can react to it being closed. See handle_info/2. + _ref = :inet.monitor(sock) {:ok, {sock, <<>>}} {:error, reason} -> @@ -143,6 +145,21 @@ defmodule TCPConnection do end end + # The handle_info callback is optional and can be removed if not needed. + # Here it is used to react to `:inet.monitor/1` messages which arrive + # when socket gets closed while the connection is idle. + def handle_info({:DOWN, _ref, _type, sock, _info}, {sock, _buffer}) do + {:disconnect, TCPConnection.Error.exception({:idle, :closed})} + end + + def handle_info(msg, state) do + Logger.info(fn -> + ["#{__MODULE__} (", inspect(self()), ") missed message: ", inspect(msg)] + end) + + :ok + end + @impl true def handle_close(_, _, s) do {:ok, nil, s} diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs new file mode 100644 index 00000000..7d187c86 --- /dev/null +++ b/integration_test/cases/info_test.exs @@ -0,0 +1,100 @@ +defmodule InfoTest do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + + test "handle_info handles harmless message and moves on" do + stack = [ + fn opts -> + send(opts[:parent], {:connected, self()}) + {:ok, :state} + end, + :ok, + {:idle, :state}, + {:idle, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + assert_receive {:connected, conn} + send(conn, "some harmless message") + assert P.run(pool, fn _ -> :result end) == :result + + assert [ + connect: _, + handle_info: _, + handle_status: _, + handle_status: _ + ] = A.record(agent) + end + + test "handle_info can force disconnect" do + stack = [ + fn opts -> + send(opts[:parent], {:connected, self()}) + {:ok, :state} + end, + {:disconnect, RuntimeError.exception("TCP connection just closed")}, + :ok, + fn opts -> + send(opts[:parent], :reconnected) + {:ok, :state} + end + ] + + {:ok, agent} = A.start_link(stack) + P.start_link(agent: agent, parent: self()) + + assert_receive {:connected, conn} + send(conn, "monitor says TCP connection just closed") + assert_receive :reconnected + + assert [ + connect: _, + handle_info: _, + disconnect: _, + connect: _ + ] = A.record(agent) + end + + test "handle_info's disconnect while checked out client crashes is no-op" do + stack = [ + fn _opts -> + {:ok, %{conn_pid: self()}} + end, + fn _query, _params, _opts, %{conn_pid: conn_pid} -> + send(conn_pid, "monitor says TCP connection just closed") + + # This waits for the info message to be processed. + :sys.get_state(conn_pid) + + {:disconnect, RuntimeError.exception("TCP connection is closed"), :new_state} + end, + {:disconnect, RuntimeError.exception("TCP connection just closed")}, + :ok, + fn opts -> + send(opts[:parent], :reconnected) + {:ok, :state} + end + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + assert {:error, %RuntimeError{message: "TCP connection is closed"}} = + P.execute(pool, %Q{}, []) + + assert_receive :reconnected + + assert [ + connect: _, + handle_execute: _, + handle_info: _, + disconnect: _, + connect: _ + ] = A.record(agent) + end +end diff --git a/lib/db_connection/connection.ex b/lib/db_connection/connection.ex index ae5f9115..b1c120f3 100644 --- a/lib/db_connection/connection.ex +++ b/lib/db_connection/connection.ex @@ -330,12 +330,22 @@ defmodule DBConnection.Connection do handle_timeout(s) end - def handle_event(:info, msg, :no_state, %{mod: mod} = s) do - Logger.info(fn -> - [inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)] - end) + def handle_event(:info, msg, :no_state, %{mod: mod, state: state} = s) do + if function_exported?(mod, :handle_info, 2) do + case apply(mod, :handle_info, [msg, state]) do + :ok -> + handle_timeout(s) + + {:disconnect, err} -> + {:keep_state, s, {:next_event, :internal, {:disconnect, {:log, err}}}} + end + else + Logger.info(fn -> + [inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)] + end) - handle_timeout(s) + handle_timeout(s) + end end @doc false diff --git a/test/test_support.exs b/test/test_support.exs index dcbe445f..a63818d1 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -150,6 +150,10 @@ defmodule TestConnection do TestAgent.eval(:handle_deallocate, [query, cursor, opts, state]) end + def handle_info(message, state) do + TestAgent.eval(:handle_info, [message, state]) + end + defp put_agent_from_opts(opts) do Process.get(:agent) || Process.put(:agent, agent_from_opts(opts)) end