diff --git a/Cargo.lock b/Cargo.lock
index a650878..a4f090a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -118,6 +118,17 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"

+[[package]]
+name = "average"
+version = "0.14.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d804c74bb2d66e9b7047658d21af0f1c937d7d2466410cbf1aed3b0c04048d4"
+dependencies = [
+ "easy-cast",
+ "float-ord",
+ "num-traits",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.68"
@@ -189,6 +200,7 @@ dependencies = [
 name = "brro-compressor"
 version = "0.1.0"
 dependencies = [
+ "average",
  "bincode",
  "clap",
  "env_logger",
@@ -387,6 +399,15 @@ version = "0.9.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "347675b2993d588e8506457ea2de0e64a89ad0fcbc0e79d07d25f50542f40b59"

+[[package]]
+name = "easy-cast"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10936778145f3bea71fd9bf61332cce28c28e96a380714f7ab34838b80733fd6"
+dependencies = [
+ "libm",
+]
+
 [[package]]
 name = "either"
 version = "1.8.1"
@@ -451,6 +472,12 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"

+[[package]]
+name = "float-ord"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ce81f49ae8a0482e4c55ea62ebbd7e5a686af544c00b9d090bba3ff9be97b3d"
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -824,6 +851,12 @@ version = "0.2.147"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"

+[[package]]
+name = "libm"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.3.8"
@@ -971,6 +1004,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
 dependencies = [
  "autocfg",
+ "libm",
 ]

 [[package]]
diff --git a/brro-compressor/Cargo.toml b/brro-compressor/Cargo.toml
index 19f642f..b82e3fb 100644
--- a/brro-compressor/Cargo.toml
+++ b/brro-compressor/Cargo.toml
@@ -14,7 +14,7 @@ clap = {version = "4.3.14", features = ["derive"] }
 bincode = "2.0.0-rc.3"
 rustfft = "6.1.0"
 tempfile = "3.2"
-
+average = "0.14.1"
 regex = "1.9.1"
 hound = "3.5"
 median = "0.3.2"
\ No newline at end of file
diff --git a/brro-compressor/src/main.rs b/brro-compressor/src/main.rs
index bf8c096..1b1451c 100644
--- a/brro-compressor/src/main.rs
+++ b/brro-compressor/src/main.rs
@@ -108,8 +108,7 @@ fn process_single_file(arguments: &Args) -> Result<(), Box<dyn Error>> {
 fn compress_data(vec: &[f64], tag: &MetricTag, arguments: &Args) -> Vec<u8> {
     debug!("Compressing data!");
     let optimizer_results = optimizer::process_data(vec, tag);
-    let _optimizer_results_f: Vec<f64> = optimizer_results.iter().map(|&x| x as f64).collect();
-
+    debug!("Samples in: {}, Samples out: {}", vec.len(), optimizer_results.len());
     let mut cs = CompressedStream::new();
     let compressor = match arguments.compressor {
         CompressorType::Noop => Compressor::Noop,
@@ -120,7 +119,7 @@ fn compress_data(vec: &[f64], tag: &MetricTag, arguments: &Args) -> Vec<u8> {
         CompressorType::Wavelet => Compressor::Wavelet
     };

-    cs.compress_chunk_with(vec, compressor);
+    cs.compress_chunk_with(&optimizer_results, compressor);
     cs.to_bytes()
 }
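Note on the `compress_data` change above: the compressor now receives `optimizer_results` instead of the raw `vec`; the old code computed the optimizer output and then compressed the raw samples anyway. A minimal sketch of the intended data flow, assuming only that the optimizer's cleaning step drops non-finite samples (`process_data` here is a stand-in for illustration, not the crate's actual function):

```rust
/// Stand-in for `optimizer::process_data`: drop NaN/infinite samples, keep the rest.
fn process_data(raw: &[f64]) -> Vec<f64> {
    // is_finite() is false for NaN and +/- infinity.
    raw.iter().filter(|x| x.is_finite()).copied().collect()
}

fn main() {
    let raw = vec![1.0, f64::NAN, 2.0, f64::INFINITY, 3.0];
    let cleaned = process_data(&raw);
    // The compressor must be fed `cleaned`, not `raw`: the two can differ
    // in length, so compressing `raw` would bypass the optimizer entirely.
    assert_eq!(cleaned, vec![1.0, 2.0, 3.0]);
    println!("Samples in: {}, Samples out: {}", raw.len(), cleaned.len());
}
```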
diff --git a/brro-compressor/src/optimizer/mod.rs b/brro-compressor/src/optimizer/mod.rs
index bf4283f..986f12c 100644
--- a/brro-compressor/src/optimizer/mod.rs
+++ b/brro-compressor/src/optimizer/mod.rs
@@ -1,70 +1,170 @@
-// Lucas - Once the project is far enough along I strongly reccomend reenabling dead code checks
-#![allow(dead_code)]
-
-use median::Filter;
 use log::debug;
 use types::metric_tag::MetricTag;
-use crate::types;
-
-impl MetricTag {
-    #[allow(clippy::wrong_self_convention)]
-    fn from_float(&self, x: f64) -> i64 {
-        match self {
-            MetricTag::Other => {
-                0
-            }
-            MetricTag::NotFloat | MetricTag::QuasiRandom => {
-                x as i64
-            }
-            MetricTag::Percent(y) => {
-                to_multiply_and_truncate(x, *y)
-            }
-            MetricTag::Duration(y) => {
-                to_multiply_and_truncate(x, *y)
-            }
-            MetricTag::Bytes(y) => {
-                (x as i64) / (*y as i64)
+use crate::{types, utils::{prev_power_of_two, f64_to_u64}, compressor::Compressor};
+
+/// Max frame size; this can hold approx. 36h of data at 1 point/sec, a little more than 1 week at 1 point/5 sec,
+/// and 1 month (30 days) at 1 point/20 sec.
+/// This is approx. 1MB of raw data (131072 * 64 bits).
+/// We wouldn't want to decompress a ton of unnecessary data, but for a historical view of the data, looking at 1 day/week/month at once is very reasonable.
+const MAX_FRAME_SIZE: usize = 131072; // 2^17
+/// The min frame size is one that still allows our compressors to potentially achieve 100x compression. Currently the most
+/// limited one is the FFT compressor, which needs at least 3 frequencies; 3x100 = 300, and the next power of 2 is 512.
+const MIN_FRAME_SIZE: usize = 512; // 2^9
+
+// My idea here:
+// 1. Clean data
+// 2. Split into well-sized chunks (aka powers of 2)
+// 3. Send each chunk to the compressor it should go to
+// 3.1. Chunks should be at least large enough to allow a 100x compression for the given compressor (FFT is 512)
+// 4. From the clean data and chunk sizes, assign an optimizer for each chunk
+#[derive(Debug, Clone)]
+struct OptimizerPlan {
+    pub data: Vec<f64>,
+    pub chunk_sizes: Vec<usize>,
+    pub compressors: Vec<Compressor>,
+}
+
+impl OptimizerPlan {
+
+    /// Creates an optimal data compression plan
+    pub fn plan(data: Vec<f64>) -> Self {
+        let c_data = OptimizerPlan::clean_data(&data);
+        let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
+        let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, None);
+        OptimizerPlan { data: c_data,
+                        chunk_sizes: chunks,
+                        compressors: optimizer }
+    }
+
+    /// Creates an optimal compression plan for the provided data set, bounded by a given error
+    pub fn plan_bounded(data: Vec<f64>, max_error: f32) -> Self {
+        // TODO: Check error limits
+        let c_data = OptimizerPlan::clean_data(&data);
+        let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
+        let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, Some(max_error));
+        OptimizerPlan { data: c_data,
+                        chunk_sizes: chunks,
+                        compressors: optimizer }
+    }
+
+    /// Sets a given compressor for all data chunks
+    pub fn set_compressor(&mut self, compressor: Compressor) {
+        let new_compressors = vec![compressor; self.compressors.len()];
+        self.compressors = new_compressors;
+    }
+
+    /// Removes NaN and infinite values from the data
+    pub fn clean_data(wav_data: &[f64]) -> Vec<f64> {
+        // Cleaning data, removing NaN, etc. This might reduce the sample count.
+        wav_data.iter()
+            .filter(|x| !(x.is_nan() || x.is_infinite()))
+            .copied()
+            .collect()
+    }
+
+    /// Takes a length and returns a vector with the chunk sizes to feed to the different compressors.
+    /// A lot of assumptions go into selecting the chunk size, including:
+    /// 1. Collection rate - the collection rate is not expected to exceed 1 point/sec (usually it is less)
+    /// 2. Maximum compression achievable - a compressed frame has overhead and a minimum number of segments; small frames don't allow great compression
+    /// 3. FFT operates faster on power-of-two sizes
+    fn get_chunks_sizes(mut len: usize) -> Vec<usize> {
+        let mut chunk_sizes = Vec::<usize>::new();
+        while len > 0 {
+            match len {
+                _ if len >= MAX_FRAME_SIZE => {
+                    chunk_sizes.push(MAX_FRAME_SIZE);
+                    len -= MAX_FRAME_SIZE;
+                },
+                _ if len <= MIN_FRAME_SIZE => {
+                    chunk_sizes.push(len);
+                    len = 0;
+                },
+                _ => {
+                    let size = prev_power_of_two(len);
+                    chunk_sizes.push(size);
+                    len -= size;
+                }
             }
         }
+        chunk_sizes
     }
-}

-/// Converts a float via multiplication and truncation
-fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
-    (number * mul as f64) as i64
-}
+    /// Returns a vector pairing each assigned compressor with its data slice
+    pub fn get_execution(&self) -> Vec<(&Compressor, &[f64])> {
+        let mut output = Vec::with_capacity(self.chunk_sizes.len());
+        let mut s = 0;
+        for (i, size) in self.chunk_sizes.iter().enumerate() {
+            output.push((&self.compressors[i], &self.data[s..(s + *size)]));
+            s += *size;
+        }
+        output
+    }

-fn to_median_filter(data: &[f64]) -> Vec<i64> {
-    let mut filtered = Vec::with_capacity(data.len());
-    // 10minutes of data
-    let mut filter = Filter::new(50);
-    for point in data {
-        let point_int = MetricTag::QuasiRandom.from_float(*point);
-        let median = filter.consume(point_int);
-        filtered.push(median)
+    /// Walks the data, checks how much variability there is, and assigns a compressor based on that
+    /// NOTE: Is this any good?
+    fn get_compressor(data: &[f64]) -> Compressor {
+        let _ = data.iter().map(|&f| f64_to_u64(f, 0));
+        // For now, let's just return FFT
+        Compressor::FFT
     }
-    filtered
+
+    /// Assigns a compressor to each chunk of data
+    fn assign_compressor(clean_data: &[f64], chunks: &[usize], max_error: Option<f32>) -> Vec<Compressor> {
+        let mut selection = Vec::with_capacity(chunks.len());
+        match max_error {
+            Some(_err) => todo!(),
+            None => {
+                let mut s = 0;
+                for size in chunks.iter() {
+                    selection.push(OptimizerPlan::get_compressor(&clean_data[s..(s + *size)]));
+                    s += *size;
+                }
+            },
+        }
+        selection
+    }
+
 }

 /// This should look at the data and return an optimized dataset for a specific compressor,
 /// If a compressor is hand picked, this should be skipped.
-/// TODO: Make it do that
-pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec<i64> {
-    let mut _bitdepth = 64;
-    let mut _dc_component: i64 = 0;
-    let mut _fractional = true;
-
-    debug!("Tag: {:?}", tag);
-    let data = match tag {
-        MetricTag::Other => Vec::new(),
-        MetricTag::QuasiRandom => to_median_filter(wav_data),
-        _ => {
-            wav_data
-                .iter()
-                .map(|x| tag.from_float(*x))
-                .collect()
-        }
-    };
-    _fractional = false;
-    data
+pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec<f64> {
+    debug!("Tag: {:?} Len: {}", tag, wav_data.len());
+    wav_data.iter()
+        .filter(|x| !(x.is_nan() || x.is_infinite()))
+        .copied()
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn optimizer() {
+        let fake_data = vec![12.23; 2049];
+        let op = OptimizerPlan::plan(fake_data);
+        let plan_vec = op.get_execution();
+        assert_eq!(plan_vec.len(), 2);
+    }
+
+    #[test]
+    fn test_get_chunks_sizes() {
+        let len_very_large: usize = 131072 * 3 + 1765;
+        let len_small: usize = 31;
+        let len_right_sized: usize = 2048;
+        let len_some_size: usize = 12032;
+        assert_eq!(OptimizerPlan::get_chunks_sizes(len_very_large), [131072, 131072, 131072, 1024, 512, 229]);
+        assert_eq!(OptimizerPlan::get_chunks_sizes(len_small), [31]);
+        assert_eq!(OptimizerPlan::get_chunks_sizes(len_right_sized), [2048]);
+        assert_eq!(OptimizerPlan::get_chunks_sizes(len_some_size), [8192, 2048, 1024, 512, 256]);
+    }
+
+    #[test]
+    fn assign_compressor() {
+        let fake_data = vec![12.23; 132671];
+        let chunks = OptimizerPlan::get_chunks_sizes(fake_data.len());
+        let compressor_vec = OptimizerPlan::assign_compressor(&fake_data, &chunks, None);
+        assert_eq!(compressor_vec.len(), 4);
+    }
+}
\ No newline at end of file
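Note on the chunking above: a series is carved greedily into frames of at most `MAX_FRAME_SIZE`, each intermediate frame a power of two no smaller than `MIN_FRAME_SIZE`, and only the final remainder may be an odd size. A self-contained sketch of the same walk, restated with if/else (constants, `prev_power_of_two`, and the expected decomposition are taken from this diff and its tests):

```rust
const MAX_FRAME_SIZE: usize = 131072; // 2^17
const MIN_FRAME_SIZE: usize = 512; // 2^9

// Copy of utils::prev_power_of_two from this diff (assumes 64-bit usize).
fn prev_power_of_two(n: usize) -> usize {
    let highest_bit_set_idx = 63 - (n | 1).leading_zeros();
    (1 << highest_bit_set_idx) & n
}

fn get_chunks_sizes(mut len: usize) -> Vec<usize> {
    let mut chunk_sizes = Vec::new();
    while len > 0 {
        if len >= MAX_FRAME_SIZE {
            // Full frames first.
            chunk_sizes.push(MAX_FRAME_SIZE);
            len -= MAX_FRAME_SIZE;
        } else if len <= MIN_FRAME_SIZE {
            // Tail chunk: may be smaller than MIN_FRAME_SIZE and not a power of two.
            chunk_sizes.push(len);
            len = 0;
        } else {
            // Largest power of two that still fits.
            let size = prev_power_of_two(len);
            chunk_sizes.push(size);
            len -= size;
        }
    }
    chunk_sizes
}

fn main() {
    // 36h+ of 1 point/sec data: three max frames, then 1765 leftover points
    // split as 1024 + 512 + a 229-point tail.
    assert_eq!(
        get_chunks_sizes(131072 * 3 + 1765),
        [131072, 131072, 131072, 1024, 512, 229]
    );
}
```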
diff --git a/brro-compressor/src/types/metric_tag.rs b/brro-compressor/src/types/metric_tag.rs
index 4fa5236..03d8c3c 100644
--- a/brro-compressor/src/types/metric_tag.rs
+++ b/brro-compressor/src/types/metric_tag.rs
@@ -1,3 +1,5 @@
+use median::Filter;
+
 #[derive(Debug)]
 pub enum MetricTag {
     Percent(i32),
@@ -11,4 +13,44 @@ pub enum MetricTag {
     Bytes(i32),   // Data that is in bytes... Make it MB, or KB
     Other,        // Everything else
+}
+
+impl MetricTag {
+    #[allow(clippy::wrong_self_convention)]
+    fn from_float(&self, x: f64) -> i64 {
+        match self {
+            MetricTag::Other => {
+                0
+            }
+            MetricTag::NotFloat | MetricTag::QuasiRandom => {
+                x as i64
+            }
+            MetricTag::Percent(y) => {
+                Self::to_multiply_and_truncate(x, *y)
+            }
+            MetricTag::Duration(y) => {
+                Self::to_multiply_and_truncate(x, *y)
+            }
+            MetricTag::Bytes(y) => {
+                (x as i64) / (*y as i64)
+            }
+        }
+    }
+
+    /// Converts a float via multiplication and truncation
+    fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
+        (number * mul as f64) as i64
+    }
+
+    fn to_median_filter(data: &[f64]) -> Vec<i64> {
+        let mut filtered = Vec::with_capacity(data.len());
+        // 10 minutes of data
+        let mut filter = Filter::new(50);
+        for point in data {
+            let point_int = MetricTag::QuasiRandom.from_float(*point);
+            let median = filter.consume(point_int);
+            filtered.push(median)
+        }
+        filtered
+    }
 }
\ No newline at end of file
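For context on `from_float` above: each tag quantizes floats into integers differently; percentages and durations are scaled then truncated, and byte counts are integer-divided by the tag's unit. A small worked example of the arithmetic (the scale factors 100, 1000, and 1024 are illustrative choices, not values mandated by this code):

```rust
/// Same multiply-and-truncate conversion as in the diff.
fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
    (number * mul as f64) as i64
}

fn main() {
    // A Percent-style tag with a scale of 100: 0.8731 -> 87 (truncation, not rounding).
    assert_eq!(to_multiply_and_truncate(0.8731, 100), 87);
    // A Duration-style tag with a scale of 1000: 1.2345 s -> 1234 ms.
    assert_eq!(to_multiply_and_truncate(1.2345, 1000), 1234);
    // A Bytes-style tag with unit 1024: integer division truncates to whole KiB.
    assert_eq!(3_500_000_i64 / 1024, 3417);
}
```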
diff --git a/brro-compressor/src/utils/mod.rs b/brro-compressor/src/utils/mod.rs
index 856052b..c4f2e84 100644
--- a/brro-compressor/src/utils/mod.rs
+++ b/brro-compressor/src/utils/mod.rs
@@ -1,4 +1,21 @@
 pub mod error;
-mod file_type_detector;
+pub mod writers;
 pub mod readers;
-pub mod writers;
\ No newline at end of file
+
+mod file_type_detector;
+
+// Is this the right place?
+pub fn prev_power_of_two(n: usize) -> usize {
+    // n = 0 gives highest_bit_set_idx = 0.
+    let highest_bit_set_idx = 63 - (n | 1).leading_zeros();
+    // Binary AND of highest bit with n is a no-op, except zero gets wiped.
+    (1 << highest_bit_set_idx) & n
+}
+
+/// Converts a float to u64 with a given precision
+pub fn f64_to_u64(number: f64, precision: usize) -> u64 {
+    // TODO: Panic on overflow
+    if precision > 6 { panic!("Precision only available up to 6 digits!") }
+    let mul = [1, 10, 100, 1_000, 10_000, 100_000, 1_000_000][precision];
+    (number * mul as f64) as u64
+}
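Both helpers above are easy to sanity-check in isolation. A standalone sketch with copies of the two functions; the assertions reflect the behaviour documented in their comments (note that `63 - leading_zeros()` assumes a 64-bit `usize`, and that Rust saturates a negative float to 0 when cast to `u64`):

```rust
// Copy of utils::prev_power_of_two (assumes 64-bit usize).
fn prev_power_of_two(n: usize) -> usize {
    // n | 1 keeps leading_zeros() well-defined for n = 0.
    let highest_bit_set_idx = 63 - (n | 1).leading_zeros();
    // The AND keeps only n's highest set bit; for n = 0 it wipes back to 0.
    (1 << highest_bit_set_idx) & n
}

// Copy of utils::f64_to_u64: fixed-point encoding with `precision` decimal digits.
fn f64_to_u64(number: f64, precision: usize) -> u64 {
    if precision > 6 { panic!("Precision only available up to 6 digits!") }
    let mul = [1, 10, 100, 1_000, 10_000, 100_000, 1_000_000][precision];
    (number * mul as f64) as u64
}

fn main() {
    // Largest power of two <= n; 0 maps to 0 rather than panicking.
    assert_eq!(prev_power_of_two(0), 0);
    assert_eq!(prev_power_of_two(1765), 1024);
    assert_eq!(prev_power_of_two(2048), 2048);
    // 3 digits of precision scales by 1000 and truncates.
    assert_eq!(f64_to_u64(1.23456, 3), 1234);
    // Negative inputs saturate to 0 in the float -> u64 `as` cast.
    assert_eq!(f64_to_u64(-1.5, 0), 0);
}
```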