Skip to content

Commit

Permalink
feat(examples): additional usage examples (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Aug 18, 2024
1 parent 950533f commit 231c465
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ harness = false
name = "mock"
required-features = ["test-util"]

[[example]]
name = "clickhouse_cloud"
required-features = ["rustls-tls"]

[profile.release]
debug = true

Expand Down
53 changes: 53 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# ClickHouse Rust client examples

## Overview

We aim to cover various scenarios of client usage with these examples. You should be able to run any of these examples, see [How to run](#how-to-run) section below.

If something is missing, or you found a mistake in one of these examples, please open an issue or a pull request.

### General usage

- [usage.rs](usage.rs) - creating tables, executing other DDLs, inserting the data, and selecting it back. Additionally, it covers the client-side batching via the `inserter` feature, as well as `WATCH` queries. Optional cargo features: `inserter`, `watch`.
- [mock.rs](mock.rs) - writing tests with `mock` feature. Cargo features: requires `test-util`.
- [async_insert.rs](async_insert.rs) - using the server-side batching via the [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts) ClickHouse feature
- [clickhouse_cloud.rs](clickhouse_cloud.rs) - using the client with ClickHouse Cloud, highlighting a few relevant settings (`wait_end_of_query`, `select_sequential_consistency`). Cargo features: requires `rustls-tls`; the code also works with `native-tls`.
- [clickhouse_settings.rs](clickhouse_settings.rs) - applying various ClickHouse settings on the query level

### Special cases

- [custom_http_client.rs](custom_http_client.rs) - using a custom Hyper client, tweaking its connection pool settings
- [custom_http_headers.rs](custom_http_headers.rs) - setting additional HTTP headers to the client, or overriding the generated ones
- [query_id.rs](query_id.rs) - setting a specific `query_id` on the query level
- [session_id.rs](session_id.rs) - using the client in the session context with temporary tables

## How to run

### Prerequisites

* An [up-to-date Rust installation](https://www.rust-lang.org/tools/install)
* ClickHouse server (see below)

### Running the examples

The examples will require a running ClickHouse server on your machine.

You could [install it directly](https://clickhouse.com/docs/en/install), or run it via Docker:

```sh
docker run -d -p 8123:8123 -p 9000:9000 --name chrs-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```

Then, you should be able to run a particular example via the command-line with:

```sh
cargo run --package clickhouse --example async_insert
```

If a particular example requires a cargo feature, you could run it as follows:

```sh
cargo run --package clickhouse --example usage --features inserter watch
```

Additionally, the individual examples should be runnable via the IDE such as CLion or RustRover.
75 changes: 75 additions & 0 deletions examples/clickhouse_cloud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use clickhouse::sql::Identifier;
use clickhouse::Client;
use clickhouse_derive::Row;
use serde::{Deserialize, Serialize};
use std::env;

// This example requires three environment variables with your instance credentials to be set
//
// - CLICKHOUSE_URL (e.g., https://myservice.clickhouse.cloud:8443)
// - CLICKHOUSE_USER
// - CLICKHOUSE_PASSWORD
//
// Works with either `rustls-tls` or `native-tls` cargo features.

#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
let table_name = "chrs_cloud";

let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));

// `wait_end_of_query` is required in this case, as we want these DDLs to be executed
// on the entire Cloud cluster before we receive the response.
// See https://clickhouse.com/docs/en/interfaces/http/#response-buffering
client
.query("DROP TABLE IF EXISTS ?")
.bind(Identifier(table_name))
.with_option("wait_end_of_query", "1")
.execute()
.await?;

// Note that you could just use MergeTree with CH Cloud, and omit the `ON CLUSTER` clause.
// The same applies to other engines as well;
// e.g., ReplacingMergeTree will become SharedReplacingMergeTree and so on.
// See https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#enabling-sharedmergetree
client
.query("CREATE TABLE ? (id Int32, name String) ENGINE MergeTree ORDER BY id")
.bind(Identifier(table_name))
.with_option("wait_end_of_query", "1")
.execute()
.await?;

let mut insert = client.insert(table_name)?;
insert
.write(&Data {
id: 42,
name: "foo".into(),
})
.await?;
insert.end().await?;

let data = client
.query("SELECT ?fields FROM ?")
.bind(Identifier(table_name))
// This setting is optional; use it when you need strong consistency guarantees on the reads
// See https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
.with_option("select_sequential_consistency", "1")
.fetch_all::<Data>()
.await?;

println!("Stored data: {data:?}");
Ok(())
}

#[derive(Debug, Serialize, Deserialize, Row)]
struct Data {
id: u32,
name: String,
}

fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
30 changes: 30 additions & 0 deletions examples/custom_http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::time::Duration;

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

use clickhouse::{error::Result, Client};

#[tokio::main]
async fn main() -> Result<()> {
let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// For how long keep a particular idle socket alive on the client side (in milliseconds).
// It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
// which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
.pool_idle_timeout(Duration::from_millis(2_500))
// Sets the maximum idle Keep-Alive connections allowed in the pool.
.pool_max_idle_per_host(4)
.build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");

let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.fetch_all::<u64>()
.await?;
println!("Numbers: {numbers:?}");

Ok(())
}
23 changes: 23 additions & 0 deletions examples/custom_http_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use clickhouse::{error::Result, Client};

#[tokio::main]
async fn main() -> Result<()> {
let client = Client::default()
.with_url("http://localhost:8123")
// purposefully invalid credentials in the client configuration for the sake of this example
.with_user("...")
.with_password("...")
// these custom headers will override the auth headers generated by the client
.with_header("X-ClickHouse-User", "default")
.with_header("X-ClickHouse-Key", "")
// or, you could just add your custom headers, e.g., for proxy authentication
.with_header("X-My-Header", "hello");

let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.fetch_all::<u64>()
.await?;
println!("Numbers: {numbers:?}");

Ok(())
}
30 changes: 30 additions & 0 deletions examples/query_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use clickhouse::{error::Result, Client};
use uuid::Uuid;

/// Besides [`Client::query`], it works similarly with [`Client::insert`] and [`Client::inserter`].
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::default().with_url("http://localhost:8123");

let query_id = Uuid::new_v4().to_string();

let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", &query_id)
.fetch_all::<u64>()
.await?;
println!("Numbers: {numbers:?}");

// For the sake of this example, force flush the records into the system.query_log table,
// so we can immediately fetch the query information using the query_id
client.query("SYSTEM FLUSH LOGS").execute().await?;

let logged_query = client
.query("SELECT query FROM system.query_log WHERE query_id = ?")
.bind(&query_id)
.fetch_one::<String>()
.await?;
println!("Query from system.query_log: {logged_query}");

Ok(())
}
56 changes: 56 additions & 0 deletions examples/session_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use clickhouse_derive::Row;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use clickhouse::sql::Identifier;
use clickhouse::{error::Result, Client};

