From dfe6b691a874dc4a0a5fac49af0ae7b603f9de5e Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 29 Jan 2024 11:51:52 +0900 Subject: [PATCH 01/20] move data to sql --- CHANGELOG.md | 5 + README.md | 92 +++++---- lib/ch.ex | 59 ++++-- lib/ch/connection.ex | 39 ++-- lib/ch/query.ex | 159 ++++---------- test/ch/aggregation_test.exs | 73 ++++--- test/ch/connection_test.exs | 388 +++++++++++++++++++++-------------- test/ch/faults_test.exs | 28 +-- test/ch/query_test.exs | 83 +++++--- 9 files changed, 510 insertions(+), 416 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b1afaa..7803592 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## Unreleased + +- move rows payload (RowBinary, CSV, etc.) to SQL statement +- remove pseudo-positional binds, make param names explicit + ## 0.2.4 (2024-01-29) - use `ch-#{version}` as user-agent https://github.com/plausible/ch/pull/154 diff --git a/README.md b/README.md index 4f55203..e6121cf 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2}) ``` -#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient) +#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) ```elixir {:ok, pid} = Ch.start_link() @@ -98,8 +98,11 @@ types = [Ch.Types.u64()] # or types = [:u64] +rows = [[0], [1]] +row_binary = Ch.RowBinary.encode_rows(rows, types) + %Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types) + Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary]) ``` Note that RowBinary format encoding requires `:types` option to be provided. 
@@ -107,11 +110,14 @@ Note that RowBinary format encoding requires `:types` option to be provided.
 
 Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check.
 
 ```elixir
-sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
-opts = [names: ["id"], types: ["UInt64"]]
-rows = [[0], [1]]
+names = ["id"]
+types = ["UInt64"]
+
+header = Ch.RowBinary.encode_names_and_types(names, types)
+row_binary = Ch.RowBinary.encode_rows(rows, types)
 
-%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
+%Ch.Result{num_rows: 2} =
+  Ch.query!(pid, ["INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", header | row_binary])
 ```
 
 #### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
@@ -121,29 +127,42 @@ rows = [[0], [1]]
 
 Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
 
-csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
+csv = "0\n1"
 
 %Ch.Result{num_rows: 2} =
-  Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
+  Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
 ```
 
-#### Insert rows as chunked RowBinary stream
+#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream
 
 ```elixir
 {:ok, pid} = Ch.start_link()
 
 Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
 
-stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
-chunked = Stream.chunk_every(stream, 100)
-encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
-ten_encoded_chunks = Stream.take(encoded, 10)
+row_binary =
+  Stream.repeatedly(fn -> [:rand.uniform(100)] end)
+  |> Stream.chunk_every(100_000)
+  |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
+  |> Stream.take(10)
 
