Skip to content

Commit

Permalink
[metasrv] refactor: move write_to_local_leader() to MetaLeader
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Nov 10, 2021
1 parent d22a1ae commit d7407fc
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 97 deletions.
3 changes: 3 additions & 0 deletions metasrv/src/meta_service/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct JoinRequest {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::TryInto)]
pub enum AdminRequestInner {
Join(JoinRequest),
Write(LogEntry),
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand All @@ -52,8 +53,10 @@ impl AdminRequest {
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::TryInto)]
#[allow(clippy::large_enum_variant)]
pub enum AdminResponse {
Join(()),
AppliedState(AppliedState),
}

impl tonic::IntoRequest<RaftRequest> for AdminRequest {
Expand Down
56 changes: 38 additions & 18 deletions metasrv/src/meta_service/meta_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::collections::BTreeSet;

use async_raft::error::ResponseError;
use async_raft::raft::ClientWriteRequest;
use async_raft::ChangeConfigError;
use async_raft::ClientWriteError;
use common_meta_raft_store::state_machine::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::LogEntry;
use common_meta_types::Node;
Expand All @@ -25,7 +28,6 @@ use common_tracing::tracing;
use crate::errors::ForwardToLeader;
use crate::errors::InvalidMembership;
use crate::errors::MetaError;
use crate::errors::RetryableError;
use crate::meta_service::message::AdminRequest;
use crate::meta_service::message::AdminResponse;
use crate::meta_service::AdminRequestInner;
Expand All @@ -52,6 +54,10 @@ impl<'a> MetaLeader<'a> {
self.join(join_req).await?;
Ok(AdminResponse::Join(()))
}
AdminRequestInner::Write(entry) => {
let res = self.write(entry).await?;
Ok(AdminResponse::AppliedState(res))
}
}
}

Expand Down Expand Up @@ -85,23 +91,7 @@ impl<'a> MetaLeader<'a> {
},
};

let res = self
.meta_node
.write_to_local_leader(ent.clone())
.await
.map_err(|e| MetaError::UnknownError(e.to_string()))?;
match res {
Ok(_applied_state) => {}
Err(retryable_error) => {
// TODO(xp): remove retryable error.
let leader_id = match retryable_error {
RetryableError::ForwardToLeader { leader } => leader,
};
return Err(MetaError::ForwardToLeader(ForwardToLeader {
leader: Some(leader_id),
}));
}
}
self.write(ent.clone()).await?;

self.change_membership(membership).await
}
Expand Down Expand Up @@ -138,4 +128,34 @@ impl<'a> MetaLeader<'a> {
_ => Err(MetaError::UnknownError("uncovered error".to_string())),
}
}

/// Write a log through local raft node and return the states before and after applying the log.
///
/// If the raft node is not a leader, it returns MetaError::ForwardToLeader.
/// If the leadership is lost during writing the log, it returns an UnknownError.
/// TODO(xp): elaborate the UnknownError, e.g. LeaderLostError
#[tracing::instrument(level = "debug", skip(self))]
pub async fn write(&self, entry: LogEntry) -> Result<AppliedState, MetaError> {
let write_rst = self
.meta_node
.raft
.client_write(ClientWriteRequest::new(entry))
.await;

tracing::debug!("raft.client_write rst: {:?}", write_rst);

match write_rst {
Ok(resp) => Ok(resp.data),
Err(cli_write_err) => match cli_write_err {
// fatal error
ClientWriteError::RaftError(raft_err) => {
Err(MetaError::UnknownError(raft_err.to_string()))
}
// retryable error
ClientWriteError::ForwardToLeader(_, leader) => {
Err(MetaError::ForwardToLeader(ForwardToLeader { leader }))
}
},
}
}
}
22 changes: 15 additions & 7 deletions metasrv/src/meta_service/meta_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;
use common_meta_types::LogEntry;
use common_tracing::tracing;

