Skip to content

Commit

Permalink
Merge pull request #34 from EspressoSystems/rm/clippy-and-global-permits
Browse files Browse the repository at this point in the history
Global permits feature
  • Loading branch information
rob-maron authored Apr 4, 2024
2 parents 8dbc3a4 + a04052d commit c709b7b
Show file tree
Hide file tree
Showing 19 changed files with 178 additions and 142 deletions.
5 changes: 3 additions & 2 deletions cdn-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ edition = "2021"
description = "Defines the broker server, which is responsible for routing messages from clients"

[features]
default = ["runtime-tokio", "strong_consistency"]
default = ["runtime-tokio", "strong-consistency"]

strong_consistency = []
strong-consistency = []
global-permits = ["cdn-proto/global-permits"]

runtime-tokio = []
runtime-async-std = ["dep:async-std"]
Expand Down
12 changes: 4 additions & 8 deletions cdn-broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,13 @@ fn bench_broadcast_user(c: &mut Criterion) {
connected_brokers: vec![],
};

let run = run_definition.into_run().await;

run
run_definition.into_run().await
});

// Benchmark
c.bench_function("broadcast: users", |b| {
b.to_async(&benchmark_runtime)
.iter(|| broadcast_user(black_box(&run)))
.iter(|| broadcast_user(black_box(&run)));
});
}

Expand All @@ -76,15 +74,13 @@ fn bench_broadcast_broker(c: &mut Criterion) {
connected_brokers: vec![(vec![], vec![Topic::Global]), (vec![], vec![Topic::Global])],
};

let run = run_definition.into_run().await;

run
run_definition.into_run().await
});

// Benchmark
c.bench_function("broadcast: brokers", |b| {
b.to_async(&benchmark_runtime)
.iter(|| broadcast_broker(black_box(&run)))
.iter(|| broadcast_broker(black_box(&run)));
});
}

Expand Down
24 changes: 8 additions & 16 deletions cdn-broker/benches/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,13 @@ fn bench_direct_user_to_self(c: &mut Criterion) {
connected_brokers: vec![],
};

let run = run_definition.into_run().await;

run
run_definition.into_run().await
});

// Run the benchmark
c.bench_function("direct: user -> broker -> same user", |b| {
b.to_async(&benchmark_runtime)
.iter(|| direct_user_to_self(black_box(&run)))
.iter(|| direct_user_to_self(black_box(&run)));
});
}

Expand All @@ -104,15 +102,13 @@ fn bench_direct_user_to_user(c: &mut Criterion) {
connected_brokers: vec![],
};

let run = run_definition.into_run().await;

run
run_definition.into_run().await
});

// Run the benchmark
c.bench_function("direct: user -> broker -> different user", |b| {
b.to_async(&benchmark_runtime)
.iter(|| direct_user_to_user(black_box(&run)))
.iter(|| direct_user_to_user(black_box(&run)));
});
}

Expand All @@ -129,15 +125,13 @@ fn bench_direct_user_to_broker(c: &mut Criterion) {
connected_brokers: vec![(vec![2], vec![Topic::Global])],
};

let run = run_definition.into_run().await;

run
run_definition.into_run().await
});

// Run the benchmark
c.bench_function("direct: user -> broker -> broker", |b| {
b.to_async(&benchmark_runtime)
.iter(|| direct_user_to_broker(black_box(&run)))
.iter(|| direct_user_to_broker(black_box(&run)));
});
}

Expand All @@ -154,15 +148,13 @@ fn bench_direct_broker_to_user(c: &mut Criterion) {
connected_brokers: vec![(vec![2], vec![Topic::Global])],
};

let run = run_definition.into_run().await;

run
run_definition.into_run().await
});

// Run the benchmark
c.bench_function("direct: broker -> broker -> user", |b| {
b.to_async(&benchmark_runtime)
.iter(|| direct_broker_to_user(black_box(&run)))
.iter(|| direct_broker_to_user(black_box(&run)));
});
}

Expand Down
8 changes: 4 additions & 4 deletions cdn-broker/src/connections/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub mod tests {
);

// Check that both users are associated with 1
vec_equal!(map.get_keys_by_value(&1), vec!["user0", "user1"]);
vec_equal!(map.get_keys_by_value(&1), ["user0", "user1"]);

