Skip to content

Commit

Permalink
Refactor: save committed log id for examples
Browse files Browse the repository at this point in the history
Other changes: upgrade nightly to 2023-12-21, fix lint warning

- Fix: #910
  • Loading branch information
drmingdrmer committed Dec 21, 2023
1 parent 87e5bd5 commit 069e6eb
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ serde_json = "1.0.57"
tempfile = { version = "3.4.0" }
thiserror = "1.0.49"
tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { version = "0.1.29" }
tracing = { version = "0.1.40" }
tracing-appender = "0.2.0"
tracing-futures = "0.2.4"
tracing-subscriber = { version = "0.3.3", features=["env-filter"] }
Expand Down
13 changes: 13 additions & 0 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct LogStore {
/// The Raft log.
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,

committed: RwLock<Option<LogId<NodeId>>>,

/// The current granted vote.
vote: RwLock<Option<Vote<NodeId>>>,
}
Expand Down Expand Up @@ -282,6 +284,17 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
})
}

async fn save_committed(&mut self, committed: Option<LogId<NodeId>>) -> Result<(), StorageError<NodeId>> {
let mut c = self.committed.write().await;
*c = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<NodeId>> {
let committed = self.committed.read().await;
Ok(*committed)
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut v = self.vote.write().await;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ tide = { version = "0.16" }
# for toy-rpc, use `serde_json` instead of the default `serde_bincode`:
# bincode which enabled by default by toy-rpc, does not support `#[serde(flatten)]`: https://docs.rs/bincode/2.0.0-alpha.1/bincode/serde/index.html#known-issues
toy-rpc = { version = "0.8.6", default-features = false, features = [ "serde_json", "ws_async_std", "server", "client", "async_std_runtime", ] }
tracing = "0.1.29"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }

[dev-dependencies]
Expand Down
22 changes: 22 additions & 0 deletions examples/raft-kv-rocksdb/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,28 @@ fn to_error<E: std::error::Error + 'static + Clone>(e: toy_rpc::Error, target: N
}
}

// With nightly-2023-12-20, and `err(Debug)` in the instrument macro, this gives the following lint
// warning. Without `err(Debug)` it is OK. Suppress it with `#[allow(clippy::blocks_in_conditions)]`
//
// warning: in a `match` scrutinee, avoid complex blocks or closures with blocks; instead, move the
// block or closure higher and bind it with a `let`
//
// --> src/network/raft_network_impl.rs:99:91
// |
// 99 | ) -> Result<AppendEntriesResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId>>>
// {
// | ___________________________________________________________________________________________^
// 100 | | tracing::debug!(req = debug(&req), "send_append_entries");
// 101 | |
// 102 | | let c = self.c().await?;
// ... |
// 108 | | raft.append(req).await.map_err(|e| to_error(e, self.target))
// 109 | | }
// | |_____^
// |
// = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#blocks_in_conditions
// = note: `#[warn(clippy::blocks_in_conditions)]` on by default
#[allow(clippy::blocks_in_conditions)]
#[async_trait]
impl RaftNetwork<TypeConfig> for NetworkConnection {
#[tracing::instrument(level = "debug", skip_all, err(Debug))]
Expand Down
29 changes: 29 additions & 0 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,25 @@ impl LogStore {
Ok(())
}

fn set_committed_(&self, committed: &Option<LogId<NodeId>>) -> Result<(), StorageIOError<NodeId>> {
let json = serde_json::to_vec(committed).unwrap();

self.db.put_cf(self.store(), b"committed", json).map_err(|e| StorageIOError::write(&e))?;

self.flush(ErrorSubject::Store, ErrorVerb::Write)?;
Ok(())
}

fn get_committed_(&self) -> StorageResult<Option<LogId<NodeId>>> {
Ok(self
.db
.get_cf(self.store(), b"committed")
.map_err(|e| StorageError::IO {
source: StorageIOError::read(&e),
})?
.and_then(|v| serde_json::from_slice(&v).ok()))
}

fn set_vote_(&self, vote: &Vote<NodeId>) -> StorageResult<()> {
self.db
.put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap())
Expand Down Expand Up @@ -402,6 +421,16 @@ impl RaftLogStorage<TypeConfig> for LogStore {
})
}

async fn save_committed(&mut self, _committed: Option<LogId<NodeId>>) -> Result<(), StorageError<NodeId>> {
self.set_committed_(&_committed)?;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<NodeId>> {
let c = self.get_committed_()?;
Ok(c)
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
self.set_vote_(vote)
Expand Down
13 changes: 10 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,10 @@ where
};

// TODO: do not spawn, manage read requests with a queue by RaftCore
C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));

// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
}

/// Submit change-membership by writing a Membership log entry.
Expand Down Expand Up @@ -1002,7 +1005,9 @@ where
let id = self.id;
let option = RPCOption::new(ttl);

C::AsyncRuntime::spawn(
// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = C::AsyncRuntime::spawn(
async move {
let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await;
let res = match tm_res {
Expand Down Expand Up @@ -1577,7 +1582,9 @@ where
let leader_id = self.current_leader();
let leader_node = self.get_leader_node(leader_id);

AsyncRuntimeOf::<C>::spawn(async move {
// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = AsyncRuntimeOf::<C>::spawn(async move {
for (log_index, tx) in removed.into_iter() {
let res = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id,
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/membership/membership_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ fn test_membership() -> anyhow::Result<()> {
let m123 = Membership::<u64, ()>::new(vec![btreeset! {1,2,3}], None);
let m123_345 = Membership::<u64, ()>::new(vec![btreeset! {1,2,3}, btreeset! {3,4,5}], None);

assert_eq!(Some(btreeset! {1}), m1.get_joint_config().get(0).cloned());
assert_eq!(Some(btreeset! {1,2,3}), m123.get_joint_config().get(0).cloned());
assert_eq!(Some(btreeset! {1,2,3}), m123_345.get_joint_config().get(0).cloned());
assert_eq!(Some(btreeset! {1}), m1.get_joint_config().first().cloned());
assert_eq!(Some(btreeset! {1,2,3}), m123.get_joint_config().first().cloned());
assert_eq!(Some(btreeset! {1,2,3}), m123_345.get_joint_config().first().cloned());

assert_eq!(None, m1.get_joint_config().get(1).cloned());
assert_eq!(None, m123.get_joint_config().get(1).cloned());
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn test_wait() -> anyhow::Result<()> {

assert_eq!(
btreeset![1, 2],
got.membership_config.membership().get_joint_config().get(0).unwrap().clone()
got.membership_config.membership().get_joint_config().first().unwrap().clone()
);
}

Expand Down
4 changes: 3 additions & 1 deletion openraft/src/timer/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ impl<RT: AsyncRuntime> RaftTimer<RT> for Timeout<RT> {
inner: inner.clone(),
};

RT::spawn(inner.sleep_loop(rx, callback).instrument(trace_span!("timeout-loop").or_current()));
// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = RT::spawn(inner.sleep_loop(rx, callback).instrument(trace_span!("timeout-loop").or_current()));

t
}
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2023-10-17
nightly-2023-12-21

0 comments on commit 069e6eb

Please sign in to comment.