diff --git a/Cargo.toml b/Cargo.toml index e706bf63f3..d90dd1807e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,11 @@ name = "cluster_map" harness = false test = false +[[bench]] +name = "misc" +harness = false +test = false + [dependencies] # Local quilkin-macros = { version = "0.9.0-dev", path = "./macros" } diff --git a/benches/cluster_map.rs b/benches/cluster_map.rs index b132d538c8..9ce2cd775a 100644 --- a/benches/cluster_map.rs +++ b/benches/cluster_map.rs @@ -1,226 +1,14 @@ -use std::{ - collections::BTreeSet, - hash::{Hash as _, Hasher as _}, - net::{Ipv4Addr, Ipv6Addr}, -}; - use divan::Bencher; -use quilkin::net::{cluster::ClusterMap, endpoint::Locality, Endpoint, EndpointAddress}; -use rand::Rng; -use xxhash_rust::xxh3::Xxh3 as Hasher; - -const LOCALITIES: &[&str] = &[ - "us:east1:b", - "us:east1:c", - "us:east1:d", - "us:east4:c", - "us:east4:b", - "us:east4:a", - "us:central1:c", - "us:central1:a", - "us:central1:f", - "us:central1:b", - "us:west1:b", - "us:west1:c", - "us:west1:a", - "europe:west4:a", - "europe:west4:b", - "europe:west4:c", - "europe:west1:b", - "europe:west1:d", - "europe:west1:c", - "europe:west3:c", - "europe:west3:a", - "europe:west3:b", - "europe:west2:c", - "europe:west2:b", - "europe:west2:a", - "asia:east1:b", - "asia:east1:a", - "asia:east1:c", - "asia:southeast1:b", - "asia:southeast1:a", - "asia:southeast1:c", - "asia:northeast1:b", - "asia:northeast1:c", - "asia:northeast1:a", - "asia:south1:c", - "asia:south1:b", - "asia:south1:a", - "australia:southeast1:b", - "australia:southeast1:c", - "australia:southeast1:a", - "southamerica:east1:b", - "southamerica:east1:c", - "southamerica:east1:a", - "asia:east2:a", - "asia:east2:b", - "asia:east2:c", - "asia:northeast2:a", - "asia:northeast2:b", - "asia:northeast2:c", - "asia:northeast3:a", - "asia:northeast3:b", - "asia:northeast3:c", - "asia:south2:a", - "asia:south2:b", - "asia:south2:c", - "asia:southeast2:a", - "asia:southeast2:b", - "asia:southeast2:c", - "australia:southeast2:a", - "australia:southeast2:b", - "australia:southeast2:c", - "europe:central2:a", - "europe:central2:b", - "europe:central2:c", - "europe:north1:a", - "europe:north1:b", - "europe:north1:c", - "europe:southwest1:a", - "europe:southwest1:b", - "europe:southwest1:c", - "europe:west10:a", - "europe:west10:b", - "europe:west10:c", - "europe:west12:a", - "europe:west12:b", - "europe:west12:c", - "europe:west6:a", - "europe:west6:b", - "europe:west6:c", - "europe:west8:a", - "europe:west8:b", - "europe:west8:c", - "europe:west9:a", - "europe:west9:b", - "europe:west9:c", - "me:central1:a", - "me:central1:b", - "me:central1:c", - "me:central2:a", - "me:central2:b", - "me:central2:c", - "me:west1:a", - "me:west1:b", - "me:west1:c", - "northamerica:northeast1:a", - "northamerica:northeast1:b", - "northamerica:northeast1:c", - "northamerica:northeast2:a", - "northamerica:northeast2:b", - "northamerica:northeast2:c", - "southamerica:west1:a", - "southamerica:west1:b", - "southamerica:west1:c", - "us:east5:a", - "us:east5:b", - "us:east5:c", - "us:south1:a", - "us:south1:b", - "us:south1:c", - "us:west2:a", - "us:west2:b", - "us:west2:c", - "us:west3:a", - "us:west3:b", - "us:west3:c", - "us:west4:a", - "us:west4:b", - "us:west4:c", -]; - -fn gen_endpoints(rng: &mut rand::rngs::SmallRng, hasher: &mut Hasher) -> BTreeSet { - let num_endpoints = rng.gen_range(100..10_000); - hasher.write_u16(num_endpoints); - - let mut endpoints = BTreeSet::new(); - - for i in 0..num_endpoints { - let ep_addr = match i % 3 { - 0 => (Ipv4Addr::new(100, 200, (i >> 8) as _, (i & 0xff) as _), i).into(), - 1 => EndpointAddress { - host: quilkin::net::endpoint::AddressKind::Name(format!("benchmark-{i}")), - port: i, - }, - 2 => (Ipv6Addr::new(100, 200, i, 0, 0, 1, 2, 3), i).into(), - _ => unreachable!(), - }; - - endpoints.insert(Endpoint::new(ep_addr)); - } - - for ep in &endpoints { - ep.address.hash(hasher); - } - - endpoints -} - -#[allow(dead_code)] -struct GenCluster { - cm: ClusterMap, - hash: u64, - total_endpoints: usize, - sets: std::collections::BTreeMap, BTreeSet>, -} - -#[inline] -fn write_locality(hasher: &mut Hasher, loc: &Option) { - if let Some(key) = loc { - key.hash(hasher); - } else { - hasher.write("None".as_bytes()); - } -} - -fn gen_cluster_map() -> GenCluster { - use rand::prelude::*; - - let mut rng = rand::rngs::SmallRng::seed_from_u64(S); - - let mut hasher = Hasher::with_seed(S); - let mut total_endpoints = 0; +use quilkin::net::cluster::ClusterMap; - let num_locals = rng.gen_range(10..LOCALITIES.len()); - - // Select how many localities we want, note we add 1 since we always have a default cluster - hasher.write_usize(num_locals + 1); - - let cm = ClusterMap::default(); - - for locality in LOCALITIES.choose_multiple(&mut rng, num_locals) { - let locality = locality.parse().unwrap(); - cm.insert(Some(locality), Default::default()); - } - - // Now actually insert the endpoints, now that the order of keys is established, - // annoying, but note we split out iteration versus insertion, otherwise we deadlock - let keys: Vec<_> = cm.iter().map(|kv| kv.key().clone()).collect(); - let mut sets = std::collections::BTreeMap::new(); - - for key in keys { - write_locality(&mut hasher, &key); - - let ep = gen_endpoints(&mut rng, &mut hasher); - total_endpoints += ep.len(); - cm.insert(key.clone(), ep.clone()); - sets.insert(key, ep); - } - - GenCluster { - cm, - hash: hasher.finish(), - total_endpoints, - sets, - } -} +mod shared; #[divan::bench_group(sample_count = 10)] mod serde { use super::*; use prost_types::Any; use quilkin::net::cluster::proto::Cluster; + use shared::gen_cluster_map; fn serialize_to_protobuf(cm: &ClusterMap) -> Vec { let mut resources = Vec::new(); @@ -316,6 +104,7 @@ const SEEDS: &[u64] = &[100, 200, 300, 400, 500]; #[divan::bench_group(sample_count = 10)] mod ops { use super::*; + use shared::{gen_cluster_map, GenCluster}; fn compute_hash(gc: &GenCluster) -> usize { let mut total_endpoints = 0; diff --git a/benches/misc.rs b/benches/misc.rs new file mode 100644 index 0000000000..cfdfd5840a --- /dev/null +++ b/benches/misc.rs @@ -0,0 +1,342 @@ +use divan::Bencher; +use prost_types::{value::Kind, Value}; +use quilkin::net::cluster::proto::Endpoint as ProtoEndpoint; + +mod shared; + +#[derive(Default)] +struct Name { + counter: usize, +} + +impl GenAddress for Name { + fn generate(&mut self, slim: bool) -> (usize, ProtoEndpoint) { + let ep = if self.counter % 2 == 0 { + let host = format!("nometa-{}", self.counter); + + quilkin::net::Endpoint::new(quilkin::net::EndpointAddress { + host: quilkin::net::endpoint::AddressKind::Name(host), + port: 1000, + }) + } else { + let host = format!("meta-{}", self.counter); + + let mut md = std::collections::BTreeMap::new(); + md.insert( + "counter".to_owned(), + Value { + kind: Some(Kind::NumberValue(self.counter as _)), + }, + ); + + md.insert( + "tokens".to_owned(), + Value { + kind: Some(Kind::ListValue(prost_types::ListValue { + values: vec![ + Value { + kind: Some(Kind::StringValue("abcd".into())), + }, + Value { + kind: Some(Kind::StringValue("1234".into())), + }, + ], + })), + }, + ); + + quilkin::net::Endpoint::with_metadata( + quilkin::net::EndpointAddress { + host: quilkin::net::endpoint::AddressKind::Name(host), + port: 2000, + }, + quilkin::net::endpoint::EndpointMetadata::try_from(prost_types::Struct { + fields: md, + }) + .unwrap(), + ) + }; + + self.counter += 1; + + let ep = if slim { ep.into_proto() } else { ep.into() }; + + use prost::Message; + (ep.encoded_len(), ep) + } +} + +#[derive(Default)] +struct Ip { + counter: u128, +} + +impl GenAddress for Ip { + fn generate(&mut self, slim: bool) -> (usize, ProtoEndpoint) { + let ip = if self.counter % 2 == 0 { + std::net::Ipv6Addr::new( + (self.counter >> 112 & 0xffff) as _, + (self.counter >> 96 & 0xffff) as _, + (self.counter >> 80 & 0xffff) as _, + (self.counter >> 64 & 0xffff) as _, + (self.counter >> 48 & 0xffff) as _, + (self.counter >> 32 & 0xffff) as _, + (self.counter >> 16 & 0xffff) as _, + (self.counter & 0xffff) as _, + ) + .into() + } else { + std::net::Ipv4Addr::new( + (self.counter >> 24 & 0xff) as _, + (self.counter >> 16 & 0xff) as _, + (self.counter >> 8 & 0xff) as _, + (self.counter & 0xff) as _, + ) + .into() + }; + + let ep = quilkin::net::Endpoint::new(quilkin::net::EndpointAddress { + host: quilkin::net::endpoint::AddressKind::Ip(ip), + port: 3000, + }); + + self.counter += 1; + + let pep = if slim { ep.into_proto() } else { ep.into() }; + + use prost::Message; + (pep.encoded_len(), pep) + } +} + +trait GenAddress: Default { + fn generate(&mut self, slim: bool) -> (usize, ProtoEndpoint); +} + +#[divan::bench_group(sample_count = 1000)] +mod endpoint { + use super::*; + + #[divan::bench( + types = [Name, Ip], + )] + fn slower(bencher: Bencher) { + let mut genn = G::default(); + bencher + .with_inputs(|| genn.generate(false)) + .input_counter(|(i, _)| divan::counter::BytesCount::usize(*i)) + .bench_local_values(|(_, ep)| divan::black_box(quilkin::net::Endpoint::try_from(ep))); + } + + #[divan::bench( + types = [Name, Ip], + )] + fn faster(bencher: Bencher) { + let mut genn = G::default(); + bencher + .with_inputs(|| genn.generate(true)) + .input_counter(|(i, _)| divan::counter::BytesCount::usize(*i)) + .bench_local_values(|(_, ep)| divan::black_box(quilkin::net::Endpoint::from_proto(ep))); + } +} + +trait GenResource: Default { + fn generate(&mut self, slim: bool) -> prost_types::Any; +} + +use quilkin::net::xds::{Resource, ResourceType}; +use rand::SeedableRng; + +#[derive(Default)] +struct Listener { + _counter: usize, +} + +impl GenResource for Listener { + fn generate(&mut self, slim: bool) -> prost_types::Any { + use quilkin::filters::{self, StaticFilter}; + let filters = [ + quilkin::config::Filter { + name: filters::capture::Capture::NAME.into(), + label: None, + config: Some( + serde_json::to_value(&filters::capture::Config { + metadata_key: "boop".into(), + strategy: filters::capture::Strategy::Suffix(filters::capture::Suffix { + size: 3, + remove: true, + }), + }) + .unwrap(), + ), + }, + quilkin::config::Filter { + name: filters::compress::Compress::NAME.into(), + label: Some("a label".into()), + config: Some( + serde_json::to_value(filters::compress::Config { + mode: filters::compress::Mode::Lz4, + on_read: filters::compress::Action::Decompress, + on_write: filters::compress::Action::Compress, + }) + .unwrap(), + ), + }, + ]; + + if slim { + ResourceType::FilterChain.encode_to_any(&quilkin::net::cluster::proto::FilterChain { + filters: filters + .into_iter() + .map(|f| quilkin::net::cluster::proto::Filter { + name: f.name, + label: f.label, + config: f.config.map(|c| c.to_string()), + }) + .collect(), + }) + } else { + ResourceType::Listener.encode_to_any(&quilkin::net::xds::listener::Listener { + filter_chains: vec![ + quilkin::generated::envoy::config::listener::v3::FilterChain { + filters: filters + .into_iter() + .map(TryFrom::try_from) + .collect::>() + .unwrap(), + ..Default::default() + }, + ], + ..Default::default() + }) + } + .unwrap() + } +} + +#[derive(Default)] +struct Cluster { + counter: usize, +} + +impl GenResource for Cluster { + fn generate(&mut self, slim: bool) -> prost_types::Any { + let locality = shared::LOCALITIES[self.counter % shared::LOCALITIES.len()]; + let locality: quilkin::net::endpoint::Locality = locality.parse().unwrap(); + + let mut rng = rand::rngs::SmallRng::seed_from_u64(self.counter as u64); + let mut hasher = xxhash_rust::xxh3::Xxh3::new(); + let endpoints = shared::gen_endpoints(&mut rng, &mut hasher); + + let msg = quilkin::generated::quilkin::config::v1alpha1::Cluster { + locality: Some(quilkin::generated::quilkin::config::v1alpha1::Locality { + region: locality.region().into(), + zone: locality.zone().unwrap_or_default().into(), + sub_zone: locality.sub_zone().unwrap_or_default().into(), + }), + endpoints: endpoints + .into_iter() + .map(|ep| if slim { ep.into_proto() } else { ep.into() }) + .collect(), + }; + + ResourceType::Cluster.encode_to_any(&msg).unwrap() + } +} + +// From Config::apply +fn deserialize(a: prost_types::Any) { + match Resource::try_from(a).unwrap() { + Resource::Listener(mut listener) => { + let chain: quilkin::filters::FilterChain = if listener.filter_chains.is_empty() { + Default::default() + } else { + quilkin::filters::FilterChain::try_create_fallible( + listener.filter_chains.swap_remove(0).filters, + ) + .unwrap() + }; + + drop(chain); + } + Resource::FilterChain(_fc) => { + unimplemented!("should not be used") + } + Resource::Datacenter(dc) => { + let _host: std::net::IpAddr = dc.host.parse().unwrap(); + let _dc = quilkin::config::Datacenter { + qcmp_port: dc.qcmp_port.try_into().unwrap(), + icao_code: dc.icao_code.parse().unwrap(), + }; + } + Resource::Cluster(cluster) => { + let _locality: Option = + cluster.locality.map(From::from); + cluster + .endpoints + .into_iter() + .map(quilkin::net::endpoint::Endpoint::try_from) + .collect::, _>>() + .unwrap(); + } + } +} + +fn deserialize_faster(a: prost_types::Any) { + match Resource::try_from(a).unwrap() { + Resource::Listener(_listener) => { + unimplemented!("should not be used"); + } + Resource::FilterChain(fc) => { + quilkin::filters::FilterChain::try_create_fallible(fc.filters).unwrap(); + } + Resource::Datacenter(dc) => { + let _host: std::net::IpAddr = dc.host.parse().unwrap(); + let _dc = quilkin::config::Datacenter { + qcmp_port: dc.qcmp_port.try_into().unwrap(), + icao_code: dc.icao_code.parse().unwrap(), + }; + } + Resource::Cluster(cluster) => { + let _locality: Option = + cluster.locality.map(From::from); + cluster + .endpoints + .into_iter() + .map(quilkin::net::endpoint::Endpoint::from_proto) + .collect::, _>>() + .unwrap(); + } + } +} + +#[divan::bench_group(sample_count = 1000)] +mod resource { + use super::*; + + #[divan::bench( + types = [Listener, Cluster], + )] + fn slower(bencher: Bencher) { + let mut genn = G::default(); + bencher + .with_inputs(|| genn.generate(false)) + .input_counter(|a| divan::counter::BytesCount::usize(a.value.len() + a.type_url.len())) + .bench_local_values(|a| deserialize(divan::black_box(a))); + } + + #[divan::bench( + types = [Listener, Cluster], + )] + fn faster(bencher: Bencher) { + let mut genn = G::default(); + bencher + .with_inputs(|| genn.generate(true)) + .input_counter(|a| divan::counter::BytesCount::usize(a.value.len() + a.type_url.len())) + .bench_local_values(|a| deserialize_faster(divan::black_box(a))); + } +} + +fn main() { + divan::main(); +} diff --git a/benches/shared.rs b/benches/shared.rs index 9760163b95..4acabb8c24 100644 --- a/benches/shared.rs +++ b/benches/shared.rs @@ -366,3 +366,219 @@ impl Drop for QuilkinLoop { self.thread.take().unwrap().join().unwrap(); } } + +use quilkin::net::{cluster::ClusterMap, endpoint::Locality, Endpoint, EndpointAddress}; +use rand::Rng; +use std::{ + collections::BTreeSet, + hash::{Hash as _, Hasher as _}, + net::Ipv6Addr, +}; +use xxhash_rust::xxh3::Xxh3 as Hasher; + +pub const LOCALITIES: &[&str] = &[ + "us:east1:b", + "us:east1:c", + "us:east1:d", + "us:east4:c", + "us:east4:b", + "us:east4:a", + "us:central1:c", + "us:central1:a", + "us:central1:f", + "us:central1:b", + "us:west1:b", + "us:west1:c", + "us:west1:a", + "europe:west4:a", + "europe:west4:b", + "europe:west4:c", + "europe:west1:b", + "europe:west1:d", + "europe:west1:c", + "europe:west3:c", + "europe:west3:a", + "europe:west3:b", + "europe:west2:c", + "europe:west2:b", + "europe:west2:a", + "asia:east1:b", + "asia:east1:a", + "asia:east1:c", + "asia:southeast1:b", + "asia:southeast1:a", + "asia:southeast1:c", + "asia:northeast1:b", + "asia:northeast1:c", + "asia:northeast1:a", + "asia:south1:c", + "asia:south1:b", + "asia:south1:a", + "australia:southeast1:b", + "australia:southeast1:c", + "australia:southeast1:a", + "southamerica:east1:b", + "southamerica:east1:c", + "southamerica:east1:a", + "asia:east2:a", + "asia:east2:b", + "asia:east2:c", + "asia:northeast2:a", + "asia:northeast2:b", + "asia:northeast2:c", + "asia:northeast3:a", + "asia:northeast3:b", + "asia:northeast3:c", + "asia:south2:a", + "asia:south2:b", + "asia:south2:c", + "asia:southeast2:a", + "asia:southeast2:b", + "asia:southeast2:c", + "australia:southeast2:a", + "australia:southeast2:b", + "australia:southeast2:c", + "europe:central2:a", + "europe:central2:b", + "europe:central2:c", + "europe:north1:a", + "europe:north1:b", + "europe:north1:c", + "europe:southwest1:a", + "europe:southwest1:b", + "europe:southwest1:c", + "europe:west10:a", + "europe:west10:b", + "europe:west10:c", + "europe:west12:a", + "europe:west12:b", + "europe:west12:c", + "europe:west6:a", + "europe:west6:b", + "europe:west6:c", + "europe:west8:a", + "europe:west8:b", + "europe:west8:c", + "europe:west9:a", + "europe:west9:b", + "europe:west9:c", + "me:central1:a", + "me:central1:b", + "me:central1:c", + "me:central2:a", + "me:central2:b", + "me:central2:c", + "me:west1:a", + "me:west1:b", + "me:west1:c", + "northamerica:northeast1:a", + "northamerica:northeast1:b", + "northamerica:northeast1:c", + "northamerica:northeast2:a", + "northamerica:northeast2:b", + "northamerica:northeast2:c", + "southamerica:west1:a", + "southamerica:west1:b", + "southamerica:west1:c", + "us:east5:a", + "us:east5:b", + "us:east5:c", + "us:south1:a", + "us:south1:b", + "us:south1:c", + "us:west2:a", + "us:west2:b", + "us:west2:c", + "us:west3:a", + "us:west3:b", + "us:west3:c", + "us:west4:a", + "us:west4:b", + "us:west4:c", +]; + +pub fn gen_endpoints(rng: &mut rand::rngs::SmallRng, hasher: &mut Hasher) -> BTreeSet { + let num_endpoints = rng.gen_range(100..10_000); + hasher.write_u16(num_endpoints); + + let mut endpoints = BTreeSet::new(); + + for i in 0..num_endpoints { + let ep_addr = match i % 3 { + 0 => (Ipv4Addr::new(100, 200, (i >> 8) as _, (i & 0xff) as _), i).into(), + 1 => EndpointAddress { + host: quilkin::net::endpoint::AddressKind::Name(format!("benchmark-{i}")), + port: i, + }, + 2 => (Ipv6Addr::new(100, 200, i, 0, 0, 1, 2, 3), i).into(), + _ => unreachable!(), + }; + + endpoints.insert(Endpoint::new(ep_addr)); + } + + for ep in &endpoints { + ep.address.hash(hasher); + } + + endpoints +} + +#[allow(dead_code)] +pub struct GenCluster { + pub cm: ClusterMap, + hash: u64, + pub total_endpoints: usize, + sets: std::collections::BTreeMap, BTreeSet>, +} + +#[inline] +fn write_locality(hasher: &mut Hasher, loc: &Option) { + if let Some(key) = loc { + key.hash(hasher); + } else { + hasher.write("None".as_bytes()); + } +} + +pub fn gen_cluster_map() -> GenCluster { + use rand::prelude::*; + + let mut rng = rand::rngs::SmallRng::seed_from_u64(S); + + let mut hasher = Hasher::with_seed(S); + let mut total_endpoints = 0; + + let num_locals = rng.gen_range(10..LOCALITIES.len()); + + // Select how many localities we want, note we add 1 since we always have a default cluster + hasher.write_usize(num_locals + 1); + + let cm = ClusterMap::default(); + + for locality in LOCALITIES.choose_multiple(&mut rng, num_locals) { + let locality = locality.parse().unwrap(); + cm.insert(Some(locality), Default::default()); + } + + // Now actually insert the endpoints, now that the order of keys is established, + // annoying, but note we split out iteration versus insertion, otherwise we deadlock + let keys: Vec<_> = cm.iter().map(|kv| kv.key().clone()).collect(); + let mut sets = std::collections::BTreeMap::new(); + + for key in keys { + write_locality(&mut hasher, &key); + + let ep = gen_endpoints(&mut rng, &mut hasher); + total_endpoints += ep.len(); + cm.insert(key.clone(), ep.clone()); + sets.insert(key, ep); + } + + GenCluster { + cm, + hash: hasher.finish(), + total_endpoints, + sets, + } +} diff --git a/proto-gen/gen.rs b/proto-gen/gen.rs index 4ad4cad9df..f58221090a 100644 --- a/proto-gen/gen.rs +++ b/proto-gen/gen.rs @@ -3,7 +3,7 @@ use std::{ process::{Command, Stdio}, }; -const VERSION: &str = "0.2.1"; +const VERSION: &str = "0.2.6"; fn check_version(name: &str, prefix: &str, wanted: &str) -> bool { if let Ok(output) = Command::new(name).arg("--version").output() { @@ -11,7 +11,8 @@ fn check_version(name: &str, prefix: &str, wanted: &str) -> bool { let version = std::str::from_utf8(&output.stdout).expect("version output was non-utf8"); if let Some(v) = version.strip_prefix(prefix) { - if v.trim() == wanted { + let v = v.trim(); + if v == wanted { return true; } else { println!("{name} version detected as '{v}' which did not match expected version '{wanted}'"); @@ -92,7 +93,7 @@ fn install() { } } else { if !Command::new("cargo") - .args(["install", "-f", "proto-gen"]) + .args(["install", "--locked", "-f", "proto-gen"]) .status() .expect("cargo not installed") .success() diff --git a/proto/quilkin/config/v1alpha1/config.proto b/proto/quilkin/config/v1alpha1/config.proto index 6db568a6d9..7a26e0012e 100644 --- a/proto/quilkin/config/v1alpha1/config.proto +++ b/proto/quilkin/config/v1alpha1/config.proto @@ -19,29 +19,49 @@ import "google/protobuf/struct.proto"; package quilkin.config.v1alpha1; -message ClusterMap { - repeated Cluster clusters = 1; -} +message ClusterMap { repeated Cluster clusters = 1; } message Cluster { - Locality locality = 1; - repeated Endpoint endpoints = 2; + Locality locality = 1; + repeated Endpoint endpoints = 2; } message Locality { - string region = 1; - string zone = 2; - string sub_zone = 3; + string region = 1; + string zone = 2; + string sub_zone = 3; +} + +message Ipv6 { + fixed64 first = 1; + fixed64 second = 2; +} + +message Host { + oneof inner { + string name = 1; + fixed32 ipv4 = 2; + Ipv6 ipv6 = 3; + } } message Endpoint { - string host = 1; - uint32 port = 2; - google.protobuf.Struct metadata = 3; + string host = 1; + uint32 port = 2; + google.protobuf.Struct metadata = 3; + Host host2 = 4; } message Datacenter { - string host = 1; - uint32 qcmp_port = 2; - string icao_code = 3; + string host = 1; + uint32 qcmp_port = 2; + string icao_code = 3; +} + +message Filter { + string name = 1; + optional string label = 2; + optional string config = 3; } + +message FilterChain { repeated Filter filters = 1; } diff --git a/src/config.rs b/src/config.rs index 825e2a2e86..e93263677d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -109,6 +109,8 @@ pub struct DeltaDiscoveryRes { pub removed: Vec, } +use crate::net::xds::ClientVersions; + impl Config { /// Attempts to deserialize `input` as a YAML object representing `Self`. pub fn from_reader(input: R) -> Result { @@ -192,6 +194,11 @@ impl Config { ..<_>::default() })?); } + ResourceType::FilterChain => { + resources.push(resource_type.encode_to_any( + &crate::net::cluster::proto::FilterChain::try_from(&*self.filters.load())?, + )?); + } ResourceType::Cluster => { if names.is_empty() { for cluster in self.clusters.read().iter() { @@ -226,12 +233,12 @@ impl Config { pub fn delta_discovery_request( &self, subscribed: &BTreeSet, - client_versions: &crate::net::xds::ClientVersions, + client_versions: &ClientVersions, ) -> crate::Result { let mut resources = Vec::new(); let (awaiting_ack, removed) = match client_versions { - crate::net::xds::ClientVersions::Listener => { + ClientVersions::Listener => { resources.push(XdsResource { name: "listener".into(), version: "0".into(), @@ -245,7 +252,20 @@ impl Config { }); (crate::net::xds::AwaitingAck::Listener, Vec::new()) } - crate::net::xds::ClientVersions::Datacenter => { + ClientVersions::FilterChain => { + resources.push(XdsResource { + name: "filter_chain".into(), + version: "0".into(), + resource: Some(ResourceType::FilterChain.encode_to_any( + &crate::net::cluster::proto::FilterChain::try_from(&*self.filters.load())?, + )?), + aliases: Vec::new(), + ttl: None, + cache_control: None, + }); + (crate::net::xds::AwaitingAck::FilterChain, Vec::new()) + } + ClientVersions::Datacenter => { match &self.datacenter { DatacenterConfig::Agent { qcmp_port, @@ -287,7 +307,7 @@ impl Config { } (crate::net::xds::AwaitingAck::Datacenter, Vec::new()) } - crate::net::xds::ClientVersions::Cluster(map) => { + ClientVersions::Cluster(map) => { let resource_type = ResourceType::Cluster; let mut to_ack = Vec::new(); @@ -368,6 +388,12 @@ impl Config { self.filters.store(Arc::new(chain)); } + Resource::FilterChain(fc) => { + self.filters + .store(Arc::new(crate::filters::FilterChain::try_create_fallible( + fc.filters.into_iter(), + )?)); + } Resource::Datacenter(dc) => { let DatacenterConfig::NonAgent { datacenters } = &self.datacenter else { eyre::bail!("cannot apply datacenter resource to an agent"); @@ -435,6 +461,20 @@ impl Config { local_versions.insert(listener.name, "".into()); } } + ResourceType::FilterChain => { + for res in resources { + let (resource, _) = res?; + let Resource::FilterChain(fc) = resource else { + return Err(eyre::eyre!("a non-filterchain resource was present")); + }; + + self.filters + .store(Arc::new(crate::filters::FilterChain::try_create_fallible( + fc.filters.into_iter(), + )?)); + local_versions.insert(String::new(), "".into()); + } + } ResourceType::Datacenter => { let DatacenterConfig::NonAgent { datacenters } = &self.datacenter else { eyre::bail!("cannot apply delta datacenters resource to agent"); @@ -828,6 +868,27 @@ impl TryFrom for Filter { } } +impl TryFrom for Filter { + type Error = CreationError; + + fn try_from(value: crate::net::cluster::proto::Filter) -> Result { + let config = if let Some(cfg) = value.config { + Some( + serde_json::from_str(&cfg) + .map_err(|err| CreationError::DeserializeFailed(err.to_string()))?, + ) + } else { + None + }; + + Ok(Self { + name: value.name, + label: value.label, + config, + }) + } +} + impl TryFrom for listener::Filter { type Error = CreationError; diff --git a/src/filters/capture.rs b/src/filters/capture.rs index e92d9e6c78..b26c4bd41f 100644 --- a/src/filters/capture.rs +++ b/src/filters/capture.rs @@ -50,7 +50,7 @@ impl Capture { fn new(config: Config) -> Self { Self { capture: config.strategy.into_capture(), - is_present_key: (config.metadata_key.to_string() + "/is_present").into(), + is_present_key: format!("{}/is_present", config.metadata_key).into(), metadata_key: config.metadata_key, } } diff --git a/src/filters/chain.rs b/src/filters/chain.rs index d9e7319b6f..dca64dc4be 100644 --- a/src/filters/chain.rs +++ b/src/filters/chain.rs @@ -210,6 +210,23 @@ impl TryFrom<&'_ FilterChain> for EnvoyFilterChain { } } +impl TryFrom<&'_ FilterChain> for crate::net::cluster::proto::FilterChain { + type Error = CreationError; + + fn try_from(value: &'_ FilterChain) -> Result { + Ok(Self { + filters: value + .iter() + .map(|filter| crate::net::cluster::proto::Filter { + name: filter.name, + label: filter.label, + config: filter.config.map(|v| v.to_string()), + }) + .collect(), + }) + } +} + impl std::ops::Index for FilterChain { type Output = (String, FilterInstance); @@ -371,7 +388,10 @@ mod tests { let expected = endpoints_fixture.clone(); assert_eq!(&*expected.endpoints(), &*context.destinations); - assert_eq!(b"hello:odr:127.0.0.1:70", &*context.contents); + assert_eq!( + "hello:odr:127.0.0.1:70", + std::str::from_utf8(&context.contents).unwrap() + ); assert_eq!( "receive", context.metadata[&"downstream".into()].as_string().unwrap() diff --git a/src/filters/compress/config.rs b/src/filters/compress/config.rs index ac832a23ce..72c16ae669 100644 --- a/src/filters/compress/config.rs +++ b/src/filters/compress/config.rs @@ -109,7 +109,6 @@ impl From for ActionValue { } #[derive(Clone, Copy, Default, Deserialize, Debug, Eq, PartialEq, Serialize, JsonSchema)] -#[non_exhaustive] pub struct Config { #[serde(default)] pub mode: Mode, diff --git a/src/generated/quilkin/config/v1alpha1.rs b/src/generated/quilkin/config/v1alpha1.rs index a0e474e5fa..e2311d4a9b 100644 --- a/src/generated/quilkin/config/v1alpha1.rs +++ b/src/generated/quilkin/config/v1alpha1.rs @@ -24,6 +24,33 @@ pub struct Locality { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct Ipv6 { + #[prost(fixed64, tag = "1")] + pub first: u64, + #[prost(fixed64, tag = "2")] + pub second: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Host { + #[prost(oneof = "host::Inner", tags = "1, 2, 3")] + pub inner: ::core::option::Option, +} +/// Nested message and enum types in `Host`. +pub mod host { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Inner { + #[prost(string, tag = "1")] + Name(::prost::alloc::string::String), + #[prost(fixed32, tag = "2")] + Ipv4(u32), + #[prost(message, tag = "3")] + Ipv6(super::Ipv6), + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct Endpoint { #[prost(string, tag = "1")] pub host: ::prost::alloc::string::String, @@ -31,6 +58,8 @@ pub struct Endpoint { pub port: u32, #[prost(message, optional, tag = "3")] pub metadata: ::core::option::Option<::prost_types::Struct>, + #[prost(message, optional, tag = "4")] + pub host2: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -42,3 +71,19 @@ pub struct Datacenter { #[prost(string, tag = "3")] pub icao_code: ::prost::alloc::string::String, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Filter { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, optional, tag = "2")] + pub label: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub config: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FilterChain { + #[prost(message, repeated, tag = "1")] + pub filters: ::prost::alloc::vec::Vec, +} diff --git a/src/lib.rs b/src/lib.rs index a80081da88..b0e996d475 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,9 +37,10 @@ pub mod test; #[allow( clippy::enum_variant_names, clippy::large_enum_variant, + clippy::len_without_is_empty, rustdoc::bare_urls )] -mod generated; +pub mod generated; pub type Result = std::result::Result; diff --git a/src/net/cluster.rs b/src/net/cluster.rs index 1095de30d8..4024109653 100644 --- a/src/net/cluster.rs +++ b/src/net/cluster.rs @@ -621,6 +621,7 @@ impl From<&'_ Endpoint> for proto::Endpoint { host: endpoint.address.host.to_string(), port: endpoint.address.port.into(), metadata: Some((&endpoint.metadata).into()), + host2: None, } } } diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 2703199284..1a69c09f93 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -20,6 +20,8 @@ pub(crate) mod address; mod locality; pub mod metadata; +use crate::net::cluster::proto; +use eyre::ContextCompat; use serde::{Deserialize, Serialize}; pub use self::{ @@ -58,6 +60,59 @@ impl Endpoint { ..<_>::default() } } + + #[inline] + pub fn from_proto(proto: proto::Endpoint) -> eyre::Result { + let host: AddressKind = if let Some(host) = proto.host2 { + match host.inner.context("should be unreachable")? { + proto::host::Inner::Name(name) => AddressKind::Name(name), + proto::host::Inner::Ipv4(v4) => { + AddressKind::Ip(std::net::Ipv4Addr::from(v4).into()) + } + proto::host::Inner::Ipv6(v6) => AddressKind::Ip( + std::net::Ipv6Addr::from(((v6.first as u128) << 64) | v6.second as u128).into(), + ), + } + } else { + proto.host.parse()? + }; + + Ok(Self { + address: (host, proto.port as u16).into(), + metadata: proto + .metadata + .map(TryFrom::try_from) + .transpose()? + .unwrap_or_default(), + }) + } + + #[inline] + pub fn into_proto(self) -> proto::Endpoint { + let host = match self.address.host { + AddressKind::Name(name) => proto::host::Inner::Name(name), + AddressKind::Ip(ip) => match ip { + std::net::IpAddr::V4(v4) => { + proto::host::Inner::Ipv4(u32::from_be_bytes(v4.octets())) + } + std::net::IpAddr::V6(v6) => { + let ip = u128::from_be_bytes(v6.octets()); + + let first = ((ip >> 64) & 0xffffffffffffffff) as u64; + let second = (ip & 0xffffffffffffffff) as u64; + + proto::host::Inner::Ipv6(proto::Ipv6 { first, second }) + } + }, + }; + + proto::Endpoint { + host: String::new(), + port: self.address.port.into(), + metadata: Some(self.metadata.into()), + host2: Some(proto::Host { inner: Some(host) }), + } + } } impl Default for Endpoint { @@ -80,20 +135,21 @@ impl std::str::FromStr for Endpoint { } } -impl From for crate::net::cluster::proto::Endpoint { +impl From for proto::Endpoint { fn from(endpoint: Endpoint) -> Self { Self { host: endpoint.address.host.to_string(), port: endpoint.address.port.into(), metadata: Some(endpoint.metadata.into()), + host2: None, } } } -impl TryFrom for Endpoint { +impl TryFrom for Endpoint { type Error = eyre::Error; - fn try_from(endpoint: crate::net::cluster::proto::Endpoint) -> Result { + fn try_from(endpoint: proto::Endpoint) -> Result { let host: address::AddressKind = endpoint.host.parse()?; if endpoint.port > u16::MAX as u32 { return Err(eyre::eyre!("invalid endpoint port")); @@ -323,4 +379,21 @@ mod tests { serde_yaml::from_str::(yaml).unwrap_err(); } } + + // Sanity check conversion between endpoint <-> proto works + #[test] + fn endpoint_proto_conversion() { + let first = Endpoint::new(EndpointAddress { + host: AddressKind::Ip(std::net::IpAddr::V6(std::net::Ipv6Addr::new( + 0x00, 0x01, 0x02, 0x04, 0x08, 0x10, 0xab, 0xcd, + ))), + port: 2001, + }); + + let expected = first.clone(); + let proto = first.into_proto(); + let actual = Endpoint::from_proto(proto).unwrap(); + + assert_eq!(expected, actual); + } } diff --git a/src/net/xds.rs b/src/net/xds.rs index b0685fb9a7..ebb3825692 100644 --- a/src/net/xds.rs +++ b/src/net/xds.rs @@ -30,19 +30,12 @@ pub use crate::generated::envoy::{ }; pub use client::{AdsClient, Client}; pub use resource::{Resource, ResourceType}; -// pub use self::{ -// client::{AdsClient, Client}, -// envoy::service::discovery::v3::aggregated_discovery_service_client::AggregatedDiscoveryServiceClient, -// envoy::*, -// resource::{Resource, ResourceType}, -// server::ControlPlane, -// xds::*, -// }; use std::collections::HashMap; /// Keeps track of what resource versions a particular client has pub enum ClientVersions { Listener, + FilterChain, Cluster(HashMap, EndpointSetVersion>), Datacenter, } @@ -52,6 +45,7 @@ pub enum ClientVersions { /// the client has pub enum AwaitingAck { Listener, + FilterChain, Cluster { updated: Vec<(Option, EndpointSetVersion)>, remove_none: bool, @@ -64,6 +58,7 @@ impl ClientVersions { pub fn new(rt: ResourceType) -> Self { match rt { ResourceType::Listener => Self::Listener, + ResourceType::FilterChain => Self::FilterChain, ResourceType::Cluster => Self::Cluster(HashMap::new()), ResourceType::Datacenter => Self::Datacenter, } @@ -73,6 +68,7 @@ impl ClientVersions { pub fn typ(&self) -> ResourceType { match self { Self::Listener => ResourceType::Listener, + Self::FilterChain => ResourceType::FilterChain, Self::Cluster(_) => ResourceType::Cluster, Self::Datacenter => ResourceType::Datacenter, } @@ -106,43 +102,41 @@ impl ClientVersions { #[inline] pub fn remove(&mut self, name: String) { - match self { - Self::Listener | Self::Datacenter => {} - Self::Cluster(map) => { - let locality = if name.is_empty() { - None - } else { - match name.parse() { - Ok(l) => Some(l), - Err(err) => { - tracing::error!(error = %err, name, "Failed to parse locality"); - return; - } - } - }; - map.remove(&locality); + let Self::Cluster(map) = self else { + return; + }; + + let locality = if name.is_empty() { + None + } else { + match name.parse() { + Ok(l) => Some(l), + Err(err) => { + tracing::error!(error = %err, name, "Failed to parse locality"); + return; + } } - } + }; + map.remove(&locality); } /// Resets the client versions to those specified by the client itself #[inline] pub fn reset(&mut self, versions: HashMap) -> crate::Result<()> { - match self { - Self::Listener | Self::Datacenter => Ok(()), - Self::Cluster(map) => { - map.clear(); + let Self::Cluster(map) = self else { + return Ok(()); + }; - for (k, v) in versions { - let locality = if k.is_empty() { None } else { Some(k.parse()?) }; - let version = v.parse()?; + map.clear(); - map.insert(locality, version); - } + for (k, v) in versions { + let locality = if k.is_empty() { None } else { Some(k.parse()?) }; + let version = v.parse()?; - Ok(()) - } + map.insert(locality, version); } + + Ok(()) } } diff --git a/src/net/xds/resource.rs b/src/net/xds/resource.rs index 88fb7e7a9a..4ff1eb8673 100644 --- a/src/net/xds/resource.rs +++ b/src/net/xds/resource.rs @@ -35,6 +35,7 @@ type_urls! { CLUSTER_TYPE = "quilkin.config.v1alpha1.Cluster", LISTENER_TYPE = "envoy.config.listener.v3.Listener", DATACENTER_TYPE = "quilkin.config.v1alpha1.Datacenter", + FILTER_CHAIN_TYPE = "quilkin.config.v1alpha1.FilterChain", } } @@ -43,6 +44,7 @@ pub enum Resource { Cluster(Box), Datacenter(Box), Listener(Box), + FilterChain(crate::net::cluster::proto::FilterChain), } impl Resource { @@ -55,6 +57,7 @@ impl Resource { .map(|locality| crate::net::endpoint::Locality::from(locality).to_string()) .unwrap_or_default(), Self::Listener(listener) => listener.name.to_string(), + Self::FilterChain(_fc) => String::new(), Self::Datacenter(dc) => dc.icao_code.to_string(), } } @@ -64,6 +67,7 @@ impl Resource { match self { Self::Cluster(_) => ResourceType::Cluster, Self::Listener(_) => ResourceType::Listener, + Self::FilterChain(_) => ResourceType::FilterChain, Self::Datacenter(_) => ResourceType::Datacenter, } } @@ -81,6 +85,16 @@ impl Resource { pub fn type_url(&self) -> &str { self.resource_type().type_url() } + + pub fn from_any(any: prost_types::Any) -> eyre::Result { + Ok(match &*any.type_url { + CLUSTER_TYPE => Resource::Cluster(<_>::decode(&*any.value)?), + LISTENER_TYPE => Resource::Listener(<_>::decode(&*any.value)?), + DATACENTER_TYPE => Resource::Datacenter(<_>::decode(&*any.value)?), + FILTER_CHAIN_TYPE => Resource::FilterChain(<_>::decode(&*any.value)?), + url => return Err(UnknownResourceType(url.into()).into()), + }) + } } impl TryFrom for Resource { @@ -91,6 +105,7 @@ impl TryFrom for Resource { CLUSTER_TYPE => Resource::Cluster(<_>::decode(&*any.value)?), LISTENER_TYPE => Resource::Listener(<_>::decode(&*any.value)?), DATACENTER_TYPE => Resource::Datacenter(<_>::decode(&*any.value)?), + FILTER_CHAIN_TYPE => Resource::FilterChain(<_>::decode(&*any.value)?), url => return Err(UnknownResourceType(url.into()).into()), }) } @@ -100,11 +115,17 @@ impl TryFrom for Resource { pub enum ResourceType { Cluster, Listener, + FilterChain, Datacenter, } impl ResourceType { - pub const VARIANTS: &'static [Self] = &[Self::Cluster, Self::Listener, Self::Datacenter]; + pub const VARIANTS: &'static [Self] = &[ + Self::Cluster, + Self::Listener, + Self::FilterChain, + Self::Datacenter, + ]; /// Returns the corresponding type URL for the response type. #[inline] @@ -113,6 +134,7 @@ impl ResourceType { Self::Cluster => CLUSTER_TYPE, Self::Listener => LISTENER_TYPE, Self::Datacenter => DATACENTER_TYPE, + Self::FilterChain => FILTER_CHAIN_TYPE, } } @@ -140,6 +162,7 @@ impl TryFrom<&'_ str> for ResourceType { Ok(match url { CLUSTER_TYPE => Self::Cluster, LISTENER_TYPE => Self::Listener, + FILTER_CHAIN_TYPE => Self::FilterChain, DATACENTER_TYPE => Self::Datacenter, unknown => return Err(UnknownResourceType(unknown.to_owned())), }) diff --git a/src/net/xds/server.rs b/src/net/xds/server.rs index 67f1829d69..e4749def32 100644 --- a/src/net/xds/server.rs +++ b/src/net/xds/server.rs @@ -354,7 +354,14 @@ impl ControlPlane { subscribed: BTreeSet::new(), kind: ResourceType::Listener, } - } + }, + ResourceType::FilterChain => { + ResourceTypeTracker { + client: ClientVersions::new(ResourceType::FilterChain), + subscribed: BTreeSet::new(), + kind: ResourceType::FilterChain, + } + }, ResourceType::Datacenter => { ResourceTypeTracker { client: ClientVersions::new(ResourceType::Datacenter), @@ -366,6 +373,7 @@ impl ControlPlane { let mut cluster_rx = self.watchers[ResourceType::Cluster].receiver.clone(); let mut listener_rx = self.watchers[ResourceType::Listener].receiver.clone(); + let mut fc_rx = self.watchers[ResourceType::FilterChain].receiver.clone(); let mut dc_rx = self.watchers[ResourceType::Datacenter].receiver.clone(); let id = node_id.clone(); @@ -476,6 +484,11 @@ impl ControlPlane { yield responder(None, &mut trackers[ResourceType::Listener], &mut pending_acks)?; } + _ = fc_rx.changed() => { + tracing::trace!("sending new filter chain delta discovery response"); + + yield responder(None, &mut trackers[ResourceType::FilterChain], &mut pending_acks)?; + } _ = dc_rx.changed() => { tracing::trace!("sending new datacenter delta discovery response");