From cd780413e711e5009321520d46e1979cb36cb854 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Mon, 14 Oct 2024 12:10:11 +0530
Subject: [PATCH] refactor: split into `save_read_buffer`

---
 storage/src/lib.rs                | 116 ++++++++++++++++++------------
 uplink/src/base/serializer/mod.rs |  28 +++++---
 2 files changed, 87 insertions(+), 57 deletions(-)

diff --git a/storage/src/lib.rs b/storage/src/lib.rs
index 25a138ab..113ad032 100644
--- a/storage/src/lib.rs
+++ b/storage/src/lib.rs
@@ -5,7 +5,6 @@ use seahash::hash;
 use std::collections::VecDeque;
 use std::fs::{self, OpenOptions};
 use std::io::{self, copy, Write};
-use std::mem;
 use std::path::{Path, PathBuf};
 
 #[derive(thiserror::Error, Debug)]
@@ -23,7 +22,7 @@ pub enum Error {
 }
 
 pub struct Storage {
-    name: String,
+    pub name: String,
     /// maximum allowed file size
     max_file_size: usize,
     /// current open file
@@ -99,34 +98,61 @@ impl Storage {
         self.flush()
     }
 
-    /// Force flush the contents of write buffer onto disk
-    pub fn flush(&mut self) -> Result<Option<u64>, Error> {
-        if self.current_write_file.is_empty() {
-            return Err(Error::NoWrites);
-        }
-
-        let Some(persistence) = &mut self.persistence else {
+    fn write_to_persistence(
+        persistence: &mut Option<Persistence>,
+        name: &String,
+        buf: &mut BytesMut,
+    ) -> Result<Option<u64>, Error> {
+        let Some(persistence) = persistence else {
             // TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
             // with id 0
-            self.current_write_file.clear();
+            buf.clear();
             warn!(
                 "Persistence disabled for storage: {}. Deleted in-memory buffer on overflow",
-                self.name
+                name
             );
             return Ok(Some(0));
         };
 
         let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
-        info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
-        file.write(&mut self.current_write_file)?;
+        info!("Flushing data to disk for stoarge: {}; path = {:?}", name, file.path());
+        file.write(buf)?;
 
         // 8 is the number of bytes the hash(u64) occupies
-        persistence.bytes_occupied += 8 + self.current_write_file.len();
-        self.current_write_file.clear();
+        persistence.bytes_occupied += 8 + buf.len();
+        buf.clear();
 
         Ok(deleted)
     }
 
+    /// Force flush the contents of write buffer onto disk
+    pub fn flush(&mut self) -> Result<Option<u64>, Error> {
+        if self.current_write_file.is_empty() {
+            return Err(Error::NoWrites);
+        }
+
+        Self::write_to_persistence(&mut self.persistence, &self.name, &mut self.current_write_file)
+    }
+
+    /// Force flush the contents of read buffer onto disk
+    pub fn save_read_buffer(&mut self) -> Result<(), Error> {
+        if self.persistence.as_ref().is_some_and(|p| p.current_read_file_id.is_some())
+            || self.current_read_file.is_empty()
+        {
+            return Err(Error::NoWrites);
+        }
+
+        if let Some(deleted) = Self::write_to_persistence(
+            &mut self.persistence,
+            &self.name,
+            &mut self.current_read_file,
+        )? {
+            error!("Persistence deleted during flushing of read buffer: {deleted}");
+        }
+
+        Ok(())
+    }
+
     /// Loads head file to current inmemory read buffer. Deletes
     /// the file after loading. If all the disk data is caught up,
     /// swaps current write buffer to current read buffer if there
@@ -138,41 +164,34 @@ impl Storage {
             return Ok(());
         }
 
-        let Some(persistence) = &mut self.persistence else {
-            mem::swap(&mut self.current_read_file, &mut self.current_write_file);
-            // If read buffer is 0 after swapping, all the data is caught up
-            if self.current_read_file.is_empty() {
-                return Err(Error::Done);
+        if let Some(persistence) = &mut self.persistence {
+            // Remove read file on completion in destructive-read mode
+            if let Some(id) =
+                persistence.current_read_file_id.take_if(|_| !persistence.non_destructive_read)
+            {
+                let deleted_file = persistence.remove(id)?;
+                debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
             }
 
-            return Ok(());
-        };
-
-        // Remove read file on completion in destructive-read mode
-        let read_is_destructive = !persistence.non_destructive_read;
-        let read_file_id = persistence.current_read_file_id.take();
-        if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
-            let deleted_file = persistence.remove(id)?;
-            debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
-        }
-
-        // Swap read buffer with write buffer to read data in inmemory write
-        // buffer when all the backlog disk files are done
-        if persistence.backlog_files.is_empty() {
-            mem::swap(&mut self.current_read_file, &mut self.current_write_file);
-            // If read buffer is 0 after swapping, all the data is caught up
-            if self.current_read_file.is_empty() {
-                return Err(Error::Done);
+            match persistence.load_next_read_file(&mut self.current_read_file) {
+                Err(Error::Done) => {}
+                Err(e) => {
+                    error!("Couldn't read persisted file: {e}");
+                    return Err(e);
+                }
+                _ => return Ok(()),
             }
+        };
 
-            return Ok(());
+        if self.current_write_file.is_empty() {
+            return Err(Error::Done);
         }
 
-        if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
-            self.current_read_file.clear();
-            persistence.current_read_file_id.take();
-            return Err(e);
-        }
+        // Swap read buffer with write buffer to read data from inmemory write
+        // buffer when all the backlog disk files are done.
+        std::mem::swap(&mut self.current_read_file, &mut self.current_write_file);
+        // Write buffer is emptied to ensure fresh start.
+        self.current_write_file.clear();
 
         Ok(())
     }
@@ -602,17 +621,22 @@ mod test {
         assert!(storage.persistence.as_ref().unwrap().current_read_file_id.is_none());
 
         // Trigger flush onto disk, and drop storage
-        storage.flush().unwrap();
+        matches!(storage.flush().unwrap_err(), super::Error::NoWrites);
+        storage.save_read_buffer().unwrap();
         drop(storage);
 
         // reload storage
         let mut storage = Storage::new("test", 10 * 1036);
         storage.set_persistence(backup.path(), 10).unwrap();
+        assert_eq!(storage.file_count(), 1);
 
         // verify read buffer was persisted by reading a single packet
         read_n_publishes(&mut storage, 1);
-        assert_eq!(storage.file_count(), 1);
+        assert_eq!(storage.file_count(), 0);
         let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap();
         assert_eq!(file_id, 0);
+
+        // verify no more data left to be read
+        matches!(storage.reload_on_eof().unwrap_err(), super::Error::Done);
     }
 }
diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs
index 798bd862..e9519f08 100644
--- a/uplink/src/base/serializer/mod.rs
+++ b/uplink/src/base/serializer/mod.rs
@@ -183,8 +183,21 @@ impl Storage {
     }
 
     // Ensures all data is written into persistence, when configured.
-    pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
-        self.inner.flush()
+    pub fn flush(&mut self) {
+        match self.inner.save_read_buffer() {
+            Ok(()) => trace!("Force flushed read buffer onto disk; stream = {}", self.inner.name),
+            Err(storage::Error::NoWrites) => {}
+            Err(e) => {
+                error!("Error = {e}; storage = {}", self.inner.name)
+            }
+        }
+        match self.inner.flush() {
+            Ok(_) => trace!("Force flushed write buffer onto disk; stream = {}", self.inner.name),
+            Err(storage::Error::NoWrites) => {}
+            Err(e) => {
+                error!("Error = {e}; storage = {}", self.inner.name)
+            }
+        }
     }
 }
 
@@ -287,15 +300,8 @@ impl StorageHandler {
     // Force flushes all in-memory buffers to ensure zero packet loss during uplink restart.
     // TODO: Ensure packets in read-buffer but not on disk are not lost.
     fn flush_all(&mut self) {
-        for (stream_config, storage) in self.map.iter_mut() {
-            match storage.flush() {
-                Ok(_) => trace!("Force flushed stream = {} onto disk", stream_config.topic),
-                Err(storage::Error::NoWrites) => {}
-                Err(e) => error!(
-                    "Error when force flushing storage = {}; error = {e}",
-                    stream_config.topic
-                ),
-            }
+        for storage in self.map.values_mut() {
+            storage.flush();
         }
     }
 