-%Ch.Result{num_rows: 1000} =
-  Ch.query(pid, "INSERT INTO 
ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
+%Ch.Result{num_rows: 1_000_000} =
+  Ch.query!(pid, Stream.concat(["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], row_binary))
 ```
 
-This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
+#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function
+
+```elixir
+{:ok, pid} = Ch.start_link()
+
+Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
+
+sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n"
+row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"])
+
+%Ch.Result{num_rows: 3} =
+  Ch.query!(pid, [sql | row_binary], %{"ego" => -1})
+```
 
 #### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)
 
@@ -156,7 +175,7 @@ settings = [async_insert: 1]
 Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'")
 
 %Ch.Result{rows: [["async_insert", "Bool", "1"]]} =
-  Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings)
+  Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings)
 ```
 
 ## Caveats
@@ -179,13 +198,13 @@ CREATE TABLE ch_nulls (
 """)
 
 types = ["Nullable(UInt8)", "UInt8", "UInt8"]
-inserted_rows = [[nil, nil, nil]]
-selected_rows = [[nil, 0, 0]]
+rows = [[nil, nil, nil]]
+row_binary = Ch.RowBinary.encode_rows(rows, types)
 
 %Ch.Result{num_rows: 1} =
-  Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types)
+  Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary])
 
-%Ch.Result{rows: ^selected_rows} =
+%Ch.Result{rows: [[nil, 0, 0]]} =
   Ch.query!(pid, "SELECT * FROM ch_nulls")
 ```
 
@@ -197,13 +216,17 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function
 
 sql = """
 INSERT INTO ch_nulls
   SELECT * FROM 
input('a Nullable(UInt8), b Nullable(UInt8), c UInt8') - FORMAT RowBinary\ + FORMAT RowBinary """ -Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]) +types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"] +row_binary = Ch.RowBinary.encode_rows(rows, types) + +%Ch.Result{num_rows: 1} = + Ch.query!(pid, [sql | row_binary]) -%Ch.Result{rows: [[0], [10]]} = - Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b") +%Ch.Result{rows: [_before = [nil, 0, 0], _after = [nil, 10, 0]]} = + Ch.query!(pid, "SELECT * FROM ch_nulls ORDER BY b") ``` #### UTF-8 in RowBinary @@ -215,26 +238,19 @@ When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory") -bin = "\x61\xF0\x80\x80\x80b" -utf8 = "a�b" +# "\x61\xF0\x80\x80\x80b" will become "a�b" on SELECT +row_binary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b") %Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"]) + Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary]) -%Ch.Result{rows: [[^utf8]]} = +%Ch.Result{rows: [["a�b"]]} = Ch.query!(pid, "SELECT * FROM ch_utf8") -%Ch.Result{rows: %{"data" => [[^utf8]]}} = +%Ch.Result{rows: %{"data" => [["a�b"]]}} = pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1) ``` -To get raw binary from `String` columns use `:binary` type that skips UTF-8 checks. 
- -```elixir -%Ch.Result{rows: [[^bin]]} = - Ch.query!(pid, "SELECT * FROM ch_utf8", [], types: [:binary]) -``` - #### Timezones in RowBinary Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database) @@ -268,7 +284,7 @@ utc = DateTime.utc_now() taipei = DateTime.shift_zone!(utc, "Asia/Taipei") # ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei -Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"]) +Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"]) ``` ## Benchmarks diff --git a/lib/ch.ex b/lib/ch.ex index a027102..0a8ac52 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -2,49 +2,85 @@ defmodule Ch do @moduledoc "Minimal HTTP ClickHouse client." alias Ch.{Connection, Query, Result} + @type common_option :: + {:database, String.t()} + | {:username, String.t()} + | {:password, String.t()} + | {:settings, Keyword.t()} + | {:timeout, timeout} + + @type start_option :: + common_option + | {:scheme, String.t()} + | {:hostname, String.t()} + | {:port, :inet.port_number()} + | {:transport_opts, :gen_tcp.connect_option()} + | DBConnection.start_option() + @doc """ Start the connection process and connect to ClickHouse. ## Options + * `:scheme` - HTTP scheme, defaults to `"http"` * `:hostname` - server hostname, defaults to `"localhost"` * `:port` - HTTP port, defualts to `8123` - * `:scheme` - HTTP scheme, defaults to `"http"` + * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info * `:database` - Database, defaults to `"default"` * `:username` - Username * `:password` - User password * `:settings` - Keyword list of ClickHouse settings * `:timeout` - HTTP receive timeout in milliseconds - * `:transport_opts` - options to be given to the transport being used. 
See `Mint.HTTP1.connect/4` for more info + * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0) """ + @spec start_link([start_option]) :: GenServer.on_start() def start_link(opts \\ []) do DBConnection.start_link(Connection, opts) end @doc """ Returns a supervisor child specification for a DBConnection pool. + + See `start_link/1` for supported options. """ + @spec child_spec([start_option]) :: :supervisor.child_spec() def child_spec(opts) do DBConnection.child_spec(Connection, opts) end + # TODO move streaming to Ch.stream/4 + @type statement :: iodata | Enumerable.t() + @type params :: %{String.t() => term} | [{String.t(), term}] + + @type query_option :: + common_option + | {:command, Ch.Query.command()} + | {:headers, [{String.t(), String.t()}]} + | {:format, String.t()} + | {:decode, boolean} + | DBConnection.connection_option() + @doc """ Runs a query and returns the result as `{:ok, %Ch.Result{}}` or `{:error, Exception.t()}` if there was a database error. 
## Options - * `:timeout` - Query request timeout - * `:settings` - Keyword list of settings * `:database` - Database * `:username` - Username * `:password` - User password + * `:settings` - Keyword list of settings + * `:timeout` - Query request timeout + * `:command` - Command tag for the query + * `:headers` - Custom HTTP headers for the request + * `:format` - Custom response format for the request + * `:decode` - Whether to automatically decode the response + * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0) """ - @spec query(DBConnection.conn(), iodata, params, Keyword.t()) :: + @spec query(DBConnection.conn(), statement, params, [query_option]) :: {:ok, Result.t()} | {:error, Exception.t()} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() def query(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) @@ -57,26 +93,19 @@ defmodule Ch do Runs a query and returns the result or raises `Ch.Error` if there was an error. See `query/4`. 
""" - @spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t() - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() + @spec query!(DBConnection.conn(), statement, params, [query_option]) :: Result.t() def query!(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.execute!(conn, query, params, opts) end @doc false - @spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t() + @spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.stream(conn, query, params, opts) end - @doc false - @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any - def run(conn, f, opts \\ []) when is_function(f, 1) do - DBConnection.run(conn, f, opts) - end - if Code.ensure_loaded?(Ecto.ParameterizedType) do @behaviour Ecto.ParameterizedType diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 70fcaf7..6a729c9 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -134,17 +134,17 @@ defmodule Ch.Connection do end @impl true - def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - {query_params, extra_headers, body} = params + def handle_execute(%Query{statement: statement} = query, params, opts, conn) do + {query_params, extra_headers} = params path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) result = - if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) + if is_list(statement) or is_binary(statement) do + request(conn, "POST", path, headers, statement, opts) else - request(conn, "POST", path, headers, body, opts) + request_chunked(conn, "POST", path, headers, statement, opts) end with {:ok, conn, responses} <- result do @@ -152,17 +152,6 @@ defmodule Ch.Connection do end end - def handle_execute(query, 
params, opts, conn) do - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - - with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do - {:ok, query, responses, conn} - end - end - @impl true def disconnect(_error, conn) do {:ok = ok, _conn} = HTTP.close(conn) @@ -171,7 +160,14 @@ defmodule Ch.Connection do @typep response :: Mint.Types.status() | Mint.Types.headers() | binary - @spec request(conn, binary, binary, Mint.Types.headers(), iodata, Keyword.t()) :: + @spec request( + conn, + method :: String.t(), + path :: String.t(), + Mint.Types.headers(), + body :: iodata, + [Ch.query_option()] + ) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} @@ -181,7 +177,14 @@ defmodule Ch.Connection do end end - @spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) :: + @spec request_chunked( + conn, + method :: String.t(), + path :: String.t(), + Mint.Types.headers(), + body :: Enumerable.t(), + [Ch.query_option()] + ) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 2e6888d..ef74095 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -1,16 +1,15 @@ defmodule Ch.Query do @moduledoc "Query struct wrapping the SQL statement." 
- defstruct [:statement, :command, :encode, :decode] + defstruct [:statement, :command] - @type t :: %__MODULE__{statement: iodata, command: atom, encode: boolean, decode: boolean} + @type t :: %__MODULE__{statement: Ch.statement(), command: command} + @type params :: [{String.t(), String.t()}] @doc false - @spec build(iodata, Keyword.t()) :: t + @spec build(Ch.statement(), [Ch.query_option()]) :: t def build(statement, opts \\ []) do command = Keyword.get(opts, :command) || extract_command(statement) - encode = Keyword.get(opts, :encode, true) - decode = Keyword.get(opts, :decode, true) - %__MODULE__{statement: statement, command: command, encode: encode, decode: decode} + %__MODULE__{statement: statement, command: command} end statements = [ @@ -43,6 +42,13 @@ defmodule Ch.Query do {"WATCH", :watch} ] + command_union = + statements + |> Enum.map(fn {_, command} -> command end) + |> Enum.reduce(&{:|, [], [&1, &2]}) + + @type command :: unquote(command_union) + defp extract_command(statement) for {statement, command} <- statements do @@ -54,8 +60,8 @@ defmodule Ch.Query do extract_command(rest) end - defp extract_command([first_segment | _] = statement) do - extract_command(first_segment) || extract_command(IO.iodata_to_binary(statement)) + defp extract_command([first_segment | _]) do + extract_command(first_segment) end defp extract_command(_other), do: nil @@ -64,118 +70,46 @@ end defimpl DBConnection.Query, for: Ch.Query do alias Ch.{Query, Result, RowBinary} - @spec parse(Query.t(), Keyword.t()) :: Query.t() + @spec parse(Query.t(), [Ch.query_option()]) :: Query.t() def parse(query, _opts), do: query - @spec describe(Query.t(), Keyword.t()) :: Query.t() + @spec describe(Query.t(), [Ch.query_option()]) :: Query.t() def describe(query, _opts), do: query - @spec encode(Query.t(), params, Keyword.t()) :: {query_params, Mint.Types.headers(), body} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(), - query_params: [{String.t(), String.t()}], - 
body: iodata | Enumerable.t() - - def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do - body = - case data do - _ when is_list(data) or is_binary(data) -> [statement, ?\n | data] - _ -> Stream.concat([[statement, ?\n]], data) - end - - {_query_params = [], headers(opts), body} - end - - def encode(%Query{command: :insert, statement: statement}, params, opts) do - cond do - names = Keyword.get(opts, :names) -> - types = Keyword.fetch!(opts, :types) - header = RowBinary.encode_names_and_types(names, types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n, header | data]} - - format_row_binary?(statement) -> - types = Keyword.fetch!(opts, :types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n | data]} - - true -> - {query_params(params), headers(opts), statement} - end - end - - def encode(%Query{statement: statement}, params, opts) do - types = Keyword.get(opts, :types) - default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" - format = Keyword.get(opts, :format) || default_format - {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement} - end - - defp format_row_binary?(statement) when is_binary(statement) do - statement |> String.trim_trailing() |> String.ends_with?("RowBinary") - end - - defp format_row_binary?(statement) when is_list(statement) do - statement - |> IO.iodata_to_binary() - |> format_row_binary?() + @spec encode(Query.t(), Ch.params(), [Ch.query_option()]) :: + {Ch.Query.params(), Mint.Types.headers()} + def encode(%Query{}, params, opts) do + format = Keyword.get(opts, :format, "RowBinaryWithNamesAndTypes") + headers = Keyword.get(opts, :headers, []) + {query_params(params), [{"x-clickhouse-format", format} | headers]} end - @spec decode(Query.t(), [response], Keyword.t()) :: Result.t() + @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t() 
when response: Mint.Types.status() | Mint.Types.headers() | binary - def decode(%Query{command: :insert}, responses, _opts) do - [_status, headers | _data] = responses - - num_rows = - if summary = get_header(headers, "x-clickhouse-summary") do - %{"written_rows" => written_rows} = Jason.decode!(summary) - String.to_integer(written_rows) - end - - %Result{num_rows: num_rows, rows: nil, command: :insert, headers: headers} - end - - def decode(%Query{decode: false, command: command}, responses, _opts) when is_list(responses) do - # TODO potentially fails on x-progress-headers - [_status, headers | data] = responses - %Result{rows: data, command: command, headers: headers} - end - - def decode(%Query{command: command}, responses, opts) when is_list(responses) do - # TODO potentially fails on x-progress-headers + def decode(%Query{command: command}, responses, opts) do [_status, headers | data] = responses + format = get_header(headers, "x-clickhouse-format") + decode = Keyword.get(opts, :decode, true) - case get_header(headers, "x-clickhouse-format") do - "RowBinary" -> - types = Keyword.fetch!(opts, :types) - rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows(types) - %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} - - "RowBinaryWithNamesAndTypes" -> + cond do + decode and format == "RowBinaryWithNamesAndTypes" -> rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows() - %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} + %Result{num_rows: length(rows), rows: rows, headers: headers, command: command} - _other -> - %Result{rows: data, command: command, headers: headers} - end - end + format == nil -> + num_rows = + if summary = get_header(headers, "x-clickhouse-summary") do + %{"written_rows" => written_rows} = Jason.decode!(summary) + String.to_integer(written_rows) + end - # TODO merge :stream `decode/3` with "normal" `decode/3` clause above - @spec decode(Query.t(), {:stream, nil, 
responses}, Keyword.t()) :: responses - when responses: [Mint.Types.response()] - def decode(_query, {:stream, nil, responses}, _opts), do: responses + %Result{num_rows: num_rows, headers: headers, command: command} - @spec decode(Query.t(), {:stream, [atom], [Mint.Types.response()]}, Keyword.t()) :: [[term]] - def decode(_query, {:stream, types, responses}, _opts) do - decode_stream_data(responses, types) - end - - defp decode_stream_data([{:data, _ref, data} | rest], types) do - [RowBinary.decode_rows(data, types) | decode_stream_data(rest, types)] + true -> + %Result{rows: data, headers: headers, command: command} + end end - defp decode_stream_data([_ | rest], types), do: decode_stream_data(rest, types) - defp decode_stream_data([] = done, _types), do: done - defp get_header(headers, key) do case List.keyfind(headers, key, 0) do {_, value} -> value @@ -183,15 +117,9 @@ defimpl DBConnection.Query, for: Ch.Query do end end - defp query_params(params) when is_map(params) do - Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end) - end - - defp query_params(params) when is_list(params) do - params - |> Enum.with_index() - |> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode_param(v)} end) - end + @compile inline: [query_params: 1] + defp query_params(params), do: Enum.map(params, &query_param/1) + defp query_param({k, v}), do: {"param_#{k}", encode_param(v)} defp encode_param(n) when is_integer(n), do: Integer.to_string(n) defp encode_param(f) when is_float(f), do: Float.to_string(f) @@ -280,9 +208,6 @@ defimpl DBConnection.Query, for: Ch.Query do end defp escape_param([], param), do: param - - @spec headers(Keyword.t()) :: Mint.Types.headers() - defp headers(opts), do: Keyword.get(opts, :headers, []) end defimpl String.Chars, for: Ch.Query do diff --git a/test/ch/aggregation_test.exs b/test/ch/aggregation_test.exs index 37bd8a3..996d03e 100644 --- a/test/ch/aggregation_test.exs +++ b/test/ch/aggregation_test.exs @@ -95,14 +95,15 @@ defmodule 
Ch.AggregationTest do assert %{num_rows: 2} = Ch.query!( conn, - """ - INSERT INTO test_insert_aggregate_function - SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) - FROM input('uid Int16, updated DateTime, name String') - FORMAT RowBinary\ - """, - rows, - types: ["Int16", "DateTime", "String"] + [ + """ + INSERT INTO test_insert_aggregate_function + SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) + FROM input('uid Int16, updated DateTime, name String') + FORMAT RowBinary + """ + | Ch.RowBinary.encode_rows(rows, ["Int16", "DateTime", "String"]) + ] ) assert Ch.query!(conn, """ @@ -126,12 +127,16 @@ defmodule Ch.AggregationTest do Ch.query!( conn, - "INSERT INTO test_users_ephemeral_column(uid, updated, name_stub) FORMAT RowBinary", - _rows = [ - [1231, ~N[2020-01-02 00:00:00], "Jane"], - [1231, ~N[2020-01-01 00:00:00], "John"] - ], - types: ["Int16", "DateTime", "String"] + [ + "INSERT INTO test_users_ephemeral_column(uid, updated, name_stub) FORMAT RowBinary\n" + | Ch.RowBinary.encode_rows( + [ + [1231, ~N[2020-01-02 00:00:00], "Jane"], + [1231, ~N[2020-01-01 00:00:00], "John"] + ], + ["Int16", "DateTime", "String"] + ) + ] ) assert Ch.query!(conn, """ @@ -152,16 +157,20 @@ defmodule Ch.AggregationTest do Ch.query!( conn, - """ - INSERT INTO test_users_input_function - SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) - FROM input('uid Int16, updated DateTime, name String') FORMAT RowBinary\ - """, - _rows = [ - [1231, ~N[2020-01-02 00:00:00], "Jane"], - [1231, ~N[2020-01-01 00:00:00], "John"] - ], - types: ["Int16", "DateTime", "String"] + [ + """ + INSERT INTO test_users_input_function + SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) + FROM input('uid Int16, updated DateTime, name String') FORMAT RowBinary + """ + | Ch.RowBinary.encode_rows( + [ + [1231, ~N[2020-01-02 00:00:00], "Jane"], + [1231, ~N[2020-01-01 00:00:00], "John"] + ], + ["Int16", "DateTime", "String"] + ) + ] ) assert 
Ch.query!(conn, """ @@ -196,12 +205,16 @@ defmodule Ch.AggregationTest do Ch.query!( conn, - "INSERT INTO test_users_ne FORMAT RowBinary", - _rows = [ - [1231, ~N[2020-01-02 00:00:00], "Jane"], - [1231, ~N[2020-01-01 00:00:00], "John"] - ], - types: ["Int16", "DateTime", "String"] + [ + "INSERT INTO test_users_ne FORMAT RowBinary\n" + | Ch.RowBinary.encode_rows( + [ + [1231, ~N[2020-01-02 00:00:00], "Jane"], + [1231, ~N[2020-01-01 00:00:00], "John"] + ], + ["Int16", "DateTime", "String"] + ) + ] ) assert Ch.query!(conn, """ diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index bab1635..2ff11f5 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -10,10 +10,6 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select 1") end - test "select with types", %{conn: conn} do - assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select 1", [], types: ["UInt8"]) - end - test "select with params", %{conn: conn} do assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {a:UInt8}", %{"a" => 1}) @@ -66,17 +62,17 @@ defmodule Ch.ConnectionTest do # when the timezone information is provided in the type, we don't need to rely on server timezone assert {:ok, %{num_rows: 1, rows: [[bkk_datetime]]}} = - Ch.query(conn, "select {$0:DateTime('Asia/Bangkok')}", [naive_noon]) + Ch.query(conn, "select {naive:DateTime('Asia/Bangkok')}", [{"naive", naive_noon}]) assert bkk_datetime == DateTime.from_naive!(naive_noon, "Asia/Bangkok") assert {:ok, %{num_rows: 1, rows: [[~U[2022-01-01 12:00:00Z]]]}} = - Ch.query(conn, "select {$0:DateTime('UTC')}", [naive_noon]) + Ch.query(conn, "select {naive:DateTime('UTC')}", [{"naive", naive_noon}]) naive_noon_ms = ~N[2022-01-01 12:00:00.123] assert {:ok, %{num_rows: 1, rows: [[naive_datetime]]}} = - Ch.query(conn, "select {$0:DateTime64(3)}", [naive_noon_ms]) + Ch.query(conn, "select {naive:DateTime64(3)}", [{"naive", naive_noon_ms}]) assert 
NaiveDateTime.compare( naive_datetime, @@ -109,7 +105,7 @@ defmodule Ch.ConnectionTest do # Ch.query(conn, "select {a:UUID}", %{"a" => uuid_bin}) # pseudo-positional bind - assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {$0:UInt8}", [1]) + assert {:ok, %{num_rows: 1, rows: [[1]]}} = Ch.query(conn, "select {$0:UInt8}", [{"$0", 1}]) end test "utc datetime query param encoding", %{conn: conn} do @@ -117,13 +113,17 @@ defmodule Ch.ConnectionTest do msk = DateTime.new!(~D[2021-01-01], ~T[15:00:00], "Europe/Moscow") naive = utc |> DateTime.shift_zone!(Ch.Test.clickhouse_tz(conn)) |> DateTime.to_naive() - assert Ch.query!(conn, "select {$0:DateTime} as d, toString(d)", [utc]).rows == + assert Ch.query!(conn, "select {utc:DateTime} as d, toString(d)", %{"utc" => utc}).rows == [[~N[2021-01-01 12:00:00], to_string(naive)]] - assert Ch.query!(conn, "select {$0:DateTime('UTC')} as d, toString(d)", [utc]).rows == + assert Ch.query!(conn, "select {utc:DateTime('UTC')} as d, toString(d)", %{"utc" => utc}).rows == [[utc, "2021-01-01 12:00:00"]] - assert Ch.query!(conn, "select {$0:DateTime('Europe/Moscow')} as d, toString(d)", [utc]).rows == + assert Ch.query!( + conn, + "select {utc:DateTime('Europe/Moscow')} as d, toString(d)", + %{"utc" => utc} + ).rows == [[msk, "2021-01-01 15:00:00"]] end @@ -132,13 +132,17 @@ defmodule Ch.ConnectionTest do msk = DateTime.new!(~D[2021-01-01], ~T[15:00:00.123456], "Europe/Moscow") naive = utc |> DateTime.shift_zone!(Ch.Test.clickhouse_tz(conn)) |> DateTime.to_naive() - assert Ch.query!(conn, "select {$0:DateTime64(6)} as d, toString(d)", [utc]).rows == + assert Ch.query!(conn, "select {utc:DateTime64(6)} as d, toString(d)", %{"utc" => utc}).rows == [[~N[2021-01-01 12:00:00.123456], to_string(naive)]] - assert Ch.query!(conn, "select {$0:DateTime64(6, 'UTC')} as d, toString(d)", [utc]).rows == + assert Ch.query!(conn, "select {utc:DateTime64(6, 'UTC')} as d, toString(d)", %{"utc" => utc}).rows == [[utc, "2021-01-01 
12:00:00.123456"]] - assert Ch.query!(conn, "select {$0:DateTime64(6,'Europe/Moscow')} as d, toString(d)", [utc]).rows == + assert Ch.query!( + conn, + "select {utc:DateTime64(6,'Europe/Moscow')} as d, toString(d)", + %{"utc" => utc} + ).rows == [[msk, "2021-01-01 15:00:00.123456"]] end @@ -148,7 +152,7 @@ defmodule Ch.ConnectionTest do utc = ~U[2021-01-01 12:00:00.000000Z] naive = utc |> DateTime.shift_zone!(Ch.Test.clickhouse_tz(conn)) |> DateTime.to_naive() - assert Ch.query!(conn, "select {$0:DateTime64(6)} as d, toString(d)", [utc]).rows == + assert Ch.query!(conn, "select {utc:DateTime64(6)} as d, toString(d)", %{"utc" => utc}).rows == [[~N[2021-01-01 12:00:00.000000], to_string(naive)]] end @@ -161,13 +165,16 @@ defmodule Ch.ConnectionTest do end test "create", %{conn: conn} do - assert {:ok, %{num_rows: nil, rows: []}} = + assert {:ok, %{command: :create}} = Ch.query(conn, "create table create_example(a UInt8) engine = Memory") end test "create with options", %{conn: conn} do assert {:error, %Ch.Error{code: 164, message: message}} = - Ch.query(conn, "create table create_example(a UInt8) engine = Memory", [], + Ch.query( + conn, + "create table create_example(a UInt8) engine = Memory", + _param = [], settings: [readonly: 1] ) @@ -202,8 +209,8 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 2}} = Ch.query( conn, - "insert into {$0:Identifier}(a, b) values ({$1:UInt8},{$2:String}),({$3:UInt8},{$4:String})", - [table, 4, "d", 5, "e"] + "insert into {table:Identifier}(a, b) values ({a:UInt8},{b:String}),({c:UInt8},{d:String})", + %{"table" => table, "a" => 4, "b" => "d", "c" => 5, "d" => "e"} ) assert {:ok, %{rows: rows}} = @@ -226,26 +233,15 @@ defmodule Ch.ConnectionTest do assert message =~ "Cannot execute query in readonly mode." 
end - test "automatic RowBinary", %{conn: conn, table: table} do - stmt = "insert into #{table}(a, b) format RowBinary" + test "RowBinary", %{conn: conn, table: table} do types = ["UInt8", "String"] rows = [[1, "a"], [2, "b"]] - assert %{num_rows: 2} = Ch.query!(conn, stmt, rows, types: types) - - assert %{rows: rows} = - Ch.query!(conn, "select * from {table:Identifier}", %{"table" => table}) - - assert rows == [[1, "a"], [2, "b"]] - end - test "manual RowBinary", %{conn: conn, table: table} do - stmt = "insert into #{table}(a, b) format RowBinary" - - types = ["UInt8", "String"] - rows = [[1, "a"], [2, "b"]] - data = RowBinary.encode_rows(rows, types) - - assert %{num_rows: 2} = Ch.query!(conn, stmt, data, encode: false) + assert %{num_rows: 2} = + Ch.query!(conn, [ + "insert into #{table}(a, b) format RowBinary\n" + | RowBinary.encode_rows(rows, types) + ]) assert %{rows: rows} = Ch.query!(conn, "select * from {table:Identifier}", %{"table" => table}) @@ -257,7 +253,7 @@ defmodule Ch.ConnectionTest do types = ["UInt8", "String"] rows = [[1, "a"], [2, "b"], [3, "c"]] - stream = + row_binary = rows |> Stream.chunk_every(2) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) @@ -265,9 +261,7 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 3}} = Ch.query( conn, - "insert into #{table}(a, b) format RowBinary", - stream, - encode: false + Stream.concat(["insert into #{table}(a, b) format RowBinary\n"], row_binary) ) assert {:ok, %{rows: rows}} = @@ -299,8 +293,8 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 2}} = Ch.query( conn, - "insert into {$0:Identifier}(a, b) select a, b from {$0:Identifier} where a > {$1:UInt8}", - [table, 1] + "insert into {table:Identifier}(a, b) select a, b from {table:Identifier} where a > {a:UInt8}", + %{"table" => table, "a" => 1} ) assert {:ok, %{rows: new_rows}} = @@ -320,7 +314,7 @@ defmodule Ch.ConnectionTest do settings = [allow_experimental_lightweight_delete: 1] - assert {:ok, %{rows: [], 
command: :delete}} = + assert {:ok, %{command: :delete}} = Ch.query(conn, "delete from delete_t where 1", [], settings: settings) end @@ -379,14 +373,18 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 4}} = Ch.query( conn, - "insert into fixed_string_t(a) format RowBinary", [ - [""], - ["a"], - ["aa"], - ["aaa"] - ], - types: ["FixedString(3)"] + "insert into fixed_string_t(a) format RowBinary\n" + | RowBinary.encode_rows( + [ + [""], + ["a"], + ["aa"], + ["aaa"] + ], + ["FixedString(3)"] + ) + ] ) assert Ch.query!(conn, "select * from fixed_string_t").rows == [ @@ -423,13 +421,13 @@ defmodule Ch.ConnectionTest do assert %{num_rows: 3} = Ch.query!( conn, - "insert into decimal_t(d) format RowBinary", - _rows = [ - [Decimal.new("2.66")], - [Decimal.new("2.6666")], - [Decimal.new("2.66666")] - ], - types: ["Decimal32(4)"] + [ + "insert into decimal_t(d) format RowBinary\n" + | RowBinary.encode_rows( + [[Decimal.new("2.66")], [Decimal.new("2.6666")], [Decimal.new("2.66666")]], + ["Decimal32(4)"] + ) + ] ) assert Ch.query!(conn, "select * from decimal_t").rows == [ @@ -454,9 +452,13 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - "insert into test_bool(A, B) format RowBinary", - _rows = [[3, true], [4, false]], - types: ["Int64", "Bool"] + [ + "insert into test_bool(A, B) format RowBinary\n" + | RowBinary.encode_rows( + [[3, true], [4, false]], + ["Int64", "Bool"] + ) + ] ) # anything > 0 is `true`, here `2` is `true` @@ -497,9 +499,13 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - "insert into t_uuid(x,y) format RowBinary", - _rows = [[uuid, "Example 3"]], - types: ["UUID", "String"] + [ + "insert into t_uuid(x,y) format RowBinary\n" + | RowBinary.encode_rows( + [[uuid, "Example 3"]], + ["UUID", "String"] + ) + ] ) assert {:ok, @@ -559,9 +565,13 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - "INSERT INTO t_enum(i, x) FORMAT RowBinary", - _rows = [[3, "hello"], [4, "world"], [5, 1], [6, 2]], - types: ["UInt8", "Enum8('hello' = 1, 'world' = 
2)"] + [ + "INSERT INTO t_enum(i, x) FORMAT RowBinary\n" + | RowBinary.encode_rows( + _rows = [[3, "hello"], [4, "world"], [5, 1], [6, 2]], + ["UInt8", "Enum8('hello' = 1, 'world' = 2)"] + ) + ] ) assert Ch.query!(conn, "SELECT *, CAST(x, 'Int8') FROM t_enum ORDER BY i").rows == [ @@ -608,18 +618,22 @@ defmodule Ch.ConnectionTest do assert Ch.query!( conn, - "INSERT INTO table_map FORMAT RowBinary", - _rows = [ - [%{"key10" => 20, "key20" => 40}], - # empty map - [%{}], - # null map - [nil], - # empty proplist map - [[]], - [[{"key50", 100}]] - ], - types: ["Map(String, UInt64)"] + [ + "INSERT INTO table_map FORMAT RowBinary\n" + | RowBinary.encode_rows( + [ + [%{"key10" => 20, "key20" => 40}], + # empty map + [%{}], + # null map + [nil], + # empty proplist map + [[]], + [[{"key50", 100}]] + ], + ["Map(String, UInt64)"] + ) + ] ) assert Ch.query!(conn, "SELECT * FROM table_map ORDER BY a ASC").rows == [ @@ -641,7 +655,7 @@ defmodule Ch.ConnectionTest do [{1, "a"}, "Tuple(UInt8, String)"] ] - assert Ch.query!(conn, "SELECT {$0:Tuple(Int8, String)}", [{-1, "abs"}]).rows == [ + assert Ch.query!(conn, "SELECT {t:Tuple(Int8, String)}", %{"t" => {-1, "abs"}}).rows == [ [{-1, "abs"}] ] @@ -660,9 +674,13 @@ defmodule Ch.ConnectionTest do assert %{num_rows: 2} = Ch.query!( conn, - "INSERT INTO tuples_t FORMAT RowBinary", - _rows = [[{"a", 20}], [{"b", 30}]], - types: ["Tuple(String, Int64)"] + [ + "INSERT INTO tuples_t FORMAT RowBinary\n" + | RowBinary.encode_rows( + [[{"a", 20}], [{"b", 30}]], + ["Tuple(String, Int64)"] + ) + ] ) assert Ch.query!(conn, "SELECT a FROM tuples_t ORDER BY a.1 ASC").rows == [ @@ -704,7 +722,11 @@ defmodule Ch.ConnectionTest do # https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/ assert {:ok, %{num_rows: 1, rows: [[naive_datetime, "2022-12-12 12:00:00"]], headers: headers}} = - Ch.query(conn, "select {$0:DateTime} as d, toString(d)", [naive_noon]) + Ch.query( + conn, + "select {naive:DateTime} as d, toString(d)", + %{"naive" => 
naive_noon} + ) # to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse # i.e. we give server timezone to the naive datetime and shift it to UTC before comparing with the result @@ -717,12 +739,18 @@ defmodule Ch.ConnectionTest do |> DateTime.to_naive() assert {:ok, %{num_rows: 1, rows: [[~U[2022-12-12 12:00:00Z], "2022-12-12 12:00:00"]]}} = - Ch.query(conn, "select {$0:DateTime('UTC')} as d, toString(d)", [naive_noon]) + Ch.query( + conn, + "select {naive:DateTime('UTC')} as d, toString(d)", + %{"naive" => naive_noon} + ) assert {:ok, %{num_rows: 1, rows: rows}} = - Ch.query(conn, "select {$0:DateTime('Asia/Bangkok')} as d, toString(d)", [ - naive_noon - ]) + Ch.query( + conn, + "select {naive:DateTime('Asia/Bangkok')} as d, toString(d)", + %{"naive" => naive_noon} + ) assert rows == [ [ @@ -737,7 +765,7 @@ defmodule Ch.ConnectionTest do on_exit(fn -> Calendar.put_time_zone_database(prev_tz_db) end) assert_raise ArgumentError, ~r/:utc_only_time_zone_database/, fn -> - Ch.query(conn, "select {$0:DateTime('Asia/Tokyo')}", [naive_noon]) + Ch.query(conn, "select {naive:DateTime('Asia/Tokyo')}", %{"naive" => naive_noon}) end end @@ -759,21 +787,37 @@ defmodule Ch.ConnectionTest do ] assert {:ok, %{num_rows: 1, rows: [[~D[1900-01-01], "1900-01-01"]]}} = - Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[1900-01-01]]) + Ch.query( + conn, + "select {date:Date32} as d, toString(d)", + %{"date" => ~D[1900-01-01]} + ) # max assert {:ok, %{num_rows: 1, rows: [[~D[2299-12-31], "2299-12-31"]]}} = - Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[2299-12-31]]) + Ch.query( + conn, + "select {date:Date32} as d, toString(d)", + %{"date" => ~D[2299-12-31]} + ) # min assert {:ok, %{num_rows: 1, rows: [[~D[1900-01-01], "1900-01-01"]]}} = - Ch.query(conn, "select {$0:Date32} as d, toString(d)", [~D[1900-01-01]]) + Ch.query( + conn, + "select {date:Date32} as d, toString(d)", + %{"date" => ~D[1900-01-01]} + ) 
Ch.query!( conn, - "insert into new(timestamp, event_id) format RowBinary", - _rows = [[~D[1960-01-01], 3]], - types: ["Date32", "UInt8"] + [ + "insert into new(timestamp, event_id) format RowBinary\n" + | RowBinary.encode_rows( + [[~D[1960-01-01], 3]], + ["Date32", "UInt8"] + ) + ] ) assert %{ @@ -829,12 +873,16 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - "insert into datetime64_t(event_id, timestamp) format RowBinary", - _rows = [ - [4, ~N[2021-01-01 12:00:00.123456]], - [5, ~N[2021-01-01 12:00:00]] - ], - types: ["UInt8", "DateTime64(3)"] + [ + "insert into datetime64_t(event_id, timestamp) format RowBinary\n" + | RowBinary.encode_rows( + [ + [4, ~N[2021-01-01 12:00:00.123456]], + [5, ~N[2021-01-01 12:00:00]] + ], + ["UInt8", "DateTime64(3)"] + ) + ] ) assert {:ok, %{num_rows: 2, rows: rows}} = @@ -863,7 +911,11 @@ defmodule Ch.ConnectionTest do # see https://clickhouse.com/docs/en/sql-reference/data-types/datetime # https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/ assert {:ok, %{num_rows: 1, rows: [[naive_datetime]], headers: headers}} = - Ch.query(conn, "select {$0:DateTime64(#{precision})}", [naive_noon]) + Ch.query( + conn, + "select {naive:DateTime64(#{precision})}", + %{"naive" => naive_noon} + ) # to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse # i.e. 
we give server timezone to the naive datetime and shift it to UTC before comparing with the result @@ -919,9 +971,10 @@ defmodule Ch.ConnectionTest do assert {:ok, %{num_rows: 5}} = Ch.query( conn, - "insert into nullable format RowBinary", - <<1, 2, 3, 4, 5>>, - encode: false + [ + "insert into nullable format RowBinary\n" + | <<1, 2, 3, 4, 5>> + ] ) assert %{num_rows: 1, rows: [[count]]} = @@ -942,9 +995,13 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - "INSERT INTO ch_nulls(a, b, c, d) FORMAT RowBinary", - [[nil, nil, nil, nil]], - types: ["UInt8", "Nullable(UInt8)", "UInt8", "Nullable(UInt8)"] + [ + "INSERT INTO ch_nulls(a, b, c, d) FORMAT RowBinary\n" + | RowBinary.encode_rows( + [[nil, nil, nil, nil]], + ["UInt8", "Nullable(UInt8)", "UInt8", "Nullable(UInt8)"] + ) + ] ) # default is ignored... @@ -962,14 +1019,18 @@ defmodule Ch.ConnectionTest do Ch.query!( conn, - """ - INSERT INTO test_insert_default_value - SELECT id, name - FROM input('id UInt32, name Nullable(String)') - FORMAT RowBinary\ - """, - [[1, nil], [-1, nil]], - types: ["UInt32", "Nullable(String)"] + [ + """ + INSERT INTO test_insert_default_value + SELECT id, name + FROM input('id UInt32, name Nullable(String)') + FORMAT RowBinary + """ + | RowBinary.encode_rows( + [[1, nil], [-1, nil]], + ["UInt32", "Nullable(String)"] + ) + ] ) assert Ch.query!(conn, "SELECT * FROM test_insert_default_value ORDER BY n").rows == @@ -986,7 +1047,7 @@ defmodule Ch.ConnectionTest do end test "can encode and then decode Point in query params", %{conn: conn} do - assert Ch.query!(conn, "select {$0:Point}", [{10, 10}]).rows == [ + assert Ch.query!(conn, "select {p:Point}", %{"p" => {10, 10}}).rows == [ _row = [_point = {10.0, 10.0}] ] end @@ -994,7 +1055,11 @@ defmodule Ch.ConnectionTest do test "can insert and select Point", %{conn: conn} do Ch.query!(conn, "CREATE TABLE geo_point (p Point) ENGINE = Memory()") Ch.query!(conn, "INSERT INTO geo_point VALUES((10, 10))") - Ch.query!(conn, "INSERT INTO geo_point 
FORMAT RowBinary", [[{20, 20}]], types: ["Point"]) + + Ch.query!(conn, [ + "INSERT INTO geo_point FORMAT RowBinary\n" + | RowBinary.encode_rows([[{20, 20}]], ["Point"]) + ]) assert Ch.query!(conn, "SELECT p, toTypeName(p) FROM geo_point ORDER BY p ASC").rows == [ [{10.0, 10.0}, "Point"], @@ -1021,7 +1086,7 @@ defmodule Ch.ConnectionTest do test "can encode and then decode Ring in query params", %{conn: conn} do ring = [{0.0, 1.0}, {10.0, 3.0}] - assert Ch.query!(conn, "select {$0:Ring}", [ring]).rows == [_row = [ring]] + assert Ch.query!(conn, "select {r:Ring}", %{"r" => ring}).rows == [_row = [ring]] end test "can insert and select Ring", %{conn: conn} do @@ -1029,7 +1094,10 @@ defmodule Ch.ConnectionTest do Ch.query!(conn, "INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)])") ring = [{20, 20}, {0, 0}, {0, 20}] - Ch.query!(conn, "INSERT INTO geo_ring FORMAT RowBinary", [[ring]], types: ["Ring"]) + + Ch.query!(conn, [ + "INSERT INTO geo_ring FORMAT RowBinary\n" | RowBinary.encode_rows([[ring]], ["Ring"]) + ]) assert Ch.query!(conn, "SELECT r, toTypeName(r) FROM geo_ring ORDER BY r ASC").rows == [ [[{0.0, 0.0}, {10.0, 0.0}, {10.0, 10.0}, {0.0, 10.0}], "Ring"], @@ -1058,7 +1126,7 @@ defmodule Ch.ConnectionTest do test "can encode and then decode Polygon in query params", %{conn: conn} do polygon = [[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]] - assert Ch.query!(conn, "select {$0:Polygon}", [polygon]).rows == [_row = [polygon]] + assert Ch.query!(conn, "select {p:Polygon}", %{"p" => polygon}).rows == [_row = [polygon]] end test "can insert and select Polygon", %{conn: conn} do @@ -1070,7 +1138,11 @@ defmodule Ch.ConnectionTest do ) polygon = [[{0, 1.0}, {10, 3.2}], [], [{2, 2}]] - Ch.query!(conn, "INSERT INTO geo_polygon FORMAT RowBinary", [[polygon]], types: ["Polygon"]) + + Ch.query!(conn, [ + "INSERT INTO geo_polygon FORMAT RowBinary\n" + | RowBinary.encode_rows([[polygon]], ["Polygon"]) + ]) assert Ch.query!(conn, "SELECT pg, toTypeName(pg) FROM 
geo_polygon ORDER BY pg ASC").rows == [ @@ -1113,7 +1185,7 @@ defmodule Ch.ConnectionTest do test "can encode and then decode MultiPolygon in query params", %{conn: conn} do multipolygon = [[[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]], [], [[{3, 3}]]] - assert Ch.query!(conn, "select {$0:MultiPolygon}", [multipolygon]).rows == [ + assert Ch.query!(conn, "select {mp:MultiPolygon}", %{"mp" => multipolygon}).rows == [ _row = [multipolygon] ] end @@ -1128,9 +1200,13 @@ defmodule Ch.ConnectionTest do multipolygon = [[[{0.0, 1.0}, {10.0, 3.0}], [], [{2, 2}]], [], [[{3, 3}]]] - Ch.query!(conn, "INSERT INTO geo_multipolygon FORMAT RowBinary", [[multipolygon]], - types: ["MultiPolygon"] - ) + Ch.query!(conn, [ + "INSERT INTO geo_multipolygon FORMAT RowBinary\n" + | RowBinary.encode_rows( + [[multipolygon]], + ["MultiPolygon"] + ) + ]) assert Ch.query!(conn, "SELECT mpg, toTypeName(mpg) FROM geo_multipolygon ORDER BY mpg ASC").rows == [ @@ -1235,6 +1311,7 @@ defmodule Ch.ConnectionTest do end describe "stream" do + @tag :skip test "sends mint http packets", %{conn: conn} do stmt = "select number from system.numbers limit 1000" @@ -1246,7 +1323,7 @@ defmodule Ch.ConnectionTest do end packets = - Ch.run(conn, fn conn -> + DBConnection.run(conn, fn conn -> conn |> Ch.stream(stmt) |> Enum.flat_map(drop_ref) @@ -1267,11 +1344,12 @@ defmodule Ch.ConnectionTest do assert List.last(packets) == :done end + @tag :skip test "decodes RowBinary", %{conn: conn} do stmt = "select number from system.numbers limit 1000" rows = - Ch.run(conn, fn conn -> + DBConnection.run(conn, fn conn -> conn |> Ch.stream(stmt, _params = [], types: [:u64]) |> Enum.into([]) @@ -1280,10 +1358,11 @@ defmodule Ch.ConnectionTest do assert List.flatten(rows) == Enum.into(0..999, []) end + @tag :skip test "disconnects on early halt", %{conn: conn} do logs = ExUnit.CaptureLog.capture_log(fn -> - Ch.run(conn, fn conn -> + DBConnection.run(conn, fn conn -> conn |> Ch.stream("select number from system.numbers") |> 
Enum.take(1) end) @@ -1338,46 +1417,49 @@ defmodule Ch.ConnectionTest do end test "error on type mismatch", %{conn: conn} do - stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes" - rows = [["AB", "rare", -42]] names = ["country_code", "rare_string", "maybe_int32"] + types = [Ch.Types.fixed_string(2), Ch.Types.string(), Ch.Types.nullable(Ch.Types.u32())] - opts = [ - names: names, - types: [Ch.Types.fixed_string(2), Ch.Types.string(), Ch.Types.nullable(Ch.Types.u32())] - ] + assert {:error, %Ch.Error{code: 117, message: message}} = + Ch.query(conn, [ + "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n", + _header = RowBinary.encode_names_and_types(names, types) + | RowBinary.encode_rows([["AB", "rare", -42]], types) + ]) - assert {:error, %Ch.Error{code: 117, message: message}} = Ch.query(conn, stmt, rows, opts) assert message =~ "Type of 'rare_string' must be LowCardinality(String), not String" - opts = [ - names: names, - types: [ - Ch.Types.fixed_string(2), - Ch.Types.low_cardinality(Ch.Types.string()), - Ch.Types.nullable(Ch.Types.u32()) - ] + types = [ + Ch.Types.fixed_string(2), + Ch.Types.low_cardinality(Ch.Types.string()), + Ch.Types.nullable(Ch.Types.u32()) ] - assert {:error, %Ch.Error{code: 117, message: message}} = Ch.query(conn, stmt, rows, opts) + assert {:error, %Ch.Error{code: 117, message: message}} = + Ch.query(conn, [ + "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n", + _header = RowBinary.encode_names_and_types(names, types) + | RowBinary.encode_rows([["AB", "rare", -42]], types) + ]) + assert message =~ "Type of 'maybe_int32' must be Nullable(Int32), not Nullable(UInt32)" end test "ok on valid types", %{conn: conn} do - stmt = "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes" - rows = [["AB", "rare", -42]] names = ["country_code", "rare_string", "maybe_int32"] - opts = [ - names: names, - types: [ - Ch.Types.fixed_string(2), - 
Ch.Types.low_cardinality(Ch.Types.string()), - Ch.Types.nullable(Ch.Types.i32()) - ] + types = [ + Ch.Types.fixed_string(2), + Ch.Types.low_cardinality(Ch.Types.string()), + Ch.Types.nullable(Ch.Types.i32()) ] - assert {:ok, %{num_rows: 1}} = Ch.query(conn, stmt, rows, opts) + assert {:ok, %{num_rows: 1}} = + Ch.query(conn, [ + "insert into row_binary_names_and_types_t format RowBinaryWithNamesAndTypes\n", + _header = RowBinary.encode_names_and_types(names, types) + | RowBinary.encode_rows([["AB", "rare", -42]], types) + ]) end end end diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index e647364..32db145 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -367,9 +367,10 @@ defmodule Ch.FaultsTest do assert {:error, %Mint.TransportError{reason: :closed}} = Ch.query( conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false + Stream.concat( + ["insert into unknown_table(a,b) format RowBinary\n"], + stream + ) ) end) @@ -384,9 +385,10 @@ defmodule Ch.FaultsTest do assert {:error, %Ch.Error{code: 60, message: message}} = Ch.query( conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false + Stream.concat( + ["insert into unknown_table(a,b) format RowBinary\n"], + stream + ) ) assert message =~ ~r/UNKNOWN_TABLE/ @@ -426,9 +428,10 @@ defmodule Ch.FaultsTest do assert {:error, %Mint.TransportError{reason: :closed}} = Ch.query( conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false + Stream.concat( + ["insert into unknown_table(a,b) format RowBinary\n"], + stream + ) ) end) @@ -447,9 +450,10 @@ defmodule Ch.FaultsTest do assert {:error, %Ch.Error{code: 60, message: message}} = Ch.query( conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - encode: false + Stream.concat( + ["insert into unknown_table(a,b) format RowBinary\n"], + stream + ) ) assert message =~ ~r/UNKNOWN_TABLE/ diff --git a/test/ch/query_test.exs b/test/ch/query_test.exs index 
c8eec26..7ff7b9c 100644 --- a/test/ch/query_test.exs +++ b/test/ch/query_test.exs @@ -20,7 +20,9 @@ defmodule Ch.QueryTest do """).command == :select assert Query.build(["select 1+2"]).command == :select - assert Query.build([?S, ?E, ?L | "ECT 1"]).command == :select + + # TODO? + refute Query.build([?S, ?E, ?L | "ECT 1"]).command == :select assert Query.build("with insert as (select 1) select * from insert").command == :select assert Query.build("insert into table(a, b) values(1, 2)").command == :insert @@ -112,27 +114,34 @@ defmodule Ch.QueryTest do test "decoded binaries copy behaviour", %{conn: conn} do text = "hello world" - assert [[bin]] = Ch.query!(conn, "SELECT {$0:String}", [text]).rows + assert [[bin]] = Ch.query!(conn, "SELECT {t:String}", %{"t" => text}).rows assert :binary.referenced_byte_size(bin) == byte_size(text) # For OTP 20+ refc binaries up to 64 bytes might be copied during a GC text = String.duplicate("hello world", 6) - assert [[bin]] = Ch.query!(conn, "SELECT {$0:String}", [text]).rows + assert [[bin]] = Ch.query!(conn, "SELECT {t:String}", %{"t" => text}).rows assert :binary.referenced_byte_size(bin) == byte_size(text) end test "encode basic types", %{conn: conn} do # TODO # assert [[nil, nil]] = query("SELECT $1::text, $2::int", [nil, nil]) - assert [[true, false]] = Ch.query!(conn, "SELECT {$0:bool}, {$1:Bool}", [true, false]).rows - assert [["ẽ"]] = Ch.query!(conn, "SELECT {$0:char}", ["ẽ"]).rows - assert [[42]] = Ch.query!(conn, "SELECT {$0:int}", [42]).rows - assert [[42.0, 43.0]] = Ch.query!(conn, "SELECT {$0:float}, {$1:float}", [42, 43.0]).rows - assert [[nil, nil]] = Ch.query!(conn, "SELECT {$0:float}, {$1:float}", ["NaN", "nan"]).rows - assert [[nil]] = Ch.query!(conn, "SELECT {$0:float}", ["inf"]).rows - assert [[nil]] = Ch.query!(conn, "SELECT {$0:float}", ["-inf"]).rows - assert [["ẽric"]] = Ch.query!(conn, "SELECT {$0:varchar}", ["ẽric"]).rows - assert [[<<1, 2, 3>>]] = Ch.query!(conn, "SELECT {$0:bytea}", [<<1, 2, 3>>]).rows 
+ assert [[true, false]] = + Ch.query!(conn, "SELECT {a:bool}, {b:Bool}", [{"a", true}, {"b", false}]).rows + + assert [["ẽ"]] = Ch.query!(conn, "SELECT {e:char}", [{"e", "ẽ"}]).rows + assert [[42]] = Ch.query!(conn, "SELECT {num:int}", [{"num", 42}]).rows + + assert [[42.0, 43.0]] = + Ch.query!(conn, "SELECT {n:float}, {f:float}", [{"n", 42}, {"f", 43.0}]).rows + + assert [[nil, nil]] = + Ch.query!(conn, "SELECT {a:float}, {b:float}", [{"a", "NaN"}, {"b", "nan"}]).rows + + assert [[nil]] = Ch.query!(conn, "SELECT {i:float}", %{"i" => "inf"}).rows + assert [[nil]] = Ch.query!(conn, "SELECT {ni:float}", %{"ni" => "-inf"}).rows + assert [["ẽric"]] = Ch.query!(conn, "SELECT {name:varchar}", %{"name" => "ẽric"}).rows + assert [[<<1, 2, 3>>]] = Ch.query!(conn, "SELECT {b:bytea}", %{"b" => <<1, 2, 3>>}).rows end test "encode numeric", %{conn: conn} do @@ -159,70 +168,78 @@ defmodule Ch.QueryTest do Enum.each(nums, fn {num, type} -> dec = Decimal.new(num) - assert [[dec]] == Ch.query!(conn, "SELECT {$0:#{type}}", [dec]).rows + assert [[dec]] == Ch.query!(conn, "SELECT {dec:#{type}}", %{"dec" => dec}).rows end) end test "encode integers and floats as numeric", %{conn: conn} do dec = Decimal.new(1) - assert [[dec]] == Ch.query!(conn, "SELECT {$0:numeric(1,0)}", [1]).rows + assert [[dec]] == Ch.query!(conn, "SELECT {dec:numeric(1,0)}", %{"dec" => 1}).rows dec = Decimal.from_float(1.0) - assert [[dec]] == Ch.query!(conn, "SELECT {$0:numeric(2,1)}", [1.0]).rows + assert [[dec]] == Ch.query!(conn, "SELECT {dec:numeric(2,1)}", %{"dec" => 1.0}).rows end @tag skip: true test "encode json/jsonb", %{conn: conn} do json = %{"foo" => 42} - assert [[json]] == Ch.query!(conn, "SELECT {$0::json}", [json]).rows + assert [[json]] == Ch.query!(conn, "SELECT {json::json}", %{"json" => json}).rows end test "encode uuid", %{conn: conn} do # TODO uuid = <<0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15>> uuid_hex = "00010203-0405-0607-0809-0a0b0c0d0e0f" - assert [[^uuid]] = Ch.query!(conn, 
"SELECT {$0:UUID}", [uuid_hex]).rows + assert [[^uuid]] = Ch.query!(conn, "SELECT {uuid:UUID}", %{"uuid" => uuid_hex}).rows end test "encode arrays", %{conn: conn} do - assert [[[]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[]]).rows - assert [[[1]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[1]]).rows - assert [[[1, 2]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[1, 2]]).rows + assert [[[]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => []}).rows + assert [[[1]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => [1]}).rows + assert [[[1, 2]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => [1, 2]}).rows - assert [[["1"]]] = Ch.query!(conn, "SELECT {$0:Array(String)}", [["1"]]).rows - assert [[[true]]] = Ch.query!(conn, "SELECT {$0:Array(Bool)}", [[true]]).rows + assert [[["1"]]] = Ch.query!(conn, "SELECT {a:Array(String)}", %{"a" => ["1"]}).rows + assert [[[true]]] = Ch.query!(conn, "SELECT {a:Array(Bool)}", %{"a" => [true]}).rows assert [[[~D[2023-01-01]]]] = - Ch.query!(conn, "SELECT {$0:Array(Date)}", [[~D[2023-01-01]]]).rows + Ch.query!(conn, "SELECT {a:Array(Date)}", %{"a" => [~D[2023-01-01]]}).rows assert [[[Ch.Test.to_clickhouse_naive(conn, ~N[2023-01-01 12:00:00])]]] == - Ch.query!(conn, "SELECT {$0:Array(DateTime)}", [[~N[2023-01-01 12:00:00]]]).rows + Ch.query!(conn, "SELECT {a:Array(DateTime)}", %{"a" => [~N[2023-01-01 12:00:00]]}).rows assert [[[~U[2023-01-01 12:00:00Z]]]] == - Ch.query!(conn, "SELECT {$0:Array(DateTime('UTC'))}", [[~N[2023-01-01 12:00:00]]]).rows + Ch.query!( + conn, + "SELECT {a:Array(DateTime('UTC'))}", + %{"a" => [~N[2023-01-01 12:00:00]]} + ).rows assert [[[~N[2023-01-01 12:00:00]]]] == - Ch.query!(conn, "SELECT {$0:Array(DateTime)}", [[~U[2023-01-01 12:00:00Z]]]).rows + Ch.query!(conn, "SELECT {a:Array(DateTime)}", %{"a" => [~U[2023-01-01 12:00:00Z]]}).rows assert [[[~U[2023-01-01 12:00:00Z]]]] == - Ch.query!(conn, "SELECT {$0:Array(DateTime('UTC'))}", [[~U[2023-01-01 
12:00:00Z]]]).rows + Ch.query!(conn, "SELECT {a:Array(DateTime('UTC'))}", %{ + "a" => [~U[2023-01-01 12:00:00Z]] + }).rows assert [[[[0], [1]]]] = - Ch.query!(conn, "SELECT {$0:Array(Array(integer))}", [[[0], [1]]]).rows + Ch.query!(conn, "SELECT {a:Array(Array(integer))}", %{"a" => [[0], [1]]}).rows + + assert [[[[0]]]] = Ch.query!(conn, "SELECT {a:Array(Array(integer))}", %{"a" => [[0]]}).rows - assert [[[[0]]]] = Ch.query!(conn, "SELECT {$0:Array(Array(integer))}", [[[0]]]).rows - # assert [[[1, nil, 3]]] = Ch.query!(conn, "SELECT {$0:Array(integer)}", [[1, nil, 3]]).rows + # assert [[[1, nil, 3]]] = Ch.query!(conn, "SELECT {a:Array(integer)}", %{"a" => [1, nil, 3]}).rows end test "encode network types", %{conn: conn} do # TODO, or wrap in custom struct like in postgrex # assert [["127.0.0.1/32"]] = - # Ch.query!(conn, "SELECT {$0:inet4}::text", [{127, 0, 0, 1}]).rows + # Ch.query!(conn, "SELECT {ip:inet4}::text", %{"ip" => {127, 0, 0, 1}}).rows - assert [[{127, 0, 0, 1}]] = Ch.query!(conn, "SELECT {$0:text}::inet4", ["127.0.0.1"]).rows + assert [[{127, 0, 0, 1}]] = + Ch.query!(conn, "SELECT {ip:text}::inet4", %{"ip" => "127.0.0.1"}).rows assert [[{0, 0, 0, 0, 0, 0, 0, 1}]] = - Ch.query!(conn, "SELECT {$0:text}::inet6", ["::1"]).rows + Ch.query!(conn, "SELECT {ip:text}::inet6", %{"ip" => "::1"}).rows end test "result struct", %{conn: conn} do From 433f9543d362b68f8841b3373828b5daef900950 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 29 Dec 2023 19:35:33 +0800 Subject: [PATCH 02/20] update bench/insert.exs --- bench/insert.exs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/bench/insert.exs b/bench/insert.exs index f6f01c4..b852196 100644 --- a/bench/insert.exs +++ b/bench/insert.exs @@ -6,7 +6,7 @@ scheme = System.get_env("CH_SCHEME") || "http" database = System.get_env("CH_DATABASE") || "ch_bench" {:ok, conn} = Ch.start_link(scheme: scheme, hostname: hostname, port: port) 
-Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {$0:Identifier}", [database]) +Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {db:Identifier}", %{"db" => database}) Ch.query!(conn, """ CREATE TABLE IF NOT EXISTS #{database}.benchmark ( @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS #{database}.benchmark ( """) types = [Ch.Types.u64(), Ch.Types.string(), Ch.Types.array(Ch.Types.u8()), Ch.Types.datetime()] -statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary" +statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary\n" rows = fn count -> Enum.map(1..count, fn i -> @@ -32,7 +32,9 @@ Benchee.run( %{ # "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end, "encode" => fn rows -> RowBinary.encode_rows(rows, types) end, - "insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end, + "encode+insert" => fn rows -> + Ch.query!(conn, [statement | RowBinary.encode_rows(rows, types)]) + end, # "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end, "encode stream" => fn rows -> rows @@ -46,7 +48,7 @@ Benchee.run( |> Stream.chunk_every(60_000) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - Ch.query!(conn, statement, stream, encode: false) + Ch.query!(conn, Stream.concat([statement], stream)) end }, inputs: %{ From 181fd07f7573e93ee77f85e3b4be9fcc4c658138 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 29 Dec 2023 19:37:47 +0800 Subject: [PATCH 03/20] update changelog --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7803592..aed6667 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,7 @@ ## Unreleased -- move rows payload (RowBinary, CSV, etc.) to SQL statement -- remove pseudo-positional binds, make param names explicit +- move rows payload (RowBinary, CSV, etc.) 
to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143 ## 0.2.4 (2024-01-29) From 9beaf1737543590a3e5dc5e255bf24c91e6c3887 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 29 Dec 2023 19:46:45 +0800 Subject: [PATCH 04/20] Ch.Result: drop :headers, add :data --- CHANGELOG.md | 1 + lib/ch/query.ex | 6 +++--- lib/ch/result.ex | 20 +++++++++----------- test/ch/connection_test.exs | 28 +++++++++++++--------------- test/ch/faults_test.exs | 3 +-- test/ch/headers_test.exs | 24 ++++++------------------ 6 files changed, 33 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aed6667..5c5c7dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - move rows payload (RowBinary, CSV, etc.) to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143 +- drop `:headers` from `%Ch.Result{}` but add `:data` ## 0.2.4 (2024-01-29) diff --git a/lib/ch/query.ex b/lib/ch/query.ex index ef74095..3e9bae2 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -94,7 +94,7 @@ defimpl DBConnection.Query, for: Ch.Query do cond do decode and format == "RowBinaryWithNamesAndTypes" -> rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows() - %Result{num_rows: length(rows), rows: rows, headers: headers, command: command} + %Result{num_rows: length(rows), rows: rows, data: data, command: command} format == nil -> num_rows = @@ -103,10 +103,10 @@ defimpl DBConnection.Query, for: Ch.Query do String.to_integer(written_rows) end - %Result{num_rows: num_rows, headers: headers, command: command} + %Result{num_rows: num_rows, data: data, command: command} true -> - %Result{rows: data, headers: headers, command: command} + %Result{data: data, command: command} end end diff --git a/lib/ch/result.ex b/lib/ch/result.ex index ddca4df..2977aa9 100644 --- a/lib/ch/result.ex +++ 
b/lib/ch/result.ex @@ -2,21 +2,19 @@ defmodule Ch.Result do @moduledoc """ Result struct returned from any successful query. Its fields are: - * `command` - An atom of the query command, for example: `:select`, `:insert`; - * `rows` - The result set. One of: - - a list of lists, each inner list corresponding to a - row, each element in the inner list corresponds to a column; - - raw iodata when the response is not automatically decoded, e.g. `x-clickhouse-format: CSV` - * `num_rows` - The number of fetched or affected rows; - * `headers` - The HTTP response headers + * `command` - An atom of the query command, for example: `:select`, `:insert` + * `num_rows` - The number of fetched or affected rows + * `rows` - A list of lists, each inner list corresponding to a row, each element in the inner list corresponds to a column + * `data` - The raw iodata from the response + """ - defstruct [:command, :num_rows, :rows, :headers] + defstruct [:command, :num_rows, :rows, :data] @type t :: %__MODULE__{ - command: atom, + command: Ch.Query.command() | nil, num_rows: non_neg_integer | nil, - rows: [[term]] | iodata | nil, - headers: Mint.Types.headers() + rows: [[term]] | nil, + data: iodata } end diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index 2ff11f5..fd925b4 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -47,12 +47,12 @@ defmodule Ch.ConnectionTest do # datetimes in params are sent in text and ClickHouse translates them to UTC from server timezone by default # see https://clickhouse.com/docs/en/sql-reference/data-types/datetime # https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/ - assert {:ok, %{num_rows: 1, rows: [[naive_datetime]], headers: headers}} = + assert {:ok, %{num_rows: 1, rows: [[naive_datetime]]}} = Ch.query(conn, "select {naive:DateTime}", %{"naive" => naive_noon}) # to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse # i.e. 
we give server timezone to the naive datetime and shift it to UTC before comparing with the result - {_, timezone} = List.keyfind!(headers, "x-clickhouse-timezone", 0) + %Ch.Result{rows: [[timezone]]} = Ch.query!(conn, "select timezone()") assert naive_datetime == naive_noon @@ -720,8 +720,7 @@ defmodule Ch.ConnectionTest do # datetimes in params are sent in text and ClickHouse translates them to UTC from server timezone by default # see https://clickhouse.com/docs/en/sql-reference/data-types/datetime # https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/ - assert {:ok, - %{num_rows: 1, rows: [[naive_datetime, "2022-12-12 12:00:00"]], headers: headers}} = + assert {:ok, %{num_rows: 1, rows: [[naive_datetime, "2022-12-12 12:00:00"]]}} = Ch.query( conn, "select {naive:DateTime} as d, toString(d)", @@ -730,7 +729,7 @@ defmodule Ch.ConnectionTest do # to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse # i.e. we give server timezone to the naive datetime and shift it to UTC before comparing with the result - {_, timezone} = List.keyfind!(headers, "x-clickhouse-timezone", 0) + %Ch.Result{rows: [[timezone]]} = Ch.query!(conn, "select timezone()") assert naive_datetime == naive_noon @@ -910,7 +909,7 @@ defmodule Ch.ConnectionTest do # datetimes in params are sent in text and ClickHouse translates them to UTC from server timezone by default # see https://clickhouse.com/docs/en/sql-reference/data-types/datetime # https://kb.altinity.com/altinity-kb-queries-and-syntax/time-zones/ - assert {:ok, %{num_rows: 1, rows: [[naive_datetime]], headers: headers}} = + assert {:ok, %{num_rows: 1, rows: [[naive_datetime]]}} = Ch.query( conn, "select {naive:DateTime64(#{precision})}", @@ -919,7 +918,7 @@ defmodule Ch.ConnectionTest do # to make this test pass for contributors with non UTC timezone we perform the same steps as ClickHouse # i.e. 
we give server timezone to the naive datetime and shift it to UTC before comparing with the result - {_, timezone} = List.keyfind!(headers, "x-clickhouse-timezone", 0) + %Ch.Result{rows: [[timezone]]} = Ch.query!(conn, "select timezone()") expected = naive_noon @@ -1066,12 +1065,11 @@ defmodule Ch.ConnectionTest do [{20.0, 20.0}, "Point"] ] - # to make our RowBinary is not garbage in garbage out we also test a text format response - assert conn - |> Ch.query!( + # to make sure our RowBinary is not "garbage in, garbage out" we also test a "text format" response + assert Ch.query!( + conn, "SELECT p, toTypeName(p) FROM geo_point ORDER BY p ASC FORMAT JSONCompact" - ) - |> Map.fetch!(:rows) + ).data |> Jason.decode!() |> Map.fetch!("data") == [ [[10, 10], "Point"], @@ -1108,7 +1106,7 @@ defmodule Ch.ConnectionTest do assert Ch.query!( conn, "SELECT r, toTypeName(r) FROM geo_ring ORDER BY r ASC FORMAT JSONCompact" - ).rows + ).data |> Jason.decode!() |> Map.fetch!("data") == [ [[[0, 0], [10, 0], [10, 10], [0, 10]], "Ring"], @@ -1160,7 +1158,7 @@ defmodule Ch.ConnectionTest do assert Ch.query!( conn, "SELECT pg, toTypeName(pg) FROM geo_polygon ORDER BY pg ASC FORMAT JSONCompact" - ).rows + ).data |> Jason.decode!() |> Map.fetch!("data") == [ [[[[0, 1], [10, 3.2]], [], [[2, 2]]], "Polygon"], @@ -1242,7 +1240,7 @@ defmodule Ch.ConnectionTest do assert Ch.query!( conn, "SELECT mpg, toTypeName(mpg) FROM geo_multipolygon ORDER BY mpg ASC FORMAT JSONCompact" - ).rows + ).data |> Jason.decode!() |> Map.fetch!("data") == [ [ diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index 32db145..b54eccc 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -476,8 +476,7 @@ defmodule Ch.FaultsTest do test = self() header = "X-ClickHouse-Server-Display-Name" - {:ok, %Result{headers: headers}} = Ch.Test.sql_exec("select 1") - {_, expected_name} = List.keyfind!(headers, String.downcase(header), 0) + {:ok, %Result{rows: [[expected_name]]}} = 
Ch.Test.sql_exec("select hostName()") log = capture_async_log(fn -> diff --git a/test/ch/headers_test.exs b/test/ch/headers_test.exs index 52aa7c3..18c8b98 100644 --- a/test/ch/headers_test.exs +++ b/test/ch/headers_test.exs @@ -7,50 +7,38 @@ defmodule Ch.HeadersTest do end test "can request gzipped response through headers", %{conn: conn} do - assert {:ok, %{rows: rows, headers: headers}} = + assert {:ok, %{data: data}} = Ch.query(conn, "select number from system.numbers limit 100", [], decode: false, settings: [enable_http_compression: 1], headers: [{"accept-encoding", "gzip"}] ) - assert :proplists.get_value("content-type", headers) == "application/octet-stream" - assert :proplists.get_value("content-encoding", headers) == "gzip" - assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes" - # https://en.wikipedia.org/wiki/Gzip - assert <<0x1F, 0x8B, _rest::bytes>> = IO.iodata_to_binary(rows) + assert <<0x1F, 0x8B, _rest::bytes>> = IO.iodata_to_binary(data) end test "can request lz4 response through headers", %{conn: conn} do - assert {:ok, %{rows: rows, headers: headers}} = + assert {:ok, %{data: data}} = Ch.query(conn, "select number from system.numbers limit 100", [], decode: false, settings: [enable_http_compression: 1], headers: [{"accept-encoding", "lz4"}] ) - assert :proplists.get_value("content-type", headers) == "application/octet-stream" - assert :proplists.get_value("content-encoding", headers) == "lz4" - assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes" - # https://en.wikipedia.org/wiki/LZ4_(compression_algorithm) - assert <<0x04, 0x22, 0x4D, 0x18, _rest::bytes>> = IO.iodata_to_binary(rows) + assert <<0x04, 0x22, 0x4D, 0x18, _rest::bytes>> = IO.iodata_to_binary(data) end test "can request zstd response through headers", %{conn: conn} do - assert {:ok, %{rows: rows, headers: headers}} = + assert {:ok, %{data: data}} = Ch.query(conn, "select number from system.numbers limit 
100", [], decode: false, settings: [enable_http_compression: 1], headers: [{"accept-encoding", "zstd"}] ) - assert :proplists.get_value("content-type", headers) == "application/octet-stream" - assert :proplists.get_value("content-encoding", headers) == "zstd" - assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes" - # https://en.wikipedia.org/wiki/LZ4_(compression_algorithm) - assert <<0x28, 0xB5, 0x2F, 0xFD, _rest::bytes>> = IO.iodata_to_binary(rows) + assert <<0x28, 0xB5, 0x2F, 0xFD, _rest::bytes>> = IO.iodata_to_binary(data) end end From a121d3ee6954d459018d0f45eb1574acfb7cc2f1 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 29 Dec 2023 19:49:41 +0800 Subject: [PATCH 05/20] add pr to changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c5c7dd..388881b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## Unreleased - move rows payload (RowBinary, CSV, etc.) to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143 -- drop `:headers` from `%Ch.Result{}` but add `:data` +- drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 ## 0.2.4 (2024-01-29) From 7a1fe850698c1f21e670c5d9f57a969aaeaa303b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 29 Dec 2023 11:51:31 +0000 Subject: [PATCH 06/20] Bump dialyxir from 1.4.2 to 1.4.3 Bumps [dialyxir](https://github.com/jeremyjh/dialyxir) from 1.4.2 to 1.4.3. 
- [Release notes](https://github.com/jeremyjh/dialyxir/releases) - [Changelog](https://github.com/jeremyjh/dialyxir/blob/master/CHANGELOG.md) - [Commits](https://github.com/jeremyjh/dialyxir/compare/1.4.2...1.4.3) --- updated-dependencies: - dependency-name: dialyxir dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] From db849559a93b31b6de7e2da53c43a73e5cd9ec6a Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 29 Dec 2023 19:56:24 +0800 Subject: [PATCH 07/20] cleanup readme --- README.md | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e6121cf..7d84aeb 100644 --- a/README.md +++ b/README.md @@ -75,9 +75,6 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)") -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1]) - %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1}) @@ -117,7 +114,10 @@ header = Ch.RowBinary.encode_names_and_types(names, types) row_binary = Ch.RowBinary.encode_rows(rows, types) %Ch.Result{num_rows: 3} = - Ch.query!(pid, ["INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", header | row_binary]) + Ch.query!(pid, [ + "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", + header | row_binary + ]) ``` #### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) @@ -147,7 +147,12 @@ row_binary = |> Stream.take(10) %Ch.Result{num_rows: 1_000_000} = - Ch.query(pid, Stream.concat(["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], row_binary)) + Ch.query(pid, + Stream.concat( + ["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], + row_binary + ) + ) ``` #### Insert rows via 
[input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function @@ -247,8 +252,8 @@ row_binary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b") %Ch.Result{rows: [["a�b"]]} = Ch.query!(pid, "SELECT * FROM ch_utf8") -%Ch.Result{rows: %{"data" => [["a�b"]]}} = - pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1) +%{"data" => [["a�b"]]} = + pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact").data |> Jason.decode!() ``` #### Timezones in RowBinary From 2a71a73833e31505b5ac8c54af877f5b90317209 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 29 Jan 2024 11:52:40 +0900 Subject: [PATCH 08/20] Fix query string escaping (#147) * add failing test * fix query string escaping * changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 388881b..496a269 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - move rows payload (RowBinary, CSV, etc.) 
to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143 - drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 +- fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147 ## 0.2.4 (2024-01-29) From 173d06bc5eb2f0234730c7f02860ec970d1adead Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Thu, 11 Jan 2024 13:25:11 +0900 Subject: [PATCH 09/20] refactor Ch.stream/4 --- CHANGELOG.md | 1 + bench/stream.exs | 42 ++++++------- lib/ch.ex | 4 +- lib/ch/connection.ex | 118 ++++++++++++++++++++++++++---------- lib/ch/query.ex | 5 +- test/ch/connection_test.exs | 48 +++------------ test/ch/stream_test.exs | 40 ++++++++++++ 7 files changed, 159 insertions(+), 99 deletions(-) create mode 100644 test/ch/stream_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 496a269..a60069e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - move rows payload (RowBinary, CSV, etc.) 
to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143 - drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 - fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147 +- make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148 ## 0.2.4 (2024-01-29) diff --git a/bench/stream.exs b/bench/stream.exs index 43c0e38..85ba295 100644 --- a/bench/stream.exs +++ b/bench/stream.exs @@ -10,36 +10,30 @@ statement = fn limit -> "SELECT number FROM system.numbers_mt LIMIT #{limit}" end -run_stream = fn statement, opts -> - f = fn conn -> conn |> Ch.stream(statement, [], opts) |> Stream.run() end - Ch.run(conn, f, timeout: :infinity) -end - Benchee.run( %{ - "stream without decode" => fn statement -> - run_stream.(statement, _opts = []) + "RowBinary stream without decode" => fn statement -> + DBConnection.run( + conn, + fn conn -> + conn + |> Ch.stream(statement, _params = [], format: "RowBinary") + |> Stream.run() + end, + timeout: :infinity + ) end, - # TODO why is this faster? 
- "stream with manual decode" => fn statement -> - f = fn conn -> + "RowBinary stream with manual decode" => fn statement -> + DBConnection.run(conn, fn conn -> conn - |> Ch.stream(statement, [], format: "RowBinary") - |> Stream.map(fn responses -> - Enum.each(responses, fn - {:data, _ref, data} -> Ch.RowBinary.decode_rows(data, [:u64]) - {:status, _ref, 200} -> :ok - {:headers, _ref, _headers} -> :ok - {:done, _ref} -> :ok - end) + |> Ch.stream(statement, _params = [], format: "RowBinary") + |> Stream.map(fn %Ch.Result{data: data} -> + data + |> IO.iodata_to_binary() + |> Ch.RowBinary.decode_rows([:u64]) end) |> Stream.run() - end - - Ch.run(conn, f, timeout: :infinity) - end, - "stream with decode" => fn statement -> - run_stream.(statement, types: [:u64]) + end) end }, inputs: %{ diff --git a/lib/ch.ex b/lib/ch.ex index 0a8ac52..91b7336 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -99,7 +99,9 @@ defmodule Ch do DBConnection.execute!(conn, query, params, opts) end - @doc false + @doc """ + Returns a stream for a query on a connection. 
+ """ @spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 6a729c9..d46a3d4 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -92,41 +92,97 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - {query_params, extra_headers, body} = params + %Query{command: command, statement: statement} = query + {query_params, extra_headers} = params path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - types = Keyword.get(opts, :types) - with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, body) do - {:ok, query, {types, ref}, conn} + with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, statement), + {:ok, conn} <- eat_ok_status_and_headers(conn, timeout(conn, opts)) do + {:ok, query, %Result{command: command}, conn} end end + @spec eat_ok_status_and_headers(conn, timeout) :: + {:ok, %{conn: conn, buffer: [Mint.Types.response()]}} + | {:error, Ch.Error.t(), conn} + | {:disconnect, Mint.Types.error(), conn} + defp eat_ok_status_and_headers(conn, timeout) do + case HTTP.recv(conn, 0, timeout) do + {:ok, conn, responses} -> + case eat_ok_status_and_headers(responses) do + {:ok, data} -> + {:ok, %{conn: conn, buffer: data}} + + :more -> + eat_ok_status_and_headers(conn, timeout) + + :error -> + all_responses_result = + case handle_all_responses(responses, []) do + {:ok, responses} -> {:ok, conn, responses} + {:more, acc} -> recv_all(conn, acc, timeout) + end + + with {:ok, conn, responses} <- all_responses_result do + [_status, headers | data] = responses + message = IO.iodata_to_binary(data) + + code = + if code = get_header(headers, "x-clickhouse-exception-code") do + String.to_integer(code) + end + + {:error, Error.exception(code: code, message: message), conn} + end + end + + 
{:error, conn, error, _responses} -> + {:disconnect, error, conn} + end + end + + defp eat_ok_status_and_headers([{:status, _ref, 200} | rest]) do + eat_ok_status_and_headers(rest) + end + + defp eat_ok_status_and_headers([{:status, _ref, _status} | _rest]), do: :error + defp eat_ok_status_and_headers([{:headers, _ref, _headers} | data]), do: {:ok, data} + defp eat_ok_status_and_headers([]), do: :more + @impl true - def handle_fetch(_query, {types, ref}, opts, conn) do + def handle_fetch(query, result, opts, %{conn: conn, buffer: buffer}) do + case buffer do + [] -> handle_fetch(query, result, opts, conn) + _not_empty -> {halt_or_cont(buffer), %Result{result | data: extract_data(buffer)}, conn} + end + end + + def handle_fetch(_query, result, opts, conn) do case HTTP.recv(conn, 0, timeout(conn, opts)) do {:ok, conn, responses} -> - {halt_or_cont(responses, ref), {:stream, types, responses}, conn} + {halt_or_cont(responses), %Result{result | data: extract_data(responses)}, conn} {:error, conn, reason, _responses} -> {:disconnect, reason, conn} end end - defp halt_or_cont([{:done, ref}], ref), do: :halt + defp halt_or_cont([{:done, _ref}]), do: :halt + defp halt_or_cont([_ | rest]), do: halt_or_cont(rest) + defp halt_or_cont([]), do: :cont - defp halt_or_cont([{tag, ref, _data} | rest], ref) when tag in [:data, :status, :headers] do - halt_or_cont(rest, ref) - end - - defp halt_or_cont([], _ref), do: :cont + defp extract_data([{:data, _ref, data} | rest]), do: [data | extract_data(rest)] + defp extract_data([] = empty), do: empty + defp extract_data([{:done, _ref}]), do: [] @impl true - def handle_deallocate(_query, _ref, _opts, conn) do + def handle_deallocate(_query, result, _opts, conn) do case HTTP.open_request_count(conn) do 0 -> - {:ok, [], conn} + # TODO data: [], anything else? 
+ {:ok, %Result{result | data: []}, conn} 1 -> {:disconnect, Error.exception("cannot stop stream before receiving full response"), conn} @@ -172,8 +228,8 @@ defmodule Ch.Connection do | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} defp request(conn, method, path, headers, body, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, body) do - receive_response(conn, ref, timeout(conn, opts)) + with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do + receive_full_response(conn, timeout(conn, opts)) end end @@ -191,7 +247,7 @@ defmodule Ch.Connection do def request_chunked(conn, method, path, headers, stream, opts) do with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_response(conn, ref, timeout(conn, opts)) + do: receive_full_response(conn, timeout(conn, opts)) end @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: @@ -220,12 +276,12 @@ defmodule Ch.Connection do end end - @spec receive_response(conn, Mint.Types.request_ref(), timeout) :: + @spec receive_full_response(conn, timeout) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} - defp receive_response(conn, ref, timeout) do - with {:ok, conn, responses} <- recv(conn, ref, [], timeout) do + defp receive_full_response(conn, timeout) do + with {:ok, conn, responses} <- recv_all(conn, [], timeout) do case responses do [200, headers | _rest] -> conn = ensure_same_server(conn, headers) @@ -244,14 +300,14 @@ defmodule Ch.Connection do end end - @spec recv(conn, Mint.Types.request_ref(), [response], timeout()) :: + @spec recv_all(conn, [response], timeout()) :: {:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn} - defp recv(conn, ref, acc, timeout) do + defp recv_all(conn, acc, timeout) do case HTTP.recv(conn, 0, timeout) do {:ok, conn, responses} -> - case handle_responses(responses, ref, 
acc) do + case handle_all_responses(responses, acc) do {:ok, responses} -> {:ok, conn, responses} - {:more, acc} -> recv(conn, ref, acc, timeout) + {:more, acc} -> recv_all(conn, acc, timeout) end {:error, conn, reason, _responses} -> @@ -259,16 +315,14 @@ defmodule Ch.Connection do end end - defp handle_responses([{:done, ref}], ref, acc) do - {:ok, :lists.reverse(acc)} - end - - defp handle_responses([{tag, ref, data} | rest], ref, acc) - when tag in [:data, :status, :headers] do - handle_responses(rest, ref, [data | acc]) + for tag <- [:data, :status, :headers] do + defp handle_all_responses([{unquote(tag), _ref, data} | rest], acc) do + handle_all_responses(rest, [data | acc]) + end end - defp handle_responses([], _ref, acc), do: {:more, acc} + defp handle_all_responses([{:done, _ref}], acc), do: {:ok, :lists.reverse(acc)} + defp handle_all_responses([], acc), do: {:more, acc} defp maybe_put_private(conn, _k, nil), do: conn defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v) diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 3e9bae2..2f850d4 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -86,7 +86,7 @@ defimpl DBConnection.Query, for: Ch.Query do @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t() when response: Mint.Types.status() | Mint.Types.headers() | binary - def decode(%Query{command: command}, responses, opts) do + def decode(%Query{command: command}, responses, opts) when is_list(responses) do [_status, headers | data] = responses format = get_header(headers, "x-clickhouse-format") decode = Keyword.get(opts, :decode, true) @@ -110,6 +110,9 @@ defimpl DBConnection.Query, for: Ch.Query do end end + # stream result + def decode(_query, %Result{} = result, _opts), do: result + defp get_header(headers, key) do case List.keyfind(headers, key, 0) do {_, value} -> value diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index fd925b4..d1423e1 100644 --- a/test/ch/connection_test.exs +++ 
b/test/ch/connection_test.exs @@ -1309,54 +1309,20 @@ defmodule Ch.ConnectionTest do end describe "stream" do - @tag :skip - test "sends mint http packets", %{conn: conn} do - stmt = "select number from system.numbers limit 1000" - - drop_ref = fn packets -> - Enum.map(packets, fn - {tag, _ref, data} -> {tag, data} - {tag, _ref} -> tag - end) - end - - packets = + test "emits result structs containing raw data", %{conn: conn} do + results = DBConnection.run(conn, fn conn -> conn - |> Ch.stream(stmt) - |> Enum.flat_map(drop_ref) - end) - - assert [{:status, 200}, {:headers, headers} | _rest] = packets - - assert List.keyfind!(headers, "transfer-encoding", 0) == {"transfer-encoding", "chunked"} - - assert data_packets = - packets - |> Enum.filter(&match?({:data, _data}, &1)) - |> Enum.map(fn {:data, data} -> data end) - - assert length(data_packets) >= 2 - assert RowBinary.decode_rows(Enum.join(data_packets)) == Enum.map(0..999, &[&1]) - - assert List.last(packets) == :done - end - - @tag :skip - test "decodes RowBinary", %{conn: conn} do - stmt = "select number from system.numbers limit 1000" - - rows = - DBConnection.run(conn, fn conn -> - conn - |> Ch.stream(stmt, _params = [], types: [:u64]) + |> Ch.stream("select number from system.numbers limit 1000") |> Enum.into([]) end) - assert List.flatten(rows) == Enum.into(0..999, []) + assert length(results) >= 2 + + assert results |> Enum.map(& &1.data) |> IO.iodata_to_binary() |> RowBinary.decode_rows() == + Enum.map(0..999, &[&1]) end - @tag :skip test "disconnects on early halt", %{conn: conn} do logs = ExUnit.CaptureLog.capture_log(fn -> diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs new file mode 100644 index 0000000..405398d --- /dev/null +++ b/test/ch/stream_test.exs @@ -0,0 +1,40 @@ +defmodule Ch.StreamTest do + use ExUnit.Case + alias Ch.{Result, RowBinary} + + setup do + {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} + end + + describe "Ch.stream/4" do + test "emits 
%Ch.Result{}", %{conn: conn} do + count = 1_000_000 + + assert [%Result{command: :select, data: header} | rest] = + DBConnection.run(conn, fn conn -> + conn + |> Ch.stream("select * from numbers({count:UInt64})", %{"count" => 1_000_000}) + |> Enum.into([]) + end) + + assert header == [<<1, 6, "number", 6, "UInt64">>] + + decoded = + Enum.flat_map(rest, fn %Result{data: data} -> + data |> IO.iodata_to_binary() |> RowBinary.decode_rows([:u64]) + end) + + assert length(decoded) == count + end + + test "raises on error", %{conn: conn} do + assert_raise Ch.Error, + ~r/Code: 62. DB::Exception: Syntax error: failed at position 8/, + fn -> + DBConnection.run(conn, fn conn -> + conn |> Ch.stream("select ", %{"count" => 1_000_000}) |> Enum.into([]) + end) + end + end + end +end From 7196e80736893800fd04e122374d046373ff14ce Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Thu, 11 Jan 2024 14:30:47 +0900 Subject: [PATCH 10/20] make Ch.stream/4 collectable --- CHANGELOG.md | 1 + README.md | 18 +++------ bench/insert.exs | 6 +-- lib/ch.ex | 4 +- lib/ch/connection.ex | 74 +++++++++++++++++-------------------- lib/ch/query.ex | 20 +++++++++- lib/ch/stream.ex | 43 +++++++++++++++++++++ test/ch/connection_test.exs | 12 +++--- test/ch/faults_test.exs | 74 +++++++++++++++++++------------------ test/ch/stream_test.exs | 19 +++++++++- 10 files changed, 169 insertions(+), 102 deletions(-) create mode 100644 lib/ch/stream.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index a60069e..2107604 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 - fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147 - make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148 +- make `Ch.stream/4` collectable and remove stream support in `Ch.query/4` https://github.com/plausible/ch/pull/149 ## 
0.2.4 (2024-01-29) diff --git a/README.md b/README.md index 7d84aeb..cf9b1a6 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,7 @@ row_binary = Ch.RowBinary.encode_rows(rows, types) ]) ``` -#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) +#### Insert custom [format](https://clickhouse.com/docs/en/interfaces/formats) ```elixir {:ok, pid} = Ch.start_link() @@ -133,26 +133,20 @@ csv = "0\n1" Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) ``` -#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream +#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream ```elixir {:ok, pid} = Ch.start_link() Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") -row_binary = +DBConnection.run(pid, fn conn -> Stream.repeatedly(fn -> [:rand.uniform(100)] end) |> Stream.chunk_every(100_000) |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) - |> Stream.take(10) - -%Ch.Result{num_rows: 1_000_000} = - Ch.query(pid, - Stream.concat( - ["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], - row_binary - ) - ) + |> Stream.take(10) + |> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) +end) ``` #### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function diff --git a/bench/insert.exs b/bench/insert.exs index b852196..7c38b36 100644 --- a/bench/insert.exs +++ b/bench/insert.exs @@ -43,12 +43,12 @@ Benchee.run( |> Stream.run() end, "insert stream" => fn rows -> - stream = + DBConnection.run(conn, fn conn -> rows |> Stream.chunk_every(60_000) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - - Ch.query!(conn, Stream.concat([statement], stream)) + |> Enum.into(Ch.stream(conn, statement)) + end) end }, inputs: %{ diff --git a/lib/ch.ex b/lib/ch.ex index 91b7336..b5c60ca 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -102,10 
+102,10 @@ defmodule Ch do @doc """ Returns a stream for a query on a connection. """ - @spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t() + @spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) - DBConnection.stream(conn, query, params, opts) + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} end if Code.ensure_loaded?(Ecto.ParameterizedType) do diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index d46a3d4..35de684 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -190,20 +190,46 @@ defmodule Ch.Connection do end @impl true - def handle_execute(%Query{statement: statement} = query, params, opts, conn) do + def handle_execute(%Query{statement: statement} = query, {:stream, params}, opts, conn) do {query_params, extra_headers} = params path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - result = - if is_list(statement) or is_binary(statement) do - request(conn, "POST", path, headers, statement, opts) - else - request_chunked(conn, "POST", path, headers, statement, opts) + with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do + case HTTP.stream_request_body(conn, ref, statement) do + {:ok, conn} -> {:ok, query, ref, conn} + {:error, conn, reason} -> {:disconnect, reason, conn} end + end + end + + def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do + case HTTP.stream_request_body(conn, ref, body) do + {:ok, conn} -> + case body do + :eof -> + with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do + {:ok, query, responses, conn} + end + + _other -> + {:ok, query, ref, conn} + end + + {:error, conn, reason} -> + {:disconnect, reason, conn} + end + end + + def handle_execute(%Query{statement: statement} = query, params, opts, conn) + when is_list(statement) or 
is_binary(statement) do + {query_params, extra_headers} = params + + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - with {:ok, conn, responses} <- result do + with {:ok, conn, responses} <- request(conn, "POST", path, headers, statement, opts) do {:ok, query, responses, conn} end end @@ -233,40 +259,6 @@ defmodule Ch.Connection do end end - @spec request_chunked( - conn, - method :: String.t(), - path :: String.t(), - Mint.Types.headers(), - body :: Enumerable.t(), - [Ch.query_option()] - ) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), - {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) - end - - @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: - {:ok, conn} | {:disconnect, Mint.Types.error(), conn} - defp stream_body(conn, ref, stream) do - result = - stream - |> Stream.concat([:eof]) - |> Enum.reduce_while({:ok, conn}, fn - chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)} - _chunk, {:error, _conn, _reason} = error -> {:halt, error} - end) - - case result do - {:ok, _conn} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - # stacktrace is a bit cleaner with this function inlined @compile inline: [send_request: 5] defp send_request(conn, method, path, headers, body) do diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 2f850d4..3fa2de8 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -78,12 +78,26 @@ defimpl DBConnection.Query, for: Ch.Query do @spec encode(Query.t(), Ch.params(), [Ch.query_option()]) :: {Ch.Query.params(), Mint.Types.headers()} - def encode(%Query{}, params, opts) do + def encode(%Query{}, params, opts) when is_list(params) or is_map(params) do format = Keyword.get(opts, 
:format, "RowBinaryWithNamesAndTypes") headers = Keyword.get(opts, :headers, []) {query_params(params), [{"x-clickhouse-format", format} | headers]} end + # stream: insert init + @spec encode(Query.t(), {:stream, Ch.params()}, [Ch.query_option()]) :: + {:stream, {Ch.Query.params(), Mint.Types.headers()}} + def encode(query, {:stream, params}, opts) do + {:stream, encode(query, params, opts)} + end + + # stream: insert data chunk + @spec encode(Query.t(), {:stream, Mint.Types.request_ref(), iodata | :eof}, [Ch.query_option()]) :: + {:stream, Mint.Types.request_ref(), iodata | :eof} + def encode(_query, {:stream, ref, data}, _opts) do + {:stream, ref, data} + end + @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t() when response: Mint.Types.status() | Mint.Types.headers() | binary def decode(%Query{command: command}, responses, opts) when is_list(responses) do @@ -110,8 +124,10 @@ defimpl DBConnection.Query, for: Ch.Query do end end - # stream result + # stream: select result def decode(_query, %Result{} = result, _opts), do: result + # stream: insert result + def decode(_query, ref, _opts) when is_reference(ref), do: ref defp get_header(headers, key) do case List.keyfind(headers, key, 0) do diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex new file mode 100644 index 0000000..6569782 --- /dev/null +++ b/lib/ch/stream.ex @@ -0,0 +1,43 @@ +defmodule Ch.Stream do + @moduledoc false + + @derive {Inspect, only: []} + defstruct [:conn, :ref, :query, :params, :opts] + + @type t :: %__MODULE__{ + conn: DBConnection.conn(), + ref: Mint.Types.request_ref() | nil, + query: Ch.Query.t(), + params: Ch.params(), + opts: [Ch.query_option()] + } + + defimpl Enumerable do + def reduce(stream, acc, fun) do + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream + stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} + DBConnection.reduce(stream, acc, fun) + end + + def member?(_, _), do: {:error, __MODULE__} + def 
count(_), do: {:error, __MODULE__} + def slice(_), do: {:error, __MODULE__} + end + + defimpl Collectable do + def into(stream) do + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream + ref = DBConnection.execute!(conn, query, {:stream, params}, opts) + {%{stream | ref: ref}, &collect/2} + end + + defp collect(%{conn: conn, query: query, ref: ref} = stream, {:cont, data}) do + ^ref = DBConnection.execute!(conn, query, {:stream, ref, data}) + stream + end + + defp collect(%{conn: conn, query: query, ref: ref}, eof) when eof in [:halt, :done] do + DBConnection.execute!(conn, query, {:stream, ref, :eof}) + end + end +end diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index d1423e1..8b4a441 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -258,11 +258,13 @@ defmodule Ch.ConnectionTest do |> Stream.chunk_every(2) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - assert {:ok, %{num_rows: 3}} = - Ch.query( - conn, - Stream.concat(["insert into #{table}(a, b) format RowBinary\n"], row_binary) - ) + assert %{num_rows: 3} = + DBConnection.run(conn, fn conn -> + Enum.into( + row_binary, + Ch.stream(conn, "insert into #{table}(a, b) format RowBinary\n") + ) + end) assert {:ok, %{rows: rows}} = Ch.query(conn, "select * from {table:Identifier}", %{"table" => table}) diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index b54eccc..9a97969 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -364,14 +364,17 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.close(mint) spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) + try do + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + rescue + e in [DBConnection.ConnectionError, 
Mint.TransportError] -> + assert Exception.message(e) =~ "closed" + end end) # reconnect @@ -382,16 +385,14 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) - - assert message =~ ~r/UNKNOWN_TABLE/ + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + end send(test, :done) end) @@ -425,14 +426,17 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) + try do + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + rescue + e in [DBConnection.ConnectionError, Mint.TransportError] -> + assert Exception.message(e) =~ "closed" + end end) # close after first packet from mint arrives @@ -447,16 +451,14 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) - - assert message =~ ~r/UNKNOWN_TABLE/ + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + end send(test, :done) end) diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs index 405398d..7d0fa84 100644 --- a/test/ch/stream_test.exs +++ b/test/ch/stream_test.exs @@ -6,7 +6,7 @@ defmodule Ch.StreamTest 
do {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} end - describe "Ch.stream/4" do + describe "enumerable Ch.stream/4" do test "emits %Ch.Result{}", %{conn: conn} do count = 1_000_000 @@ -37,4 +37,21 @@ defmodule Ch.StreamTest do end end end + + describe "collectable Ch.stream/4" do + test "inserts chunks", %{conn: conn} do + Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory") + + assert %Ch.Result{command: :insert, num_rows: 1_000_000} = + DBConnection.run(conn, fn conn -> + Stream.repeatedly(fn -> [:rand.uniform(100)] end) + |> Stream.chunk_every(100_000) + |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) + |> Stream.take(10) + |> Enum.into(Ch.stream(conn, "insert into collect_stream(i) format RowBinary\n")) + end) + + assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]] + end + end end From 462989e7a0398042ad9f273608bf1df10909638f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 11 Jan 2024 11:50:37 +0000 Subject: [PATCH 11/20] Bump ex_doc from 0.31.0 to 0.31.1 Bumps [ex_doc](https://github.com/elixir-lang/ex_doc) from 0.31.0 to 0.31.1. - [Release notes](https://github.com/elixir-lang/ex_doc/releases) - [Changelog](https://github.com/elixir-lang/ex_doc/blob/main/CHANGELOG.md) - [Commits](https://github.com/elixir-lang/ex_doc/compare/v0.31.0...v0.31.1) --- updated-dependencies: - dependency-name: ex_doc dependency-type: direct:development update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] From 263b60f88efcc060b418c2c1d88634a86901ad06 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 29 Jan 2024 11:53:31 +0900 Subject: [PATCH 12/20] split format ci into separate job --- .github/workflows/bench.yml | 4 ++-- .github/workflows/test.yml | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 995235b..691fa2a 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -1,9 +1,9 @@ -name: bench +name: mix on: workflow_dispatch jobs: - benchee: + bench: runs-on: ubuntu-latest env: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b2fafaf..37051bb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -63,7 +63,6 @@ jobs: test-${{ matrix.otp }}-${{ matrix.elixir }}-refs/heads/master- - run: mix deps.get --only $MIX_ENV - - run: mix format --check-formatted - run: mix compile --warnings-as-errors - run: mix test --include slow From 2dd97ae73fbe44679673ad8301aecdb3600882d9 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:49:21 +0900 Subject: [PATCH 13/20] use different name for bench ci --- .github/workflows/bench.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 691fa2a..995235b 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -1,9 +1,9 @@ -name: mix +name: bench on: workflow_dispatch jobs: - bench: + benchee: runs-on: ubuntu-latest env: From dbff0553e34b7ec44213bd00d4242fd1cddec385 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:50:30 +0900 Subject: [PATCH 14/20] set infinite timeout in benches --- bench/insert.exs | 16 ++++++++++------ bench/stream.exs | 24 ++++++++++++++---------- 2 files 
changed, 24 insertions(+), 16 deletions(-) diff --git a/bench/insert.exs b/bench/insert.exs index 7c38b36..a8baa60 100644 --- a/bench/insert.exs +++ b/bench/insert.exs @@ -43,12 +43,16 @@ Benchee.run( |> Stream.run() end, "insert stream" => fn rows -> - DBConnection.run(conn, fn conn -> - rows - |> Stream.chunk_every(60_000) - |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - |> Enum.into(Ch.stream(conn, statement)) - end) + DBConnection.run( + conn, + fn conn -> + rows + |> Stream.chunk_every(60_000) + |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) + |> Enum.into(Ch.stream(conn, statement)) + end, + timeout: :infinity + ) end }, inputs: %{ diff --git a/bench/stream.exs b/bench/stream.exs index 85ba295..5c3f140 100644 --- a/bench/stream.exs +++ b/bench/stream.exs @@ -24,16 +24,20 @@ Benchee.run( ) end, "RowBinary stream with manual decode" => fn statement -> - DBConnection.run(conn, fn conn -> - conn - |> Ch.stream(statement, _params = [], format: "RowBinary") - |> Stream.map(fn %Ch.Result{data: data} -> - data - |> IO.iodata_to_binary() - |> Ch.RowBinary.decode_rows([:u64]) - end) - |> Stream.run() - end) + DBConnection.run( + conn, + fn conn -> + conn + |> Ch.stream(statement, _params = [], format: "RowBinary") + |> Stream.map(fn %Ch.Result{data: data} -> + data + |> IO.iodata_to_binary() + |> Ch.RowBinary.decode_rows([:u64]) + end) + |> Stream.run() + end, + timeout: :infinity + ) end }, inputs: %{ From 9d8e88a60b375ef6f51911651fdbd4b614fda116 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 29 Jan 2024 11:54:24 +0900 Subject: [PATCH 15/20] test latest clickhouse by default, but include Plausible-specific version in the matrix --- test/ch/connection_test.exs | 3 ++- test/ch/stream_test.exs | 11 +++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index 8b4a441..55c998a 100644 --- 
a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -1282,8 +1282,9 @@ defmodule Ch.ConnectionTest do assert {:error, %Ch.Error{code: 81} = error} = Ch.query(conn, "select 1 + 1", _params = [], database: "no-db") + assert Exception.message(error) =~ "Code: 81." assert Exception.message(error) =~ "`no-db`" - assert Exception.message(error) =~ "UNKNOWN_DATABASE" + assert Exception.message(error) =~ "(UNKNOWN_DATABASE)" end test "can provide custom database", %{conn: conn} do diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs index 7d0fa84..8c8292f 100644 --- a/test/ch/stream_test.exs +++ b/test/ch/stream_test.exs @@ -10,20 +10,19 @@ defmodule Ch.StreamTest do test "emits %Ch.Result{}", %{conn: conn} do count = 1_000_000 - assert [%Result{command: :select, data: header} | rest] = + assert [%Result{command: :select, data: header} | _rest] = + results = DBConnection.run(conn, fn conn -> conn |> Ch.stream("select * from numbers({count:UInt64})", %{"count" => 1_000_000}) |> Enum.into([]) end) - assert header == [<<1, 6, "number", 6, "UInt64">>] + assert [<<1, 6, "number", 6, "UInt64">> | _] = header - decoded = - Enum.flat_map(rest, fn %Result{data: data} -> - data |> IO.iodata_to_binary() |> RowBinary.decode_rows([:u64]) - end) + decoded = results |> Enum.map(& &1.data) |> IO.iodata_to_binary() |> RowBinary.decode_rows() + assert [[0], [1], [2] | _] = decoded assert length(decoded) == count end From 7572662505239452313b72331109b5607955678a Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:18:46 +0900 Subject: [PATCH 16/20] refactor benchmarks a bit --- bench/insert.exs | 8 ++++++-- bench/stream.exs | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/bench/insert.exs b/bench/insert.exs index a8baa60..c3502a2 100644 --- a/bench/insert.exs +++ b/bench/insert.exs @@ -37,18 +37,22 @@ Benchee.run( end, # "control stream" => fn rows -> rows |> 
Stream.chunk_every(60_000) |> Stream.run() end, "encode stream" => fn rows -> + encoding_types = Ch.RowBinary.encoding_types(types) + rows |> Stream.chunk_every(60_000) - |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) + |> Stream.map(fn chunk -> RowBinary._encode_rows(chunk, encoding_types) end) |> Stream.run() end, "insert stream" => fn rows -> DBConnection.run( conn, fn conn -> + encoding_types = Ch.RowBinary.encoding_types(types) + rows |> Stream.chunk_every(60_000) - |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) + |> Stream.map(fn chunk -> RowBinary._encode_rows(chunk, encoding_types) end) |> Enum.into(Ch.stream(conn, statement)) end, timeout: :infinity diff --git a/bench/stream.exs b/bench/stream.exs index 5c3f140..c48196a 100644 --- a/bench/stream.exs +++ b/bench/stream.exs @@ -17,7 +17,7 @@ Benchee.run( conn, fn conn -> conn - |> Ch.stream(statement, _params = [], format: "RowBinary") + |> Ch.stream(statement, _params = [], format: "RowBinary", timeout: :infinity) |> Stream.run() end, timeout: :infinity @@ -28,7 +28,7 @@ Benchee.run( conn, fn conn -> conn - |> Ch.stream(statement, _params = [], format: "RowBinary") + |> Ch.stream(statement, _params = [], format: "RowBinary", timeout: :infinity) |> Stream.map(fn %Ch.Result{data: data} -> data |> IO.iodata_to_binary() From 7de46a2e3f00f339703995777e18303607a181d0 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:21:48 +0900 Subject: [PATCH 17/20] use latest clickhouse in bench From 2abbdaeae8f89f19da4e8ddd4c515d66ff3c7b08 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:27:27 +0900 Subject: [PATCH 18/20] remove old benchmark results from readme --- README.md | 117 +----------------------------------------------------- 1 file changed, 1 insertion(+), 116 deletions(-) diff --git a/README.md b/README.md index cf9b1a6..131da34 100644 --- 
a/README.md +++ b/README.md @@ -288,119 +288,4 @@ Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"]) ## Benchmarks -
-INSERT 1 million rows (original) - -

-$ MIX_ENV=bench mix run bench/insert.exs
-
-This benchmark is based on https://github.com/ClickHouse/clickhouse-go#benchmark
-
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 1_000_000 rows
-Estimated total run time: 28 s
-
-Benchmarking encode with input 1_000_000 rows ...
-Benchmarking encode stream with input 1_000_000 rows ...
-Benchmarking insert with input 1_000_000 rows ...
-Benchmarking insert stream with input 1_000_000 rows ...
-
-##### With input 1_000_000 rows #####
-Name                    ips        average  deviation         median         99th %
-encode stream          1.63      612.96 ms    ±11.30%      583.03 ms      773.01 ms
-insert stream          1.22      819.82 ms     ±9.41%      798.94 ms      973.45 ms
-encode                 1.09      915.75 ms    ±44.13%      750.98 ms     1637.02 ms
-insert                 0.73     1373.84 ms    ±31.01%     1331.86 ms     1915.76 ms
-
-Comparison: 
-encode stream          1.63
-insert stream          1.22 - 1.34x slower +206.87 ms
-encode                 1.09 - 1.49x slower +302.79 ms
-insert                 0.73 - 2.24x slower +760.88 ms
-
- -
- -
-SELECT 500, 500 thousand, and 500 million rows (original) - -

-$ MIX_ENV=bench mix run bench/stream.exs
-
-This benchmark is based on https://github.com/ClickHouse/ch-bench
-
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 500 rows, 500_000 rows, 500_000_000 rows
-Estimated total run time: 1.05 min
-
-Benchmarking stream with decode with input 500 rows ...
-Benchmarking stream with decode with input 500_000 rows ...
-Benchmarking stream with decode with input 500_000_000 rows ...
-Benchmarking stream with manual decode with input 500 rows ...
-Benchmarking stream with manual decode with input 500_000 rows ...
-Benchmarking stream with manual decode with input 500_000_000 rows ...
-Benchmarking stream without decode with input 500 rows ...
-Benchmarking stream without decode with input 500_000 rows ...
-Benchmarking stream without decode with input 500_000_000 rows ...
-
-##### With input 500 rows #####
-Name                                ips        average  deviation         median         99th %
-stream with decode               4.69 K      213.34 μs    ±12.49%      211.38 μs      290.94 μs
-stream with manual decode        4.69 K      213.43 μs    ±17.40%      210.96 μs      298.75 μs
-stream without decode            4.65 K      215.08 μs    ±10.79%      213.79 μs      284.66 μs
-
-Comparison:
-stream with decode               4.69 K
-stream with manual decode        4.69 K - 1.00x slower +0.0838 μs
-stream without decode            4.65 K - 1.01x slower +1.74 μs
-
-##### With input 500_000 rows #####
-Name                                ips        average  deviation         median         99th %
-stream without decode            234.58        4.26 ms    ±13.99%        4.04 ms        5.95 ms
-stream with manual decode         64.26       15.56 ms     ±8.36%       15.86 ms       17.97 ms
-stream with decode                41.03       24.37 ms     ±6.27%       24.39 ms       26.60 ms
-
-Comparison:
-stream without decode            234.58
-stream with manual decode         64.26 - 3.65x slower +11.30 ms
-stream with decode                41.03 - 5.72x slower +20.11 ms
-
-##### With input 500_000_000 rows #####
-Name                                ips        average  deviation         median         99th %
-stream without decode              0.32         3.17 s     ±0.20%         3.17 s         3.17 s
-stream with manual decode        0.0891        11.23 s     ±0.00%        11.23 s        11.23 s
-stream with decode               0.0462        21.66 s     ±0.00%        21.66 s        21.66 s
-
-Comparison:
-stream without decode              0.32
-stream with manual decode        0.0891 - 3.55x slower +8.06 s
-stream with decode               0.0462 - 6.84x slower +18.50 s
-
- -
- -[CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (click the latest workflow run and scroll down to "Artifacts") +Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) :) From 92fe3fe90e7272209116ea2f8f21e8dabeb975e3 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Mon, 29 Jan 2024 11:56:13 +0900 Subject: [PATCH 19/20] remove query string fix from changelog --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2107604..988ed64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,6 @@ - move rows payload (RowBinary, CSV, etc.) to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143 - drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 -- fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147 - make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148 - make `Ch.stream/4` collectable and remove stream support in `Ch.query/4` https://github.com/plausible/ch/pull/149 From 822fee15234f00494c4c758ec774fd41726d5a85 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Tue, 5 Mar 2024 17:53:31 +0800 Subject: [PATCH 20/20] eh --- README.md | 89 ++++++++++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index 131da34..8247a89 100644 --- a/README.md +++ b/README.md @@ -5,23 +5,22 @@ Minimal HTTP ClickHouse client for Elixir. 
-Used in [Ecto ClickHouse adapter.](https://github.com/plausible/chto) +Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch) ### Key features -- RowBinary - Native query parameters - Per query settings - Minimal API -Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82) - ## Installation ```elixir defp deps do [ - {:ch, "~> 0.2.0"} + {:ch, "~> 0.3.0"}, + # TODO ch_native, ch_http + {:ch_row_binary, "~> 0.1.0"} ] end ``` @@ -38,7 +37,15 @@ defaults = [ database: "default", settings: [], pool_size: 1, - timeout: :timer.seconds(15) + timeout: :timer.seconds(15), + # which means the default format for the HTTP interface in your + # ClickHouse server will be used which is usually TSV + format: nil, + decode: %{ + # x-clickhouse-format -> function + "RowBinaryWithNamesAndTypes" => &Ch.RowBinary.decode/1, + "Native" => &Ch.Native.decode/1 + } ] {:ok, pid} = Ch.start_link(defaults) @@ -49,22 +56,19 @@ defaults = [ ```elixir {:ok, pid} = Ch.start_link() -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = +{:ok, %Ch.Result{decoded: nil, data: _tsv = ""} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3") -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT {$0:UInt8}", [3]) +{:ok, %Ch.Result{decoded: %{columns: ["number"], rows: [[0], [1], [2]]}}} = + Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3 FORMAT RowBinaryWithNamesAndTypes") + +{:ok, %Ch.Result{decoded: %{num_rows: 3, num_columns: 1, data: %{"number" => [0, 1, 2]}}}} = + Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3 FORMAT Native") -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = +{:ok, %Ch.Result{columns: %{"number" => [0, 1, 2]}}} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3}) ``` -Note on datetime encoding in query parameters: - -- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone -- `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix 
timestamp and is treated as UTC timestamp by ClickHouse -- encoding non UTC `%DateTime{}` raises `ArgumentError` - #### Insert rows ```elixir @@ -84,40 +88,22 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") #### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) +#### Insert [Native](https://clickhouse.com/docs/en/interfaces/formats#native) + ```elixir {:ok, pid} = Ch.start_link() -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -types = ["UInt64"] -# or -types = [Ch.Types.u64()] -# or -types = [:u64] +Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, name String) ENGINE Null") -rows = [[0], [1]] -row_binary = Ch.RowBinary.encode_rows(rows, types) +iodata = [ + _cols = 2, + _rows = 3, + Ch.Native.encode_column("id", :u64, [0, 1, 2]), + Ch.Native.encode_column("name", :string, ["alice", "bob", "charlie"]) +] %Ch.Result{num_rows: 2} = - Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary]) -``` - -Note that RowBinary format encoding requires `:types` option to be provided. - -Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check. 
- -```elixir -names = ["id"] -types = ["UInt64"] - -header = Ch.RowBinary.encode_names_and_types(names, types) -row_binary = Ch.RowBinary.encode_rows(rows, types) - -%Ch.Result{num_rows: 3} = - Ch.query!(pid, [ - "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", - header | row_binary - ]) + Ch.query!(pid, ["INSERT INTO ch_demo FORMAT Native\n" | iodata]) ``` #### Insert custom [format](https://clickhouse.com/docs/en/interfaces/formats) @@ -143,9 +129,10 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") DBConnection.run(pid, fn conn -> Stream.repeatedly(fn -> [:rand.uniform(100)] end) |> Stream.chunk_every(100_000) - |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) + |> Stream.map(fn chunk -> Ch.RowBinary.encode_many(chunk, _types = ["UInt64"]) end) |> Stream.take(10) - |> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) + |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) + |> Stream.run() end) ``` @@ -157,7 +144,7 @@ end) Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n" -row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"]) +row_binary = Ch.RowBinary.encode_many([[1], [2], [3]], ["UInt64"]) %Ch.Result{num_rows: 3} = Ch.query!(pid, [sql | row_binary], %{"ego" => -1}) @@ -197,8 +184,8 @@ CREATE TABLE ch_nulls ( """) types = ["Nullable(UInt8)", "UInt8", "UInt8"] -rows = [[nil, nil, nil]] -row_binary = Ch.RowBinary.encode_rows(rows, types) +row = [nil, nil, nil] +row_binary = Ch.RowBinary.encode_one(row, types) %Ch.Result{num_rows: 1} = Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary]) @@ -219,7 +206,7 @@ INSERT INTO ch_nulls """ types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"] -row_binary = Ch.RowBinary.encode_rows(rows, types) +row_binary = Ch.RowBinary.encode_one(row, 
types) %Ch.Result{num_rows: 1} = Ch.query!(pid, [sql | row_binary]) @@ -238,7 +225,7 @@ When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory") # "\x61\xF0\x80\x80\x80b" will become "a�b" on SELECT -row_binary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b") +row_binary = Ch.RowBinary.encode(["\x61\xF0\x80\x80\x80b"], Ch.RowBinary.encoder(types: [:string])) %Ch.Result{num_rows: 1} = Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary])