Skip to content

Commit

Permalink
feat: create BackupWriter and BackupReader traits for async unit backup
Browse files Browse the repository at this point in the history
  • Loading branch information
joschisan committed May 1, 2024
1 parent 416d27c commit 0fb4dcf
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 30 deletions.
15 changes: 7 additions & 8 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::{
collections::HashSet,
fmt::{self, Debug},
marker::PhantomData,
pin::Pin,
};
use aleph_bft_types::BackupReader;

use codec::{Decode, Error as CodecError};
use futures::{channel::oneshot, AsyncRead, AsyncReadExt};
use futures::{channel::oneshot};
use log::{error, info, warn};

use crate::{
Expand Down Expand Up @@ -68,26 +68,25 @@ impl From<CodecError> for LoaderError {
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: AsyncRead> {
backup: Pin<Box<R>>,
pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: BackupReader> {
backup: R,
index: NodeIndex,
session_id: SessionId,
_phantom: PhantomData<(H, D, S)>,
}

impl<H: Hasher, D: Data, S: Signature, R: AsyncRead> BackupLoader<H, D, S, R> {
impl<H: Hasher, D: Data, S: Signature, R: BackupReader> BackupLoader<H, D, S, R> {
pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
BackupLoader {
backup: Box::pin(backup),
backup,
index,
session_id,
_phantom: PhantomData,
}
}

async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let mut buf = Vec::new();
self.backup.read_to_end(&mut buf).await?;
let buf = self.backup.read().await?;
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
Expand Down
16 changes: 7 additions & 9 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
use std::pin::Pin;

use crate::{
dag::DagUnit,
units::{UncheckedSignedUnit, WrappedUnit},
Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
};
use codec::Encode;
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use futures::{ FutureExt, StreamExt};
use log::{debug, error};
use aleph_bft_types::BackupWriter;

const LOG_TARGET: &str = "AlephBFT-backup-saver";

/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> {
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: BackupWriter> {
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: Pin<Box<W>>,
backup: W,
}

impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK, W> {
impl<H: Hasher, D: Data, MK: MultiKeychain, W: BackupWriter> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
Expand All @@ -29,14 +28,13 @@ impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK,
BackupSaver {
units_from_runway,
responses_for_runway,
backup: Box::pin(backup),
backup,
}
}

pub async fn save_unit(&mut self, unit: &DagUnit<H, D, MK>) -> Result<(), std::io::Error> {
let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into();
self.backup.write_all(&unit.encode()).await?;
self.backup.flush().await
self.backup.append(&unit.encode()).await
}

pub async fn run(&mut self, mut terminator: Terminator) {
Expand Down
15 changes: 9 additions & 6 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,22 @@ use crate::{
};
use aleph_bft_types::NodeMap;
use codec::{Decode, Encode};
use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use network::NetworkData;
use rand::{prelude::SliceRandom, Rng};
use aleph_bft_types::BackupWriter;
use std::{
collections::HashSet,
convert::TryInto,
fmt::{self, Debug},
marker::PhantomData,
time::Duration,
};
use aleph_bft_types::BackupReader;


/// A message concerning units, either about new units or some requests for them.
#[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)]
Expand Down Expand Up @@ -111,8 +114,8 @@ pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: AsyncWrite,
UL: AsyncRead,
US: BackupWriter,
UL: BackupReader,
> {
data_provider: DP,
finalization_handler: FH,
Expand All @@ -121,7 +124,7 @@ pub struct LocalIO<
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead>
impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: BackupWriter, UL: BackupReader>
LocalIO<D, DP, FH, US, UL>
{
pub fn new(
Expand Down Expand Up @@ -578,8 +581,8 @@ pub async fn run_session<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
US: BackupWriter + Send + Sync + 'static,
UL: BackupReader + Send + Sync + 'static,
N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
Expand Down
16 changes: 9 additions & 7 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ use crate::{
UncheckedSigned,
};
use aleph_bft_types::Recipient;
use aleph_bft_types::BackupReader;
use futures::{
channel::{mpsc, oneshot},
future::pending,
pin_mut, AsyncRead, AsyncWrite, Future, FutureExt, StreamExt,
pin_mut, Future, FutureExt, StreamExt,
};
use aleph_bft_types::BackupWriter;
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -667,8 +669,8 @@ pub struct RunwayIO<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
W: BackupWriter + Send + Sync + 'static,
R: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> {
Expand All @@ -683,8 +685,8 @@ impl<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
W: BackupWriter + Send + Sync + 'static,
R: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> RunwayIO<H, D, MK, W, R, DP, FH>
Expand Down Expand Up @@ -715,8 +717,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
) where
H: Hasher,
D: Data,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
US: BackupWriter + Send + Sync + 'static,
UL: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
MK: MultiKeychain,
Expand Down
37 changes: 37 additions & 0 deletions types/src/backup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use async_trait::async_trait;
use std::marker::Unpin;
use futures::io::{AsyncRead, AsyncWrite};
use futures::{AsyncReadExt, AsyncWriteExt};



#[async_trait]
/// Write backups to peristent storage.
pub trait BackupWriter {
/// Append new data to the backup.
async fn append(&mut self, data: &[u8]) -> std::io::Result<()>;
}

#[async_trait]
impl<W: AsyncWrite + Send + Unpin> BackupWriter for W {
async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
self.write_all(data).await?;
self.flush().await
}
}


#[async_trait]
pub trait BackupReader {
/// Read the entire backup.
async fn read(&mut self) -> std::io::Result<Vec<u8>>;
}

#[async_trait]
impl<R: AsyncRead + Send + Unpin> BackupReader for R {
async fn read(&mut self) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::new();
self.read_to_end(&mut buf).await?;
Ok(buf)
}
}
2 changes: 2 additions & 0 deletions types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod dataio;
mod network;
mod tasks;
mod backup;

pub use aleph_bft_crypto::{
IncompleteMultisignatureError, Index, Indexed, Keychain, MultiKeychain, Multisigned, NodeCount,
Expand All @@ -12,6 +13,7 @@ pub use aleph_bft_crypto::{
pub use dataio::{DataProvider, FinalizationHandler};
pub use network::{Network, Recipient};
pub use tasks::{SpawnHandle, TaskHandle};
pub use backup::{BackupReader, BackupWriter};

use codec::Codec;
use std::{fmt::Debug, hash::Hash as StdHash};
Expand Down

0 comments on commit 0fb4dcf

Please sign in to comment.