Skip to content

Commit

Permalink
Merge pull request #131 from ClickHouse/feat/pure-rust
Browse files Browse the repository at this point in the history
feat: replace lz4 with lz4_flex and clickhouse-rs-cityhash-sys with cityhash-rs
  • Loading branch information
loyd authored Aug 20, 2024
2 parents 1d63a7d + a51fc1c commit 3ed8b86
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 76 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate
### Changed
- Now this crate is pure Rust, no more C/C++ dependencies.
- insert: increase max size of frames to improve throughput ([#130]).
- compression: replace `lz4` sys binding with `lz4-flex` (pure Rust).
- compression: replace `clickhouse-rs-cityhash-sys` sys binding with `cityhash-rs` (pure Rust) ([#107]).

### Deprecated
- compression: `Compression::Lz4Hc` is deprecated and becomes an alias to `Compression::Lz4`.

[#130]: https://github.com/ClickHouse/clickhouse-rs/issues/130
[#107]: https://github.com/ClickHouse/clickhouse-rs/issues/107

## [0.12.1] - 2024-08-07
### Added
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ inserter = ["dep:quanta"]
watch = ["dep:sha-1", "dep:serde_json", "serde/derive"]
uuid = ["dep:uuid"]
time = ["dep:time"]
lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"]
lz4 = ["dep:lz4_flex", "dep:cityhash-rs"]
native-tls = ["dep:hyper-tls"]
rustls-tls = ["dep:hyper-rustls"]

Expand All @@ -78,8 +78,8 @@ static_assertions = "1.1"
sealed = "0.5"
sha-1 = { version = "0.10", optional = true }
serde_json = { version = "1.0.68", optional = true }
lz4 = { version = "1.23.3", optional = true }
clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true }
lz4_flex = { version = "0.11.3", default-features = false, features = ["std"], optional = true }
cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
uuid = { version = "1", optional = true }
time = { version = "0.3", optional = true }
bstr = { version = "1.2", default-features = false }
Expand All @@ -95,4 +95,4 @@ serde_bytes = "0.11.4"
serde_repr = "0.1.7"
uuid = { version = "1", features = ["v4"] }
time = { version = "0.3.17", features = ["macros", "rand"] }
rand = "0.8.5"
rand = { version = "0.8.5", features = ["small_rng"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# clickhouse-rs

A typed client for ClickHouse.
Official pure Rust typed client for ClickHouse DB.

[![Crates.io][crates-badge]][crates-url]
[![Documentation][docs-badge]][docs-url]
Expand Down
10 changes: 0 additions & 10 deletions benches/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,6 @@ where
rt.block_on((f)(client, iters)).unwrap()
})
});
#[cfg(feature = "lz4")]
group.bench_function("lz4hc(4)", |b| {
b.iter_custom(|iters| {
let rt = Runtime::new().unwrap();
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::Lz4Hc(4));
rt.block_on((f)(client, iters)).unwrap()
})
});
group.finish();
}

Expand Down
38 changes: 34 additions & 4 deletions benches/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,36 @@ mod common;

async fn serve(
request: Request<Incoming>,
chunk: Bytes,
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
common::skip_incoming(request).await;

let chunk = Bytes::from_static(&[15; 128 * 1024]);
let stream = stream::repeat(chunk).map(|chunk| Ok(Frame::data(chunk)));

Response::new(StreamBody::new(stream))
}

fn prepare_chunk() -> Bytes {
use rand::{distributions::Standard, rngs::SmallRng, Rng, SeedableRng};

// Generate random data to avoid _real_ compression.
// TODO: It would be more useful to generate real data.
let mut rng = SmallRng::seed_from_u64(0xBA5E_FEED);
let raw: Vec<_> = (&mut rng).sample_iter(Standard).take(128 * 1024).collect();

// If the feature is enabled, compress the data even if we use the `None`
// compression. The compression ratio is low anyway due to random data.
#[cfg(feature = "lz4")]
let chunk = clickhouse::_priv::lz4_compress(&raw).unwrap();
#[cfg(not(feature = "lz4"))]
let chunk = Bytes::from(raw);

chunk
}

fn select(c: &mut Criterion) {
let addr = "127.0.0.1:6543".parse().unwrap();
let _server = common::start_server(addr, serve);
let chunk = prepare_chunk();
let _server = common::start_server(addr, move |req| serve(req, chunk.clone()));

#[allow(dead_code)]
#[derive(Debug, Row, Deserialize)]
Expand All @@ -53,7 +71,7 @@ fn select(c: &mut Criterion) {

let mut group = c.benchmark_group("select");
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
group.bench_function("select", |b| {
group.bench_function("no compression", |b| {
b.iter_custom(|iters| {
let rt = Runtime::new().unwrap();
let client = Client::default()
Expand All @@ -64,6 +82,18 @@ fn select(c: &mut Criterion) {
start.elapsed()
})
});
#[cfg(feature = "lz4")]
group.bench_function("lz4", |b| {
b.iter_custom(|iters| {
let rt = Runtime::new().unwrap();
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::Lz4);
let start = Instant::now();
rt.block_on(run(client, iters)).unwrap();
start.elapsed()
})
});
group.finish();
}

Expand Down
59 changes: 14 additions & 45 deletions src/compression/lz4.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::{
os::raw::{c_char, c_int},
pin::Pin,
task::{Context, Poll},
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use cityhash_rs::cityhash_102_128;
use futures::{ready, stream::Stream};
use lz4::liblz4::LZ4_decompress_safe;
use lz4_flex::block;

use crate::{
buflist::BufList,
error::{Error, Result},
Compression,
};

const MAX_COMPRESSED_SIZE: u32 = 1024 * 1024 * 1024;
Expand Down Expand Up @@ -146,50 +145,30 @@ impl<S> Lz4Decoder<S> {
}

let mut uncompressed = vec![0u8; meta.uncompressed_size as usize];
decompress(&self.buffer[LZ4_HEADER_SIZE..], &mut uncompressed)?;
let len = decompress(&self.buffer[LZ4_HEADER_SIZE..], &mut uncompressed)?;
debug_assert_eq!(len as u32, meta.uncompressed_size);

Ok(uncompressed.into())
}
}

fn calc_checksum(buffer: &[u8]) -> u128 {
let hash = clickhouse_rs_cityhash_sys::city_hash_128(buffer);
u128::from(hash.hi) << 64 | u128::from(hash.lo)
let hash = cityhash_102_128(buffer);
hash << 64 | hash >> 64
}

fn decompress(compressed: &[u8], uncompressed: &mut [u8]) -> Result<()> {
// SAFETY: all pointers are valid and sizes are correspondingly correct.
let status = unsafe {
LZ4_decompress_safe(
compressed.as_ptr() as *const c_char,
uncompressed.as_mut_ptr() as *mut c_char,
compressed.len() as c_int,
uncompressed.len() as c_int,
)
};

if status < 0 {
return Err(Error::Decompression("can't decompress data".into()));
}

Ok(())
fn decompress(compressed: &[u8], uncompressed: &mut [u8]) -> Result<usize> {
block::decompress_into(compressed, uncompressed).map_err(|err| Error::Decompression(err.into()))
}

pub(crate) fn compress(uncompressed: &[u8], mode: Compression) -> Result<Bytes> {
do_compress(uncompressed, mode).map_err(|err| Error::Decompression(err.into()))
}

fn do_compress(uncompressed: &[u8], mode: Compression) -> std::io::Result<Bytes> {
let max_compressed_size = lz4::block::compress_bound(uncompressed.len())?;
pub(crate) fn compress(uncompressed: &[u8]) -> Result<Bytes> {
let max_compressed_size = block::get_maximum_output_size(uncompressed.len());

let mut buffer = BytesMut::new();
buffer.resize(LZ4_META_SIZE + max_compressed_size, 0);

let compressed_data_size = lz4::block::compress_to_buffer(
uncompressed,
Some(compression_mode(mode)),
false,
&mut buffer[LZ4_META_SIZE..],
)?;
let compressed_data_size = block::compress_into(uncompressed, &mut buffer[LZ4_META_SIZE..])
.map_err(|err| Error::Compression(err.into()))?;

buffer.truncate(LZ4_META_SIZE + compressed_data_size);

Expand All @@ -206,16 +185,6 @@ fn do_compress(uncompressed: &[u8], mode: Compression) -> std::io::Result<Bytes>
Ok(buffer.freeze())
}

fn compression_mode(mode: Compression) -> lz4::block::CompressionMode {
use lz4::block::CompressionMode;

match mode {
Compression::None => unreachable!(),
Compression::Lz4 => CompressionMode::DEFAULT,
Compression::Lz4Hc(level) => CompressionMode::HIGHCOMPRESSION(level),
}
}

#[tokio::test]
async fn it_decompresses() {
use futures::stream::{self, TryStreamExt};
Expand Down Expand Up @@ -273,6 +242,6 @@ fn it_compresses() {
110, 103, 3, 97, 98, 99,
];

let actual = compress(&source, Compression::Lz4).unwrap();
let actual = compress(&source).unwrap();
assert_eq!(actual, expected);
}
5 changes: 5 additions & 0 deletions src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ pub enum Compression {
/// High compression levels are useful in networks with low bandwidth.
/// Affects only `INSERT`s, because others are compressed by the server.
/// Possible levels: `[1, 12]`. Recommended level range: `[4, 9]`.
///
/// Deprecated: `lz4_flex` doesn't support HC mode yet: [lz4_flex#165].
///
/// [lz4_flex#165]: https://github.com/PSeitz/lz4_flex/issues/165
#[cfg(feature = "lz4")]
#[deprecated(note = "use `Compression::Lz4` instead")]
Lz4Hc(i32),
}

Expand Down
3 changes: 1 addition & 2 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use bytes::{Bytes, BytesMut};
use hyper::{self, Request};
use replace_with::replace_with_or_abort;
use serde::Serialize;
use static_assertions::const_assert;
use tokio::{
task::JoinHandle,
time::{Instant, Sleep},
Expand Down Expand Up @@ -311,7 +310,7 @@ impl<T> Insert<T> {
#[cfg(feature = "lz4")]
fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
Ok(if self.compression.is_lz4() {
let compressed = crate::compression::lz4::compress(&self.buffer, self.compression)?;
let compressed = crate::compression::lz4::compress(&self.buffer)?;
self.buffer.clear();
compressed
} else {
Expand Down
17 changes: 14 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use hyper_util::{
rt::TokioExecutor,
};

pub use clickhouse_derive::Row;
use self::{error::Result, http_client::HttpClient};

pub use self::{compression::Compression, row::Row};
use self::{error::Result, http_client::HttpClient};
pub use clickhouse_derive::Row;

pub mod error;
pub mod insert;
Expand Down Expand Up @@ -222,8 +222,19 @@ impl Client {
watch::Watch::new(self, query)
}

/// Used internally to modify the options map of an _already cloned_ [`Client`] instance.
/// Used internally to modify the options map of an _already cloned_
/// [`Client`] instance.
pub(crate) fn add_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
self.options.insert(name.into(), value.into());
}
}

/// This is a private API exported only for internal purposes.
/// Do not use it in your code directly, it doesn't follow semver.
#[doc(hidden)]
pub mod _priv {
#[cfg(feature = "lz4")]
pub fn lz4_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
crate::compression::lz4::compress(uncompressed)
}
}
1 change: 1 addition & 0 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl<S> Decompress<S> {
match compression {
Compression::None => Self::Plain(stream),
#[cfg(feature = "lz4")]
#[allow(deprecated)]
Compression::Lz4 | Compression::Lz4Hc(_) => Self::Lz4(Lz4Decoder::new(stream)),
}
}
Expand Down
7 changes: 0 additions & 7 deletions tests/it/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,3 @@ async fn lz4() {
let client = prepare_database!().with_compression(Compression::Lz4);
check(client).await;
}

#[cfg(feature = "lz4")]
#[tokio::test]
async fn lz4_hc() {
let client = prepare_database!().with_compression(Compression::Lz4Hc(4));
check(client).await;
}

0 comments on commit 3ed8b86

Please sign in to comment.