From 47e47be745a8bd2a7738051b8adec524eb944222 Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Thu, 30 Jan 2025 11:33:15 +0100 Subject: [PATCH] chore: Add static provider, allow XDP option bet set through env, allow no command in CLI (#1082) --- src/cli.rs | 123 +++++++++++++++++++--------------- src/cli/service.rs | 34 ++++++++-- src/config/providersv2.rs | 136 +++++++++++++++++++++++++++++++++----- 3 files changed, 219 insertions(+), 74 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index f6ec37383..566bed034 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -121,7 +121,7 @@ pub struct Cli { #[clap(short, long, env)] pub quiet: bool, #[clap(subcommand)] - pub command: Commands, + pub command: Option, #[clap( long, default_value_t = LogFormats::Auto, @@ -134,6 +134,8 @@ pub struct Cli { #[command(flatten)] pub locality: LocalityCli, #[command(flatten)] + pub providers: crate::config::providersv2::Providers, + #[command(flatten)] pub service: Service, } @@ -209,47 +211,55 @@ impl Cli { // Non-long running commands (e.g. ones with no administration server) // are executed here. 
use crate::components::{self, admin as admin_server}; - let mode = match &self.command { - Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await, - Commands::GenerateConfigSchema(generator) => { - return generator.generate_config_schema(); - } - Commands::Agent(_) => Admin::Agent(<_>::default()), - Commands::Proxy(proxy) => { - let ready = components::proxy::Ready { - idle_request_interval: proxy - .idle_request_interval_secs - .map(std::time::Duration::from_secs) - .unwrap_or(admin_server::IDLE_REQUEST_INTERVAL), - ..Default::default() - }; - Admin::Proxy(ready) - } - Commands::Manage(_mng) => { - let ready = components::manage::Ready { - is_manage: true, - ..Default::default() - }; - Admin::Manage(ready) - } - Commands::Relay(relay) => { - let ready = components::relay::Ready { - idle_request_interval: relay - .idle_request_interval_secs - .map(std::time::Duration::from_secs) - .unwrap_or(admin_server::IDLE_REQUEST_INTERVAL), - ..Default::default() - }; - Admin::Relay(ready) - } + let mode = if let Some(command) = &self.command { + Some(match command { + Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await, + Commands::GenerateConfigSchema(generator) => { + return generator.generate_config_schema(); + } + Commands::Agent(_) => Admin::Agent(<_>::default()), + Commands::Proxy(proxy) => { + let ready = components::proxy::Ready { + idle_request_interval: proxy + .idle_request_interval_secs + .map(std::time::Duration::from_secs) + .unwrap_or(admin_server::IDLE_REQUEST_INTERVAL), + ..Default::default() + }; + Admin::Proxy(ready) + } + Commands::Manage(_mng) => { + let ready = components::manage::Ready { + is_manage: true, + ..Default::default() + }; + Admin::Manage(ready) + } + Commands::Relay(relay) => { + let ready = components::relay::Ready { + idle_request_interval: relay + .idle_request_interval_secs + .map(std::time::Duration::from_secs) + .unwrap_or(admin_server::IDLE_REQUEST_INTERVAL), + ..Default::default() + }; + Admin::Relay(ready) + } + }) + } 
else { + None }; + if !self.service.any_service_enabled() && mode.is_none() { + eyre::bail!("no service specified, shutting down"); + } + tracing::debug!(cli = ?self, "config parameters"); let config = Arc::new(match Self::read_config(self.config)? { Some(mut config) => { // Workaround deficiency in serde flatten + untagged - if matches!(self.command, Commands::Agent(..)) { + if matches!(self.command, Some(Commands::Agent(..))) { config.datacenter = match config.datacenter { crate::config::DatacenterConfig::Agent { icao_code, @@ -272,38 +282,47 @@ impl Cli { config } - None if matches!(self.command, Commands::Agent(..)) => Config::default_agent(), + None if matches!(self.command, Some(Commands::Agent(..))) => Config::default_agent(), None => Config::default_non_agent(), }); if self.admin.enabled { - mode.server(config.clone(), self.admin.address); + if let Some(mode) = mode.as_ref() { + mode.server(config.clone(), self.admin.address); + } } - let shutdown_rx = crate::signal::spawn_handler(); + let mut shutdown_rx = crate::signal::spawn_handler(); crate::alloc::spawn_heap_stats_updates( std::time::Duration::from_secs(10), shutdown_rx.clone(), ); + let ready = <_>::default(); + let locality = self.locality.locality(); + self.providers + .spawn_providers(&config, ready, locality.clone()); self.service.spawn_services(&config, &shutdown_rx)?; - let locality = self.locality.locality(); - match (self.command, mode) { - (Commands::Agent(agent), Admin::Agent(ready)) => { - agent.run(locality, config, ready, shutdown_rx).await - } - (Commands::Proxy(runner), Admin::Proxy(ready)) => { - runner.run(config, ready, tx, shutdown_rx).await - } - (Commands::Manage(manager), Admin::Manage(ready)) => { - manager.run(locality, config, ready, shutdown_rx).await - } - (Commands::Relay(relay), Admin::Relay(ready)) => { - relay.run(locality, config, ready, shutdown_rx).await + if let Some(mode) = mode { + match (self.command.unwrap(), mode) { + (Commands::Agent(agent), 
Admin::Agent(ready)) => { + agent.run(locality, config, ready, shutdown_rx).await + } + (Commands::Proxy(runner), Admin::Proxy(ready)) => { + runner.run(config, ready, tx, shutdown_rx).await + } + (Commands::Manage(manager), Admin::Manage(ready)) => { + manager.run(locality, config, ready, shutdown_rx).await + } + (Commands::Relay(relay), Admin::Relay(ready)) => { + relay.run(locality, config, ready, shutdown_rx).await + } + _ => unreachable!(), } - _ => unreachable!(), + } else { + shutdown_rx.changed().await.map_err(From::from) } } diff --git a/src/cli/service.rs b/src/cli/service.rs index 22f1049d3..365bdee39 100644 --- a/src/cli/service.rs +++ b/src/cli/service.rs @@ -162,6 +162,15 @@ impl Service { self } + /// Returns `true` if at least one of the UDP, QCMP, Phoenix, xDS or mDS services is enabled. + pub fn any_service_enabled(&self) -> bool { + self.udp_enabled + || self.qcmp_enabled + || self.phoenix_enabled + || self.xds_enabled + || self.mds_enabled + } + /// The main entrypoint for listening network servers. When called will /// spawn any and all enabled services, if successful returning a future /// that can be await to wait on services to be cancelled. 
@@ -202,6 +211,7 @@ impl Service { shutdown_rx: &crate::signal::ShutdownRx, ) -> crate::Result>> { if self.phoenix_enabled { + tracing::info!(port=%self.phoenix_port, "starting phoenix service"); let phoenix = crate::net::TcpListener::bind(Some(self.phoenix_port))?; crate::net::phoenix::spawn( phoenix, @@ -220,6 +230,7 @@ impl Service { shutdown_rx: &crate::signal::ShutdownRx, ) -> crate::Result>> { if self.qcmp_enabled { + tracing::info!(port=%self.qcmp_port, "starting qcmp service"); let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?; crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone())?; } @@ -238,6 +249,7 @@ impl Service { use futures::TryFutureExt as _; + tracing::info!(port=%self.mds_port, "starting mds service"); let listener = crate::net::TcpListener::bind(Some(self.mds_port))?; Ok(either::Right( @@ -291,6 +303,8 @@ impl Service { return Ok((either::Left(std::future::pending()), None)); } + tracing::info!(port=%self.udp_port, "starting udp service"); + #[cfg(target_os = "linux")] { match self.spawn_xdp(config.clone(), self.xdp.force_xdp) { @@ -368,6 +382,7 @@ impl Service { eyre::bail!("XDP currently disabled by default"); } + tracing::info!(port=%self.udp_port, "setting up xdp module"); let workers = xdp::setup_xdp_io(xdp::XdpConfig { nic: self .xdp @@ -396,31 +411,40 @@ pub struct XdpOptions { /// If not specified quilkin will attempt to determine the most appropriate /// network interface to use. Quilkin will exit with an error if the network /// interface does not exist, or a suitable default cannot be determined. - #[clap(long = "service.udp.xdp.network-interface")] + #[clap( + long = "service.udp.xdp.network-interface", + env = "QUILKIN_SERVICE_UDP_XDP_NETWORK_INTERFACE" + )] pub network_interface: Option, /// Forces the use of XDP. /// /// If XDP is not available on the chosen NIC, Quilkin exits with an error. /// If false, io-uring will be used as the fallback implementation. 
- #[clap(long = "service.udp.xdp")] + #[clap(long = "service.udp.xdp", env = "QUILKIN_SERVICE_UDP_XDP")] pub force_xdp: bool, /// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags) /// /// If zero copy is not available on the chosen NIC, Quilkin exits with an error - #[clap(long = "service.udp.xdp.zerocopy")] + #[clap( + long = "service.udp.xdp.zerocopy", + env = "QUILKIN_SERVICE_UDP_XDP_ZEROCOPY" + )] pub force_zerocopy: bool, /// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html) /// /// TX checksum offload is an optional feature allowing the data portion of /// a packet to have its internet checksum calculation offloaded to the NIC, /// as otherwise this is done in software - #[clap(long = "service.udp.xdp.tco")] + #[clap(long = "service.udp.xdp.tco", env = "QUILKIN_SERVICE_UDP_XDP_TCO")] pub force_tx_checksum_offload: bool, /// The maximum amount of memory mapped for packet buffers, in bytes /// /// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time) /// per NIC queue, ie 128MiB on a 32 queue NIC - #[clap(long = "service.udp.xdp.memory-limit")] + #[clap( + long = "service.udp.xdp.memory-limit", + env = "QUILKIN_SERVICE_UDP_XDP_MEMORY_LIMIT" + )] pub maximum_memory: Option, } diff --git a/src/config/providersv2.rs b/src/config/providersv2.rs index e2ccb7436..71537a774 100644 --- a/src/config/providersv2.rs +++ b/src/config/providersv2.rs @@ -29,20 +29,20 @@ const RETRIES: u32 = 25; const BACKOFF_STEP: std::time::Duration = std::time::Duration::from_millis(250); const MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(2); -/// The available xDS source providers. +/// The available xDS source provider. 
#[derive(Clone, Debug, Default, clap::Args)] pub struct Providers { /// Watches Agones' game server CRDs for `Allocated` game server endpoints, /// and for a `ConfigMap` that specifies the filter configuration. #[arg( - long = "providers.k8s", + long = "provider.k8s", env = "QUILKIN_PROVIDERS_K8S", default_value_t = false )] k8s_enabled: bool, #[arg( - long = "providers.k8s.namespace", + long = "provider.k8s.namespace", env = "QUILKIN_PROVIDERS_K8S_NAMESPACE", default_value_t = From::from("default"), requires("k8s_enabled"), @@ -50,14 +50,14 @@ pub struct Providers { k8s_namespace: String, #[arg( - long = "providers.k8s.agones", + long = "provider.k8s.agones", env = "QUILKIN_PROVIDERS_K8S_AGONES", default_value_t = false )] agones_enabled: bool, #[arg( - long = "providers.k8s.agones.namespace", + long = "provider.k8s.agones.namespace", env = "QUILKIN_PROVIDERS_K8S_AGONES_NAMESPACE", default_value_t = From::from("default"), requires("agones_enabled"), @@ -67,14 +67,14 @@ pub struct Providers { /// If specified, filters the available gameserver addresses to the one that /// matches the specified type #[arg( - long = "providers.k8s.agones.address_type", + long = "provider.k8s.agones.address_type", env = "QUILKIN_PROVIDERS_K8S_AGONES_ADDRESS_TYPE", requires("agones_enabled") )] pub address_type: Option, /// If specified, additionally filters the gameserver address by its ip kind #[arg( - long = "providers.k8s.agones.ip_kind", + long = "provider.k8s.agones.ip_kind", env = "QUILKIN_PROVIDERS_K8S_AGONES_IP_KIND", requires("address_type"), value_enum @@ -82,7 +82,7 @@ pub struct Providers { pub ip_kind: Option, #[arg( - long = "providers.fs", + long = "provider.fs", env = "QUILKIN_PROVIDERS_FS", conflicts_with("k8s_enabled"), default_value_t = false @@ -90,7 +90,7 @@ pub struct Providers { fs_enabled: bool, #[arg( - long = "providers.fs", + long = "provider.fs.path", env = "QUILKIN_PROVIDERS_FS_PATH", requires("fs_enabled"), default_value = "/etc/quilkin/config.yaml" @@ 
-98,32 +98,31 @@ pub struct Providers { fs_path: std::path::PathBuf, /// One or more `quilkin relay` endpoints to push configuration changes to. #[clap( - long = "providers.mds.endpoints", + long = "provider.mds.endpoints", env = "QUILKIN_PROVIDERS_MDS_ENDPOINTS" )] pub relay: Vec, /// The remote URL or local file path to retrieve the Maxmind database. #[clap( - long = "providers.mmdb.endpoints", + long = "provider.mmdb.endpoints", env = "QUILKIN_PROVIDERS_MMDB_ENDPOINTS" )] pub mmdb: Option, /// One or more socket addresses to forward packets to. #[clap( - long = "providers.static.endpoints", + long = "provider.static.endpoints", env = "QUILKIN_PROVIDERS_STATIC_ENDPOINTS" )] - pub to: Vec, + pub endpoints: Vec, - /// Assigns dynamic tokens to each address in the `--to` argument + /// Assigns dynamic tokens to each address in the `--provider.static.endpoints` argument /// /// Format is `:` - #[clap(long, env = "QUILKIN_DEST_TOKENS", requires("to"))] #[clap( - long = "providers.static.endpoint_tokens", + long = "provider.static.endpoint_tokens", env = "QUILKIN_PROVIDERS_STATIC_ENDPOINT_TOKENS", - requires("to") + requires("endpoints") )] - pub to_tokens: Option, + pub endpoint_tokens: Option, } impl Providers { @@ -157,6 +156,107 @@ impl Providers { self } + fn static_enabled(&self) -> bool { + !self.endpoints.is_empty() + } + + pub fn spawn_static_provider( + &self, + config: Arc, + ) -> crate::Result>> { + let endpoint_tokens = self + .endpoint_tokens + .as_ref() + .map(|tt| { + let Some((count, length)) = tt.split_once(':') else { + eyre::bail!("--provider.static.endpoint_tokens `{tt}` is invalid, it must have a `:` separator") + }; + + let count = count.parse()?; + let length = length.parse()?; + + Ok(crate::components::proxy::ToTokens { count, length }) + }) + .transpose()?; + + let endpoints = if let Some(tt) = endpoint_tokens { + let (unique, overflow) = 256u64.overflowing_pow(tt.length as _); + if overflow { + panic!( + "can't generate {} tokens of length {} maximum is {}", + self.endpoints.len() * tt.count, + tt.length, + u64::MAX, + ); + } + + if unique < 
(self.endpoints.len() * tt.count) as u64 { + panic!( + "we require {} unique tokens but only {unique} can be generated", + self.endpoints.len() * tt.count, + ); + } + + { + use crate::filters::StaticFilter as _; + config.filters.store(Arc::new( + crate::filters::FilterChain::try_create([ + crate::filters::Capture::as_filter_config( + crate::filters::capture::Config { + metadata_key: crate::filters::capture::CAPTURED_BYTES.into(), + strategy: crate::filters::capture::Strategy::Suffix( + crate::filters::capture::Suffix { + size: tt.length as _, + remove: true, + }, + ), + }, + ) + .unwrap(), + crate::filters::TokenRouter::as_filter_config(None).unwrap(), + ]) + .unwrap(), + )); + } + + let count = tt.count as u64; + + self.endpoints + .iter() + .enumerate() + .map(|(ind, sa)| { + let mut tokens = std::collections::BTreeSet::new(); + let start = ind as u64 * count; + for i in start..(start + count) { + tokens.insert(i.to_le_bytes()[..tt.length].to_vec()); + } + + crate::net::endpoint::Endpoint::with_metadata( + (*sa).into(), + crate::net::endpoint::Metadata { tokens }, + ) + }) + .collect() + } else { + self.endpoints + .iter() + .cloned() + .map(crate::net::endpoint::Endpoint::from) + .collect() + }; + + tracing::info!( + provider = "static", + endpoints = serde_json::to_string(&endpoints).unwrap(), + "setting endpoints" + ); + config.clusters.modify(|clusters| { + clusters.insert(None, endpoints); + }); + + Ok(tokio::spawn(std::future::pending())) + } + pub fn spawn_k8s_provider( &self, health_check: Arc, @@ -275,6 +375,8 @@ impl Providers { ) } })) + } else if self.static_enabled() { + self.spawn_static_provider(config.clone()).unwrap() } else { tokio::spawn(async move { Ok(()) }) }