Merge pull request #39 from instaclustr/CompressStream
Compress stream
cjrolo authored Oct 11, 2023
2 parents 7320cc6 + cd3a4c7 commit d1ee240
Showing 8 changed files with 159 additions and 20 deletions.
3 changes: 1 addition & 2 deletions brro-compressor/src/compressor/fft.rs
@@ -239,8 +239,7 @@ pub fn fft(data: &[f64], max_freqs: usize, min: f64, max: f64) -> Vec<u8> {
/// as the FFT will be calculated over and over until the specified error threshold is achieved.
/// `max_freqs` is used as a starting point for the calculation
pub fn fft_allowed_error(data: &[f64], max_freqs: usize, min: f64, max: f64, allowed_error: f64) -> Vec<u8> {
info!("Initializing FFT Compressor");
// Initialize the compressor
// TODO: This can be greatly improved
let frame_size = data.len();
let mut i = 1;
let mut compressed_data = fft(data, max_freqs, min, max);
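
The doc comment above describes an iterative scheme: rerun the FFT, widening the frequency budget, until the error drops below `allowed_error`. A stand-alone sketch of that retry pattern (the `compress_once` and `measure_error` helpers below are placeholders, not brro-compressor functions; the real loop calls `fft()` in fft.rs):

```rust
// Placeholder "compressor": keep only the first `freqs` values.
fn compress_once(data: &[f64], freqs: usize) -> Vec<u8> {
    data.iter().take(freqs).map(|v| *v as u8).collect()
}

// Toy error metric: pretend error shrinks as more frequencies are kept.
fn measure_error(_data: &[f64], compressed: &[u8]) -> f64 {
    1.0 / compressed.len().max(1) as f64
}

fn compress_with_budget(data: &[f64], mut freqs: usize, allowed_error: f64) -> Vec<u8> {
    let mut out = compress_once(data, freqs);
    while measure_error(data, &out) > allowed_error && freqs < data.len() {
        freqs += 1; // widen the frequency budget and try again
        out = compress_once(data, freqs);
    }
    out
}

fn main() {
    let data = vec![1.0_f64; 64];
    let out = compress_with_budget(&data, 8, 0.05);
    println!("kept {} bytes", out.len());
}
```
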
10 changes: 7 additions & 3 deletions brro-compressor/src/compressor/mod.rs
@@ -1,12 +1,16 @@
use bincode::config::{self, Configuration};
use bincode::{Decode, Encode};
use self::constant::constant;
use self::fft::fft;
use self::noop::noop;

pub mod noop;
pub mod constant;
pub mod fft;

#[derive(Encode, Decode, Default, Debug, Clone)]
pub enum Compressor {
#[default]
Noop,
FFT,
Wavelet,
@@ -16,12 +20,12 @@ pub enum Compressor {
}

impl Compressor {
fn compress(&self, data: &[f64] ) -> Vec<u8> {
pub fn compress(&self, data: &[f64] ) -> Vec<u8> {
match self {
Compressor::Noop => noop(data),
Compressor::FFT => todo!(),
Compressor::FFT => fft(data, 8, 0.0, 10.0), // TODO: Remove the placeholders
Compressor::Constant => constant(data),
_ => todo!(),
_ => noop(data),
}
}
}
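
With `compress` made public and the FFT arm wired to `fft(data, 8, 0.0, 10.0)` (placeholder arguments, per the TODO), callers can dispatch through the enum directly. A minimal usage sketch, assuming the crate is consumed as the `brro_compressor` library that lib.rs exposes:

```rust
use brro_compressor::compressor::Compressor;

fn main() {
    let samples = vec![1.0_f64; 32];
    // Noop is the #[default] variant; FFT currently runs with the placeholder
    // arguments (8 frequencies, min 0.0, max 10.0) flagged by the TODO above.
    let noop_bytes = Compressor::Noop.compress(&samples);
    let fft_bytes = Compressor::FFT.compress(&samples);
    println!("noop: {} bytes, fft: {} bytes", noop_bytes.len(), fft_bytes.len());
}
```
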
77 changes: 77 additions & 0 deletions brro-compressor/src/data.rs
@@ -1,6 +1,9 @@
use bincode::{Decode, Encode};
use crate::compressor::{Compressor, BinConfig};
use crate::frame::CompressorFrame;
use crate::header::CompressorHeader;

#[derive(Encode, Decode, Debug, Clone)]
pub struct CompressedStream {
header: CompressorHeader,
data_frames: Vec<CompressorFrame>,
@@ -14,4 +17,78 @@ impl CompressedStream {
data_frames: Vec::new(),
}
}

/// Compresses a chunk of data, adding it as a new frame to the current stream
pub fn compress_chunk(&mut self, chunk: &[f64]) {
let mut compressor_frame = CompressorFrame::new(None);
compressor_frame.compress(chunk);
compressor_frame.close();
self.data_frames.push(compressor_frame);
}

/// Compresses a chunk of data with a specific compressor, adding it as a new frame to the current stream
pub fn compress_chunk_with(&mut self, chunk: &[f64], compressor: Compressor) {
let mut compressor_frame = CompressorFrame::new(Some(compressor));
compressor_frame.compress(chunk);
compressor_frame.close();
self.data_frames.push(compressor_frame);

}

/// Transforms the whole CompressedStream into bytes to be written to a file
pub fn to_bytes(self) -> Vec<u8> {
// Will this chain encode??
let config = BinConfig::get();
bincode::encode_to_vec(self, config).unwrap()
}

/// Takes a binary stream and generates a CompressedStream
pub fn from_bytes(data: &[u8]) -> Self {
let config = BinConfig::get();
match bincode::decode_from_slice(data, config) {
Ok((compressed_stream, _)) => compressed_stream,
Err(e) => panic!("{e}")
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_compress_chunk() {
let vector1 = vec![1.0, 1.0, 1.0, 1.0, 1.0];
let mut cs = CompressedStream::new();
cs.compress_chunk(&vector1);
assert_eq!(cs.data_frames.len(), 1);
}

#[test]
fn test_compress_chunk_with() {
let vector1 = vec![1.0, 1.0, 1.0, 1.0, 1.0];
let mut cs = CompressedStream::new();
cs.compress_chunk_with(&vector1, Compressor::Constant);
assert_eq!(cs.data_frames.len(), 1);
}

#[test]
fn test_to_bytes() {
let vector1 = vec![1.0; 1024];
let mut cs = CompressedStream::new();
cs.compress_chunk_with(&vector1, Compressor::Constant);
let b = cs.to_bytes();
assert_eq!(b, [66, 82, 82, 79, 0, 1, 37, 0, 3, 3, 0, 2, 0]);
}

#[test]
fn test_from_bytes() {
let vector1 = vec![1.0; 1024];
let mut cs = CompressedStream::new();
cs.compress_chunk_with(&vector1, Compressor::Constant);
let len = cs.data_frames.len();
let b = cs.to_bytes();
let cs2 = CompressedStream::from_bytes(&b);
assert_eq!(len, cs2.data_frames.len());
}
}
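
Beyond the unit tests, the new API supports a full round trip from application code. A sketch, assuming library usage; the `example.bro` file name is arbitrary, and the actual CLI writes through `utils::writer` rather than `std::fs` directly:

```rust
use brro_compressor::compressor::Compressor;
use brro_compressor::data::CompressedStream;

fn main() -> std::io::Result<()> {
    let chunk = vec![1.0_f64; 1024];

    // Build a stream: one frame picked by default (Noop), one forced to Constant.
    let mut cs = CompressedStream::new();
    cs.compress_chunk(&chunk);
    cs.compress_chunk_with(&chunk, Compressor::Constant);

    // to_bytes() consumes the stream; persist it and read it back.
    let bytes = cs.to_bytes();
    std::fs::write("example.bro", &bytes)?;

    let restored = CompressedStream::from_bytes(&std::fs::read("example.bro")?);
    // data_frames is private, so the round trip is checked here by re-encoding.
    assert_eq!(restored.to_bytes(), bytes);
    Ok(())
}
```
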
35 changes: 30 additions & 5 deletions brro-compressor/src/frame/mod.rs
@@ -1,21 +1,46 @@
use std::mem::size_of_val;
use bincode::{Decode, Encode};
use crate::compressor::Compressor;

/// This is the structure of a compressor frame
#[derive(Encode, Decode, Debug, Clone)]
pub struct CompressorFrame{
/// The frame size in Bytes,
frame_size: i64,
/// The frame size in bytes,
frame_size: usize,
/// The number of samples in this frame,
samples: u32,
/// The compressor used in the current frame
compressor: Compressor,
/// Output from the compressor
data: Vec<u8>,
}

impl CompressorFrame {
/// For testing
pub fn new() -> Self {
/// Creates a compressor frame; if a compressor is provided, that compressor is forced, otherwise one is selected
/// by the optimizer.
/// compressor: None to allow BRRO to choose, or force one
pub fn new(provided_compressor: Option<Compressor>) -> Self {
CompressorFrame {
frame_size: 0,
compressor: Compressor::Noop,
samples: 0,
compressor: provided_compressor.unwrap_or_default(),
data: Vec::new() }
}

/// Calculates the size of the Frame and "closes it"
// TODO: this is probably wrong, so we have to use the write stream to dump the bytes written
pub fn close(&mut self) {
let size = size_of_val(&self.samples)
+ size_of_val(&self.compressor)
+ size_of_val(&self.data)
+ size_of_val(&self.frame_size);
self.frame_size = size;
}

/// Compresses the data and stores the result in the frame
pub fn compress(&mut self, data: &[f64]) {
// TODO: Optimize here
// self.compressor = optimizer_selection
self.data = self.compressor.compress(data);
}
}
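
The TODO on `close()` is warranted: `size_of_val(&self.data)` measures the `Vec` handle (pointer, length, capacity), not the compressed payload. A small stand-alone check, independent of the crate, illustrates the difference:

```rust
use std::mem::size_of_val;

fn main() {
    let data: Vec<u8> = vec![0; 1024];
    // On a 64-bit target this prints 24 — the Vec's pointer + length + capacity —
    // no matter how many bytes the Vec actually holds.
    println!("size_of_val(&data) = {}", size_of_val(&data));
    // The payload size a frame presumably wants to record is data.len().
    println!("data.len()         = {}", data.len());
}
```
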
4 changes: 3 additions & 1 deletion brro-compressor/src/header.rs
@@ -1,5 +1,7 @@
/// This will write the file headers
use bincode::{Decode, Encode};

/// This will write the file headers
#[derive(Encode, Decode, Debug, Clone)]
pub struct CompressorHeader {
initial_segment: [u8; 4],
// We should go unsigned
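
The `test_to_bytes` expectation earlier in this diff starts with `[66, 82, 82, 79]`, which is ASCII `BRRO`; presumably that is the header's `initial_segment` serving as a file magic. A quick check of that inference (the diff itself does not state the constant):

```rust
fn main() {
    // Bytes produced by test_to_bytes above.
    let bytes = [66u8, 82, 82, 79, 0, 1, 37, 0, 3, 3, 0, 2, 0];
    // The first four bytes spell "BRRO" in ASCII — presumably the header's
    // initial_segment acting as a file magic.
    assert_eq!(&bytes[..4], &b"BRRO"[..]);
    println!("magic: {}", String::from_utf8_lossy(&bytes[..4]));
}
```
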
4 changes: 2 additions & 2 deletions brro-compressor/src/lib.rs
@@ -7,8 +7,8 @@ pub mod compressor;
pub mod frame;
pub mod preprocessor;
pub mod utils;
mod header;
mod data;
pub mod header;
pub mod data;

pub mod optimizer;
pub mod types;
25 changes: 18 additions & 7 deletions brro-compressor/src/main.rs
@@ -1,11 +1,23 @@
use std::path::Path;
use clap::{Parser, command, arg};
use log::debug;
use brro_compressor::compressor;

use brro_compressor::compressor;
use brro_compressor::optimizer;
use brro_compressor::utils::reader;
use brro_compressor::utils::writer;
use brro_compressor::data::CompressedStream;

/// Processes a chunk of WAV content and compresses it.
/// If a stream is provided, the chunk is added to that stream; otherwise a new one is created.
fn compress_file(stream: Option<CompressedStream>, wav_content: Vec<f64>) -> CompressedStream {
let mut cs = match stream {
Some(cs) => cs,
None => CompressedStream::new()
};
cs.compress_chunk(&wav_content);
cs
}

fn process_args(input_path: &str, arguments: &Args) {
let path = Path::new(input_path);
@@ -20,7 +32,6 @@ fn process_args(input_path: &str, arguments: &Args) {
for (index, data) in files.contents.iter().enumerate() {
let (vec_data, tag) = data;
let optimizer_results = optimizer::process_data(vec_data, tag);

let optimizer_results_f: Vec<f64> = optimizer_results.iter().map(|&x| x as f64).collect();

let mut compressed: Vec<u8> = Vec::new();
Expand All @@ -36,7 +47,9 @@ fn process_args(input_path: &str, arguments: &Args) {
writer::write_data_to_stream(&mut file, &compressed).expect("Failed to write compressed data");
}
} else {
// process_file(input_path.into());
// TODO: Make this do something...
let cs = compress_file(None, Vec::new());
cs.to_bytes();
}
}

@@ -49,19 +62,17 @@ struct Args {
#[arg(short, action)]
directory: bool,

/// Write optimized samples to a file, named as optimized.out
/// Forces Noop compressor
#[arg(long, action)]
noop: bool,

/// Write optimized samples to a file, named as optimized.out
/// Forces Constant compressor
#[arg(long, action)]
constant: bool,

}

fn main() {
// How to break the float part??? --> THERE ARE NO FLOATS!
// https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/deployment_guide/s2-proc-stat
env_logger::init();
let arguments = Args::parse();
debug!("{:?}", arguments);
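
`compress_file` threads an `Option<CompressedStream>` through successive calls so that several chunks can land in one stream. A hypothetical driver sketching that intended flow (the chunking here is illustrative; the TODO above notes the single-file branch does not do real work yet):

```rust
use brro_compressor::data::CompressedStream;

// Hypothetical driver: feed successive WAV chunks into one growing stream,
// mirroring how compress_file threads an Option<CompressedStream> through calls.
fn compress_chunks(chunks: &[Vec<f64>]) -> Vec<u8> {
    let mut stream = CompressedStream::new();
    for chunk in chunks {
        stream.compress_chunk(chunk);
    }
    stream.to_bytes()
}

fn main() {
    let chunks = vec![vec![1.0_f64; 1024], vec![2.0_f64; 1024]];
    println!("compressed stream: {} bytes", compress_chunks(&chunks).len());
}
```
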
21 changes: 21 additions & 0 deletions brro-compressor/src/utils/reader.rs
@@ -41,6 +41,27 @@ pub struct Files {
pub names: Vec<String>,
}

/// Reads a file in chunks and processes the chunks
pub fn process_by_chunk(file_path: &Path) -> Result<(), std::io::Error> {
let mut file = match std::fs::File::open(file_path) {
Ok(f) => f,
Err(e) => panic!("{}", e)
};

let mut list_of_chunks = Vec::new();
// 64KB at a time; with 64-bit samples that is ~8192 samples.
let chunk_size = 0x10000;

loop {
let mut chunk = Vec::with_capacity(chunk_size);
let n = file.by_ref().take(chunk_size as u64).read_to_end(&mut chunk)?;
if n == 0 { break; }
list_of_chunks.push(chunk);
if n < chunk_size { break; }
}
Ok(())
}

// Function to read and process files in a directory
pub fn stream_reader(directory_path: &Path) -> io::Result<Files> {
let mut contents: Vec<(Vec<f64>, MetricTag)> = Vec::new();
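
`process_by_chunk` relies on the `Read` trait for `by_ref`, `take`, and `read_to_end`; the corresponding import sits outside this hunk. A stand-alone sketch of the same chunked-read pattern (the input path is arbitrary):

```rust
use std::fs::File;
use std::io::Read;
use std::path::Path;

fn read_in_chunks(path: &Path) -> std::io::Result<Vec<Vec<u8>>> {
    let mut file = File::open(path)?;
    let chunk_size = 0x10000; // 64 KiB per read, as in process_by_chunk
    let mut chunks = Vec::new();
    loop {
        let mut chunk = Vec::with_capacity(chunk_size);
        // take() limits this read to chunk_size bytes of the remaining file.
        let n = file.by_ref().take(chunk_size as u64).read_to_end(&mut chunk)?;
        if n == 0 {
            break; // end of file
        }
        chunks.push(chunk);
        if n < chunk_size {
            break; // short read: this was the last chunk
        }
    }
    Ok(chunks)
}

fn main() -> std::io::Result<()> {
    let chunks = read_in_chunks(Path::new("input.wav"))?;
    println!("read {} chunk(s)", chunks.len());
    Ok(())
}
```
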
