diff --git a/src/api/error.rs b/src/api/error.rs index e26a628..8abba0a 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -64,4 +64,7 @@ pub enum Error { #[error("Wrong server address: {0}")] WrongServerAddress(String), + + #[error("Exceeded maximum retry attempts: {0}")] + MaxRetriesExceeded(u32), } diff --git a/src/api/props.rs b/src/api/props.rs index d13373f..ef386fe 100644 --- a/src/api/props.rs +++ b/src/api/props.rs @@ -26,6 +26,8 @@ pub struct ClientProps { client_version: String, /// auth context auth_context: HashMap, + /// max retries + max_retries: Option, } impl ClientProps { @@ -129,6 +131,10 @@ impl ClientProps { Ok(result) } + + pub(crate) fn get_max_retries(&self) -> Option { + self.max_retries + } } #[allow(clippy::new_without_default)] @@ -149,6 +155,7 @@ impl ClientProps { client_version, auth_context: HashMap::default(), grpc_port: None, + max_retries: None, } } @@ -221,6 +228,12 @@ impl ClientProps { self.auth_context.insert(key.into(), val.into()); self } + + /// Sets the max retries. + pub fn max_retries(mut self, max_retries: u32) -> Self { + self.max_retries = Some(max_retries); + self + } } #[cfg(test)] @@ -242,6 +255,7 @@ mod tests { labels: HashMap::new(), client_version: "test_version".to_string(), auth_context: HashMap::new(), + max_retries: None, }; let result = client_props.get_server_list(); diff --git a/src/common/remote/grpc/nacos_grpc_client.rs b/src/common/remote/grpc/nacos_grpc_client.rs index 406b85f..41bb044 100644 --- a/src/common/remote/grpc/nacos_grpc_client.rs +++ b/src/common/remote/grpc/nacos_grpc_client.rs @@ -72,6 +72,7 @@ pub(crate) struct NacosGrpcClientBuilder { disconnected_listener: Option, unary_call_layer: Option, bi_call_layer: Option, + max_retries: Option, } impl NacosGrpcClientBuilder { @@ -89,6 +90,7 @@ impl NacosGrpcClientBuilder { disconnected_listener: None, unary_call_layer: None, bi_call_layer: None, + max_retries: None, } } @@ -117,6 +119,11 @@ impl NacosGrpcClientBuilder { Self { ..self } } + pub(crate) fn max_retries(mut self, max_retries: Option) -> Self { + self.max_retries = max_retries; + Self { ..self } + } + pub(crate) fn support_remote_connection(mut self, enable: bool) -> Self { self.client_abilities.support_remote_connection(enable); Self { ..self } @@ -359,6 +366,7 @@ impl NacosGrpcClientBuilder { self.namespace, self.labels, self.client_abilities, + self.max_retries, ); if let Some(connected_listener) = self.connected_listener { diff --git a/src/common/remote/grpc/nacos_grpc_connection.rs b/src/common/remote/grpc/nacos_grpc_connection.rs index e549127..bc01b90 100644 --- a/src/common/remote/grpc/nacos_grpc_connection.rs +++ b/src/common/remote/grpc/nacos_grpc_connection.rs @@ -34,9 +34,9 @@ type ConnectedListener = Arc; type DisconnectedListener = Arc; type HandlerMap = HashMap>; -const MAX_RETRY: i32 = 6; +const MAX_RETRY: u32 = 6; -fn sleep_time(retry_count: i32) -> i32 { +fn sleep_time(retry_count: u32) -> u32 { if retry_count > MAX_RETRY { 1 << (retry_count % MAX_RETRY) } else { @@ -58,11 +58,12 @@ where state: State, health: Arc, connection_id: Option, - retry_count: i32, + retry_count: u32, connection_id_watcher: ( watch::Sender>, watch::Receiver>, ), + max_retries: Option, } impl NacosGrpcConnection @@ -82,6 +83,7 @@ where namespace: String, labels: HashMap, client_abilities: NacosClientAbilities, + max_retries: Option, ) -> Self { let connection_id_watcher = watch::channel(None); @@ -98,6 +100,7 @@ where connection_id: None, retry_count: 0, connection_id_watcher, + max_retries, } } @@ -480,6 +483,13 @@ where debug_span!(parent: None, "grpc_connection", id = self.id.clone()).entered(); loop { + if let Some(max_retries) = self.max_retries { + if self.retry_count > max_retries { + error!("Exceeded maximum retry attempts: {}", max_retries); + return Poll::Ready(Err(Self::Error::MaxRetriesExceeded(max_retries))); + } + } + match self.state { State::Idle => { info!("create new connection."); diff --git a/src/config/worker.rs b/src/config/worker.rs index 163a169..aab4709 100644 --- a/src/config/worker.rs +++ b/src/config/worker.rs @@ -67,6 +67,7 @@ impl ConfigWorker { )) .unary_call_layer(auth_layer.clone()) .bi_call_layer(auth_layer) + .max_retries(client_props.get_max_retries()) .build(client_id); let remote_client = Arc::new(remote_client); diff --git a/src/naming/mod.rs b/src/naming/mod.rs index 880e48e..7ebc502 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -133,6 +133,7 @@ impl NacosNamingService { }) .unary_call_layer(auth_layer.clone()) .bi_call_layer(auth_layer) + .max_retries(client_props.get_max_retries()) .build(client_id.clone()); let nacos_grpc_client = Arc::new(nacos_grpc_client);