Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

finish Ch.stream/4 #146

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Unreleased

- move rows payload (RowBinary, CSV, etc.) to SQL statement and remove pseudo-positional binds, making param names explicit https://github.com/plausible/ch/pull/143
- drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144

## 0.2.2 (2023-12-23)

- fix query encoding for datetimes with zeroed microseconds `~U[****-**-** **:**:**.000000]` https://github.com/plausible/ch/pull/138
Expand Down
115 changes: 73 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,14 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)")

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1])

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1})

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
```

#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient)
#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary)

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -98,20 +95,29 @@ types = [Ch.Types.u64()]
# or
types = [:u64]

rows = [[0], [1]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary])
```

Note that RowBinary format encoding requires `:types` option to be provided.

Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check.

```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
opts = [names: ["id"], types: ["UInt64"]]
rows = [[0], [1]]
names = ["id"]
types = ["UInt64"]

header = Ch.RowBinary.encode_names_and_types(names, types)
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
%Ch.Result{num_rows: 3} =
Ch.query!(pid, [
"INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n",
header | row_binary
])
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
Expand All @@ -121,29 +127,57 @@ rows = [[0], [1]]

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
csv = "0\n1"

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as chunked RowBinary stream
#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
chunked = Stream.chunk_every(stream, 100)
encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
ten_encoded_chunks = Stream.take(encoded, 10)
DBConnection.run(pid, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
end)
```

#### Select rows as stream

%Ch.Result{num_rows: 1000} =
Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
```elixir
{:ok, pid} = Ch.start_link()

DBConnection.run(pid, fn conn ->
Ch.stream(conn, "SELECT * FROM system.numbers LIMIT {limit:UInt64}", %{"limit" => 1_000_000})
|> Stream.each(&IO.inspect/1)
|> Stream.run()
end)

# %Ch.Result{rows: [[0], [1], [2], ...]}
# %Ch.Result{rows: [[112123], [112124], [113125], ...]}
# etc.
```

This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n"
row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"])

%Ch.Result{num_rows: 3} =
Ch.query!(pid, [sql | row_binary], %{"ego" => -1})
```

#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

Expand All @@ -156,7 +190,7 @@ settings = [async_insert: 1]
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'")

%Ch.Result{rows: [["async_insert", "Bool", "1"]]} =
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings)
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings)
```

