From 0fb4dcf43a59b00f68852d7b895a9a31e78f9486 Mon Sep 17 00:00:00 2001 From: joschisan <122358257+joschisan@users.noreply.github.com> Date: Wed, 1 May 2024 22:53:49 +0200 Subject: [PATCH] feat: create BackupWriter and BackupReader traits for async unit backup --- consensus/src/backup/loader.rs | 15 +++++++------- consensus/src/backup/saver.rs | 16 +++++++-------- consensus/src/member.rs | 15 ++++++++------ consensus/src/runway/mod.rs | 16 ++++++++------- types/src/backup.rs | 37 ++++++++++++++++++++++++++++++++++ types/src/lib.rs | 2 ++ 6 files changed, 71 insertions(+), 30 deletions(-) create mode 100644 types/src/backup.rs diff --git a/consensus/src/backup/loader.rs b/consensus/src/backup/loader.rs index 97fefb7b..c263e963 100644 --- a/consensus/src/backup/loader.rs +++ b/consensus/src/backup/loader.rs @@ -2,11 +2,11 @@ use std::{ collections::HashSet, fmt::{self, Debug}, marker::PhantomData, - pin::Pin, }; +use aleph_bft_types::BackupReader; use codec::{Decode, Error as CodecError}; -use futures::{channel::oneshot, AsyncRead, AsyncReadExt}; +use futures::{channel::oneshot}; use log::{error, info, warn}; use crate::{ @@ -68,17 +68,17 @@ impl From for LoaderError { } } -pub struct BackupLoader { - backup: Pin>, +pub struct BackupLoader { + backup: R, index: NodeIndex, session_id: SessionId, _phantom: PhantomData<(H, D, S)>, } -impl BackupLoader { +impl BackupLoader { pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader { BackupLoader { - backup: Box::pin(backup), + backup, index, session_id, _phantom: PhantomData, @@ -86,8 +86,7 @@ impl BackupLoader { } async fn load(&mut self) -> Result>, LoaderError> { - let mut buf = Vec::new(); - self.backup.read_to_end(&mut buf).await?; + let buf = self.backup.read().await?; let input = &mut &buf[..]; let mut result = Vec::new(); while !input.is_empty() { diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index 252c0f9b..19cbcbe1 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -1,26 +1,25 @@ -use std::pin::Pin; - use crate::{ dag::DagUnit, units::{UncheckedSignedUnit, WrappedUnit}, Data, Hasher, MultiKeychain, Receiver, Sender, Terminator, }; use codec::Encode; -use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{ FutureExt, StreamExt}; use log::{debug, error}; +use aleph_bft_types::BackupWriter; const LOG_TARGET: &str = "AlephBFT-backup-saver"; /// Component responsible for saving units into backup. /// It waits for items to appear on its receivers, and writes them to backup. /// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { +pub struct BackupSaver { units_from_runway: Receiver>, responses_for_runway: Sender>, - backup: Pin>, + backup: W, } -impl BackupSaver { +impl BackupSaver { pub fn new( units_from_runway: Receiver>, responses_for_runway: Sender>, @@ -29,14 +28,13 @@ impl BackupSaver) -> Result<(), std::io::Error> { let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into(); - self.backup.write_all(&unit.encode()).await?; - self.backup.flush().await + self.backup.append(&unit.encode()).await } pub async fn run(&mut self, mut terminator: Terminator) { diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 73fe67fb..cf2ab5f3 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -13,12 +13,13 @@ use crate::{ }; use aleph_bft_types::NodeMap; use codec::{Decode, Encode}; -use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt}; +use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use network::NetworkData; use rand::{prelude::SliceRandom, Rng}; +use aleph_bft_types::BackupWriter; use std::{ collections::HashSet, convert::TryInto, @@ -26,6 +27,8 @@ use std::{ marker::PhantomData, time::Duration, }; +use aleph_bft_types::BackupReader; + /// A message concerning units, either about new units or some requests for them. #[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)] @@ -111,8 +114,8 @@ pub struct LocalIO< D: Data, DP: DataProvider, FH: FinalizationHandler, - US: AsyncWrite, - UL: AsyncRead, + US: BackupWriter, + UL: BackupReader, > { data_provider: DP, finalization_handler: FH, @@ -121,7 +124,7 @@ pub struct LocalIO< _phantom: PhantomData, } -impl, FH: FinalizationHandler, US: AsyncWrite, UL: AsyncRead> +impl, FH: FinalizationHandler, US: BackupWriter, UL: BackupReader> LocalIO { pub fn new( @@ -578,8 +581,8 @@ pub async fn run_session< D: Data, DP: DataProvider, FH: FinalizationHandler, - US: AsyncWrite + Send + Sync + 'static, - UL: AsyncRead + Send + Sync + 'static, + US: BackupWriter + Send + Sync + 'static, + UL: BackupReader + Send + Sync + 'static, N: Network> + 'static, SH: SpawnHandle, MK: MultiKeychain, diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 7dcf0243..7e8d7b62 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -14,11 +14,13 @@ use crate::{ UncheckedSigned, }; use aleph_bft_types::Recipient; +use aleph_bft_types::BackupReader; use futures::{ channel::{mpsc, oneshot}, future::pending, - pin_mut, AsyncRead, AsyncWrite, Future, FutureExt, StreamExt, + pin_mut, Future, FutureExt, StreamExt, }; +use aleph_bft_types::BackupWriter; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; @@ -667,8 +669,8 @@ pub struct RunwayIO< H: Hasher, D: Data, MK: MultiKeychain, - W: AsyncWrite + Send + Sync + 'static, - R: AsyncRead + Send + Sync + 'static, + W: BackupWriter + Send + Sync + 'static, + R: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > { @@ -683,8 +685,8 @@ impl< H: Hasher, D: Data, MK: MultiKeychain, - W: AsyncWrite + Send + Sync + 'static, - R: AsyncRead + Send + Sync + 'static, + W: BackupWriter + Send + Sync + 'static, + R: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > RunwayIO @@ -715,8 +717,8 @@ pub(crate) async fn run( ) where H: Hasher, D: Data, - US: AsyncWrite + Send + Sync + 'static, - UL: AsyncRead + Send + Sync + 'static, + US: BackupWriter + Send + Sync + 'static, + UL: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, MK: MultiKeychain, diff --git a/types/src/backup.rs b/types/src/backup.rs new file mode 100644 index 00000000..6c7da2ae --- /dev/null +++ b/types/src/backup.rs @@ -0,0 +1,37 @@ +use async_trait::async_trait; +use std::marker::Unpin; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::{AsyncReadExt, AsyncWriteExt}; + + + +#[async_trait] +/// Write backups to peristent storage. +pub trait BackupWriter { + /// Append new data to the backup. + async fn append(&mut self, data: &[u8]) -> std::io::Result<()>; +} + +#[async_trait] +impl BackupWriter for W { + async fn append(&mut self, data: &[u8]) -> std::io::Result<()> { + self.write_all(data).await?; + self.flush().await + } +} + + +#[async_trait] +pub trait BackupReader { + /// Read the entire backup. + async fn read(&mut self) -> std::io::Result>; +} + +#[async_trait] +impl BackupReader for R { + async fn read(&mut self) -> std::io::Result> { + let mut buf = Vec::new(); + self.read_to_end(&mut buf).await?; + Ok(buf) + } +} diff --git a/types/src/lib.rs b/types/src/lib.rs index 4c247403..993ba191 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -3,6 +3,7 @@ mod dataio; mod network; mod tasks; +mod backup; pub use aleph_bft_crypto::{ IncompleteMultisignatureError, Index, Indexed, Keychain, MultiKeychain, Multisigned, NodeCount, @@ -12,6 +13,7 @@ pub use aleph_bft_crypto::{ pub use dataio::{DataProvider, FinalizationHandler}; pub use network::{Network, Recipient}; pub use tasks::{SpawnHandle, TaskHandle}; +pub use backup::{BackupReader, BackupWriter}; use codec::Codec; use std::{fmt::Debug, hash::Hash as StdHash};