From df7b5f26ad13221440d1064e9f1d7343b2ed0089 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 2 May 2024 21:14:45 +0200 Subject: [PATCH] Add node address selection via type and ip kind (#934) * Add node address selection via type * Allow empty * Select via type and? kind * Split arguments --------- Co-authored-by: Mark Mandel --- src/cli/agent.rs | 28 ++++++++++++++++++ src/components/agent.rs | 2 ++ src/components/manage.rs | 1 + src/config.rs | 13 +++++++++ src/config/providers.rs | 2 ++ src/config/providers/k8s.rs | 31 +++++++++----------- src/config/providers/k8s/agones.rs | 47 ++++++++++++++++++++---------- src/config/watch/agones.rs | 2 ++ test/src/lib.rs | 1 + 9 files changed, 93 insertions(+), 34 deletions(-) diff --git a/src/cli/agent.rs b/src/cli/agent.rs index d667251e2d..e8e925f0c2 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -48,6 +48,13 @@ pub struct Agent { /// The configuration source for a management server. #[clap(subcommand)] pub provider: Option, + /// If specified, filters the available gameserver addresses to the one that + /// matches the specified type + #[clap(long)] + pub address_type: Option, + /// If specified, additionally filters the gameserver address by its ip kind + #[clap(long, requires("address_type"), value_enum)] + pub ip_kind: Option, /// The interval in seconds at which the agent will wait for a discovery /// request from a relay server before restarting the connection. #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")] @@ -57,6 +64,21 @@ pub struct Agent { pub icao_code: crate::config::IcaoCode, } +impl clap::ValueEnum for crate::config::AddrKind { + fn value_variants<'a>() -> &'a [Self] { + &[Self::Ipv4, Self::Ipv6, Self::Any] + } + + fn to_possible_value(&self) -> Option { + use clap::builder::PossibleValue as pv; + Some(match self { + Self::Ipv4 => pv::new("v4"), + Self::Ipv6 => pv::new("v6"), + Self::Any => pv::new("any"), + }) + } +} + impl Default for Agent { fn default() -> Self { Self { @@ -68,6 +90,8 @@ impl Default for Agent { provider: <_>::default(), idle_request_interval_secs: None, icao_code: <_>::default(), + address_type: None, + ip_kind: None, } } } @@ -97,6 +121,10 @@ impl Agent { icao_code, relay_servers: self.relay, provider: self.provider, + address_selector: self.address_type.map(|at| crate::config::AddressSelector { + name: at, + kind: self.ip_kind.unwrap_or(crate::config::AddrKind::Any), + }), } .run(crate::components::RunArgs { config, diff --git a/src/components/agent.rs b/src/components/agent.rs index 3a8bb831f6..310d277d16 100644 --- a/src/components/agent.rs +++ b/src/components/agent.rs @@ -29,6 +29,7 @@ pub struct Agent { pub icao_code: Option, pub relay_servers: Vec, pub provider: Option, + pub address_selector: Option, } impl Agent { @@ -60,6 +61,7 @@ impl Agent { config.clone(), ready.provider_is_healthy.clone(), self.locality, + self.address_selector, )), None => return Err(eyre::eyre!("no configuration provider given")), }; diff --git a/src/components/manage.rs b/src/components/manage.rs index 0a222ca015..052ee0b05d 100644 --- a/src/components/manage.rs +++ b/src/components/manage.rs @@ -32,6 +32,7 @@ impl Manage { config.clone(), ready.provider_is_healthy.clone(), self.locality, + None, ); let idle_request_interval = ready.idle_request_interval; diff --git a/src/config.rs b/src/config.rs index fb99f51436..825e2a2e86 100644 --- a/src/config.rs +++ b/src/config.rs @@ -861,6 +861,19 @@ impl From<(String, FilterInstance)> for Filter { } } +#[derive(Clone, Debug)] +pub struct AddressSelector { + pub name: String, + pub kind: AddrKind, +} + +#[derive(Copy, Clone, Debug)] +pub enum AddrKind { + Ipv4, + Ipv6, + Any, +} + #[cfg(test)] mod tests { use std::net::Ipv6Addr; diff --git a/src/config/providers.rs b/src/config/providers.rs index d517140ab2..fd1472e4ce 100644 --- a/src/config/providers.rs +++ b/src/config/providers.rs @@ -47,6 +47,7 @@ impl Providers { config: std::sync::Arc, health_check: Arc, locality: Option, + address_selector: Option, ) -> tokio::task::JoinHandle> { match &self { Self::Agones { @@ -63,6 +64,7 @@ impl Providers { health_check.clone(), locality.clone(), config.clone(), + address_selector.clone(), ) } })), diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index c3d2e87371..d4be92f563 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -8,7 +8,7 @@ use kube::runtime::watcher::Event; use agones::GameServer; -use crate::net::endpoint::{Endpoint, Locality}; +use crate::net::endpoint::Locality; pub fn update_filters_from_configmap( client: kube::Client, @@ -100,9 +100,11 @@ pub fn update_endpoints_from_gameservers( namespace: impl AsRef, config: Arc, locality: Option, + address_selector: Option, ) -> impl Stream> { async_stream::stream! { for await event in gameserver_events(client, namespace) { + let ads = address_selector.as_ref(); match event? { Event::Applied(server) => { tracing::debug!("received applied event from k8s"); @@ -112,12 +114,9 @@ pub fn update_endpoints_from_gameservers( continue; } - let endpoint = match Endpoint::try_from(server) { - Ok(endpoint) => endpoint, - Err(error) => { - tracing::warn!(%error, "received invalid gameserver to apply from k8s"); - continue; - } + let Some(endpoint) = server.endpoint(ads) else { + tracing::warn!(selector=?ads, "received invalid gameserver to apply from k8s"); + continue; }; tracing::debug!(endpoint=%serde_json::to_value(&endpoint).unwrap(), "Adding endpoint"); config.clusters.write() @@ -128,16 +127,12 @@ pub fn update_endpoints_from_gameservers( tracing::debug!("received restart event from k8s"); let servers: BTreeSet<_> = servers .into_iter() - .filter(GameServer::is_allocated) - .map(Endpoint::try_from) - .filter_map(|result| { - match result { - Ok(endpoint) => Some(endpoint), - Err(error) => { - tracing::warn!(%error, "received invalid gameserver on restart from k8s"); - None - } + .filter_map(|server| { + if !server.is_allocated() { + return None; } + + server.endpoint(ads) }) .collect(); @@ -151,7 +146,7 @@ pub fn update_endpoints_from_gameservers( Event::Deleted(server) => { tracing::debug!("received delete event from k8s"); - let found = if let Some(endpoint) = server.endpoint() { + let found = if let Some(endpoint) = server.endpoint(ads) { config.clusters.write().remove_endpoint(&endpoint) } else { config.clusters.write().remove_endpoint_if(|endpoint| { @@ -161,7 +156,7 @@ pub fn update_endpoints_from_gameservers( if !found { tracing::debug!( - endpoint=%serde_json::to_value(server.endpoint()).unwrap(), + endpoint=%serde_json::to_value(server.endpoint(ads)).unwrap(), name=%serde_json::to_value(server.metadata.name).unwrap(), "received unknown gameserver to delete from k8s" ); diff --git a/src/config/providers/k8s/agones.rs b/src/config/providers/k8s/agones.rs index f5c13f4b4f..4f374311bd 100644 --- a/src/config/providers/k8s/agones.rs +++ b/src/config/providers/k8s/agones.rs @@ -15,6 +15,7 @@ */ use k8s_openapi::{ + api::core::v1::NodeAddress, apiextensions_apiserver::pkg::apis::apiextensions::v1::{ CustomResourceDefinition, CustomResourceDefinitionNames, CustomResourceDefinitionSpec, CustomResourceDefinitionVersion, CustomResourceValidation, @@ -40,8 +41,11 @@ pub struct GameServer { } impl GameServer { - pub fn endpoint(&self) -> Option { - self.status.as_ref().map(|status| { + pub fn endpoint( + &self, + address_selector: Option<&crate::config::AddressSelector>, + ) -> Option { + self.status.as_ref().and_then(|status| { let port = status .ports .as_ref() @@ -58,13 +62,32 @@ impl GameServer { map }; - Endpoint::with_metadata( - (status.address.clone(), port).into(), + let address = if let Some(ads) = address_selector { + status.addresses.iter().find_map(|adr| { + if adr.type_ != ads.name { + return None; + } + + use crate::config::AddrKind; + match ads.kind { + AddrKind::Any => Some(adr.address.clone()), + AddrKind::Ipv4 => (!adr.address.contains(':')).then(|| adr.address.clone()), + AddrKind::Ipv6 => adr.address.contains(':').then(|| adr.address.clone()), + } + })? + } else { + status.address.clone() + }; + + let ep = Endpoint::with_metadata( + (address, port).into(), crate::net::endpoint::metadata::MetadataView::with_unknown( crate::net::endpoint::Metadata { tokens }, extra_metadata, ), - ) + ); + + Some(ep) }) } @@ -133,7 +156,7 @@ impl GameServer { pub fn is_allocated(&self) -> bool { self.status.as_ref().map_or(false, |status| { - tracing::trace!(%status.address, ?status.state, "checking gameserver"); + tracing::trace!(?status.addresses, ?status.state, "checking gameserver"); matches!(status.state, GameServerState::Allocated) }) } @@ -289,16 +312,6 @@ impl Default for GameServerSpec { } } -impl TryFrom for Endpoint { - type Error = tonic::Status; - - fn try_from(server: GameServer) -> Result { - server - .endpoint() - .ok_or_else(|| tonic::Status::internal("No status found for game server")) - } -} - #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct Health { /// Whether health checking is disabled or not @@ -375,6 +388,8 @@ pub struct GameServerStatus { pub state: GameServerState, pub ports: Option>, pub address: String, + #[serde(default)] + pub addresses: Vec, pub node_name: String, pub reserved_until: Option, } diff --git a/src/config/watch/agones.rs b/src/config/watch/agones.rs index 239655b4c0..6459a4ae5f 100644 --- a/src/config/watch/agones.rs +++ b/src/config/watch/agones.rs @@ -28,6 +28,7 @@ pub async fn watch( health_check: Arc, locality: Option, config: Arc, + address_selector: Option, ) -> crate::Result<()> { let client = tokio::time::timeout( std::time::Duration::from_secs(5), @@ -44,6 +45,7 @@ pub async fn watch( gameservers_namespace, config.clone(), locality, + address_selector, ); tokio::pin!(configmap_reflector); tokio::pin!(gameserver_reflector); diff --git a/test/src/lib.rs b/test/src/lib.rs index 41a1b405f5..c1059bb0c4 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -397,6 +397,7 @@ impl Pail { relay_servers, qcmp_socket, provider: Some(Providers::File { path }), + address_selector: None, } .run(RunArgs { config: aconfig,