500, 500 thousand, and 500 million rows (original)
-$ MIX_ENV=bench mix run bench/stream.exs
-This benchmark is based on https://github.com/ClickHouse/ch-bench
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 500 rows, 500_000 rows, 500_000_000 rows
-Estimated total run time: 1.05 min
-Benchmarking stream with decode with input 500 rows ...
-Benchmarking stream with decode with input 500_000 rows ...
-Benchmarking stream with decode with input 500_000_000 rows ...
-Benchmarking stream with manual decode with input 500 rows ...
-Benchmarking stream with manual decode with input 500_000 rows ...
-Benchmarking stream with manual decode with input 500_000_000 rows ...
-Benchmarking stream without decode with input 500 rows ...
-Benchmarking stream without decode with input 500_000 rows ...
-Benchmarking stream without decode with input 500_000_000 rows ...
-##### With input 500 rows #####
-Name ips average deviation median 99th %
-stream with decode 4.69 K 213.34 μs ±12.49% 211.38 μs 290.94 μs
-stream with manual decode 4.69 K 213.43 μs ±17.40% 210.96 μs 298.75 μs
-stream without decode 4.65 K 215.08 μs ±10.79% 213.79 μs 284.66 μs
-stream with decode 4.69 K
-stream with manual decode 4.69 K - 1.00x slower +0.0838 μs
-stream without decode 4.65 K - 1.01x slower +1.74 μs
-##### With input 500_000 rows #####
-Name ips average deviation median 99th %
-stream without decode 234.58 4.26 ms ±13.99% 4.04 ms 5.95 ms
-stream with manual decode 64.26 15.56 ms ±8.36% 15.86 ms 17.97 ms
-stream with decode 41.03 24.37 ms ±6.27% 24.39 ms 26.60 ms
-stream without decode 234.58
-stream with manual decode 64.26 - 3.65x slower +11.30 ms
-stream with decode 41.03 - 5.72x slower +20.11 ms
-##### With input 500_000_000 rows #####
-Name ips average deviation median 99th %
-stream without decode 0.32 3.17 s ±0.20% 3.17 s 3.17 s
-stream with manual decode 0.0891 11.23 s ±0.00% 11.23 s 11.23 s
-stream with decode 0.0462 21.66 s ±0.00% 21.66 s 21.66 s
-stream without decode 0.32
-stream with manual decode 0.0891 - 3.55x slower +8.06 s
-stream with decode 0.0462 - 6.84x slower +18.50 s
-[CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (click the latest workflow run and scroll down to "Artifacts")
+Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) :)
diff --git a/bench/insert.exs b/bench/insert.exs
index f6f01c4..c3502a2 100644
--- a/bench/insert.exs
+++ b/bench/insert.exs
@@ -6,7 +6,7 @@ scheme = System.get_env("CH_SCHEME") || "http"
database = System.get_env("CH_DATABASE") || "ch_bench"
{:ok, conn} = Ch.start_link(scheme: scheme, hostname: hostname, port: port)
-Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {$0:Identifier}", [database])
+Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {db:Identifier}", %{"db" => database})
Ch.query!(conn, """
CREATE TABLE IF NOT EXISTS #{database}.benchmark (
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS #{database}.benchmark (
types = [Ch.Types.u64(), Ch.Types.string(), Ch.Types.array(Ch.Types.u8()), Ch.Types.datetime()]
-statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary"
+statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary\n"
rows = fn count ->
Enum.map(1..count, fn i ->
@@ -32,21 +32,31 @@ Benchee.run(
# "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end,
"encode" => fn rows -> RowBinary.encode_rows(rows, types) end,
- "insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end,
+ "encode+insert" => fn rows ->
+ Ch.query!(conn, [statement | RowBinary.encode_rows(rows, types)])
+ end,
# "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end,
"encode stream" => fn rows ->
+ encoding_types = Ch.RowBinary.encoding_types(types)
|> Stream.chunk_every(60_000)
- |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
+ |> Stream.map(fn chunk -> RowBinary._encode_rows(chunk, encoding_types) end)
|> Stream.run()
"insert stream" => fn rows ->
- stream =
- rows
- |> Stream.chunk_every(60_000)
- |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
+ DBConnection.run(
+ conn,
+ fn conn ->
+ encoding_types = Ch.RowBinary.encoding_types(types)
- Ch.query!(conn, statement, stream, encode: false)
+ rows
+ |> Stream.chunk_every(60_000)
+ |> Stream.map(fn chunk -> RowBinary._encode_rows(chunk, encoding_types) end)
+ |> Enum.into(Ch.stream(conn, statement))
+ end,
+ timeout: :infinity
+ )
inputs: %{
diff --git a/bench/stream.exs b/bench/stream.exs
index 43c0e38..c48196a 100644
--- a/bench/stream.exs
+++ b/bench/stream.exs
@@ -10,36 +10,34 @@ statement = fn limit ->
"SELECT number FROM system.numbers_mt LIMIT #{limit}"
-run_stream = fn statement, opts ->
- f = fn conn -> conn |> Ch.stream(statement, [], opts) |> Stream.run() end
- Ch.run(conn, f, timeout: :infinity)
- "stream without decode" => fn statement ->
- run_stream.(statement, _opts = [])
+ "RowBinary stream without decode" => fn statement ->
+ DBConnection.run(
+ conn,
+ fn conn ->
+ conn
+ |> Ch.stream(statement, _params = [], format: "RowBinary", timeout: :infinity)
+ |> Stream.run()
+ end,
+ timeout: :infinity
+ )
- # TODO why is this faster?
- "stream with manual decode" => fn statement ->
- f = fn conn ->
- conn
- |> Ch.stream(statement, [], format: "RowBinary")
- |> Stream.map(fn responses ->
- Enum.each(responses, fn
- {:data, _ref, data} -> Ch.RowBinary.decode_rows(data, [:u64])
- {:status, _ref, 200} -> :ok
- {:headers, _ref, _headers} -> :ok
- {:done, _ref} -> :ok
+ "RowBinary stream with manual decode" => fn statement ->
+ DBConnection.run(
+ conn,
+ fn conn ->
+ conn
+ |> Ch.stream(statement, _params = [], format: "RowBinary", timeout: :infinity)
+ |> Stream.map(fn %Ch.Result{data: data} ->
+ data
+ |> IO.iodata_to_binary()
+ |> Ch.RowBinary.decode_rows([:u64])
- end)
- |> Stream.run()
- end
- Ch.run(conn, f, timeout: :infinity)
- end,
- "stream with decode" => fn statement ->
- run_stream.(statement, types: [:u64])
+ |> Stream.run()
+ end,
+ timeout: :infinity
+ )
inputs: %{
diff --git a/lib/ch.ex b/lib/ch.ex
index a027102..b5c60ca 100644
--- a/lib/ch.ex
+++ b/lib/ch.ex
@@ -2,49 +2,85 @@ defmodule Ch do
@moduledoc "Minimal HTTP ClickHouse client."
alias Ch.{Connection, Query, Result}
+ @type common_option ::
+ {:database, String.t()}
+ | {:username, String.t()}
+ | {:password, String.t()}
+ | {:settings, Keyword.t()}
+ | {:timeout, timeout}
+ @type start_option ::
+ common_option
+ | {:scheme, String.t()}
+ | {:hostname, String.t()}
+ | {:port, :inet.port_number()}
+ | {:transport_opts, :gen_tcp.connect_option()}
+ | DBConnection.start_option()
@doc """
Start the connection process and connect to ClickHouse.
## Options
+ * `:scheme` - HTTP scheme, defaults to `"http"`
* `:hostname` - server hostname, defaults to `"localhost"`
* `:port` - HTTP port, defualts to `8123`
- * `:scheme` - HTTP scheme, defaults to `"http"`
+ * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* `:database` - Database, defaults to `"default"`
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP receive timeout in milliseconds
- * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
+ * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
+ @spec start_link([start_option]) :: GenServer.on_start()
def start_link(opts \\ []) do
DBConnection.start_link(Connection, opts)
@doc """
Returns a supervisor child specification for a DBConnection pool.
+ See `start_link/1` for supported options.
+ @spec child_spec([start_option]) :: :supervisor.child_spec()
def child_spec(opts) do
DBConnection.child_spec(Connection, opts)
+ # TODO move streaming to Ch.stream/4
+ @type statement :: iodata | Enumerable.t()
+ @type params :: %{String.t() => term} | [{String.t(), term}]
+ @type query_option ::
+ common_option
+ | {:command, Ch.Query.command()}
+ | {:headers, [{String.t(), String.t()}]}
+ | {:format, String.t()}
+ | {:decode, boolean}
+ | DBConnection.connection_option()
@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
## Options
- * `:timeout` - Query request timeout
- * `:settings` - Keyword list of settings
* `:database` - Database
* `:username` - Username
* `:password` - User password
+ * `:settings` - Keyword list of settings
+ * `:timeout` - Query request timeout
+ * `:command` - Command tag for the query
+ * `:headers` - Custom HTTP headers for the request
+ * `:format` - Custom response format for the request
+ * `:decode` - Whether to automatically decode the response
+ * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
- @spec query(DBConnection.conn(), iodata, params, Keyword.t()) ::
+ @spec query(DBConnection.conn(), statement, params, [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
@@ -57,24 +93,19 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
- @spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t()
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
+ @spec query!(DBConnection.conn(), statement, params, [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
- @doc false
- @spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t()
+ @doc """
+ Returns a stream for a query on a connection.
+ """
+ @spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
- DBConnection.stream(conn, query, params, opts)
- end
- @doc false
- @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
- def run(conn, f, opts \\ []) when is_function(f, 1) do
- DBConnection.run(conn, f, opts)
+ %Ch.Stream{conn: conn, query: query, params: params, opts: opts}
if Code.ensure_loaded?(Ecto.ParameterizedType) do
diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex
index 70fcaf7..35de684 100644
--- a/lib/ch/connection.ex
+++ b/lib/ch/connection.ex
@@ -92,41 +92,97 @@ defmodule Ch.Connection do
@impl true
def handle_declare(query, params, opts, conn) do
- {query_params, extra_headers, body} = params
+ %Query{command: command, statement: statement} = query
+ {query_params, extra_headers} = params
path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
- types = Keyword.get(opts, :types)
- with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, body) do
- {:ok, query, {types, ref}, conn}
+ with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, statement),
+ {:ok, conn} <- eat_ok_status_and_headers(conn, timeout(conn, opts)) do
+ {:ok, query, %Result{command: command}, conn}
+ @spec eat_ok_status_and_headers(conn, timeout) ::
+ {:ok, %{conn: conn, buffer: [Mint.Types.response()]}}
+ | {:error, Ch.Error.t(), conn}
+ | {:disconnect, Mint.Types.error(), conn}
+ defp eat_ok_status_and_headers(conn, timeout) do
+ case HTTP.recv(conn, 0, timeout) do
+ {:ok, conn, responses} ->
+ case eat_ok_status_and_headers(responses) do
+ {:ok, data} ->
+ {:ok, %{conn: conn, buffer: data}}
+ :more ->
+ eat_ok_status_and_headers(conn, timeout)
+ :error ->
+ all_responses_result =
+ case handle_all_responses(responses, []) do
+ {:ok, responses} -> {:ok, conn, responses}
+ {:more, acc} -> recv_all(conn, acc, timeout)
+ end
+ with {:ok, conn, responses} <- all_responses_result do
+ [_status, headers | data] = responses
+ message = IO.iodata_to_binary(data)
+ code =
+ if code = get_header(headers, "x-clickhouse-exception-code") do
+ String.to_integer(code)
+ end
+ {:error, Error.exception(code: code, message: message), conn}
+ end
+ end
+ {:error, conn, error, _responses} ->
+ {:disconnect, error, conn}
+ end
+ end
+ defp eat_ok_status_and_headers([{:status, _ref, 200} | rest]) do
+ eat_ok_status_and_headers(rest)
+ end
+ defp eat_ok_status_and_headers([{:status, _ref, _status} | _rest]), do: :error
+ defp eat_ok_status_and_headers([{:headers, _ref, _headers} | data]), do: {:ok, data}
+ defp eat_ok_status_and_headers([]), do: :more
@impl true
- def handle_fetch(_query, {types, ref}, opts, conn) do
+ def handle_fetch(query, result, opts, %{conn: conn, buffer: buffer}) do
+ case buffer do
+ [] -> handle_fetch(query, result, opts, conn)
+ _not_empty -> {halt_or_cont(buffer), %Result{result | data: extract_data(buffer)}, conn}
+ end
+ end
+ def handle_fetch(_query, result, opts, conn) do
case HTTP.recv(conn, 0, timeout(conn, opts)) do
{:ok, conn, responses} ->
- {halt_or_cont(responses, ref), {:stream, types, responses}, conn}
+ {halt_or_cont(responses), %Result{result | data: extract_data(responses)}, conn}
{:error, conn, reason, _responses} ->
{:disconnect, reason, conn}
- defp halt_or_cont([{:done, ref}], ref), do: :halt
+ defp halt_or_cont([{:done, _ref}]), do: :halt
+ defp halt_or_cont([_ | rest]), do: halt_or_cont(rest)
+ defp halt_or_cont([]), do: :cont
- defp halt_or_cont([{tag, ref, _data} | rest], ref) when tag in [:data, :status, :headers] do
- halt_or_cont(rest, ref)
- end
- defp halt_or_cont([], _ref), do: :cont
+ defp extract_data([{:data, _ref, data} | rest]), do: [data | extract_data(rest)]
+ defp extract_data([] = empty), do: empty
+ defp extract_data([{:done, _ref}]), do: []
@impl true
- def handle_deallocate(_query, _ref, _opts, conn) do
+ def handle_deallocate(_query, result, _opts, conn) do
case HTTP.open_request_count(conn) do
0 ->
- {:ok, [], conn}
+ # TODO data: [], anything else?
+ {:ok, %Result{result | data: []}, conn}
1 ->
{:disconnect, Error.exception("cannot stop stream before receiving full response"), conn}
@@ -134,31 +190,46 @@ defmodule Ch.Connection do
@impl true
- def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
- {query_params, extra_headers, body} = params
+ def handle_execute(%Query{statement: statement} = query, {:stream, params}, opts, conn) do
+ {query_params, extra_headers} = params
path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
- result =
- if is_function(body, 2) do
- request_chunked(conn, "POST", path, headers, body, opts)
- else
- request(conn, "POST", path, headers, body, opts)
+ with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do
+ case HTTP.stream_request_body(conn, ref, statement) do
+ {:ok, conn} -> {:ok, query, ref, conn}
+ {:error, conn, reason} -> {:disconnect, reason, conn}
+ end
+ end
- with {:ok, conn, responses} <- result do
- {:ok, query, responses, conn}
+ def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do
+ case HTTP.stream_request_body(conn, ref, body) do
+ {:ok, conn} ->
+ case body do
+ :eof ->
+ with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do
+ {:ok, query, responses, conn}
+ end
+ _other ->
+ {:ok, query, ref, conn}
+ end
+ {:error, conn, reason} ->
+ {:disconnect, reason, conn}
- def handle_execute(query, params, opts, conn) do
- {query_params, extra_headers, body} = params
+ def handle_execute(%Query{statement: statement} = query, params, opts, conn)
+ when is_list(statement) or is_binary(statement) do
+ {query_params, extra_headers} = params
path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
- with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
+ with {:ok, conn, responses} <- request(conn, "POST", path, headers, statement, opts) do
{:ok, query, responses, conn}
@@ -171,40 +242,20 @@ defmodule Ch.Connection do
@typep response :: Mint.Types.status() | Mint.Types.headers() | binary
- @spec request(conn, binary, binary, Mint.Types.headers(), iodata, Keyword.t()) ::
+ @spec request(
+ conn,
+ method :: String.t(),
+ path :: String.t(),
+ Mint.Types.headers(),
+ body :: iodata,
+ [Ch.query_option()]
+ ) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp request(conn, method, path, headers, body, opts) do
- with {:ok, conn, ref} <- send_request(conn, method, path, headers, body) do
- receive_response(conn, ref, timeout(conn, opts))
- end
- end
- @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
- {:ok, conn, [response]}
- | {:error, Error.t(), conn}
- | {:disconnect, Mint.Types.error(), conn}
- def request_chunked(conn, method, path, headers, stream, opts) do
- with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
- {:ok, conn} <- stream_body(conn, ref, stream),
- do: receive_response(conn, ref, timeout(conn, opts))
- end
- @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
- {:ok, conn} | {:disconnect, Mint.Types.error(), conn}
- defp stream_body(conn, ref, stream) do
- result =
- stream
- |> Stream.concat([:eof])
- |> Enum.reduce_while({:ok, conn}, fn
- chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)}
- _chunk, {:error, _conn, _reason} = error -> {:halt, error}
- end)
- case result do
- {:ok, _conn} = ok -> ok
- {:error, conn, reason} -> {:disconnect, reason, conn}
+ with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do
+ receive_full_response(conn, timeout(conn, opts))
@@ -217,12 +268,12 @@ defmodule Ch.Connection do
- @spec receive_response(conn, Mint.Types.request_ref(), timeout) ::
+ @spec receive_full_response(conn, timeout) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
- defp receive_response(conn, ref, timeout) do
- with {:ok, conn, responses} <- recv(conn, ref, [], timeout) do
+ defp receive_full_response(conn, timeout) do
+ with {:ok, conn, responses} <- recv_all(conn, [], timeout) do
case responses do
[200, headers | _rest] ->
conn = ensure_same_server(conn, headers)
@@ -241,14 +292,14 @@ defmodule Ch.Connection do
- @spec recv(conn, Mint.Types.request_ref(), [response], timeout()) ::
+ @spec recv_all(conn, [response], timeout()) ::
{:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn}
- defp recv(conn, ref, acc, timeout) do
+ defp recv_all(conn, acc, timeout) do
case HTTP.recv(conn, 0, timeout) do
{:ok, conn, responses} ->
- case handle_responses(responses, ref, acc) do
+ case handle_all_responses(responses, acc) do
{:ok, responses} -> {:ok, conn, responses}
- {:more, acc} -> recv(conn, ref, acc, timeout)
+ {:more, acc} -> recv_all(conn, acc, timeout)
{:error, conn, reason, _responses} ->
@@ -256,16 +307,14 @@ defmodule Ch.Connection do
- defp handle_responses([{:done, ref}], ref, acc) do
- {:ok, :lists.reverse(acc)}
- end
- defp handle_responses([{tag, ref, data} | rest], ref, acc)
- when tag in [:data, :status, :headers] do
- handle_responses(rest, ref, [data | acc])
+ for tag <- [:data, :status, :headers] do
+ defp handle_all_responses([{unquote(tag), _ref, data} | rest], acc) do
+ handle_all_responses(rest, [data | acc])
+ end
- defp handle_responses([], _ref, acc), do: {:more, acc}
+ defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)}
+ defp handle_all_responses([], acc), do: {:more, acc}
defp maybe_put_private(conn, _k, nil), do: conn
defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v)
diff --git a/lib/ch/query.ex b/lib/ch/query.ex
index 2e6888d..3fa2de8 100644
--- a/lib/ch/query.ex
+++ b/lib/ch/query.ex
@@ -1,16 +1,15 @@
defmodule Ch.Query do
@moduledoc "Query struct wrapping the SQL statement."
- defstruct [:statement, :command, :encode, :decode]
+ defstruct [:statement, :command]
- @type t :: %__MODULE__{statement: iodata, command: atom, encode: boolean, decode: boolean}
+ @type t :: %__MODULE__{statement: Ch.statement(), command: command}
+ @type params :: [{String.t(), String.t()}]
@doc false
- @spec build(iodata, Keyword.t()) :: t
+ @spec build(Ch.statement(), [Ch.query_option()]) :: t
def build(statement, opts \\ []) do
command = Keyword.get(opts, :command) || extract_command(statement)
- encode = Keyword.get(opts, :encode, true)
- decode = Keyword.get(opts, :decode, true)
- %__MODULE__{statement: statement, command: command, encode: encode, decode: decode}
+ %__MODULE__{statement: statement, command: command}
statements = [
@@ -43,6 +42,13 @@ defmodule Ch.Query do
{"WATCH", :watch}
+ command_union =
+ statements
+ |> Enum.map(fn {_, command} -> command end)
+ |> Enum.reduce(&{:|, [], [&1, &2]})
+ @type command :: unquote(command_union)
defp extract_command(statement)
for {statement, command} <- statements do
@@ -54,8 +60,8 @@ defmodule Ch.Query do
- defp extract_command([first_segment | _] = statement) do
- extract_command(first_segment) || extract_command(IO.iodata_to_binary(statement))
+ defp extract_command([first_segment | _]) do
+ extract_command(first_segment)
defp extract_command(_other), do: nil
@@ -64,117 +70,64 @@ end
defimpl DBConnection.Query, for: Ch.Query do
alias Ch.{Query, Result, RowBinary}
- @spec parse(Query.t(), Keyword.t()) :: Query.t()
+ @spec parse(Query.t(), [Ch.query_option()]) :: Query.t()
def parse(query, _opts), do: query
- @spec describe(Query.t(), Keyword.t()) :: Query.t()
+ @spec describe(Query.t(), [Ch.query_option()]) :: Query.t()
def describe(query, _opts), do: query
- @spec encode(Query.t(), params, Keyword.t()) :: {query_params, Mint.Types.headers(), body}
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(),
- query_params: [{String.t(), String.t()}],
- body: iodata | Enumerable.t()
- def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do
- body =
- case data do
- _ when is_list(data) or is_binary(data) -> [statement, ?\n | data]
- _ -> Stream.concat([[statement, ?\n]], data)
- end
- {_query_params = [], headers(opts), body}
+ @spec encode(Query.t(), Ch.params(), [Ch.query_option()]) ::
+ {Ch.Query.params(), Mint.Types.headers()}
+ def encode(%Query{}, params, opts) when is_list(params) or is_map(params) do
+ format = Keyword.get(opts, :format, "RowBinaryWithNamesAndTypes")
+ headers = Keyword.get(opts, :headers, [])
+ {query_params(params), [{"x-clickhouse-format", format} | headers]}
- def encode(%Query{command: :insert, statement: statement}, params, opts) do
- cond do
- names = Keyword.get(opts, :names) ->
- types = Keyword.fetch!(opts, :types)
- header = RowBinary.encode_names_and_types(names, types)
- data = RowBinary.encode_rows(params, types)
- {_query_params = [], headers(opts), [statement, ?\n, header | data]}
- format_row_binary?(statement) ->
- types = Keyword.fetch!(opts, :types)
- data = RowBinary.encode_rows(params, types)
- {_query_params = [], headers(opts), [statement, ?\n | data]}
- true ->
- {query_params(params), headers(opts), statement}
- end
- end
- def encode(%Query{statement: statement}, params, opts) do
- types = Keyword.get(opts, :types)
- default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes"
- format = Keyword.get(opts, :format) || default_format
- {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement}
- end
- defp format_row_binary?(statement) when is_binary(statement) do
- statement |> String.trim_trailing() |> String.ends_with?("RowBinary")
+ # stream: insert init
+ @spec encode(Query.t(), {:stream, Ch.params()}, [Ch.query_option()]) ::
+ {:stream, {Ch.Query.params(), Mint.Types.headers()}}
+ def encode(query, {:stream, params}, opts) do
+ {:stream, encode(query, params, opts)}
- defp format_row_binary?(statement) when is_list(statement) do
- statement
- |> IO.iodata_to_binary()
- |> format_row_binary?()
+ # stream: insert data chunk
+ @spec encode(Query.t(), {:stream, Mint.Types.request_ref(), iodata | :eof}, [Ch.query_option()]) ::
+ {:stream, Mint.Types.request_ref(), iodata | :eof}
+ def encode(_query, {:stream, ref, data}, _opts) do
+ {:stream, ref, data}
- @spec decode(Query.t(), [response], Keyword.t()) :: Result.t()
+ @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t()
when response: Mint.Types.status() | Mint.Types.headers() | binary
- def decode(%Query{command: :insert}, responses, _opts) do
- [_status, headers | _data] = responses
- num_rows =
- if summary = get_header(headers, "x-clickhouse-summary") do
- %{"written_rows" => written_rows} = Jason.decode!(summary)
- String.to_integer(written_rows)
- end
- %Result{num_rows: num_rows, rows: nil, command: :insert, headers: headers}
- end
- def decode(%Query{decode: false, command: command}, responses, _opts) when is_list(responses) do
- # TODO potentially fails on x-progress-headers
- [_status, headers | data] = responses
- %Result{rows: data, command: command, headers: headers}
- end
def decode(%Query{command: command}, responses, opts) when is_list(responses) do
- # TODO potentially fails on x-progress-headers
[_status, headers | data] = responses
+ format = get_header(headers, "x-clickhouse-format")
+ decode = Keyword.get(opts, :decode, true)
- case get_header(headers, "x-clickhouse-format") do
- "RowBinary" ->
- types = Keyword.fetch!(opts, :types)
- rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows(types)
- %Result{num_rows: length(rows), rows: rows, command: command, headers: headers}
- "RowBinaryWithNamesAndTypes" ->
+ cond do
+ decode and format == "RowBinaryWithNamesAndTypes" ->
rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows()
- %Result{num_rows: length(rows), rows: rows, command: command, headers: headers}
+ %Result{num_rows: length(rows), rows: rows, data: data, command: command}
- _other ->
- %Result{rows: data, command: command, headers: headers}
- end
- end
+ format == nil ->
+ num_rows =
+ if summary = get_header(headers, "x-clickhouse-summary") do
+ %{"written_rows" => written_rows} = Jason.decode!(summary)
+ String.to_integer(written_rows)
+ end
- # TODO merge :stream `decode/3` with "normal" `decode/3` clause above
- @spec decode(Query.t(), {:stream, nil, responses}, Keyword.t()) :: responses
- when responses: [Mint.Types.response()]
- def decode(_query, {:stream, nil, responses}, _opts), do: responses
+ %Result{num_rows: num_rows, data: data, command: command}
- @spec decode(Query.t(), {:stream, [atom], [Mint.Types.response()]}, Keyword.t()) :: [[term]]
- def decode(_query, {:stream, types, responses}, _opts) do
- decode_stream_data(responses, types)
- end
- defp decode_stream_data([{:data, _ref, data} | rest], types) do
- [RowBinary.decode_rows(data, types) | decode_stream_data(rest, types)]
+ true ->
+ %Result{data: data, command: command}
+ end
- defp decode_stream_data([_ | rest], types), do: decode_stream_data(rest, types)
- defp decode_stream_data([] = done, _types), do: done
+ # stream: select result
+ def decode(_query, %Result{} = result, _opts), do: result
+ # stream: insert result
+ def decode(_query, ref, _opts) when is_reference(ref), do: ref
defp get_header(headers, key) do
case List.keyfind(headers, key, 0) do
@@ -183,15 +136,9 @@ defimpl DBConnection.Query, for: Ch.Query do
- defp query_params(params) when is_map(params) do
- Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end)
- end
- defp query_params(params) when is_list(params) do
- params
- |> Enum.with_index()
- |> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode_param(v)} end)
- end
+ @compile inline: [query_params: 1]
+ defp query_params(params), do: Enum.map(params, &query_param/1)
+ defp query_param({k, v}), do: {"param_#{k}", encode_param(v)}
defp encode_param(n) when is_integer(n), do: Integer.to_string(n)
defp encode_param(f) when is_float(f), do: Float.to_string(f)
@@ -280,9 +227,6 @@ defimpl DBConnection.Query, for: Ch.Query do
defp escape_param([], param), do: param
- @spec headers(Keyword.t()) :: Mint.Types.headers()
- defp headers(opts), do: Keyword.get(opts, :headers, [])
defimpl String.Chars, for: Ch.Query do
diff --git a/lib/ch/result.ex b/lib/ch/result.ex
index ddca4df..2977aa9 100644
--- a/lib/ch/result.ex
+++ b/lib/ch/result.ex
@@ -2,21 +2,19 @@ defmodule Ch.Result do
@moduledoc """
Result struct returned from any successful query. Its fields are:
- * `command` - An atom of the query command, for example: `:select`, `:insert`;
- * `rows` - The result set. One of:
- - a list of lists, each inner list corresponding to a
- row, each element in the inner list corresponds to a column;
- - raw iodata when the response is not automatically decoded, e.g. `x-clickhouse-format: CSV`
- * `num_rows` - The number of fetched or affected rows;
- * `headers` - The HTTP response headers
+ * `command` - An atom of the query command, for example: `:select`, `:insert`
+ * `num_rows` - The number of fetched or affected rows
+ * `rows` - A list of lists, each inner list corresponding to a row, each element in the inner list corresponds to a column
+ * `data` - The raw iodata from the response
- defstruct [:command, :num_rows, :rows, :headers]
+ defstruct [:command, :num_rows, :rows, :data]
@type t :: %__MODULE__{
- command: atom,
+ command: Ch.Query.command() | nil,
num_rows: non_neg_integer | nil,
- rows: [[term]] | iodata | nil,
- headers: Mint.Types.headers()
+ rows: [[term]] | nil,
+ data: iodata
diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex
new file mode 100644
index 0000000..6569782
--- /dev/null
+++ b/lib/ch/stream.ex
@@ -0,0 +1,43 @@
+defmodule Ch.Stream do
+ @moduledoc false
+ @derive {Inspect, only: []}
+ defstruct [:conn, :ref, :query, :params, :opts]
+ @type t :: %__MODULE__{
+ conn: DBConnection.conn(),
+ ref: Mint.Types.request_ref() | nil,
+ query: Ch.Query.t(),
+ params: Ch.params(),
+ opts: [Ch.query_option()]
+ }
+ defimpl Enumerable do
+ def reduce(stream, acc, fun) do
+ %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
+ stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
+ DBConnection.reduce(stream, acc, fun)
+ end
+ def member?(_, _), do: {:error, __MODULE__}
+ def count(_), do: {:error, __MODULE__}
+ def slice(_), do: {:error, __MODULE__}
+ end
+ defimpl Collectable do
+ def into(stream) do
+ %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
+ ref = DBConnection.execute!(conn, query, {:stream, params}, opts)
+ {%{stream | ref: ref}, &collect/2}
+ end
+ defp collect(%{conn: conn, query: query, ref: ref} = stream, {:cont, data}) do
+ ^ref = DBConnection.execute!(conn, query, {:stream, ref, data})
+ stream
+ end
+ defp collect(%{conn: conn, query: query, ref: ref}, eof) when eof in [:halt, :done] do
+ DBConnection.execute!(conn, query, {:stream, ref, :eof})
+ end
+ end
diff --git a/test/ch/aggregation_test.exs b/test/ch/aggregation_test.exs
index 37bd8a3..996d03e 100644
--- a/test/ch/aggregation_test.exs
+++ b/test/ch/aggregation_test.exs
@@ -95,14 +95,15 @@ defmodule Ch.AggregationTest do
assert %{num_rows: 2} =
- """
- INSERT INTO test_insert_aggregate_function
- SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
- FROM input('uid Int16, updated DateTime, name String')
- FORMAT RowBinary\
- """,
- rows,
- types: ["Int16", "DateTime", "String"]
+ [
+ """
+ INSERT INTO test_insert_aggregate_function
+ SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
+ FROM input('uid Int16, updated DateTime, name String')
+ FORMAT RowBinary
+ """
+ | Ch.RowBinary.encode_rows(rows, ["Int16", "DateTime", "String"])
+ ]
assert Ch.query!(conn, """
@@ -126,12 +127,16 @@ defmodule Ch.AggregationTest do
- "INSERT INTO test_users_ephemeral_column(uid, updated, name_stub) FORMAT RowBinary",
- _rows = [
- [1231, ~N[2020-01-02 00:00:00], "Jane"],
- [1231, ~N[2020-01-01 00:00:00], "John"]
- ],
- types: ["Int16", "DateTime", "String"]
+ [
+ "INSERT INTO test_users_ephemeral_column(uid, updated, name_stub) FORMAT RowBinary\n"
+ | Ch.RowBinary.encode_rows(
+ [
+ [1231, ~N[2020-01-02 00:00:00], "Jane"],
+ [1231, ~N[2020-01-01 00:00:00], "John"]
+ ],
+ ["Int16", "DateTime", "String"]
+ )
+ ]
assert Ch.query!(conn, """
@@ -152,16 +157,20 @@ defmodule Ch.AggregationTest do
- """
- INSERT INTO test_users_input_function
- SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
- FROM input('uid Int16, updated DateTime, name String') FORMAT RowBinary\
- """,
- _rows = [
- [1231, ~N[2020-01-02 00:00:00], "Jane"],
- [1231, ~N[2020-01-01 00:00:00], "John"]
- ],
- types: ["Int16", "DateTime", "String"]
+ [
+ """
+ INSERT INTO test_users_input_function
+ SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
+ FROM input('uid Int16, updated DateTime, name String') FORMAT RowBinary
+ """
+ | Ch.RowBinary.encode_rows(
+ [
+ [1231, ~N[2020-01-02 00:00:00], "Jane"],
+ [1231, ~N[2020-01-01 00:00:00], "John"]
+ ],
+ ["Int16", "DateTime", "String"]
+ )
+ ]
assert Ch.query!(conn, """
@@ -196,12 +205,16 @@ defmodule Ch.AggregationTest do
- "INSERT INTO test_users_ne FORMAT RowBinary",
- _rows = [
- [1231, ~N[2020-01-02 00:00:00], "Jane"],
- [1231, ~N[2020-01-01 00:00:00], "John"]
- ],
- types: ["Int16", "DateTime", "String"]
+ [
+ "INSERT INTO test_users_ne FORMAT RowBinary\n"
+ | Ch.RowBinary.encode_rows(
+ [
+ [1231, ~N[2020-01-02 00:00:00], "Jane"],
+ [1231, ~N[2020-01-01 00:00:00], "John"]
+ ],
+ ["Int16", "DateTime", "String"]
+ )
+ ]
assert Ch.query!(conn, """
diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs
index bab1635..55c998a 100644
--- a/test/ch/connection_test.exs
+++ b/test/ch/connection_test.exs
@@ -10,10 +10,6 @@ defmodule Ch.ConnectionTest do
assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select 1")
- test "select with types", %{conn: conn} do
- assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select 1", [], types: ["UInt8"])
- end
test "select with params", %{conn: conn} do
assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {a:UInt8}", %{"a" => 1})
@@ -51,12 +47,12 @@ defmodule Ch.ConnectionTest do
# datetimes in params are sent in text and ClickHouse translates them to UTC from server timezone by default
# see https://clickhouse.com/docs/en/sql-reference/data-types/datetime
# https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/
- assert {:ok, %{num_rows: 1, rows: [[naive_datetime]], headers: headers}} =
+ assert {:ok, %{num_rows: 1, rows: [[naive_datetime]]}} =
Ch.query(conn, "select {naive:DateTime}", %{"naive" => naive_noon})
# to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse
# i.e. we give server timezone to the naive datetime and shift it to UTC before comparing with the result
- {_, timezone} = List.keyfind!(headers, "x-clickhouse-timezone", 0)
+ %Ch.Result{rows: [[timezone]]} = Ch.query!(conn, "select timezone()")
assert naive_datetime ==
@@ -66,17 +62,17 @@ defmodule Ch.ConnectionTest do
# when the timezone information is provided in the type, we don't need to rely on server timezone
assert {:ok, %{num_rows: 1, rows: [[bkk_datetime]]}} =
- Ch.query(conn, "select {$0:DateTime('Asia/Bangkok')}", [naive_noon])
+ Ch.query(conn, "select {naive:DateTime('Asia/Bangkok')}", [{"naive", naive_noon}])
assert bkk_datetime == DateTime.from_naive!(naive_noon, "Asia/Bangkok")
assert {:ok, %{num_rows: 1, rows: [[~U[2022-01-01 12:00:00Z]]]}} =
- Ch.query(conn, "select {$0:DateTime('UTC')}", [naive_noon])
+ Ch.query(conn, "select {naive:DateTime('UTC')}", [{"naive", naive_noon}])
naive_noon_ms = ~N[2022-01-01 12:00:00.123]
assert {:ok, %{num_rows: 1, rows: [[naive_datetime]]}} =
- Ch.query(conn, "select {$0:DateTime64(3)}", [naive_noon_ms])
+ Ch.query(conn, "select {naive:DateTime64(3)}", [{"naive", naive_noon_ms}])
assert NaiveDateTime.compare(
@@ -109,7 +105,7 @@ defmodule Ch.ConnectionTest do
# Ch.query(conn, "select {a:UUID}", %{"a" => uuid_bin})
# pseudo-positional bind
- assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {$0:UInt8}", [1])
+ assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {$0:UInt8}", [{"$0", 1}])
test "utc datetime query param encoding", %{conn: conn} do
@@ -117,13 +113,17 @@ defmodule Ch.ConnectionTest do
msk = DateTime.new!(~D[2021-01-01], ~T[15:00:00], "Europe/Moscow")
naive = utc |> DateTime.shift_zone!(Ch.Test.clickhouse_tz(conn)) |> DateTime.to_naive()
- assert Ch.query!(conn, "select {$0:DateTime} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(conn, "select {utc:DateTime} as d, toString(d)", %{"utc" => utc}).rows ==
[[~N[2021-01-01 12:00:00], to_string(naive)]]
- assert Ch.query!(conn, "select {$0:DateTime('UTC')} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(conn, "select {utc:DateTime('UTC')} as d, toString(d)", %{"utc" => utc}).rows ==
[[utc, "2021-01-01 12:00:00"]]
- assert Ch.query!(conn, "select {$0:DateTime('Europe/Moscow')} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(
+ conn,
+ "select {utc:DateTime('Europe/Moscow')} as d, toString(d)",
+ %{"utc" => utc}
+ ).rows ==
[[msk, "2021-01-01 15:00:00"]]
@@ -132,13 +132,17 @@ defmodule Ch.ConnectionTest do
msk = DateTime.new!(~D[2021-01-01], ~T[15:00:00.123456], "Europe/Moscow")
naive = utc |> DateTime.shift_zone!(Ch.Test.clickhouse_tz(conn)) |> DateTime.to_naive()
- assert Ch.query!(conn, "select {$0:DateTime64(6)} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(conn, "select {utc:DateTime64(6)} as d, toString(d)", %{"utc" => utc}).rows ==
[[~N[2021-01-01 12:00:00.123456], to_string(naive)]]
- assert Ch.query!(conn, "select {$0:DateTime64(6, 'UTC')} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(conn, "select {utc:DateTime64(6, 'UTC')} as d, toString(d)", %{"utc" => utc}).rows ==
[[utc, "2021-01-01 12:00:00.123456"]]
- assert Ch.query!(conn, "select {$0:DateTime64(6,'Europe/Moscow')} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(
+ conn,
+ "select {utc:DateTime64(6,'Europe/Moscow')} as d, toString(d)",
+ %{"utc" => utc}
+ ).rows ==
[[msk, "2021-01-01 15:00:00.123456"]]
@@ -148,7 +152,7 @@ defmodule Ch.ConnectionTest do
utc = ~U[2021-01-01 12:00:00.000000Z]
naive = utc |> DateTime.shift_zone!(Ch.Test.clickhouse_tz(conn)) |> DateTime.to_naive()
- assert Ch.query!(conn, "select {$0:DateTime64(6)} as d, toString(d)", [utc]).rows ==
+ assert Ch.query!(conn, "select {utc:DateTime64(6)} as d, toString(d)", %{"utc" => utc}).rows ==
[[~N[2021-01-01 12:00:00.000000], to_string(naive)]]
@@ -161,13 +165,16 @@ defmodule Ch.ConnectionTest do
test "create", %{conn: conn} do
- assert {:ok, %{num_rows: nil, rows: []}} =
+ assert {:ok, %{command: :create}} =
Ch.query(conn, "create table create_example(a UInt8) engine = Memory")
test "create with options", %{conn: conn} do
assert {:error, %Ch.Error{code: 164, message: message}} =
- Ch.query(conn, "create table create_example(a UInt8) engine = Memory", [],
+ Ch.query(
+ conn,
+ "create table create_example(a UInt8) engine = Memory",
+ _param = [],
settings: [readonly: 1]
@@ -202,8 +209,8 @@ defmodule Ch.ConnectionTest do
assert {:ok, %{num_rows: 2}} =
- "insert into {$0:Identifier}(a, b) values ({$1:UInt8},{$2:String}),({$3:UInt8},{$4:String})",
- [table, 4, "d", 5, "e"]
+ "insert into {table:Identifier}(a, b) values ({a:UInt8},{b:String}),({c:UInt8},{d:String})",
+ %{"table" => table, "a" => 4, "b" => "d", "c" => 5, "d" => "e"}
assert {:ok, %{rows: rows}} =
@@ -226,26 +233,15 @@ defmodule Ch.ConnectionTest do
assert message =~ "Cannot execute query in readonly mode."
- test "automatic RowBinary", %{conn: conn, table: table} do
- stmt = "insert into #{table}(a, b) format RowBinary"
+ test "RowBinary", %{conn: conn, table: table} do
types = ["UInt8", "String"]
rows = [[1, "a"], [2, "b"]]
- assert %{num_rows: 2} = Ch.query!(conn, stmt, rows, types: types)
- assert %{rows: rows} =
- Ch.query!(conn, "select * from {table:Identifier}", %{"table" => table})
- assert rows == [[1, "a"], [2, "b"]]
- end
- test "manual RowBinary", %{conn: conn, table: table} do
- stmt = "insert into #{table}(a, b) format RowBinary"
- types = ["UInt8", "String"]
- rows = [[1, "a"], [2, "b"]]
- data = RowBinary.encode_rows(rows, types)
- assert %{num_rows: 2} = Ch.query!(conn, stmt, data, encode: false)
+ assert %{num_rows: 2} =
+ Ch.query!(conn, [
+ "insert into #{table}(a, b) format RowBinary\n"
+ | RowBinary.encode_rows(rows, types)
+ ])
assert %{rows: rows} =
Ch.query!(conn, "select * from {table:Identifier}", %{"table" => table})
@@ -257,18 +253,18 @@ defmodule Ch.ConnectionTest do
types = ["UInt8", "String"]
rows = [[1, "a"], [2, "b"], [3, "c"]]
- stream =
+ row_binary =
|> Stream.chunk_every(2)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
- assert {:ok, %{num_rows: 3}} =
- Ch.query(
- conn,
- "insert into #{table}(a, b) format RowBinary",
- stream,
- encode: false
- )
+ assert %{num_rows: 3} =
+ DBConnection.run(conn, fn conn ->
+ Enum.into(
+ row_binary,
+ Ch.stream(conn, "insert into #{table}(a, b) format RowBinary\n")
+ )
+ end)
assert {:ok, %{rows: rows}} =
Ch.query(conn, "select * from {table:Identifier}", %{"table" => table})
@@ -299,8 +295,8 @@ defmodule Ch.ConnectionTest do
assert {:ok, %{num_rows: 2}} =
- "insert into {$0:Identifier}(a, b) select a, b from {$0:Identifier} where a > {$1:UInt8}",
- [table, 1]
+ "insert into {table:Identifier}(a, b) select a, b from {table:Identifier} where a > {a:UInt8}",
+ %{"table" => table, "a" => 1}
assert {:ok, %{rows: new_rows}} =
@@ -320,7 +316,7 @@ defmodule Ch.ConnectionTest do
settings = [allow_experimental_lightweight_delete: 1]
- assert {:ok, %{rows: [], command: :delete}} =
+ assert {:ok, %{command: :delete}} =
Ch.query(conn, "delete from delete_t where 1", [], settings: settings)
@@ -379,14 +375,18 @@ defmodule Ch.ConnectionTest do
assert {:ok, %{num_rows: 4}} =
- "insert into fixed_string_t(a) format RowBinary",
- [""],
- ["a"],
- ["aa"],
- ["aaa"]
- ],
- types: ["FixedString(3)"]
+ "insert into fixed_string_t(a) format RowBinary\n"
+ | RowBinary.encode_rows(
+ [
+ [""],
+ ["a"],
+ ["aa"],
+ ["aaa"]
+ ],
+ ["FixedString(3)"]
+ )
+ ]
assert Ch.query!(conn, "select * from fixed_string_t").rows == [
@@ -423,13 +423,13 @@ defmodule Ch.ConnectionTest do
assert %{num_rows: 3} =
- "insert into decimal_t(d) format RowBinary",
- _rows = [
- [Decimal.new("2.66")],
- [Decimal.new("2.6666")],
- [Decimal.new("2.66666")]
- ],
- types: ["Decimal32(4)"]
+ [
+ "insert into decimal_t(d) format RowBinary\n"
+ | RowBinary.encode_rows(
+ [[Decimal.new("2.66")], [Decimal.new("2.6666")], [Decimal.new("2.66666")]],
+ ["Decimal32(4)"]
+ )
+ ]
assert Ch.query!(conn, "select * from decimal_t").rows == [
@@ -454,9 +454,13 @@ defmodule Ch.ConnectionTest do
- "insert into test_bool(A, B) format RowBinary",
- _rows = [[3, true], [4, false]],
- types: ["Int64", "Bool"]
+ [
+ "insert into test_bool(A, B) format RowBinary\n"
+ | RowBinary.encode_rows(
+ [[3, true], [4, false]],
+ ["Int64", "Bool"]
+ )
+ ]
# anything > 0 is `true`, here `2` is `true`
@@ -497,9 +501,13 @@ defmodule Ch.ConnectionTest do
- "insert into t_uuid(x,y) format RowBinary",
- _rows = [[uuid, "Example 3"]],
- types: ["UUID", "String"]
+ [
+ "insert into t_uuid(x,y) format RowBinary\n"
+ | RowBinary.encode_rows(
+ [[uuid, "Example 3"]],
+ ["UUID", "String"]
+ )
+ ]
assert {:ok,
@@ -559,9 +567,13 @@ defmodule Ch.ConnectionTest do
- "INSERT INTO t_enum(i, x) FORMAT RowBinary",
- _rows = [[3, "hello"], [4, "world"], [5, 1], [6, 2]],
- types: ["UInt8", "Enum8('hello' = 1, 'world' = 2)"]
+ [
+ "INSERT INTO t_enum(i, x) FORMAT RowBinary\n"
+ | RowBinary.encode_rows(
+ _rows = [[3, "hello"], [4, "world"], [5, 1], [6, 2]],
+ ["UInt8", "Enum8('hello' = 1, 'world' = 2)"]
+ )
+ ]
assert Ch.query!(conn, "SELECT *, CAST(x, 'Int8') FROM t_enum ORDER BY i").rows == [
@@ -608,18 +620,22 @@ defmodule Ch.ConnectionTest do
assert Ch.query!(
- "INSERT INTO table_map FORMAT RowBinary",
- _rows = [
- [%{"key10" => 20, "key20" => 40}],
- # empty map
- [%{}],
- # null map
- [nil],
- # empty proplist map
- [[]],
- [[{"key50", 100}]]
- ],
- types: ["Map(String, UInt64)"]
+ [
+ "INSERT INTO table_map FORMAT RowBinary\n"
+ | RowBinary.encode_rows(
+ [
+ [%{"key10" => 20, "key20" => 40}],
+ # empty map
+ [%{}],
+ # null map
+ [nil],
+ # empty proplist map
+ [[]],
+ [[{"key50", 100}]]
+ ],
+ ["Map(String, UInt64)"]
+ )
+ ]
assert Ch.query!(conn, "SELECT * FROM table_map ORDER BY a ASC").rows == [
@@ -641,7 +657,7 @@ defmodule Ch.ConnectionTest do
[{1, "a"}, "Tuple(UInt8, String)"]
- assert Ch.query!(conn, "SELECT {$0:Tuple(Int8, String)}", [{-1, "abs"}]).rows == [
+ assert Ch.query!(conn, "SELECT {t:Tuple(Int8, String)}", %{"t" => {-1, "abs"}}).rows == [
[{-1, "abs"}]
@@ -660,9 +676,13 @@ defmodule Ch.ConnectionTest do
assert %{num_rows: 2} =
- "INSERT INTO tuples_t FORMAT RowBinary",
- _rows = [[{"a", 20}], [{"b", 30}]],
- types: ["Tuple(String, Int64)"]
+ [
+ "INSERT INTO tuples_t FORMAT RowBinary\n"
+ | RowBinary.encode_rows(
+ [[{"a", 20}], [{"b", 30}]],
+ ["Tuple(String, Int64)"]
+ )
+ ]
assert Ch.query!(conn, "SELECT a FROM tuples_t ORDER BY a.1 ASC").rows == [
@@ -702,13 +722,16 @@ defmodule Ch.ConnectionTest do
# datetimes in params are sent in text and ClickHouse translates them to UTC from server timezone by default
# see https://clickhouse.com/docs/en/sql-reference/data-types/datetime
# https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/
- assert {:ok,
- %{num_rows: 1, rows: [[naive_datetime, "2022-12-12 12:00:00"]], headers: headers}} =
- Ch.query(conn, "select {$0:DateTime} as d, toString(d)", [naive_noon])
+ assert {:ok, %{num_rows: 1, rows: [[naive_datetime, "2022-12-12 12:00:00"]]}} =
+ Ch.query(
+ conn,
+ "select {naive:DateTime} as d, toString(d)",
+ %{"naive" => naive_noon}
+ )
# to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse
# i.e. we give server timezone to the naive datetime and shift it to UTC before comparing with the result
- {_, timezone} = List.keyfind!(headers, "x-clickhouse-timezone", 0)
+ %Ch.Result{rows: [[timezone]]} = Ch.query!(conn, "select timezone()")
assert naive_datetime ==
@@ -717,12 +740,18 @@ defmodule Ch.ConnectionTest do
|> DateTime.to_naive()
assert {:ok, %{num_rows: 1, rows: [[~U[2022-12-12 12:00:00Z], "2022-12-12 12:00:00"]]}} =
- Ch.query(conn, "select {$0:DateTime('UTC')} as d, toString(d)", [naive_noon])
+ Ch.query(
+ conn,
+ "select {naive:DateTime('UTC')} as d, toString(d)",
+ %{"naive" => naive_noon}
+ )
assert {:ok, %{num_rows: 1, rows: rows}} =
- Ch.query(conn, "select {$0:DateTime('Asia/Bangkok')} as d, toString(d)", [
- naive_noon
- ])
+ Ch.query(
+ conn,
+ "select {naive:DateTime('Asia/Bangkok')} as d, toString(d)",
+ %{"naive" => naive_noon}
+ )
assert rows == [
@@ -737,7 +766,7 @@ defmodule Ch.ConnectionTest do
on_exit(fn -> Calendar.put_time_zone_database(prev_tz_db) end)
assert_raise ArgumentError, ~r/:utc_only_time_zone_database/, fn ->
- Ch.query(conn, "select {$0:DateTime('Asia/Tokyo')}", [naive_noon])
+ Ch.query(conn, "select {naive:DateTime('Asia/Tokyo')}", %{"naive" => naive_noon})
@@ -759,21 +788,37 @@ defmodule Ch.ConnectionTest do
assert {:ok, %{num_rows: 1, rows: [[~D[1900-01-01], "1900-01-01"]]}} =
- Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[1900-01-01]])
+ Ch.query(
+ conn,
+ "select {date:Date32} as d, toString(d)",
+ %{"date" => ~D[1900-01-01]}
+ )
# max
assert {:ok, %{num_rows: 1, rows: [[~D[2299-12-31], "2299-12-31"]]}} =
- Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[2299-12-31]])
+ Ch.query(
+ conn,
+ "select {date:Date32} as d, toString(d)",
+ %{"date" => ~D[2299-12-31]}
+ )
# min
assert {:ok, %{num_rows: 1, rows: [[~D[1900-01-01], "1900-01-01"]]}} =
- Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[1900-01-01]])
+ Ch.query(
+ conn,
+ "select {date:Date32} as d, toString(d)",
+ %{"date" => ~D[1900-01-01]}
+ )
- "insert into new(timestamp, event_id) format RowBinary",
- _rows = [[~D[1960-01-01], 3]],
- types: ["Date32", "UInt8"]
+ [
+ "insert into new(timestamp, event_id) format RowBinary\n"
+ | RowBinary.encode_rows(
+ [[~D[1960-01-01], 3]],
+ ["Date32", "UInt8"]
+ )
+ ]
assert %{
@@ -829,12 +874,16 @@ defmodule Ch.ConnectionTest do
- "insert into datetime64_t(event_id, timestamp) format RowBinary",
- _rows = [
- [4, ~N[2021-01-01 12:00:00.123456]],
- [5, ~N[2021-01-01 12:00:00]]
- ],
- types: ["UInt8", "DateTime64(3)"]
+ [
+ "insert into datetime64_t(event_id, timestamp) format RowBinary\n"
+ | RowBinary.encode_rows(
+ [
+ [4, ~N[2021-01-01 12:00:00.123456]],
+ [5, ~N[2021-01-01 12:00:00]]
+ ],
+ ["UInt8", "DateTime64(3)"]
+ )
+ ]
assert {:ok, %{num_rows: 2, rows: rows}} =
@@ -862,12 +911,16 @@ defmodule Ch.ConnectionTest do
# datetimes in params are sent in text and ClickHouse translates them to UTC from server timezone by default
# see https://clickhouse.com/docs/en/sql-reference/data-types/datetime
# https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/
- assert {:ok, %{num_rows: 1, rows: [[naive_datetime]], headers: headers}} =
- Ch.query(conn, "select {$0:DateTime64(#{precision})}", [naive_noon])
+ assert {:ok, %{num_rows: 1, rows: [[naive_datetime]]}} =
+ Ch.query(
+ conn,
+ "select {naive:DateTime64(#{precision})}",
+ %{"naive" => naive_noon}
+ )
# to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse
# i.e. we give server timezone to the naive datetime and shift it to UTC before comparing with the result
- {_, timezone} = List.keyfind!(headers, "x-clickhouse-timezone", 0)
+ %Ch.Result{rows: [[timezone]]} = Ch.query!(conn, "select timezone()")
expected =
@@ -919,9 +972,10 @@ defmodule Ch.ConnectionTest do
assert {:ok, %{num_rows: 5}} =
- "insert into nullable format RowBinary",
- <<1, 2, 3, 4, 5>>,
- encode: false
+ [
+ "insert into nullable format RowBinary\n"
+ | <<1, 2, 3, 4, 5>>
+ ]
assert %{num_rows: 1, rows: [[count]]} =
@@ -942,9 +996,13 @@ defmodule Ch.ConnectionTest do
- "INSERT INTO ch_nulls(a, b, c, d) FORMAT RowBinary",
- [[nil, nil, nil, nil]],
- types: ["UInt8", "Nullable(UInt8)", "UInt8", "Nullable(UInt8)"]
+ [
+ "INSERT INTO ch_nulls(a, b, c, d) FORMAT RowBinary\n"
+ | RowBinary.encode_rows(
+ [[nil, nil, nil, nil]],
+ ["UInt8", "Nullable(UInt8)", "UInt8", "Nullable(UInt8)"]
+ )
+ ]
# default is ignored...
@@ -962,14 +1020,18 @@ defmodule Ch.ConnectionTest do
- """
- INSERT INTO test_insert_default_value
- SELECT id, name
- FROM input('id UInt32, name Nullable(String)')
- FORMAT RowBinary\
- """,
- [[1, nil], [-1, nil]],
- types: ["UInt32", "Nullable(String)"]
+ [
+ """
+ INSERT INTO test_insert_default_value
+ SELECT id, name
+ FROM input('id UInt32, name Nullable(String)')
+ FORMAT RowBinary
+ """
+ | RowBinary.encode_rows(
+ [[1, nil], [-1, nil]],
+ ["UInt32", "Nullable(String)"]
+ )
+ ]
assert Ch.query!(conn, "SELECT * FROM test_insert_default_value ORDER BY n").rows ==
@@ -986,7 +1048,7 @@ defmodule Ch.ConnectionTest do
test "can encode and then decode Point in query params", %{conn: conn} do
- assert Ch.query!(conn, "select {$0:Point}", [{10, 10}]).rows == [
+ assert Ch.query!(conn, "select {p:Point}", %{"p" => {10, 10}}).rows == [
_row = [_point = {10.0, 10.0}]
@@ -994,19 +1056,22 @@ defmodule Ch.ConnectionTest do
test "can insert and select Point", %{conn: conn} do
Ch.query!(conn, "CREATE TABLE geo_point (p Point) ENGINE = Memory()")
Ch.query!(conn, "INSERT INTO geo_point VALUES((10, 10))")
- Ch.query!(conn, "INSERT INTO geo_point FORMAT RowBinary", [[{20, 20}]], types: ["Point"])
+ Ch.query!(conn, [
+ "INSERT INTO geo_point FORMAT RowBinary\n"
+ | RowBinary.encode_rows([[{20, 20}]], ["Point"])
+ ])
assert Ch.query!(conn, "SELECT p, toTypeName(p) FROM geo_point ORDER BY p ASC").rows == [
[{10.0, 10.0}, "Point"],
[{20.0, 20.0}, "Point"]
- # to make our RowBinary is not garbage in garbage out we also test a text format response
- assert conn
- |> Ch.query!(
+ # to make sure our RowBinary is not "garbage in, garbage out" we also test a "text format" response
+ assert Ch.query!(
+ conn,
"SELECT p, toTypeName(p) FROM geo_point ORDER BY p ASC FORMAT JSONCompact"
- )
- |> Map.fetch!(:rows)
+ ).data
|> Jason.decode!()
|> Map.fetch!("data") == [
[[10, 10], "Point"],
@@ -1021,7 +1086,7 @@ defmodule Ch.ConnectionTest do
test "can encode and then decode Ring in query params", %{conn: conn} do
ring = [{0.0, 1.0}, {10.0, 3.0}]
- assert Ch.query!(conn, "select {$0:Ring}", [ring]).rows == [_row = [ring]]
+ assert Ch.query!(conn, "select {r:Ring}", %{"r" => ring}).rows == [_row = [ring]]
test "can insert and select Ring", %{conn: conn} do
@@ -1029,7 +1094,10 @@ defmodule Ch.ConnectionTest do
Ch.query!(conn, "INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)])")
ring = [{20, 20}, {0, 0}, {0, 20}]
- Ch.query!(conn, "INSERT INTO geo_ring FORMAT RowBinary", [[ring]], types: ["Ring"])
+ Ch.query!(conn, [
+ "INSERT INTO geo_ring FORMAT RowBinary\n" | RowBinary.encode_rows([[ring]], ["Ring"])
+ ])
assert Ch.query!(conn, "SELECT r, toTypeName(r) FROM geo_ring ORDER BY r ASC").rows == [
[[{0.0, 0.0}, {10.0, 0.0}, {10.0, 10.0}, {0.0, 10.0}], "Ring"],
@@ -1040,7 +1108,7 @@ defmodule Ch.ConnectionTest do
assert Ch.query!(
"SELECT r, toTypeName(r) FROM geo_ring ORDER BY r ASC FORMAT JSONCompact"
- ).rows
+ ).data
|> Jason.decode!()
|> Map.fetch!("data") == [
[[[0, 0], [10, 0], [10, 10], [0, 10]], "Ring"],
@@ -1058,7 +1126,7 @@ defmodule Ch.ConnectionTest do
test "can encode and then decode Polygon in query params", %{conn: conn} do
polygon = [[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]]
- assert Ch.query!(conn, "select {$0:Polygon}", [polygon]).rows == [_row = [polygon]]
+ assert Ch.query!(conn, "select {p:Polygon}", %{"p" => polygon}).rows == [_row = [polygon]]
test "can insert and select Polygon", %{conn: conn} do
@@ -1070,7 +1138,11 @@ defmodule Ch.ConnectionTest do
polygon = [[{0, 1.0}, {10, 3.2}], [], [{2, 2}]]
- Ch.query!(conn, "INSERT INTO geo_polygon FORMAT RowBinary", [[polygon]], types: ["Polygon"])
+ Ch.query!(conn, [
+ "INSERT INTO geo_polygon FORMAT RowBinary\n"
+ | RowBinary.encode_rows([[polygon]], ["Polygon"])
+ ])
assert Ch.query!(conn, "SELECT pg, toTypeName(pg) FROM geo_polygon ORDER BY pg ASC").rows ==
@@ -1088,7 +1160,7 @@ defmodule Ch.ConnectionTest do
assert Ch.query!(
"SELECT pg, toTypeName(pg) FROM geo_polygon ORDER BY pg ASC FORMAT JSONCompact"
- ).rows
+ ).data
|> Jason.decode!()
|> Map.fetch!("data") == [
[[[[0, 1], [10, 3.2]], [], [[2, 2]]], "Polygon"],
@@ -1113,7 +1185,7 @@ defmodule Ch.ConnectionTest do
test "can encode and then decode MultiPolygon in query params", %{conn: conn} do
multipolygon = [[[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]], [], [[{3, 3}]]]
- assert Ch.query!(conn, "select {$0:MultiPolygon}", [multipolygon]).rows == [
+ assert Ch.query!(conn, "select {mp:MultiPolygon}", %{"mp" => multipolygon}).rows == [
_row = [multipolygon]
@@ -1128,9 +1200,13 @@ defmodule Ch.ConnectionTest do
multipolygon = [[[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]], [], [[{3, 3}]]]
- Ch.query!(conn, "INSERT INTO geo_multipolygon FORMAT RowBinary", [[multipolygon]],
- types: ["MultiPolygon"]
- )
+ Ch.query!(conn, [
+ "INSERT INTO geo_multipolygon FORMAT RowBinary\n"
+ | RowBinary.encode_rows(
+ [[multipolygon]],
+ ["MultiPolygon"]
+ )
+ ])
assert Ch.query!(conn, "SELECT mpg, toTypeName(mpg) FROM geo_multipolygon ORDER BY mpg ASC").rows ==
@@ -1166,7 +1242,7 @@ defmodule Ch.ConnectionTest do
assert Ch.query!(
"SELECT mpg, toTypeName(mpg) FROM geo_multipolygon ORDER BY mpg ASC FORMAT JSONCompact"
- ).rows
+ ).data
|> Jason.decode!()
|> Map.fetch!("data") == [
@@ -1206,8 +1282,9 @@ defmodule Ch.ConnectionTest do
assert {:error, %Ch.Error{code: 81} = error} =
Ch.query(conn, "select 1 + 1", _params = [], database: "no-db")
+ assert Exception.message(error) =~ "Code: 81."
assert Exception.message(error) =~ "`no-db`"
- assert Exception.message(error) =~ "UNKNOWN_DATABASE"
+ assert Exception.message(error) =~ "(UNKNOWN_DATABASE)"
test "can provide custom database", %{conn: conn} do
@@ -1235,55 +1312,24 @@ defmodule Ch.ConnectionTest do
describe "stream" do
- test "sends mint http packets", %{conn: conn} do
- stmt = "select number from system.numbers limit 1000"
- drop_ref = fn packets ->
- Enum.map(packets, fn
- {tag, _ref, data} -> {tag, data}
- {tag, _ref} -> tag
- end)
- end
- packets =
- Ch.run(conn, fn conn ->
+ test "emits result structs containing raw data", %{conn: conn} do
+ results =
+ DBConnection.run(conn, fn conn ->
- |> Ch.stream(stmt)
- |> Enum.flat_map(drop_ref)
- end)
- assert [{:status, 200}, {:headers, headers} | _rest] = packets
- assert List.keyfind!(headers, "transfer-encoding", 0) == {"transfer-encoding", "chunked"}
- assert data_packets =
- packets
- |> Enum.filter(&match?({:data, _data}, &1))
- |> Enum.map(fn {:data, data} -> data end)
- assert length(data_packets) >= 2
- assert RowBinary.decode_rows(Enum.join(data_packets)) == Enum.map(0..999, &[&1])
- assert List.last(packets) == :done
- end
- test "decodes RowBinary", %{conn: conn} do
- stmt = "select number from system.numbers limit 1000"
- rows =
- Ch.run(conn, fn conn ->
- conn
- |> Ch.stream(stmt, _params = [], types: [:u64])
+ |> Ch.stream("select number from system.numbers limit 1000")
|> Enum.into([])
- assert List.flatten(rows) == Enum.into(0..999, [])
+ assert length(results) >= 2
+ assert results |> Enum.map(& &1.data) |> IO.iodata_to_binary() |> RowBinary.decode_rows() ==
+ Enum.map(0..999, &[&1])
test "disconnects on early halt", %{conn: conn} do
logs =
ExUnit.CaptureLog.capture_log(fn ->
- Ch.run(conn, fn conn ->
+ DBConnection.run(conn, fn conn ->
conn |> Ch.stream("select number from system.numbers") |> Enum.take(1)
@@ -1338,46 +1384,49 @@ defmodule Ch.ConnectionTest do
test "error on type mismatch", %{conn: conn} do
- stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes"
- rows = [["AB", "rare", -42]]
names = ["country_code", "rare_string", "maybe_int32"]
+ types = [Ch.Types.fixed_string(2), Ch.Types.string(), Ch.Types.nullable(Ch.Types.u32())]
- opts = [
- names: names,
- types: [Ch.Types.fixed_string(2), Ch.Types.string(), Ch.Types.nullable(Ch.Types.u32())]
- ]
+ assert {:error, %Ch.Error{code: 117, message: message}} =
+ Ch.query(conn, [
+ "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n",
+ _header = RowBinary.encode_names_and_types(names, types)
+ | RowBinary.encode_rows([["AB", "rare", -42]], types)
+ ])
- assert {:error, %Ch.Error{code: 117, message: message}} = Ch.query(conn, stmt, rows, opts)
assert message =~ "Type of 'rare_string' must be LowCardinality(String), not String"
- opts = [
- names: names,
- types: [
- Ch.Types.fixed_string(2),
- Ch.Types.low_cardinality(Ch.Types.string()),
- Ch.Types.nullable(Ch.Types.u32())
- ]
+ types = [
+ Ch.Types.fixed_string(2),
+ Ch.Types.low_cardinality(Ch.Types.string()),
+ Ch.Types.nullable(Ch.Types.u32())
- assert {:error, %Ch.Error{code: 117, message: message}} = Ch.query(conn, stmt, rows, opts)
+ assert {:error, %Ch.Error{code: 117, message: message}} =
+ Ch.query(conn, [
+ "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n",
+ _header = RowBinary.encode_names_and_types(names, types)
+ | RowBinary.encode_rows([["AB", "rare", -42]], types)
+ ])
assert message =~ "Type of 'maybe_int32' must be Nullable(Int32), not Nullable(UInt32)"
test "ok on valid types", %{conn: conn} do
- stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes"
- rows = [["AB", "rare", -42]]
names = ["country_code", "rare_string", "maybe_int32"]
- opts = [
- names: names,
- types: [
- Ch.Types.fixed_string(2),
- Ch.Types.low_cardinality(Ch.Types.string()),
- Ch.Types.nullable(Ch.Types.i32())
- ]
+ types = [
+ Ch.Types.fixed_string(2),
+ Ch.Types.low_cardinality(Ch.Types.string()),
+ Ch.Types.nullable(Ch.Types.i32())
- assert {:ok, %{num_rows: 1}} = Ch.query(conn, stmt, rows, opts)
+ assert {:ok, %{num_rows: 1}} =
+ Ch.query(conn, [
+ "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n",
+ _header = RowBinary.encode_names_and_types(names, types)
+ | RowBinary.encode_rows([["AB", "rare", -42]], types)
+ ])
diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs
index e647364..9a97969 100644
--- a/test/ch/faults_test.exs
+++ b/test/ch/faults_test.exs
@@ -364,13 +364,17 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.close(mint)
spawn_link(fn ->
- assert {:error, %Mint.TransportError{reason: :closed}} =
- Ch.query(
- conn,
- "insert into unknown_table(a,b) format RowBinary",
- stream,
- encode: false
- )
+ try do
+ DBConnection.run(conn, fn conn ->
+ Enum.into(
+ stream,
+ Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
+ )
+ end)
+ rescue
+ e in [DBConnection.ConnectionError, Mint.TransportError] ->
+ assert Exception.message(e) =~ "closed"
+ end
# reconnect
@@ -381,15 +385,14 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
spawn_link(fn ->
- assert {:error, %Ch.Error{code: 60, message: message}} =
- Ch.query(
- conn,
- "insert into unknown_table(a,b) format RowBinary",
- stream,
- encode: false
- )
- assert message =~ ~r/UNKNOWN_TABLE/
+ assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
+ DBConnection.run(conn, fn conn ->
+ Enum.into(
+ stream,
+ Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
+ )
+ end)
+ end
send(test, :done)
@@ -423,13 +426,17 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
spawn_link(fn ->
- assert {:error, %Mint.TransportError{reason: :closed}} =
- Ch.query(
- conn,
- "insert into unknown_table(a,b) format RowBinary",
- stream,
- encode: false
- )
+ try do
+ DBConnection.run(conn, fn conn ->
+ Enum.into(
+ stream,
+ Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
+ )
+ end)
+ rescue
+ e in [DBConnection.ConnectionError, Mint.TransportError] ->
+ assert Exception.message(e) =~ "closed"
+ end
# close after first packet from mint arrives
@@ -444,15 +451,14 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
spawn_link(fn ->
- assert {:error, %Ch.Error{code: 60, message: message}} =
- Ch.query(
- conn,
- "insert into unknown_table(a,b) format RowBinary",
- stream,
- encode: false
- )
- assert message =~ ~r/UNKNOWN_TABLE/
+ assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
+ DBConnection.run(conn, fn conn ->
+ Enum.into(
+ stream,
+ Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
+ )
+ end)
+ end
send(test, :done)
@@ -472,8 +478,7 @@ defmodule Ch.FaultsTest do
test = self()
header = "X-ClickHouse-Server-Display-Name"
- {:ok, %Result{headers: headers}} = Ch.Test.sql_exec("select 1")
- {_, expected_name} = List.keyfind!(headers, String.downcase(header), 0)
+ {:ok, %Result{rows: [[expected_name]]}} = Ch.Test.sql_exec("select hostName()")
log =
capture_async_log(fn ->
diff --git a/test/ch/headers_test.exs b/test/ch/headers_test.exs
index 52aa7c3..18c8b98 100644
--- a/test/ch/headers_test.exs
+++ b/test/ch/headers_test.exs
@@ -7,50 +7,38 @@ defmodule Ch.HeadersTest do
test "can request gzipped response through headers", %{conn: conn} do
- assert {:ok, %{rows: rows, headers: headers}} =
+ assert {:ok, %{data: data}} =
Ch.query(conn, "select number from system.numbers limit 100", [],
decode: false,
settings: [enable_http_compression: 1],
headers: [{"accept-encoding", "gzip"}]
- assert :proplists.get_value("content-type", headers) == "application/octet-stream"
- assert :proplists.get_value("content-encoding", headers) == "gzip"
- assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes"
# https://en.wikipedia.org/wiki/Gzip
- assert <<0x1F, 0x8B, _rest::bytes>> = IO.iodata_to_binary(rows)
+ assert <<0x1F, 0x8B, _rest::bytes>> = IO.iodata_to_binary(data)
test "can request lz4 response through headers", %{conn: conn} do
- assert {:ok, %{rows: rows, headers: headers}} =
+ assert {:ok, %{data: data}} =
Ch.query(conn, "select number from system.numbers limit 100", [],
decode: false,
settings: [enable_http_compression: 1],
headers: [{"accept-encoding", "lz4"}]
- assert :proplists.get_value("content-type", headers) == "application/octet-stream"
- assert :proplists.get_value("content-encoding", headers) == "lz4"
- assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes"
# https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)
- assert <<0x04, 0x22, 0x4D, 0x18, _rest::bytes>> = IO.iodata_to_binary(rows)
+ assert <<0x04, 0x22, 0x4D, 0x18, _rest::bytes>> = IO.iodata_to_binary(data)
test "can request zstd response through headers", %{conn: conn} do
- assert {:ok, %{rows: rows, headers: headers}} =
+ assert {:ok, %{data: data}} =
Ch.query(conn, "select number from system.numbers limit 100", [],
decode: false,
settings: [enable_http_compression: 1],
headers: [{"accept-encoding", "zstd"}]
- assert :proplists.get_value("content-type", headers) == "application/octet-stream"
- assert :proplists.get_value("content-encoding", headers) == "zstd"
- assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes"
# https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)
- assert <<0x28, 0xB5, 0x2F, 0xFD, _rest::bytes>> = IO.iodata_to_binary(rows)
+ assert <<0x28, 0xB5, 0x2F, 0xFD, _rest::bytes>> = IO.iodata_to_binary(data)
diff --git a/test/ch/query_test.exs b/test/ch/query_test.exs
index c8eec26..7ff7b9c 100644
--- a/test/ch/query_test.exs
+++ b/test/ch/query_test.exs
@@ -20,7 +20,9 @@ defmodule Ch.QueryTest do
""").command == :select
assert Query.build(["select 1+2"]).command == :select
- assert Query.build([?S, ?E, ?L | "ECT 1"]).command == :select
+ # TODO?
+ refute Query.build([?S, ?E, ?L | "ECT 1"]).command == :select
assert Query.build("with insert as (select 1) select * from insert").command == :select
assert Query.build("insert into table(a, b) values(1, 2)").command == :insert
@@ -112,27 +114,34 @@ defmodule Ch.QueryTest do
test "decoded binaries copy behaviour", %{conn: conn} do
text = "hello world"
- assert [[bin]] = Ch.query!(conn, "SELECT {$0:String}", [text]).rows
+ assert [[bin]] = Ch.query!(conn, "SELECT {t:String}", %{"t" => text}).rows
assert :binary.referenced_byte_size(bin) == byte_size(text)
# For OTP 20+ refc binaries up to 64 bytes might be copied during a GC
text = String.duplicate("hello world", 6)
- assert [[bin]] = Ch.query!(conn, "SELECT {$0:String}", [text]).rows
+ assert [[bin]] = Ch.query!(conn, "SELECT {t:String}", %{"t" => text}).rows
assert :binary.referenced_byte_size(bin) == byte_size(text)
test "encode basic types", %{conn: conn} do
# assert [[nil, nil]] = query("SELECT $1::text, $2::int", [nil, nil])
- assert [[true, false]] = Ch.query!(conn, "SELECT {$0:bool}, {$1:Bool}", [true, false]).rows
- assert [["ẽ"]] = Ch.query!(conn, "SELECT {$0:char}", ["ẽ"]).rows
- assert [[42]] = Ch.query!(conn, "SELECT {$0:int}", [42]).rows
- assert [[42.0, 43.0]] = Ch.query!(conn, "SELECT {$0:float}, {$1:float}", [42, 43.0]).rows
- assert [[nil, nil]] = Ch.query!(conn, "SELECT {$0:float}, {$1:float}", ["NaN", "nan"]).rows
- assert [[nil]] = Ch.query!(conn, "SELECT {$0:float}", ["inf"]).rows
- assert [[nil]] = Ch.query!(conn, "SELECT {$0:float}", ["-inf"]).rows
- assert [["ẽric"]] = Ch.query!(conn, "SELECT {$0:varchar}", ["ẽric"]).rows
- assert [[<<1, 2, 3>>]] = Ch.query!(conn, "SELECT {$0:bytea}", [<<1, 2, 3>>]).rows
+ assert [[true, false]] =
+ Ch.query!(conn, "SELECT {a:bool}, {b:Bool}", [{"a", true}, {"b", false}]).rows
+ assert [["ẽ"]] = Ch.query!(conn, "SELECT {e:char}", [{"e", "ẽ"}]).rows
+ assert [[42]] = Ch.query!(conn, "SELECT {num:int}", [{"num", 42}]).rows
+ assert [[42.0, 43.0]] =
+ Ch.query!(conn, "SELECT {n:float}, {f:float}", [{"n", 42}, {"f", 43.0}]).rows
+ assert [[nil, nil]] =
+ Ch.query!(conn, "SELECT {a:float}, {b:float}", [{"a", "NaN"}, {"b", "nan"}]).rows
+ assert [[nil]] = Ch.query!(conn, "SELECT {i:float}", %{"i" => "inf"}).rows
+ assert [[nil]] = Ch.query!(conn, "SELECT {ni:float}", %{"ni" => "-inf"}).rows
+ assert [["ẽric"]] = Ch.query!(conn, "SELECT {name:varchar}", %{"name" => "ẽric"}).rows
+ assert [[<<1, 2, 3>>]] = Ch.query!(conn, "SELECT {b:bytea}", %{"b" => <<1, 2, 3>>}).rows
test "encode numeric", %{conn: conn} do
@@ -159,70 +168,78 @@ defmodule Ch.QueryTest do
Enum.each(nums, fn {num, type} ->
dec = Decimal.new(num)
- assert [[dec]] == Ch.query!(conn, "SELECT {$0:#{type}}", [dec]).rows
+ assert [[dec]] == Ch.query!(conn, "SELECT {dec:#{type}}", %{"dec" => dec}).rows
test "encode integers and floats as numeric", %{conn: conn} do
dec = Decimal.new(1)
- assert [[dec]] == Ch.query!(conn, "SELECT {$0:numeric(1,0)}", [1]).rows
+ assert [[dec]] == Ch.query!(conn, "SELECT {dec:numeric(1,0)}", %{"dec" => 1}).rows
dec = Decimal.from_float(1.0)
- assert [[dec]] == Ch.query!(conn, "SELECT {$0:numeric(2,1)}", [1.0]).rows
+ assert [[dec]] == Ch.query!(conn, "SELECT {dec:numeric(2,1)}", %{"dec" => 1.0}).rows
@tag skip: true
test "encode json/jsonb", %{conn: conn} do
json = %{"foo" => 42}
- assert [[json]] == Ch.query!(conn, "SELECT {$0::json}", [json]).rows
+ assert [[json]] == Ch.query!(conn, "SELECT {json::json}", %{"json" => json}).rows
test "encode uuid", %{conn: conn} do
uuid = <<0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15>>
uuid_hex = "00010203-0405-0607-0809-0a0b0c0d0e0f"
- assert [[^uuid]] = Ch.query!(conn, "SELECT {$0:UUID}", [uuid_hex]).rows
+ assert [[^uuid]] = Ch.query!(conn, "SELECT {uuid:UUID}", %{"uuid" => uuid_hex}).rows
test "encode arrays", %{conn: conn} do
- assert [[[]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[]]).rows
- assert [[[1]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[1]]).rows
- assert [[[1, 2]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[1, 2]]).rows
+ assert [[[]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => []}).rows
+ assert [[[1]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => [1]}).rows
+ assert [[[1, 2]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => [1, 2]}).rows
- assert [[["1"]]] = Ch.query!(conn, "SELECT {$0:Array(String)}", [["1"]]).rows
- assert [[[true]]] = Ch.query!(conn, "SELECT {$0:Array(Bool)}", [[true]]).rows
+ assert [[["1"]]] = Ch.query!(conn, "SELECT {a:Array(String)}", %{"a" => ["1"]}).rows
+ assert [[[true]]] = Ch.query!(conn, "SELECT {a:Array(Bool)}", %{"a" => [true]}).rows
assert [[[~D[2023-01-01]]]] =
- Ch.query!(conn, "SELECT {$0:Array(Date)}", [[~D[2023-01-01]]]).rows
+ Ch.query!(conn, "SELECT {a:Array(Date)}", %{"a" => [~D[2023-01-01]]}).rows
assert [[[Ch.Test.to_clickhouse_naive(conn, ~N[2023-01-01 12:00:00])]]] ==
- Ch.query!(conn, "SELECT {$0:Array(DateTime)}", [[~N[2023-01-01 12:00:00]]]).rows
+ Ch.query!(conn, "SELECT {a:Array(DateTime)}", %{"a" => [~N[2023-01-01 12:00:00]]}).rows
assert [[[~U[2023-01-01 12:00:00Z]]]] ==
- Ch.query!(conn, "SELECT {$0:Array(DateTime('UTC'))}", [[~N[2023-01-01 12:00:00]]]).rows
+ Ch.query!(
+ conn,
+ "SELECT {a:Array(DateTime('UTC'))}",
+ %{"a" => [~N[2023-01-01 12:00:00]]}
+ ).rows
assert [[[~N[2023-01-01 12:00:00]]]] ==
- Ch.query!(conn, "SELECT {$0:Array(DateTime)}", [[~U[2023-01-01 12:00:00Z]]]).rows
+ Ch.query!(conn, "SELECT {a:Array(DateTime)}", %{"a" => [~U[2023-01-01 12:00:00Z]]}).rows
assert [[[~U[2023-01-01 12:00:00Z]]]] ==
- Ch.query!(conn, "SELECT {$0:Array(DateTime('UTC'))}", [[~U[2023-01-01 12:00:00Z]]]).rows
+ Ch.query!(conn, "SELECT {a:Array(DateTime('UTC'))}", %{
+ "a" => [~U[2023-01-01 12:00:00Z]]
+ }).rows
assert [[[[0], [1]]]] =
- Ch.query!(conn, "SELECT {$0:Array(Array(integer))}", [[[0], [1]]]).rows
+ Ch.query!(conn, "SELECT {a:Array(Array(integer))}", %{"a" => [[0], [1]]}).rows
+ assert [[[[0]]]] = Ch.query!(conn, "SELECT {a:Array(Array(integer))}", %{"a" => [[0]]}).rows
- assert [[[[0]]]] = Ch.query!(conn, "SELECT {$0:Array(Array(integer))}", [[[0]]]).rows
- # assert [[[1, nil, 3]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[1, nil, 3]]).rows
+ # assert [[[1, nil, 3]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => [1, nil, 3]}).rows
test "encode network types", %{conn: conn} do
# TODO, or wrap in custom struct like in postgrex
# assert [[""]] =
- # Ch.query!(conn, "SELECT {$0:inet4}::text", [{127, 0, 0, 1}]).rows
+ # Ch.query!(conn, "SELECT {ip:inet4}::text", %{"ip" => {127, 0, 0, 1}}).rows
- assert [[{127, 0, 0, 1}]] = Ch.query!(conn, "SELECT {$0:text}::inet4", [""]).rows
+ assert [[{127, 0, 0, 1}]] =
+ Ch.query!(conn, "SELECT {ip:text}::inet4", %{"ip" => ""}).rows
assert [[{0, 0, 0, 0, 0, 0, 0, 1}]] =
- Ch.query!(conn, "SELECT {$0:text}::inet6", ["::1"]).rows
+ Ch.query!(conn, "SELECT {ip:text}::inet6", %{"ip" => "::1"}).rows
test "result struct", %{conn: conn} do
diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs
new file mode 100644
index 0000000..8c8292f
--- /dev/null
+++ b/test/ch/stream_test.exs
@@ -0,0 +1,56 @@
+defmodule Ch.StreamTest do
+ use ExUnit.Case
+ alias Ch.{Result, RowBinary}
+ setup do
+ {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})}
+ end
+ describe "enumerable Ch.stream/4" do
+ test "emits %Ch.Result{}", %{conn: conn} do
+ count = 1_000_000
+ assert [%Result{command: :select, data: header} | _rest] =
+ results =
+ DBConnection.run(conn, fn conn ->
+ conn
+ |> Ch.stream("select * from numbers({count:UInt64})", %{"count" => 1_000_000})
+ |> Enum.into([])
+ end)
+ assert [<<1, 6, "number", 6, "UInt64">> | _] = header
+ decoded = results |> Enum.map(& &1.data) |> IO.iodata_to_binary() |> RowBinary.decode_rows()
+ assert [[0], [1], [2] | _] = decoded
+ assert length(decoded) == count
+ end
+ test "raises on error", %{conn: conn} do
+ assert_raise Ch.Error,
+ ~r/Code: 62. DB::Exception: Syntax error: failed at position 8/,
+ fn ->
+ DBConnection.run(conn, fn conn ->
+ conn |> Ch.stream("select ", %{"count" => 1_000_000}) |> Enum.into([])
+ end)
+ end
+ end
+ end
+ describe "collectable Ch.stream/4" do
+ test "inserts chunks", %{conn: conn} do
+ Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory")
+ assert %Ch.Result{command: :insert, num_rows: 1_000_000} =
+ DBConnection.run(conn, fn conn ->
+ Stream.repeatedly(fn -> [:rand.uniform(100)] end)
+ |> Stream.chunk_every(100_000)
+ |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
+ |> Stream.take(10)
+ |> Enum.into(Ch.stream(conn, "insert into collect_stream(i) format RowBinary\n"))
+ end)
+ assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]
+ end
+ end