From 0e6260b7ae0298716dcfb909f096e5f5ccbb3130 Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Sun, 4 Aug 2024 16:10:20 +0200 Subject: [PATCH] feat(examples): add async_insert + wait_for_async_insert example (#118) --- examples/async_insert.rs | 78 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 examples/async_insert.rs diff --git a/examples/async_insert.rs b/examples/async_insert.rs new file mode 100644 index 0000000..7a9c18f --- /dev/null +++ b/examples/async_insert.rs @@ -0,0 +1,78 @@ +use std::time::{Duration, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; + +use clickhouse::sql::Identifier; +use clickhouse::{error::Result, Client, Row}; + +// This example demonstrates how to use asynchronous inserts, avoiding client side batching of the incoming data. +// Suitable for ClickHouse Cloud, too. See https://clickhouse.com/docs/en/optimize/asynchronous-inserts + +#[derive(Debug, Serialize, Deserialize, Row)] +struct Event { + timestamp: u64, + message: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + let table_name = "chrs_async_insert"; + + let client = Client::default() + .with_url("http://localhost:8123") + // https://clickhouse.com/docs/en/operations/settings/settings#async-insert + .with_option("async_insert", "1") + // https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert + .with_option("wait_for_async_insert", "0"); + + client + .query( + " + CREATE OR REPLACE TABLE ? ( + timestamp DateTime64(9), + message String + ) + ENGINE = MergeTree + ORDER BY timestamp + ", + ) + .bind(Identifier(table_name)) + .execute() + .await?; + + let mut insert = client.insert(table_name)?; + insert + .write(&Event { + timestamp: now(), + message: "one".into(), + }) + .await?; + insert.end().await?; + + loop { + let events = client + .query("SELECT ?fields FROM ?") + .bind(Identifier(table_name)) + .fetch_all::() + .await?; + if !events.is_empty() { + println!("Async insert was flushed"); + println!("{events:?}"); + break; + } + // If you change the `wait_for_async_insert` setting to 1, this line will never be printed; + // however, without waiting, you will see it in the console output several times, + // as the data will remain in the server buffer for a bit before the flush happens + println!("Waiting for async insert flush..."); + tokio::time::sleep(Duration::from_millis(10)).await + } + + Ok(()) +} + +fn now() -> u64 { + UNIX_EPOCH + .elapsed() + .expect("invalid system time") + .as_nanos() as u64 +}