From 3daddee61d75eb6e12bafd6d5695cd4c5a24ac59 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:18:42 +0700 Subject: [PATCH] hm... --- lib/ch/connection.ex | 86 ++++++++++++++--------------------------- test/ch/faults_test.exs | 10 ++--- 2 files changed, 35 insertions(+), 61 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 480b9cd..ecab30d 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -12,7 +12,23 @@ defmodule Ch.Connection do @impl true @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do - with {:ok, conn} <- do_connect(opts) do + scheme = String.to_existing_atom(opts[:scheme] || "http") + address = opts[:hostname] || "localhost" + port = opts[:port] || 8123 + mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) + + with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do + monitor_socket(conn.socket) + IO.inspect(conn.socket, label: "connect") + + conn = + conn + |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) + |> maybe_put_private(:database, opts[:database]) + |> maybe_put_private(:username, opts[:username]) + |> maybe_put_private(:password, opts[:password]) + |> maybe_put_private(:settings, opts[:settings]) + handshake = Query.build("select 1") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) @@ -37,12 +53,13 @@ defmodule Ch.Connection do {:error, reason} end end + catch + _kind, reason -> {:error, reason} end @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do - conn = maybe_reconnect(conn) headers = [{"user-agent", @user_agent}] case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do @@ -80,7 +97,6 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - conn = maybe_reconnect(conn) %Query{command: command} = query {query_params, extra_headers, body} = params @@ -180,7 +196,6 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -213,7 +228,6 @@ defmodule Ch.Connection do end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -232,7 +246,7 @@ defmodule Ch.Connection do end def handle_execute(query, params, opts, conn) do - conn = maybe_reconnect(conn) + IO.puts("query incoming #{inspect(query)} #{inspect(conn.socket)}") {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -244,11 +258,15 @@ defmodule Ch.Connection do end def handle_info({:DOWN, _ref, :port, socket, _reason}, conn) do - if conn.socket == socket do - {:disconnect, Mint.TransportError.exception(reason: :closed)} - else - :ok - end + IO.puts("socket #{inspect(socket)} closed") + + IO.inspect( + if conn.socket == socket do + {:disconnect, Mint.TransportError.exception(reason: :closed)} + else + :ok + end + ) end def handle_info(msg, _state) do @@ -257,6 +275,7 @@ defmodule Ch.Connection do @impl true def disconnect(_error, conn) do + IO.puts("disconnect #{inspect(conn.socket)}") {:ok = ok, _conn} = HTTP.close(conn) ok end @@ -403,51 +422,6 @@ defmodule Ch.Connection do "/?" <> URI.encode_query(settings ++ query_params) end - # If the http connection was closed by the server, attempt to - # reconnect once. If the re-connect failed, return the old - # connection and let the error bubble up to the caller. - defp maybe_reconnect(conn) do - if HTTP.open?(conn) do - conn - else - opts = HTTP.get_private(conn, :connect_options) - - with {:ok, new_conn} <- do_connect(opts) do - Logger.warning( - "The connection was closed by the server; a new connection has been successfully reestablished." - ) - - new_conn - else - _ -> conn - end - end - end - - defp do_connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) - - with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do - _ref = monitor_socket(conn.socket) - - conn = - conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) - |> maybe_put_private(:database, opts[:database]) - |> maybe_put_private(:username, opts[:username]) - |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) - |> maybe_put_private(:connect_options, opts) - - {:ok, conn} - end - catch - _kind, reason -> {:error, reason} - end - @server_display_name_key :server_display_name @spec ensure_same_server(conn, Mint.Types.headers()) :: conn diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index 921a177..0ba6d21 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -291,7 +291,7 @@ defmodule Ch.FaultsTest do assert_receive :done end) - assert log =~ "disconnected: ** (Mint.TransportError) timeout" + assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end test "reconnects after closed on response", ctx do @@ -340,7 +340,7 @@ defmodule Ch.FaultsTest do assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end - test "reconnects after Connection: close response from server", ctx do + test "reconnects after `connection: close` response from server", ctx do %{port: port, listen: listen, clickhouse: clickhouse} = ctx test = self() @@ -357,7 +357,6 @@ defmodule Ch.FaultsTest do spawn_link(fn -> assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") - send(test, :done) end) # first select 1 + 1 @@ -372,7 +371,6 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, response) :ok = :gen_tcp.close(mint) - assert_receive :done # reconnect {:ok, mint} = :gen_tcp.accept(listen) @@ -382,8 +380,10 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> + IO.puts("second spawn") + assert {:ok, %{num_rows: 1, rows: [[2]]}} = - Ch.query(conn, "select 1 + 1") + Ch.query(conn, "select 1 + 1") |> IO.inspect(label: "second fun") send(test, :done) end)