From 11d67ef330b2f23ae23288e83f03fdfbb2e35f75 Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Sat, 9 Nov 2024 12:36:42 +0530 Subject: [PATCH] Handle Connection: close response from server A server may return `Connection: close` while gracefully shutting down. Mint will mark the connection state as closed and return the response from the server (NOTE: it will not return any error). Since the client doesn't know the connection is closed, it will try to use the connection for the next request and will fail. Before making any HTTP request, check the internal state of the connection and attempt to reconnect once if necessary. partially related to https://github.com/plausible/ch/issues/210 --- lib/ch/connection.ex | 65 +++++++++++++++++++++++++++++++---------- test/ch/faults_test.exs | 59 +++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 16 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index e260819..09ba17e 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -12,20 +12,7 @@ defmodule Ch.Connection do @impl true @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) - - with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do - conn = - conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) - |> maybe_put_private(:database, opts[:database]) - |> maybe_put_private(:username, opts[:username]) - |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) - + with {:ok, conn} <- do_connect(opts) do handshake = Query.build("select 1") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) @@ -50,13 +37,12 @@ defmodule Ch.Connection do {:error, reason} end end - catch - _kind, reason -> {:error, reason} end @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do + conn = maybe_reconnect(conn) headers = [{"user-agent", @user_agent}] case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do @@ -94,6 +80,7 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do + conn = maybe_reconnect(conn) %Query{command: command} = query {query_params, extra_headers, body} = params @@ -193,6 +180,7 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do + conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -225,6 +213,7 @@ defmodule Ch.Connection do end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do + conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -243,6 +232,7 @@ defmodule Ch.Connection do end def handle_execute(query, params, opts, conn) do + conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -401,6 +391,49 @@ defmodule Ch.Connection do "/?" <> URI.encode_query(settings ++ query_params) end + # If the http connection was closed by the server, attempt to + # reconnect once. If the re-connect failed, return the old + # connection and let the error bubble up to the caller. + defp maybe_reconnect(conn) do + if HTTP.open?(conn) do + conn + else + opts = HTTP.get_private(conn, :connect_options) + + with {:ok, new_conn} <- do_connect(opts) do + Logger.warning( + "The connection was closed by the server; a new connection has been successfully reestablished." + ) + + new_conn + else + _ -> conn + end + end + end + + defp do_connect(opts) do + scheme = String.to_existing_atom(opts[:scheme] || "http") + address = opts[:hostname] || "localhost" + port = opts[:port] || 8123 + mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) + + with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do + conn = + conn + |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) + |> maybe_put_private(:database, opts[:database]) + |> maybe_put_private(:username, opts[:username]) + |> maybe_put_private(:password, opts[:password]) + |> maybe_put_private(:settings, opts[:settings]) + |> maybe_put_private(:connect_options, opts) + + {:ok, conn} + end + catch + _kind, reason -> {:error, reason} + end + @server_display_name_key :server_display_name @spec ensure_same_server(conn, Mint.Types.headers()) :: conn diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index 3efd477..921a177 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -340,6 +340,65 @@ defmodule Ch.FaultsTest do assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end + test "reconnects after Connection: close response from server", ctx do + %{port: port, listen: listen, clickhouse: clickhouse} = ctx + test = self() + + log = + capture_async_log(fn -> + {:ok, conn} = Ch.start_link(port: port) + + # connect + {:ok, mint} = :gen_tcp.accept(listen) + + # handshake + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) + + spawn_link(fn -> + assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") + send(test, :done) + end) + + # first select 1 + 1 + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + + response = + String.replace( + intercept_packets(clickhouse), + "Connection: Keep-Alive", + "Connection: Close" + ) + + :ok = :gen_tcp.send(mint, response) + :ok = :gen_tcp.close(mint) + assert_receive :done + + # reconnect + {:ok, mint} = :gen_tcp.accept(listen) + + # handshake + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) + + spawn_link(fn -> + assert {:ok, %{num_rows: 1, rows: [[2]]}} = + Ch.query(conn, "select 1 + 1") + + send(test, :done) + end) + + # select 1 + 1 + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) + + assert_receive :done + end) + + refute log =~ "disconnected: **" + assert log =~ "connection was closed by the server" + end + # TODO non-chunked request test "reconnects after closed before streaming request", ctx do