Skip to content

Commit

Permalink
hm...
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Nov 12, 2024
1 parent 7a0da6b commit 3daddee
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 61 deletions.
86 changes: 30 additions & 56 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,23 @@ defmodule Ch.Connection do
@impl true
@spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()}
def connect(opts) do
with {:ok, conn} <- do_connect(opts) do
scheme = String.to_existing_atom(opts[:scheme] || "http")
address = opts[:hostname] || "localhost"
port = opts[:port] || 8123
mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts])

with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
monitor_socket(conn.socket)
IO.inspect(conn.socket, label: "connect")

conn =
conn
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
|> maybe_put_private(:settings, opts[:settings])

handshake = Query.build("select 1")
params = DBConnection.Query.encode(handshake, _params = [], _opts = [])

Expand All @@ -37,12 +53,13 @@ defmodule Ch.Connection do
{:error, reason}
end
end
catch
_kind, reason -> {:error, reason}
end

@impl true
@spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn}
def ping(conn) do
conn = maybe_reconnect(conn)
headers = [{"user-agent", @user_agent}]

case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do
Expand Down Expand Up @@ -80,7 +97,6 @@ defmodule Ch.Connection do

@impl true
def handle_declare(query, params, opts, conn) do
conn = maybe_reconnect(conn)
%Query{command: command} = query
{query_params, extra_headers, body} = params

Expand Down Expand Up @@ -180,7 +196,6 @@ defmodule Ch.Connection do

@impl true
def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do
conn = maybe_reconnect(conn)
{query_params, extra_headers, body} = params

path = path(conn, query_params, opts)
Expand Down Expand Up @@ -213,7 +228,6 @@ defmodule Ch.Connection do
end

def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
conn = maybe_reconnect(conn)
{query_params, extra_headers, body} = params

path = path(conn, query_params, opts)
Expand All @@ -232,7 +246,7 @@ defmodule Ch.Connection do
end

def handle_execute(query, params, opts, conn) do
conn = maybe_reconnect(conn)
IO.puts("query incoming #{inspect(query)} #{inspect(conn.socket)}")
{query_params, extra_headers, body} = params

path = path(conn, query_params, opts)
Expand All @@ -244,11 +258,15 @@ defmodule Ch.Connection do
end

def handle_info({:DOWN, _ref, :port, socket, _reason}, conn) do
if conn.socket == socket do
{:disconnect, Mint.TransportError.exception(reason: :closed)}
else
:ok
end
IO.puts("socket #{inspect(socket)} closed")

IO.inspect(
if conn.socket == socket do
{:disconnect, Mint.TransportError.exception(reason: :closed)}
else
:ok
end
)
end

def handle_info(msg, _state) do
Expand All @@ -257,6 +275,7 @@ defmodule Ch.Connection do

@impl true
def disconnect(_error, conn) do
IO.puts("disconnect #{inspect(conn.socket)}")
{:ok = ok, _conn} = HTTP.close(conn)
ok
end
Expand Down Expand Up @@ -403,51 +422,6 @@ defmodule Ch.Connection do
"/?" <> URI.encode_query(settings ++ query_params)
end

# If the http connection was closed by the server, attempt to
# reconnect once. If the re-connect failed, return the old
# connection and let the error bubble up to the caller.
defp maybe_reconnect(conn) do
if HTTP.open?(conn) do
conn
else
opts = HTTP.get_private(conn, :connect_options)

with {:ok, new_conn} <- do_connect(opts) do
Logger.warning(
"The connection was closed by the server; a new connection has been successfully reestablished."
)

new_conn
else
_ -> conn
end
end
end

defp do_connect(opts) do
scheme = String.to_existing_atom(opts[:scheme] || "http")
address = opts[:hostname] || "localhost"
port = opts[:port] || 8123
mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts])

with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
_ref = monitor_socket(conn.socket)

conn =
conn
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
|> maybe_put_private(:settings, opts[:settings])
|> maybe_put_private(:connect_options, opts)

{:ok, conn}
end
catch
_kind, reason -> {:error, reason}
end

@server_display_name_key :server_display_name

@spec ensure_same_server(conn, Mint.Types.headers()) :: conn
Expand Down
10 changes: 5 additions & 5 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ defmodule Ch.FaultsTest do
assert_receive :done
end)

assert log =~ "disconnected: ** (Mint.TransportError) timeout"
assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end

test "reconnects after closed on response", ctx do
Expand Down Expand Up @@ -340,7 +340,7 @@ defmodule Ch.FaultsTest do
assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end

test "reconnects after Connection: close response from server", ctx do
test "reconnects after `connection: close` response from server", ctx do

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.14, 25, latest, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.15, 25, latest, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.15, 26, latest, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.16, 25, latest, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.16, 26, latest, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.17, 27, latest, Europe/Berlin)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.17, 27, latest, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)

Check failure on line 343 in test/ch/faults_test.exs

View workflow job for this annotation

GitHub Actions / test (1.17.1, 27, 24.8.5.115, UTC)

test query reconnects after `connection: close` response from server (Ch.FaultsTest)
%{port: port, listen: listen, clickhouse: clickhouse} = ctx
test = self()

Expand All @@ -357,7 +357,6 @@ defmodule Ch.FaultsTest do

spawn_link(fn ->
assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1")
send(test, :done)
end)

# first select 1 + 1
Expand All @@ -372,7 +371,6 @@ defmodule Ch.FaultsTest do

:ok = :gen_tcp.send(mint, response)
:ok = :gen_tcp.close(mint)
assert_receive :done

# reconnect
{:ok, mint} = :gen_tcp.accept(listen)
Expand All @@ -382,8 +380,10 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

spawn_link(fn ->
IO.puts("second spawn")

assert {:ok, %{num_rows: 1, rows: [[2]]}} =
Ch.query(conn, "select 1 + 1")
Ch.query(conn, "select 1 + 1") |> IO.inspect(label: "second fun")

send(test, :done)
end)
Expand Down

0 comments on commit 3daddee

Please sign in to comment.