From a2a214d53a69a00f7fa5eed309164e6276375416 Mon Sep 17 00:00:00 2001 From: rink1969 Date: Tue, 26 Mar 2024 15:37:55 +0800 Subject: [PATCH] opt wal --- src/consensus.rs | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/src/consensus.rs b/src/consensus.rs index dbae24d..110eaf8 100755 --- a/src/consensus.rs +++ b/src/consensus.rs @@ -24,7 +24,6 @@ use cita_cloud_proto::common::{ }; use cita_cloud_proto::network::NetworkMsg; use cita_cloud_proto::status_code::StatusCodeEnum; -use cloud_util::wal::{LogType, Wal as CITAWal}; use creep::Context; use overlord::types::{ AggregatedVote, Commit, Hash, Node, OverlordMsg, Proof, SignedChoke, SignedProposal, @@ -39,7 +38,6 @@ use rlp::Encodable; use rlp::Rlp; use std::error::Error; use std::fs; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::RwLock; @@ -294,17 +292,20 @@ impl Consensus { } } +const OVERLORD_WAL_NAME: &str = "overlord.wal"; + #[derive(Clone)] pub struct ConsensusWal { - wal: Arc>, - wal_count: Arc, + wal_file: Arc>, } impl ConsensusWal { pub async fn new(wal_path: &str) -> Self { + fs::create_dir_all(wal_path).unwrap(); + let wal_file = format!("{}/{}", wal_path, OVERLORD_WAL_NAME); + ConsensusWal { - wal: Arc::new(RwLock::new(CITAWal::create(wal_path).await.unwrap())), - wal_count: Arc::new(AtomicU64::new(0)), + wal_file: Arc::new(RwLock::new(wal_file)), } } } @@ -314,27 +315,18 @@ impl ConsensusWal { impl Wal for ConsensusWal { async fn save(&self, info: Bytes) -> Result<(), Box> { info!("save wal!"); - let current_wal_count = self.wal_count.load(Ordering::SeqCst); - let next_wal_count = current_wal_count.overflowing_add(1).0; - self.wal - .write() - .await - .save(next_wal_count, LogType::Skip, info.as_ref()) - .await - .map_err(ConsensusError::WALErr)?; - self.wal_count.store(next_wal_count, Ordering::SeqCst); + let wal_file = self.wal_file.write().await; + fs::write(wal_file.as_str(), info.as_ref()).map_err(ConsensusError::WALErr)?; Ok(()) } async fn load(&self) -> Result, Box> { info!("load wal!"); - let record = self.wal.write().await.load().await; - if record.is_empty() { - warn!("failed to load wal!"); - Err(ConsensusError::Other("failed to load wal".to_string()).into()) - } else { - let (_, info) = record[0].clone(); + let wal_file = self.wal_file.read().await; + if let Ok(info) = fs::read(wal_file.as_str()) { Ok(Some(Bytes::from(info))) + } else { + Ok(None) } } }