diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 03919e21..02a009a9 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -70,6 +70,13 @@ impl Storage { } } + pub fn disk_utilized(&self) -> usize { + match &self.persistence { + Some(p) => p.bytes_occupied, + None => 0, + } + } + pub fn inmemory_read_size(&self) -> usize { self.current_read_file.len() } @@ -97,6 +104,10 @@ impl Storage { next_file.file.write_all(&hash.to_be_bytes())?; next_file.file.write_all(&self.current_write_file[..])?; next_file.file.flush()?; + + // 8 is the number of bytes the hash(u64) occupies + persistence.bytes_occupied += 8 + self.current_write_file.len(); + self.current_write_file.clear(); Ok(next_file.deleted) } @@ -213,6 +224,8 @@ struct Persistence { // /// Deleted file id // deleted: Option, non_destructive_read: bool, + /// Disk space(in bytes) currently occupied by persistence files + bytes_occupied: usize, } impl Persistence { @@ -221,6 +234,13 @@ impl Persistence { let backlog_files = get_file_ids(&path)?; info!("List of file ids loaded from disk: {backlog_files:?}"); + let bytes_occupied = backlog_files.iter().fold(0, |acc, id| { + let mut file = PathBuf::from(&path); + let file_name = format!("backup@{id}"); + file.push(file_name); + fs::metadata(&file).unwrap().len() as usize + acc + }); + Ok(Persistence { path, max_file_count, @@ -228,6 +248,7 @@ impl Persistence { current_read_file_id: None, // deleted: None, non_destructive_read: false, + bytes_occupied, }) } @@ -239,8 +260,13 @@ impl Persistence { } /// Removes a file with provided id - fn remove(&self, id: u64) -> Result { + fn remove(&mut self, id: u64) -> Result { let path = self.path(id)?; + + // Query the fs to track size of removed persistence file + let metadata = fs::metadata(&path)?; + self.bytes_occupied -= metadata.len() as usize; + fs::remove_file(&path)?; Ok(path) diff --git a/uplink/src/base/serializer/metrics.rs b/uplink/src/base/serializer/metrics.rs index 2844aead..68e1fc43 100644 --- a/uplink/src/base/serializer/metrics.rs +++ b/uplink/src/base/serializer/metrics.rs @@ -17,6 +17,8 @@ pub struct SerializerMetrics { pub read_memory: usize, /// Number of files that have been written to disk pub disk_files: usize, + /// Disk size currently occupied by persistence files + pub disk_utilized: usize, /// Nuber of persistence files that had to deleted before being consumed pub lost_segments: usize, /// Number of errors faced during serializer operation @@ -35,6 +37,7 @@ impl SerializerMetrics { write_memory: 0, read_memory: 0, disk_files: 0, + disk_utilized: 0, lost_segments: 0, errors: 0, sent_size: 0, @@ -68,6 +71,10 @@ impl SerializerMetrics { self.disk_files = count; } + pub fn set_disk_utilized(&mut self, bytes: usize) { + self.disk_utilized = bytes; + } + pub fn increment_errors(&mut self) { self.errors += 1; } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 60330834..aedb4ff2 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -578,24 +578,28 @@ fn check_metrics(metrics: &mut SerializerMetrics, storage_handler: &StorageHandl let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; let mut file_count = 0; + let mut disk_utilized = 0; for storage in storage_handler.map.values() { inmemory_read_size += storage.inmemory_read_size(); inmemory_write_size += storage.inmemory_write_size(); file_count += storage.file_count(); + disk_utilized += storage.disk_utilized(); } metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); + metrics.set_disk_utilized(disk_utilized); info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, metrics.batches, metrics.errors, metrics.lost_segments, metrics.disk_files, + convert(metrics.disk_utilized as f64), convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), ); @@ -609,16 +613,19 @@ fn save_and_prepare_next_metrics( let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; let mut file_count = 0; + let mut disk_utilized = 0; for storage in storage_handler.map.values() { inmemory_write_size += storage.inmemory_write_size(); inmemory_read_size += storage.inmemory_read_size(); file_count += storage.file_count(); + disk_utilized += storage.disk_utilized(); } metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); + metrics.set_disk_utilized(disk_utilized); let m = metrics.clone(); pending.push_back(m); @@ -637,27 +644,31 @@ fn check_and_flush_metrics( let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; let mut file_count = 0; + let mut disk_utilized = 0; for storage in storage_handler.map.values() { inmemory_write_size += storage.inmemory_write_size(); inmemory_read_size += storage.inmemory_read_size(); file_count += storage.file_count(); + disk_utilized += storage.disk_utilized(); } metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); + metrics.set_disk_utilized(disk_utilized); // Send pending metrics. This signifies state change while let Some(metrics) = pending.get(0) { // Always send pending metrics. They represent state changes info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, metrics.batches, metrics.errors, metrics.lost_segments, metrics.disk_files, + convert(metrics.disk_utilized as f64), convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), ); @@ -667,12 +678,13 @@ fn check_and_flush_metrics( if metrics.batches() > 0 { info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, metrics.batches, metrics.errors, metrics.lost_segments, metrics.disk_files, + convert(metrics.disk_utilized as f64), convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), );