Skip to content

Commit

Permalink
Temporarily retain endpoints which are in active use
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Aug 23, 2023
1 parent 1fc3954 commit 2af4534
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ kube.workspace = true
trust-dns-resolver = { version = "0.23.0", features = ["tokio", "tokio-rustls", "dns-over-https-rustls"] }
async-trait = "0.1.68"
nom = "7.1.3"
itertools = "0.11.0"

[target.'cfg(target_os = "linux")'.dependencies]
sys-info = "0.9.1"
Expand Down
13 changes: 12 additions & 1 deletion src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ pub use self::{
pub type EndpointMetadata = crate::metadata::MetadataView<Metadata>;

/// A destination endpoint with any associated metadata.
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Eq, schemars::JsonSchema)]
#[derive(Debug, Deserialize, Serialize, Clone, schemars::JsonSchema)]
#[non_exhaustive]
#[serde(deny_unknown_fields)]
pub struct Endpoint {
#[schemars(with = "String")]
pub address: EndpointAddress,
#[serde(default)]
pub metadata: EndpointMetadata,
#[serde(skip, default)]
pub sessions: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}

impl Endpoint {
Expand All @@ -65,6 +67,7 @@ impl Default for Endpoint {
Self {
address: EndpointAddress::UNSPECIFIED,
metadata: <_>::default(),
sessions: <_>::default(),
}
}
}
Expand Down Expand Up @@ -128,6 +131,14 @@ impl<T: Into<EndpointAddress>> From<T> for Endpoint {
}
}

impl PartialEq for Endpoint {
fn eq(&self, other: &Self) -> bool {
self.address.eq(&other.address) && self.metadata.eq(&other.metadata)
}
}

impl Eq for Endpoint {}

impl Ord for Endpoint {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.address.cmp(&other.address)
Expand Down
96 changes: 74 additions & 22 deletions src/endpoint/locality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::collections::BTreeSet;

use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::Endpoint;
Expand Down Expand Up @@ -277,34 +278,85 @@ impl LocalitySet {
}

pub fn merge(&mut self, cluster: &Self) {
use std::collections::hash_map::Entry;

for (key, value) in &cluster.0 {
if tracing::enabled!(tracing::Level::INFO) {
let span = tracing::info_span!(
"applied_locality",
locality = &*key
.as_ref()
.map(|locality| locality.colon_separated_string())
.unwrap_or_else(|| String::from("<none>"))
);

let _entered = span.enter();
if value.endpoints.is_empty() {
tracing::info!("removing all endpoints");
} else if let Some(original_endpoints) = self.0.get(key) {
for endpoint in &original_endpoints.endpoints {
if !value.endpoints.contains(endpoint) {
tracing::info!(%endpoint.address, ?endpoint.metadata.known.tokens, "removing endpoint");
let span = tracing::info_span!(
"applied_locality",
locality = &*key
.as_ref()
.map(|locality| locality.colon_separated_string())
.unwrap_or_else(|| String::from("<none>"))
);

let _entered = span.enter();

match self.0.entry(key.clone()) {
// The eviction logic is as follows:
//
// If an endpoint already exists:
// - If `sessions` is zero then it is dropped.
// If that endpoint exists in the new set:
// - Its metadata is replaced with the new set.
// Else the endpoint remains.
//
// This will mean that updated metadata such as new tokens
// will be respected, but we will still retain older
// endpoints that are currently actively used in a session.
Entry::Occupied(entry) => {
let (key, original_locality) = entry.remove_entry();
let mut value = value.clone();
let new_set_addresses = value
.endpoints
.iter()
.map(|endpoint| endpoint.address.clone())
.collect::<BTreeSet<_>>();

if tracing::enabled!(tracing::Level::INFO) {
for endpoint in value.endpoints.iter() {
tracing::info!(
%endpoint.address,
endpoint.tokens=%endpoint.metadata.known.tokens.iter().map(crate::utils::base64_encode).join(", "),
"applying endpoint"
);
}
}

let (retained, dropped): (Vec<_>, _) = original_locality
.endpoints
.into_iter()
.partition(|endpoint| {
!new_set_addresses.contains(&endpoint.address)
&& endpoint.sessions.load(std::sync::atomic::Ordering::SeqCst) != 0
});

if tracing::enabled!(tracing::Level::INFO) {
for endpoint in dropped {
tracing::info!(
%endpoint.address,
endpoint.tokens=%endpoint.metadata.known.tokens.iter().map(crate::utils::base64_encode).join(", "),
"dropping endpoint"
);
}
}
}

for endpoint in &value.endpoints {
tracing::info!(%endpoint.address, ?endpoint.metadata.known.tokens, "applying endpoint");
for endpoint in retained {
tracing::info!(
%endpoint.address,
endpoint.tokens=%endpoint.metadata.known.tokens.iter().map(crate::utils::base64_encode).join(", "),
"retaining endpoint"
);

value.endpoints.insert(endpoint);
}

self.0.insert(key, value.clone());
}
Entry::Vacant(entry) => {
tracing::info!("adding new locality");
entry.insert(value.clone());
}
}

let entry = self.0.entry(key.clone()).or_default();
*entry = value.clone();
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ impl Session {
.connect(args.dest.address.to_socket_addr().await?)
.await?;
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
args.dest
.sessions
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

let s = Session {
config: args.config.clone(),
Expand Down Expand Up @@ -235,6 +238,9 @@ impl Drop for Session {
fn drop(&mut self) {
self.active_session_metric().dec();
metrics::duration_secs().observe(self.created_at.elapsed().as_secs() as f64);
self.dest
.sessions
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);

if let Err(error) = self.shutdown_tx.send(()) {
tracing::warn!(%error, "Error sending session shutdown signal");
Expand Down

0 comments on commit 2af4534

Please sign in to comment.