Merge pull request #56 from instaclustr/optimizer-work-v1
Optimizer work v1
cjrolo authored Oct 22, 2023
2 parents 9669cb1 + 247d2ea commit 2e5a348
Showing 6 changed files with 255 additions and 63 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion brro-compressor/Cargo.toml
@@ -14,7 +14,7 @@ clap = {version = "4.3.14", features = ["derive"] }
bincode = "2.0.0-rc.3"
rustfft = "6.1.0"
tempfile = "3.2"

average = "0.14.1"
regex = "1.9.1"
hound = "3.5"
median = "0.3.2"
5 changes: 2 additions & 3 deletions brro-compressor/src/main.rs
@@ -108,8 +108,7 @@ fn process_single_file(arguments: &Args) -> Result<(), Box<dyn Error>> {
fn compress_data(vec: &[f64], tag: &MetricTag, arguments: &Args) -> Vec<u8> {
    debug!("Compressing data!");
    let optimizer_results = optimizer::process_data(vec, tag);
-    let _optimizer_results_f: Vec<f64> = optimizer_results.iter().map(|&x| x as f64).collect();
+    debug!("Samples in: {}, Samples out: {}", vec.len(), optimizer_results.len());
    let mut cs = CompressedStream::new();
    let compressor = match arguments.compressor {
        CompressorType::Noop => Compressor::Noop,
@@ -120,7 +119,7 @@ fn compress_data(vec: &[f64], tag: &MetricTag, arguments: &Args) -> Vec<u8> {
        CompressorType::Wavelet => Compressor::Wavelet
    };

-    cs.compress_chunk_with(vec, compressor);
+    cs.compress_chunk_with(&optimizer_results, compressor);
    cs.to_bytes()
}

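Net effect of the two changed lines above: the compressor now consumes the optimizer's output instead of the raw input, so the new debug! line can report different in/out sample counts. A minimal sketch of the resulting flow — the bindings raw_samples, tag, and compressor are assumed from the surrounding function, not defined in this diff:

    // Sketch of the post-change compress_data pipeline (illustrative, not the commit itself):
    let optimizer_results = optimizer::process_data(&raw_samples, &tag); // cleans data -> Vec<f64>
    let mut cs = CompressedStream::new();
    cs.compress_chunk_with(&optimizer_results, compressor); // compress the cleaned samples
    let bytes = cs.to_bytes();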
214 changes: 157 additions & 57 deletions brro-compressor/src/optimizer/mod.rs
@@ -1,70 +1,170 @@
// Lucas - Once the project is far enough along I strongly recommend re-enabling dead code checks
#![allow(dead_code)]

-use median::Filter;
use log::debug;
use types::metric_tag::MetricTag;
-use crate::types;
+use crate::{types, utils::{prev_power_of_two, f64_to_u64}, compressor::Compressor};

-impl MetricTag {
-    #[allow(clippy::wrong_self_convention)]
-    fn from_float(&self, x: f64) -> i64 {
-        match self {
-            MetricTag::Other => {
-                0
-            }
-            MetricTag::NotFloat | MetricTag::QuasiRandom => {
-                x as i64
-            }
-            MetricTag::Percent(y) => {
-                to_multiply_and_truncate(x, *y)
-            }
-            MetricTag::Duration(y) => {
-                to_multiply_and_truncate(x, *y)
-            }
-            MetricTag::Bytes(y) => {
-                (x as i64) / (*y as i64)
-            }
-        }
-    }
-}

/// Max frame size; this can hold approx. 36h of data at a 1 point/sec rate, a little more than 1 week at 1 point/5 sec,
/// and 1 month (30 days) at 1 point/20 sec.
/// This would be approx. 1MB of raw data (131072 * 64 bits).
/// We wouldn't want to decompress a ton of unnecessary data, but for a historical view of the data, looking into 1 day/week/month at once is very reasonable.
const MAX_FRAME_SIZE: usize = 131072; // 2^17
/// The min frame size is one that allows our compressors to potentially achieve 100x compression. Currently the most
/// limited one is the FFT compressor, which needs at least 3 frequencies: 3x100 = 300, and the next power of 2 is 512.
const MIN_FRAME_SIZE: usize = 512; // 2^9
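// (Illustrative check of the constants above, not part of the commit:
//  131072 s / 3600 ≈ 36.4 h at 1 point/s; 131072 * 5 s ≈ 7.58 days at 1 point/5 s;
//  131072 * 20 s ≈ 30.3 days at 1 point/20 s; 131072 samples * 8 bytes = 1 MiB of raw f64s.
//  For the minimum: 3 frequencies * 100x = 300 samples, and the next power of two is 512.)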

// My idea here:
// 1. Clean data
// 2. Split into good-sized chunks (aka powers of 2), as shown in the sketch after this hunk
// 3. Get each chunk into the compressor that it should go to
// 3.1. Chunks should be at least of a size that allows a 100x compression for that given compressor (FFT is 512)
// 4. From the clean data and chunk sizes, assign an optimizer for each chunk
#[derive(Debug, Clone)]
struct OptimizerPlan {
    pub data: Vec<f64>,
    pub chunk_sizes: Vec<usize>,
    pub compressors: Vec<Compressor>,
}

impl OptimizerPlan {
    /// Creates an optimal data compression plan
    pub fn plan(data: Vec<f64>) -> Self {
        let c_data = OptimizerPlan::clean_data(&data);
        let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
        let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, None);
        OptimizerPlan {
            data: c_data,
            chunk_sizes: chunks,
            compressors: optimizer,
        }
    }

    /// Creates an optimal compression plan for the provided data set, bounded by a given error
    pub fn plan_bounded(data: Vec<f64>, max_error: f32) -> Self {
        // TODO: Check error limits
        let c_data = OptimizerPlan::clean_data(&data);
        let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
        let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, Some(max_error));
        OptimizerPlan {
            data: c_data,
            chunk_sizes: chunks,
            compressors: optimizer,
        }
    }

    /// Sets a given compressor for all data chunks
    pub fn set_compressor(&mut self, compressor: Compressor) {
        let new_compressors = vec![compressor; self.compressors.len()];
        self.compressors = new_compressors;
    }

    /// Removes NaN and infinite values from the data
    pub fn clean_data(wav_data: &[f64]) -> Vec<f64> {
        // Cleaning data, removing NaN, etc. This might reduce sample count
        wav_data.iter()
            .filter(|x| !(x.is_nan() || x.is_infinite()))
            .copied()
            .collect()
    }

    /// This function gets a length and returns a vector with the chunk sizes to feed to the different compressors.
    /// A lot of assumptions go into selecting the chunk size, including:
    /// 1. Collection rate - It is not expected that the collection rate exceeds 1 point/sec (actually less is expected)
    /// 2. Maximum compression achievable - A compressed frame has overhead and a minimum number of segments; small frames don't allow great compression
    /// 3. FFT operates faster on power-of-2 sizes
    fn get_chunks_sizes(mut len: usize) -> Vec<usize> {
        let mut chunk_sizes = Vec::<usize>::new();
        while len > 0 {
            match len {
                _ if len >= MAX_FRAME_SIZE => {
                    chunk_sizes.push(MAX_FRAME_SIZE);
                    len -= MAX_FRAME_SIZE;
                },
                _ if len <= MIN_FRAME_SIZE => {
                    chunk_sizes.push(len);
                    len = 0;
                },
                _ => {
                    let size = prev_power_of_two(len);
                    chunk_sizes.push(size);
                    len -= size;
                }
            }
        }
        chunk_sizes
    }

-/// Converts a float via multiplication and truncation
-fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
-    (number * mul as f64) as i64
-}
    /// Returns a vector of (compressor, data slice) pairs, one per chunk
    pub fn get_execution(&self) -> Vec<(&Compressor, &[f64])> {
        let mut output = Vec::with_capacity(self.chunk_sizes.len());
        let mut s = 0;
        for (i, size) in self.chunk_sizes.iter().enumerate() {
            output.push((&self.compressors[i], &self.data[s..(s + *size)]));
            s += *size;
        }
        output
    }

-fn to_median_filter(data: &[f64]) -> Vec<i64> {
-    let mut filtered = Vec::with_capacity(data.len());
-    // 10 minutes of data
-    let mut filter = Filter::new(50);
-    for point in data {
-        let point_int = MetricTag::QuasiRandom.from_float(*point);
-        let median = filter.consume(point_int);
-        filtered.push(median)
-    }
-    filtered
-}

    /// Walks the data, checks how much variability is in the data, and assigns a compressor based on that
    /// NOTE: Is this any good?
    fn get_compressor(data: &[f64]) -> Compressor {
        let _ = data.iter().map(|&f| f64_to_u64(f, 0));
        // For now, let's just return FFT
        Compressor::FFT
    }

    /// Assigns a compressor to a chunk of data
    fn assign_compressor(clean_data: &[f64], chunks: &[usize], max_error: Option<f32>) -> Vec<Compressor> {
        let mut selection = Vec::with_capacity(chunks.len());
        match max_error {
            Some(_err) => todo!(),
            None => {
                let mut s = 0;
                for size in chunks.iter() {
                    selection.push(OptimizerPlan::get_compressor(&clean_data[s..(s + *size)]));
                    s += *size;
                }
            },
        }
        selection
    }
}

/// This should look at the data and return an optimized dataset for a specific compressor.
/// If a compressor is hand-picked, this should be skipped.
/// TODO: Make it do that
-pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec<i64> {
-    let mut _bitdepth = 64;
-    let mut _dc_component: i64 = 0;
-    let mut _fractional = true;
-
-    debug!("Tag: {:?}", tag);
-    let data = match tag {
-        MetricTag::Other => Vec::new(),
-        MetricTag::QuasiRandom => to_median_filter(wav_data),
-        _ => {
-            wav_data
-                .iter()
-                .map(|x| tag.from_float(*x))
-                .collect()
-        }
-    };
-    _fractional = false;
-    data
-}
+pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec<f64> {
+    debug!("Tag: {:?} Len: {}", tag, wav_data.len());
+    wav_data.iter()
+        .filter(|x| !(x.is_nan() || x.is_infinite()))
+        .copied()
+        .collect()
+}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn optimizer() {
        let fake_data = vec![12.23; 2049];
        let op = OptimizerPlan::plan(fake_data);
        let plan_vec = op.get_execution();
        assert_eq!(plan_vec.len(), 2);
    }

    #[test]
    fn test_get_chunks_sizes() {
        let len_very_large: usize = 131072 * 3 + 1765;
        let len_small: usize = 31;
        let len_right_sized: usize = 2048;
        let len_some_size: usize = 12032;
        assert_eq!(OptimizerPlan::get_chunks_sizes(len_very_large), [131072, 131072, 131072, 1024, 512, 229]);
        assert_eq!(OptimizerPlan::get_chunks_sizes(len_small), [31]);
        assert_eq!(OptimizerPlan::get_chunks_sizes(len_right_sized), [2048]);
        assert_eq!(OptimizerPlan::get_chunks_sizes(len_some_size), [8192, 2048, 1024, 512, 256]);
    }

    #[test]
    fn assign_compressor() {
        let fake_data = vec![12.23; 132671];
        let chunks = OptimizerPlan::get_chunks_sizes(fake_data.len());
        let compressor_vec = OptimizerPlan::assign_compressor(&fake_data, &chunks, None);
        assert_eq!(compressor_vec.len(), 4);
    }
}
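Taken together, these additions form a small planning API: plan() cleans the data, splits it into power-of-2 chunks, and assigns a compressor per chunk. A hedged usage sketch — OptimizerPlan and its methods are crate-private in this commit, so code like this would live inside the module, and the data values are made up:

    // Sketch using only what this diff defines: plan(), set_compressor(), get_execution().
    let data: Vec<f64> = (0..4096).map(|i| (i as f64 / 100.0).sin()).collect();
    let mut plan = OptimizerPlan::plan(data);   // clean -> chunk -> assign compressors
    plan.set_compressor(Compressor::FFT);       // optional: force one compressor for every chunk
    for (compressor, chunk) in plan.get_execution() {
        // each chunk is a power-of-2 slice (or a short tail of <= 512 samples)
        debug!("compressing {} samples", chunk.len());
    }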
42 changes: 42 additions & 0 deletions brro-compressor/src/types/metric_tag.rs
@@ -1,3 +1,5 @@
use median::Filter;

#[derive(Debug)]
pub enum MetricTag {
    Percent(i32),
@@ -11,4 +13,44 @@ pub enum MetricTag {
    Bytes(i32),
    // Data that is in bytes... Make it MB, or KB
    Other, // Everything else
}

impl MetricTag {
    #[allow(clippy::wrong_self_convention)]
    fn from_float(&self, x: f64) -> i64 {
        match self {
            MetricTag::Other => {
                0
            }
            MetricTag::NotFloat | MetricTag::QuasiRandom => {
                x as i64
            }
            MetricTag::Percent(y) => {
                Self::to_multiply_and_truncate(x, *y)
            }
            MetricTag::Duration(y) => {
                Self::to_multiply_and_truncate(x, *y)
            }
            MetricTag::Bytes(y) => {
                (x as i64) / (*y as i64)
            }
        }
    }

    /// Converts a float via multiplication and truncation
    fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 {
        (number * mul as f64) as i64
    }

    fn to_median_filter(data: &[f64]) -> Vec<i64> {
        let mut filtered = Vec::with_capacity(data.len());
        // 10 minutes of data
        let mut filter = Filter::new(50);
        for point in data {
            let point_int = MetricTag::QuasiRandom.from_float(*point);
            let median = filter.consume(point_int);
            filtered.push(median)
        }
        filtered
    }
}
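As an aside, a few concrete values the relocated from_float logic produces (illustrative only: the multipliers are arbitrary picks, and from_float is private, so these asserts would only compile inside this module):

    // Illustrative only: how each tag quantizes a sample to an i64.
    assert_eq!(MetricTag::Percent(1000).from_float(0.1234), 123); // 0.1234 * 1000, truncated
    assert_eq!(MetricTag::Bytes(1024).from_float(4096.0), 4);     // bytes scaled down to KiB
    assert_eq!(MetricTag::NotFloat.from_float(3.9), 3);           // plain truncation
    assert_eq!(MetricTag::Other.from_float(42.0), 0);             // "Other" is zeroed out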