Skip to content

Commit

Permalink
Add node address selection via type and ip kind (#934)
Browse files Browse the repository at this point in the history
* Add node address selection via type

* Allow empty

* Select via type and? kind

* Split arguments

---------

Co-authored-by: Mark Mandel <[email protected]>
  • Loading branch information
Jake-Shadle and markmandel authored May 2, 2024
1 parent ca1d668 commit df7b5f2
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 34 deletions.
28 changes: 28 additions & 0 deletions src/cli/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ pub struct Agent {
/// The configuration source for a management server.
#[clap(subcommand)]
pub provider: Option<crate::config::Providers>,
/// If specified, filters the available gameserver addresses to the one that
/// matches the specified type
#[clap(long)]
pub address_type: Option<String>,
/// If specified, additionally filters the gameserver address by its ip kind
#[clap(long, requires("address_type"), value_enum)]
pub ip_kind: Option<crate::config::AddrKind>,
/// The interval in seconds at which the agent will wait for a discovery
/// request from a relay server before restarting the connection.
#[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")]
Expand All @@ -57,6 +64,21 @@ pub struct Agent {
pub icao_code: crate::config::IcaoCode,
}

impl clap::ValueEnum for crate::config::AddrKind {
fn value_variants<'a>() -> &'a [Self] {
&[Self::Ipv4, Self::Ipv6, Self::Any]
}

fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
use clap::builder::PossibleValue as pv;
Some(match self {
Self::Ipv4 => pv::new("v4"),
Self::Ipv6 => pv::new("v6"),
Self::Any => pv::new("any"),
})
}
}

impl Default for Agent {
fn default() -> Self {
Self {
Expand All @@ -68,6 +90,8 @@ impl Default for Agent {
provider: <_>::default(),
idle_request_interval_secs: None,
icao_code: <_>::default(),
address_type: None,
ip_kind: None,
}
}
}
Expand Down Expand Up @@ -97,6 +121,10 @@ impl Agent {
icao_code,
relay_servers: self.relay,
provider: self.provider,
address_selector: self.address_type.map(|at| crate::config::AddressSelector {
name: at,
kind: self.ip_kind.unwrap_or(crate::config::AddrKind::Any),
}),
}
.run(crate::components::RunArgs {
config,
Expand Down
2 changes: 2 additions & 0 deletions src/components/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct Agent {
pub icao_code: Option<IcaoCode>,
pub relay_servers: Vec<tonic::transport::Endpoint>,
pub provider: Option<Providers>,
pub address_selector: Option<crate::config::AddressSelector>,
}

impl Agent {
Expand Down Expand Up @@ -60,6 +61,7 @@ impl Agent {
config.clone(),
ready.provider_is_healthy.clone(),
self.locality,
self.address_selector,
)),
None => return Err(eyre::eyre!("no configuration provider given")),
};
Expand Down
1 change: 1 addition & 0 deletions src/components/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl Manage {
config.clone(),
ready.provider_is_healthy.clone(),
self.locality,
None,
);

let idle_request_interval = ready.idle_request_interval;
Expand Down
13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,19 @@ impl From<(String, FilterInstance)> for Filter {
}
}

#[derive(Clone, Debug)]
pub struct AddressSelector {
pub name: String,
pub kind: AddrKind,
}

#[derive(Copy, Clone, Debug)]
pub enum AddrKind {
Ipv4,
Ipv6,
Any,
}

#[cfg(test)]
mod tests {
use std::net::Ipv6Addr;
Expand Down
2 changes: 2 additions & 0 deletions src/config/providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl Providers {
config: std::sync::Arc<crate::Config>,
health_check: Arc<AtomicBool>,
locality: Option<crate::net::endpoint::Locality>,
address_selector: Option<crate::config::AddressSelector>,
) -> tokio::task::JoinHandle<crate::Result<()>> {
match &self {
Self::Agones {
Expand All @@ -63,6 +64,7 @@ impl Providers {
health_check.clone(),
locality.clone(),
config.clone(),
address_selector.clone(),
)
}
})),
Expand Down
31 changes: 13 additions & 18 deletions src/config/providers/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use kube::runtime::watcher::Event;

use agones::GameServer;

use crate::net::endpoint::{Endpoint, Locality};
use crate::net::endpoint::Locality;

pub fn update_filters_from_configmap(
client: kube::Client,
Expand Down Expand Up @@ -100,9 +100,11 @@ pub fn update_endpoints_from_gameservers(
namespace: impl AsRef<str>,
config: Arc<crate::Config>,
locality: Option<Locality>,
address_selector: Option<crate::config::AddressSelector>,
) -> impl Stream<Item = crate::Result<(), eyre::Error>> {
async_stream::stream! {
for await event in gameserver_events(client, namespace) {
let ads = address_selector.as_ref();
match event? {
Event::Applied(server) => {
tracing::debug!("received applied event from k8s");
Expand All @@ -112,12 +114,9 @@ pub fn update_endpoints_from_gameservers(
continue;
}

let endpoint = match Endpoint::try_from(server) {
Ok(endpoint) => endpoint,
Err(error) => {
tracing::warn!(%error, "received invalid gameserver to apply from k8s");
continue;
}
let Some(endpoint) = server.endpoint(ads) else {
tracing::warn!(selector=?ads, "received invalid gameserver to apply from k8s");
continue;
};
tracing::debug!(endpoint=%serde_json::to_value(&endpoint).unwrap(), "Adding endpoint");
config.clusters.write()
Expand All @@ -128,16 +127,12 @@ pub fn update_endpoints_from_gameservers(
tracing::debug!("received restart event from k8s");
let servers: BTreeSet<_> = servers
.into_iter()
.filter(GameServer::is_allocated)
.map(Endpoint::try_from)
.filter_map(|result| {
match result {
Ok(endpoint) => Some(endpoint),
Err(error) => {
tracing::warn!(%error, "received invalid gameserver on restart from k8s");
None
}
.filter_map(|server| {
if !server.is_allocated() {
return None;
}

server.endpoint(ads)
})
.collect();

Expand All @@ -151,7 +146,7 @@ pub fn update_endpoints_from_gameservers(

Event::Deleted(server) => {
tracing::debug!("received delete event from k8s");
let found = if let Some(endpoint) = server.endpoint() {
let found = if let Some(endpoint) = server.endpoint(ads) {
config.clusters.write().remove_endpoint(&endpoint)
} else {
config.clusters.write().remove_endpoint_if(|endpoint| {
Expand All @@ -161,7 +156,7 @@ pub fn update_endpoints_from_gameservers(

if !found {
tracing::debug!(
endpoint=%serde_json::to_value(server.endpoint()).unwrap(),
endpoint=%serde_json::to_value(server.endpoint(ads)).unwrap(),
name=%serde_json::to_value(server.metadata.name).unwrap(),
"received unknown gameserver to delete from k8s"
);
Expand Down
47 changes: 31 additions & 16 deletions src/config/providers/k8s/agones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

use k8s_openapi::{
api::core::v1::NodeAddress,
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceDefinition, CustomResourceDefinitionNames, CustomResourceDefinitionSpec,
CustomResourceDefinitionVersion, CustomResourceValidation,
Expand All @@ -40,8 +41,11 @@ pub struct GameServer {
}

impl GameServer {
pub fn endpoint(&self) -> Option<Endpoint> {
self.status.as_ref().map(|status| {
pub fn endpoint(
&self,
address_selector: Option<&crate::config::AddressSelector>,
) -> Option<Endpoint> {
self.status.as_ref().and_then(|status| {
let port = status
.ports
.as_ref()
Expand All @@ -58,13 +62,32 @@ impl GameServer {
map
};

Endpoint::with_metadata(
(status.address.clone(), port).into(),
let address = if let Some(ads) = address_selector {
status.addresses.iter().find_map(|adr| {
if adr.type_ != ads.name {
return None;
}

use crate::config::AddrKind;
match ads.kind {
AddrKind::Any => Some(adr.address.clone()),
AddrKind::Ipv4 => (!adr.address.contains(':')).then(|| adr.address.clone()),
AddrKind::Ipv6 => adr.address.contains(':').then(|| adr.address.clone()),
}
})?
} else {
status.address.clone()
};

let ep = Endpoint::with_metadata(
(address, port).into(),
crate::net::endpoint::metadata::MetadataView::with_unknown(
crate::net::endpoint::Metadata { tokens },
extra_metadata,
),
)
);

Some(ep)
})
}

Expand Down Expand Up @@ -133,7 +156,7 @@ impl GameServer {

pub fn is_allocated(&self) -> bool {
self.status.as_ref().map_or(false, |status| {
tracing::trace!(%status.address, ?status.state, "checking gameserver");
tracing::trace!(?status.addresses, ?status.state, "checking gameserver");
matches!(status.state, GameServerState::Allocated)
})
}
Expand Down Expand Up @@ -289,16 +312,6 @@ impl Default for GameServerSpec {
}
}

impl TryFrom<GameServer> for Endpoint {
type Error = tonic::Status;

fn try_from(server: GameServer) -> Result<Self, Self::Error> {
server
.endpoint()
.ok_or_else(|| tonic::Status::internal("No status found for game server"))
}
}

#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub struct Health {
/// Whether health checking is disabled or not
Expand Down Expand Up @@ -375,6 +388,8 @@ pub struct GameServerStatus {
pub state: GameServerState,
pub ports: Option<Vec<GameServerStatusPort>>,
pub address: String,
#[serde(default)]
pub addresses: Vec<NodeAddress>,
pub node_name: String,
pub reserved_until: Option<k8s_openapi::apimachinery::pkg::apis::meta::v1::Time>,
}
Expand Down
2 changes: 2 additions & 0 deletions src/config/watch/agones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub async fn watch(
health_check: Arc<AtomicBool>,
locality: Option<Locality>,
config: Arc<Config>,
address_selector: Option<crate::config::AddressSelector>,
) -> crate::Result<()> {
let client = tokio::time::timeout(
std::time::Duration::from_secs(5),
Expand All @@ -44,6 +45,7 @@ pub async fn watch(
gameservers_namespace,
config.clone(),
locality,
address_selector,
);
tokio::pin!(configmap_reflector);
tokio::pin!(gameserver_reflector);
Expand Down
1 change: 1 addition & 0 deletions test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ impl Pail {
relay_servers,
qcmp_socket,
provider: Some(Providers::File { path }),
address_selector: None,
}
.run(RunArgs {
config: aconfig,
Expand Down

0 comments on commit df7b5f2

Please sign in to comment.