diff --git a/Cargo.lock b/Cargo.lock index 6730cf067af3df..b70a22fb5addc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if 1.0.0", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "agave-accounts-hash-cache-tool" version = "2.2.0" @@ -85,7 +97,7 @@ dependencies = [ "clap 2.33.3", "flate2", "hex", - "hyper", + "hyper 0.14.31", "log", "serde", "serde_derive", @@ -232,6 +244,25 @@ dependencies = [ "solana-version", ] +[[package]] +name = "agave-thread-manager" +version = "2.2.0" +dependencies = [ + "affinity", + "anyhow", + "axum 0.7.9", + "env_logger", + "log", + "num_cpus", + "rayon", + "serde", + "serde_json", + "solana-metrics", + "thread-priority", + "tokio", + "toml 0.8.12", +] + [[package]] name = "agave-transaction-view" version = "2.2.0" @@ -431,9 +462,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.1" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anyhow" @@ -777,13 +808,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.31", "itoa", "matchit", "memchr", @@ -792,12 +823,46 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", - "tower", + "sync_wrapper 0.1.2", + "tower 0.4.13", "tower-layer", "tower-service", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding 2.3.1", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.2", + "tokio", + "tower 0.5.1", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -807,12 +872,33 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -2208,13 +2294,13 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89" dependencies = [ - "http", + 
"http 0.2.12", "prost", "tokio", "tokio-stream", "tonic", "tonic-build", - "tower", + "tower 0.4.13", "tower-service", ] @@ -2692,7 +2778,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.7.0", "slab", "tokio", @@ -2759,7 +2845,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "headers-core", - "http", + "http 0.2.12", "httpdate", "mime", "sha-1 0.10.0", @@ -2771,7 +2857,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.12", ] [[package]] @@ -2861,6 +2947,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -2868,7 +2965,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2901,8 +3021,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -2914,6 +3034,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-proxy" version = "0.9.1" @@ -2923,8 +3062,8 @@ dependencies = [ "bytes", "futures 0.3.31", "headers", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.31", "hyper-tls", "native-tls", "tokio", @@ -2939,8 +3078,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.31", "rustls 0.21.12", "tokio", "tokio-rustls", @@ -2952,7 +3091,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.31", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -2965,12 +3104,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.31", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.10" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.1", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.46" @@ -3393,7 +3548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff" dependencies = [ "futures 0.3.31", - "hyper", + "hyper 0.14.31", "jsonrpc-core", "jsonrpc-server-utils", "log", @@ -5012,9 +5167,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.31", "hyper-rustls", "hyper-tls", "ipnet", @@ -5031,7 +5186,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", "tokio-native-tls", @@ -5054,7 +5209,7 @@ checksum = "5a735987236a8e238bf0296c7e351b999c188ccc11477f311b82b55c93984216" dependencies = [ "anyhow", "async-trait", - "http", + "http 0.2.12", "reqwest", "serde", "task-local-extensions", @@ -5484,6 +5639,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.5" @@ -9116,8 +9281,8 @@ dependencies = [ "flate2", "futures 0.3.31", "goauth", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.31", "hyper-proxy", "log", "openssl", @@ -10566,6 +10731,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.12.6" @@ -10826,6 +10997,20 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "thread-priority" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe075d7053dae61ac5413a34ea7d4913b6e6207844fd726bdd858b37ff72bf5" +dependencies = [ + "bitflags 2.6.0", + "cfg-if 1.0.0", + "libc", + "log", + "rustversion", + "winapi 0.3.9", +] + [[package]] name = "thread-scoped" version = "1.0.2" @@ -11125,15 +11310,15 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.31", "hyper-timeout", "percent-encoding 2.3.1", "pin-project", @@ -11142,7 +11327,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -11181,17 +11366,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + 
"tower-service", + "tracing", +] + [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -11270,7 +11471,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 6055802e73c5a1..666ef71dc60d1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -198,6 +198,7 @@ members = [ "svm-transaction", "test-validator", "thin-client", + "thread-manager", "timings", "tls-utils", "tokens", @@ -446,6 +447,7 @@ solana-bucket-map = { path = "bucket_map", version = "=2.2.0" } solana-builtins = { path = "builtins", version = "=2.2.0" } solana-builtins-default-costs = { path = "builtins-default-costs", version = "=2.2.0" } agave-cargo-registry = { path = "cargo-registry", version = "=2.2.0" } +agave-thread-manager = { path = "thread-manager", version = "=2.2.0" } solana-clap-utils = { path = "clap-utils", version = "=2.2.0" } solana-clap-v3-utils = { path = "clap-v3-utils", version = "=2.2.0" } solana-cli = { path = "cli", version = "=2.2.0" } diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml new file mode 100644 index 00000000000000..0c760371ddfe0e --- /dev/null +++ b/thread-manager/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "agave-thread-manager" +description = "Thread pool manager for agave" + +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +publish = false + +[dependencies] +anyhow = { workspace = true } +log = { workspace = true } +num_cpus = { workspace = true } +rayon = { workspace = true } +serde = { workspace = true, features = ["derive"] } +solana-metrics = { workspace = true } +thread-priority = "1.2.0" +tokio = { workspace = true, features = ["time", "rt-multi-thread"] } + +[target.'cfg(target_os = "linux")'.dependencies] +affinity = "0.1.2" + +[dev-dependencies] +axum = "0.7.9" +env_logger = { workspace = true } +serde_json = { workspace = true } +toml = { workspace = true } diff --git a/thread-manager/README.md b/thread-manager/README.md new file mode 100644 index 00000000000000..61f81f314f2ea6 --- /dev/null +++ b/thread-manager/README.md @@ -0,0 +1,43 @@ +# thread-manager +Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention +between different parts of the code that may +benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling +networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 hallocated for Rayon thread pool, +and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches +and context switches that would occur if Rayon was entirely unaware it was running side-by-side with +tokio, and each was to spawn as many threads as there are cores. 
+ +# Supported threading models +## Tokio +Multiple Tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on. +The number of worker and blocking threads is configurable, as are the thread priorities for the pool. + +## Native +Native threads (std::thread) can be spawned from managed pools. This allows them to inherit a particular +affinity from the pool, and also caps +the total number of threads spawned in each pool. + +## Rayon +Rayon already manages thread pools well enough; all thread-manager does on top is enforce affinity and +priority for Rayon threads. Normally one would only ever have one Rayon pool, but to give workloads different +priorities one may want to spawn several Rayon pools. + +# Limitations + + * Thread pools can only be created at process startup + * Once a thread pool is created, its policy cannot be modified at runtime + * Thread affinity is not supported outside of Linux + +# TODO: + + * support tracing + * better metrics integration + * proper error handling everywhere + * even more tests + + +# Examples +All examples need wrk for workload generation. Please install it before running. + + * core_contention_basics demonstrates why core contention is bad, and how thread configs can help + * core_contention_sweep sweeps across a range of core counts to show how the benefits scale with core count diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs new file mode 100644 index 00000000000000..ea481a707893b8 --- /dev/null +++ b/thread-manager/examples/core_contention_basics.rs @@ -0,0 +1,137 @@ +use { + agave_thread_manager::*, + log::{debug, info}, + std::{ + future::IntoFuture, + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, + }, +}; + +async fn axum_main(port: u16) { + use axum::{routing::get, Router}; + + // basic handler that responds with a static string + async fn root() -> &'static str { + tokio::time::sleep(Duration::from_millis(1)).await; + "Hello, World!"
+ } + + // build our application with a route + let app = Router::new().route("/", get(root)); + + // run our app with hyper, listening globally on the given port + let listener = + tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) + .await + .unwrap(); + let timeout = tokio::time::timeout( + Duration::from_secs(11), + axum::serve(listener, app).into_future(), + ) + .await; + match timeout { + Ok(v) => v.unwrap(), + Err(_) => { + info!("Terminating server on port {port}"); + } + } +} + +fn main() -> anyhow::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + let experiments = [ + "examples/core_contention_dedicated_set.toml", + "examples/core_contention_contending_set.toml", + ]; + + for exp in experiments { + info!("==================="); + info!("Running {exp}"); + let mut conf_file = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + conf_file.push(exp); + let mut buf = String::new(); + std::fs::File::open(conf_file)?.read_to_string(&mut buf)?; + let cfg: RuntimeManagerConfig = toml::from_str(&buf)?; + + let manager = ThreadManager::new(cfg).unwrap(); + let tokio1 = manager + .get_tokio("axum1") + .expect("Expecting runtime named axum1"); + tokio1.start_metrics_sampling(Duration::from_secs(1)); + let tokio2 = manager + .get_tokio("axum2") + .expect("Expecting runtime named axum2"); + tokio2.start_metrics_sampling(Duration::from_secs(1)); + + let wrk_cores: Vec<_> = (32..64).collect(); + let results = std::thread::scope(|scope| { + scope.spawn(|| { + tokio1.tokio.block_on(axum_main(8888)); + }); + scope.spawn(|| { + tokio2.tokio.block_on(axum_main(8889)); + }); + let join_handle = + scope.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()); + join_handle.join().expect("WRK crashed!") + }); + // print out the results of the bench run + println!("Results are: {:?}", results); + } + Ok(()) +} + +fn run_wrk( + ports: &[u16], + cpus: &[usize], + threads: usize, + connections: usize, +) -> anyhow::Result<(Vec<Duration>, Vec<f32>)> { + let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + script.push("examples/report.lua"); + let cpus: Vec<String> = cpus.iter().map(|c| c.to_string()).collect(); + let cpus = cpus.join(","); + + let mut children: Vec<_> = ports + .iter() + .map(|p| { + std::process::Command::new("taskset") + .arg("-c") + .arg(&cpus) + .arg("wrk") + .arg(format!("http://localhost:{}", p)) + .arg("-d10") + .arg(format!("-s{}", script.to_str().unwrap())) + .arg(format!("-t{threads}")) + .arg(format!("-c{connections}")) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap() + }) + .collect(); + + use std::str; + let outs = children.drain(..).map(|c| c.wait_with_output().unwrap()); + let mut all_latencies = vec![]; + let mut all_rps = vec![]; + for (out, port) in outs.zip(ports.iter()) { + debug!("========================="); + std::io::stdout().write_all(&out.stderr)?; + let res = str::from_utf8(&out.stdout)?; + let mut res = res.lines().last().unwrap().split(' '); + + let latency_us: u64 = res.next().unwrap().parse()?; + let latency = Duration::from_micros(latency_us); + + let requests: usize = res.next().unwrap().parse()?; + let rps = requests as f32 / 10.0; + debug!("WRK results for port {port}: {latency:?} {rps}"); + all_latencies.push(latency); + all_rps.push(rps); + } + Ok((all_latencies, all_rps)) +} diff --git a/thread-manager/examples/core_contention_contending_set.toml
b/thread-manager/examples/core_contention_contending_set.toml new file mode 100644 index 00000000000000..e383987a5a432c --- /dev/null +++ b/thread-manager/examples/core_contention_contending_set.toml @@ -0,0 +1,13 @@ +[native_configs] + +[rayon_configs] + +[tokio_configs.axum1] +worker_threads = 8 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 0, max = 8 } + +[tokio_configs.axum2] +worker_threads = 8 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 0, max = 8 } diff --git a/thread-manager/examples/core_contention_dedicated_set.toml b/thread-manager/examples/core_contention_dedicated_set.toml new file mode 100644 index 00000000000000..a82af7d9f5fd47 --- /dev/null +++ b/thread-manager/examples/core_contention_dedicated_set.toml @@ -0,0 +1,13 @@ +[native_configs] + +[rayon_configs] + +[tokio_configs.axum1] +worker_threads = 4 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 0, max = 4 } + +[tokio_configs.axum2] +worker_threads = 4 +max_blocking_threads = 1 +core_allocation.DedicatedCoreSet = { min = 4, max = 8 } diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs new file mode 100644 index 00000000000000..e466b3bae05086 --- /dev/null +++ b/thread-manager/examples/core_contention_sweep.rs @@ -0,0 +1,237 @@ +use { + agave_thread_manager::*, + log::{debug, info}, + std::{ + collections::HashMap, + future::IntoFuture, + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, + }, +}; + +async fn axum_main(port: u16) { + use axum::{routing::get, Router}; + // basic handler that responds with a static string + async fn root() -> &'static str { + tokio::time::sleep(Duration::from_millis(1)).await; + "Hello, World!" 
+ } + + // build our application with a route + let app = Router::new().route("/", get(root)); + + // run our app with hyper, listening globally on the given port + let listener = + tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)) + .await + .unwrap(); + info!("Server on port {port} ready"); + let timeout = tokio::time::timeout( + Duration::from_secs(11), + axum::serve(listener, app).into_future(), + ) + .await; + match timeout { + Ok(v) => v.unwrap(), + Err(_) => { + info!("Terminating server on port {port}"); + } + } +} +fn make_config_shared(cc: usize) -> RuntimeManagerConfig { + let tokio_cfg_1 = TokioConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc }, + worker_threads: cc, + ..Default::default() + }; + let tokio_cfg_2 = tokio_cfg_1.clone(); + RuntimeManagerConfig { + tokio_configs: HashMap::from([ + ("axum1".into(), tokio_cfg_1), + ("axum2".into(), tokio_cfg_2), + ]), + ..Default::default() + } +} +fn make_config_dedicated(core_count: usize) -> RuntimeManagerConfig { + let tokio_cfg_1 = TokioConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { + min: 0, + max: core_count / 2, + }, + worker_threads: core_count / 2, + ..Default::default() + }; + let tokio_cfg_2 = TokioConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { + min: core_count / 2, + max: core_count, + }, + worker_threads: core_count / 2, + ..Default::default() + }; + RuntimeManagerConfig { + tokio_configs: HashMap::from([ + ("axum1".into(), tokio_cfg_1), + ("axum2".into(), tokio_cfg_2), + ]), + ..Default::default() + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum Regime { + Shared, + Dedicated, + Single, +} +impl Regime { + const VALUES: [Self; 3] = [Self::Dedicated, Self::Shared, Self::Single]; +} + +#[derive(Debug, Default, serde::Serialize)] +struct Results { + latencies_s: Vec<f32>, + rps: Vec<f32>, +} + +fn main() -> anyhow::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + let mut all_results: HashMap<String, Results> = HashMap::new(); + for regime in Regime::VALUES { + let mut results = Results::default(); + for core_count in [2, 4, 8, 16] { + let manager; + info!("==================="); + info!("Running {core_count} cores under {regime:?}"); + let (tokio1, tokio2) = match regime { + Regime::Shared => { + manager = ThreadManager::new(make_config_shared(core_count)).unwrap(); + ( + manager + .get_tokio("axum1") + .expect("Expecting runtime named axum1"), + manager + .get_tokio("axum2") + .expect("Expecting runtime named axum2"), + ) + } + Regime::Dedicated => { + manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap(); + ( + manager + .get_tokio("axum1") + .expect("Expecting runtime named axum1"), + manager + .get_tokio("axum2") + .expect("Expecting runtime named axum2"), + ) + } + Regime::Single => { + manager = ThreadManager::new(make_config_shared(core_count)).unwrap(); + ( + manager + .get_tokio("axum1") + .expect("Expecting runtime named axum1"), + manager + .get_tokio("axum2") + .expect("Expecting runtime named axum2"), + ) + } + }; + + let wrk_cores: Vec<_> = (32..64).collect(); + let measurement = std::thread::scope(|s| { + s.spawn(|| { + tokio1.start_metrics_sampling(Duration::from_secs(1)); + tokio1.tokio.block_on(axum_main(8888)); + }); + let jh = match regime { + Regime::Single => s.spawn(|| { + run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 3000).unwrap() + }), + _ => { + s.spawn(|| { + tokio2.start_metrics_sampling(Duration::from_secs(1)); +
tokio2.tokio.block_on(axum_main(8889)); + }); + s.spawn(|| { + run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 3000).unwrap() + }) + } + }; + jh.join().expect("WRK crashed!") + }); + info!("Results are: {:?}", measurement); + results.latencies_s.push( + measurement.0.iter().map(|a| a.as_secs_f32()).sum::<f32>() + / measurement.0.len() as f32, + ); + results.rps.push(measurement.1.iter().sum()); + } + all_results.insert(format!("{regime:?}"), results); + std::thread::sleep(Duration::from_secs(3)); + } + + // print the resulting measurements so they can be e.g. plotted with matplotlib + println!("{}", serde_json::to_string_pretty(&all_results)?); + + Ok(()) +} + +fn run_wrk( + ports: &[u16], + cpus: &[usize], + threads: usize, + connections: usize, +) -> anyhow::Result<(Vec<Duration>, Vec<f32>)> { + // Sleep a bit to let axum start + std::thread::sleep(Duration::from_millis(500)); + + let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + script.push("examples/report.lua"); + let cpus: Vec<String> = cpus.iter().map(|c| c.to_string()).collect(); + let cpus = cpus.join(","); + + let mut children: Vec<_> = ports + .iter() + .map(|p| { + std::process::Command::new("taskset") + .arg("-c") + .arg(&cpus) + .arg("wrk") + .arg(format!("http://localhost:{}", p)) + .arg("-d10") + .arg(format!("-s{}", script.to_str().unwrap())) + .arg(format!("-t{threads}")) + .arg(format!("-c{connections}")) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap() + }) + .collect(); + + use std::str; + let outs = children.drain(..).map(|c| c.wait_with_output().unwrap()); + let mut all_latencies = vec![]; + let mut all_rps = vec![]; + for (out, port) in outs.zip(ports.iter()) { + debug!("========================="); + std::io::stdout().write_all(&out.stderr)?; + let res = str::from_utf8(&out.stdout)?; + let mut res = res.lines().last().unwrap().split(' '); + + let latency_us: u64 = res.next().unwrap().parse()?; + let latency = Duration::from_micros(latency_us); + + let requests: usize = res.next().unwrap().parse()?; + let rps = requests as f32 / 10.0; + debug!("WRK results for port {port}: {latency:?} {rps}"); + all_latencies.push(latency); + all_rps.push(rps); + } + Ok((all_latencies, all_rps)) +} diff --git a/thread-manager/examples/report.lua b/thread-manager/examples/report.lua new file mode 100644 index 00000000000000..e135db5528667d --- /dev/null +++ b/thread-manager/examples/report.lua @@ -0,0 +1,4 @@ +done = function(summary, latency, requests) + x = string.format("%d %d\n", latency.mean, summary.requests) + io.write(x) +end diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs new file mode 100644 index 00000000000000..c439432cb20bd0 --- /dev/null +++ b/thread-manager/src/lib.rs @@ -0,0 +1,233 @@ +use { + anyhow::Ok, + serde::{Deserialize, Serialize}, + std::collections::HashMap, +}; + +pub mod native_thread_runtime; +pub mod policy; +pub mod rayon_runtime; +pub mod tokio_runtime; + +pub use { + native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime}, + policy::CoreAllocation, + rayon_runtime::{RayonConfig, RayonRuntime}, + tokio_runtime::{TokioConfig, TokioRuntime}, +}; +pub type ConstString = Box<str>; + +#[derive(Default, Debug)] +pub struct ThreadManager { + pub tokio_runtimes: HashMap<ConstString, TokioRuntime>, + pub tokio_runtime_mapping: HashMap<ConstString, ConstString>, + + pub native_thread_runtimes: HashMap<ConstString, NativeThreadRuntime>, + pub native_runtime_mapping: HashMap<ConstString, ConstString>, + + pub rayon_runtimes: HashMap<ConstString, RayonRuntime>, + pub rayon_runtime_mapping: HashMap<ConstString, ConstString>, +} + +#[derive(Default, Clone, Debug, Serialize,
Deserialize)] +#[serde(default)] +pub struct RuntimeManagerConfig { + pub native_configs: HashMap<String, NativeConfig>, + pub native_runtime_mapping: HashMap<String, String>, + + pub rayon_configs: HashMap<String, RayonConfig>, + pub rayon_runtime_mapping: HashMap<String, String>, + + pub tokio_configs: HashMap<String, TokioConfig>, + pub tokio_runtime_mapping: HashMap<String, String>, + + pub default_core_allocation: CoreAllocation, +} + +impl ThreadManager { + pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> { + let name = self.native_runtime_mapping.get(name)?; + self.native_thread_runtimes.get(name) + } + pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> { + let name = self.rayon_runtime_mapping.get(name)?; + self.rayon_runtimes.get(name) + } + pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> { + let name = self.tokio_runtime_mapping.get(name)?; + self.tokio_runtimes.get(name) + } + pub fn set_process_affinity(config: &RuntimeManagerConfig) -> anyhow::Result<Vec<usize>> { + let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector(); + + crate::policy::set_thread_affinity(&chosen_cores_mask); + Ok(chosen_cores_mask) + } + + /// Populates mappings with copies of config names, overrides as appropriate + fn populate_mappings(&mut self, config: &RuntimeManagerConfig) { + //TODO: this should probably be cleaned up with a macro at some point... + + for name in config.native_configs.keys() { + self.native_runtime_mapping + .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + } + for (k, v) in config.native_runtime_mapping.iter() { + self.native_runtime_mapping + .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + } + + for name in config.tokio_configs.keys() { + self.tokio_runtime_mapping + .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + } + for (k, v) in config.tokio_runtime_mapping.iter() { + self.tokio_runtime_mapping + .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + } + + for name in config.rayon_configs.keys() { + self.rayon_runtime_mapping + .insert(name.clone().into_boxed_str(), name.clone().into_boxed_str()); + } + for (k, v) in config.rayon_runtime_mapping.iter() { + self.rayon_runtime_mapping + .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str()); + } + } + pub fn new(config: RuntimeManagerConfig) -> anyhow::Result<Self> { + let mut core_allocations = HashMap::<ConstString, Vec<usize>>::new(); + Self::set_process_affinity(&config)?; + let mut manager = Self::default(); + manager.populate_mappings(&config); + for (name, cfg) in config.native_configs.iter() { + let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone()); + manager + .native_thread_runtimes + .insert(name.clone().into_boxed_str(), nrt); + } + for (name, cfg) in config.rayon_configs.iter() { + let rrt = RayonRuntime::new(name.clone(), cfg.clone())?; + manager + .rayon_runtimes + .insert(name.clone().into_boxed_str(), rrt); + } + + for (name, cfg) in config.tokio_configs.iter() { + let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?; + + core_allocations.insert( + name.clone().into_boxed_str(), + cfg.core_allocation.as_core_mask_vector(), + ); + manager + .tokio_runtimes + .insert(name.clone().into_boxed_str(), tokiort); + } + Ok(manager) + } +} + +#[cfg(test)] +mod tests { + use { + crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManagerConfig, ThreadManager}, + std::{collections::HashMap, io::Read}, + }; + + #[test] + fn configtest() { + let experiments = [ + "examples/core_contention_dedicated_set.toml", + "examples/core_contention_contending_set.toml", + ]; + + for exp in experiments { +
println!("Loading config {exp}"); + let mut conffile = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + conffile.push(exp); + let mut buf = String::new(); + std::fs::File::open(conffile) + .unwrap() + .read_to_string(&mut buf) + .unwrap(); + let cfg: RuntimeManagerConfig = toml::from_str(&buf).unwrap(); + println!("{:?}", cfg); + } + } + // Nobody runs Agave on Windows, and on Mac we cannot set mask affinity without patching an external crate + #[cfg(target_os = "linux")] + fn validate_affinity(expect_cores: &[usize], error_msg: &str) { + let affinity = affinity::get_thread_affinity().unwrap(); + assert_eq!(affinity, expect_cores, "{}", error_msg); + } + #[cfg(not(target_os = "linux"))] + fn validate_affinity(_expect_cores: &[usize], _error_msg: &str) {} + + #[test] + fn process_affinity() { + let conf = RuntimeManagerConfig { + native_configs: HashMap::from([( + "pool1".to_owned(), + NativeConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 }, + max_threads: 5, + priority: 0, + ..Default::default() + }, + )]), + default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 }, + native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]), + ..Default::default() + }; + + let manager = ThreadManager::new(conf).unwrap(); + let runtime = manager.get_native("test").unwrap(); + + let thread1 = runtime + .spawn(|| { + validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3"); + }) + .unwrap(); + + let thread2 = std::thread::spawn(|| { + validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7"); + + let inner_thread = std::thread::spawn(|| { + validate_affinity( + &[4, 5, 6, 7], + "Nested thread allocation should still be 4-7", + ); + }); + inner_thread.join().unwrap(); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); + } + + #[test] + fn rayon_affinity() { + let conf = RuntimeManagerConfig { + rayon_configs: HashMap::from([( + "test".to_owned(), + RayonConfig { + core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 }, + worker_threads: 3, + priority: 0, + ..Default::default() + }, + )]), + default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 }, + + ..Default::default() + }; + + let manager = ThreadManager::new(conf).unwrap(); + let rayon_runtime = manager.get_rayon("test").unwrap(); + + let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| { + println!("Rayon thread {} reporting", ctx.index()); + validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3"); + }); + } +} diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs new file mode 100644 index 00000000000000..f99db65ae9fe5c --- /dev/null +++ b/thread-manager/src/native_thread_runtime.rs @@ -0,0 +1,120 @@ +use { + crate::policy::{apply_policy, CoreAllocation}, + anyhow::bail, + log::error, + serde::{Deserialize, Serialize}, + solana_metrics::datapoint_info, + std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct NativeConfig { + pub core_allocation: CoreAllocation, + pub max_threads: usize, + pub priority: u8, + pub stack_size_bytes: usize, +} + +impl Default for NativeConfig { + fn default() -> Self { + Self { + core_allocation: CoreAllocation::OsDefault, + max_threads: 10, + priority: 0, + stack_size_bytes: 2 * 1024 * 1024, + } + } +} + +#[derive(Debug)] +pub struct NativeThreadRuntime { + pub id_count: AtomicUsize, + pub running_count:
Arc<AtomicUsize>, + pub config: NativeConfig, + pub name: String, +} + +pub struct JoinHandle<T> { + std_handle: Option<std::thread::JoinHandle<T>>, + running_count: Arc<AtomicUsize>, +} + +impl<T> JoinHandle<T> { + fn join_inner(&mut self) -> std::thread::Result<T> { + match self.std_handle.take() { + Some(jh) => { + let result = jh.join(); + let rc = self.running_count.fetch_sub(1, Ordering::Relaxed); + datapoint_info!("thread-manager-native", ("threads-running", rc, i64),); + result + } + None => { + panic!("Thread already joined"); + } + } + } + + pub fn join(mut self) -> std::thread::Result<T> { + self.join_inner() + } + + pub fn is_finished(&self) -> bool { + match self.std_handle { + Some(ref jh) => jh.is_finished(), + None => true, + } + } +} + +impl<T> Drop for JoinHandle<T> { + fn drop(&mut self) { + if self.std_handle.is_some() { + error!("Attempting to drop a JoinHandle of a running thread will leak thread IDs, please join your managed threads!"); + self.join_inner().expect("Child thread panicked"); + } + } +} + +impl NativeThreadRuntime { + pub fn new(name: String, cfg: NativeConfig) -> Self { + Self { + id_count: AtomicUsize::new(0), + running_count: Arc::new(AtomicUsize::new(0)), + config: cfg, + name, + } + } + pub fn spawn<F, T>(&self, f: F) -> anyhow::Result<JoinHandle<T>> + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + let spawned = self.running_count.load(Ordering::Relaxed); + if spawned >= self.config.max_threads { + bail!("All allowed threads in this pool are already spawned"); + } + + let core_alloc = self.config.core_allocation.clone(); + let priority = self.config.priority; + let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector()); + let n = self.id_count.fetch_add(1, Ordering::Relaxed); + let jh = std::thread::Builder::new() + .name(format!("{}-{}", &self.name, n)) + .stack_size(self.config.stack_size_bytes) + .spawn(move || { + apply_policy(&core_alloc, priority, &chosen_cores_mask); + f() + })?; + let rc = self.running_count.fetch_add(1, Ordering::Relaxed); + datapoint_info!("thread-manager-native", ("threads-running", rc as i64, i64),); + Ok(JoinHandle { + std_handle: Some(jh), + running_count: self.running_count.clone(), + }) + } +} diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs new file mode 100644 index 00000000000000..cd975884459c1f --- /dev/null +++ b/thread-manager/src/policy.rs @@ -0,0 +1,83 @@ +use { + serde::{Deserialize, Serialize}, + std::sync::OnceLock, + thread_priority::ThreadExt, +}; + +static CORE_COUNT: OnceLock<usize> = OnceLock::new(); + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub enum CoreAllocation { + ///Use OS default allocation (i.e. do not alter core affinity) + #[default] + OsDefault, + ///Pin each thread to a core in given range. Number of cores should be >= number of threads + PinnedCores { min: usize, max: usize }, + ///Pin the threads to a set of cores + DedicatedCoreSet { min: usize, max: usize }, +} + +impl CoreAllocation { + /// Converts into a vector of core IDs. OsDefault is converted to a mask covering all cores.
+ pub fn as_core_mask_vector(&self) -> Vec<usize> { + let core_count = CORE_COUNT.get_or_init(num_cpus::get); + match *self { + CoreAllocation::PinnedCores { min, max } => (min..max).collect(), + CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(), + CoreAllocation::OsDefault => Vec::from_iter(0..*core_count), + } + } +} + +#[cfg(target_os = "linux")] +pub fn set_thread_affinity(cores: &[usize]) { + assert!( + !cores.is_empty(), + "Cannot call setaffinity with an empty cores mask" + ); + if let Err(e) = affinity::set_thread_affinity(cores) { + let thread = std::thread::current(); + let msg = format!( + "Cannot set core affinity {:?} for thread {:?} named {:?}, error {e}", + cores, + thread.id(), + thread.name() + ); + panic!("{}", msg); + } +} + +#[cfg(not(target_os = "linux"))] +pub fn set_thread_affinity(_cores: &[usize]) {} + +///Applies policy to the calling thread +pub fn apply_policy( + alloc: &CoreAllocation, + priority: u8, + chosen_cores_mask: &std::sync::Mutex<Vec<usize>>, +) { + std::thread::current() + .set_priority(thread_priority::ThreadPriority::Crossplatform( + (priority).try_into().unwrap(), + )) + .expect("Cannot set thread priority!"); + + match alloc { + CoreAllocation::PinnedCores { min: _, max: _ } => { + let mut lg = chosen_cores_mask + .lock() + .expect("Cannot lock core mask mutex"); + let core = lg + .pop() + .expect("Not enough cores provided for pinned allocation"); + set_thread_affinity(&[core]); + } + CoreAllocation::DedicatedCoreSet { min: _, max: _ } => { + let lg = chosen_cores_mask + .lock() + .expect("Cannot lock core mask mutex"); + set_thread_affinity(&lg); + } + CoreAllocation::OsDefault => {} + } +} diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs new file mode 100644 index 00000000000000..b731bd83051bcb --- /dev/null +++ b/thread-manager/src/rayon_runtime.rs @@ -0,0 +1,55 @@ +use { + crate::policy::{apply_policy, CoreAllocation}, + anyhow::Ok, + serde::{Deserialize, Serialize}, + solana_metrics::datapoint_info, + std::sync::{ + atomic::{AtomicI64, Ordering}, + Mutex, + }, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct RayonConfig { + pub worker_threads: usize, + pub priority: u8, + pub stack_size_bytes: usize, + pub core_allocation: CoreAllocation, +} + +impl Default for RayonConfig { + fn default() -> Self { + Self { + core_allocation: CoreAllocation::OsDefault, + worker_threads: 4, + priority: 0, + stack_size_bytes: 2 * 1024 * 1024, + } + } +} + +#[derive(Debug)] +pub struct RayonRuntime { + pub rayon_pool: rayon::ThreadPool, + pub config: RayonConfig, +} + +impl RayonRuntime { + pub fn new(name: String, config: RayonConfig) -> anyhow::Result<Self> { + let policy = config.core_allocation.clone(); + let chosen_cores_mask = Mutex::new(policy.as_core_mask_vector()); + let priority = config.priority; + let spawned_threads = AtomicI64::new(0); + let rayon_pool = rayon::ThreadPoolBuilder::new() + .num_threads(config.worker_threads) + .thread_name(move |i| format!("{}_{}", &name, i)) + .start_handler(move |_idx| { + let rc = spawned_threads.fetch_add(1, Ordering::Relaxed); + datapoint_info!("thread-manager-rayon", ("threads-spawned", rc, i64),); + apply_policy(&policy, priority, &chosen_cores_mask); + }) + .build()?; + Ok(Self { rayon_pool, config }) + } +} diff --git a/thread-manager/src/runtime_manager.rs b/thread-manager/src/runtime_manager.rs new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/thread-manager/src/tokio_runtime.rs
b/thread-manager/src/tokio_runtime.rs new file mode 100644 index 00000000000000..b8563f9ae11348 --- /dev/null +++ b/thread-manager/src/tokio_runtime.rs @@ -0,0 +1,152 @@ +use { + crate::policy::{apply_policy, CoreAllocation}, + serde::{Deserialize, Serialize}, + solana_metrics::datapoint_info, + std::{ + sync::{ + atomic::{AtomicI64, AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, + }, + thread_priority::ThreadExt, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct TokioConfig { + ///Number of worker threads Tokio is allowed to spawn + pub worker_threads: usize, + ///Max number of blocking threads Tokio is allowed to spawn + pub max_blocking_threads: usize, + pub priority: u8, + pub stack_size_bytes: usize, + pub event_interval: u32, + pub core_allocation: CoreAllocation, +} + +impl Default for TokioConfig { + fn default() -> Self { + Self { + core_allocation: CoreAllocation::OsDefault, + worker_threads: 1, + max_blocking_threads: 1, + priority: 0, + stack_size_bytes: 2 * 1024 * 1024, + event_interval: 61, + } + } +} + +#[derive(Debug)] +pub struct TokioRuntime { + pub tokio: tokio::runtime::Runtime, + pub config: TokioConfig, + pub counters: Arc<ThreadCounters>, +} + +impl TokioRuntime { + /// Starts the metrics sampling task on the runtime to monitor how many workers are busy doing useful things. + pub fn start_metrics_sampling(&self, period: Duration) { + let counters = self.counters.clone(); + self.tokio.spawn(metrics_sampler(counters, period)); + } + + pub fn new(name: String, cfg: TokioConfig) -> anyhow::Result<Self> { + let num_workers = if cfg.worker_threads == 0 { + num_cpus::get() + } else { + cfg.worker_threads + }; + let chosen_cores_mask = cfg.core_allocation.as_core_mask_vector(); + + let base_name = name.clone(); + let mut builder = match num_workers { + 1 => tokio::runtime::Builder::new_current_thread(), + _ => { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(num_workers); + builder + } + }; + let atomic_id: AtomicUsize = AtomicUsize::new(0); + + let counters = Arc::new(ThreadCounters { + namespace: format!("thread-manager-tokio-{}", &base_name).leak(), // no workaround, metrics crate will only consume 'static str + parked_threads_cnt: AtomicI64::new(0), + active_threads_cnt: AtomicI64::new( + (num_workers.wrapping_add(cfg.max_blocking_threads)) as i64, + ), + }); + let counters_clone1 = counters.clone(); + let counters_clone2 = counters.clone(); + builder + .event_interval(cfg.event_interval) + .thread_name_fn(move || { + let id = atomic_id.fetch_add(1, Ordering::Relaxed); + format!("{}-{}", base_name, id) + }) + .on_thread_park(move || { + counters_clone1.on_park(); + }) + .on_thread_unpark(move || { + counters_clone2.on_unpark(); + }) + .thread_stack_size(cfg.stack_size_bytes) + .enable_all() + .max_blocking_threads(cfg.max_blocking_threads); + + // keep the borrow checker happy and move these things into the closure + let c = cfg.clone(); + let chosen_cores_mask = Mutex::new(chosen_cores_mask); + builder.on_thread_start(move || { + let cur_thread = std::thread::current(); + let _tid = cur_thread + .get_native_id() + .expect("Cannot get thread ID for newly created thread"); + // todo - tracing + //let tname = cur_thread.name().unwrap(); + //println!("thread {tname} id {tid} started"); + apply_policy(&c.core_allocation, c.priority, &chosen_cores_mask); + }); + Ok(TokioRuntime { + tokio: builder.build()?, + config: cfg.clone(), + counters, + }) + } +} + +///Internal counters to keep track of worker pool
utilization +#[derive(Debug)] +pub struct ThreadCounters { + pub namespace: &'static str, + pub parked_threads_cnt: AtomicI64, + pub active_threads_cnt: AtomicI64, +} + +impl ThreadCounters { + pub fn on_park(&self) { + self.parked_threads_cnt.fetch_add(1, Ordering::Relaxed); + self.active_threads_cnt.fetch_sub(1, Ordering::Relaxed); + } + + pub fn on_unpark(&self) { + self.parked_threads_cnt.fetch_sub(1, Ordering::Relaxed); + self.active_threads_cnt.fetch_add(1, Ordering::Relaxed); + } +} + +async fn metrics_sampler(counters: Arc<ThreadCounters>, period: Duration) { + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + let parked = counters.parked_threads_cnt.load(Ordering::Relaxed); + let active = counters.active_threads_cnt.load(Ordering::Relaxed); + datapoint_info!( + counters.namespace, + ("threads_parked", parked, i64), + ("threads_active", active, i64), + ); + } +}
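For reference, a minimal sketch of how these counters get exercised end to end, using only the public API added in this file (the runtime name `sampler_demo` is made up for the example):

```rust
use {
    agave_thread_manager::{TokioConfig, TokioRuntime},
    std::time::Duration,
};

fn main() -> anyhow::Result<()> {
    // Two workers, OS-default affinity; everything else comes from TokioConfig::default().
    let cfg = TokioConfig {
        worker_threads: 2,
        ..Default::default()
    };
    let rt = TokioRuntime::new("sampler_demo".to_owned(), cfg)?;
    // Emits threads_parked / threads_active datapoints once per second
    // under the "thread-manager-tokio-sampler_demo" namespace.
    rt.start_metrics_sampling(Duration::from_secs(1));
    rt.tokio.block_on(async {
        // While this sleeps, workers park and the sampler records it.
        tokio::time::sleep(Duration::from_secs(3)).await;
    });
    Ok(())
}
```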