Add an optimized token router filter (#948)
* Remove unnecessary allocations

Both serialization and deserialization were doing unnecessary temporary heap allocations

* Remove dead proto includes

* Add an alternative HashedTokenRouter filter
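
The third change above is the interesting one; a sketch of the likely idea, under assumptions since the filter source itself is not part of this excerpt: instead of every read scanning each endpoint's token list, the cluster map pre-builds a token -> endpoint-address index (see cm.build_token_maps() in the new benchmark below), and the hashed router resolves a captured token with a single map lookup. All names in this sketch are hypothetical:

    // Hypothetical sketch only; the real filter lives in
    // src/filters/token_router.rs and may differ in structure and naming.
    use std::collections::HashMap;
    use std::net::SocketAddr;

    struct TokenIndex {
        by_token: HashMap<Vec<u8>, Vec<SocketAddr>>,
    }

    impl TokenIndex {
        // One average-O(1) lookup per packet instead of a scan over all endpoints.
        fn route(&self, captured_token: &[u8]) -> Option<&[SocketAddr]> {
            self.by_token.get(captured_token).map(Vec::as_slice)
        }
    }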
Jake-Shadle authored May 14, 2024
1 parent 6badc24 commit 1556b19
Showing 17 changed files with 394 additions and 164 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -58,6 +58,11 @@ name = "misc"
harness = false
test = false

+[[bench]]
+name = "token_router"
+harness = false
+test = false
+
[dependencies]
# Local
quilkin-macros = { version = "0.9.0-dev", path = "./macros" }
14 changes: 8 additions & 6 deletions benches/cluster_map.rs
@@ -3,6 +3,8 @@ use quilkin::net::cluster::ClusterMap;

mod shared;

+use shared::TokenKind;
+
#[divan::bench_group(sample_count = 10)]
mod serde {
use super::*;
@@ -66,21 +68,21 @@ mod serde {

#[divan::bench(consts = SEEDS)]
fn serialize_proto<const S: u64>(b: Bencher) {
-        let gc = gen_cluster_map::<S>();
+        let gc = gen_cluster_map::<S>(TokenKind::None);
b.counter(gc.total_endpoints)
.bench(|| divan::black_box(serialize_to_protobuf(&gc.cm)));
}

#[divan::bench(consts = SEEDS)]
fn serialize_json<const S: u64>(b: Bencher) {
-        let gc = gen_cluster_map::<S>();
+        let gc = gen_cluster_map::<S>(TokenKind::None);
b.counter(gc.total_endpoints)
.bench(|| divan::black_box(serialize_to_json(&gc.cm)));
}

#[divan::bench(consts = SEEDS)]
fn deserialize_json<const S: u64>(b: Bencher) {
-        let gc = gen_cluster_map::<S>();
+        let gc = gen_cluster_map::<S>(TokenKind::None);
let json = serialize_to_json(&gc.cm);

b.with_inputs(|| json.clone())
@@ -90,7 +92,7 @@ mod serde {

#[divan::bench(consts = SEEDS)]
fn deserialize_proto<const S: u64>(b: Bencher) {
-        let gc = gen_cluster_map::<S>();
+        let gc = gen_cluster_map::<S>(TokenKind::None);
let pv = serialize_to_protobuf(&gc.cm);

b.with_inputs(|| pv.clone())
@@ -125,7 +127,7 @@ mod ops {

#[divan::bench(consts = SEEDS)]
fn iterate<const S: u64>(b: Bencher) {
-        let cm = gen_cluster_map::<S>();
+        let cm = gen_cluster_map::<S>(TokenKind::None);

b.counter(cm.total_endpoints)
.bench_local(|| divan::black_box(compute_hash::<S>(&cm)));
@@ -135,7 +137,7 @@

#[divan::bench(consts = SEEDS)]
fn iterate_par<const S: u64>(b: Bencher) {
-        let cm = gen_cluster_map::<S>();
+        let cm = gen_cluster_map::<S>(TokenKind::None);

b.counter(cm.total_endpoints)
.bench(|| divan::black_box(compute_hash::<S>(&cm)))
2 changes: 1 addition & 1 deletion benches/misc.rs
@@ -226,7 +226,7 @@ impl GenResource for Cluster {

let mut rng = rand::rngs::SmallRng::seed_from_u64(self.counter as u64);
let mut hasher = xxhash_rust::xxh3::Xxh3::new();
-        let endpoints = shared::gen_endpoints(&mut rng, &mut hasher);
+        let endpoints = shared::gen_endpoints(&mut rng, &mut hasher, None);

let msg = quilkin::generated::quilkin::config::v1alpha1::Cluster {
locality: Some(quilkin::generated::quilkin::config::v1alpha1::Locality {
135 changes: 131 additions & 4 deletions benches/shared.rs
@@ -497,11 +497,20 @@ pub const LOCALITIES: &[&str] = &[
"us:west4:c",
];

-pub fn gen_endpoints(rng: &mut rand::rngs::SmallRng, hasher: &mut Hasher) -> BTreeSet<Endpoint> {
+pub fn gen_endpoints(
+    rng: &mut rand::rngs::SmallRng,
+    hasher: &mut Hasher,
+    mut tg: Option<&mut TokenGenerator>,
+) -> BTreeSet<Endpoint> {
let num_endpoints = rng.gen_range(100..10_000);
hasher.write_u16(num_endpoints);

let mut endpoints = BTreeSet::new();
+    if let Some(tg) = &mut tg {
+        if let Some(prev) = &mut tg.previous {
+            prev.clear();
+        }
+    }

for i in 0..num_endpoints {
let ep_addr = match i % 3 {
@@ -514,7 +523,20 @@ pub fn gen_endpoints(rng: &mut rand::rngs::SmallRng, hasher: &mut Hasher) -> BTreeSet<Endpoint> {
_ => unreachable!(),
};

-        endpoints.insert(Endpoint::new(ep_addr));
+        let ep = if let Some(tg) = &mut tg {
+            let set = tg.next().unwrap();
+
+            Endpoint::with_metadata(
+                ep_addr,
+                quilkin::net::endpoint::EndpointMetadata::new(quilkin::net::endpoint::Metadata {
+                    tokens: set,
+                }),
+            )
+        } else {
+            Endpoint::new(ep_addr)
+        };
+
+        endpoints.insert(ep);
}

for ep in &endpoints {
@@ -541,7 +563,98 @@ fn write_locality(hasher: &mut Hasher, loc: &Option<Locality>) {
}
}

-pub fn gen_cluster_map<const S: u64>() -> GenCluster {
+pub enum TokenKind {
+    None,
+    Single {
+        duplicates: bool,
+    },
+    Multi {
+        range: std::ops::Range<usize>,
+        duplicates: bool,
+    },
+}
+
+impl std::str::FromStr for TokenKind {
+    type Err = eyre::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let dupes = |s: &str| match s {
+            "duplicates" => Ok(true),
+            "unique" => Ok(false),
+            _ => eyre::bail!("must be `duplicates` or `unique`"),
+        };
+
+        if let Some(rest) = s.strip_prefix("single:") {
+            Ok(Self::Single {
+                duplicates: dupes(rest)?,
+            })
+        } else if let Some(rest) = s.strip_prefix("multi:") {
+            let (r, rest) = rest
+                .split_once(':')
+                .ok_or_else(|| eyre::format_err!("multi must specify 'range:duplicates'"))?;
+
+            let (start, end) = r
+                .split_once("..")
+                .ok_or_else(|| eyre::format_err!("range must be specified as '<start>..<end>'"))?;
+
+            let range = start.parse()?..end.parse()?;
+
+            Ok(Self::Multi {
+                range,
+                duplicates: dupes(rest)?,
+            })
+        } else {
+            eyre::bail!("unknown token kind");
+        }
+    }
+}
+
+pub struct TokenGenerator {
+    rng: rand::rngs::SmallRng,
+    previous: Option<Vec<Vec<u8>>>,
+    range: Option<std::ops::Range<usize>>,
+}
+
+impl Iterator for TokenGenerator {
+    type Item = quilkin::net::endpoint::Set;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        use rand::RngCore;
+        let mut set = Self::Item::new();
+
+        let count = if let Some(range) = self.range.clone() {
+            self.rng.gen_range(range)
+        } else {
+            1
+        };
+
+        if let Some(prev) = &mut self.previous {
+            for _ in 0..count {
+                if !prev.is_empty() && self.rng.gen_ratio(1, 10) {
+                    let prev = &prev[self.rng.gen_range(0..prev.len())];
+                    set.insert(prev.clone());
+                } else {
+                    let count = self.rng.gen_range(4..20);
+                    let mut v = vec![0; count];
+                    self.rng.fill_bytes(&mut v);
+                    prev.push(v.clone());
+                    set.insert(v);
+                }
+            }
+        } else {
+            for _ in 0..count {
+                let count = self.rng.gen_range(4..20);
+                let mut v = vec![0; count];
+                self.rng.fill_bytes(&mut v);
+                set.insert(v);
+            }
+        }
+
+        Some(set)
+    }
+}
+
+pub fn gen_cluster_map<const S: u64>(token_kind: TokenKind) -> GenCluster {
use rand::prelude::*;

let mut rng = rand::rngs::SmallRng::seed_from_u64(S);
@@ -566,10 +679,24 @@ pub fn gen_cluster_map<const S: u64>() -> GenCluster {
let keys: Vec<_> = cm.iter().map(|kv| kv.key().clone()).collect();
let mut sets = std::collections::BTreeMap::new();

+    let mut token_generator = match token_kind {
+        TokenKind::None => None,
+        TokenKind::Multi { range, duplicates } => Some(TokenGenerator {
+            rng: rand::rngs::SmallRng::seed_from_u64(S),
+            previous: duplicates.then_some(Vec::new()),
+            range: Some(range),
+        }),
+        TokenKind::Single { duplicates } => Some(TokenGenerator {
+            rng: rand::rngs::SmallRng::seed_from_u64(S),
+            previous: duplicates.then_some(Vec::new()),
+            range: None,
+        }),
+    };
+
for key in keys {
write_locality(&mut hasher, &key);

-        let ep = gen_endpoints(&mut rng, &mut hasher);
+        let ep = gen_endpoints(&mut rng, &mut hasher, token_generator.as_mut());
total_endpoints += ep.len();
cm.insert(key.clone(), ep.clone());
sets.insert(key, ep);
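
Before the new benchmark file, a minimal usage sketch of the pieces added above (it mirrors the call the benchmark makes; the seed and token-kind string are taken from it):

    // Sketch: parse a token-kind string and build a seeded cluster map whose
    // endpoints now carry token metadata.
    let kind: shared::TokenKind = "multi:2..128:duplicates".parse().unwrap();
    let gc = shared::gen_cluster_map::<42>(kind);
    assert!(gc.total_endpoints > 0);
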
60 changes: 60 additions & 0 deletions benches/token_router.rs
@@ -0,0 +1,60 @@
+use divan::Bencher;
+use quilkin::filters::token_router::{HashedTokenRouter, Router, TokenRouter};
+use rand::SeedableRng;
+
+mod shared;
+
+#[divan::bench(types = [TokenRouter, HashedTokenRouter], args = ["single:duplicates", "single:unique", "multi:2..128:duplicates", "multi:2..128:unique"])]
+fn token_router<T>(b: Bencher, token_kind: &str)
+where
+    T: Router + Sync,
+{
+    let filter = <T as Router>::new();
+    let gc = shared::gen_cluster_map::<42>(token_kind.parse().unwrap());
+
+    let mut tokens = Vec::new();
+
+    let cm = std::sync::Arc::new(gc.cm);
+    cm.build_token_maps();
+
+    // Calculate the amount of bytes for all the tokens
+    for eps in cm.iter() {
+        for ep in &eps.value().endpoints {
+            for tok in &ep.metadata.known.tokens {
+                tokens.push(tok.clone());
+            }
+        }
+    }
+
+    let total_token_size: usize = tokens.iter().map(|t| t.len()).sum();
+    let pool = std::sync::Arc::new(quilkin::pool::BufferPool::new(1, 1));
+
+    let mut rand = rand::rngs::SmallRng::seed_from_u64(42);
+
+    b.with_inputs(|| {
+        use rand::seq::SliceRandom as _;
+        let tok = tokens.choose(&mut rand).unwrap();
+
+        let mut rc = quilkin::filters::ReadContext::new(
+            cm.clone(),
+            quilkin::net::EndpointAddress::LOCALHOST,
+            pool.clone().alloc(),
+        );
+        rc.metadata.insert(
+            quilkin::net::endpoint::metadata::Key::from_static(
+                quilkin::filters::capture::CAPTURED_BYTES,
+            ),
+            quilkin::net::endpoint::metadata::Value::Bytes((*tok).clone().into()),
+        );
+
+        rc
+    })
+    .counter(divan::counter::BytesCount::new(total_token_size))
+    .bench_local_values(|mut rc| {
+        let _ = divan::black_box(filter.sync_read(&mut rc));
+    })
+}
+
+fn main() {
+    divan::main();
+}
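
With the [[bench]] target registered in Cargo.toml above, this benchmark can be run on its own with a standard cargo invocation:

    cargo bench --bench token_router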
4 changes: 2 additions & 2 deletions src/components/proxy/packet_router.rs
@@ -228,10 +228,10 @@ impl DownstreamReceiveWorkerConfig {
// cheaply and returned to the pool once all references are dropped
let contents = contents.freeze();

-            for endpoint in destinations.iter() {
+            for epa in destinations {
let session_key = SessionKey {
source: packet.source,
-                    dest: endpoint.address.to_socket_addr().await?,
+                    dest: epa.to_socket_addr().await?,
};

sessions
7 changes: 6 additions & 1 deletion src/filters/chain.rs
@@ -297,7 +297,12 @@ impl Filter for FilterChain {
// has rejected, and the destinations is empty, we passthrough to all.
// Which mimics the old behaviour while avoid clones in most cases.
if ctx.destinations.is_empty() {
-            ctx.destinations = ctx.endpoints.endpoints();
+            ctx.destinations = ctx
+                .endpoints
+                .endpoints()
+                .into_iter()
+                .map(|ep| ep.address)
+                .collect();
}

Ok(())
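
One pattern worth noting across this diff (my reading; the commit message does not spell it out): ReadContext::destinations now carries endpoint addresses rather than whole Endpoint values, which is why packet_router.rs above resolves each address with to_socket_addr() directly and the load_balancer test below returns context.destinations unchanged instead of mapping out ep.address.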
29 changes: 0 additions & 29 deletions src/filters/compress/proto.rs

This file was deleted.

6 changes: 1 addition & 5 deletions src/filters/load_balancer.rs
@@ -80,11 +80,7 @@ mod tests {

filter.read(&mut context).await.unwrap();

-        context
-            .destinations
-            .iter()
-            .map(|ep| ep.address.clone())
-            .collect::<Vec<_>>()
+        context.destinations
}

#[tokio::test]
(Diff truncated: the remaining changed files are not shown in this excerpt.)