Skip to content

Commit

Permalink
add thread manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 15, 2024
1 parent 141db09 commit b293f6e
Show file tree
Hide file tree
Showing 16 changed files with 1,276 additions and 40 deletions.
276 changes: 236 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ members = [
"svm-transaction",
"test-validator",
"thin-client",
"thread-manager",
"timings",
"tls-utils",
"tokens",
Expand Down Expand Up @@ -446,6 +447,7 @@ solana-bucket-map = { path = "bucket_map", version = "=2.2.0" }
solana-builtins = { path = "builtins", version = "=2.2.0" }
solana-builtins-default-costs = { path = "builtins-default-costs", version = "=2.2.0" }
agave-cargo-registry = { path = "cargo-registry", version = "=2.2.0" }
agave-thread-manager = { path = "thread-manager", version = "=2.2.0" }
solana-clap-utils = { path = "clap-utils", version = "=2.2.0" }
solana-clap-v3-utils = { path = "clap-v3-utils", version = "=2.2.0" }
solana-cli = { path = "cli", version = "=2.2.0" }
Expand Down
21 changes: 21 additions & 0 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "agave-thread-manager"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }


[dependencies]
affinity = "0.1.2"
anyhow = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thread-priority = "1.2.0"
tokio = { workspace = true, features = ["time", "rt-multi-thread"] }
rayon = { workspace = true }

[dev-dependencies]
axum = "0.7.9"
serde_json = { workspace = true }
14 changes: 14 additions & 0 deletions thread-manager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# thread-manager
Balances machine resources between multiple Tokio runtimes

# Supported threading models
## Tokio
Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on.
The number of worker and blocking threads is configurable.

## Native
Native threads can be spawned from managed pools. This allows them to inherit a particular affinity from the pool, and makes it
possible to control the total number of threads created in each pool.

## Rayon
Rayon already manages its own thread pools; all thread-manager does on top is enforce affinity and priority for Rayon threads.
136 changes: 136 additions & 0 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use std::{
future::IntoFuture,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
};

/// Serves a minimal axum application on `port` for a bounded amount of time.
///
/// The listener binds to the unspecified IPv4 address (`0.0.0.0`) on `port`.
/// The server is given 11 seconds to run; once that elapses the serve future
/// is dropped and a termination message is printed.
///
/// # Panics
///
/// Panics if the listener cannot be bound, or if the server itself finishes
/// with an error before the time limit is reached.
async fn axum_main(port: u16) {
    use axum::{routing::get, Router};

    /// Handler for `/`: sleeps for one millisecond, then returns a fixed greeting.
    async fn root() -> &'static str {
        tokio::time::sleep(Duration::from_millis(1)).await;
        "Hello, World!"
    }

    // Single-route application.
    let app = Router::new().route("/", get(root));

    // Listen on all IPv4 interfaces, on the port chosen by the caller.
    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
    let listener = tokio::net::TcpListener::bind(bind_addr).await.unwrap();

    // Cap the server's lifetime so the example terminates on its own.
    let server = axum::serve(listener, app).into_future();
    if let Ok(served) = tokio::time::timeout(Duration::from_secs(11), server).await {
        // The server returned by itself before the deadline; surface its error, if any.
        served.unwrap();
    } else {
        println!("Terminating server on port {port}");
    }
}
use affinity::*;
use agave_thread_manager::*;

/// Runs every core-contention experiment and prints the `wrk` results for each.
///
/// For each JSON config under `examples/`, a `RuntimeManager` is built, the
/// runtimes registered as `axum1` and `axum2` are looked up, and an axum server
/// is started on each (ports 8888 and 8889) while `wrk` generates load against
/// both ports.
///
/// # Errors
///
/// Returns an error if a config file cannot be opened or deserialized.
///
/// # Panics
///
/// Panics if the thread affinity cannot be queried, the runtime manager cannot
/// be built, a named runtime is missing, or the `wrk` run fails.
fn main() -> anyhow::Result<()> {
    let affinity = get_thread_affinity().unwrap();
    println!("\tCurrent thread affinity : {:?}", affinity);
    println!("\tTotal cores : {}", get_core_num());

    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    for exp in [
        "examples/core_contention_dedicated_set.json",
        "examples/core_contention_contending_set.json",
    ] {
        println!("===================");
        println!("Running {exp}");

        // Config paths are relative to the crate root, not the working directory.
        let conffile = std::fs::File::open(manifest_dir.join(exp))?;
        let cfg: RuntimeManagerConfig = serde_json::from_reader(conffile)?;

        let rtm = RuntimeManager::new(cfg).unwrap();
        let tok1 = rtm
            .get_tokio("axum1")
            .expect("Expecting runtime named axum1");
        let tok2 = rtm
            .get_tokio("axum2")
            .expect("Expecting runtime named axum2");

        // NOTE(review): the load generator is pinned to cores 32..=63, which
        // assumes a machine with at least 64 logical cores - confirm before
        // running elsewhere.
        let wrk_cores: Vec<usize> = (32..64).collect();

        // Scoped threads let the closures borrow the runtimes and core list;
        // the scope only returns once both servers and wrk have finished.
        let results = std::thread::scope(|s| {
            s.spawn(|| {
                tok1.start(axum_main(8888));
            });
            s.spawn(|| {
                tok2.start(axum_main(8889));
            });
            s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap())
                .join()
                .expect("WRK crashed!")
        });
        println!("Results are: {:?}", results);
    }
    Ok(())
}