// Dissociate "user0" from 1
map.dissociate_keys_from_value(&"user0", &[1]);
Expand All @@ -175,7 +175,7 @@ pub mod tests {

// Check that nobody is associated with 1
assert!(
map.get_keys_by_value(&1).len() == 0,
map.get_keys_by_value(&1).is_empty(),
"expected no user to be associated with value 1"
);

Expand All @@ -202,13 +202,13 @@ pub mod tests {

// Assert zero keys to value
assert!(
map.key_to_values.len() == 0,
map.key_to_values.is_empty(),
"expected `key_to_values` to be empty"
);

// Assert zero values to key
assert!(
map.value_to_keys.len() == 0,
map.value_to_keys.is_empty(),
"expected `value_to_keys` to be empty"
);
}
Expand Down
8 changes: 4 additions & 4 deletions cdn-broker/src/connections/versioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ pub mod tests {

// Remove "user0"
map.remove("user0");
assert!(map.get(&"user0") == None);
assert!(map.get(&"user0").is_none());
}

#[test]
Expand Down Expand Up @@ -291,21 +291,21 @@ pub mod tests {

// Merge map_1 with new_diff, expecting user0 to have a value but not user1
map_1.merge(new_diff);
assert!(map_1.get(&"user0") == None);
assert!(map_1.get(&"user0").is_none());
assert!(map_1.get(&"user1") == Some(&"broker0"));

// Full sync, expect now to be present
map_1.merge(map_0.get_full());
assert!(map_1.get(&"user0") == Some(&"broker0"));

// Map 1 removes value, syncs
map_1.remove(&"user0");
map_1.remove("user0");

// Merge map0 with map 1's diff
map_0.merge(map_1.diff());

// Expect user0 to be gone from map0
assert!(map_0.get(&"user0") == None);
assert!(map_0.get(&"user0").is_none());
}

// TODO: fuzzy tests for this
Expand Down
6 changes: 3 additions & 3 deletions cdn-broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ impl<Def: RunDef> Inner<Def> {
return;
};

// If we have `strong_consistency` enabled, send partials
#[cfg(feature = "strong_consistency")]
// If we have `strong-consistency` enabled, send partials
#[cfg(feature = "strong-consistency")]
if let Err(err) = self.partial_topic_sync() {
error!("failed to perform partial topic sync: {err}");
}

#[cfg(feature = "strong_consistency")]
#[cfg(feature = "strong-consistency")]
if let Err(err) = self.partial_user_sync() {
error!("failed to perform partial user sync: {err}");
}
Expand Down
11 changes: 6 additions & 5 deletions cdn-broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use cdn_proto::connection::hooks::Untrusted;
use cdn_proto::connection::protocols::Protocol;
use cdn_proto::connection::UserPublicKey;
use cdn_proto::def::RunDef;
#[cfg(feature = "strong_consistency")]
#[cfg(feature = "strong-consistency")]
use cdn_proto::discovery::DiscoveryClient;
use cdn_proto::error::{Error, Result};
use cdn_proto::{
Expand All @@ -35,6 +35,7 @@ impl<Def: RunDef> Inner<Def> {
Duration::from_secs(5),
BrokerAuth::<Def>::verify_user(
&connection,
#[cfg(not(feature = "global-permits"))]
&self.identity,
&mut self.discovery_client.clone(),
),
Expand All @@ -58,20 +59,20 @@ impl<Def: RunDef> Inner<Def> {
// Subscribe our user to their connections
self.connections.subscribe_user_to(&public_key, topics);

// If we have `strong_consistency` enabled, send partials
#[cfg(feature = "strong_consistency")]
// If we have `strong-consistency` enabled, send partials
#[cfg(feature = "strong-consistency")]
if let Err(err) = self.partial_topic_sync() {
tracing::error!("failed to perform partial topic sync: {err}");
}

#[cfg(feature = "strong_consistency")]
#[cfg(feature = "strong-consistency")]
if let Err(err) = self.partial_user_sync() {
tracing::error!("failed to perform partial user sync: {err}");
}

// We want to perform a heartbeat for every user connection so that the number
// of users connected to brokers is always evenly distributed.
#[cfg(feature = "strong_consistency")]
#[cfg(feature = "strong-consistency")]
let _ = self
.discovery_client
.clone()
Expand Down
2 changes: 1 addition & 1 deletion cdn-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<Scheme: SignatureScheme, ProtocolType: Protocol<None>> Client<Scheme, Proto

/// Returns only once the connection is fully initialized
pub async fn ensure_initialized(&self) {
self.0.ensure_initialized().await
self.0.ensure_initialized().await;
}

/// Receives the next message from the downstream server.
Expand Down
2 changes: 2 additions & 0 deletions cdn-marshal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ description = "Contains the server implementation for the marshal, which issues
[features]
default = ["runtime-tokio"]

global-permits = ["cdn-proto/global-permits"]

runtime-tokio = []
runtime-async-std = ["dep:async-std"]

Expand Down
2 changes: 2 additions & 0 deletions cdn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ rcgen.workspace = true

[features]
metrics = ["dep:prometheus"]
# Allow permits to be issued for _any_ broker
global-permits = []

[dev-dependencies]
portpicker = "0.1.1"
Expand Down
8 changes: 4 additions & 4 deletions cdn-proto/benches/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn set_up_bench<Proto: Protocol<Untrusted>>(
.expect("failed to generate TLS cert from CA");

// Create listener, bind to port
let listener = Proto::bind(&format!("127.0.0.1:{}", port), tls_cert, tls_key)
let listener = Proto::bind(&format!("127.0.0.1:{port}"), tls_cert, tls_key)
.await
.expect("failed to listen on port");

Expand All @@ -85,7 +85,7 @@ fn set_up_bench<Proto: Protocol<Untrusted>>(
});

// Attempt to connect
let conn1 = Proto::connect(&format!("127.0.0.1:{}", port), true)
let conn1 = Proto::connect(&format!("127.0.0.1:{port}"), true)
.await
.expect("failed to connect to listener");

Expand Down Expand Up @@ -114,7 +114,7 @@ fn bench_quic(c: &mut Criterion) {
static MB: usize = 1024 * 1024;
let mut group = c.benchmark_group("quic_transfer");
// The message sizes we want to test
for size in [100, 1 * KB, 100 * KB, 10 * MB, 100 * MB].iter() {
for size in &[100, KB, 100 * KB, 10 * MB, 100 * MB] {
// Set up our bench
let (runtime, conn1, conn2, message) = set_up_bench::<Quic>(*size);

Expand All @@ -140,7 +140,7 @@ fn bench_tcp(c: &mut Criterion) {
static MB: usize = 1024 * 1024;
let mut group = c.benchmark_group("tcp_transfer");
// The message sizes we want to test
for size in [100, 1 * KB, 100 * KB, 10 * MB, 100 * MB].iter() {
for size in &[100, KB, 100 * KB, 10 * MB, 100 * MB] {
// Set up our bench
let (runtime, conn1, conn2, message) = set_up_bench::<Tcp>(*size);

Expand Down
8 changes: 6 additions & 2 deletions cdn-proto/src/connection/auth/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<Def: RunDef> BrokerAuth<Def> {
/// - If our connection fails
pub async fn verify_user(
connection: &UserConnection<Def>,
broker_identifier: &BrokerIdentifier,
#[cfg(not(feature = "global-permits"))] broker_identifier: &BrokerIdentifier,
discovery_client: &mut Def::DiscoveryClientType,
) -> Result<(UserPublicKey, Vec<Topic>)> {
// Receive the permit
Expand All @@ -100,7 +100,11 @@ impl<Def: RunDef> BrokerAuth<Def> {

// Check the permit
let serialized_public_key = match discovery_client
.validate_permit(broker_identifier, auth_message.permit)
.validate_permit(
#[cfg(not(feature = "global-permits"))]
broker_identifier,
auth_message.permit,
)
.await
{
// The permit did not exist
Expand Down
1 change: 1 addition & 0 deletions cdn-proto/src/connection/auth/marshal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl<Def: RunDef> MarshalAuth<Def> {
// Generate and issue a permit for said broker
let permit = match discovery_client
.issue_permit(
#[cfg(not(feature = "global-permits"))]
&broker_with_least_connections,
Duration::from_secs(30),
auth_message.public_key,
Expand Down
Loading

0 comments on commit c709b7b

Please sign in to comment.