diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 412f2919..9272d78c 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -18,6 +18,8 @@ pub enum Error { CorruptedFile, #[error("Empty write buffer")] NoWrites, + #[error("All backups have been consumed")] + Done, } pub struct Storage { @@ -129,17 +131,21 @@ impl Storage { /// the file after loading. If all the disk data is caught up, /// swaps current write buffer to current read buffer if there /// is pending data in memory write buffer. - /// Returns true if all the messages are caught up - pub fn reload_on_eof(&mut self) -> Result { + /// Returns Error::Done if all the messages are caught up + pub fn reload_on_eof(&mut self) -> Result<(), Error> { // Don't reload if there is data in current read file if self.current_read_file.has_remaining() { - return Ok(false); + return Ok(()); } let Some(persistence) = &mut self.persistence else { mem::swap(&mut self.current_read_file, &mut self.current_write_file); // If read buffer is 0 after swapping, all the data is caught up - return Ok(self.current_read_file.is_empty()); + if self.current_read_file.is_empty() { + return Err(Error::Done); + } + + return Ok(()); }; // Remove read file on completion in destructive-read mode @@ -155,7 +161,11 @@ impl Storage { if persistence.backlog_files.is_empty() { mem::swap(&mut self.current_read_file, &mut self.current_write_file); // If read buffer is 0 after swapping, all the data is caught up - return Ok(self.current_read_file.is_empty()); + if self.current_read_file.is_empty() { + return Err(Error::Done); + } + + return Ok(()); } if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) { @@ -164,7 +174,7 @@ impl Storage { return Err(e); } - Ok(false) + Ok(()) } } @@ -379,16 +389,18 @@ impl Persistence { Ok(NextFile { file: PersistenceFile::new(&self.path, file_name)?, deleted }) } - /// Load the next persistence file to be read into memory + /// Load the next persistence file to be read into memory, returns Error::Done if there is none left. fn load_next_read_file(&mut self, current_read_file: &mut BytesMut) -> Result<(), Error> { - // Len always > 0 because of above if. Doesn't panic - let id = self.backlog_files.pop_front().unwrap(); + let Some(id) = self.backlog_files.pop_front() else { + self.current_read_file_id.take(); + return Err(Error::Done); + }; let file_name = format!("backup@{id}"); let mut file = PersistenceFile::new(&self.path, file_name)?; // Load file into memory and store its id for deleting in the future file.read(current_read_file)?; - self.current_read_file_id = Some(id); + self.current_read_file_id.replace(id); Ok(()) } @@ -423,7 +435,7 @@ mod test { let mut publishes = vec![]; for _ in 0..n { // Done reading all the pending files - if storage.reload_on_eof().unwrap() { + if let Err(super::Error::Done) = storage.reload_on_eof() { break; } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 1853b5d4..798bd862 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -161,7 +161,7 @@ impl Storage { } // Ensures read-buffer is ready to be read from, exchanges buffers if required, returns true if empty. - pub fn reload_on_eof(&mut self) -> Result { + pub fn reload_on_eof(&mut self) -> Result<(), storage::Error> { self.inner.reload_on_eof() } @@ -250,13 +250,8 @@ impl StorageHandler { for (stream, storage) in storages { match storage.reload_on_eof() { - Ok(true) => { - if self.read_stream.take_if(|s| s == stream).is_some() { - debug!("Done reading from: {}", stream.topic); - } - } // Reading from a non-empty persisted stream - Ok(false) => { + Ok(_) => { if self.read_stream.is_none() { self.read_stream.replace(stream.to_owned()); debug!("Started reading from: {}", stream.topic); @@ -271,6 +266,12 @@ impl StorageHandler { return Some((stream.to_owned(), publish)); } + // All packets read from storage + Err(storage::Error::Done) => { + if self.read_stream.take_if(|s| s == stream).is_some() { + debug!("Done reading from: {}", stream.topic); + } + } // Reload again on encountering a corrupted file Err(e) => { metrics.increment_errors(); @@ -834,7 +835,7 @@ pub mod tests { use super::*; fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish { - if storage.reload_on_eof().unwrap() { + if let Err(storage::Error::Done) = storage.reload_on_eof() { panic!("No publishes found in storage"); }