From bad2ea357083bb48cdd8422a193bece769a7a06b Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Fri, 8 Nov 2024 18:01:01 +0100 Subject: [PATCH 1/5] Add Query::fetch_raw, expose RawCursor --- examples/file_write.rs | 35 +++++++++++++++++++++++++++++++++++ src/cursor.rs | 8 ++++---- src/query.rs | 7 +++++++ 3 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 examples/file_write.rs diff --git a/examples/file_write.rs b/examples/file_write.rs new file mode 100644 index 0000000..e5b8534 --- /dev/null +++ b/examples/file_write.rs @@ -0,0 +1,35 @@ +use clickhouse::Client; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +#[tokio::main] +async fn main() -> clickhouse::error::Result<()> { + let client = Client::default().with_url("http://localhost:8123"); + + let mut raw_cursor = client + .query( + " + SELECT number, randomPrintableASCII(20) + FROM system.numbers + LIMIT 100000 + FORMAT CSV + ", + ) + .fetch_raw()?; + + let mut file = File::create("output_async.txt").await.unwrap(); + + loop { + match raw_cursor.next().await { + Ok(None) => break, + Err(err) => return Err(err), + Ok(Some(bytes)) => { + println!("Bytes read: {}", bytes.len()); + file.write_all(&bytes).await.unwrap() + } + } + } + + println!("Bytes written to output_async.txt"); + Ok(()) +} diff --git a/src/cursor.rs b/src/cursor.rs index a58603c..b03e22e 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -13,7 +13,7 @@ use crate::{ // === RawCursor === -struct RawCursor(RawCursorInner); +pub struct RawCursor(RawCursorInner); enum RawCursorInner { Waiting(ResponseFuture), @@ -27,11 +27,11 @@ struct RawCursorLoading { } impl RawCursor { - fn new(response: Response) -> Self { + pub(crate) fn new(response: Response) -> Self { Self(RawCursorInner::Waiting(response.into_future())) } - async fn next(&mut self) -> Result> { + pub async fn next(&mut self) -> Result> { if matches!(self.0, RawCursorInner::Waiting(_)) { self.resolve().await?; } @@ -107,7 +107,7 @@ impl RowCursor { /// Emits the next row. /// - /// An result is unspecified if it's called after `Err` is returned. + /// The result is unspecified if it's called after `Err` is returned. pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result> where T: Deserialize<'b>, diff --git a/src/query.rs b/src/query.rs index 107c1a3..8d6f814 100644 --- a/src/query.rs +++ b/src/query.rs @@ -3,6 +3,7 @@ use serde::Deserialize; use std::fmt::Display; use url::Url; +use crate::cursor::RawCursor; use crate::{ error::{Error, Result}, headers::with_request_headers, @@ -90,6 +91,12 @@ impl Query { Ok(RowCursor::new(response)) } + /// Executes the query, returning a [`RawCursor`] to obtain results. + pub fn fetch_raw(self) -> Result { + let response = self.do_execute(true)?; + Ok(RawCursor::new(response)) + } + /// Executes the query and returns just a single row. /// /// Note that `T` must be owned. From b85f17ee8ae398cd017a4637e8ddd3af6274b60a Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Fri, 8 Nov 2024 19:47:20 +0100 Subject: [PATCH 2/5] Raw streaming with newline support draft --- examples/file_write.rs | 2 ++ examples/raw_stream_json_each_row.rs | 54 ++++++++++++++++++++++++++++ src/cursor.rs | 42 ++++++++++++++++++++++ 3 files changed, 98 insertions(+) create mode 100644 examples/raw_stream_json_each_row.rs diff --git a/examples/file_write.rs b/examples/file_write.rs index e5b8534..29feb0c 100644 --- a/examples/file_write.rs +++ b/examples/file_write.rs @@ -2,6 +2,8 @@ use clickhouse::Client; use tokio::fs::File; use tokio::io::AsyncWriteExt; +/// An example of streaming the result of a query in an arbitrary format to a file. + #[tokio::main] async fn main() -> clickhouse::error::Result<()> { let client = Client::default().with_url("http://localhost:8123"); diff --git a/examples/raw_stream_json_each_row.rs b/examples/raw_stream_json_each_row.rs new file mode 100644 index 0000000..3c001c0 --- /dev/null +++ b/examples/raw_stream_json_each_row.rs @@ -0,0 +1,54 @@ +use std::str::from_utf8; + +use serde::Deserialize; + +use clickhouse::Client; + +/// An example of streaming raw data row-by-row in an arbitrary format. +/// In this case, it's JSONEachRow. Similarly, it can be used with CSV, TSV, and others; +/// the only difference will be in how the data is parsed. + +#[tokio::main] +async fn main() -> clickhouse::error::Result<()> { + let client = Client::default().with_url("http://localhost:8123"); + + let mut raw_cursor_newline = client + .query( + " + SELECT number, hex(randomPrintableASCII(20)) AS hex_str + FROM system.numbers + LIMIT 10 + FORMAT JSONEachRow + ", + ) + // By default, ClickHouse quotes (U)Int64 in JSON* family formats; + // disable it to simplify this example. + .with_option("output_format_json_quote_64bit_integers", "0") + .fetch_raw()? + .newline(); + + loop { + match raw_cursor_newline.next().await { + Ok(None) => break, + Err(err) => return Err(err), + Ok(Some(row_bytes)) => { + let json_str = from_utf8(row_bytes).unwrap(); + let parsed_json = + serde_json::from_str::(json_str).unwrap(); + println!("Number: {}", parsed_json.number); + println!("HexStr: {}", parsed_json.hex_str); + println!("================================================="); + } + } + } + + println!("Done!"); + Ok(()) +} + +// NB: there is no `Row` derive here +#[derive(Debug, Deserialize)] +struct MyRowInJSONEachRowFormat { + number: i64, + hex_str: String, +} diff --git a/src/cursor.rs b/src/cursor.rs index b03e22e..f5528a8 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,5 +1,6 @@ use std::marker::PhantomData; +use bstr::ByteSlice; use bytes::Bytes; use futures::TryStreamExt; use serde::Deserialize; @@ -31,6 +32,13 @@ impl RawCursor { Self(RawCursorInner::Waiting(response.into_future())) } + pub fn newline(self) -> RawCursorNewline { + RawCursorNewline { + raw: self, + bytes: BytesExt::default(), + } + } + pub async fn next(&mut self) -> Result> { if matches!(self.0, RawCursorInner::Waiting(_)) { self.resolve().await?; @@ -86,6 +94,40 @@ fn workaround_51132<'a, T: ?Sized>(ptr: &T) -> &'a T { unsafe { &*(ptr as *const T) } } +/// Similar to [`RawCursor`], but emits chunks of data split by the `\n` (`0x0a`) character. +/// See [`RawCursorNewline::next`] for more details. +pub struct RawCursorNewline { + raw: RawCursor, + bytes: BytesExt, +} + +impl RawCursorNewline { + /// Emits a chunk of data before the next `\n` (`0x0a`) character. + /// + /// With stream-friendly formats such as `CSV`, `JSONEachRow`, and similar, + /// each iteration will produce the entire row (excluding the newline character itself). + /// + /// The result is unspecified if it's called after `Err` is returned. + pub async fn next<'a>(&mut self) -> Result> { + loop { + if self.bytes.remaining() > 0 { + let slice = workaround_51132(self.bytes.slice()); + let newline_pos = slice.find_byte(b'\n'); + if let Some(pos) = newline_pos { + let (row, rest) = slice.split_at(pos); + self.bytes.set_remaining(rest.len() - 1); // skip the newline character + return Ok(Some(row)); + } + } + + match self.raw.next().await? { + Some(chunk) => self.bytes.extend(chunk), + None => return Ok(None), + } + } + } +} + // === RowCursor === /// A cursor that emits rows. From cb22c7d48ac3fb3ea6eaf1b39682715a7fe81e8d Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Wed, 4 Dec 2024 17:16:51 +0100 Subject: [PATCH 3/5] OutputFormat enum, move FORMAT clause to the sql builder --- ...{file_write.rs => raw_stream_into_file.rs} | 9 ++- examples/raw_stream_json_each_row.rs | 4 +- src/format.rs | 81 +++++++++++++++++++ src/lib.rs | 3 +- src/query.rs | 6 +- src/sql/mod.rs | 40 ++++----- tests/it/fetch_raw.rs | 21 +++++ tests/it/main.rs | 1 + 8 files changed, 138 insertions(+), 27 deletions(-) rename examples/{file_write.rs => raw_stream_into_file.rs} (79%) create mode 100644 src/format.rs create mode 100644 tests/it/fetch_raw.rs diff --git a/examples/file_write.rs b/examples/raw_stream_into_file.rs similarity index 79% rename from examples/file_write.rs rename to examples/raw_stream_into_file.rs index 29feb0c..0b3d9ba 100644 --- a/examples/file_write.rs +++ b/examples/raw_stream_into_file.rs @@ -1,3 +1,4 @@ +use clickhouse::format::OutputFormat; use clickhouse::Client; use tokio::fs::File; use tokio::io::AsyncWriteExt; @@ -6,6 +7,7 @@ use tokio::io::AsyncWriteExt; #[tokio::main] async fn main() -> clickhouse::error::Result<()> { + let filename = "output.csv"; let client = Client::default().with_url("http://localhost:8123"); let mut raw_cursor = client @@ -14,12 +16,11 @@ async fn main() -> clickhouse::error::Result<()> { SELECT number, randomPrintableASCII(20) FROM system.numbers LIMIT 100000 - FORMAT CSV ", ) - .fetch_raw()?; + .fetch_raw(OutputFormat::CSV)?; - let mut file = File::create("output_async.txt").await.unwrap(); + let mut file = File::create(filename).await.unwrap(); loop { match raw_cursor.next().await { @@ -32,6 +33,6 @@ async fn main() -> clickhouse::error::Result<()> { } } - println!("Bytes written to output_async.txt"); + println!("Bytes written to {filename}"); Ok(()) } diff --git a/examples/raw_stream_json_each_row.rs b/examples/raw_stream_json_each_row.rs index 3c001c0..1b95d6a 100644 --- a/examples/raw_stream_json_each_row.rs +++ b/examples/raw_stream_json_each_row.rs @@ -2,6 +2,7 @@ use std::str::from_utf8; use serde::Deserialize; +use clickhouse::format::OutputFormat; use clickhouse::Client; /// An example of streaming raw data row-by-row in an arbitrary format. @@ -18,13 +19,12 @@ async fn main() -> clickhouse::error::Result<()> { SELECT number, hex(randomPrintableASCII(20)) AS hex_str FROM system.numbers LIMIT 10 - FORMAT JSONEachRow ", ) // By default, ClickHouse quotes (U)Int64 in JSON* family formats; // disable it to simplify this example. .with_option("output_format_json_quote_64bit_integers", "0") - .fetch_raw()? + .fetch_raw(OutputFormat::JSONEachRow)? .newline(); loop { diff --git a/src/format.rs b/src/format.rs new file mode 100644 index 0000000..f9929a3 --- /dev/null +++ b/src/format.rs @@ -0,0 +1,81 @@ +use std::fmt; + +#[derive(Debug, Clone)] +pub enum OutputFormat { + TabSeparated, + TabSeparatedRaw, + TabSeparatedWithNames, + TabSeparatedWithNamesAndTypes, + TabSeparatedRawWithNames, + TabSeparatedRawWithNamesAndTypes, + Template, + CSV, + CSVWithNames, + CSVWithNamesAndTypes, + CustomSeparated, + CustomSeparatedWithNames, + CustomSeparatedWithNamesAndTypes, + SQLInsert, + Values, + Vertical, + JSON, + JSONStrings, + JSONColumns, + JSONColumnsWithMetadata, + JSONCompact, + JSONCompactStrings, + JSONCompactColumns, + JSONEachRow, + PrettyJSONEachRow, + JSONEachRowWithProgress, + JSONStringsEachRow, + JSONStringsEachRowWithProgress, + JSONCompactEachRow, + JSONCompactEachRowWithNames, + JSONCompactEachRowWithNamesAndTypes, + JSONCompactStringsEachRow, + JSONCompactStringsEachRowWithNames, + JSONCompactStringsEachRowWithNamesAndTypes, + JSONObjectEachRow, + BSONEachRow, + TSKV, + Pretty, + PrettyNoEscapes, + PrettyMonoBlock, + PrettyNoEscapesMonoBlock, + PrettyCompact, + PrettyCompactNoEscapes, + PrettyCompactMonoBlock, + PrettyCompactNoEscapesMonoBlock, + PrettySpace, + PrettySpaceNoEscapes, + PrettySpaceMonoBlock, + PrettySpaceNoEscapesMonoBlock, + Prometheus, + Protobuf, + ProtobufSingle, + ProtobufList, + Avro, + Parquet, + Arrow, + ArrowStream, + ORC, + Npy, + RowBinary, + RowBinaryWithNames, + RowBinaryWithNamesAndTypes, + Native, + Null, + XML, + CapnProto, + LineAsString, + RawBLOB, + MsgPack, + Markdown, +} + +impl fmt::Display for OutputFormat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self, f) + } +} diff --git a/src/lib.rs b/src/lib.rs index f1e4eb3..7cd64e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,9 @@ use std::{collections::HashMap, fmt::Display, sync::Arc}; pub use self::{compression::Compression, row::Row}; pub use clickhouse_derive::Row; +pub mod cursor; pub mod error; +pub mod format; pub mod insert; #[cfg(feature = "inserter")] pub mod inserter; @@ -25,7 +27,6 @@ pub mod watch; mod bytes_ext; mod compression; -mod cursor; mod headers; mod http_client; mod request_body; diff --git a/src/query.rs b/src/query.rs index 2d4c22a..b044005 100644 --- a/src/query.rs +++ b/src/query.rs @@ -17,6 +17,7 @@ use crate::{ const MAX_QUERY_LEN_TO_USE_GET: usize = 8192; pub use crate::cursor::RowCursor; +use crate::format::OutputFormat; #[must_use] #[derive(Clone)] @@ -85,14 +86,15 @@ impl Query { /// ``` pub fn fetch(mut self) -> Result> { self.sql.bind_fields::(); - self.sql.append(" FORMAT RowBinary"); + self.sql.set_output_format(OutputFormat::RowBinary); let response = self.do_execute(true)?; Ok(RowCursor::new(response)) } /// Executes the query, returning a [`RawCursor`] to obtain results. - pub fn fetch_raw(self) -> Result { + pub fn fetch_raw(mut self, format: OutputFormat) -> Result { + self.sql.set_output_format(format); let response = self.do_execute(true)?; Ok(RawCursor::new(response)) } diff --git a/src/sql/mod.rs b/src/sql/mod.rs index 66330f6..9ec1c3d 100644 --- a/src/sql/mod.rs +++ b/src/sql/mod.rs @@ -5,6 +5,7 @@ use crate::{ row::{self, Row}, }; +use crate::format::OutputFormat; pub use bind::{Bind, Identifier}; mod bind; @@ -13,7 +14,7 @@ pub(crate) mod ser; #[derive(Debug, Clone)] pub(crate) enum SqlBuilder { - InProgress(Vec), + InProgress(Vec, Option), Failed(String), } @@ -21,7 +22,6 @@ pub(crate) enum SqlBuilder { pub(crate) enum Part { Arg, Fields, - Str(&'static str), Text(String), } @@ -29,15 +29,17 @@ pub(crate) enum Part { impl fmt::Display for SqlBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - SqlBuilder::InProgress(parts) => { + SqlBuilder::InProgress(parts, output_format_opt) => { for part in parts { match part { Part::Arg => f.write_char('?')?, Part::Fields => f.write_str("?fields")?, - Part::Str(text) => f.write_str(text)?, Part::Text(text) => f.write_str(text)?, } } + if let Some(output_format) = output_format_opt { + f.write_str(&format!(" FORMAT {output_format}"))? + } } SqlBuilder::Failed(err) => f.write_str(err)?, } @@ -71,11 +73,17 @@ impl SqlBuilder { parts.push(Part::Text(rest.to_string())); } - SqlBuilder::InProgress(parts) + SqlBuilder::InProgress(parts, None) + } + + pub(crate) fn set_output_format(&mut self, format: OutputFormat) { + if let Self::InProgress(_, format_opt) = self { + *format_opt = Some(format); + } } pub(crate) fn bind_arg(&mut self, value: impl Bind) { - let Self::InProgress(parts) = self else { + let Self::InProgress(parts, _) = self else { return; }; @@ -93,7 +101,7 @@ impl SqlBuilder { } pub(crate) fn bind_fields(&mut self) { - let Self::InProgress(parts) = self else { + let Self::InProgress(parts, _) = self else { return; }; @@ -106,21 +114,12 @@ impl SqlBuilder { } } - pub(crate) fn append(&mut self, suffix: &'static str) { - let Self::InProgress(parts) = self else { - return; - }; - - parts.push(Part::Str(suffix)); - } - pub(crate) fn finish(mut self) -> Result { let mut sql = String::new(); - if let Self::InProgress(parts) = &self { + if let Self::InProgress(parts, _) = &self { for part in parts { match part { - Part::Str(text) => sql.push_str(text), Part::Text(text) => sql.push_str(text), Part::Arg => { self.error("unbound query argument"); @@ -135,7 +134,12 @@ impl SqlBuilder { } match self { - Self::InProgress(_) => Ok(sql), + Self::InProgress(_, output_format_opt) => { + if let Some(output_format) = output_format_opt { + sql.push_str(&format!(" FORMAT {output_format}")) + } + Ok(sql) + } Self::Failed(err) => Err(Error::InvalidParams(err.into())), } } diff --git a/tests/it/fetch_raw.rs b/tests/it/fetch_raw.rs new file mode 100644 index 0000000..1f79500 --- /dev/null +++ b/tests/it/fetch_raw.rs @@ -0,0 +1,21 @@ +use clickhouse::format::OutputFormat; + +#[tokio::test] +async fn fetch_raw_with_error() { + let client = prepare_database!(); + + let mut raw_cursor = client + .query("SELECT sleepEachRow(0.01) AS s FROM system.numbers LIMIT 30") + .with_option("max_block_size", "1") + .with_option("max_execution_time", "0.015") + .fetch_raw(OutputFormat::JSONEachRow) + .unwrap(); + + let err = match raw_cursor.next().await { + Ok(res) => Ok(res), + Err(err) => Err(err), + }; + + println!("{err:?}"); + assert!(matches!(err, Err(clickhouse::error::Error::BadResponse(_)))); +} diff --git a/tests/it/main.rs b/tests/it/main.rs index 87995b4..bbcf888 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -57,6 +57,7 @@ async fn flush_query_log(client: &Client) { mod compression; mod cursor_error; mod cursor_stats; +mod fetch_raw; mod insert; mod inserter; mod ip; From 0b01dc31b6342df9e4cdcd084cd6fab1326a1f5c Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Wed, 4 Dec 2024 17:19:15 +0100 Subject: [PATCH 4/5] Remove useless match --- tests/it/fetch_raw.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/it/fetch_raw.rs b/tests/it/fetch_raw.rs index 1f79500..cf45159 100644 --- a/tests/it/fetch_raw.rs +++ b/tests/it/fetch_raw.rs @@ -11,10 +11,7 @@ async fn fetch_raw_with_error() { .fetch_raw(OutputFormat::JSONEachRow) .unwrap(); - let err = match raw_cursor.next().await { - Ok(res) => Ok(res), - Err(err) => Err(err), - }; + let err = raw_cursor.next().await; println!("{err:?}"); assert!(matches!(err, Err(clickhouse::error::Error::BadResponse(_)))); From 569519d3fc88a7e4be38ca8f9b5e367119ae054b Mon Sep 17 00:00:00 2001 From: slvrtrn Date: Wed, 4 Dec 2024 17:22:00 +0100 Subject: [PATCH 5/5] fetch_raw_with_error fix --- tests/it/fetch_raw.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/it/fetch_raw.rs b/tests/it/fetch_raw.rs index cf45159..a71a433 100644 --- a/tests/it/fetch_raw.rs +++ b/tests/it/fetch_raw.rs @@ -5,9 +5,9 @@ async fn fetch_raw_with_error() { let client = prepare_database!(); let mut raw_cursor = client - .query("SELECT sleepEachRow(0.01) AS s FROM system.numbers LIMIT 30") + .query("SELECT sleepEachRow(0.05) AS s FROM system.numbers LIMIT 30") .with_option("max_block_size", "1") - .with_option("max_execution_time", "0.015") + .with_option("max_execution_time", "0.08") .fetch_raw(OutputFormat::JSONEachRow) .unwrap();