Skip to content

Commit

Permalink
Change: Raft::begin_receiving_snapshot() does not need to check Vote
Browse files Browse the repository at this point in the history
Because `begin_receiving_snapshot` is a read operation and does not
break raft protocol.
  • Loading branch information
drmingdrmer committed Mar 8, 2024
1 parent 2cc7170 commit 9dc88f0
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 46 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,8 +1117,8 @@ where

self.handle_vote_request(rpc, tx);
}
RaftMsg::BeginReceivingSnapshot { vote, tx } => {
self.engine.handle_begin_receiving_snapshot(vote, tx);
RaftMsg::BeginReceivingSnapshot { tx } => {
self.engine.handle_begin_receiving_snapshot(tx);
}
RaftMsg::InstallFullSnapshot { vote, snapshot, tx } => {
self.engine.handle_install_full_snapshot(vote, snapshot, tx);
Expand Down
12 changes: 5 additions & 7 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ where C: RaftTypeConfig

/// Begin receiving a snapshot from the leader.
///
/// Returns a handle to a snapshot data ready for receiving if successful.
/// Otherwise, it is an error because of the `vote` is not GE the local `vote`, the local `vote`
/// will be returned in a Err
/// Returns a snapshot data handle for receiving data.
///
/// It does not check [`Vote`] because it is a read operation
/// and does not break raft protocol.
BeginReceivingSnapshot {
vote: Vote<C::NodeId>,
tx: ResultSender<C, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
},

Expand Down Expand Up @@ -123,9 +123,7 @@ where C: RaftTypeConfig
RaftMsg::RequestVote { rpc, .. } => {
format!("RequestVote: {}", rpc.summary())
}
RaftMsg::BeginReceivingSnapshot { vote, .. } => {
format!("BeginReceivingSnapshot: vote: {}", vote)
}
RaftMsg::BeginReceivingSnapshot { .. } => "BeginReceivingSnapshot".to_string(),
RaftMsg::InstallFullSnapshot { vote, snapshot, .. } => {
format!("InstallFullSnapshot: vote: {}, snapshot: {}", vote, snapshot)
}
Expand Down
17 changes: 1 addition & 16 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::error::InitializeError;
use crate::error::NotAllowed;
use crate::error::NotInMembers;
use crate::error::RejectAppendEntries;
use crate::error::RejectVoteRequest;
use crate::internal_server_state::InternalServerState;
use crate::membership::EffectiveMembership;
use crate::raft::AppendEntriesResponse;
Expand Down Expand Up @@ -490,23 +489,9 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_begin_receiving_snapshot(
&mut self,
vote: Vote<C::NodeId>,
tx: ResultSender<C, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
) {
tracing::info!(vote = display(vote), "{}", func_name!());

let vote_res = self.vote_handler().update_vote(&vote);

if let Err(e) = vote_res {
if let RejectVoteRequest::ByVote(v) = e {
let res = Err(HigherVote { higher: v, mine: vote });
let _ = tx.send(res);
} else {
unreachable!("unexpected error: {:?}", e);
}
return;
}

tracing::info!("{}", func_name!());
self.output.push_command(Command::from(sm::Command::begin_receiving_snapshot(tx)));
}

Expand Down
16 changes: 12 additions & 4 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use crate::raft::runtime_config_handle::RuntimeConfigHandle;
use crate::raft::trigger::Trigger;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::AsyncRuntime;
use crate::ChangeMembers;
Expand Down Expand Up @@ -368,12 +369,11 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub async fn begin_receiving_snapshot(
&self,
vote: Vote<C::NodeId>,
) -> Result<Box<SnapshotDataOf<C>>, RaftError<C::NodeId, HigherVote<C::NodeId>>> {
tracing::info!("Raft::begin_receiving_snapshot()");

let (tx, rx) = C::AsyncRuntime::oneshot();
let resp = self.inner.call_core(RaftMsg::BeginReceivingSnapshot { vote, tx }, rx).await?;
let (tx, rx) = AsyncRuntimeOf::<C>::oneshot();
let resp = self.inner.call_core(RaftMsg::BeginReceivingSnapshot { tx }, rx).await?;
Ok(resp)
}

Expand Down Expand Up @@ -431,6 +431,14 @@ where C: RaftTypeConfig
let req_vote = req.vote;
let snapshot_id = &req.meta.snapshot_id;

let my_vote = self.with_raft_state(|state| *state.vote_ref()).await?;
if req_vote >= my_vote {
// Ok
} else {
tracing::info!("vote {} is rejected by local vote: {}", req_vote, my_vote);
return Ok(InstallSnapshotResponse { vote: my_vote });
}

let mut streaming = self.inner.snapshot.lock().await;

let curr_id = streaming.as_ref().map(|s| s.snapshot_id());
Expand All @@ -450,7 +458,7 @@ where C: RaftTypeConfig
return Err(RaftError::APIError(mismatch));
}
// Changed to another stream. re-init snapshot state.
let res = self.begin_receiving_snapshot(req_vote).await;
let res = self.begin_receiving_snapshot().await;
let snapshot_data = match res {
Ok(snapshot_data) => snapshot_data,
Err(raft_err) => {
Expand Down
18 changes: 1 addition & 17 deletions tests/tests/client_api/t13_begin_receiving_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::error::HigherVote;
use openraft::Config;
use openraft::Vote;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;
Expand Down Expand Up @@ -35,24 +33,10 @@ async fn begin_receiving_snapshot() -> anyhow::Result<()> {
router.wait(&1, timeout()).applied_index(Some(log_index), "write more log").await?;
}

tracing::info!(log_index, "--- fails to execute with smaller vote");
{
let n1 = router.get_raft_handle(&1)?;

let res = n1.begin_receiving_snapshot(Vote::new(0, 0)).await;
assert_eq!(
HigherVote {
higher: Vote::new_committed(1, 0),
mine: Vote::new(0, 0),
},
res.unwrap_err().into_api_error().unwrap()
);
}

tracing::info!(log_index, "--- got a snapshot data");
{
let n1 = router.get_raft_handle(&1)?;
let _resp = n1.begin_receiving_snapshot(Vote::new_committed(1, 0)).await?;
let _resp = n1.begin_receiving_snapshot().await?;
}

Ok(())
Expand Down

0 comments on commit 9dc88f0

Please sign in to comment.