Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multihost #20

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions lib/ch.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
defmodule Ch do
@moduledoc "Minimal HTTP ClickHouse client"
alias Ch.{Connection, Query, Result}
alias Ch.{Connection, Query, Result, Health}

def start_link(opts \\ []) do
DBConnection.start_link(Connection, opts)
if many_endpoints?(opts) do
children = [Health, DBConnection.child_spec(Connection, opts)]
Supervisor.start_link(children, strategy: :one_for_one)
else
DBConnection.start_link(Connection, opts)
end
end

@spec child_spec(opts :: Keywort.t()) :: :supervisor.child_spec()
def child_spec(opts) do
DBConnection.child_spec(Connection, opts)
if many_endpoints?(opts) do
children = [Health, DBConnection.child_spec(Connection, opts)]

%{
id: __MODULE__,
start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]},
type: :supervisor
}
else
DBConnection.child_spec(Connection, opts)
end
end

defp many_endpoints?(opts) do
case Connection.endpoints(opts) do
[_endpoint] -> false
[_ | _] -> true
end
end

@spec query(DBConnection.conn(), iodata, {:raw, iodata} | Enumerable.t(), Keyword.t()) ::
Expand Down
110 changes: 104 additions & 6 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,88 @@ defmodule Ch.Connection do
alias Ch.{Error, RowBinary, Query, Result}
alias Mint.HTTP1, as: HTTP

@type endpoint :: {
scheme :: Mint.Types.scheme(),
host :: Mint.Types.address(),
port :: :inet.port_number()
}

@typep conn :: HTTP.t()

@impl true
@spec connect(Keyword.t()) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()}
def connect(opts) do
scheme = String.to_existing_atom(opts[:scheme] || "http")
address = opts[:hostname] || "localhost"
port = opts[:port] || 8123
case endpoints(opts) do
[endpoint] -> connect(endpoint, opts)
endpoints -> connect_endpoints(endpoints, opts)
end
end

@doc false
def endpoints(opts) do
case Keyword.fetch(opts, :endpoints) do
:error ->
scheme = String.to_existing_atom(opts[:scheme] || "http")
address = opts[:hostname] || "localhost"
port = opts[:port] || 8123
[{scheme, address, port}]

{:ok, endpoints} when is_list(endpoints) ->
endpoints =
Enum.map(endpoints, fn {scheme, host, port} ->
{String.to_existing_atom(scheme), host, port}
end)

{:ok, endpoints}

{:ok, _} ->
raise ArgumentError, "expected :endpoints to be a list of tuples"
end
end

defp pick_endpoint(endpoints, opts) do
pool_index = Keyword.fetch!(opts, :pool_index)
Enum.at(endpoints, rem(pool_index, length(endpoints)))
end

defp connect_endpoints(endpoints, opts) do
connect_endpoints(endpoints, [], opts)
end

defp connect_endpoints(endpoints, errors, opts) do
attempted = Enum.map(errors, fn {endpoint, _error} -> endpoint end)

case endpoints -- attempted do
[] ->
concat_messages =
errors
|> Enum.reverse()
|> Enum.map_join("\n", fn {endpoint, %error_module{} = error} ->
{scheme, host, port} = endpoint

" * #{scheme}://#{host}:#{port}: (#{inspect(error_module)}) #{Exception.message(error)}"
end)

message = "failed to establish connection to multiple endpoints:\n\n" <> concat_messages
{:error, Error.exception(message)}

available ->
endpoint = pick_endpoint(available, opts)

case connect(endpoint, opts) do
{:ok, _conn} = ok -> ok
{:error, reason} -> connect_endpoints(endpoints, [{endpoint, reason} | errors], opts)
end
end
end

@doc false
def connect({scheme, address, port}, opts) do
with {:ok, conn} <- HTTP.connect(scheme, address, port, mode: :passive) do
conn =
conn
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> HTTP.put_private(:opts, opts)
|> maybe_put_private(:timeout, opts[:timeout])
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
Expand All @@ -44,6 +113,36 @@ defmodule Ch.Connection do
@impl true
@spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn}
def ping(conn) do
current_endpoint = {conn.scheme_as_string, conn.host, conn.port}
opts = HTTP.get_private(conn, :opts)