## Caveats
Expand All @@ -179,13 +213,13 @@ CREATE TABLE ch_nulls (
""")

types = ["Nullable(UInt8)", "UInt8", "UInt8"]
inserted_rows = [[nil, nil, nil]]
selected_rows = [[nil, 0, 0]]
rows = [[nil, nil, nil]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types)
Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: ^selected_rows} =
%Ch.Result{rows: [[nil, 0, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls")
```

Expand All @@ -197,13 +231,17 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function
sql = """
INSERT INTO ch_nulls
SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8')
FORMAT RowBinary\
FORMAT RowBinary
"""

Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"])
types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{rows: [[0], [10]]} =
Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
%Ch.Result{num_rows: 1} =
Ch.query!(pid, [sql | row_binary])

%Ch.Result{rows: [_before = [nil, 0, 0], _after = [nil, 10, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls ORDER BY b")
```

#### UTF-8 in RowBinary
Expand All @@ -215,24 +253,17 @@ When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types

Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory")

bin = "\x61\xF0\x80\x80\x80b"
utf8 = "a�b"
# "\x61\xF0\x80\x80\x80b" will become "a�b" on SELECT
row_binary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b")

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"])
Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: [[^utf8]]} =
%Ch.Result{rows: [["a�b"]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8")

%Ch.Result{rows: %{"data" => [[^utf8]]}} =
pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1)
```

To get raw binary from `String` columns use `:binary` type that skips UTF-8 checks.

```elixir
%Ch.Result{rows: [[^bin]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8", [], types: [:binary])
%{"data" => [["a�b"]]} =
pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact").data |> Jason.decode!()
```

#### Timezones in RowBinary
Expand Down Expand Up @@ -268,7 +299,7 @@ utc = DateTime.utc_now()
taipei = DateTime.shift_zone!(utc, "Asia/Taipei")

# ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei
Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"])
Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"])
```

## Benchmarks
Expand Down
10 changes: 6 additions & 4 deletions bench/insert.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ scheme = System.get_env("CH_SCHEME") || "http"
database = System.get_env("CH_DATABASE") || "ch_bench"

{:ok, conn} = Ch.start_link(scheme: scheme, hostname: hostname, port: port)
Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {$0:Identifier}", [database])
Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {db:Identifier}", %{"db" => database})

Ch.query!(conn, """
CREATE TABLE IF NOT EXISTS #{database}.benchmark (
Expand All @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS #{database}.benchmark (
""")

types = [Ch.Types.u64(), Ch.Types.string(), Ch.Types.array(Ch.Types.u8()), Ch.Types.datetime()]
statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary"
statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary\n"

rows = fn count ->
Enum.map(1..count, fn i ->
Expand All @@ -32,7 +32,9 @@ Benchee.run(
%{
# "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end,
"encode" => fn rows -> RowBinary.encode_rows(rows, types) end,
"insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end,
"encode+insert" => fn rows ->
Ch.query!(conn, [statement | RowBinary.encode_rows(rows, types)])
end,
# "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end,
"encode stream" => fn rows ->
rows
Expand All @@ -46,7 +48,7 @@ Benchee.run(
|> Stream.chunk_every(60_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)

Ch.query!(conn, statement, stream, encode: false)
Ch.query!(conn, Stream.concat([statement], stream))
end
},
inputs: %{
Expand Down
65 changes: 48 additions & 17 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,85 @@ defmodule Ch do
@moduledoc "Minimal HTTP ClickHouse client."
alias Ch.{Connection, Query, Result}

@type common_option ::
{:database, String.t()}
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
| {:timeout, timeout}

@type start_option ::
common_option
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, :gen_tcp.connect_option()}
| DBConnection.start_option()

@doc """
Start the connection process and connect to ClickHouse.

## Options

* `:scheme` - HTTP scheme, defaults to `"http"`
* `:hostname` - server hostname, defaults to `"localhost"`
* `:port` - HTTP port, defualts to `8123`
* `:scheme` - HTTP scheme, defaults to `"http"`
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* `:database` - Database, defaults to `"default"`
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP receive timeout in milliseconds
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)

"""
@spec start_link([start_option]) :: GenServer.on_start()
def start_link(opts \\ []) do
DBConnection.start_link(Connection, opts)
end

@doc """
Returns a supervisor child specification for a DBConnection pool.

See `start_link/1` for supported options.
"""
@spec child_spec([start_option]) :: :supervisor.child_spec()
def child_spec(opts) do
DBConnection.child_spec(Connection, opts)
end

# TODO move streaming to Ch.stream/4
@type statement :: iodata | Enumerable.t()
@type params :: %{String.t() => term} | [{String.t(), term}]

@type query_option ::
common_option
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
| {:decode, boolean}
| DBConnection.connection_option()

@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.

## Options

* `:timeout` - Query request timeout
* `:settings` - Keyword list of settings
* `:database` - Database
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of settings
* `:timeout` - Query request timeout
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
* `:format` - Custom response format for the request
* `:decode` - Whether to automatically decode the response
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)

"""
@spec query(DBConnection.conn(), iodata, params, Keyword.t()) ::
@spec query(DBConnection.conn(), statement, params, [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)

Expand All @@ -57,24 +93,19 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
@spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t()
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
@spec query!(DBConnection.conn(), statement, params, [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end

@doc false
@spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t()
@doc """
Returns a stream for a query on a connection.
"""
@spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.stream(conn, query, params, opts)
end

@doc false
@spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
def run(conn, f, opts \\ []) when is_function(f, 1) do
DBConnection.run(conn, f, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
Expand Down
Loading
Loading