From a0fb72bf8aebae9c06ba5350fcf1e52718e23c9e Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Tue, 14 Mar 2023 18:48:46 +0700 Subject: [PATCH] multihost --- lib/ch.ex | 29 ++++++++++-- lib/ch/connection.ex | 110 ++++++++++++++++++++++++++++++++++++++++--- lib/ch/health.ex | 97 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 9 deletions(-) create mode 100644 lib/ch/health.ex diff --git a/lib/ch.ex b/lib/ch.ex index 4fc6a6c..3bdf3c0 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -1,13 +1,36 @@ defmodule Ch do @moduledoc "Minimal HTTP ClickHouse client" - alias Ch.{Connection, Query, Result} + alias Ch.{Connection, Query, Result, Health} def start_link(opts \\ []) do - DBConnection.start_link(Connection, opts) + if many_endpoints?(opts) do + children = [Health, DBConnection.child_spec(Connection, opts)] + Supervisor.start_link(children, strategy: :one_for_one) + else + DBConnection.start_link(Connection, opts) + end end + @spec child_spec(opts :: Keywort.t()) :: :supervisor.child_spec() def child_spec(opts) do - DBConnection.child_spec(Connection, opts) + if many_endpoints?(opts) do + children = [Health, DBConnection.child_spec(Connection, opts)] + + %{ + id: __MODULE__, + start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]}, + type: :supervisor + } + else + DBConnection.child_spec(Connection, opts) + end + end + + defp many_endpoints?(opts) do + case Connection.endpoints(opts) do + [_endpoint] -> false + [_ | _] -> true + end end @spec query(DBConnection.conn(), iodata, {:raw, iodata} | Enumerable.t(), Keyword.t()) :: diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index d63fed1..474449b 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -5,19 +5,88 @@ defmodule Ch.Connection do alias Ch.{Error, RowBinary, Query, Result} alias Mint.HTTP1, as: HTTP + @type endpoint :: { + scheme :: Mint.Types.scheme(), + host :: Mint.Types.address(), + port :: :inet.port_number() + } + @typep conn :: HTTP.t() @impl true @spec connect(Keyword.t()) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 + case endpoints(opts) do + [endpoint] -> connect(endpoint, opts) + endpoints -> connect_endpoints(endpoints, opts) + end + end + + @doc false + def endpoints(opts) do + case Keyword.fetch(opts, :endpoints) do + :error -> + scheme = String.to_existing_atom(opts[:scheme] || "http") + address = opts[:hostname] || "localhost" + port = opts[:port] || 8123 + [{scheme, address, port}] + + {:ok, endpoints} when is_list(endpoints) -> + endpoints = + Enum.map(endpoints, fn {scheme, host, port} -> + {String.to_existing_atom(scheme), host, port} + end) + + {:ok, endpoints} + + {:ok, _} -> + raise ArgumentError, "expected :endpoints to be a list of tuples" + end + end + + defp pick_endpoint(endpoints, opts) do + pool_index = Keyword.fetch!(opts, :pool_index) + Enum.at(endpoints, rem(pool_index, length(endpoints))) + end + + defp connect_endpoints(endpoints, opts) do + connect_endpoints(endpoints, [], opts) + end + + defp connect_endpoints(endpoints, errors, opts) do + attempted = Enum.map(errors, fn {endpoint, _error} -> endpoint end) + + case endpoints -- attempted do + [] -> + concat_messages = + errors + |> Enum.reverse() + |> Enum.map_join("\n", fn {endpoint, %error_module{} = error} -> + {scheme, host, port} = endpoint + " * #{scheme}://#{host}:#{port}: (#{inspect(error_module)}) #{Exception.message(error)}" + end) + + message = "failed to establish connection to multiple endpoints:\n\n" <> concat_messages + {:error, Error.exception(message)} + + available -> + endpoint = pick_endpoint(available, opts) + + case connect(endpoint, opts) do + {:ok, _conn} = ok -> ok + {:error, reason} -> connect_endpoints(endpoints, [{endpoint, reason} | errors], opts) + end + end + end + + @doc false + def connect({scheme, address, port}, opts) do with {:ok, conn} <- HTTP.connect(scheme, address, port, mode: :passive) do conn = conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) + |> HTTP.put_private(:opts, opts) + |> maybe_put_private(:timeout, opts[:timeout]) |> maybe_put_private(:database, opts[:database]) |> maybe_put_private(:username, opts[:username]) |> maybe_put_private(:password, opts[:password]) @@ -44,6 +113,36 @@ defmodule Ch.Connection do @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do + current_endpoint = {conn.scheme_as_string, conn.host, conn.port} + opts = HTTP.get_private(conn, :opts) + + case endpoints(opts) do + [_endpoint] -> + do_ping(conn) + + endpoints -> + # TODO don't check just the intended endpoint by rather the history of attempts + intended_endpoint = pick_endpoint(endpoints, opts) + + if current_endpoint == intended_endpoint do + do_ping(conn) + else + if Ch.Health.is_alive(intended_endpoint) do + {scheme, host, port} = intended_endpoint + + {:disconnect, + Error.exception("re-balancing by re-connecting to #{scheme}://#{host}:#{port}"), + conn} + else + do_ping(conn) + end + end + end + end + + # inlining for cleaner stacktraces + @compile inline: [do_ping: 1] + defp do_ping(conn) do case request(conn, "GET", "/ping", _headers = [], _body = "", _opts = []) do {:ok, conn, _response} -> {:ok, conn} {:error, error, conn} -> {:disconnect, error, conn} @@ -56,7 +155,6 @@ defmodule Ch.Connection do def checkout(conn), do: {:ok, conn} # "supporting" transactions for Repo.checkout - @impl true def handle_begin(_opts, conn), do: {:ok, %{}, conn} @impl true @@ -285,7 +383,7 @@ defmodule Ch.Connection do defp maybe_put_header(headers, k, v), do: [{k, v} | headers] defp timeout(conn) do - HTTP.get_private(conn, :timeout) + HTTP.get_private(conn, :timeout, :timer.seconds(15)) end defp timeout(conn, opts) do diff --git a/lib/ch/health.ex b/lib/ch/health.ex new file mode 100644 index 0000000..c07be09 --- /dev/null +++ b/lib/ch/health.ex @@ -0,0 +1,97 @@ +defmodule Ch.Health do + @moduledoc """ + A helper health check process to minimise wait time in `ping/1` when in multihost mode. + """ + + use GenServer + + @typep state :: %{checking: [{Ch.Connection.endpoint(), reference}], config: Keyword.t()} + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__, hibernate_after: :timer.minutes(30)) + end + + @spec is_alive(Ch.Connection.endpoint()) :: boolean + def is_alive(endpoint) do + case :ets.lookup(__MODULE__, endpoint) do + [{_endpoint, _alive = true, timestamp}] -> + recently_alive = check_age(timestamp) < :timer.minutes(1) + unless recently_alive, do: start_check(endpoint) + recently_alive + + [{_endpoint, not_alive = false, timestamp}] -> + recently_checked = check_age(timestamp) < :timer.minutes(5) + unless recently_checked, do: start_check(endpoint) + not_alive + + [] -> + start_check(endpoint) + false + end + end + + defp start_check(endpoint) do + GenServer.cast(__MODULE__, {:check, endpoint}) + end + + defp check_age(timestamp, now \\ now()) do + now - timestamp + end + + defp now, do: :os.system_time(:second) + + @impl true + @spec init(Keyword.t()) :: {:ok, state} + def init(opts) do + _ensured_task_sup = Keyword.fetch!(opts, :task_sup) + __MODULE__ = :ets.new(__MODULE__, [:named_table]) + {:ok, %{checking: [], config: opts}} + end + + @impl true + def handle_cast({:check, endpoint}, state) do + %{checking: checking, config: config} = state + already_checking? = List.keymember?(checking, endpoint, 0) + + if already_checking? do + {:noreply, state} + else + %Task{ref: ref} = supervised_check(endpoint, config) + checking = [{endpoint, ref} | checking] + state = %{state | checking: checking} + {:noreply, state} + end + end + + defp supervised_check(endpoint, config) do + task_sup = Keyword.fetch!(config, :task_sup) + + Task.Supervisor.async_nolink(task_sup, fn -> + case Ch.Connection.connect(endpoint, config) do + {:ok, _conn} -> true + {:error, _reason} -> false + end + end) + end + + @impl true + def handle_info({ref, alive}, state) do + Process.demonitor(ref, [:flush]) + {:noreply, update_status(state, ref, alive)} + end + + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do + {:noreply, update_status(state, ref, false)} + end + + defp update_status(state, ref, alive) do + case List.keytake(state.checking, ref, 1) do + {{endpoint, _ref}, checking} -> + :ets.insert(__MODULE__, {endpoint, alive, now()}) + %{state | checking: checking} + + nil -> + state + end + end +end