Skip to content

Commit

Permalink
periodic sampling of metrics to avoid flood, fixup examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 17, 2024
1 parent 3c83c09 commit 4e1d728
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 121 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ affinity = "0.1.2"

[dev-dependencies]
axum = "0.7.9"
env_logger = { workspace = true }
serde_json = { workspace = true }
toml = { workspace = true }
63 changes: 35 additions & 28 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{
future::IntoFuture,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
use {
agave_thread_manager::*,
log::{debug, info},
std::{
future::IntoFuture,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
},
};

async fn axum_main(port: u16) {
Expand Down Expand Up @@ -31,47 +35,50 @@ async fn axum_main(port: u16) {
match timeout {
Ok(v) => v.unwrap(),
Err(_) => {
println!("Terminating server on port {port}");
info!("Terminating server on port {port}");
}
}
}
use agave_thread_manager::*;

fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let experiments = [
"examples/core_contention_dedicated_set.json",
"examples/core_contention_contending_set.json",
"examples/core_contention_dedicated_set.toml",
"examples/core_contention_contending_set.toml",
];

for exp in experiments {
println!("===================");
println!("Running {exp}");
let mut conffile = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
conffile.push(exp);
info!("===================");
info!("Running {exp}");
let mut conf_file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
conf_file.push(exp);
let mut buf = String::new();
std::fs::File::open(conffile)?.read_to_string(&mut buf)?;
std::fs::File::open(conf_file)?.read_to_string(&mut buf)?;
let cfg: RuntimeManagerConfig = toml::from_str(&buf)?;
//println!("Loaded config {}", serde_json::to_string_pretty(&cfg)?);

let rtm = ThreadManager::new(cfg).unwrap();
let tok1 = rtm
let manager = ThreadManager::new(cfg).unwrap();
let tokio1 = manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1");
let tok2 = rtm
tokio1.start_metrics_sampling(Duration::from_secs(1));
let tokio2 = manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2");
tokio2.start_metrics_sampling(Duration::from_secs(1));

let wrk_cores: Vec<_> = (32..64).collect();
let results = std::thread::scope(|s| {
s.spawn(|| {
tok1.tokio.block_on(axum_main(8888));
let results = std::thread::scope(|scope| {
scope.spawn(|| {
tokio1.tokio.block_on(axum_main(8888));
});
s.spawn(|| {
tok2.tokio.block_on(axum_main(8889));
scope.spawn(|| {
tokio2.tokio.block_on(axum_main(8889));
});
let jh = s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap());
jh.join().expect("WRK crashed!")
let join_handle =
scope.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap());
join_handle.join().expect("WRK crashed!")
});
//print out the results of the bench run
println!("Results are: {:?}", results);
}
Ok(())
Expand Down Expand Up @@ -112,7 +119,7 @@ fn run_wrk(
let mut all_latencies = vec![];
let mut all_rps = vec![];
for (out, port) in outs.zip(ports.iter()) {
println!("=========================");
debug!("=========================");
std::io::stdout().write_all(&out.stderr)?;
let res = str::from_utf8(&out.stdout)?;
let mut res = res.lines().last().unwrap().split(' ');
Expand All @@ -122,7 +129,7 @@ fn run_wrk(

let requests: usize = res.next().unwrap().parse()?;
let rps = requests as f32 / 10.0;
println!("WRK results for port {port}: {latency:?} {rps}");
debug!("WRK results for port {port}: {latency:?} {rps}");
all_latencies.push(Duration::from_micros(latency_us));
all_rps.push(rps);
}
Expand Down
122 changes: 66 additions & 56 deletions thread-manager/examples/core_contention_sweep.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::{
collections::HashMap,
future::IntoFuture,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
use {
agave_thread_manager::*,
log::{debug, info},
std::{
collections::HashMap,
future::IntoFuture,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
},
};

async fn axum_main(port: u16) {
use axum::{routing::get, Router};

// basic handler that responds with a static string
async fn root() -> &'static str {
tokio::time::sleep(Duration::from_millis(1)).await;
Expand All @@ -24,6 +27,7 @@ async fn axum_main(port: u16) {
tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
.await
.unwrap();
info!("Server on port {port} ready");
let timeout = tokio::time::timeout(
Duration::from_secs(11),
axum::serve(listener, app).into_future(),
Expand All @@ -32,11 +36,10 @@ async fn axum_main(port: u16) {
match timeout {
Ok(v) => v.unwrap(),
Err(_) => {
println!("Terminating server on port {port}");
info!("Terminating server on port {port}");
}
}
}
use agave_thread_manager::*;
fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
Expand All @@ -46,41 +49,33 @@ fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
let tokio_cfg_2 = tokio_cfg_1.clone();
RuntimeManagerConfig {
tokio_configs: HashMap::from([
("tokio1".into(), tokio_cfg_1),
("tokio2".into(), tokio_cfg_2),
]),
tokio_runtime_mapping: HashMap::from([
("axum1".into(), "tokio1".into()),
("axum2".into(), "tokio2".into()),
("axum1".into(), tokio_cfg_1),
("axum2".into(), tokio_cfg_2),
]),
..Default::default()
}
}
fn make_config_dedicated(cc: usize) -> RuntimeManagerConfig {
fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig {
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet {
min: 0,
max: cc / 2,
max: core_count / 2,
},
worker_threads: cc / 2,
worker_threads: core_count / 2,
..Default::default()
};
let tokio_cfg_2 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet {
min: cc / 2,
max: cc,
min: core_count / 2,
max: core_count,
},
worker_threads: cc / 2,
worker_threads: core_count / 2,
..Default::default()
};
RuntimeManagerConfig {
tokio_configs: HashMap::from([
("tokio1".into(), tokio_cfg_1),
("tokio2".into(), tokio_cfg_2),
]),
tokio_runtime_mapping: HashMap::from([
("axum1".into(), "tokio1".into()),
("axum2".into(), "tokio2".into()),
("axum1".into(), tokio_cfg_1),
("axum2".into(), tokio_cfg_2),
]),
..Default::default()
}
Expand All @@ -93,7 +88,7 @@ enum Regime {
Single,
}
impl Regime {
const VALUES: [Self; 3] = [Self::Shared, Self::Dedicated, Self::Single];
const VALUES: [Self; 3] = [Self::Dedicated, Self::Shared, Self::Single];
}

#[derive(Debug, Default, serde::Serialize)]
Expand All @@ -103,72 +98,84 @@ struct Results {
}

fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let mut all_results: HashMap<String, Results> = HashMap::new();
for regime in Regime::VALUES {
let mut res = Results::default();
for core_cnt in [2, 4, 8, 16] {
let rtm;
println!("===================");
println!("Running {core_cnt} cores under {regime:?}");
let (tok1, tok2) = match regime {
let mut results = Results::default();
for core_count in [2, 4, 8, 16] {
let manager;
info!("===================");
info!("Running {core_count} cores under {regime:?}");
let (tokio1, tokio2) = match regime {
Regime::Shared => {
rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap();
manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
(
rtm.get_tokio("axum1")
manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
rtm.get_tokio("axum2")
manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2"),
)
}
Regime::Dedicated => {
rtm = ThreadManager::new(make_config_dedicated(core_cnt)).unwrap();
manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap();
(
rtm.get_tokio("axum1")
manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
rtm.get_tokio("axum2")
manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2"),
)
}
Regime::Single => {
rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap();
manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
(
rtm.get_tokio("axum1")
manager
.get_tokio("axum1")
.expect("Expecting runtime named axum1"),
rtm.get_tokio("axum2")
manager
.get_tokio("axum2")
.expect("Expecting runtime named axum2"),
)
}
};

let wrk_cores: Vec<_> = (32..64).collect();
let results = std::thread::scope(|s| {
let measurement = std::thread::scope(|s| {
s.spawn(|| {
tok1.tokio.spawn(axum_main(8888));
tokio1.start_metrics_sampling(Duration::from_secs(1));
tokio1.tokio.block_on(axum_main(8888));
});
let jh = match regime {
Regime::Single => s.spawn(|| {
run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 1000).unwrap()
run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 3000).unwrap()
}),
_ => {
s.spawn(|| {
tok2.tokio.spawn(axum_main(8889));
tokio2.start_metrics_sampling(Duration::from_secs(1));
tokio2.tokio.block_on(axum_main(8889));
});
s.spawn(|| {
run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()
run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 3000).unwrap()
})
}
};
jh.join().expect("WRK crashed!")
});
println!("Results are: {:?}", results);
res.latencies_s.push(
results.0.iter().map(|a| a.as_secs_f32()).sum::<f32>() / results.0.len() as f32,
info!("Results are: {:?}", measurement);
results.latencies_s.push(
measurement.0.iter().map(|a| a.as_secs_f32()).sum::<f32>()
/ measurement.0.len() as f32,
);
res.rps.push(results.1.iter().sum());
results.rps.push(measurement.1.iter().sum());
}
all_results.insert(format!("{regime:?}"), res);
all_results.insert(format!("{regime:?}"), results);
std::thread::sleep(Duration::from_secs(3));
}

//print the resulting measurements so they can be e.g. plotted with matplotlib
println!("{}", serde_json::to_string_pretty(&all_results)?);

Ok(())
Expand All @@ -180,6 +187,9 @@ fn run_wrk(
threads: usize,
connections: usize,
) -> anyhow::Result<(Vec<Duration>, Vec<f32>)> {
//Sleep a bit to let axum start
std::thread::sleep(Duration::from_millis(500));

let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
script.push("examples/report.lua");
let cpus: Vec<String> = cpus.iter().map(|c| c.to_string()).collect();
Expand Down Expand Up @@ -209,7 +219,7 @@ fn run_wrk(
let mut all_latencies = vec![];
let mut all_rps = vec![];
for (out, port) in outs.zip(ports.iter()) {
println!("=========================");
debug!("=========================");
std::io::stdout().write_all(&out.stderr)?;
let res = str::from_utf8(&out.stdout)?;
let mut res = res.lines().last().unwrap().split(' ');
Expand All @@ -219,7 +229,7 @@ fn run_wrk(

let requests: usize = res.next().unwrap().parse()?;
let rps = requests as f32 / 10.0;
println!("WRK results for port {port}: {latency:?} {rps}");
debug!("WRK results for port {port}: {latency:?} {rps}");
all_latencies.push(Duration::from_micros(latency_us));
all_rps.push(rps);
}
Expand Down
Loading

0 comments on commit 4e1d728

Please sign in to comment.