Skip to content

Commit

Permalink
Add --idle-request-interval-secs (#790)
Browse files Browse the repository at this point in the history
Co-authored-by: Mark Mandel <[email protected]>
  • Loading branch information
XAMPPRocky and markmandel authored Sep 18, 2023
1 parent 36d14d4 commit de49889
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 16 deletions.
8 changes: 7 additions & 1 deletion src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub struct Proxy {
/// One or more socket addresses to forward packets to.
#[clap(short, long, env = "QUILKIN_DEST")]
pub to: Vec<SocketAddr>,
/// The interval in seconds at which the relay will send a discovery request
/// to an management server after receiving no updates.
#[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = crate::xds::server::IDLE_REQUEST_INTERVAL_SECS)]
pub idle_request_interval_secs: u64,
}

impl Default for Proxy {
Expand All @@ -56,6 +60,7 @@ impl Default for Proxy {
port: PORT,
qcmp_port: QCMP_PORT,
to: <_>::default(),
idle_request_interval_secs: crate::xds::server::IDLE_REQUEST_INTERVAL_SECS,
}
}
}
Expand Down Expand Up @@ -107,7 +112,8 @@ impl Proxy {
let client =
crate::xds::AdsClient::connect(String::clone(&id), self.management_server.clone())
.await?;
let mut stream = client.xds_client_stream(config.clone());
let mut stream =
client.xds_client_stream(config.clone(), self.idle_request_interval_secs);

tokio::time::sleep(std::time::Duration::from_nanos(1)).await;
stream
Expand Down
6 changes: 6 additions & 0 deletions src/cli/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub struct Relay {
/// Port for xDS management_server service
#[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)]
pub xds_port: u16,
/// The interval in seconds at which the relay will send a discovery request
/// to an management server after receiving no updates.
#[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = crate::xds::server::IDLE_REQUEST_INTERVAL_SECS)]
pub idle_request_interval_secs: u64,
#[clap(subcommand)]
pub providers: Option<Providers>,
}
Expand All @@ -43,6 +47,7 @@ impl Default for Relay {
Self {
mds_port: PORT,
xds_port: super::manage::PORT,
idle_request_interval_secs: crate::xds::server::IDLE_REQUEST_INTERVAL_SECS,
providers: None,
}
}
Expand All @@ -57,6 +62,7 @@ impl Relay {
let xds_server = crate::xds::server::spawn(self.xds_port, config.clone());
let mds_server = tokio::spawn(crate::xds::server::control_plane_discovery_server(
self.mds_port,
self.idle_request_interval_secs,
config.clone(),
));

Expand Down
5 changes: 4 additions & 1 deletion src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ mod tests {
)
.await
.unwrap();
let mut stream = client.xds_client_stream(config.clone());
let mut stream = client.xds_client_stream(
config.clone(),
crate::xds::server::IDLE_REQUEST_INTERVAL_SECS,
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

// Each time, we create a new upstream endpoint and send a cluster update for it.
Expand Down
38 changes: 31 additions & 7 deletions src/xds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,12 @@ impl MdsClient {

impl AdsClient {
/// Starts a new stream to the xDS management server.
pub fn xds_client_stream(&self, config: Arc<Config>) -> AdsStream {
AdsStream::xds_client_stream(self, config)
pub fn xds_client_stream(
&self,
config: Arc<Config>,
idle_request_interval_secs: u64,
) -> AdsStream {
AdsStream::xds_client_stream(self, config, idle_request_interval_secs)
}
}

Expand All @@ -238,6 +242,7 @@ impl AdsStream {
management_servers,
}: &AdsClient,
config: Arc<Config>,
idle_request_interval_secs: u64,
) -> Self {
let mut client = client.clone();
let identifier = identifier.clone();
Expand Down Expand Up @@ -285,20 +290,36 @@ impl AdsStream {
);

loop {
match stream.next().await {
Some(Ok(ack)) => {
let next_response = tokio::time::timeout(
std::time::Duration::from_secs(idle_request_interval_secs),
stream.next(),
);

match next_response.await {
Ok(Some(Ok(ack))) => {
tracing::trace!("received ack");
requests.send(ack)?;
continue;
}
Some(Err(error)) => {
Ok(Some(Err(error))) => {
tracing::warn!(%error, "xds stream error");
break;
}
None => {
Ok(None) => {
tracing::warn!("xDS stream terminated");
break;
}
Err(_) => {
tracing::info!(
"exceeded idle request interval sending new requests"
);
Self::refresh_resources(
&identifier,
&subscribed_resources,
&mut requests,
)
.await?;
}
}
}

Expand Down Expand Up @@ -367,7 +388,10 @@ impl MdsStream {
.await?
.into_inner();

let control_plane = super::server::ControlPlane::from_arc(config.clone());
let control_plane = super::server::ControlPlane::from_arc(
config.clone(),
super::server::IDLE_REQUEST_INTERVAL_SECS,
);
let mut stream = control_plane.stream_aggregated_resources(stream).await?;
while let Some(result) = stream.next().await {
let response = result?;
Expand Down
40 changes: 33 additions & 7 deletions src/xds/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,17 @@ use crate::{
},
};

pub(crate) const IDLE_REQUEST_INTERVAL_SECS: u64 = 30;

#[tracing::instrument(skip_all)]
pub fn spawn(
port: u16,
config: std::sync::Arc<crate::Config>,
) -> impl std::future::Future<Output = crate::Result<()>> {
let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc(config));
let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc(
config,
IDLE_REQUEST_INTERVAL_SECS,
));
let server = tonic::transport::Server::builder().add_service(server);
tracing::info!("serving management server on port `{port}`");
server
Expand All @@ -53,9 +58,13 @@ pub fn spawn(

pub(crate) fn control_plane_discovery_server(
port: u16,
idle_request_interval_secs: u64,
config: Arc<Config>,
) -> impl std::future::Future<Output = crate::Result<()>> {
let server = AggregatedControlPlaneDiscoveryServiceServer::new(ControlPlane::from_arc(config));
let server = AggregatedControlPlaneDiscoveryServiceServer::new(ControlPlane::from_arc(
config,
idle_request_interval_secs,
));
let server = tonic::transport::Server::builder().add_service(server);
tracing::info!("serving relay server on port `{port}`");
server
Expand All @@ -66,6 +75,7 @@ pub(crate) fn control_plane_discovery_server(
#[derive(Clone)]
pub struct ControlPlane {
config: Arc<Config>,
idle_request_interval_secs: u64,
watchers: Arc<crate::xds::resource::ResourceMap<Watchers>>,
}

Expand All @@ -88,13 +98,14 @@ impl Default for Watchers {

impl ControlPlane {
/// Creates a new server for managing [`Config`].
pub fn new(config: Config) -> Self {
Self::from_arc(Arc::new(config))
pub fn new(config: Config, idle_request_interval_secs: u64) -> Self {
Self::from_arc(Arc::new(config), idle_request_interval_secs)
}

pub fn from_arc(config: Arc<Config>) -> Self {
pub fn from_arc(config: Arc<Config>, idle_request_interval_secs: u64) -> Self {
let this = Self {
config,
idle_request_interval_secs,
watchers: <_>::default(),
};

Expand Down Expand Up @@ -315,6 +326,7 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane {

tracing::info!(%identifier, "new control plane discovery stream");
let config = self.config.clone();
let idle_request_interval_secs = self.idle_request_interval_secs;
let stream = super::client::AdsStream::connect(
Arc::from(&*identifier),
move |(mut requests, _rx), _subscribed_resources| async move {
Expand All @@ -334,9 +346,23 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane {
);

loop {
if let Some(ack) = response_handler.next().await {
let next_response = tokio::time::timeout(
std::time::Duration::from_secs(idle_request_interval_secs),
response_handler.next(),
);

if let Ok(Some(ack)) = next_response.await {
tracing::info!("sending ack request");
requests.send(ack?)?;
} else {
tracing::info!("exceeded idle interval, sending request");
crate::xds::client::MdsStream::discovery_request_without_cache(
&identifier,
&mut requests,
crate::xds::ResourceType::Cluster,
&[],
)
.map_err(|error| tonic::Status::internal(error.to_string()))?;
}
}
},
Expand Down Expand Up @@ -399,7 +425,7 @@ mod tests {
};

let config = Arc::new(Config::default());
let client = ControlPlane::from_arc(config.clone());
let client = ControlPlane::from_arc(config.clone(), IDLE_REQUEST_INTERVAL_SECS);
let (tx, rx) = tokio::sync::mpsc::channel(256);

let mut request = DiscoveryRequest {
Expand Down

0 comments on commit de49889

Please sign in to comment.