From b293f6e0deb0f8339f87cc083814e7bbfb4d43e0 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Tue, 3 Dec 2024 04:48:19 +0000 Subject: [PATCH] add thread manager --- Cargo.lock | 276 +++++++++++++++--- Cargo.toml | 2 + thread-manager/Cargo.toml | 21 ++ thread-manager/README.md | 14 + .../examples/core_contention_basics.rs | 136 +++++++++ .../core_contention_contending_set.json | 31 ++ .../core_contention_dedicated_set.json | 31 ++ .../core_contention_single_runtime.json | 20 ++ .../examples/core_contention_sweep.rs | 223 ++++++++++++++ thread-manager/examples/report.lua | 4 + thread-manager/src/lib.rs | 228 +++++++++++++++ thread-manager/src/native_thread_runtime.rs | 120 ++++++++ thread-manager/src/policy.rs | 26 ++ thread-manager/src/rayon_runtime.rs | 44 +++ thread-manager/src/runtime_manager.rs | 0 thread-manager/src/tokio_runtime.rs | 140 +++++++++ 16 files changed, 1276 insertions(+), 40 deletions(-) create mode 100644 thread-manager/Cargo.toml create mode 100644 thread-manager/README.md create mode 100644 thread-manager/examples/core_contention_basics.rs create mode 100644 thread-manager/examples/core_contention_contending_set.json create mode 100644 thread-manager/examples/core_contention_dedicated_set.json create mode 100644 thread-manager/examples/core_contention_single_runtime.json create mode 100644 thread-manager/examples/core_contention_sweep.rs create mode 100644 thread-manager/examples/report.lua create mode 100644 thread-manager/src/lib.rs create mode 100644 thread-manager/src/native_thread_runtime.rs create mode 100644 thread-manager/src/policy.rs create mode 100644 thread-manager/src/rayon_runtime.rs create mode 100644 thread-manager/src/runtime_manager.rs create mode 100644 thread-manager/src/tokio_runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 6730cf067af3df..5acec2a9028929 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if 1.0.0", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "agave-accounts-hash-cache-tool" version = "2.2.0" @@ -85,7 +97,7 @@ dependencies = [ "clap 2.33.3", "flate2", "hex", - "hyper", + "hyper 0.14.31", "log", "serde", "serde_derive", @@ -232,6 +244,20 @@ dependencies = [ "solana-version", ] +[[package]] +name = "agave-thread-manager" +version = "2.2.0" +dependencies = [ + "affinity", + "anyhow", + "axum 0.7.9", + "rayon", + "serde", + "serde_json", + "thread-priority", + "tokio", +] + [[package]] name = "agave-transaction-view" version = "2.2.0" @@ -777,13 +803,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.31", "itoa", "matchit", "memchr", @@ -792,12 +818,46 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", - "tower", + "sync_wrapper 0.1.2", + "tower 0.4.13", "tower-layer", "tower-service", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding 2.3.1", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.2", + "tokio", + "tower 0.5.1", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -807,14 +867,35 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "mime", "rustversion", "tower-layer", "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backoff" version = "0.4.0" @@ -2208,13 +2289,13 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89" dependencies = [ - "http", + "http 0.2.12", "prost", "tokio", "tokio-stream", "tonic", "tonic-build", - "tower", + "tower 0.4.13", "tower-service", ] @@ -2759,7 +2840,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "headers-core", - "http", + "http 0.2.12", "httpdate", "mime", "sha-1 0.10.0", @@ -2771,7 +2852,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.12", ] [[package]] @@ -2861,6 +2942,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -2868,7 +2960,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2901,8 +3016,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -2914,6 +3029,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-proxy" version = "0.9.1" @@ -2923,8 +3057,8 @@ dependencies = [ "bytes", "futures 0.3.31", "headers", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.31", "hyper-tls", "native-tls", "tokio", @@ -2939,8 +3073,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.31", "rustls 0.21.12", "tokio", "tokio-rustls", @@ -2952,7 +3086,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.31", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -2965,12 +3099,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.31", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.1", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.46" @@ -3393,7 +3543,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff" dependencies = [ "futures 0.3.31", - "hyper", + "hyper 0.14.31", "jsonrpc-core", "jsonrpc-server-utils", "log", @@ -5012,9 +5162,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.31", "hyper-rustls", "hyper-tls", "ipnet", @@ -5031,7 +5181,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", "tokio-native-tls", @@ -5054,7 +5204,7 @@ checksum = "5a735987236a8e238bf0296c7e351b999c188ccc11477f311b82b55c93984216" dependencies = [ "anyhow", "async-trait", - "http", + "http 0.2.12", "reqwest", "serde", "task-local-extensions", @@ -5484,6 +5634,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.5" @@ -9116,8 +9276,8 @@ dependencies = [ "flate2", "futures 0.3.31", "goauth", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.31", "hyper-proxy", "log", "openssl", @@ -10566,6 +10726,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.12.6" @@ -10826,6 +10992,20 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "thread-priority" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe075d7053dae61ac5413a34ea7d4913b6e6207844fd726bdd858b37ff72bf5" +dependencies = [ + "bitflags 2.6.0", + "cfg-if 1.0.0", + "libc", + "log", + "rustversion", + "winapi 0.3.9", +] + [[package]] name = "thread-scoped" version = "1.0.2" @@ -11125,15 +11305,15 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.31", "hyper-timeout", "percent-encoding 2.3.1", "pin-project", @@ -11142,7 +11322,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -11181,17 +11361,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -11270,7 +11466,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 6055802e73c5a1..666ef71dc60d1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -198,6 +198,7 @@ members = [ "svm-transaction", "test-validator", "thin-client", + "thread-manager", "timings", "tls-utils", "tokens", @@ -446,6 +447,7 @@ solana-bucket-map = { path = "bucket_map", version = "=2.2.0" } solana-builtins = { path = "builtins", version = "=2.2.0" } solana-builtins-default-costs = { path = "builtins-default-costs", version = "=2.2.0" } agave-cargo-registry = { path = "cargo-registry", version = "=2.2.0" } +agave-thread-manager = { path = "thread-manager", version = "=2.2.0" } solana-clap-utils = { path = "clap-utils", version = "=2.2.0" } solana-clap-v3-utils = { path = "clap-v3-utils", version = "=2.2.0" } solana-cli = { path = "cli", version = "=2.2.0" } diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml new file mode 100644 index 00000000000000..69d40c4601dbf3 --- /dev/null +++ b/thread-manager/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "agave-thread-manager" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + + +[dependencies] +affinity = "0.1.2" +anyhow = { workspace = true } +serde = { workspace = true, features = ["derive"] } +thread-priority = "1.2.0" +tokio = { workspace = true, features = ["time", "rt-multi-thread"] } +rayon = { workspace = true } + +[dev-dependencies] +axum = "0.7.9" +serde_json = { workspace = true } diff --git a/thread-manager/README.md b/thread-manager/README.md new file mode 100644 index 00000000000000..7fa25ffc1571f6 --- /dev/null +++ b/thread-manager/README.md @@ -0,0 +1,14 @@ +# thread-manager +Balances machine resources between multiple Tokio runtimes + +# Supported threading models +## Tokio +Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on. +Number of worker and blocking threads is configurable + +## Native +Native threads can be spawned from managed pools, this allows them to inheirt a particular affinity from the pool, as well as to +control the total number of threads made in every pool. + +## Rayon +Rayon already manages thread pools, all thread_manager does on top is enforce affinity and priority for rayon threads. diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs new file mode 100644 index 00000000000000..ed40dd6918de85 --- /dev/null +++ b/thread-manager/examples/core_contention_basics.rs @@ -0,0 +1,136 @@ +use std::{ + future::IntoFuture, + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, +}; + +async fn axum_main(port: u16) { + use axum::{routing::get, Router}; + + // basic handler that responds with a static string + async fn root() -> &'static str { + tokio::time::sleep(Duration::from_millis(1)).await; + "Hello, World!" + } + + // build our application with a route + let app = Router::new().route("/", get(root)); + + // run our app with hyper, listening globally on port 3000 + let listener = + tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) + .await + .unwrap(); + let timeout = tokio::time::timeout( + Duration::from_secs(11), + axum::serve(listener, app).into_future(), + ) + .await; + match timeout { + Ok(v) => v.unwrap(), + Err(_) => { + println!("Terminating server on port {port}"); + } + } +} +use affinity::*; +use agave_thread_manager::*; + +fn main() -> anyhow::Result<()> { + println!( + "\tCurrent thread affinity : {:?}", + get_thread_affinity().unwrap() + ); + println!("\tTotal cores : {}", get_core_num()); + + let experiments = [ + "examples/core_contention_dedicated_set.json", + "examples/core_contention_contending_set.json", + ]; + + for exp in experiments { + println!("==================="); + println!("Running {exp}"); + let mut conffile = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + conffile.push(exp); + let conffile = std::fs::File::open(conffile)?; + let cfg: RuntimeManagerConfig = serde_json::from_reader(conffile)?; + //println!("Loaded config {}", serde_json::to_string_pretty(&cfg)?); + + let rtm = RuntimeManager::new(cfg).unwrap(); + let tok1 = rtm + .get_tokio("axum1") + .expect("Expecting runtime named axum1"); + let tok2 = rtm + .get_tokio("axum2") + .expect("Expecting runtime named axum2"); + + let wrk_cores: Vec<_> = (32..64).collect(); + let results = std::thread::scope(|s| { + s.spawn(|| { + tok1.start(axum_main(8888)); + }); + s.spawn(|| { + tok2.start(axum_main(8889)); + }); + let jh = s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()); + jh.join().expect("WRK crashed!") + }); + println!("Results are: {:?}", results); + } + Ok(()) +} + +fn run_wrk( + ports: &[u16], + cpus: &[usize], + threads: usize, + connections: usize, +) -> anyhow::Result<(Vec, Vec)> { + let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + script.push("examples/report.lua"); + let cpus: Vec = cpus.iter().map(|c| c.to_string()).collect(); + let cpus = cpus.join(","); + + let mut children: Vec<_> = ports + .iter() + .map(|p| { + std::process::Command::new("taskset") + .arg("-c") + .arg(&cpus) + .arg("wrk") + .arg(format!("http://localhost:{}", p)) + .arg("-d10") + .arg(format!("-s{}", script.to_str().unwrap())) + .arg(format!("-t{threads}")) + .arg(format!("-c{connections}")) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap() + }) + .collect(); + + use std::str; + let outs = children.drain(..).map(|c| c.wait_with_output().unwrap()); + let mut all_latencies = vec![]; + let mut all_rps = vec![]; + for (out, port) in outs.zip(ports.iter()) { + println!("========================="); + std::io::stdout().write_all(&out.stderr)?; + let res = str::from_utf8(&out.stdout)?; + let mut res = res.lines().last().unwrap().split(' '); + + let latency_us: u64 = res.next().unwrap().parse()?; + let latency = Duration::from_micros(latency_us); + + let requests: usize = res.next().unwrap().parse()?; + let rps = requests as f32 / 10.0; + println!("WRK results for port {port}: {latency:?} {rps}"); + all_latencies.push(Duration::from_micros(latency_us)); + all_rps.push(rps); + } + Ok((all_latencies, all_rps)) +} diff --git a/thread-manager/examples/core_contention_contending_set.json b/thread-manager/examples/core_contention_contending_set.json new file mode 100644 index 00000000000000..1225cc8e494b0f --- /dev/null +++ b/thread-manager/examples/core_contention_contending_set.json @@ -0,0 +1,31 @@ +{ + "tokio_configs": { + "tokio1": { + "worker_threads": 8, + "max_blocking_threads": 1, + "priority": 0, + "core_allocation": { + "DedicatedCoreSet": { + "min": 0, + "max": 8 + } + } + }, + "tokio2": { + "worker_threads": 8, + "max_blocking_threads": 1, + "priority": 0, + "core_allocation": { + "DedicatedCoreSet": { + "min": 0, + "max": 8 + } + } + } + }, + "tokio_runtime_mapping": { + "axum2": "tokio2", + "axum1": "tokio1" + }, + "native_configs": {} +} diff --git a/thread-manager/examples/core_contention_dedicated_set.json b/thread-manager/examples/core_contention_dedicated_set.json new file mode 100644 index 00000000000000..4e9c76170cf7cf --- /dev/null +++ b/thread-manager/examples/core_contention_dedicated_set.json @@ -0,0 +1,31 @@ +{ + "tokio_configs": { + "tokio1": { + "worker_threads": 4, + "max_blocking_threads": 1, + "priority": 0, + "core_allocation": { + "DedicatedCoreSet": { + "min": 0, + "max": 4 + } + } + }, + "tokio2": { + "worker_threads": 4, + "max_blocking_threads": 1, + "priority": 0, + "core_allocation": { + "DedicatedCoreSet": { + "min": 4, + "max": 8 + } + } + } + }, + "tokio_runtime_mapping": { + "axum2": "tokio2", + "axum1": "tokio1" + }, + "native_configs": {} +} diff --git a/thread-manager/examples/core_contention_single_runtime.json b/thread-manager/examples/core_contention_single_runtime.json new file mode 100644 index 00000000000000..42d743a188cc35 --- /dev/null +++ b/thread-manager/examples/core_contention_single_runtime.json @@ -0,0 +1,20 @@ +{ + "tokio_configs": { + "tokio1": { + "worker_threads": 8, + "max_blocking_threads": 1, + "priority": 0, + "core_allocation": { + "DedicatedCoreSet": { + "min": 0, + "max": 8 + } + } + } + }, + "tokio_runtime_mapping": { + "axum2": "tokio1", + "axum1": "tokio1" + }, + "native_configs": {} +} diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs new file mode 100644 index 00000000000000..53706a09a344dd --- /dev/null +++ b/thread-manager/examples/core_contention_sweep.rs @@ -0,0 +1,223 @@ +use std::{ + collections::HashMap, + future::IntoFuture, + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, +}; + +async fn axum_main(port: u16) { + use axum::{routing::get, Router}; + + // basic handler that responds with a static string + async fn root() -> &'static str { + tokio::time::sleep(Duration::from_millis(1)).await; + "Hello, World!" + } + + // build our application with a route + let app = Router::new().route("/", get(root)); + + // run our app with hyper, listening globally on port 3000 + let listener = + tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) + .await + .unwrap(); + let timeout = tokio::time::timeout( + Duration::from_secs(11), + axum::serve(listener, app).into_future(), + ) + .await; + match timeout { + Ok(v) => v.unwrap(), + Err(_) => { + println!("Terminating server on port {port}"); + } + } +} +use agave_thread_manager::*; +fn make_config_shared(cc: usize) -> RuntimeManagerConfig { + let mut tokio_cfg_1 = TokioConfig::default(); + tokio_cfg_1.core_allocation = CoreAllocation::DedicatedCoreSet { min: 0, max: cc }; + tokio_cfg_1.worker_threads = cc; + let mut tokio_cfg_2 = TokioConfig::default(); + tokio_cfg_2.core_allocation = CoreAllocation::DedicatedCoreSet { min: 0, max: cc }; + tokio_cfg_2.worker_threads = cc; + RuntimeManagerConfig { + tokio_configs: HashMap::from([ + ("tokio1".into(), tokio_cfg_1), + ("tokio2".into(), tokio_cfg_2), + ]), + tokio_runtime_mapping: HashMap::from([ + ("axum1".into(), "tokio1".into()), + ("axum2".into(), "tokio2".into()), + ]), + ..Default::default() + } +} +fn make_config_dedicated(cc: usize) -> RuntimeManagerConfig { + let mut tokio_cfg_1 = TokioConfig::default(); + tokio_cfg_1.core_allocation = CoreAllocation::DedicatedCoreSet { + min: 0, + max: cc / 2, + }; + tokio_cfg_1.worker_threads = cc / 2; + let mut tokio_cfg_2 = TokioConfig::default(); + tokio_cfg_2.core_allocation = CoreAllocation::DedicatedCoreSet { + min: cc / 2, + max: cc, + }; + tokio_cfg_2.worker_threads = cc / 2; + RuntimeManagerConfig { + tokio_configs: HashMap::from([ + ("tokio1".into(), tokio_cfg_1), + ("tokio2".into(), tokio_cfg_2), + ]), + tokio_runtime_mapping: HashMap::from([ + ("axum1".into(), "tokio1".into()), + ("axum2".into(), "tokio2".into()), + ]), + ..Default::default() + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum Regime { + Shared, + Dedicated, + Single, +} +impl Regime { + const VALUES: [Self; 3] = [Self::Shared, Self::Dedicated, Self::Single]; +} + +#[derive(Debug, Default, serde::Serialize)] +struct Results { + latencies_s: Vec, + rps: Vec, +} + +fn main() -> anyhow::Result<()> { + let mut all_results: HashMap = HashMap::new(); + for regime in Regime::VALUES { + let mut res = Results::default(); + for core_cnt in [2, 4, 8, 16] { + let rtm; + println!("==================="); + println!("Running {core_cnt} cores under {regime:?}"); + let (tok1, tok2) = match regime { + Regime::Shared => { + rtm = RuntimeManager::new(make_config_shared(core_cnt)).unwrap(); + ( + rtm.get_tokio("axum1") + .expect("Expecting runtime named axum1"), + rtm.get_tokio("axum2") + .expect("Expecting runtime named axum2"), + ) + } + Regime::Dedicated => { + rtm = RuntimeManager::new(make_config_dedicated(core_cnt)).unwrap(); + ( + rtm.get_tokio("axum1") + .expect("Expecting runtime named axum1"), + rtm.get_tokio("axum2") + .expect("Expecting runtime named axum2"), + ) + } + Regime::Single => { + rtm = RuntimeManager::new(make_config_shared(core_cnt)).unwrap(); + ( + rtm.get_tokio("axum1") + .expect("Expecting runtime named axum1"), + rtm.get_tokio("axum2") + .expect("Expecting runtime named axum2"), + ) + } + }; + + let wrk_cores: Vec<_> = (32..64).collect(); + let results = std::thread::scope(|s| { + s.spawn(|| { + tok1.start(axum_main(8888)); + }); + let jh = match regime { + Regime::Single => s.spawn(|| { + run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 1000).unwrap() + }), + _ => { + s.spawn(|| { + tok2.start(axum_main(8889)); + }); + s.spawn(|| { + run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap() + }) + } + }; + jh.join().expect("WRK crashed!") + }); + println!("Results are: {:?}", results); + res.latencies_s.push( + results.0.iter().map(|a| a.as_secs_f32()).sum::() / results.0.len() as f32, + ); + res.rps.push(results.1.iter().sum()); + } + all_results.insert(format!("{regime:?}"), res); + std::thread::sleep(Duration::from_secs(3)); + } + println!("{}", serde_json::to_string_pretty(&all_results)?); + + Ok(()) +} + +fn run_wrk( + ports: &[u16], + cpus: &[usize], + threads: usize, + connections: usize, +) -> anyhow::Result<(Vec, Vec)> { + let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + script.push("examples/report.lua"); + let cpus: Vec = cpus.iter().map(|c| c.to_string()).collect(); + let cpus = cpus.join(","); + + let mut children: Vec<_> = ports + .iter() + .map(|p| { + std::process::Command::new("taskset") + .arg("-c") + .arg(&cpus) + .arg("wrk") + .arg(format!("http://localhost:{}", p)) + .arg("-d10") + .arg(format!("-s{}", script.to_str().unwrap())) + .arg(format!("-t{threads}")) + .arg(format!("-c{connections}")) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap() + }) + .collect(); + + use std::str; + let outs = children.drain(..).map(|c| c.wait_with_output().unwrap()); + let mut all_latencies = vec![]; + let mut all_rps = vec![]; + for (out, port) in outs.zip(ports.iter()) { + println!("========================="); + std::io::stdout().write_all(&out.stderr)?; + let res = str::from_utf8(&out.stdout)?; + let mut res = res.lines().last().unwrap().split(' '); + + let latency_us: u64 = res.next().unwrap().parse()?; + let latency = Duration::from_micros(latency_us); + + let requests: usize = res.next().unwrap().parse()?; + let rps = requests as f32 / 10.0; + println!("WRK results for port {port}: {latency:?} {rps}"); + all_latencies.push(Duration::from_micros(latency_us)); + all_rps.push(rps); + } + Ok((all_latencies, all_rps)) +} diff --git a/thread-manager/examples/report.lua b/thread-manager/examples/report.lua new file mode 100644 index 00000000000000..e135db5528667d --- /dev/null +++ b/thread-manager/examples/report.lua @@ -0,0 +1,4 @@ +done = function(summary, latency, requests) + x = string.format("%d %d\n", latency.mean, summary.requests) + io.write(x) +end diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs new file mode 100644 index 00000000000000..08dd6510579821 --- /dev/null +++ b/thread-manager/src/lib.rs @@ -0,0 +1,228 @@ +use { + affinity::*, + anyhow::Ok, + serde::{Deserialize, Serialize}, + std::{ + collections::HashMap, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }, + }, + thread_priority::*, +}; + +mod native_thread_runtime; +mod policy; +mod tokio_runtime; +mod rayon_runtime; + +pub use rayon_runtime::{RayonConfig, RayonRuntime} +pub use native_thread_runtime::{NativeConfig, NativeThreadRuntime}; +pub use policy::CoreAllocation; +pub use tokio_runtime::{TokioConfig, TokioRuntime}; +pub type ConstString = Box; + +#[derive(Default, Debug)] +pub struct RuntimeManager { + pub tokio_runtimes: HashMap, + pub tokio_runtime_mapping: HashMap, + + pub native_thread_runtimes: HashMap, + pub native_runtime_mapping: HashMap, +} + +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct RuntimeManagerConfig { + pub tokio_configs: HashMap, + pub tokio_runtime_mapping: HashMap, + pub native_runtime_mapping: HashMap, + pub native_configs: HashMap, + pub default_core_allocation: CoreAllocation, +} + +impl RuntimeManager { + pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> { + let n = self.native_runtime_mapping.get(name)?; + self.native_thread_runtimes.get(n) + } + pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> { + let n = self.tokio_runtime_mapping.get(name)?; + self.tokio_runtimes.get(n) + } + pub fn set_process_affinity(config: &RuntimeManagerConfig) -> anyhow::Result> { + let chosen_cores_mask: Vec = { + match config.default_core_allocation { + CoreAllocation::PinnedCores { min, max } => (min..max).collect(), + CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(), + CoreAllocation::OsDefault => vec![], + } + }; + + if let Err(e) = set_thread_affinity(&chosen_cores_mask) { + anyhow::bail!(e.to_string()) + } + Ok(chosen_cores_mask) + } + + pub fn new(config: RuntimeManagerConfig) -> anyhow::Result { + let mut core_allocations = HashMap::>::new(); + Self::set_process_affinity(&config)?; + let mut manager = Self::default(); + + //TODO: this should probably be cleaned up at some point... + for (k, v) in config.tokio_runtime_mapping.iter() { + manager + .tokio_runtime_mapping + .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + } + for (k, v) in config.native_runtime_mapping.iter() { + manager + .native_runtime_mapping + .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + } + + for (name, cfg) in config.native_configs.iter() { + let nrt = NativeThreadRuntime::new(cfg.clone()); + manager + .native_thread_runtimes + .insert(name.clone().into_boxed_str(), nrt); + } + + for (name, cfg) in config.tokio_configs.iter() { + let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?; + + core_allocations.insert(name.clone().into_boxed_str(), cfg.core_allocation.as_core_mask_vector()); + manager.tokio_runtimes.insert( + name.clone().into_boxed_str(), + tokiort + ); + } + Ok(manager) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig}; + + #[test] + fn process_affinity() { + let conf = RuntimeManagerConfig { + native_configs: HashMap::from([( + "pool1".to_owned(), + NativeConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 }, + max_threads: 5, + priority: 0, + ..Default::default() + }, + )]), + default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 }, + native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]), + ..Default::default() + }; + + let rtm = RuntimeManager::new(conf).unwrap(); + let r = rtm.get_native("test").unwrap(); + + let t2 = r + .spawn(|| { + let aff = affinity::get_thread_affinity().unwrap(); + assert_eq!(aff, [0, 1, 2, 3], "Managed thread allocation should be 0-3"); + }) + .unwrap(); + + let t = std::thread::spawn(|| { + let aff = affinity::get_thread_affinity().unwrap(); + assert_eq!(aff, [4, 5, 6, 7], "Default thread allocation should be 4-7"); + + let tt = std::thread::spawn(|| { + let aff = affinity::get_thread_affinity().unwrap(); + assert_eq!( + aff, + [4, 5, 6, 7], + "Nested thread allocation should still be 4-7" + ); + }); + tt.join().unwrap(); + }); + t.join().unwrap(); + t2.join().unwrap(); + } + #[test] + fn rayon_affinity() { + let conf = RuntimeManagerConfig { + native_configs: HashMap::from([( + "pool1".to_owned(), + NativeConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 }, + max_threads: 5, + priority: 0, + ..Default::default() + }, + )]), + default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 }, + native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]), + ..Default::default() + }; + + let rtm = RuntimeManager::new(conf).unwrap(); + let r = rtm.get_native("test").unwrap(); + + let t2 = r + .spawn(|| { + let aff = affinity::get_thread_affinity().unwrap(); + assert_eq!(aff, [0, 1, 2, 3], "Managed thread allocation should be 0-3"); + }) + .unwrap(); + + let rayon_pool = rayon::ThreadPoolBuilder::new() + .num_threads(3) + .start_handler(|idx| { + affinity::set_thread_affinity([1, 2, 3]).unwrap(); + }) + /*.spawn_handler(|thread| { + let mut b = std::thread::Builder::new(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + b.spawn(|| thread.run())?; + Ok(()) + })*/ + .build() + .unwrap(); + + let t = std::thread::spawn(|| { + let aff = affinity::get_thread_affinity().unwrap(); + assert_eq!(aff, [4, 5, 6, 7], "Default thread allocation should be 4-7"); + + let tt = std::thread::spawn(|| { + let aff = affinity::get_thread_affinity().unwrap(); + assert_eq!( + aff, + [4, 5, 6, 7], + "Nested thread allocation should still be 4-7" + ); + }); + tt.join().unwrap(); + }); + let _rr = rayon_pool.broadcast(|ctx| { + let aff = affinity::get_thread_affinity().unwrap(); + println!("Rayon thread {} reporting", ctx.index()); + assert_eq!( + aff, + [1, 2, 3], + "Rayon thread allocation should still be 1-3" + ); + }); + t.join().unwrap(); + t2.join().unwrap(); + } +} diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs new file mode 100644 index 00000000000000..7c405d942d24bb --- /dev/null +++ b/thread-manager/src/native_thread_runtime.rs @@ -0,0 +1,120 @@ +use { + crate::policy::CoreAllocation, + anyhow::bail, + serde::{Deserialize, Serialize}, + std::sync::atomic::{AtomicUsize, Ordering}, + std::sync::Arc, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct NativeConfig { + pub core_allocation: CoreAllocation, + pub max_threads: usize, + pub priority: usize, + pub name_base: String, + pub stack_size_bytes: usize, +} + +impl Default for NativeConfig { + fn default() -> Self { + Self { + core_allocation: CoreAllocation::OsDefault, + max_threads: 10, + priority: 0, + stack_size_bytes: 2 * 1024 * 1024, + name_base: "thread".to_owned(), + } + } +} + +#[derive(Debug)] +pub struct NativeThreadRuntime { + pub id_count: AtomicUsize, + pub running_count: Arc, + pub config: NativeConfig, +} + +pub struct JoinHandle { + std_handle: Option>, + running_count: Arc, +} + +impl JoinHandle { + fn join_inner(&mut self) -> Result> { + let r = match self.std_handle.take() { + Some(jh) => { + let r = jh.join(); + self.running_count.fetch_sub(1, Ordering::SeqCst); + r + } + None => { + panic!("Thread already joined"); + } + }; + dbg!(self.std_handle.is_some()); + r + } + + pub fn join(mut self) -> Result> { + self.join_inner() + } + + pub fn is_finished(&self) -> bool { + match self.std_handle { + Some(ref jh) => jh.is_finished(), + None => true, + } + } +} + +impl Drop for JoinHandle { + fn drop(&mut self) { + if self.std_handle.is_some() { + println!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your managed threads!"); + self.join_inner().expect("Child thread panicked"); + } + } +} + +impl NativeThreadRuntime { + pub fn new(cfg: NativeConfig) -> Self { + Self { + id_count: AtomicUsize::new(0), + running_count: Arc::new(AtomicUsize::new(0)), + config: cfg, + } + } + pub fn spawn(&self, f: F) -> anyhow::Result> + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + let spawned = self.running_count.load(Ordering::SeqCst); + if spawned >= self.config.max_threads { + bail!("All allowed threads in this pool are already spawned"); + } + let core_set: Vec<_> = match self.config.core_allocation { + CoreAllocation::PinnedCores { min: _, max: _ } => { + todo!("Need to store pinning mask somewhere"); + } + CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(), + CoreAllocation::OsDefault => (0..affinity::get_core_num()).collect(), + }; + + let n = self.id_count.fetch_add(1, Ordering::SeqCst); + let jh = std::thread::Builder::new() + .name(format!("{}-{}", &self.config.name_base, n)) + .stack_size(self.config.stack_size_bytes) + .spawn(move || { + affinity::set_thread_affinity(core_set).expect("Can not set thread affinity"); + f() + })?; + self.running_count.fetch_add(1, Ordering::SeqCst); + Ok(JoinHandle { + std_handle: Some(jh), + running_count: self.running_count.clone(), + }) + } +} diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs new file mode 100644 index 00000000000000..8c5758bbfa7dce --- /dev/null +++ b/thread-manager/src/policy.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub enum CoreAllocation { + ///Use OS default allocation (i.e. do not alter core affinity) + #[default] + OsDefault, + ///Pin each thread to a core in given range. Number of cores should be >= number of threads + PinnedCores { min: usize, max: usize }, + ///Pin the threads to a set of cores + DedicatedCoreSet { min: usize, max: usize }, +} + +impl CoreAllocation { + /// Converts into a vector of core IDs. OsDefault is converted to empty vector. + pub fn as_core_mask_vector(&self) -> Vec { + match *self { + CoreAllocation::PinnedCores { min, max } => (min..max).collect(), + CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(), + CoreAllocation::OsDefault => vec![], + } + } +} + +///Applies policy to the calling thread +pub fn apply_policy(alloc: CoreAllocation, priority: u32) {} diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs new file mode 100644 index 00000000000000..642711c12b8314 --- /dev/null +++ b/thread-manager/src/rayon_runtime.rs @@ -0,0 +1,44 @@ +use { + crate::policy::CoreAllocation, + anyhow::Ok, + serde::{Deserialize, Serialize}, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct RayonConfig { + pub worker_threads: usize, + pub priority: u32, + pub stack_size_bytes: usize, + pub core_allocation: CoreAllocation, +} + +impl Default for RayonConfig { + fn default() -> Self { + Self { + core_allocation: CoreAllocation::OsDefault, + worker_threads: 4, + priority: 0, + stack_size_bytes: 2 * 1024 * 1024, + } + } +} + +#[derive(Debug)] +pub struct RayonRuntime { + pub rayon_pool: rayon::ThreadPool, + pub config: RayonConfig, +} + +impl RayonRuntime { + fn new(config: RayonConfig) -> anyhow::Result { + let policy = config.core_allocation; + let rayon_pool = rayon::ThreadPoolBuilder::new() + .num_threads(config.worker_threads) + .start_handler(move |idx| { + affinity::set_thread_affinity([1, 2, 3]).unwrap(); + }) + .build()?; + Ok(Self { rayon_pool, config }) + } +} diff --git a/thread-manager/src/runtime_manager.rs b/thread-manager/src/runtime_manager.rs new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs new file mode 100644 index 00000000000000..e5d2efda70b2e6 --- /dev/null +++ b/thread-manager/src/tokio_runtime.rs @@ -0,0 +1,140 @@ +use { + crate::policy::CoreAllocation, + serde::{Deserialize, Serialize}, + std::{ + future::Future, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }, + }, + thread_priority::ThreadExt, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct TokioConfig { + ///number of worker threads tokio is allowed to spawn + pub worker_threads: usize, + ///max number of blocking threads tokio is allowed to spawn + pub max_blocking_threads: usize, + pub priority: u32, + pub stack_size_bytes: usize, + pub event_interval: u32, + pub core_allocation: CoreAllocation, +} + +impl Default for TokioConfig { + fn default() -> Self { + Self { + core_allocation: CoreAllocation::OsDefault, + worker_threads: 1, + max_blocking_threads: 1, + priority: 0, + stack_size_bytes: 2 * 1024 * 1024, + event_interval: 61, + } + } +} + +#[derive(Debug)] +pub struct TokioRuntime { + pub(crate) tokio: tokio::runtime::Runtime, + pub config: TokioConfig, +} +impl TokioRuntime { + pub(crate) fn new(name: String, cfg: TokioConfig) -> anyhow::Result { + let num_workers = if cfg.worker_threads == 0 { + affinity::get_core_num() + } else { + cfg.worker_threads + }; + let chosen_cores_mask = cfg.core_allocation.as_core_mask_vector(); + + let base_name = name.clone(); + println!( + "Assigning {:?} to runtime {}", + &chosen_cores_mask, &base_name + ); + let mut builder = match num_workers { + 1 => tokio::runtime::Builder::new_current_thread(), + _ => { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(num_workers); + builder + } + }; + let atomic_id: AtomicUsize = AtomicUsize::new(0); + builder + .event_interval(cfg.event_interval) + .thread_name_fn(move || { + let id = atomic_id.fetch_add(1, Ordering::SeqCst); + format!("{}-{}", base_name, id) + }) + .thread_stack_size(cfg.stack_size_bytes) + .enable_all() + .max_blocking_threads(cfg.max_blocking_threads); + + //keep borrow checker happy and move these things into the closure + let c = cfg.clone(); + let chosen_cores_mask = Mutex::new(chosen_cores_mask); + builder.on_thread_start(move || { + let cur_thread = std::thread::current(); + let _tid = cur_thread + .get_native_id() + .expect("Can not get thread id for newly created thread"); + let tname = cur_thread.name().unwrap(); + //println!("thread {tname} id {tid} started"); + std::thread::current() + .set_priority(thread_priority::ThreadPriority::Crossplatform( + (c.priority as u8).try_into().unwrap(), + )) + .expect("Can not set thread priority!"); + + match c.core_allocation { + CoreAllocation::PinnedCores { min: _, max: _ } => { + let mut lg = chosen_cores_mask + .lock() + .expect("Can not lock core mask mutex"); + let core = lg + .pop() + .expect("Not enough cores provided for pinned allocation"); + println!("Pinning worker {tname} to core {core}"); + affinity::set_thread_affinity([core]) + .expect("Can not set thread affinity for runtime worker"); + } + CoreAllocation::DedicatedCoreSet { min: _, max: _ } => { + let lg = chosen_cores_mask + .lock() + .expect("Can not lock core mask mutex"); + affinity::set_thread_affinity(&(*lg)) + .expect("Can not set thread affinity for runtime worker"); + } + CoreAllocation::OsDefault => {} + } + }); + Ok(TokioRuntime { + tokio: builder.build()?, + config: cfg.clone(), + }) + } + /* This is bad idea... + pub fn spawn(&self, fut: F)->::Output + where F: Future + { + self.tokio.spawn(fut) + } + pub fn spawn_blocking(&self, fut: F)->::Output + where F: Future + { + self.spawn(fut) + } + */ + pub fn start(&self, fut: F) -> F::Output + where + F: Future, + { + // the thread that calls block_on does not need its affinity messed with here + self.tokio.block_on(fut) + } +}