fn run_wrk(
ports: &[u16],
cpus: &[usize],
threads: usize,
connections: usize,
) -> anyhow::Result<(Vec<Duration>, Vec<f32>)> {
let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
script.push("examples/report.lua");
let cpus: Vec<String> = cpus.iter().map(|c| c.to_string()).collect();
let cpus = cpus.join(",");

let mut children: Vec<_> = ports
.iter()
.map(|p| {
std::process::Command::new("taskset")
.arg("-c")
.arg(&cpus)
.arg("wrk")
.arg(format!("http://localhost:{}", p))
.arg("-d10")
.arg(format!("-s{}", script.to_str().unwrap()))
.arg(format!("-t{threads}"))
.arg(format!("-c{connections}"))
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap()
})
.collect();

use std::str;
let outs = children.drain(..).map(|c| c.wait_with_output().unwrap());
let mut all_latencies = vec![];
let mut all_rps = vec![];
for (out, port) in outs.zip(ports.iter()) {
println!("=========================");
std::io::stdout().write_all(&out.stderr)?;
let res = str::from_utf8(&out.stdout)?;
let mut res = res.lines().last().unwrap().split(' ');

let latency_us: u64 = res.next().unwrap().parse()?;
let latency = Duration::from_micros(latency_us);

let requests: usize = res.next().unwrap().parse()?;
let rps = requests as f32 / 10.0;
println!("WRK results for port {port}: {latency:?} {rps}");
all_latencies.push(Duration::from_micros(latency_us));
all_rps.push(rps);
}
Ok((all_latencies, all_rps))
}
31 changes: 31 additions & 0 deletions thread-manager/examples/core_contention_contending_set.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"tokio_configs": {
"tokio1": {
"worker_threads": 8,
"max_blocking_threads": 1,
"priority": 0,
"core_allocation": {
"DedicatedCoreSet": {
"min": 0,
"max": 8
}
}
},
"tokio2": {
"worker_threads": 8,
"max_blocking_threads": 1,
"priority": 0,
"core_allocation": {
"DedicatedCoreSet": {
"min": 0,
"max": 8
}
}
}
},
"tokio_runtime_mapping": {
"axum2": "tokio2",
"axum1": "tokio1"
},
"native_configs": {}
}
31 changes: 31 additions & 0 deletions thread-manager/examples/core_contention_dedicated_set.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"tokio_configs": {
"tokio1": {
"worker_threads": 4,
"max_blocking_threads": 1,
"priority": 0,
"core_allocation": {
"DedicatedCoreSet": {
"min": 0,
"max": 4
}
}
},
"tokio2": {
"worker_threads": 4,
"max_blocking_threads": 1,
"priority": 0,
"core_allocation": {
"DedicatedCoreSet": {
"min": 4,
"max": 8
}
}
}
},
"tokio_runtime_mapping": {
"axum2": "tokio2",
"axum1": "tokio1"
},
"native_configs": {}
}
20 changes: 20 additions & 0 deletions thread-manager/examples/core_contention_single_runtime.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"tokio_configs": {
"tokio1": {
"worker_threads": 8,
"max_blocking_threads": 1,
"priority": 0,
"core_allocation": {
"DedicatedCoreSet": {
"min": 0,
"max": 8
}
}
}
},
"tokio_runtime_mapping": {
"axum2": "tokio1",
"axum1": "tokio1"
},
"native_configs": {}
}
Loading

0 comments on commit b293f6e

Please sign in to comment.