Skip to content

Commit

Permalink
refactor: split into save_read_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 14, 2024
1 parent 678a616 commit cd78041
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 57 deletions.
116 changes: 70 additions & 46 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use seahash::hash;
use std::collections::VecDeque;
use std::fs::{self, OpenOptions};
use std::io::{self, copy, Write};
use std::mem;
use std::path::{Path, PathBuf};

#[derive(thiserror::Error, Debug)]
Expand All @@ -23,7 +22,7 @@ pub enum Error {
}

pub struct Storage {
name: String,
pub name: String,
/// maximum allowed file size
max_file_size: usize,
/// current open file
Expand Down Expand Up @@ -99,34 +98,61 @@ impl Storage {
self.flush()
}

/// Force flush the contents of write buffer onto disk
pub fn flush(&mut self) -> Result<Option<u64>, Error> {
if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}

let Some(persistence) = &mut self.persistence else {
fn write_to_persistence(
persistence: &mut Option<Persistence>,
name: &String,
buf: &mut BytesMut,
) -> Result<Option<u64>, Error> {
let Some(persistence) = persistence else {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
// with id 0
self.current_write_file.clear();
buf.clear();
warn!(
"Persistence disabled for storage: {}. Deleted in-memory buffer on overflow",
self.name
name
);
return Ok(Some(0));
};

let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", name, file.path());
file.write(buf)?;

// 8 is the number of bytes the hash(u64) occupies
persistence.bytes_occupied += 8 + self.current_write_file.len();
self.current_write_file.clear();
persistence.bytes_occupied += 8 + buf.len();
buf.clear();

Ok(deleted)
}

/// Force flush the contents of write buffer onto disk
pub fn flush(&mut self) -> Result<Option<u64>, Error> {
if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}

Self::write_to_persistence(&mut self.persistence, &self.name, &mut self.current_write_file)
}

/// Force flush the contents of read buffer onto disk
pub fn save_read_buffer(&mut self) -> Result<(), Error> {
if self.persistence.as_ref().is_some_and(|p| p.current_read_file_id.is_some())
|| self.current_read_file.is_empty()
{
return Err(Error::NoWrites);
}

if let Some(deleted) = Self::write_to_persistence(
&mut self.persistence,
&self.name,
&mut self.current_read_file,
)? {
error!("Persistence deleted during flushing of read buffer: {deleted}");
}

Ok(())
}

/// Loads head file to current inmemory read buffer. Deletes
/// the file after loading. If all the disk data is caught up,
/// swaps current write buffer to current read buffer if there
Expand All @@ -138,41 +164,34 @@ impl Storage {
return Ok(());
}

let Some(persistence) = &mut self.persistence else {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
if let Some(persistence) = &mut self.persistence {
// Remove read file on completion in destructive-read mode
if let Some(id) =
persistence.current_read_file_id.take_if(|_| !persistence.non_destructive_read)
{
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

return Ok(());
};

// Remove read file on completion in destructive-read mode
let read_is_destructive = !persistence.non_destructive_read;
let read_file_id = persistence.current_read_file_id.take();
if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

// Swap read buffer with write buffer to read data in inmemory write
// buffer when all the backlog disk files are done
if persistence.backlog_files.is_empty() {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
match persistence.load_next_read_file(&mut self.current_read_file) {
Err(Error::Done) => {}
Err(e) => {
error!("Couldn't read persisted file: {e}");
return Err(e);
}
_ => return Ok(()),
}
};

return Ok(());
if self.current_write_file.is_empty() {
return Err(Error::Done);
}

if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
self.current_read_file.clear();
persistence.current_read_file_id.take();
return Err(e);
}
// Swap read buffer with write buffer to read data from inmemory write
// buffer when all the backlog disk files are done.
std::mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// Write buffer is emptied to ensure fresh start.
self.current_write_file.clear();

Ok(())
}
Expand Down Expand Up @@ -602,17 +621,22 @@ mod test {
assert!(storage.persistence.as_ref().unwrap().current_read_file_id.is_none());

// Trigger flush onto disk, and drop storage
storage.flush().unwrap();
matches!(storage.flush().unwrap_err(), super::Error::NoWrites);
storage.save_read_buffer().unwrap();
drop(storage);

// reload storage
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();
assert_eq!(storage.file_count(), 1);

// verify read buffer was persisted by reading a single packet
read_n_publishes(&mut storage, 1);
assert_eq!(storage.file_count(), 1);
assert_eq!(storage.file_count(), 0);
let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap();
assert_eq!(file_id, 0);

// verify no more data left to be read
matches!(storage.reload_on_eof().unwrap_err(), super::Error::Done);
}
}
28 changes: 17 additions & 11 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,21 @@ impl Storage {
}

// Ensures all data is written into persistence, when configured.
pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
self.inner.flush()
pub fn flush(&mut self) {
match self.inner.save_read_buffer() {
Ok(()) => trace!("Force flushed read buffer onto disk; stream = {}", self.inner.name),
Err(storage::Error::NoWrites) => {}
Err(e) => {
error!("Error = {e}; storage = {}", self.inner.name)
}
}
match self.inner.flush() {
Ok(_) => trace!("Force flushed write buffer onto disk; stream = {}", self.inner.name),
Err(storage::Error::NoWrites) => {}
Err(e) => {
error!("Error = {e}; storage = {}", self.inner.name)
}
}
}
}

Expand Down Expand Up @@ -287,15 +300,8 @@ impl StorageHandler {
// Force flushes all in-memory buffers to ensure zero packet loss during uplink restart.
// TODO: Ensure packets in read-buffer but not on disk are not lost.
fn flush_all(&mut self) {
for (stream_config, storage) in self.map.iter_mut() {
match storage.flush() {
Ok(_) => trace!("Force flushed stream = {} onto disk", stream_config.topic),
Err(storage::Error::NoWrites) => {}
Err(e) => error!(
"Error when force flushing storage = {}; error = {e}",
stream_config.topic
),
}
for storage in self.map.values_mut() {
storage.flush();
}
}

Expand Down

0 comments on commit cd78041

Please sign in to comment.