diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index e260819..09ba17e 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -12,20 +12,7 @@ defmodule Ch.Connection do @impl true @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) - - with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do - conn = - conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) - |> maybe_put_private(:database, opts[:database]) - |> maybe_put_private(:username, opts[:username]) - |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) - + with {:ok, conn} <- do_connect(opts) do handshake = Query.build("select 1") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) @@ -50,13 +37,12 @@ defmodule Ch.Connection do {:error, reason} end end - catch - _kind, reason -> {:error, reason} end @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do + conn = maybe_reconnect(conn) headers = [{"user-agent", @user_agent}] case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do @@ -94,6 +80,7 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do + conn = maybe_reconnect(conn) %Query{command: command} = query {query_params, extra_headers, body} = params @@ -193,6 +180,7 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do + conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -225,6 +213,7 @@ defmodule Ch.Connection do end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do + conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -243,6 +232,7 @@ defmodule Ch.Connection do end def handle_execute(query, params, opts, conn) do + conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -401,6 +391,49 @@ defmodule Ch.Connection do "/?" <> URI.encode_query(settings ++ query_params) end + # If the http connection was closed by the server, attempt to + # reconnect once. If the re-connect failed, return the old + # connection and let the error bubble up to the caller. + defp maybe_reconnect(conn) do + if HTTP.open?(conn) do + conn + else + opts = HTTP.get_private(conn, :connect_options) + + with {:ok, new_conn} <- do_connect(opts) do + Logger.warning( + "The connection was closed by the server; a new connection has been successfully reestablished." + ) + + new_conn + else + _ -> conn + end + end + end + + defp do_connect(opts) do + scheme = String.to_existing_atom(opts[:scheme] || "http") + address = opts[:hostname] || "localhost" + port = opts[:port] || 8123 + mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) + + with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do + conn = + conn + |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) + |> maybe_put_private(:database, opts[:database]) + |> maybe_put_private(:username, opts[:username]) + |> maybe_put_private(:password, opts[:password]) + |> maybe_put_private(:settings, opts[:settings]) + |> maybe_put_private(:connect_options, opts) + + {:ok, conn} + end + catch + _kind, reason -> {:error, reason} + end + @server_display_name_key :server_display_name @spec ensure_same_server(conn, Mint.Types.headers()) :: conn diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index 3efd477..921a177 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -340,6 +340,65 @@ defmodule Ch.FaultsTest do assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end + test "reconnects after Connection: close response from server", ctx do + %{port: port, listen: listen, clickhouse: clickhouse} = ctx + test = self() + + log = + capture_async_log(fn -> + {:ok, conn} = Ch.start_link(port: port) + + # connect + {:ok, mint} = :gen_tcp.accept(listen) + + # handshake + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) + + spawn_link(fn -> + assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") + send(test, :done) + end) + + # first select 1 + 1 + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + + response = + String.replace( + intercept_packets(clickhouse), + "Connection: Keep-Alive", + "Connection: Close" + ) + + :ok = :gen_tcp.send(mint, response) + :ok = :gen_tcp.close(mint) + assert_receive :done + + # reconnect + {:ok, mint} = :gen_tcp.accept(listen) + + # handshake + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) + + spawn_link(fn -> + assert {:ok, %{num_rows: 1, rows: [[2]]}} = + Ch.query(conn, "select 1 + 1") + + send(test, :done) + end) + + # select 1 + 1 + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) + + assert_receive :done + end) + + refute log =~ "disconnected: **" + assert log =~ "connection was closed by the server" + end + # TODO non-chunked request test "reconnects after closed before streaming request", ctx do