Skip to content

Commit

Permalink
Merge pull request #65 from cita-cloud/opt_wal
Browse files Browse the repository at this point in the history
opt wal
  • Loading branch information
rink1969 authored Mar 26, 2024
2 parents 332ea17 + a2a214d commit f85056b
Showing 1 changed file with 13 additions and 21 deletions.
34 changes: 13 additions & 21 deletions src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use cita_cloud_proto::common::{
};
use cita_cloud_proto::network::NetworkMsg;
use cita_cloud_proto::status_code::StatusCodeEnum;
use cloud_util::wal::{LogType, Wal as CITAWal};
use creep::Context;
use overlord::types::{
AggregatedVote, Commit, Hash, Node, OverlordMsg, Proof, SignedChoke, SignedProposal,
Expand All @@ -39,7 +38,6 @@ use rlp::Encodable;
use rlp::Rlp;
use std::error::Error;
use std::fs;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -294,17 +292,20 @@ impl Consensus {
}
}

const OVERLORD_WAL_NAME: &str = "overlord.wal";

#[derive(Clone)]
pub struct ConsensusWal {
wal: Arc<RwLock<CITAWal>>,
wal_count: Arc<AtomicU64>,
wal_file: Arc<RwLock<String>>,
}

impl ConsensusWal {
pub async fn new(wal_path: &str) -> Self {
fs::create_dir_all(wal_path).unwrap();
let wal_file = format!("{}/{}", wal_path, OVERLORD_WAL_NAME);

ConsensusWal {
wal: Arc::new(RwLock::new(CITAWal::create(wal_path).await.unwrap())),
wal_count: Arc::new(AtomicU64::new(0)),
wal_file: Arc::new(RwLock::new(wal_file)),
}
}
}
Expand All @@ -314,27 +315,18 @@ impl ConsensusWal {
impl Wal for ConsensusWal {
async fn save(&self, info: Bytes) -> Result<(), Box<dyn Error + Send>> {
info!("save wal!");
let current_wal_count = self.wal_count.load(Ordering::SeqCst);
let next_wal_count = current_wal_count.overflowing_add(1).0;
self.wal
.write()
.await
.save(next_wal_count, LogType::Skip, info.as_ref())
.await
.map_err(ConsensusError::WALErr)?;
self.wal_count.store(next_wal_count, Ordering::SeqCst);
let wal_file = self.wal_file.write().await;
fs::write(wal_file.as_str(), info.as_ref()).map_err(ConsensusError::WALErr)?;
Ok(())
}

async fn load(&self) -> Result<Option<Bytes>, Box<dyn Error + Send>> {
info!("load wal!");
let record = self.wal.write().await.load().await;
if record.is_empty() {
warn!("failed to load wal!");
Err(ConsensusError::Other("failed to load wal".to_string()).into())
} else {
let (_, info) = record[0].clone();
let wal_file = self.wal_file.read().await;
if let Ok(info) = fs::read(wal_file.as_str()) {
Ok(Some(Bytes::from(info)))
} else {
Ok(None)
}
}
}
Expand Down

0 comments on commit f85056b

Please sign in to comment.