Skip to content

Commit

Permalink
relace lz4 with lz4_flex in flight ipc
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored and dantengsky committed Dec 15, 2024
1 parent 964a086 commit c60191a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ itertools = { workspace = true, optional = true }
lexical-core = { version = "0.8", optional = true }
log = { workspace = true }
lz4 = { version = "1.24" }
lz4_flex = { version = "0.11.3" }
num = { version = "0.4", default-features = false, features = ["std"] }
num-traits = "0.2"
opendal = { workspace = true }
Expand Down
14 changes: 8 additions & 6 deletions src/common/arrow/src/arrow/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::arrow::error::Result;
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
use std::io::Read;
let mut decoder = lz4::Decoder::new(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
let _ = lz4_flex::frame::FrameDecoder::new(input_buf).read(output_buf)?;
Ok(())
}

#[cfg(feature = "io_ipc_compression")]
Expand Down Expand Up @@ -49,11 +49,13 @@ pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use std::io::Write;

use crate::arrow::error::Error;
let mut encoder = lz4::EncoderBuilder::new()
.build(output_buf)
.map_err(Error::from)?;

let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf);
encoder.write_all(input_buf)?;
encoder.finish().1.map_err(|e| e.into())
encoder
.finish()
.map_err(|e| Error::External("lz4_compress".to_string(), Box::new(e)))?;
Ok(())
}

#[cfg(feature = "io_ipc_compression")]
Expand Down

0 comments on commit c60191a

Please sign in to comment.