Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Query::fetch_bytes #182

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bad2ea3
Add Query::fetch_raw, expose RawCursor
slvrtrn Nov 8, 2024
b85f17e
Raw streaming with newline support draft
slvrtrn Nov 8, 2024
697e719
Merge remote-tracking branch 'origin/main' into fetch_raw
slvrtrn Dec 4, 2024
cb22c7d
OutputFormat enum, move FORMAT clause to the sql builder
slvrtrn Dec 4, 2024
0b01dc3
Remove useless match
slvrtrn Dec 4, 2024
569519d
fetch_raw_with_error fix
slvrtrn Dec 4, 2024
d099d10
Merge branch 'main' into fetch_raw
slvrtrn Jan 21, 2025
b6dec0d
BytesCursor, impl Stream, AsyncBufRead, tests, examples
slvrtrn Jan 25, 2025
058c573
Merge branch 'main' into fetch_raw
slvrtrn Jan 25, 2025
db24ffa
Fix lint issues
slvrtrn Jan 25, 2025
a14a908
Merge remote-tracking branch 'origin/fetch_raw' into fetch_raw
slvrtrn Jan 25, 2025
b29ffa8
Remove unused format.rs
slvrtrn Jan 25, 2025
34b088c
Fix tests: fetch_raw.rs -> fetch_bytes.rs
slvrtrn Jan 25, 2025
e5fa2eb
feat(cursor/bytes): support tokio traits
loyd Feb 16, 2025
c50b20d
refactor(cursors): split cursor.rs into modules
loyd Feb 16, 2025
c5cf019
feat(cursors/bytes): add `BytesCursor::{received_bytes,decoded_bytes}`
loyd Feb 16, 2025
27be49a
feat(cursors/bytes): add `next()` and `collect()`
loyd Feb 19, 2025
8f24563
test(fetch_bytes): check `AsyncRead` integration
loyd Feb 19, 2025
878791a
test(benches/select): measure `fetch_bytes()`
loyd Feb 19, 2025
6d87ae2
feat(error): add `io::Error` to/from `Error`
loyd Feb 20, 2025
4985557
chore(ci/msrv): use [email protected]
loyd Feb 20, 2025
8c21f6a
test(benches/select): make bench more stable
loyd Feb 26, 2025
29297a4
fix(cursors/bytes): protect against empty chunks
loyd Feb 26, 2025
aa7d941
refactor(cursors/raw): implement `next()` based on `poll_next()`
loyd Mar 5, 2025
a7e352c
docs(cursors): note about cancel safety
loyd Mar 5, 2025
f5f30fd
feat(cursors/bytes): impl FusedStream
loyd Mar 5, 2025
70e6828
chore(ci/msrv): override litemap version
loyd Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/raw_stream_into_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use clickhouse::format::OutputFormat;
use clickhouse::Client;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

/// An example of streaming the result of a query in an arbitrary format to a file.
///
/// The query result is requested as CSV and every chunk emitted by the raw cursor
/// is appended to `output.csv` in the order it is received.
#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
    let filename = "output.csv";
    let client = Client::default().with_url("http://localhost:8123");

    let mut raw_cursor = client
        .query(
            "
SELECT number, randomPrintableASCII(20)
FROM system.numbers
LIMIT 100000
",
        )
        .fetch_raw(OutputFormat::CSV)?;

    let mut file = File::create(filename).await.unwrap();

    // `next()` resolves to `Ok(None)` once the response is exhausted;
    // `?` propagates any error reported by the cursor.
    while let Some(bytes) = raw_cursor.next().await? {
        println!("Bytes read: {}", bytes.len());
        file.write_all(&bytes).await.unwrap();
    }

    // Wait for any pending write to complete before reporting success:
    // without an explicit flush, a failure of the last in-flight write
    // would go unnoticed when the file is dropped.
    file.flush().await.unwrap();

    println!("Bytes written to {filename}");
    Ok(())
}
54 changes: 54 additions & 0 deletions examples/raw_stream_json_each_row.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::str::from_utf8;

use serde::Deserialize;

use clickhouse::format::OutputFormat;
use clickhouse::Client;

/// Demonstrates consuming raw query output one row at a time in an arbitrary format.
/// `JSONEachRow` is used here, but the same approach applies to CSV, TSV, and others;
/// only the way each row is parsed changes.
#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
    let client = Client::default().with_url("http://localhost:8123");

    let query = client
        .query(
            "
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
",
        )
        // By default, ClickHouse quotes (U)Int64 in JSON* family formats;
        // disable it to simplify this example.
        .with_option("output_format_json_quote_64bit_integers", "0");

    // Split the raw byte stream on `\n`, so every item is a single JSON document.
    let mut raw_cursor_newline = query.fetch_raw(OutputFormat::JSONEachRow)?.newline();

    // `Ok(None)` ends the loop; errors are propagated to the caller by `?`.
    while let Some(row_bytes) = raw_cursor_newline.next().await? {
        let json_str = from_utf8(row_bytes).unwrap();
        let parsed_json: MyRowInJSONEachRowFormat = serde_json::from_str(json_str).unwrap();
        println!("Number: {}", parsed_json.number);
        println!("HexStr: {}", parsed_json.hex_str);
        println!("=================================================");
    }

    println!("Done!");
    Ok(())
}

