From d7407fc2a6277ff474338877b5a0bb7a2e68b318 Mon Sep 17 00:00:00 2001 From: drdr xp Date: Wed, 10 Nov 2021 13:28:16 +0800 Subject: [PATCH] [metasrv] refactor: move write_to_local_leader() to MetaLeader --- metasrv/src/meta_service/message.rs | 3 + metasrv/src/meta_service/meta_leader.rs | 56 ++++++++++----- metasrv/src/meta_service/meta_service_impl.rs | 22 ++++-- .../meta_service/meta_service_impl_test.rs | 13 +++- metasrv/src/meta_service/raftmeta.rs | 69 +++---------------- metasrv/src/meta_service/raftmeta_test.rs | 30 +++++--- 6 files changed, 96 insertions(+), 97 deletions(-) diff --git a/metasrv/src/meta_service/message.rs b/metasrv/src/meta_service/message.rs index 8f7f48bce023..02d4a31412b3 100644 --- a/metasrv/src/meta_service/message.rs +++ b/metasrv/src/meta_service/message.rs @@ -35,6 +35,7 @@ pub struct JoinRequest { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::TryInto)] pub enum AdminRequestInner { Join(JoinRequest), + Write(LogEntry), } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] @@ -52,8 +53,10 @@ impl AdminRequest { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::TryInto)] +#[allow(clippy::large_enum_variant)] pub enum AdminResponse { Join(()), + AppliedState(AppliedState), } impl tonic::IntoRequest for AdminRequest { diff --git a/metasrv/src/meta_service/meta_leader.rs b/metasrv/src/meta_service/meta_leader.rs index 494dce70c907..eb15a0eaffbc 100644 --- a/metasrv/src/meta_service/meta_leader.rs +++ b/metasrv/src/meta_service/meta_leader.rs @@ -15,7 +15,10 @@ use std::collections::BTreeSet; use async_raft::error::ResponseError; +use async_raft::raft::ClientWriteRequest; use async_raft::ChangeConfigError; +use async_raft::ClientWriteError; +use common_meta_raft_store::state_machine::AppliedState; use common_meta_types::Cmd; use common_meta_types::LogEntry; use common_meta_types::Node; @@ -25,7 +28,6 @@ use common_tracing::tracing; use crate::errors::ForwardToLeader; use crate::errors::InvalidMembership; use crate::errors::MetaError; -use crate::errors::RetryableError; use crate::meta_service::message::AdminRequest; use crate::meta_service::message::AdminResponse; use crate::meta_service::AdminRequestInner; @@ -52,6 +54,10 @@ impl<'a> MetaLeader<'a> { self.join(join_req).await?; Ok(AdminResponse::Join(())) } + AdminRequestInner::Write(entry) => { + let res = self.write(entry).await?; + Ok(AdminResponse::AppliedState(res)) + } } } @@ -85,23 +91,7 @@ impl<'a> MetaLeader<'a> { }, }; - let res = self - .meta_node - .write_to_local_leader(ent.clone()) - .await - .map_err(|e| MetaError::UnknownError(e.to_string()))?; - match res { - Ok(_applied_state) => {} - Err(retryable_error) => { - // TODO(xp): remove retryable error. - let leader_id = match retryable_error { - RetryableError::ForwardToLeader { leader } => leader, - }; - return Err(MetaError::ForwardToLeader(ForwardToLeader { - leader: Some(leader_id), - })); - } - } + self.write(ent.clone()).await?; self.change_membership(membership).await } @@ -138,4 +128,34 @@ impl<'a> MetaLeader<'a> { _ => Err(MetaError::UnknownError("uncovered error".to_string())), } } + + /// Write a log through local raft node and return the states before and after applying the log. + /// + /// If the raft node is not a leader, it returns MetaError::ForwardToLeader. + /// If the leadership is lost during writing the log, it returns an UnknownError. + /// TODO(xp): elaborate the UnknownError, e.g. LeaderLostError + #[tracing::instrument(level = "debug", skip(self))] + pub async fn write(&self, entry: LogEntry) -> Result { + let write_rst = self + .meta_node + .raft + .client_write(ClientWriteRequest::new(entry)) + .await; + + tracing::debug!("raft.client_write rst: {:?}", write_rst); + + match write_rst { + Ok(resp) => Ok(resp.data), + Err(cli_write_err) => match cli_write_err { + // fatal error + ClientWriteError::RaftError(raft_err) => { + Err(MetaError::UnknownError(raft_err.to_string())) + } + // retryable error + ClientWriteError::ForwardToLeader(_, leader) => { + Err(MetaError::ForwardToLeader(ForwardToLeader { leader })) + } + }, + } + } } diff --git a/metasrv/src/meta_service/meta_service_impl.rs b/metasrv/src/meta_service/meta_service_impl.rs index dde9cf912e79..1a4a41614f59 100644 --- a/metasrv/src/meta_service/meta_service_impl.rs +++ b/metasrv/src/meta_service/meta_service_impl.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use common_meta_types::LogEntry; use common_tracing::tracing; +use crate::errors::MetaError; use crate::meta_service::message::AdminRequest; use crate::meta_service::MetaNode; use crate::proto::meta_service_server::MetaService; @@ -53,14 +54,21 @@ impl MetaService for MetaServiceImpl { let mes = request.into_inner(); let req: LogEntry = mes.try_into()?; - let rst = self - .meta_node - .write_to_local_leader(req) - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; + let leader = self.meta_node.as_leader().await; - let raft_mes = rst.into(); - Ok(tonic::Response::new(raft_mes)) + let leader = match leader { + Ok(x) => x, + Err(err) => { + let err: MetaError = err.into(); + let raft_reply = Err::<(), _>(err).into(); + return Ok(tonic::Response::new(raft_reply)); + } + }; + + let rst = leader.write(req).await; + + let raft_reply = rst.into(); + Ok(tonic::Response::new(raft_reply)) } #[tracing::instrument(level = "info", skip(self))] diff --git a/metasrv/src/meta_service/meta_service_impl_test.rs b/metasrv/src/meta_service/meta_service_impl_test.rs index 2be6170159d3..203f5bcf2f8b 100644 --- a/metasrv/src/meta_service/meta_service_impl_test.rs +++ b/metasrv/src/meta_service/meta_service_impl_test.rs @@ -25,6 +25,8 @@ use common_tracing::tracing; use log::info; use pretty_assertions::assert_eq; +use crate::errors::ForwardToLeader; +use crate::errors::MetaError; use crate::errors::RetryableError; use crate::meta_service::MetaNode; use crate::proto::meta_service_client::MetaServiceClient; @@ -165,12 +167,17 @@ async fn test_meta_cluster_write_on_non_leader() -> anyhow::Result<()> { }; let raft_mes = client.write(req).await?.into_inner(); - let rst: Result = raft_mes.into(); + let rst: Result = raft_mes.into(); + println!("{:?}", rst); + assert!(rst.is_err()); let err = rst.unwrap_err(); match err { - RetryableError::ForwardToLeader { leader } => { - assert_eq!(leader, 0); + MetaError::ForwardToLeader(ForwardToLeader { leader }) => { + assert_eq!(leader, Some(0)); + } + _ => { + panic!("expect ForwardToLeader") } } diff --git a/metasrv/src/meta_service/raftmeta.rs b/metasrv/src/meta_service/raftmeta.rs index 8dccdb8b8f9e..506e7cf13907 100644 --- a/metasrv/src/meta_service/raftmeta.rs +++ b/metasrv/src/meta_service/raftmeta.rs @@ -16,8 +16,6 @@ use std::collections::BTreeSet; use std::sync::Arc; use async_raft::config::Config; -use async_raft::raft::ClientWriteRequest; -use async_raft::ClientWriteError; use async_raft::Raft; use async_raft::RaftMetrics; use async_raft::SnapshotPolicy; @@ -46,10 +44,10 @@ use common_tracing::tracing::Instrument; use crate::errors::ConnectionError; use crate::errors::ForwardToLeader; use crate::errors::MetaError; -use crate::errors::RetryableError; use crate::meta_service::message::AdminRequest; use crate::meta_service::message::AdminResponse; use crate::meta_service::meta_leader::MetaLeader; +use crate::meta_service::AdminRequestInner; use crate::meta_service::MetaServiceImpl; use crate::meta_service::Network; use crate::proto::meta_service_client::MetaServiceClient; @@ -532,31 +530,16 @@ impl MetaNode { /// Submit a write request to the known leader. Returns the response after applying the request. #[tracing::instrument(level = "info", skip(self))] pub async fn write(&self, req: LogEntry) -> common_exception::Result { - let mut curr_leader = self.get_leader().await; - loop { - let rst = if curr_leader == self.sto.id { - self.write_to_local_leader(req.clone()).await? - } else { - // forward to leader - - let addr = self.sto.get_node_addr(&curr_leader).await?; - - // TODO: retry - let mut client = MetaServiceClient::connect(format!("http://{}", addr)) - .await - .map_err(|e| ErrorCode::CannotConnectNode(e.to_string()))?; - let resp = client.write(req.clone()).await?; - let rst: Result = resp.into_inner().into(); - rst - }; - - match rst { - Ok(resp) => return Ok(resp), - Err(write_err) => match write_err { - RetryableError::ForwardToLeader { leader } => curr_leader = leader, - }, - } - } + let res = self + .handle_admin_req(AdminRequest { + forward_to_leader: true, + req: AdminRequestInner::Write(req.clone()), + }) + .await?; + + let res: AppliedState = res.try_into().expect("expect AppliedState"); + + Ok(res) } /// Try to get the leader from the latest metrics of the local raft node. @@ -616,34 +599,4 @@ impl MetaNode { let res: Result = raft_mes.into(); res } - - /// Write a meta log through local raft node. - /// It works only when this node is the leader, - /// otherwise it returns ClientWriteError::ForwardToLeader error indicating the latest leader. - #[tracing::instrument(level = "info", skip(self))] - pub async fn write_to_local_leader( - &self, - req: LogEntry, - ) -> common_exception::Result> { - let write_rst = self.raft.client_write(ClientWriteRequest::new(req)).await; - - tracing::debug!("raft.client_write rst: {:?}", write_rst); - - match write_rst { - Ok(resp) => Ok(Ok(resp.data)), - Err(cli_write_err) => match cli_write_err { - // fatal error - ClientWriteError::RaftError(raft_err) => { - Err(ErrorCode::MetaServiceError(raft_err.to_string())) - } - // retryable error - ClientWriteError::ForwardToLeader(_, leader) => match leader { - Some(id) => Ok(Err(RetryableError::ForwardToLeader { leader: id })), - None => Err(ErrorCode::MetaServiceUnavailable( - "no leader to write".to_string(), - )), - }, - }, - } - } } diff --git a/metasrv/src/meta_service/raftmeta_test.rs b/metasrv/src/meta_service/raftmeta_test.rs index ce1faf9fae3a..7cccb6b4f5c9 100644 --- a/metasrv/src/meta_service/raftmeta_test.rs +++ b/metasrv/src/meta_service/raftmeta_test.rs @@ -32,8 +32,10 @@ use maplit::btreeset; use pretty_assertions::assert_eq; use crate::configs; -use crate::errors::RetryableError; +use crate::errors::ForwardToLeader; +use crate::errors::MetaError; use crate::meta_service::message::AdminRequest; +use crate::meta_service::meta_leader::MetaLeader; use crate::meta_service::AdminRequestInner; use crate::meta_service::JoinRequest; use crate::meta_service::MetaNode; @@ -136,8 +138,9 @@ async fn test_meta_node_write_to_local_leader() -> anyhow::Result<()> { let key = "t-non-leader-write"; for id in 0u64..4 { let mn = &all[id as usize]; - let rst = mn - .write_to_local_leader(LogEntry { + let maybe_leader = MetaLeader::new(mn); + let rst = maybe_leader + .write(LogEntry { txid: None, cmd: Cmd::UpsertKV { key: key.to_string(), @@ -148,16 +151,17 @@ async fn test_meta_node_write_to_local_leader() -> anyhow::Result<()> { }) .await; - let rst = rst?; - if id == leader_id { assert!(rst.is_ok()); } else { assert!(rst.is_err()); let e = rst.unwrap_err(); match e { - RetryableError::ForwardToLeader { leader } => { - assert_eq!(leader_id, leader); + MetaError::ForwardToLeader(ForwardToLeader { leader }) => { + assert_eq!(Some(leader_id), leader); + } + _ => { + panic!("expect ForwardToLeader") } } } @@ -559,7 +563,9 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> { let leader = tc.meta_nodes.pop().unwrap(); leader - .write_to_local_leader(LogEntry { + .as_leader() + .await? + .write(LogEntry { txid: None, cmd: Cmd::UpsertKV { key: "foo".to_string(), @@ -568,7 +574,7 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> { value_meta: None, }, }) - .await??; + .await?; log_cnt += 1; want_hs = leader.sto.raft_state.read_hard_state()?; @@ -752,7 +758,9 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec>, key: &str) -> a tracing::info!("leader: last_applied={}", last_applied); { leader - .write_to_local_leader(LogEntry { + .as_leader() + .await? + .write(LogEntry { txid: None, cmd: Cmd::UpsertKV { key: key.to_string(), @@ -761,7 +769,7 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec>, key: &str) -> a value_meta: None, }, }) - .await??; + .await?; } assert_applied_index(meta_nodes.clone(), last_applied + 1).await?;