case endpoints(opts) do
[_endpoint] ->
do_ping(conn)

endpoints ->
# TODO don't check just the intended endpoint by rather the history of attempts
intended_endpoint = pick_endpoint(endpoints, opts)

if current_endpoint == intended_endpoint do
do_ping(conn)
else
if Ch.Health.is_alive(intended_endpoint) do
{scheme, host, port} = intended_endpoint

{:disconnect,
Error.exception("re-balancing by re-connecting to #{scheme}://#{host}:#{port}"),
conn}
else
do_ping(conn)
end
end
end
end

# inlining for cleaner stacktraces
@compile inline: [do_ping: 1]
defp do_ping(conn) do
case request(conn, "GET", "/ping", _headers = [], _body = "", _opts = []) do
{:ok, conn, _response} -> {:ok, conn}
{:error, error, conn} -> {:disconnect, error, conn}
Expand All @@ -56,7 +155,6 @@ defmodule Ch.Connection do
def checkout(conn), do: {:ok, conn}

# "supporting" transactions for Repo.checkout

@impl true
def handle_begin(_opts, conn), do: {:ok, %{}, conn}
@impl true
Expand Down Expand Up @@ -285,7 +383,7 @@ defmodule Ch.Connection do
defp maybe_put_header(headers, k, v), do: [{k, v} | headers]

defp timeout(conn) do
HTTP.get_private(conn, :timeout)
HTTP.get_private(conn, :timeout, :timer.seconds(15))
end

defp timeout(conn, opts) do
Expand Down
97 changes: 97 additions & 0 deletions lib/ch/health.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
defmodule Ch.Health do
@moduledoc """
A helper health check process to minimise wait time in `ping/1` when in multihost mode.
"""

use GenServer

@typep state :: %{checking: [{Ch.Connection.endpoint(), reference}], config: Keyword.t()}

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__, hibernate_after: :timer.minutes(30))
end

@spec is_alive(Ch.Connection.endpoint()) :: boolean
def is_alive(endpoint) do
case :ets.lookup(__MODULE__, endpoint) do
[{_endpoint, _alive = true, timestamp}] ->
recently_alive = check_age(timestamp) < :timer.minutes(1)
unless recently_alive, do: start_check(endpoint)
recently_alive

[{_endpoint, not_alive = false, timestamp}] ->
recently_checked = check_age(timestamp) < :timer.minutes(5)
unless recently_checked, do: start_check(endpoint)
not_alive

[] ->
start_check(endpoint)
false
end
end

defp start_check(endpoint) do
GenServer.cast(__MODULE__, {:check, endpoint})
end

defp check_age(timestamp, now \\ now()) do
now - timestamp
end

defp now, do: :os.system_time(:second)

@impl true
@spec init(Keyword.t()) :: {:ok, state}
def init(opts) do
_ensured_task_sup = Keyword.fetch!(opts, :task_sup)
__MODULE__ = :ets.new(__MODULE__, [:named_table])
{:ok, %{checking: [], config: opts}}
end

@impl true
def handle_cast({:check, endpoint}, state) do
%{checking: checking, config: config} = state
already_checking? = List.keymember?(checking, endpoint, 0)

if already_checking? do
{:noreply, state}
else
%Task{ref: ref} = supervised_check(endpoint, config)
checking = [{endpoint, ref} | checking]
state = %{state | checking: checking}
{:noreply, state}
end
end

defp supervised_check(endpoint, config) do
task_sup = Keyword.fetch!(config, :task_sup)

Task.Supervisor.async_nolink(task_sup, fn ->
case Ch.Connection.connect(endpoint, config) do
{:ok, _conn} -> true
{:error, _reason} -> false
end
end)
end

@impl true
def handle_info({ref, alive}, state) do
Process.demonitor(ref, [:flush])
{:noreply, update_status(state, ref, alive)}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
{:noreply, update_status(state, ref, false)}
end

defp update_status(state, ref, alive) do
case List.keytake(state.checking, ref, 1) do
{{endpoint, _ref}, checking} ->
:ets.insert(__MODULE__, {endpoint, alive, now()})
%{state | checking: checking}

nil ->
state
end
end
end