Skip to content

Commit

Permalink
feat: force flush non-overflowed in-memory buffers (#310)
Browse files Browse the repository at this point in the history
* feat: force flush non-overflowed in-memory buffers

* fix: increase shutdown period

NOTE: necessary for safely handling force flush to disk

* log: successful force flush

* doc: update comment

* fix: `try_recv` to trigger storage flush

NOTE: needs to be verified for any realistic possibility of data loss

* fix: ignore empty buffers

* fix: write not read buffer

* fix: ignore empty write buffers

* fix: wait max 2 secs for data flushed from bridge
  • Loading branch information
Devdutt Shenoi committed Dec 1, 2023
1 parent 3cc82ff commit 0f07a82
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
10 changes: 10 additions & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub enum Error {
NotBackup,
#[error("Corrupted backup file")]
CorruptedFile,
#[error("Empty write buffer")]
NoWrites,
}

pub struct Storage {
Expand Down Expand Up @@ -85,6 +87,14 @@ impl Storage {
return Ok(None);
}

self.flush()
}

/// Force flush the contents of write buffer onto disk
pub fn flush(&mut self) -> Result<Option<u64>, Error> {
if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}
match &mut self.persistence {
Some(persistence) => {
let hash = hash(&self.current_write_file[..]);
Expand Down
22 changes: 20 additions & 2 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod metrics;

use std::collections::{HashMap, VecDeque};
use std::io::{self, Write};
use std::time::Instant;
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
Expand Down Expand Up @@ -58,6 +59,8 @@ pub enum Error {
EmptyStorage,
#[error("Permission denied while accessing persistence directory \"{0}\"")]
Persistence(String),
#[error("Serializer has shutdown after handling crash")]
Shutdown,
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -203,6 +206,16 @@ impl StorageHandler {

None
}

fn flush_all(&mut self) {
for (stream_name, storage) in self.map.iter_mut() {
match storage.flush() {
Ok(_) => trace!("Force flushed stream = {stream_name} onto disk"),
Err(storage::Error::NoWrites) => {}
Err(e) => error!("Error when force flushing storage = {stream_name}; error = {e}"),
}
}
}
}

pub struct SerializerShutdown;
Expand Down Expand Up @@ -298,8 +311,13 @@ impl<C: MqttClient> Serializer<C> {
}

loop {
// Collect next data packet and write to disk
let data = self.collector_rx.recv_async().await?;
// Collect remaining data packets and write to disk
// NOTE: wait 2s to allow bridge to shutdown and flush leftover data.
let deadline = Instant::now() + Duration::from_secs(2);
let Ok(data) = self.collector_rx.recv_deadline(deadline) else {
self.storage_handler.flush_all();
return Err(Error::Shutdown);
};
let publish = construct_publish(data)?;
let storage = self.storage_handler.select(&publish.topic);
match write_to_disk(publish, storage) {
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ fn main() -> Result<(), Error> {
uplink.resolve_on_shutdown().await.unwrap();
info!("Uplink shutting down...");
// NOTE: wait 5s to allow serializer to write to network/disk
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(10)).await;
});

Ok(())
Expand Down

0 comments on commit 0f07a82

Please sign in to comment.