Skip to content

Commit

Permalink
chore: Add static provider, allow XDP option bet set through env, all…
Browse files Browse the repository at this point in the history
…ow no command in CLI (#1082)
  • Loading branch information
XAMPPRocky authored Jan 30, 2025
1 parent dbd6b98 commit 47e47be
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 74 deletions.
123 changes: 71 additions & 52 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub struct Cli {
#[clap(short, long, env)]
pub quiet: bool,
#[clap(subcommand)]
pub command: Commands,
pub command: Option<Commands>,
#[clap(
long,
default_value_t = LogFormats::Auto,
Expand All @@ -134,6 +134,8 @@ pub struct Cli {
#[command(flatten)]
pub locality: LocalityCli,
#[command(flatten)]
pub providers: crate::config::providersv2::Providers,
#[command(flatten)]
pub service: Service,
}

Expand Down Expand Up @@ -209,47 +211,55 @@ impl Cli {
// Non-long running commands (e.g. ones with no administration server)
// are executed here.
use crate::components::{self, admin as admin_server};
let mode = match &self.command {
Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await,
Commands::GenerateConfigSchema(generator) => {
return generator.generate_config_schema();
}
Commands::Agent(_) => Admin::Agent(<_>::default()),
Commands::Proxy(proxy) => {
let ready = components::proxy::Ready {
idle_request_interval: proxy
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Proxy(ready)
}
Commands::Manage(_mng) => {
let ready = components::manage::Ready {
is_manage: true,
..Default::default()
};
Admin::Manage(ready)
}
Commands::Relay(relay) => {
let ready = components::relay::Ready {
idle_request_interval: relay
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Relay(ready)
}
let mode = if let Some(command) = &self.command {
Some(match command {
Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await,
Commands::GenerateConfigSchema(generator) => {
return generator.generate_config_schema();
}
Commands::Agent(_) => Admin::Agent(<_>::default()),
Commands::Proxy(proxy) => {
let ready = components::proxy::Ready {
idle_request_interval: proxy
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Proxy(ready)
}
Commands::Manage(_mng) => {
let ready = components::manage::Ready {
is_manage: true,
..Default::default()
};
Admin::Manage(ready)
}
Commands::Relay(relay) => {
let ready = components::relay::Ready {
idle_request_interval: relay
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Relay(ready)
}
})
} else {
None
};

if !self.service.any_service_enabled() && mode.is_none() {
eyre::bail!("no service specified, shutting down");
}

tracing::debug!(cli = ?self, "config parameters");

let config = Arc::new(match Self::read_config(self.config)? {
Some(mut config) => {
// Workaround deficiency in serde flatten + untagged
if matches!(self.command, Commands::Agent(..)) {
if matches!(self.command, Some(Commands::Agent(..))) {
config.datacenter = match config.datacenter {
crate::config::DatacenterConfig::Agent {
icao_code,
Expand All @@ -272,38 +282,47 @@ impl Cli {

config
}
None if matches!(self.command, Commands::Agent(..)) => Config::default_agent(),
None if matches!(self.command, Some(Commands::Agent(..))) => Config::default_agent(),
None => Config::default_non_agent(),
});

if self.admin.enabled {
mode.server(config.clone(), self.admin.address);
if let Some(mode) = mode.as_ref() {
mode.server(config.clone(), self.admin.address);
}
}

let shutdown_rx = crate::signal::spawn_handler();
let mut shutdown_rx = crate::signal::spawn_handler();

crate::alloc::spawn_heap_stats_updates(
std::time::Duration::from_secs(10),
shutdown_rx.clone(),
);

let ready = <_>::default();
let locality = self.locality.locality();
self.providers
.spawn_providers(&config, ready, locality.clone());
self.service.spawn_services(&config, &shutdown_rx)?;

let locality = self.locality.locality();
match (self.command, mode) {
(Commands::Agent(agent), Admin::Agent(ready)) => {
agent.run(locality, config, ready, shutdown_rx).await
}
(Commands::Proxy(runner), Admin::Proxy(ready)) => {
runner.run(config, ready, tx, shutdown_rx).await
}
(Commands::Manage(manager), Admin::Manage(ready)) => {
manager.run(locality, config, ready, shutdown_rx).await
}
(Commands::Relay(relay), Admin::Relay(ready)) => {
relay.run(locality, config, ready, shutdown_rx).await
if let Some(mode) = mode {
match (self.command.unwrap(), mode) {
(Commands::Agent(agent), Admin::Agent(ready)) => {
agent.run(locality, config, ready, shutdown_rx).await
}
(Commands::Proxy(runner), Admin::Proxy(ready)) => {
runner.run(config, ready, tx, shutdown_rx).await
}
(Commands::Manage(manager), Admin::Manage(ready)) => {
manager.run(locality, config, ready, shutdown_rx).await
}
(Commands::Relay(relay), Admin::Relay(ready)) => {
relay.run(locality, config, ready, shutdown_rx).await
}
_ => unreachable!(),
}
_ => unreachable!(),
} else {
shutdown_rx.changed().await.map_err(From::from)
}
}

Expand Down
34 changes: 29 additions & 5 deletions src/cli/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ impl Service {
self
}

/// Sets the xDS service port.
pub fn any_service_enabled(&self) -> bool {
self.udp_enabled
|| self.qcmp_enabled
|| self.phoenix_enabled
|| self.xds_enabled
|| self.mds_enabled
}

/// The main entrypoint for listening network servers. When called will
/// spawn any and all enabled services, if successful returning a future
/// that can be await to wait on services to be cancelled.
Expand Down Expand Up @@ -202,6 +211,7 @@ impl Service {
shutdown_rx: &crate::signal::ShutdownRx,
) -> crate::Result<impl std::future::Future<Output = crate::Result<()>>> {
if self.phoenix_enabled {
tracing::info!(port=%self.qcmp_port, "starting phoenix service");
let phoenix = crate::net::TcpListener::bind(Some(self.phoenix_port))?;
crate::net::phoenix::spawn(
phoenix,
Expand All @@ -220,6 +230,7 @@ impl Service {
shutdown_rx: &crate::signal::ShutdownRx,
) -> crate::Result<impl Future<Output = crate::Result<()>>> {
if self.qcmp_enabled {
tracing::info!(port=%self.qcmp_port, "starting qcmp service");
let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone())?;
}
Expand All @@ -238,6 +249,7 @@ impl Service {

use futures::TryFutureExt as _;

tracing::info!(port=%self.mds_port, "starting mds service");
let listener = crate::net::TcpListener::bind(Some(self.mds_port))?;

Ok(either::Right(
Expand Down Expand Up @@ -291,6 +303,8 @@ impl Service {
return Ok((either::Left(std::future::pending()), None));
}

tracing::info!(port=%self.udp_port, "starting udp service");

#[cfg(target_os = "linux")]
{
match self.spawn_xdp(config.clone(), self.xdp.force_xdp) {
Expand Down Expand Up @@ -368,6 +382,7 @@ impl Service {
eyre::bail!("XDP currently disabled by default");
}

tracing::info!(port=%self.mds_port, "setting up xdp module");
let workers = xdp::setup_xdp_io(xdp::XdpConfig {
nic: self
.xdp
Expand Down Expand Up @@ -396,31 +411,40 @@ pub struct XdpOptions {
/// If not specified quilkin will attempt to determine the most appropriate
/// network interface to use. Quilkin will exit with an error if the network
/// interface does not exist, or a suitable default cannot be determined.
#[clap(long = "service.udp.xdp.network-interface")]
#[clap(
long = "service.udp.xdp.network-interface",
env = "QUILKIN_SERVICE_UDP_XDP_NETWORK_INTERFACE"
)]
pub network_interface: Option<String>,
/// Forces the use of XDP.
///
/// If XDP is not available on the chosen NIC, Quilkin exits with an error.
/// If false, io-uring will be used as the fallback implementation.
#[clap(long = "service.udp.xdp")]
#[clap(long = "service.udp.xdp", env = "QUILKIN_SERVICE_UDP_XDP")]
pub force_xdp: bool,
/// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags)
///
/// If zero copy is not available on the chosen NIC, Quilkin exits with an error
#[clap(long = "service.udp.xdp.zerocopy")]
#[clap(
long = "service.udp.xdp.zerocopy",
env = "QUILKIN_SERVICE_UDP_XDP_ZEROCOPY"
)]
pub force_zerocopy: bool,
/// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html)
///
/// TX checksum offload is an optional feature allowing the data portion of
/// a packet to have its internet checksum calculation offloaded to the NIC,
/// as otherwise this is done in software
#[clap(long = "service.udp.xdp.tco")]
#[clap(long = "service.udp.xdp.tco", env = "QUILKIN_SERVICE_UDP_XDP_TCO")]
pub force_tx_checksum_offload: bool,
/// The maximum amount of memory mapped for packet buffers, in bytes
///
/// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time)
/// per NIC queue, ie 128MiB on a 32 queue NIC
#[clap(long = "service.udp.xdp.memory-limit")]
#[clap(
long = "service.udp.xdp.memory-limit",
env = "QUILKIN_SERVICE_UDP_XDP_MEMORY_LIMIT"
)]
pub maximum_memory: Option<u64>,
}

Expand Down
Loading

0 comments on commit 47e47be

Please sign in to comment.