From c41f3c1e8b77ef0d012f3e00235059116ae17349 Mon Sep 17 00:00:00 2001 From: Nathan Fiedler Date: Thu, 26 Jan 2023 21:37:51 -0800 Subject: [PATCH] feat: add read support for v2016, v2020 chunkers In addition to the FastCDC types in the v2016 and v2020 modules, now there are StreamCDC structs that will read from a boxed Read into a buffer sized to fit the maximum chunk. While this is convenient for processing large files, it is a bit slower than using memory-mapped files with a crate such as memmap2. Added examples that demonstrate using the streaming chunkers. cargo test passes --- CHANGELOG.md | 3 + Cargo.toml | 4 +- README.md | 24 +- TODO.org | 25 +- examples/ronomon.rs | 4 +- examples/stream2016.rs | 38 +++ examples/stream2020.rs | 38 +++ examples/v2016.rs | 4 +- examples/v2020.rs | 4 +- src/lib.rs | 14 +- src/v2016/mod.rs | 515 ++++++++++++++++++++++++++++++++-------- src/v2020/mod.rs | 520 ++++++++++++++++++++++++++++++++++------- 12 files changed, 979 insertions(+), 214 deletions(-) create mode 100644 examples/stream2016.rs create mode 100644 examples/stream2020.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 15565b6..1b2f91e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,12 @@ This file follows the convention described at ## [Unreleased] ### Changed - **Breaking:** moved ronomon FastCDC implementation into `ronomon` module. + What was `fastcdc::FastCDC::new()` is now `fastcdc::ronomon::FastCDC::new()`. ### Added - Canonical implementation of FastCDC from 2016 paper in `v2016` module. - Canonical implementation of FastCDC from 2020 paper in `v2020` module. +- `Normalization` enum to set the normalized chunking for `v2016` and `v2020` chunkers. +- `StreamCDC`, streaming version of `FastCDC`, in `v2016` and `v2020` modules. ## [2.0.0] - 2023-01-14 ### Added diff --git a/Cargo.toml b/Cargo.toml index 8c0e040..9d426bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ exclude = [ [dev-dependencies] aes = "0.8.2" byteorder = "1.4.3" -clap = { version = "4.0.32", features = ["cargo"] } +clap = { version = "4.1.4", features = ["cargo"] } ctr = "0.9.2" md-5 = "0.10.5" -memmap = "0.7.0" +memmap2 = "0.5.8" diff --git a/README.md b/README.md index 44fbf24..1346f22 100644 --- a/README.md +++ b/README.md @@ -16,9 +16,7 @@ $ cargo test ## Example Usage -Examples can be found in the `examples` directory of the source repository, -which demonstrate reading files of arbitrary size into a memory-mapped buffer -and passing them through the different chunker implementations. +Examples can be found in the `examples` directory of the source repository, which demonstrate finding chunk boundaries in a given file. There are both streaming and non-streaming examples, where the non-streaming examples can read from arbitrarily large files via the `memmap2` crate. ```shell $ cargo run --example v2020 -- --size 16384 test/fixtures/SekienAkashita.jpg @@ -47,6 +45,21 @@ assert_eq!(results[1].offset, 66549); assert_eq!(results[1].length, 42917); ``` +### Streaming + +Both the `v2016` and `v2020` modules have a streaming version of FastCDC named `StreamCDC`, which takes a boxed `Read` and uses a byte vector with capacity equal to the specified maximum chunk size. 
+ +```rust +use std::fs::File; +use fastcdc::v2020::StreamCDC; +let source = File::open("test/fixtures/SekienAkashita.jpg").unwrap(); +let chunker = StreamCDC::new(Box::new(source), 4096, 16384, 65535); +for result in chunker { + let chunk = result.unwrap(); + println!("offset={} length={}", chunk.offset, chunk.length); +} +``` + ## Migration from pre-3.0 If you were using a release of this crate from before the 3.0 release, you will need to make a small adjustment to continue using the same implemetation as before. @@ -54,17 +67,12 @@ If you were using a release of this crate from before the 3.0 release, you will Before the 3.0 release: ```rust -use fastcdc::ronomon as fastcdc; -use std::fs; -let contents = fs::read("test/fixtures/SekienAkashita.jpg").unwrap(); let chunker = fastcdc::FastCDC::new(&contents, 8192, 16384, 32768); ``` After the 3.0 release: ```rust -use std::fs; -let contents = fs::read("test/fixtures/SekienAkashita.jpg").unwrap(); let chunker = fastcdc::ronomon::FastCDC::new(&contents, 8192, 16384, 32768); ``` diff --git a/TODO.org b/TODO.org index 918ab39..66ea2ce 100644 --- a/TODO.org +++ b/TODO.org @@ -1,16 +1,11 @@ * Action Items -** TODO Rewrite -*** TODO incorporate some form of streaming support based on =Read= -**** c.f. https://gitlab.com/asuran-rs/asuran/ (asuran-chunker, uses =fastcdc= with =Read=) -**** basically just allocate a buffer 2*max and fill it as needed -**** c.f. https://github.com/jotfs/fastcdc-go/blob/master/fastcdc.go -**** c.f. https://github.com/wxiacode/Duplicacy-FastCDC/blob/master/src/duplicacy_chunkmaker.go -*** TODO test: check if ronomon version of fastcdc produces same results as rust version -**** if so, maybe make this a requirement of the rust version of ronomon -** timing on =MSEdge-Win10.ova= with 4mb chunks -*** run with =--release= flag 7 times, drop low/high, average remaining 5 -| chunker | avg time | -|---------+----------| -| v2020 | 3.437 | -| ronomon | 4.085 | -| v2016 | 4.266 | +** Time for examples to chunk =MSEdge-Win10.ova= with 4mb chunks +*** use =time cargo run --release ...=, 7 times, drop low/high, average remaining 5 +*** note that the non-streaming examples use =memmap2= to read from the file as a slice +| chunker | avg time | +|------------+----------| +| v2020 | 3.437 | +| ronomon | 4.085 | +| v2016 | 4.266 | +| stream2020 | 5.847 | +| stream2016 | 6.659 | diff --git a/examples/ronomon.rs b/examples/ronomon.rs index 2c2c51d..1d318e2 100644 --- a/examples/ronomon.rs +++ b/examples/ronomon.rs @@ -3,7 +3,7 @@ // use clap::{arg, command, value_parser, Arg}; use fastcdc::ronomon::*; -use memmap::MmapOptions; +use memmap2::Mmap; use std::fs::File; fn main() { @@ -26,7 +26,7 @@ fn main() { let avg_size = *size as usize; let filename = matches.get_one::("INPUT").unwrap(); let file = File::open(filename).expect("cannot open file!"); - let mmap = unsafe { MmapOptions::new().map(&file).expect("cannot create mmap?") }; + let mmap = unsafe { Mmap::map(&file).expect("cannot create mmap?") }; let min_size = avg_size / 4; let max_size = avg_size * 4; let chunker = FastCDC::new(&mmap[..], min_size, avg_size, max_size); diff --git a/examples/stream2016.rs b/examples/stream2016.rs new file mode 100644 index 0000000..d82c5d0 --- /dev/null +++ b/examples/stream2016.rs @@ -0,0 +1,38 @@ +// +// Copyright (c) 2023 Nathan Fiedler +// +use clap::{arg, command, value_parser, Arg}; +use fastcdc::v2016::*; +use std::fs::File; + +fn main() { + let matches = command!("Example of using v2016 streaming chunker.") + .about("Finds the 
content-defined chunk boundaries of a file.") + .arg( + arg!( + -s --size "The desired average size of the chunks." + ) + .value_parser(value_parser!(u32)), + ) + .arg( + Arg::new("INPUT") + .help("Sets the input file to use") + .required(true) + .index(1), + ) + .get_matches(); + let size = matches.get_one::("size").unwrap_or(&131072); + let avg_size = *size; + let filename = matches.get_one::("INPUT").unwrap(); + let file = File::open(filename).expect("cannot open file!"); + let min_size = avg_size / 4; + let max_size = avg_size * 4; + let chunker = StreamCDC::new(Box::new(file), min_size, avg_size, max_size); + for result in chunker { + let entry = result.expect("failed to read chunk"); + println!( + "hash={} offset={} size={}", + entry.hash, entry.offset, entry.length + ); + } +} diff --git a/examples/stream2020.rs b/examples/stream2020.rs new file mode 100644 index 0000000..962d5d2 --- /dev/null +++ b/examples/stream2020.rs @@ -0,0 +1,38 @@ +// +// Copyright (c) 2023 Nathan Fiedler +// +use clap::{arg, command, value_parser, Arg}; +use fastcdc::v2020::*; +use std::fs::File; + +fn main() { + let matches = command!("Example of using v2020 streaming chunker.") + .about("Finds the content-defined chunk boundaries of a file.") + .arg( + arg!( + -s --size "The desired average size of the chunks." + ) + .value_parser(value_parser!(u32)), + ) + .arg( + Arg::new("INPUT") + .help("Sets the input file to use") + .required(true) + .index(1), + ) + .get_matches(); + let size = matches.get_one::("size").unwrap_or(&131072); + let avg_size = *size; + let filename = matches.get_one::("INPUT").unwrap(); + let file = File::open(filename).expect("cannot open file!"); + let min_size = avg_size / 4; + let max_size = avg_size * 4; + let chunker = StreamCDC::new(Box::new(file), min_size, avg_size, max_size); + for result in chunker { + let entry = result.expect("failed to read chunk"); + println!( + "hash={} offset={} size={}", + entry.hash, entry.offset, entry.length + ); + } +} diff --git a/examples/v2016.rs b/examples/v2016.rs index 41bc541..4930919 100644 --- a/examples/v2016.rs +++ b/examples/v2016.rs @@ -3,7 +3,7 @@ // use clap::{arg, command, value_parser, Arg}; use fastcdc::v2016::*; -use memmap::MmapOptions; +use memmap2::Mmap; use std::fs::File; fn main() { @@ -26,7 +26,7 @@ fn main() { let avg_size = *size; let filename = matches.get_one::("INPUT").unwrap(); let file = File::open(filename).expect("cannot open file!"); - let mmap = unsafe { MmapOptions::new().map(&file).expect("cannot create mmap?") }; + let mmap = unsafe { Mmap::map(&file).expect("cannot create mmap?") }; let min_size = avg_size / 4; let max_size = avg_size * 4; let chunker = FastCDC::new(&mmap[..], min_size, avg_size, max_size); diff --git a/examples/v2020.rs b/examples/v2020.rs index 1a1dd6e..f0b3bdc 100644 --- a/examples/v2020.rs +++ b/examples/v2020.rs @@ -3,7 +3,7 @@ // use clap::{arg, command, value_parser, Arg}; use fastcdc::v2020::*; -use memmap::MmapOptions; +use memmap2::Mmap; use std::fs::File; fn main() { @@ -26,7 +26,7 @@ fn main() { let avg_size = *size; let filename = matches.get_one::("INPUT").unwrap(); let file = File::open(filename).expect("cannot open file!"); - let mmap = unsafe { MmapOptions::new().map(&file).expect("cannot create mmap?") }; + let mmap = unsafe { Mmap::map(&file).expect("cannot create mmap?") }; let min_size = avg_size / 4; let max_size = avg_size * 4; let chunker = FastCDC::new(&mmap[..], min_size, avg_size, max_size); diff --git a/src/lib.rs b/src/lib.rs index 3c36317..e5b3058 100644 --- 
a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2020 Nathan Fiedler +// Copyright (c) 2023 Nathan Fiedler // //! This crate implements multiple versions of the FastCDC content defined @@ -55,7 +55,7 @@ //! For a canonical implementation of the algorithm as described in the 2020 //! paper, see the `v2020` crate. This implementation produces identical cut //! points as the 2016 version, but does so a bit faster. -//! +//! //! If you are using this crate for the first time, the `v2020` implementation //! would be the most approprite. It uses 64-bit hash values and tends to be //! faster than both the `ronomon` and `v2016` versions. @@ -112,6 +112,16 @@ //! points that were determined by the maximum size rather than the data itself. //! Ideally you want cut points that are determined by the input data. However, //! this is application dependent and your situation may be different. +//! +//! ## Large Data +//! +//! If processing very large files, the streaming version of the chunkers in the +//! `v2016` and `v2020` modules may be a suitable approach. They both allocate a +//! byte vector equal to the maximum chunk size, draining and resizing the +//! vector as chunks are found. However, using a crate such as `memmap2` can be +//! significantly faster than the streaming chunkers. See the examples in the +//! `examples` directory for how to use the streaming versions as-is, versus the +//! non-streaming chunkers which read from a memory-mapped file. pub mod ronomon; pub mod v2016; diff --git a/src/v2016/mod.rs b/src/v2016/mod.rs index c19bf83..3027bcb 100644 --- a/src/v2016/mod.rs +++ b/src/v2016/mod.rs @@ -5,7 +5,7 @@ //! This module implements the canonical FastCDC algorithm as described in the //! [paper](https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf) //! by Wen Xia, et al., in 2016. -//! +//! //! The algorithm incorporates a simplified hash judgement using the fast Gear //! hash, sub-minimum chunk cut-point skipping, and normalized chunking to //! produce chunks of a more consistent length. @@ -23,6 +23,12 @@ //! `hash` field of the `Chunk` struct. While this value has rather low entropy, //! it is computationally cost-free and can be put to some use with additional //! record keeping. +//! +//! The `StreamCDC` implementation is similar to `FastCDC` except that it will +//! read data from a boxed `Read` into an internal buffer of `max_size` and +//! produce `ChunkData` values from the `Iterator`. +use std::fmt; +use std::io::Read; /// Smallest acceptable value for the minimum chunk size. pub const MINIMUM_MIN: u32 = 64; @@ -149,9 +155,56 @@ const GEAR: [u64; 256] = [ 0x8e3e4221d3614413, 0xef14d0d86bf1a22c, 0xe1d830d3f16c5ddb, 0xaabd2b2a451504e1 ]; +// Find the next chunk cut point in the source. +fn cut( + source: &[u8], + min_size: usize, + avg_size: usize, + max_size: usize, + mask_s: u64, + mask_l: u64, +) -> (u64, usize) { + let mut remaining = source.len(); + if remaining <= min_size { + return (0, remaining); + } + let mut center = avg_size; + if remaining > max_size { + remaining = max_size; + } else if remaining < center { + center = remaining; + } + let mut index = min_size; + // Paraphrasing from the paper: Use the mask with more 1 bits for the + // hash judgment when the current chunking position is smaller than the + // desired size, which makes it harder to generate smaller chunks. 
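+    // Illustrative note (not from the paper): assuming each MASKS[n] entry
+    // holds a mask with n one bits, as in the reference implementation, an
+    // average size of 16384 (2^14) with the default Level1 normalization
+    // gives mask_s = MASKS[15] and mask_l = MASKS[13], so a cut point before
+    // the average size must zero two more hash bits than one after it.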
+ let mut hash: u64 = 0; + while index < center { + hash = (hash << 1).wrapping_add(GEAR[source[index] as usize]); + if (hash & mask_s) == 0 { + return (hash, index); + } + index += 1; + } + // Again, paraphrasing: use the mask with fewer 1 bits for the hash + // judgment when the current chunking position is larger than the + // desired size, which makes it easier to generate larger chunks. + let last_pos = remaining; + while index < last_pos { + hash = (hash << 1).wrapping_add(GEAR[source[index] as usize]); + if (hash & mask_l) == 0 { + return (hash, index); + } + index += 1; + } + // If all else fails, return the largest chunk. This will happen with + // pathological data, such as all zeroes. + (hash, index) +} + +/// +/// The level for the normalized chunking used by FastCDC and StreamCDC. /// -/// The level for the normalized chunking used by FastCDC. -/// /// Normalized chunking "generates chunks whose sizes are normalized to a /// specified region centered at the expected chunk size," as described in /// section 4.4 of the FastCDC 2016 paper. @@ -195,14 +248,14 @@ pub struct Chunk { /// The gear hash value as of the end of the chunk. pub hash: u64, /// Starting byte position within the source. - pub offset: u64, + pub offset: usize, /// Length of the chunk in bytes. - pub length: u32, + pub length: usize, } /// /// The FastCDC chunker implementation from 2016. -/// +/// /// Use `new` to construct an instance, and then iterate over the `Chunk`s via /// the `Iterator` trait. /// @@ -215,23 +268,23 @@ pub struct Chunk { /// starting point (cut-point skipping). /// /// ```no_run -/// use std::fs; -/// use fastcdc::v2016; +/// # use std::fs; +/// # use fastcdc::v2016::FastCDC; /// let contents = fs::read("test/fixtures/SekienAkashita.jpg").unwrap(); -/// let chunker = v2016::FastCDC::new(&contents, 8192, 16384, 65535); +/// let chunker = FastCDC::new(&contents, 8192, 16384, 65535); /// for entry in chunker { -/// println!("offset={} size={}", entry.offset, entry.length); +/// println!("offset={} length={}", entry.offset, entry.length); /// } /// ``` /// #[derive(Debug, Clone, Eq, PartialEq)] pub struct FastCDC<'a> { source: &'a [u8], - processed: u64, - remaining: u64, - min_size: u64, - avg_size: u64, - max_size: u64, + processed: usize, + remaining: usize, + min_size: usize, + avg_size: usize, + max_size: usize, mask_s: u64, mask_l: u64, } @@ -266,14 +319,13 @@ impl<'a> FastCDC<'a> { let normalization = level.bits(); let mask_s = MASKS[(bits + normalization) as usize]; let mask_l = MASKS[(bits - normalization) as usize]; - let length = source.len() as u64; Self { source, processed: 0, - remaining: length, - min_size: min_size as u64, - avg_size: avg_size as u64, - max_size: max_size as u64, + remaining: source.len(), + min_size: min_size as usize, + avg_size: avg_size as usize, + max_size: max_size as usize, mask_s, mask_l, } @@ -291,43 +343,17 @@ impl<'a> FastCDC<'a> { /// minimum chunk size, at which point this function returns a hash of 0 and /// the cut point is the end of the source data. 
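+    /// A minimal sketch of calling `cut` directly; the `Iterator`
+    /// implementation does this for you, so most callers never need to.
+    ///
+    /// ```no_run
+    /// # use fastcdc::v2016::FastCDC;
+    /// # let contents = std::fs::read("test/fixtures/SekienAkashita.jpg").unwrap();
+    /// let chunker = FastCDC::new(&contents, 4096, 16384, 65535);
+    /// let (hash, cutpoint) = chunker.cut(0, contents.len());
+    /// // the first chunk spans contents[0..cutpoint]
+    /// println!("hash={} cutpoint={}", hash, cutpoint);
+    /// ```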
/// - pub fn cut(&self, start: u64, mut remaining: u64) -> (u64, u64) { - if remaining <= self.min_size { - return (0, start + remaining); - } - let mut center = self.avg_size; - if remaining > self.max_size { - remaining = self.max_size; - } else if remaining < center { - center = remaining; - } - let mut index = start + self.min_size; - center += start; - // Paraphrasing from the paper: Use the mask with more 1 bits for the - // hash judgment when the current chunking position is smaller than the - // desired size, which makes it harder to generate smaller chunks. - let mut hash: u64 = 0; - while index < center { - hash = (hash << 1).wrapping_add(GEAR[self.source[index as usize] as usize]); - if (hash & self.mask_s) == 0 { - return (hash, index); - } - index += 1; - } - // Again, paraphrasing: use the mask with fewer 1 bits for the hash - // judgment when the current chunking position is larger than the - // desired size, which makes it easier to generate larger chunks. - let last_pos = start + remaining; - while index < last_pos { - hash = (hash << 1).wrapping_add(GEAR[self.source[index as usize] as usize]); - if (hash & self.mask_l) == 0 { - return (hash, index); - } - index += 1; - } - // If all else fails, return the largest chunk. This will happen with - // pathological data, such as all zeroes. - (hash, index) + pub fn cut(&self, start: usize, remaining: usize) -> (u64, usize) { + let end = start + remaining; + let (hash, count) = cut( + &self.source[start..end], + self.min_size, + self.avg_size, + self.max_size, + self.mask_s, + self.mask_l, + ); + (hash, start + count) } } @@ -349,7 +375,7 @@ impl<'a> Iterator for FastCDC<'a> { Some(Chunk { hash, offset, - length: length as u32, + length, }) } } @@ -365,6 +391,216 @@ impl<'a> Iterator for FastCDC<'a> { } } +/// +/// The error type returned from the `StreamCDC` iterator. +/// +#[derive(Debug)] +pub enum Error { + /// End of source data reached. + Empty, + /// An I/O error occurred. + IoError(std::io::Error), + /// Something unexpected happened. + Other(String), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "chunker error: {:?}", self) + } +} + +impl std::error::Error for Error {} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error::IoError(error) + } +} + +/// +/// Represents a chunk returned from the StreamCDC iterator. +/// +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct ChunkData { + /// The gear hash value as of the end of the chunk. + pub hash: u64, + /// Starting byte position within the source. + pub offset: u64, + /// Length of the chunk in bytes. + pub length: usize, + /// Source bytes contained in this chunk. + pub data: Vec, +} + +/// +/// The FastCDC chunker implementation from 2016 with streaming support. +/// +/// Use `new` to construct an instance, and then iterate over the `ChunkData`s +/// via the `Iterator` trait. +/// +/// Note that this struct allocates a `Vec` of `max_size` bytes to act as a +/// buffer when reading from the source and finding chunk boundaries. +/// +/// ```no_run +/// # use std::fs::File; +/// # use fastcdc::v2016::StreamCDC; +/// let source = File::open("test/fixtures/SekienAkashita.jpg").unwrap(); +/// let chunker = StreamCDC::new(Box::new(source), 4096, 16384, 65535); +/// for result in chunker { +/// let chunk = result.unwrap(); +/// println!("offset={} length={}", chunk.offset, chunk.length); +/// } +/// ``` +/// +pub struct StreamCDC { + /// Buffer of data from source for finding cut points. 
+ buffer: Vec, + /// Maximum capacity of the buffer (always `max_size`). + capacity: usize, + /// Number of relevant bytes in the `buffer`. + length: usize, + /// Source from which data is read into `buffer`. + source: Box, + /// Number of bytes read from the source so far. + processed: u64, + /// True when the source produces no more data. + eof: bool, + min_size: usize, + avg_size: usize, + max_size: usize, + mask_s: u64, + mask_l: u64, +} + +impl StreamCDC { + /// + /// Construct a `StreamCDC` that will process bytes from the given source. + /// + /// Uses chunk size normalization level 1 by default. + /// + pub fn new(source: Box, min_size: u32, avg_size: u32, max_size: u32) -> Self { + StreamCDC::with_level(source, min_size, avg_size, max_size, Normalization::Level1) + } + + /// + /// Create a new `StreamCDC` with the given normalization level. + /// + pub fn with_level( + source: Box, + min_size: u32, + avg_size: u32, + max_size: u32, + level: Normalization, + ) -> Self { + assert!(min_size >= MINIMUM_MIN); + assert!(min_size <= MINIMUM_MAX); + assert!(avg_size >= AVERAGE_MIN); + assert!(avg_size <= AVERAGE_MAX); + assert!(max_size >= MAXIMUM_MIN); + assert!(max_size <= MAXIMUM_MAX); + let bits = logarithm2(avg_size); + let normalization = level.bits(); + let mask_s = MASKS[(bits + normalization) as usize]; + let mask_l = MASKS[(bits - normalization) as usize]; + Self { + buffer: vec![0_u8; max_size as usize], + capacity: max_size as usize, + length: 0, + source, + eof: false, + processed: 0, + min_size: min_size as usize, + avg_size: avg_size as usize, + max_size: max_size as usize, + mask_s, + mask_l, + } + } + + /// Fill the buffer with data from the source, returning the number of bytes + /// read (zero if end of source has been reached). + fn fill_buffer(&mut self) -> Result { + // this code originally copied from asuran crate + if self.eof { + Ok(0) + } else { + let mut all_bytes_read = 0; + while !self.eof && self.length < self.capacity { + let bytes_read = self.source.read(&mut self.buffer[self.length..])?; + if bytes_read == 0 { + self.eof = true; + } else { + self.length += bytes_read; + all_bytes_read += bytes_read; + } + } + Ok(all_bytes_read) + } + } + + /// Drains a specified number of bytes from the buffer, then resizes the + /// buffer back to `capacity` size in preparation for further reads. + fn drain_bytes(&mut self, count: usize) -> Result, Error> { + // this code originally copied from asuran crate + if count > self.length { + Err(Error::Other(format!( + "drain_bytes() called with count larger than length: {} > {}", + count, self.length + ))) + } else { + let data = self.buffer.drain(..count).collect::>(); + self.length -= count; + self.buffer.resize(self.capacity, 0_u8); + Ok(data) + } + } + + /// Find the next chunk in the source. If the end of the source has been + /// reached, returns `Error::Empty` as the error. 
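+    ///
+    /// Each call fills the internal buffer from the source, locates one cut
+    /// point with `cut()`, and drains that many bytes from the buffer into
+    /// the returned `ChunkData`, whose bytes are owned by the caller.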
+ fn read_chunk(&mut self) -> Result { + self.fill_buffer()?; + if self.length == 0 { + Err(Error::Empty) + } else { + let (hash, count) = cut( + &self.buffer[..self.length], + self.min_size, + self.avg_size, + self.max_size, + self.mask_s, + self.mask_l, + ); + if count == 0 { + Err(Error::Empty) + } else { + let offset = self.processed; + self.processed += count as u64; + let data = self.drain_bytes(count)?; + Ok(ChunkData { + hash, + offset, + length: count, + data, + }) + } + } + } +} + +impl Iterator for StreamCDC { + type Item = Result; + + fn next(&mut self) -> Option> { + let slice = self.read_chunk(); + if let Err(Error::Empty) = slice { + None + } else { + Some(slice) + } + } +} + /// /// Base-2 logarithm function for unsigned 32-bit integers. /// @@ -375,7 +611,8 @@ fn logarithm2(value: u32) -> u32 { #[cfg(test)] mod tests { use super::*; - use std::fs; + use md5::{Digest, Md5}; + use std::fs::{self, File}; #[test] fn test_logarithm2() { @@ -474,7 +711,7 @@ mod tests { // for all zeros, always returns chunks of maximum size let array = [0u8; 10240]; let chunker = FastCDC::new(&array, 64, 256, 1024); - let mut cursor: u64 = 0; + let mut cursor: usize = 0; for _ in 0..10 { let (hash, pos) = chunker.cut(cursor, 10240 - cursor); assert_eq!(hash, 14169102344523991076); @@ -492,9 +729,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::new(&contents, 4096, 16384, 65535); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![ + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![ (17968276318003433923, 21325), (4098594969649699419, 17140), (15733367461443853673, 28084), @@ -517,9 +754,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::new(&contents, 8192, 32768, 131072); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![(15733367461443853673, 66549), (2504464741100432583, 42917)]; for (e_hash, e_length) in expected.iter() { let (hash, pos) = chunker.cut(cursor, remaining); @@ -537,9 +774,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::new(&contents, 16384, 65536, 262144); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![(2504464741100432583, 109466)]; + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![(2504464741100432583, 109466)]; for (e_hash, e_length) in expected.iter() { let (hash, pos) = chunker.cut(cursor, remaining); assert_eq!(hash, *e_hash); @@ -550,29 +787,67 @@ mod tests { assert_eq!(remaining, 0); } + struct ExpectedChunk { + hash: u64, + offset: u64, + length: usize, + digest: String, + } + #[test] fn test_iter_sekien_16k_chunks() { let read_result = fs::read("test/fixtures/SekienAkashita.jpg"); assert!(read_result.is_ok()); let contents = read_result.unwrap(); + // The digest values are not needed here, but they serve to validate + // that the streaming version tested below is returning the correct + // chunk data on each iteration. 
+ let expected_chunks = vec![ + ExpectedChunk { + hash: 17968276318003433923, + offset: 0, + length: 21325, + digest: "2bb52734718194617c957f5e07ee6054".into(), + }, + ExpectedChunk { + hash: 4098594969649699419, + offset: 21325, + length: 17140, + digest: "badfb0757fe081c20336902e7131f768".into(), + }, + ExpectedChunk { + hash: 15733367461443853673, + offset: 38465, + length: 28084, + digest: "18412d7414de6eb42f638351711f729d".into(), + }, + ExpectedChunk { + hash: 4509236223063678303, + offset: 66549, + length: 18217, + digest: "04fe1405fc5f960363bfcd834c056407".into(), + }, + ExpectedChunk { + hash: 2504464741100432583, + offset: 84766, + length: 24700, + digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(), + }, + ]; let chunker = FastCDC::new(&contents, 4096, 16384, 65535); - let results: Vec = chunker.collect(); - assert_eq!(results.len(), 5); - assert_eq!(results[0].hash, 17968276318003433923); - assert_eq!(results[0].offset, 0); - assert_eq!(results[0].length, 21325); - assert_eq!(results[1].hash, 4098594969649699419); - assert_eq!(results[1].offset, 21325); - assert_eq!(results[1].length, 17140); - assert_eq!(results[2].hash, 15733367461443853673); - assert_eq!(results[2].offset, 38465); - assert_eq!(results[2].length, 28084); - assert_eq!(results[3].hash, 4509236223063678303); - assert_eq!(results[3].offset, 66549); - assert_eq!(results[3].length, 18217); - assert_eq!(results[4].hash, 2504464741100432583); - assert_eq!(results[4].offset, 84766); - assert_eq!(results[4].length, 24700); + let mut index = 0; + for chunk in chunker { + assert_eq!(chunk.hash, expected_chunks[index].hash); + assert_eq!(chunk.offset, expected_chunks[index].offset as usize); + assert_eq!(chunk.length, expected_chunks[index].length); + let mut hasher = Md5::new(); + hasher.update(&contents[chunk.offset..chunk.offset + chunk.length]); + let table = hasher.finalize(); + let digest = format!("{:x}", table); + assert_eq!(digest, expected_chunks[index].digest); + index += 1; + } + assert_eq!(index, 5); } #[test] @@ -581,9 +856,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::with_level(&contents, 4096, 16384, 65535, Normalization::Level0); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![ + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![ (221561130519947581, 6634), (15733367461443853673, 59915), (10460176299449652894, 25597), @@ -606,9 +881,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::with_level(&contents, 4096, 16384, 65535, Normalization::Level3); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![ + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![ (14582375164208481996, 17350), (13104072099671895560, 19911), (6161241554519610597, 17426), @@ -625,4 +900,60 @@ mod tests { } assert_eq!(remaining, 0); } + + #[test] + fn test_stream_sekien_16k_chunks() { + let file_result = File::open("test/fixtures/SekienAkashita.jpg"); + assert!(file_result.is_ok()); + let file = file_result.unwrap(); + // The set of expected results should match the non-streaming version. 
+ let expected_chunks = vec![ + ExpectedChunk { + hash: 17968276318003433923, + offset: 0, + length: 21325, + digest: "2bb52734718194617c957f5e07ee6054".into(), + }, + ExpectedChunk { + hash: 4098594969649699419, + offset: 21325, + length: 17140, + digest: "badfb0757fe081c20336902e7131f768".into(), + }, + ExpectedChunk { + hash: 15733367461443853673, + offset: 38465, + length: 28084, + digest: "18412d7414de6eb42f638351711f729d".into(), + }, + ExpectedChunk { + hash: 4509236223063678303, + offset: 66549, + length: 18217, + digest: "04fe1405fc5f960363bfcd834c056407".into(), + }, + ExpectedChunk { + hash: 2504464741100432583, + offset: 84766, + length: 24700, + digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(), + }, + ]; + let chunker = StreamCDC::new(Box::new(file), 4096, 16384, 65535); + let mut index = 0; + for result in chunker { + assert!(result.is_ok()); + let chunk = result.unwrap(); + assert_eq!(chunk.hash, expected_chunks[index].hash); + assert_eq!(chunk.offset, expected_chunks[index].offset); + assert_eq!(chunk.length, expected_chunks[index].length); + let mut hasher = Md5::new(); + hasher.update(&chunk.data); + let table = hasher.finalize(); + let digest = format!("{:x}", table); + assert_eq!(digest, expected_chunks[index].digest); + index += 1; + } + assert_eq!(index, 5); + } } diff --git a/src/v2020/mod.rs b/src/v2020/mod.rs index 71c8c3d..59478c4 100644 --- a/src/v2020/mod.rs +++ b/src/v2020/mod.rs @@ -5,7 +5,7 @@ //! This module implements the canonical FastCDC algorithm as described in the //! [paper](https://ieeexplore.ieee.org/document/9055082) by Wen Xia, et al., in //! 2020. -//! +//! //! The algorithm incorporates a simplified hash judgement using the fast Gear //! hash, sub-minimum chunk cut-point skipping, normalized chunking to produce //! chunks of a more consistent length, and "rolling two bytes each time". @@ -25,6 +25,12 @@ //! `hash` field of the `Chunk` struct. While this value has rather low entropy, //! it is computationally cost-free and can be put to some use with additional //! record keeping. +//! +//! The `StreamCDC` implementation is similar to `FastCDC` except that it will +//! read data from a boxed `Read` into an internal buffer of `max_size` and +//! produce `ChunkData` values from the `Iterator`. +use std::fmt; +use std::io::Read; /// Smallest acceptable value for the minimum chunk size. pub const MINIMUM_MIN: u32 = 64; @@ -225,9 +231,61 @@ const GEAR_LS: [u64; 256] = [ 0x1c7c8443a6c28826, 0xde29a1b0d7e34458, 0xc3b061a7e2d8bbb6, 0x557a56548a2a09c2 ]; +// Find the next chunk cut point in the source. 
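+//
+// Unlike the 2016 version, this loop advances two bytes per iteration
+// ("rolling two bytes each time" from the 2020 paper): the even byte is
+// hashed through GEAR_LS (the gear table pre-shifted left by one) and judged
+// against the left-shifted mask, then the odd byte is hashed through GEAR
+// and judged against the normal mask.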
+fn cut( + source: &[u8], + min_size: usize, + avg_size: usize, + max_size: usize, + mask_s: u64, + mask_l: u64, + mask_s_ls: u64, + mask_l_ls: u64, +) -> (u64, usize) { + let mut remaining = source.len(); + if remaining <= min_size { + return (0, remaining); + } + let mut center = avg_size; + if remaining > max_size { + remaining = max_size; + } else if remaining < center { + center = remaining; + } + let mut index = min_size / 2; + let mut hash: u64 = 0; + while index < center / 2 { + let a = index * 2; + hash = (hash << 2).wrapping_add(GEAR_LS[source[a as usize] as usize]); + if (hash & mask_s_ls) == 0 { + return (hash, a); + } + hash = hash.wrapping_add(GEAR[source[(a + 1) as usize] as usize]); + if (hash & mask_s) == 0 { + return (hash, a + 1); + } + index += 1; + } + while index < remaining / 2 { + let a = index * 2; + hash = (hash << 2).wrapping_add(GEAR_LS[source[a as usize] as usize]); + if (hash & mask_l_ls) == 0 { + return (hash, a); + } + hash = hash.wrapping_add(GEAR[source[(a + 1) as usize] as usize]); + if (hash & mask_l) == 0 { + return (hash, a + 1); + } + index += 1; + } + // If all else fails, return the largest chunk. This will happen with + // pathological data, such as all zeroes. + (hash, remaining) +} + /// /// The level for the normalized chunking used by FastCDC. -/// +/// /// Normalized chunking "generates chunks whose sizes are normalized to a /// specified region centered at the expected chunk size," as described in /// section 4.4 of the FastCDC 2016 paper. @@ -270,14 +328,14 @@ pub struct Chunk { /// The gear hash value as of the end of the chunk. pub hash: u64, /// Starting byte position within the source. - pub offset: u64, + pub offset: usize, /// Length of the chunk in bytes. - pub length: u32, + pub length: usize, } /// /// The FastCDC chunker implementation from 2020. -/// +/// /// Use `new` to construct an instance, and then iterate over the `Chunk`s via /// the `Iterator` trait. /// @@ -302,11 +360,11 @@ pub struct Chunk { #[derive(Debug, Clone, Eq, PartialEq)] pub struct FastCDC<'a> { source: &'a [u8], - processed: u64, - remaining: u64, - min_size: u64, - avg_size: u64, - max_size: u64, + processed: usize, + remaining: usize, + min_size: usize, + avg_size: usize, + max_size: usize, mask_s: u64, mask_l: u64, mask_s_ls: u64, @@ -343,14 +401,13 @@ impl<'a> FastCDC<'a> { let normalization = level.bits(); let mask_s = MASKS[(bits + normalization) as usize]; let mask_l = MASKS[(bits - normalization) as usize]; - let length = source.len() as u64; Self { source, processed: 0, - remaining: length, - min_size: min_size as u64, - avg_size: avg_size as u64, - max_size: max_size as u64, + remaining: source.len(), + min_size: min_size as usize, + avg_size: avg_size as usize, + max_size: max_size as usize, mask_s, mask_l, mask_s_ls: mask_s << 1, @@ -371,45 +428,19 @@ impl<'a> FastCDC<'a> { /// minimum chunk size, at which point this function returns a hash of 0 and /// the cut point is the end of the source data. 
/// - pub fn cut(&self, start: u64, mut remaining: u64) -> (u64, u64) { - if remaining <= self.min_size { - return (0, start + remaining); - } - let mut center = self.avg_size; - if remaining > self.max_size { - remaining = self.max_size; - } else if remaining < center { - center = remaining; - } - let mut index = self.min_size / 2; - let mut hash: u64 = 0; - while index < center / 2 { - let a = index * 2 + start; - hash = (hash << 2).wrapping_add(GEAR_LS[self.source[a as usize] as usize]); - if (hash & self.mask_s_ls) == 0 { - return (hash, a); - } - hash = hash.wrapping_add(GEAR[self.source[(a + 1) as usize] as usize]); - if (hash & self.mask_s) == 0 { - return (hash, a + 1); - } - index += 1; - } - while index < remaining / 2 { - let a = index * 2 + start; - hash = (hash << 2).wrapping_add(GEAR_LS[self.source[a as usize] as usize]); - if (hash & self.mask_l_ls) == 0 { - return (hash, a); - } - hash = hash.wrapping_add(GEAR[self.source[(a + 1) as usize] as usize]); - if (hash & self.mask_l) == 0 { - return (hash, a + 1); - } - index += 1; - } - // If all else fails, return the largest chunk. This will happen with - // pathological data, such as all zeroes. - (hash, start + remaining) + pub fn cut(&self, start: usize, remaining: usize) -> (u64, usize) { + let end = start + remaining; + let (hash, count) = cut( + &self.source[start..end], + self.min_size, + self.avg_size, + self.max_size, + self.mask_s, + self.mask_l, + self.mask_s_ls, + self.mask_l_ls, + ); + (hash, start + count) } } @@ -431,7 +462,7 @@ impl<'a> Iterator for FastCDC<'a> { Some(Chunk { hash, offset, - length: length as u32, + length, }) } } @@ -447,6 +478,222 @@ impl<'a> Iterator for FastCDC<'a> { } } +/// +/// The error type returned from the `StreamCDC` iterator. +/// +#[derive(Debug)] +pub enum Error { + /// End of source data reached. + Empty, + /// An I/O error occurred. + IoError(std::io::Error), + /// Something unexpected happened. + Other(String), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "chunker error: {:?}", self) + } +} + +impl std::error::Error for Error {} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error::IoError(error) + } +} + +/// +/// Represents a chunk returned from the StreamCDC iterator. +/// +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct ChunkData { + /// The gear hash value as of the end of the chunk. + pub hash: u64, + /// Starting byte position within the source. + pub offset: u64, + /// Length of the chunk in bytes. + pub length: usize, + /// Source bytes contained in this chunk. + pub data: Vec, +} + +/// +/// The FastCDC chunker implementation from 2016 with streaming support. +/// +/// Use `new` to construct an instance, and then iterate over the `ChunkData`s +/// via the `Iterator` trait. +/// +/// Note that this struct allocates a `Vec` of `max_size` bytes to act as a +/// buffer when reading from the source and finding chunk boundaries. +/// +/// ```no_run +/// # use std::fs::File; +/// # use fastcdc::v2020::StreamCDC; +/// let source = File::open("test/fixtures/SekienAkashita.jpg").unwrap(); +/// let chunker = StreamCDC::new(Box::new(source), 4096, 16384, 65535); +/// for result in chunker { +/// let chunk = result.unwrap(); +/// println!("offset={} length={}", chunk.offset, chunk.length); +/// } +/// ``` +/// +pub struct StreamCDC { + /// Buffer of data from source for finding cut points. + buffer: Vec, + /// Maximum capacity of the buffer (always `max_size`). 
+ capacity: usize, + /// Number of relevant bytes in the `buffer`. + length: usize, + /// Source from which data is read into `buffer`. + source: Box, + /// Number of bytes read from the source so far. + processed: u64, + /// True when the source produces no more data. + eof: bool, + min_size: usize, + avg_size: usize, + max_size: usize, + mask_s: u64, + mask_l: u64, + mask_s_ls: u64, + mask_l_ls: u64, +} + +impl StreamCDC { + /// + /// Construct a `StreamCDC` that will process bytes from the given source. + /// + /// Uses chunk size normalization level 1 by default. + /// + pub fn new(source: Box, min_size: u32, avg_size: u32, max_size: u32) -> Self { + StreamCDC::with_level(source, min_size, avg_size, max_size, Normalization::Level1) + } + + /// + /// Create a new `StreamCDC` with the given normalization level. + /// + pub fn with_level( + source: Box, + min_size: u32, + avg_size: u32, + max_size: u32, + level: Normalization, + ) -> Self { + assert!(min_size >= MINIMUM_MIN); + assert!(min_size <= MINIMUM_MAX); + assert!(avg_size >= AVERAGE_MIN); + assert!(avg_size <= AVERAGE_MAX); + assert!(max_size >= MAXIMUM_MIN); + assert!(max_size <= MAXIMUM_MAX); + let bits = logarithm2(avg_size); + let normalization = level.bits(); + let mask_s = MASKS[(bits + normalization) as usize]; + let mask_l = MASKS[(bits - normalization) as usize]; + Self { + buffer: vec![0_u8; max_size as usize], + capacity: max_size as usize, + length: 0, + source, + eof: false, + processed: 0, + min_size: min_size as usize, + avg_size: avg_size as usize, + max_size: max_size as usize, + mask_s, + mask_l, + mask_s_ls: mask_s << 1, + mask_l_ls: mask_l << 1, + } + } + + /// Fill the buffer with data from the source, returning the number of bytes + /// read (zero if end of source has been reached). + fn fill_buffer(&mut self) -> Result { + // this code originally copied from asuran crate + if self.eof { + Ok(0) + } else { + let mut all_bytes_read = 0; + while !self.eof && self.length < self.capacity { + let bytes_read = self.source.read(&mut self.buffer[self.length..])?; + if bytes_read == 0 { + self.eof = true; + } else { + self.length += bytes_read; + all_bytes_read += bytes_read; + } + } + Ok(all_bytes_read) + } + } + + /// Drains a specified number of bytes from the buffer, then resizes the + /// buffer back to `capacity` size in preparation for further reads. + fn drain_bytes(&mut self, count: usize) -> Result, Error> { + // this code originally copied from asuran crate + if count > self.length { + Err(Error::Other(format!( + "drain_bytes() called with count larger than length: {} > {}", + count, self.length + ))) + } else { + let data = self.buffer.drain(..count).collect::>(); + self.length -= count; + self.buffer.resize(self.capacity, 0_u8); + Ok(data) + } + } + + /// Find the next chunk in the source. If the end of the source has been + /// reached, returns `Error::Empty` as the error. 
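+    ///
+    /// Note that the final chunk of a source is returned even when it is
+    /// shorter than `min_size`, since `cut()` treats any remainder at or
+    /// below the minimum size as a single chunk.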
+ fn read_chunk(&mut self) -> Result { + self.fill_buffer()?; + if self.length == 0 { + Err(Error::Empty) + } else { + let (hash, count) = cut( + &self.buffer[..self.length], + self.min_size, + self.avg_size, + self.max_size, + self.mask_s, + self.mask_l, + self.mask_s_ls, + self.mask_l_ls, + ); + if count == 0 { + Err(Error::Empty) + } else { + let offset = self.processed; + self.processed += count as u64; + let data = self.drain_bytes(count)?; + Ok(ChunkData { + hash, + offset, + length: count, + data, + }) + } + } + } +} + +impl Iterator for StreamCDC { + type Item = Result; + + fn next(&mut self) -> Option> { + let slice = self.read_chunk(); + if let Err(Error::Empty) = slice { + None + } else { + Some(slice) + } + } +} + /// /// Base-2 logarithm function for unsigned 32-bit integers. /// @@ -457,7 +704,8 @@ fn logarithm2(value: u32) -> u32 { #[cfg(test)] mod tests { use super::*; - use std::fs; + use md5::{Digest, Md5}; + use std::fs::{self, File}; #[test] fn test_logarithm2() { @@ -556,7 +804,7 @@ mod tests { // for all zeros, always returns chunks of maximum size let array = [0u8; 10240]; let chunker = FastCDC::new(&array, 64, 256, 1024); - let mut cursor: u64 = 0; + let mut cursor: usize = 0; for _ in 0..10 { let (hash, pos) = chunker.cut(cursor, 10240 - cursor); assert_eq!(hash, 14169102344523991076); @@ -574,9 +822,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::new(&contents, 4096, 16384, 65535); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![ + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![ (17968276318003433923, 21325), (8197189939299398838, 17140), (13019990849178155730, 28084), @@ -599,9 +847,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::new(&contents, 8192, 32768, 131072); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![(15733367461443853673, 66549), (6321136627705800457, 42917)]; for (e_hash, e_length) in expected.iter() { let (hash, pos) = chunker.cut(cursor, remaining); @@ -619,9 +867,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::new(&contents, 16384, 65536, 262144); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![(2504464741100432583, 109466)]; + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![(2504464741100432583, 109466)]; for (e_hash, e_length) in expected.iter() { let (hash, pos) = chunker.cut(cursor, remaining); assert_eq!(hash, *e_hash); @@ -632,29 +880,67 @@ mod tests { assert_eq!(remaining, 0); } + struct ExpectedChunk { + hash: u64, + offset: u64, + length: usize, + digest: String, + } + #[test] fn test_iter_sekien_16k_chunks() { let read_result = fs::read("test/fixtures/SekienAkashita.jpg"); assert!(read_result.is_ok()); let contents = read_result.unwrap(); + // The digest values are not needed here, but they serve to validate + // that the streaming version tested below is returning the correct + // chunk data on each iteration. 
+ let expected_chunks = vec![ + ExpectedChunk { + hash: 17968276318003433923, + offset: 0, + length: 21325, + digest: "2bb52734718194617c957f5e07ee6054".into(), + }, + ExpectedChunk { + hash: 8197189939299398838, + offset: 21325, + length: 17140, + digest: "badfb0757fe081c20336902e7131f768".into(), + }, + ExpectedChunk { + hash: 13019990849178155730, + offset: 38465, + length: 28084, + digest: "18412d7414de6eb42f638351711f729d".into(), + }, + ExpectedChunk { + hash: 4509236223063678303, + offset: 66549, + length: 18217, + digest: "04fe1405fc5f960363bfcd834c056407".into(), + }, + ExpectedChunk { + hash: 2504464741100432583, + offset: 84766, + length: 24700, + digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(), + }, + ]; let chunker = FastCDC::new(&contents, 4096, 16384, 65535); - let results: Vec = chunker.collect(); - assert_eq!(results.len(), 5); - assert_eq!(results[0].hash, 17968276318003433923); - assert_eq!(results[0].offset, 0); - assert_eq!(results[0].length, 21325); - assert_eq!(results[1].hash, 8197189939299398838); - assert_eq!(results[1].offset, 21325); - assert_eq!(results[1].length, 17140); - assert_eq!(results[2].hash, 13019990849178155730); - assert_eq!(results[2].offset, 38465); - assert_eq!(results[2].length, 28084); - assert_eq!(results[3].hash, 4509236223063678303); - assert_eq!(results[3].offset, 66549); - assert_eq!(results[3].length, 18217); - assert_eq!(results[4].hash, 2504464741100432583); - assert_eq!(results[4].offset, 84766); - assert_eq!(results[4].length, 24700); + let mut index = 0; + for chunk in chunker { + assert_eq!(chunk.hash, expected_chunks[index].hash); + assert_eq!(chunk.offset, expected_chunks[index].offset as usize); + assert_eq!(chunk.length, expected_chunks[index].length); + let mut hasher = Md5::new(); + hasher.update(&contents[chunk.offset..chunk.offset + chunk.length]); + let table = hasher.finalize(); + let digest = format!("{:x}", table); + assert_eq!(digest, expected_chunks[index].digest); + index += 1; + } + assert_eq!(index, 5); } #[test] @@ -663,9 +949,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::with_level(&contents, 4096, 16384, 65535, Normalization::Level0); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![ + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![ (443122261039895162, 6634), (15733367461443853673, 59915), (10460176299449652894, 25597), @@ -688,9 +974,9 @@ mod tests { assert!(read_result.is_ok()); let contents = read_result.unwrap(); let chunker = FastCDC::with_level(&contents, 8192, 16384, 32768, Normalization::Level3); - let mut cursor: u64 = 0; - let mut remaining: u64 = contents.len() as u64; - let expected: Vec<(u64, u64)> = vec![ + let mut cursor: usize = 0; + let mut remaining: usize = contents.len(); + let expected: Vec<(u64, usize)> = vec![ (10718006254707412376, 17350), (13104072099671895560, 19911), (12322483109039221194, 17426), @@ -707,4 +993,60 @@ mod tests { } assert_eq!(remaining, 0); } + + #[test] + fn test_stream_sekien_16k_chunks() { + let file_result = File::open("test/fixtures/SekienAkashita.jpg"); + assert!(file_result.is_ok()); + let file = file_result.unwrap(); + // The set of expected results should match the non-streaming version. 
+ let expected_chunks = vec![ + ExpectedChunk { + hash: 17968276318003433923, + offset: 0, + length: 21325, + digest: "2bb52734718194617c957f5e07ee6054".into(), + }, + ExpectedChunk { + hash: 8197189939299398838, + offset: 21325, + length: 17140, + digest: "badfb0757fe081c20336902e7131f768".into(), + }, + ExpectedChunk { + hash: 13019990849178155730, + offset: 38465, + length: 28084, + digest: "18412d7414de6eb42f638351711f729d".into(), + }, + ExpectedChunk { + hash: 4509236223063678303, + offset: 66549, + length: 18217, + digest: "04fe1405fc5f960363bfcd834c056407".into(), + }, + ExpectedChunk { + hash: 2504464741100432583, + offset: 84766, + length: 24700, + digest: "1aa7ad95f274d6ba34a983946ebc5af3".into(), + }, + ]; + let chunker = StreamCDC::new(Box::new(file), 4096, 16384, 65535); + let mut index = 0; + for result in chunker { + assert!(result.is_ok()); + let chunk = result.unwrap(); + assert_eq!(chunk.hash, expected_chunks[index].hash); + assert_eq!(chunk.offset, expected_chunks[index].offset); + assert_eq!(chunk.length, expected_chunks[index].length); + let mut hasher = Md5::new(); + hasher.update(&chunk.data); + let table = hasher.finalize(); + let digest = format!("{:x}", table); + assert_eq!(digest, expected_chunks[index].digest); + index += 1; + } + assert_eq!(index, 5); + } }