Skip to content

Commit

Permalink
doc: update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 12, 2024
1 parent 95a8404 commit 01280af
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,11 @@ impl Storage {
&mut self,
backup_path: impl Into<PathBuf>,
max_file_count: usize,
) -> Result<(), Error> {
self.inner.set_persistence(backup_path.into(), max_file_count)?;

Ok(())
) -> Result<(), storage::Error> {
self.inner.set_persistence(backup_path.into(), max_file_count)
}

// Stores the provided publish packet in a cache or writes it to [Storage], after setting its pkid to 1.
// Stores the provided publish packet by serializing it into storage, after setting its pkid to 1.
// If the write buffer is full, it is flushed/written onto disk based on config.
pub fn write(&mut self, mut publish: Publish) -> Result<Option<u64>, storage::Error> {
publish.pkid = 1;
Expand All @@ -159,14 +157,18 @@ impl Storage {
return Ok(None);
}

let deleted = self.inner.flush_on_overflow()?;
Ok(deleted)
self.inner.flush_on_overflow()
}

pub fn reload_on_eof(&mut self) -> Result<bool, Error> {
self.inner.reload_on_eof().map_err(Error::from)
// Ensures read-buffer is ready to read from, exchanges buffers if required, returns true if empty.
pub fn reload_on_eof(&mut self) -> Result<bool, storage::Error> {
self.inner.reload_on_eof()
}

// Deserializes publish packets from storage, returns None when it fails.
//
// ## Panic
// When any packet other than a publish is deserialized.
pub fn read(&mut self, max_packet_size: usize) -> Option<Publish> {
// TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
// This leads to force switching to normal mode. Increasing max_payload_size to bypass this
Expand All @@ -180,10 +182,9 @@ impl Storage {
}
}

pub fn flush(&mut self) -> Result<Option<u64>, Error> {
let deleted = self.inner.flush()?;

Ok(deleted)
// Ensures all data is written into persistence, when configured.
pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
self.inner.flush()
}
}

Expand All @@ -210,7 +211,7 @@ impl StorageHandler {
std::fs::create_dir_all(&path).map_err(|_| {
Error::Persistence(config.persistence_path.to_string_lossy().to_string())
})?;
storage.inner.set_persistence(&path, stream_config.persistence.max_file_count)?;
storage.set_persistence(&path, stream_config.persistence.max_file_count)?;

debug!(
"Disk persistance is enabled for stream: {stream_name:?}; path: {}",
Expand Down Expand Up @@ -293,7 +294,7 @@ impl StorageHandler {

fn flush_all(&mut self) {
for (stream_config, storage) in self.map.iter_mut() {
match storage.inner.flush() {
match storage.flush() {
Ok(_) => trace!("Force flushed stream = {} onto disk", stream_config.topic),
Err(storage::Error::NoWrites) => {}
Err(e) => error!(
Expand Down Expand Up @@ -867,7 +868,7 @@ pub mod tests {
use super::*;

fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish {
if storage.inner.reload_on_eof().unwrap() {
if storage.reload_on_eof().unwrap() {
panic!("No publishes found in storage");
}

Expand Down

0 comments on commit 01280af

Please sign in to comment.