Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizer work v1 #56

Merged
merged 9 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion brro-compressor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ clap = {version = "4.3.14", features = ["derive"] }
bincode = "2.0.0-rc.3"
rustfft = "6.1.0"
tempfile = "3.2"

average = "0.14.1"
regex = "1.9.1"
hound = "3.5"
median = "0.3.2"
5 changes: 2 additions & 3 deletions brro-compressor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ fn process_single_file(arguments: &Args) -> Result<(), Box<dyn Error>> {
fn compress_data(vec: &[f64], tag: &MetricTag, arguments: &Args) -> Vec<u8> {
debug!("Compressing data!");
let optimizer_results = optimizer::process_data(vec, tag);
let _optimizer_results_f: Vec<f64> = optimizer_results.iter().map(|&x| x as f64).collect();

debug!("Samples in: {}, Samples out: {}", vec.len(), optimizer_results.len());
let mut cs = CompressedStream::new();
let compressor = match arguments.compressor {
CompressorType::Noop => Compressor::Noop,
Expand All @@ -120,7 +119,7 @@ fn compress_data(vec: &[f64], tag: &MetricTag, arguments: &Args) -> Vec<u8> {
CompressorType::Wavelet => Compressor::Wavelet
};

cs.compress_chunk_with(vec, compressor);
cs.compress_chunk_with(&optimizer_results, compressor);
cs.to_bytes()
}

Expand Down
214 changes: 157 additions & 57 deletions brro-compressor/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,170 @@
// Lucas - Once the project is far enough along I strongly reccomend reenabling dead code checks
#![allow(dead_code)]

use median::Filter;
use log::debug;
use types::metric_tag::MetricTag;
use crate::types;

impl MetricTag {
#[allow(clippy::wrong_self_convention)]
fn from_float(&self, x: f64) -> i64 {
match self {
MetricTag::Other => {
0
}
MetricTag::NotFloat | MetricTag::QuasiRandom => {
x as i64
}
MetricTag::Percent(y) => {
to_multiply_and_truncate(x, *y)
}
MetricTag::Duration(y) => {
to_multiply_and_truncate(x, *y)
}
MetricTag::Bytes(y) => {
(x as i64) / (*y as i64)
use crate::{types, utils::{prev_power_of_two, f64_to_u64}, compressor::Compressor};

/// Max Frame size, this can aprox. 36h of data at 1point/sec rate, a little more than 1 week at 1point/5sec
/// and 1 month (30 days) at 1 point/20sec.
/// This would be aprox. 1MB of Raw data (131072 * 64bits).
/// We wouldn't want to decompressed a ton of uncessary data, but for historical view of the data, looking into 1day/week/month at once is very reasonable
const MAX_FRAME_SIZE: usize = 131072; // 2^17
/// The Min frame size is one that allows our compressors potentially achieve 100x compression. Currently the most
/// limited one is the FFT compressor, that needs 3 frequencies at minimum, 3x100 = 300, next power of 2 is 512.
const MIN_FRAME_SIZE: usize = 512; // 2^9

// My idea here:
// 1. Clean data
// 2. Split into good sized chunks (aka power of 2)
// 3. Get each chunk into the compressor that it should go
// 3.1. Chunks should be at least of a size that it can allow a 100x compression for that given compressor (FFT is 512)
// 4. From the clean data and chunk sizes, assign an optimizer for each chunk
#[derive(Debug, Clone)]
struct OptimizerPlan {
pub data: Vec<f64>,
pub chunk_sizes: Vec<usize>,
pub compressors: Vec<Compressor>,
}

impl OptimizerPlan {

/// Creates an optimal data compression plan
pub fn plan(data: Vec<f64>) -> Self {
let c_data = OptimizerPlan::clean_data(&data);
let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, None);
OptimizerPlan { data: c_data,
chunk_sizes: chunks,
compressors: optimizer }
}

/// Creates an optimal plan for compression for the data set provided bound by a given error
pub fn plan_bounded(data: Vec<f64>, max_error: f32) -> Self {
// TODO: Check error limits
let c_data = OptimizerPlan::clean_data(&data);
let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, Some(max_error));
OptimizerPlan { data: c_data,
chunk_sizes: chunks,
compressors: optimizer }
}

/// Sets a given compressor for all data chunks
pub fn set_compressor(&mut self, compressor: Compressor) {
let new_compressors = vec![compressor; self.compressors.len()];
self.compressors = new_compressors;
}

/// Removes NaN and infinite references from the data
pub fn clean_data(wav_data: &[f64]) -> Vec<f64> {
// Cleaning data, removing NaN, etc. This might reduce sample count
wav_data.iter()
.filter(|x| !(x.is_nan() || x.is_infinite()))
.copied()
.collect()
}

/// This function gets a length and returns a vector with the chunk sizes to feed to the different compressors
/// A lot of assumptions go into selecting the chunk size, including:
/// 1. Collection rate - It is not expected that the collection rate exceeds 1point sec (it is expected actually less)
/// 2. Maximum compression achievable - A compressed frame as overhead and a minimum number of segments, small frames don't allow great compressions
/// 3. FFT operates faster under power of 2
fn get_chunks_sizes(mut len: usize) -> Vec<usize> {
let mut chunk_sizes = Vec::<usize>::new();
while len > 0 {
match len {
_ if len >= MAX_FRAME_SIZE => {
chunk_sizes.push(MAX_FRAME_SIZE);
len -= MAX_FRAME_SIZE;
},
_ if len <= MIN_FRAME_SIZE => {
chunk_sizes.push(len);
len = 0;
},
_ => {
let size = prev_power_of_two(len);
chunk_sizes.push(size);
len -= size;
}
}
}
chunk_sizes
}
}

/// Converts a float via multiplication and truncation
fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
(number * mul as f64) as i64
}
/// Returns an iterator with the data slice and the compressor associated
pub fn get_execution(&self) -> Vec<(&Compressor, &[f64])> {
let mut output = Vec::with_capacity(self.chunk_sizes.len());
let mut s = 0;
for (i,size) in self.chunk_sizes.iter().enumerate() {
output.push((&self.compressors[i] ,&self.data[s..(s+*size)]));
s += *size;
}
output
}

fn to_median_filter(data: &[f64]) -> Vec<i64> {
let mut filtered = Vec::with_capacity(data.len());
// 10minutes of data
let mut filter = Filter::new(50);
for point in data {
let point_int = MetricTag::QuasiRandom.from_float(*point);
let median = filter.consume(point_int);
filtered.push(median)
/// Walks the data, checks how much variability is in the data, and assigns a compressor based on that
/// NOTE: Is this any good?
fn get_compressor(data: &[f64]) -> Compressor {
let _ = data.iter().map(|&f| f64_to_u64(f, 0));
// For now, let's just return FFT
Compressor::FFT
}
filtered

/// Assigns a compressor to a chunk of data
fn assign_compressor(clean_data: &[f64], chunks: &[usize], max_error: Option<f32>) -> Vec<Compressor> {
let mut selection = Vec::with_capacity(chunks.len());
match max_error {
Some(_err) => todo!(),
None => {
let mut s = 0;
for size in chunks.iter() {
selection.push(OptimizerPlan::get_compressor(&clean_data[s..(s+*size)]));
s += *size;
}
},
}
selection
}

}

/// This should look at the data and return an optimized dataset for a specific compressor,
/// If a compressor is hand picked, this should be skipped.
/// TODO: Make it do that
pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec<i64> {
let mut _bitdepth = 64;
let mut _dc_component: i64 = 0;
let mut _fractional = true;

debug!("Tag: {:?}", tag);
let data = match tag {
MetricTag::Other => Vec::new(),
MetricTag::QuasiRandom => to_median_filter(wav_data),
_ => {
wav_data
.iter()
.map(|x| tag.from_float(*x))
.collect()
}
};
_fractional = false;
data
pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec<f64> {
debug!("Tag: {:?} Len: {}", tag, wav_data.len());
wav_data.iter()
.filter(|x| !(x.is_nan() || x.is_infinite()))
.copied()
.collect()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn optimizer() {
let fake_data = vec![12.23; 2049];
let op = OptimizerPlan::plan(fake_data);
let plan_vec = op.get_execution();
assert_eq!(plan_vec.len(), 2);
}

#[test]
fn test_get_chunks_sizes() {
let len_very_large: usize = 131072 * 3 + 1765;
let len_small: usize = 31;
let len_right_sized: usize = 2048;
let len_some_size: usize = 12032;
assert_eq!(OptimizerPlan::get_chunks_sizes(len_very_large), [131072, 131072, 131072, 1024, 512, 229]);
assert_eq!(OptimizerPlan::get_chunks_sizes(len_small), [31]);
assert_eq!(OptimizerPlan::get_chunks_sizes(len_right_sized), [2048]);
assert_eq!(OptimizerPlan::get_chunks_sizes(len_some_size), [8192, 2048, 1024, 512, 256]);
}

#[test]
fn assign_compressor() {
let fake_data = vec![12.23; 132671];
let chunks = OptimizerPlan::get_chunks_sizes(fake_data.len());
let compressor_vec = OptimizerPlan::assign_compressor(&fake_data, &chunks, None);
assert_eq!(compressor_vec.len(), 4);
}
}
42 changes: 42 additions & 0 deletions brro-compressor/src/types/metric_tag.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use median::Filter;

#[derive(Debug)]
pub enum MetricTag {
Percent(i32),
Expand All @@ -11,4 +13,44 @@ pub enum MetricTag {
Bytes(i32),
// Data that is in bytes... Make it MB, or KB
Other, // Everything else
}

impl MetricTag {
#[allow(clippy::wrong_self_convention)]
fn from_float(&self, x: f64) -> i64 {
match self {
MetricTag::Other => {
0
}
MetricTag::NotFloat | MetricTag::QuasiRandom => {
x as i64
}
MetricTag::Percent(y) => {
Self::to_multiply_and_truncate(x, *y)
}
MetricTag::Duration(y) => {
Self::to_multiply_and_truncate(x, *y)
}
MetricTag::Bytes(y) => {
(x as i64) / (*y as i64)
}
}
}

/// Converts a float via multiplication and truncation
fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
(number * mul as f64) as i64
}

fn to_median_filter(data: &[f64]) -> Vec<i64> {
let mut filtered = Vec::with_capacity(data.len());
// 10minutes of data
let mut filter = Filter::new(50);
for point in data {
let point_int = MetricTag::QuasiRandom.from_float(*point);
let median = filter.consume(point_int);
filtered.push(median)
}
filtered
}
}
Loading