diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 41b26bc2a4..5498aa9716 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -46,6 +46,10 @@ pub struct Proxy { /// One or more socket addresses to forward packets to. #[clap(short, long, env = "QUILKIN_DEST")] pub to: Vec, + /// The interval in seconds at which the relay will send a discovery request + /// to an management server after receiving no updates. + #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = crate::xds::server::IDLE_REQUEST_INTERVAL_SECS)] + pub idle_request_interval_secs: u64, } impl Default for Proxy { @@ -56,6 +60,7 @@ impl Default for Proxy { port: PORT, qcmp_port: QCMP_PORT, to: <_>::default(), + idle_request_interval_secs: crate::xds::server::IDLE_REQUEST_INTERVAL_SECS, } } } @@ -107,7 +112,8 @@ impl Proxy { let client = crate::xds::AdsClient::connect(String::clone(&id), self.management_server.clone()) .await?; - let mut stream = client.xds_client_stream(config.clone()); + let mut stream = + client.xds_client_stream(config.clone(), self.idle_request_interval_secs); tokio::time::sleep(std::time::Duration::from_nanos(1)).await; stream diff --git a/src/cli/relay.rs b/src/cli/relay.rs index 21b801543f..fc075653a4 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -34,6 +34,10 @@ pub struct Relay { /// Port for xDS management_server service #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)] pub xds_port: u16, + /// The interval in seconds at which the relay will send a discovery request + /// to an management server after receiving no updates. + #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = crate::xds::server::IDLE_REQUEST_INTERVAL_SECS)] + pub idle_request_interval_secs: u64, #[clap(subcommand)] pub providers: Option, } @@ -43,6 +47,7 @@ impl Default for Relay { Self { mds_port: PORT, xds_port: super::manage::PORT, + idle_request_interval_secs: crate::xds::server::IDLE_REQUEST_INTERVAL_SECS, providers: None, } } @@ -57,6 +62,7 @@ impl Relay { let xds_server = crate::xds::server::spawn(self.xds_port, config.clone()); let mds_server = tokio::spawn(crate::xds::server::control_plane_discovery_server( self.mds_port, + self.idle_request_interval_secs, config.clone(), )); diff --git a/src/xds.rs b/src/xds.rs index a39b023a52..95245f3a69 100644 --- a/src/xds.rs +++ b/src/xds.rs @@ -302,7 +302,10 @@ mod tests { ) .await .unwrap(); - let mut stream = client.xds_client_stream(config.clone()); + let mut stream = client.xds_client_stream( + config.clone(), + crate::xds::server::IDLE_REQUEST_INTERVAL_SECS, + ); tokio::time::sleep(std::time::Duration::from_millis(500)).await; // Each time, we create a new upstream endpoint and send a cluster update for it. diff --git a/src/xds/client.rs b/src/xds/client.rs index 09d38bef22..e8ad068240 100644 --- a/src/xds/client.rs +++ b/src/xds/client.rs @@ -217,8 +217,12 @@ impl MdsClient { impl AdsClient { /// Starts a new stream to the xDS management server. - pub fn xds_client_stream(&self, config: Arc) -> AdsStream { - AdsStream::xds_client_stream(self, config) + pub fn xds_client_stream( + &self, + config: Arc, + idle_request_interval_secs: u64, + ) -> AdsStream { + AdsStream::xds_client_stream(self, config, idle_request_interval_secs) } } @@ -238,6 +242,7 @@ impl AdsStream { management_servers, }: &AdsClient, config: Arc, + idle_request_interval_secs: u64, ) -> Self { let mut client = client.clone(); let identifier = identifier.clone(); @@ -285,20 +290,36 @@ impl AdsStream { ); loop { - match stream.next().await { - Some(Ok(ack)) => { + let next_response = tokio::time::timeout( + std::time::Duration::from_secs(idle_request_interval_secs), + stream.next(), + ); + + match next_response.await { + Ok(Some(Ok(ack))) => { tracing::trace!("received ack"); requests.send(ack)?; continue; } - Some(Err(error)) => { + Ok(Some(Err(error))) => { tracing::warn!(%error, "xds stream error"); break; } - None => { + Ok(None) => { tracing::warn!("xDS stream terminated"); break; } + Err(_) => { + tracing::info!( + "exceeded idle request interval sending new requests" + ); + Self::refresh_resources( + &identifier, + &subscribed_resources, + &mut requests, + ) + .await?; + } } } @@ -367,7 +388,10 @@ impl MdsStream { .await? .into_inner(); - let control_plane = super::server::ControlPlane::from_arc(config.clone()); + let control_plane = super::server::ControlPlane::from_arc( + config.clone(), + super::server::IDLE_REQUEST_INTERVAL_SECS, + ); let mut stream = control_plane.stream_aggregated_resources(stream).await?; while let Some(result) = stream.next().await { let response = result?; diff --git a/src/xds/server.rs b/src/xds/server.rs index b4afb6bcc2..207a0d46b4 100644 --- a/src/xds/server.rs +++ b/src/xds/server.rs @@ -38,12 +38,17 @@ use crate::{ }, }; +pub(crate) const IDLE_REQUEST_INTERVAL_SECS: u64 = 30; + #[tracing::instrument(skip_all)] pub fn spawn( port: u16, config: std::sync::Arc, ) -> impl std::future::Future> { - let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc(config)); + let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc( + config, + IDLE_REQUEST_INTERVAL_SECS, + )); let server = tonic::transport::Server::builder().add_service(server); tracing::info!("serving management server on port `{port}`"); server @@ -53,9 +58,13 @@ pub fn spawn( pub(crate) fn control_plane_discovery_server( port: u16, + idle_request_interval_secs: u64, config: Arc, ) -> impl std::future::Future> { - let server = AggregatedControlPlaneDiscoveryServiceServer::new(ControlPlane::from_arc(config)); + let server = AggregatedControlPlaneDiscoveryServiceServer::new(ControlPlane::from_arc( + config, + idle_request_interval_secs, + )); let server = tonic::transport::Server::builder().add_service(server); tracing::info!("serving relay server on port `{port}`"); server @@ -66,6 +75,7 @@ pub(crate) fn control_plane_discovery_server( #[derive(Clone)] pub struct ControlPlane { config: Arc, + idle_request_interval_secs: u64, watchers: Arc>, } @@ -88,13 +98,14 @@ impl Default for Watchers { impl ControlPlane { /// Creates a new server for managing [`Config`]. - pub fn new(config: Config) -> Self { - Self::from_arc(Arc::new(config)) + pub fn new(config: Config, idle_request_interval_secs: u64) -> Self { + Self::from_arc(Arc::new(config), idle_request_interval_secs) } - pub fn from_arc(config: Arc) -> Self { + pub fn from_arc(config: Arc, idle_request_interval_secs: u64) -> Self { let this = Self { config, + idle_request_interval_secs, watchers: <_>::default(), }; @@ -315,6 +326,7 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { tracing::info!(%identifier, "new control plane discovery stream"); let config = self.config.clone(); + let idle_request_interval_secs = self.idle_request_interval_secs; let stream = super::client::AdsStream::connect( Arc::from(&*identifier), move |(mut requests, _rx), _subscribed_resources| async move { @@ -334,9 +346,23 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { ); loop { - if let Some(ack) = response_handler.next().await { + let next_response = tokio::time::timeout( + std::time::Duration::from_secs(idle_request_interval_secs), + response_handler.next(), + ); + + if let Ok(Some(ack)) = next_response.await { tracing::info!("sending ack request"); requests.send(ack?)?; + } else { + tracing::info!("exceeded idle interval, sending request"); + crate::xds::client::MdsStream::discovery_request_without_cache( + &identifier, + &mut requests, + crate::xds::ResourceType::Cluster, + &[], + ) + .map_err(|error| tonic::Status::internal(error.to_string()))?; } } }, @@ -399,7 +425,7 @@ mod tests { }; let config = Arc::new(Config::default()); - let client = ControlPlane::from_arc(config.clone()); + let client = ControlPlane::from_arc(config.clone(), IDLE_REQUEST_INTERVAL_SECS); let (tx, rx) = tokio::sync::mpsc::channel(256); let mut request = DiscoveryRequest {