use crate::errors::MetaError;
use crate::meta_service::message::AdminRequest;
use crate::meta_service::MetaNode;
use crate::proto::meta_service_server::MetaService;
Expand Down Expand Up @@ -53,14 +54,21 @@ impl MetaService for MetaServiceImpl {
let mes = request.into_inner();
let req: LogEntry = mes.try_into()?;

let rst = self
.meta_node
.write_to_local_leader(req)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
let leader = self.meta_node.as_leader().await;

let raft_mes = rst.into();
Ok(tonic::Response::new(raft_mes))
let leader = match leader {
Ok(x) => x,
Err(err) => {
let err: MetaError = err.into();
let raft_reply = Err::<(), _>(err).into();
return Ok(tonic::Response::new(raft_reply));
}
};

let rst = leader.write(req).await;

let raft_reply = rst.into();
Ok(tonic::Response::new(raft_reply))
}

#[tracing::instrument(level = "info", skip(self))]
Expand Down
13 changes: 10 additions & 3 deletions metasrv/src/meta_service/meta_service_impl_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use common_tracing::tracing;
use log::info;
use pretty_assertions::assert_eq;

use crate::errors::ForwardToLeader;
use crate::errors::MetaError;
use crate::errors::RetryableError;
use crate::meta_service::MetaNode;
use crate::proto::meta_service_client::MetaServiceClient;
Expand Down Expand Up @@ -165,12 +167,17 @@ async fn test_meta_cluster_write_on_non_leader() -> anyhow::Result<()> {
};
let raft_mes = client.write(req).await?.into_inner();

let rst: Result<AppliedState, RetryableError> = raft_mes.into();
let rst: Result<AppliedState, MetaError> = raft_mes.into();
println!("{:?}", rst);

