From da511ac109d8418ad520dc562f89330b76c1e936 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 15:50:47 +0700 Subject: [PATCH 1/8] proxy info messages to the adapter --- lib/db_connection/connection.ex | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/db_connection/connection.ex b/lib/db_connection/connection.ex index ae5f911..f6934fd 100644 --- a/lib/db_connection/connection.ex +++ b/lib/db_connection/connection.ex @@ -330,12 +330,22 @@ defmodule DBConnection.Connection do handle_timeout(s) end - def handle_event(:info, msg, :no_state, %{mod: mod} = s) do - Logger.info(fn -> - [inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)] - end) + def handle_event(:info, msg, :no_state, %{mod: mod, state: state} = s) do + if function_exported?(mod, :handle_info, 2) do + case apply(mod, :handle_info, [msg, state]) do + {:ok, state} -> + pool_update(state, s) + + {:disconnect, err, state} -> + {:keep_state, %{s | state: state}, {:next_event, :internal, {:disconnect, {:log, err}}}} + end + else + Logger.info(fn -> + [inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)] + end) - handle_timeout(s) + handle_timeout(s) + end end @doc false From 00c6958bffa4ec17e5febbb9525c5e90f3e84ea5 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 15:57:42 +0700 Subject: [PATCH 2/8] add failing tests --- integration_test/cases/info_test.exs | 5 +++++ integration_test/connection_pool/disconnect_all_test.exs | 2 ++ 2 files changed, 7 insertions(+) create mode 100644 integration_test/cases/info_test.exs diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs new file mode 100644 index 0000000..1226c27 --- /dev/null +++ b/integration_test/cases/info_test.exs @@ -0,0 +1,5 @@ +defmodule TestInfo do + use ExUnit.Case, async: true + + test "handle_info returns new state" +end diff --git a/integration_test/connection_pool/disconnect_all_test.exs b/integration_test/connection_pool/disconnect_all_test.exs index d2698cd..1b639ef 100644 --- a/integration_test/connection_pool/disconnect_all_test.exs +++ b/integration_test/connection_pool/disconnect_all_test.exs @@ -80,4 +80,6 @@ defmodule TestPoolDisconnectAll do handle_execute: [_, _, _, :final_state1] ] = A.record(agent) end + + test "disconnect on info" end From ba7677aaaaece115f857dbdd149aa5c09f4ad07c Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 16:08:02 +0700 Subject: [PATCH 3/8] update tcp_connection example --- .formatter.exs | 2 +- examples/tcp_connection/lib/tcp_connection.ex | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.formatter.exs b/.formatter.exs index be68757..ab786a0 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,4 @@ # Used by "mix format" [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test,integration_test}/**/*.{ex,exs}"] + inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples,integration_test}/**/*.{ex,exs}"] ] diff --git a/examples/tcp_connection/lib/tcp_connection.ex b/examples/tcp_connection/lib/tcp_connection.ex index efb6835..29a1704 100644 --- a/examples/tcp_connection/lib/tcp_connection.ex +++ b/examples/tcp_connection/lib/tcp_connection.ex @@ -64,6 +64,8 @@ defmodule TCPConnection do case :gen_tcp.connect(host, port, socket_opts, timeout) do {:ok, sock} -> + # Monitor the socket so we can react to it being closed. See handle_info/2. + _ref = :inet.monitor(sock) {:ok, {sock, <<>>}} {:error, reason} -> @@ -143,6 +145,13 @@ defmodule TCPConnection do end end + # The handle_info callback is optional and can be removed if not needed. + # Here it is used to react to `:inet.monitor/1` messages which arrive + # when socket gets closed while the connection is idle. + def handle_info({:DOWN, _ref, _type, sock, _info}, {sock, _buffer} = s) do + {:disconnect, TCPConnection.Error.exception({:idle, :closed}), s} + end + @impl true def handle_close(_, _, s) do {:ok, nil, s} From 9fe84fce94608184a40ffe096254cd36eb2ffd0b Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 18:17:58 +0700 Subject: [PATCH 4/8] add tests --- examples/tcp_connection/lib/tcp_connection.ex | 12 +++- integration_test/cases/info_test.exs | 59 ++++++++++++++++++- .../connection_pool/disconnect_all_test.exs | 2 - lib/db_connection/connection.ex | 8 +-- test/test_support.exs | 4 ++ 5 files changed, 75 insertions(+), 10 deletions(-) diff --git a/examples/tcp_connection/lib/tcp_connection.ex b/examples/tcp_connection/lib/tcp_connection.ex index 29a1704..ebc1b7b 100644 --- a/examples/tcp_connection/lib/tcp_connection.ex +++ b/examples/tcp_connection/lib/tcp_connection.ex @@ -148,8 +148,16 @@ defmodule TCPConnection do # The handle_info callback is optional and can be removed if not needed. # Here it is used to react to `:inet.monitor/1` messages which arrive # when socket gets closed while the connection is idle. - def handle_info({:DOWN, _ref, _type, sock, _info}, {sock, _buffer} = s) do - {:disconnect, TCPConnection.Error.exception({:idle, :closed}), s} + def handle_info({:DOWN, _ref, _type, sock, _info}, {sock, _buffer}) do + {:disconnect, TCPConnection.Error.exception({:idle, :closed})} + end + + def handle_info(msg, state) do + Logger.info(fn -> + ["#{__MODULE__} (", inspect(self()), ") missed message: ", inspect(msg)] + end) + + :ok end @impl true diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs index 1226c27..5d815d5 100644 --- a/integration_test/cases/info_test.exs +++ b/integration_test/cases/info_test.exs @@ -1,5 +1,60 @@ -defmodule TestInfo do +defmodule InfoTest do use ExUnit.Case, async: true + alias TestPool, as: P + alias TestAgent, as: A - test "handle_info returns new state" + test "handle_info handles message and moves on" do + stack = [ + fn opts -> + send(opts[:parent], {:connected, self()}) + {:ok, :state} + end, + :ok, + {:idle, :state}, + {:idle, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + assert_receive {:connected, conn} + send(conn, "some harmless message") + assert P.run(pool, fn _ -> :result end) == :result + + assert [ + connect: _, + handle_info: _, + handle_status: _, + handle_status: _ + ] = A.record(agent) + end + + test "handle_info can force disconnect" do + stack = [ + fn opts -> + send(opts[:parent], {:connected, self()}) + {:ok, :state} + end, + {:disconnect, :reason}, + :ok, + fn opts -> + send(opts[:parent], :reconnected) + {:ok, :state} + end + ] + + {:ok, agent} = A.start_link(stack) + P.start_link(agent: agent, parent: self()) + + assert_receive {:connected, conn} + send(conn, "some harmful message") + assert_receive :reconnected + + assert [ + connect: _, + handle_info: _, + disconnect: _, + connect: _ + ] = A.record(agent) + end end diff --git a/integration_test/connection_pool/disconnect_all_test.exs b/integration_test/connection_pool/disconnect_all_test.exs index 1b639ef..d2698cd 100644 --- a/integration_test/connection_pool/disconnect_all_test.exs +++ b/integration_test/connection_pool/disconnect_all_test.exs @@ -80,6 +80,4 @@ defmodule TestPoolDisconnectAll do handle_execute: [_, _, _, :final_state1] ] = A.record(agent) end - - test "disconnect on info" end diff --git a/lib/db_connection/connection.ex b/lib/db_connection/connection.ex index f6934fd..b1c120f 100644 --- a/lib/db_connection/connection.ex +++ b/lib/db_connection/connection.ex @@ -333,11 +333,11 @@ defmodule DBConnection.Connection do def handle_event(:info, msg, :no_state, %{mod: mod, state: state} = s) do if function_exported?(mod, :handle_info, 2) do case apply(mod, :handle_info, [msg, state]) do - {:ok, state} -> - pool_update(state, s) + :ok -> + handle_timeout(s) - {:disconnect, err, state} -> - {:keep_state, %{s | state: state}, {:next_event, :internal, {:disconnect, {:log, err}}}} + {:disconnect, err} -> + {:keep_state, s, {:next_event, :internal, {:disconnect, {:log, err}}}} end else Logger.info(fn -> diff --git a/test/test_support.exs b/test/test_support.exs index dcbe445..a63818d 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -150,6 +150,10 @@ defmodule TestConnection do TestAgent.eval(:handle_deallocate, [query, cursor, opts, state]) end + def handle_info(message, state) do + TestAgent.eval(:handle_info, [message, state]) + end + defp put_agent_from_opts(opts) do Process.get(:agent) || Process.put(:agent, agent_from_opts(opts)) end From 595439a2574ad58f38e867881dfd06af12b9713e Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 18:27:19 +0700 Subject: [PATCH 5/8] add another failing test --- integration_test/cases/info_test.exs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs index 5d815d5..ff5273e 100644 --- a/integration_test/cases/info_test.exs +++ b/integration_test/cases/info_test.exs @@ -57,4 +57,6 @@ defmodule InfoTest do connect: _ ] = A.record(agent) end + + test "handle_info's disconnect with connection checked out" end From a3ece3c382fec7557494caf0ddcaf4717dca4c48 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 19:48:19 +0700 Subject: [PATCH 6/8] make test pass --- integration_test/cases/info_test.exs | 44 ++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs index ff5273e..ee88121 100644 --- a/integration_test/cases/info_test.exs +++ b/integration_test/cases/info_test.exs @@ -1,7 +1,9 @@ defmodule InfoTest do use ExUnit.Case, async: true + alias TestPool, as: P alias TestAgent, as: A + alias TestQuery, as: Q test "handle_info handles message and moves on" do stack = [ @@ -47,7 +49,7 @@ defmodule InfoTest do P.start_link(agent: agent, parent: self()) assert_receive {:connected, conn} - send(conn, "some harmful message") + send(conn, "some harmful message that casuses disconnect") assert_receive :reconnected assert [ @@ -58,5 +60,43 @@ defmodule InfoTest do ] = A.record(agent) end - test "handle_info's disconnect with connection checked out" + test "handle_info's disconnect while checked out client crashes is no-op" do + stack = [ + fn _opts -> + {:ok, %{conn_pid: self()}} + end, + fn _query, _params, _opts, %{conn_pid: conn_pid} -> + send( + conn_pid, + "some harmful message that causes disconnect while conneciton is checked out" + ) + + # This waits for the info message to be processed in the connection. + :sys.get_state(conn_pid) + {:disconnect, :closed, :new_state} + end, + {:disconnect, :closed}, + :ok, + fn opts -> + send(opts[:parent], :reconnected) + {:ok, :state} + end + ] + + parent = self() + + {:ok, agent} = A.start_link(stack) + {:ok, pool} = P.start_link(agent: agent, parent: parent) + + assert {:error, :closed} = P.execute(pool, %Q{}, [:first]) + assert_receive :reconnected + + assert [ + connect: _, + handle_execute: _, + handle_info: _, + disconnect: _, + connect: _ + ] = A.record(agent) + end end From a5cd7d7323fb4692393776afebf66c71ecf5064d Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 19:57:40 +0700 Subject: [PATCH 7/8] cleanup --- integration_test/cases/info_test.exs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs index ee88121..dea3f6b 100644 --- a/integration_test/cases/info_test.exs +++ b/integration_test/cases/info_test.exs @@ -5,7 +5,7 @@ defmodule InfoTest do alias TestAgent, as: A alias TestQuery, as: Q - test "handle_info handles message and moves on" do + test "handle_info handles harmless message and moves on" do stack = [ fn opts -> send(opts[:parent], {:connected, self()}) @@ -37,7 +37,7 @@ defmodule InfoTest do send(opts[:parent], {:connected, self()}) {:ok, :state} end, - {:disconnect, :reason}, + {:disconnect, RuntimeError.exception("TCP connection just closed")}, :ok, fn opts -> send(opts[:parent], :reconnected) @@ -49,7 +49,7 @@ defmodule InfoTest do P.start_link(agent: agent, parent: self()) assert_receive {:connected, conn} - send(conn, "some harmful message that casuses disconnect") + send(conn, "monitor says TCP connection just closed") assert_receive :reconnected assert [ @@ -66,16 +66,14 @@ defmodule InfoTest do {:ok, %{conn_pid: self()}} end, fn _query, _params, _opts, %{conn_pid: conn_pid} -> - send( - conn_pid, - "some harmful message that causes disconnect while conneciton is checked out" - ) + send(conn_pid, "monitor says TCP connection just closed") - # This waits for the info message to be processed in the connection. + # This waits for the info message to be processed. :sys.get_state(conn_pid) - {:disconnect, :closed, :new_state} + + {:disconnect, RuntimeError.exception("TCP connection is closed"), :new_state} end, - {:disconnect, :closed}, + {:disconnect, RuntimeError.exception("TCP connection just closed")}, :ok, fn opts -> send(opts[:parent], :reconnected) @@ -83,12 +81,12 @@ defmodule InfoTest do end ] - parent = self() - {:ok, agent} = A.start_link(stack) - {:ok, pool} = P.start_link(agent: agent, parent: parent) + {:ok, pool} = P.start_link(agent: agent, parent: self()) + + assert {:error, %RuntimeError{message: "TCP connection is closed"}} = + P.execute(pool, %Q{}, [:first]) - assert {:error, :closed} = P.execute(pool, %Q{}, [:first]) assert_receive :reconnected assert [ From f4e74de89c491a1c0a1d264a634052b5af147818 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 11 Nov 2024 19:58:41 +0700 Subject: [PATCH 8/8] cleanup x2 --- integration_test/cases/info_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_test/cases/info_test.exs b/integration_test/cases/info_test.exs index dea3f6b..7d187c8 100644 --- a/integration_test/cases/info_test.exs +++ b/integration_test/cases/info_test.exs @@ -85,7 +85,7 @@ defmodule InfoTest do {:ok, pool} = P.start_link(agent: agent, parent: self()) assert {:error, %RuntimeError{message: "TCP connection is closed"}} = - P.execute(pool, %Q{}, [:first]) + P.execute(pool, %Q{}, []) assert_receive :reconnected