Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Query.fetch_raw #182

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions examples/file_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use clickhouse::Client;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

/// An example of streaming the result of a query in an arbitrary format to a file.

#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
let client = Client::default().with_url("http://localhost:8123");

let mut raw_cursor = client
.query(
"
SELECT number, randomPrintableASCII(20)
FROM system.numbers
LIMIT 100000
FORMAT CSV
",
)
.fetch_raw()?;

let mut file = File::create("output_async.txt").await.unwrap();

loop {
match raw_cursor.next().await {
Ok(None) => break,
Err(err) => return Err(err),
Ok(Some(bytes)) => {
println!("Bytes read: {}", bytes.len());
file.write_all(&bytes).await.unwrap()
}
}
}

println!("Bytes written to output_async.txt");
Ok(())
}
54 changes: 54 additions & 0 deletions examples/raw_stream_json_each_row.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::str::from_utf8;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another real use case is to collect everything into one Bytes (or Vec<u8>). It's nice to have such an example, too!


use serde::Deserialize;

use clickhouse::Client;

/// An example of streaming raw data row-by-row in an arbitrary format.
/// In this case, it's JSONEachRow. Similarly, it can be used with CSV, TSV, and others;
/// the only difference will be in how the data is parsed.

#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
let client = Client::default().with_url("http://localhost:8123");

let mut raw_cursor_newline = client
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
FORMAT JSONEachRow
",
)
// By default, ClickHouse quotes (U)Int64 in JSON* family formats;
// disable it to simplify this example.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to simplify this example

because serde_json supports u64 natively, there is no reason to use strings here

.with_option("output_format_json_quote_64bit_integers", "0")
.fetch_raw()?
.newline();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe like this?

Suggested change
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
FORMAT JSONEachRow
",
)
// By default, ClickHouse quotes (U)Int64 in JSON* family formats;
// disable it to simplify this example.
.with_option("output_format_json_quote_64bit_integers", "0")
.fetch_raw()?
.newline();
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
",
)
// By default, ClickHouse quotes (U)Int64 in JSON* family formats;
// disable it to simplify this example.
.with_option("output_format_json_quote_64bit_integers", "0")
.with_format("JSONEachRow")
.fetch_raw()?
.newline();

Copy link
Contributor

@pravic pravic Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because specifying FORMAT xxx as part of the SQL feels not convenient and a bit more error prone.

Ideally, I would prefer with_format(clickhouse::Format::JsonEachRow) if the maintenance burden of mapping all formats to an enum isn't too much.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is possible after a few tweaks to the SQLBuilder. See this commit.

Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave the raw API as-is. The user will quickly find an invalid FORMAT with the first query.

Same thread #182 (comment)


loop {
match raw_cursor_newline.next().await {
Ok(None) => break,
Err(err) => return Err(err),
Ok(Some(row_bytes)) => {
let json_str = from_utf8(row_bytes).unwrap();
let parsed_json =
serde_json::from_str::<MyRowInJSONEachRowFormat>(json_str).unwrap();
println!("Number: {}", parsed_json.number);
println!("HexStr: {}", parsed_json.hex_str);
println!("=================================================");
}
}
}

println!("Done!");
Ok(())
}

// NB: there is no `Row` derive here
#[derive(Debug, Deserialize)]
struct MyRowInJSONEachRowFormat {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest avoiding any use cases where using non-raw API is preferred.

Here we can use https://docs.rs/serde_json/latest/serde_json/enum.Value.html

number: i64,
hex_str: String,
}
50 changes: 46 additions & 4 deletions src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::marker::PhantomData;

use bstr::ByteSlice;
use bytes::Bytes;
use futures::TryStreamExt;
use serde::Deserialize;
Expand All @@ -13,7 +14,7 @@ use crate::{

// === RawCursor ===

struct RawCursor(RawCursorInner);
pub struct RawCursor(RawCursorInner);
Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs (preparing for warn(missing_docs) lint).


enum RawCursorInner {
Waiting(ResponseFuture),
Expand All @@ -27,11 +28,18 @@ struct RawCursorLoading {
}

impl RawCursor {
fn new(response: Response) -> Self {
pub(crate) fn new(response: Response) -> Self {
Self(RawCursorInner::Waiting(response.into_future()))
}

Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, we should also add into_futures03(self) -> RawStream03 (implementing https://docs.rs/futures/latest/futures/prelude/trait.Stream.html) under a new futures03 feature (to support future v1 and AsyncIterator from std when they will released).

This is de facto the standard for streams (with built-in combinators, e.g. concat) and should be used for owned types (like Bytes).

Alternatively, we can directly implement Stream for RawCursor (also under the feature).

async fn next(&mut self) -> Result<Option<Bytes>> {
pub fn newline(self) -> RawCursorNewline {
Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The futures crate already provides the AsyncBufReadExt::lines method.

It seems we can implement the AsyncBufRead (under the futures03 feature) trait for RawCursor covering more use cases, for instance the use of io::copy. This function can be also used in the example of streaming to a file.

RawCursorNewline {
raw: self,
bytes: BytesExt::default(),
}
}

pub async fn next(&mut self) -> Result<Option<Bytes>> {
if matches!(self.0, RawCursorInner::Waiting(_)) {
self.resolve().await?;
}
Expand Down Expand Up @@ -86,6 +94,40 @@ fn workaround_51132<'a, T: ?Sized>(ptr: &T) -> &'a T {
unsafe { &*(ptr as *const T) }
}

/// Similar to [`RawCursor`], but emits chunks of data split by the `\n` (`0x0a`) character.
/// See [`RawCursorNewline::next`] for more details.
pub struct RawCursorNewline {
raw: RawCursor,
bytes: BytesExt,
}

impl RawCursorNewline {
/// Emits a chunk of data before the next `\n` (`0x0a`) character.
///
/// With stream-friendly formats such as `CSV`, `JSONEachRow`, and similar,
/// each iteration will produce the entire row (excluding the newline character itself).
///
/// The result is unspecified if it's called after `Err` is returned.
pub async fn next<'a>(&mut self) -> Result<Option<&'a [u8]>> {
loop {
if self.bytes.remaining() > 0 {
let slice = workaround_51132(self.bytes.slice());
let newline_pos = slice.find_byte(b'\n');
if let Some(pos) = newline_pos {
let (row, rest) = slice.split_at(pos);
self.bytes.set_remaining(rest.len() - 1); // skip the newline character
return Ok(Some(row));
}
}

match self.raw.next().await? {
Some(chunk) => self.bytes.extend(chunk),
None => return Ok(None),
}
}
}
}

// === RowCursor ===

/// A cursor that emits rows.
Expand All @@ -107,7 +149,7 @@ impl<T> RowCursor<T> {

/// Emits the next row.
///
/// An result is unspecified if it's called after `Err` is returned.
/// The result is unspecified if it's called after `Err` is returned.
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
where
T: Deserialize<'b>,
Expand Down
7 changes: 7 additions & 0 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::Deserialize;
use std::fmt::Display;
use url::Url;

use crate::cursor::RawCursor;
use crate::{
error::{Error, Result},
headers::with_request_headers,
Expand Down Expand Up @@ -90,6 +91,12 @@ impl Query {
Ok(RowCursor::new(response))
}

/// Executes the query, returning a [`RawCursor`] to obtain results.
pub fn fetch_raw(self) -> Result<RawCursor> {
let response = self.do_execute(true)?;
Ok(RawCursor::new(response))
}

/// Executes the query and returns just a single row.
///
/// Note that `T` must be owned.
Expand Down
Loading