/// Besides [`Client::with_option`], which will be applied for all requests,
/// `session_id` (and other settings) can be set separately for a particular `query`, `insert`,
/// or when using the `inserter` feature.
///
/// This example uses temporary tables feature to demonstrate the `session_id` usage.
///
/// # Important
/// With clustered deployments, due to lack of "sticky sessions", you need to be connected
/// to a _particular cluster node_ in order to properly utilize this feature, cause, for example,
/// a round-robin load-balancer will not guarantee that the consequent requests will be processed
/// by the same ClickHouse node.
///
/// See also:
/// - https://clickhouse.com/docs/en/sql-reference/statements/create/table#temporary-tables
/// - https://github.com/ClickHouse/ClickHouse/issues/21748
/// - `examples/clickhouse_settings.rs`.
#[tokio::main]
async fn main() -> Result<()> {
let table_name = "chrs_session_id";
let session_id = Uuid::new_v4().to_string();

let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", &session_id);

client
.query("CREATE TEMPORARY TABLE ? (i Int32)")
.bind(Identifier(table_name))
.execute()
.await?;

#[derive(Row, Serialize, Deserialize, Debug)]
struct MyRow {
i: i32,
}

let mut insert = client.insert(table_name)?;
insert.write(&MyRow { i: 42 }).await?;
insert.end().await?;

let data = client
.query("SELECT ?fields FROM ?")
.bind(Identifier(table_name))
.fetch_all::<MyRow>()
.await?;

println!("Temporary table data: {data:?}");
Ok(())
}

0 comments on commit 231c465

Please sign in to comment.