assert!(rst.is_err());
let err = rst.unwrap_err();
match err {
RetryableError::ForwardToLeader { leader } => {
assert_eq!(leader, 0);
MetaError::ForwardToLeader(ForwardToLeader { leader }) => {
assert_eq!(leader, Some(0));
}
_ => {
panic!("expect ForwardToLeader")
}
}

Expand Down
69 changes: 11 additions & 58 deletions metasrv/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use std::collections::BTreeSet;
use std::sync::Arc;

use async_raft::config::Config;
use async_raft::raft::ClientWriteRequest;
use async_raft::ClientWriteError;
use async_raft::Raft;
use async_raft::RaftMetrics;
use async_raft::SnapshotPolicy;
Expand Down Expand Up @@ -46,10 +44,10 @@ use common_tracing::tracing::Instrument;
use crate::errors::ConnectionError;
use crate::errors::ForwardToLeader;
use crate::errors::MetaError;
use crate::errors::RetryableError;
use crate::meta_service::message::AdminRequest;
use crate::meta_service::message::AdminResponse;
use crate::meta_service::meta_leader::MetaLeader;
use crate::meta_service::AdminRequestInner;
use crate::meta_service::MetaServiceImpl;
use crate::meta_service::Network;
use crate::proto::meta_service_client::MetaServiceClient;
Expand Down Expand Up @@ -532,31 +530,16 @@ impl MetaNode {
/// Submit a write request to the known leader. Returns the response after applying the request.
#[tracing::instrument(level = "info", skip(self))]
pub async fn write(&self, req: LogEntry) -> common_exception::Result<AppliedState> {
let mut curr_leader = self.get_leader().await;
loop {
let rst = if curr_leader == self.sto.id {
self.write_to_local_leader(req.clone()).await?
} else {
// forward to leader

let addr = self.sto.get_node_addr(&curr_leader).await?;

// TODO: retry
let mut client = MetaServiceClient::connect(format!("http://{}", addr))
.await
.map_err(|e| ErrorCode::CannotConnectNode(e.to_string()))?;
let resp = client.write(req.clone()).await?;
let rst: Result<AppliedState, RetryableError> = resp.into_inner().into();
rst
};

match rst {
Ok(resp) => return Ok(resp),
Err(write_err) => match write_err {
RetryableError::ForwardToLeader { leader } => curr_leader = leader,
},
}
}
let res = self
.handle_admin_req(AdminRequest {
forward_to_leader: true,
req: AdminRequestInner::Write(req.clone()),
})
.await?;

let res: AppliedState = res.try_into().expect("expect AppliedState");

Ok(res)
}

/// Try to get the leader from the latest metrics of the local raft node.
Expand Down Expand Up @@ -616,34 +599,4 @@ impl MetaNode {
let res: Result<AdminResponse, MetaError> = raft_mes.into();
res
}

/// Write a meta log through local raft node.
/// It works only when this node is the leader,
/// otherwise it returns ClientWriteError::ForwardToLeader error indicating the latest leader.
#[tracing::instrument(level = "info", skip(self))]
pub async fn write_to_local_leader(
&self,
req: LogEntry,
) -> common_exception::Result<Result<AppliedState, RetryableError>> {
let write_rst = self.raft.client_write(ClientWriteRequest::new(req)).await;

tracing::debug!("raft.client_write rst: {:?}", write_rst);

match write_rst {
Ok(resp) => Ok(Ok(resp.data)),
Err(cli_write_err) => match cli_write_err {
// fatal error
ClientWriteError::RaftError(raft_err) => {
Err(ErrorCode::MetaServiceError(raft_err.to_string()))
}
// retryable error
ClientWriteError::ForwardToLeader(_, leader) => match leader {
Some(id) => Ok(Err(RetryableError::ForwardToLeader { leader: id })),
None => Err(ErrorCode::MetaServiceUnavailable(
"no leader to write".to_string(),
)),
},
},
}
}
}
30 changes: 19 additions & 11 deletions metasrv/src/meta_service/raftmeta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ use maplit::btreeset;
use pretty_assertions::assert_eq;

use crate::configs;
use crate::errors::RetryableError;
use crate::errors::ForwardToLeader;
use crate::errors::MetaError;
use crate::meta_service::message::AdminRequest;
use crate::meta_service::meta_leader::MetaLeader;
use crate::meta_service::AdminRequestInner;
use crate::meta_service::JoinRequest;
use crate::meta_service::MetaNode;
Expand Down Expand Up @@ -136,8 +138,9 @@ async fn test_meta_node_write_to_local_leader() -> anyhow::Result<()> {
let key = "t-non-leader-write";
for id in 0u64..4 {
let mn = &all[id as usize];
let rst = mn
.write_to_local_leader(LogEntry {
let maybe_leader = MetaLeader::new(mn);
let rst = maybe_leader
.write(LogEntry {
txid: None,
cmd: Cmd::UpsertKV {
key: key.to_string(),
Expand All @@ -148,16 +151,17 @@ async fn test_meta_node_write_to_local_leader() -> anyhow::Result<()> {
})
.await;

let rst = rst?;

if id == leader_id {
assert!(rst.is_ok());
} else {
assert!(rst.is_err());
let e = rst.unwrap_err();
match e {
RetryableError::ForwardToLeader { leader } => {
assert_eq!(leader_id, leader);
MetaError::ForwardToLeader(ForwardToLeader { leader }) => {
assert_eq!(Some(leader_id), leader);
}
_ => {
panic!("expect ForwardToLeader")
}
}
}
Expand Down Expand Up @@ -559,7 +563,9 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> {
let leader = tc.meta_nodes.pop().unwrap();

leader
.write_to_local_leader(LogEntry {
.as_leader()
.await?
.write(LogEntry {
txid: None,
cmd: Cmd::UpsertKV {
key: "foo".to_string(),
Expand All @@ -568,7 +574,7 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> {
value_meta: None,
},
})
.await??;
.await?;
log_cnt += 1;

want_hs = leader.sto.raft_state.read_hard_state()?;
Expand Down Expand Up @@ -752,7 +758,9 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec<Arc<MetaNode>>, key: &str) -> a
tracing::info!("leader: last_applied={}", last_applied);
{
leader
.write_to_local_leader(LogEntry {
.as_leader()
.await?
.write(LogEntry {
txid: None,
cmd: Cmd::UpsertKV {
key: key.to_string(),
Expand All @@ -761,7 +769,7 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec<Arc<MetaNode>>, key: &str) -> a
value_meta: None,
},
})
.await??;
.await?;
}

assert_applied_index(meta_nodes.clone(), last_applied + 1).await?;
Expand Down

0 comments on commit d7407fc

Please sign in to comment.