// NB: there is no `Row` derive here
// A single row of the example query's `JSONEachRow` output.
// It is deserialized with `serde_json` from the bytes of one line,
// so only `serde::Deserialize` is required.
#[derive(Debug, Deserialize)]
struct MyRowInJSONEachRowFormat {
// The `number` column; the example disables 64-bit integer quoting for this query.
number: i64,
// The `hex_str` column, i.e. `hex(randomPrintableASCII(20))` in the query.
hex_str: String,
}
50 changes: 46 additions & 4 deletions src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::marker::PhantomData;

use bstr::ByteSlice;
use bytes::Bytes;
use futures::TryStreamExt;
use serde::Deserialize;
Expand All @@ -13,7 +14,7 @@ use crate::{

// === RawCursor ===

struct RawCursor(RawCursorInner);
pub struct RawCursor(RawCursorInner);

enum RawCursorInner {
Waiting(ResponseFuture),
Expand All @@ -27,11 +28,18 @@ struct RawCursorLoading {
}

impl RawCursor {
fn new(response: Response) -> Self {
pub(crate) fn new(response: Response) -> Self {
Self(RawCursorInner::Waiting(response.into_future()))
}

async fn next(&mut self) -> Result<Option<Bytes>> {
/// Converts this cursor into a [`RawCursorNewline`], which emits chunks of data
/// split by the `\n` (`0x0a`) character instead of raw response chunks.
///
/// The cursor is consumed and becomes the data source of the returned value.
pub fn newline(self) -> RawCursorNewline {
RawCursorNewline {
raw: self,
// Start with a default-initialized buffer of pending bytes.
bytes: BytesExt::default(),
}
}

pub async fn next(&mut self) -> Result<Option<Bytes>> {
if matches!(self.0, RawCursorInner::Waiting(_)) {
self.resolve().await?;
}
Expand Down Expand Up @@ -86,6 +94,40 @@ fn workaround_51132<'a, T: ?Sized>(ptr: &T) -> &'a T {
unsafe { &*(ptr as *const T) }
}

/// Similar to [`RawCursor`], but emits chunks of data split by the `\n` (`0x0a`) character.
/// See [`RawCursorNewline::next`] for more details.
pub struct RawCursorNewline {
    /// The underlying cursor that produces raw chunks of the response.
    raw: RawCursor,
    /// Bytes received from `raw` that have not been emitted yet.
    bytes: BytesExt,
}

impl RawCursorNewline {
    /// Emits a chunk of data before the next `\n` (`0x0a`) character.
    ///
    /// With stream-friendly formats such as `CSV`, `JSONEachRow`, and similar,
    /// each iteration will produce the entire row (excluding the newline character itself).
    ///
    /// The returned slice borrows the internal buffer of the cursor, so it must be
    /// released before `next` is called again.
    ///
    /// Returns `Ok(None)` once the underlying cursor is exhausted. Bytes that follow
    /// the last `\n` of the response (if any) are not emitted.
    ///
    /// The result is unspecified if it's called after `Err` is returned.
    pub async fn next(&mut self) -> Result<Option<&[u8]>> {
        loop {
            if self.bytes.remaining() > 0 {
                // The borrow of `self.bytes` is detached only to satisfy the borrow
                // checker (a reference is returned conditionally from inside a loop).
                // The signature ties the returned row back to the `&mut self` borrow,
                // so the caller cannot keep it while the buffer is modified again.
                let slice = workaround_51132(self.bytes.slice());

                if let Some(pos) = slice.find_byte(b'\n') {
                    // Keep only what follows the newline character in the buffer.
                    self.bytes.set_remaining(slice.len() - pos - 1);
                    return Ok(Some(&slice[..pos]));
                }
            }

            // No complete row is buffered yet: fetch more data or finish.
            match self.raw.next().await? {
                Some(chunk) => self.bytes.extend(chunk),
                None => return Ok(None),
            }
        }
    }
}

// === RowCursor ===

/// A cursor that emits rows.
Expand All @@ -107,7 +149,7 @@ impl<T> RowCursor<T> {

/// Emits the next row.
///
/// An result is unspecified if it's called after `Err` is returned.
/// The result is unspecified if it's called after `Err` is returned.
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
where
T: Deserialize<'b>,
Expand Down
81 changes: 81 additions & 0 deletions src/format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::fmt;

/// Output formats that can be requested for the result of a query.
///
/// Every variant is spelled exactly like the name of the format it stands for:
/// the [`fmt::Display`] implementation prints the variant name as is, so the
/// rendered value can be used directly as a format name.
// The acronyms (`CSV`, `JSON`, `TSKV`, ...) must keep their original casing,
// because the variant name is what `Display` prints.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OutputFormat {
    TabSeparated,
    TabSeparatedRaw,
    TabSeparatedWithNames,
    TabSeparatedWithNamesAndTypes,
    TabSeparatedRawWithNames,
    TabSeparatedRawWithNamesAndTypes,
    Template,
    CSV,
    CSVWithNames,
    CSVWithNamesAndTypes,
    CustomSeparated,
    CustomSeparatedWithNames,
    CustomSeparatedWithNamesAndTypes,
    SQLInsert,
    Values,
    Vertical,
    JSON,
    JSONStrings,
    JSONColumns,
    JSONColumnsWithMetadata,
    JSONCompact,
    JSONCompactStrings,
    JSONCompactColumns,
    JSONEachRow,
    PrettyJSONEachRow,
    JSONEachRowWithProgress,
    JSONStringsEachRow,
    JSONStringsEachRowWithProgress,
    JSONCompactEachRow,
    JSONCompactEachRowWithNames,
    JSONCompactEachRowWithNamesAndTypes,
    JSONCompactStringsEachRow,
    JSONCompactStringsEachRowWithNames,
    JSONCompactStringsEachRowWithNamesAndTypes,
    JSONObjectEachRow,
    BSONEachRow,
    TSKV,
    Pretty,
    PrettyNoEscapes,
    PrettyMonoBlock,
    PrettyNoEscapesMonoBlock,
    PrettyCompact,
    PrettyCompactNoEscapes,
    PrettyCompactMonoBlock,
    PrettyCompactNoEscapesMonoBlock,
    PrettySpace,
    PrettySpaceNoEscapes,
    PrettySpaceMonoBlock,
    PrettySpaceNoEscapesMonoBlock,
    Prometheus,
    Protobuf,
    ProtobufSingle,
    ProtobufList,
    Avro,
    Parquet,
    Arrow,
    ArrowStream,
    ORC,
    Npy,
    RowBinary,
    RowBinaryWithNames,
    RowBinaryWithNamesAndTypes,
    Native,
    Null,
    XML,
    CapnProto,
    LineAsString,
    RawBLOB,
    MsgPack,
    Markdown,
}

impl fmt::Display for OutputFormat {
    /// Writes the name of the format, which is identical to the variant name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The derived `Debug` of a fieldless enum prints the bare variant name.
        fmt::Debug::fmt(self, f)
    }
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use std::{collections::HashMap, fmt::Display, sync::Arc};
pub use self::{compression::Compression, row::Row};
pub use clickhouse_derive::Row;

pub mod cursor;
pub mod error;
pub mod format;
pub mod insert;
#[cfg(feature = "inserter")]
pub mod inserter;
Expand All @@ -25,7 +27,6 @@ pub mod watch;

mod bytes_ext;
mod compression;
mod cursor;
mod headers;
mod http_client;
mod request_body;
Expand Down
11 changes: 10 additions & 1 deletion src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::Display;
use url::Url;

use crate::cursor::RawCursor;
use crate::{
error::{Error, Result},
headers::with_request_headers,
Expand All @@ -16,6 +17,7 @@ use crate::{
const MAX_QUERY_LEN_TO_USE_GET: usize = 8192;

pub use crate::cursor::RowCursor;
use crate::format::OutputFormat;

#[must_use]
#[derive(Clone)]
Expand Down Expand Up @@ -84,12 +86,19 @@ impl Query {
/// ```
pub fn fetch<T: Row>(mut self) -> Result<RowCursor<T>> {
self.sql.bind_fields::<T>();
self.sql.append(" FORMAT RowBinary");
self.sql.set_output_format(OutputFormat::RowBinary);

let response = self.do_execute(true)?;
Ok(RowCursor::new(response))
}

/// Runs the query and hands back a [`RawCursor`] over its response.
///
/// The given `format` is set as the output format of the SQL statement
/// before the request is executed.
///
/// # Errors
///
/// Returns an error if executing the request fails.
pub fn fetch_raw(mut self, format: OutputFormat) -> Result<RawCursor> {
    self.sql.set_output_format(format);
    self.do_execute(true).map(RawCursor::new)
}

/// Executes the query and returns just a single row.
///
/// Note that `T` must be owned.
Expand Down
Loading
Loading