Skip to content

Commit

Permalink
PR suggestions, code cleanup and some code improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Jul 31, 2024
1 parent 50476af commit 9d7a2e4
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 126 deletions.
32 changes: 12 additions & 20 deletions csv-compressor/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ impl Sample {
/// | 000001 | 1.01 |
/// | 000005 | 1.22 |
pub fn read_samples_from_csv_file(dest: &Path) -> Result<Vec<Sample>, csv::Error> {
let file = OpenOptions::new().
read(true).
open(dest)?;

let file = OpenOptions::new().read(true).open(dest)?;
let mut reader = csv::Reader::from_reader(file);
reader.deserialize().collect()
}
Expand All @@ -54,12 +51,10 @@ mod tests {
#[test]
fn test_write_samples_to_csv_file() {
let expected_contents = "timestamp,value\n1,1.01\n5,1.22\n";
let samples = vec![
Sample::new(1, 1.01),
Sample::new(5, 1.22),
];
let samples = vec![Sample::new(1, 1.01), Sample::new(5, 1.22)];

let temp_dir = TempDir::new("test_write_samples").expect("Unable to create temporary directory");
let temp_dir =
TempDir::new("test_write_samples").expect("Unable to create temporary directory");
let path = temp_dir.path().join("samples.csv");

let result = write_samples_to_csv_file(&path, &samples);
Expand All @@ -72,17 +67,16 @@ mod tests {
#[test]
fn test_read_samples_from_csv_file() {
let csv_content = "timestamp,value\n1,1.01\n5,1.22\n";
let expected_samples = vec![
Sample::new(1, 1.01),
Sample::new(5, 1.22),
];
let expected_samples = vec![Sample::new(1, 1.01), Sample::new(5, 1.22)];

let temp_dir = TempDir::new("test_read_samples").expect("Unable to create temporary directory");
let temp_dir =
TempDir::new("test_read_samples").expect("Unable to create temporary directory");
let path = temp_dir.path().join("samples.csv");

// Writing content to test file
let mut file = File::create(&path).expect("Unable to create test file");
file.write_all(csv_content.as_bytes()).expect("Unable to write data");
file.write_all(csv_content.as_bytes())
.expect("Unable to write data");

let result = read_samples_from_csv_file(&path);
assert!(result.is_ok());
Expand All @@ -93,12 +87,10 @@ mod tests {

#[test]
fn test_write_and_read_samples() {
let samples = vec![
Sample::new(1, 1.01),
Sample::new(5, 1.22),
];
let samples = vec![Sample::new(1, 1.01), Sample::new(5, 1.22)];

let temp_dir = TempDir::new("test_write_and_read_samples").expect("Unable to create temporary directory");
let temp_dir = TempDir::new("test_write_and_read_samples")
.expect("Unable to create temporary directory");
let path = temp_dir.path().join("samples.csv");

let write_result = write_samples_to_csv_file(&path, &samples);
Expand Down
115 changes: 43 additions & 72 deletions csv-compressor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use crate::csv::{SampleParser, SampleWriter};
use crate::metric::Metric;
use brro_compressor::compressor::Compressor;
use brro_compressor::data::CompressedStream;
use brro_compressor::optimizer::OptimizerPlan;
use brro_compressor::utils::readers::bro_reader::read_file;
use clap::{arg, Parser};
use log::debug;
use std::cell::RefCell;
use std::fs;
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::rc::Rc;
use std::path::{Path, PathBuf};
use vsri::Vsri;
use wavbrro::wavbrro::WavBrro;

Expand Down Expand Up @@ -122,102 +118,78 @@ fn decompress_data(compressed_data: &[u8]) -> Vec<f64> {
cs.decompress()
}

pub fn process_csv(args: &Args) -> Metric {
let res = OpenOptions::new().read(true).write(true).open(&args.input);
if res.is_err() {
panic!(
"[PANIC] Failed to open file {}, err: {}",
&args.input.to_str().unwrap(),
res.err().unwrap()
)
}

let file = res.unwrap();
let mut parser = SampleParser::new(file);
let samples = parser.parse().expect("failed to parse samples");

/// process_csv opens and parses the content of file at path
pub fn process_csv(path: &Path) -> Metric {
let samples = csv::read_samples_from_csv_file(path).expect("failed to read samples from file");
Metric::from_samples(&samples).expect("failed to create metric from samples")
}

fn process_args(args: &Args) {
fn process_args(args: Args) {
let output_base = args
.output
.clone()
.unwrap_or(args.input.clone())
.to_str()
.unwrap()
.to_string();
.unwrap_or_else(|| args.input.clone())
.clone();

// uncompressing input
if args.uncompress {
debug!("Starting uncompressing of {}", args.input.to_str().unwrap());
debug!("Starting uncompressing of {:?}", &args.input);
if let Some(data) = read_file(&args.input).expect("failed to read bro file") {
// decomressing data and creating wavbrro from it
let decompressed_data = Rc::new(RefCell::new(decompress_data(&data)));
let decompressed_data = decompress_data(&data);
let mut wbro = WavBrro::new();
for data in decompressed_data.borrow().iter() {
for data in decompressed_data.iter() {
wbro.add_sample(*data);
}

// reading existing index
// // reading existing index
let mut vsri_file_path = args.input.clone();
vsri_file_path.set_extension("");
vsri_file_path.set_extension("vsri");
debug!("Reading vsri at {:?}", &output_base);
let index = Vsri::load(vsri_file_path.to_str().unwrap()).expect("failed to read vsri");

let metric = Metric::new(wbro, index);

let mut file_path = PathBuf::from(output_base);
let mut file_path = output_base.clone();
file_path.set_extension("wbro");

debug!(
"Writing uncompressed wavbrro to disk, path: {}",
&file_path.to_str().unwrap()
);
WavBrro::to_file_with_data(&file_path, &decompressed_data.borrow());
debug!("Writing uncompressed wavbrro to disk, path: {file_path:?}");
WavBrro::to_file_with_data(&file_path, &decompressed_data);

let samples = metric.get_samples();

// creating csv output file
let mut csv_file_path = file_path.clone();
csv_file_path.set_extension("csv");
debug!(
"Creating file for csv output at {}",
csv_file_path.to_str().unwrap()
);
let csv_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&csv_file_path)
.expect("failed to create csv output file");

let csv_file = Rc::new(RefCell::new(csv_file));

debug!("Writing samples into csv file");
let writer = SampleWriter::new(Rc::clone(&csv_file));
writer
.write_samples(&samples)
.expect("failed to write samples to csv")
csv::write_samples_to_csv_file(&csv_file_path, &samples)
.expect("failed to write samples to file")
}
} else {
debug!("Starting processing of {}", args.input.to_str().unwrap());
let metric = process_csv(args);
debug!("Starting processing of {:?}", args.input);
let metric = process_csv(&args.input);

if args.output_wavbrro {
metric.flush_wavbrro(&output_base);
let mut wavbro_file_path = output_base.clone();
wavbro_file_path.set_extension("wavbro");
metric.flush_wavbrro(&wavbro_file_path);
}

if args.output_vsri {
metric.flush_indexes(&output_base);
let mut vsri_file_path = output_base.clone();
vsri_file_path.set_extension("vsri");
metric
.flush_indexes(&vsri_file_path)
.expect("failed to flush vsri to the file");
}

// compressing input if no_compression is not set
if !args.no_compression {
debug!("Starting compressing");
let data = metric.wbro.get_samples();
let compressed = compress_data(&data, args);
let compressed = compress_data(&data, &args);

let mut file_path = PathBuf::from(output_base);
let mut file_path = output_base.clone();
file_path.set_extension("bro");

fs::write(&file_path, compressed).expect("failed to write compressed data");
Expand All @@ -229,20 +201,19 @@ fn main() {
env_logger::init();
let args = Args::parse();

let res = fs::metadata(&args.input);
if res.is_err() {
panic!(
"Failed to retrieve metadata of {}, err: {}",
&args.input.to_str().unwrap(),
res.err().unwrap()
)
}
let metadata = fs::metadata(&args.input);
match metadata {
Ok(metadata) => {
if !metadata.is_file() {
panic!("Input is not a file")
}

let metadata = res.unwrap();
if !metadata.is_file() {
panic!("Input is not a file")
debug!("Starting processing args {:?}", &args);
process_args(args);
}
Err(err) => panic!(
"Failed to retrieve metadata of {:?}, err: {err}",
&args.input
),
}

debug!("Starting processing args");
process_args(&args);
}
58 changes: 25 additions & 33 deletions csv-compressor/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::csv::Sample;
use crate::metric::Error::UpdateForPointError;
use std::fmt::{Debug, Display, Formatter};
use std::path::PathBuf;
use std::path::Path;
use vsri::{day_elapsed_seconds, Vsri};
use wavbrro::wavbrro::WavBrro;

Expand All @@ -23,10 +22,10 @@ pub enum Error {
impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
UpdateForPointError(sample) => {
Error::UpdateForPointError(sample) => {
write!(f, "updating for point failed, sample: {:?}", sample)
}
_ => write!(f, "unknown error occurred"),
Error::UnknownError => write!(f, "unknown error occurred"),
}
}
}
Expand All @@ -40,14 +39,13 @@ impl Metric {
}

/// Appends samples to the metric
pub fn append_samples(&mut self, samples: &Vec<Sample>) -> Result<(), Error> {
pub fn append_samples(&mut self, samples: &[Sample]) -> Result<(), Error> {
for sample in samples {
// For solution simplification it generates only 1 WavBrro and 1 VSRI
let ts = day_elapsed_seconds(sample.timestamp / 1000);
let result = self.vsri.update_for_point(ts);
if result.is_err() {
return Err(UpdateForPointError(sample.clone()));
}
self.vsri
.update_for_point(ts)
.map_err(|_| Error::UpdateForPointError(sample.clone()))?;

self.wbro.add_sample(sample.value);
}
Expand All @@ -56,39 +54,33 @@ impl Metric {
}

/// Creates default metric from the existing samples
pub fn from_samples(samples: &Vec<Sample>) -> Result<Self, Error> {
pub fn from_samples(samples: &[Sample]) -> Result<Self, Error> {
let mut metric = Metric::default();
match metric.append_samples(samples) {
Ok(_) => Ok(metric),
Err(err) => Err(err),
}
metric.append_samples(samples)?;
Ok(metric)
}

/// Flushes underlying WavBrro formatted metrics to the file at path
pub fn flush_wavbrro(&self, path: &String) {
let mut file_path = PathBuf::from(path);
file_path.set_extension("wbro");
self.wbro.to_file(&file_path)
pub fn flush_wavbrro(&self, path: &Path) {
self.wbro.to_file(path)
}

/// Flushes underlying VSRI to the file at path
pub fn flush_indexes(&self, path: &String) {
let mut file_path = PathBuf::from(path);
file_path.set_extension("vsri");
self.vsri
.flush_to(&file_path)
.expect("Failed to write indexes!")
pub fn flush_indexes(&self, path: &Path) -> Result<(), std::io::Error> {
self.vsri.flush_to(path)
}

/// Returns vector of Samples by iterating over data inside underlying WavBrro
/// and getting timestamp for each of data point from VSRI
pub fn get_samples(&self) -> Vec<Sample> {
let mut samples = Vec::new();
let values = self.wbro.clone().get_samples();
for (i, value) in values.iter().enumerate() {
let ts = self.vsri.get_time(i as i32);
samples.push(Sample::new(ts.unwrap() as i64, *value));
}
samples
/// and getting timestamp for each of data point from VSRI.
pub fn get_samples(self) -> Vec<Sample> {
self.wbro
.get_samples()
.iter()
.enumerate()
.map(|(i, value)| {
let ts = self.vsri.get_time(i as i32);
Sample::new(ts.unwrap() as i64, *value)
})
.collect()
}
}
2 changes: 1 addition & 1 deletion vsri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ impl Vsri {
/// TODO: Add error control (Unwrap hell)
pub fn load(filename: &str) -> Result<Self, std::io::Error> {
debug!("[INDEX] Load existing index");
let file = File::open(format!("{}.vsri", &filename))?;
let file = File::open(filename)?;
let reader = BufReader::new(file);
let mut min_ts = 0;
let mut max_ts = 0;
Expand Down

0 comments on commit 9d7a2e4

Please sign in to comment.