diff --git a/thread-manager/Cargo.toml b/thread-manager/Cargo.toml
index 69d40c4601dbf3..f127842053da57 100644
--- a/thread-manager/Cargo.toml
+++ b/thread-manager/Cargo.toml
@@ -1,5 +1,7 @@
 [package]
 name = "agave-thread-manager"
+description = "Thread pool manager for agave"
+
 version = { workspace = true }
 authors = { workspace = true }
 repository = { workspace = true }
@@ -7,6 +9,7 @@
 homepage = { workspace = true }
 license = { workspace = true }
 edition = { workspace = true }
+publish = false
 
 [dependencies]
 affinity = "0.1.2"
diff --git a/thread-manager/README.md b/thread-manager/README.md
index f12861621686e1..5a75d1dfef28bb 100644
--- a/thread-manager/README.md
+++ b/thread-manager/README.md
@@ -1,5 +1,5 @@
 # thread-manager
-Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention between different parts of the code that may 
+Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention between different parts of the code that may
 benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 allocated for the Rayon thread pool, and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches and the context switches that would occur if Rayon were entirely unaware it was running side by side with Tokio, and each were to spawn as many threads as there are cores.
 
 # Supported threading models
@@ -18,3 +18,9 @@ Rayon already manages thread pools well enough, all thread_manager does on top i
 * Thread pools can only be created at process startup
 * Once a thread pool is created, its policy cannot be modified at runtime
+
+# TODO:
+
+ * support tracing
+ * proper error handling everywhere
+ * even more tests
diff --git a/thread-manager/examples/core_contention_sweep.rs b/thread-manager/examples/core_contention_sweep.rs
index 53706a09a344dd..f160ddf3886d4e 100644
--- a/thread-manager/examples/core_contention_sweep.rs
+++ b/thread-manager/examples/core_contention_sweep.rs
@@ -38,12 +38,12 @@ async fn axum_main(port: u16) {
 }
 use agave_thread_manager::*;
 fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
-    let mut tokio_cfg_1 = TokioConfig::default();
-    tokio_cfg_1.core_allocation = CoreAllocation::DedicatedCoreSet { min: 0, max: cc };
-    tokio_cfg_1.worker_threads = cc;
-    let mut tokio_cfg_2 = TokioConfig::default();
-    tokio_cfg_2.core_allocation = CoreAllocation::DedicatedCoreSet { min: 0, max: cc };
-    tokio_cfg_2.worker_threads = cc;
+    let tokio_cfg_1 = TokioConfig {
+        core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
+        worker_threads: cc,
+        ..Default::default()
+    };
+    let tokio_cfg_2 = tokio_cfg_1.clone();
     RuntimeManagerConfig {
         tokio_configs: HashMap::from([
             ("tokio1".into(), tokio_cfg_1),
@@ -57,18 +57,22 @@ fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
 }
 fn make_config_dedicated(cc: usize) -> RuntimeManagerConfig {
-    let mut tokio_cfg_1 = TokioConfig::default();
-    tokio_cfg_1.core_allocation = CoreAllocation::DedicatedCoreSet {
-        min: 0,
-        max: cc / 2,
+    let tokio_cfg_1 = TokioConfig {
+        core_allocation: CoreAllocation::DedicatedCoreSet {
+            min: 0,
+            max: cc / 2,
+        },
+        worker_threads: cc / 2,
+        ..Default::default()
     };
-    tokio_cfg_1.worker_threads = cc / 2;
-    let mut tokio_cfg_2 = TokioConfig::default();
-    tokio_cfg_2.core_allocation = CoreAllocation::DedicatedCoreSet {
-        min: cc / 2,
-        max: cc,
+    let tokio_cfg_2 = TokioConfig {
+        core_allocation: CoreAllocation::DedicatedCoreSet {
+            min: cc / 2,
+            max: cc,
+        },
+        worker_threads: cc / 2,
+        ..Default::default()
     };
-    tokio_cfg_2.worker_threads = cc / 2;
     RuntimeManagerConfig {
         tokio_configs: HashMap::from([
             ("tokio1".into(), tokio_cfg_1),
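The example refactor above trades field-by-field mutation for struct-update syntax. A minimal sketch of the pattern, assuming only the `TokioConfig` fields visible in this diff (everything else comes from `Default`):

```rust
use agave_thread_manager::*;

// Build one config, then clone it for the second runtime: both pools
// share the dedicated core set 0..cores, and `..Default::default()`
// fills every field not spelled out (priority, stack size, ...).
fn shared_pair(cores: usize) -> (TokioConfig, TokioConfig) {
    let first = TokioConfig {
        core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cores },
        worker_threads: cores,
        ..Default::default()
    };
    let second = first.clone();
    (first, second)
}
```

Cloning the first config keeps the two shared pools from drifting apart, which the old mutate-twice version made easy to get wrong.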
diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs
index 9f58c70d580e55..87e1393e4e630b 100644
--- a/thread-manager/src/lib.rs
+++ b/thread-manager/src/lib.rs
@@ -1,21 +1,13 @@
 use {
-    affinity::*,
     anyhow::Ok,
     serde::{Deserialize, Serialize},
-    std::{
-        collections::HashMap,
-        sync::{
-            atomic::{AtomicUsize, Ordering},
-            Mutex,
-        },
-    },
-    thread_priority::*,
+    std::collections::HashMap,
 };
 
-mod native_thread_runtime;
-mod policy;
-mod rayon_runtime;
-mod tokio_runtime;
+pub mod native_thread_runtime;
+pub mod policy;
+pub mod rayon_runtime;
+pub mod tokio_runtime;
 
 pub use {
     native_thread_runtime::{NativeConfig, NativeThreadRuntime},
@@ -32,15 +24,23 @@ pub struct RuntimeManager {
     pub native_thread_runtimes: HashMap<Box<str>, NativeThreadRuntime>,
     pub native_runtime_mapping: HashMap<Box<str>, Box<str>>,
+
+    pub rayon_runtimes: HashMap<Box<str>, RayonRuntime>,
+    pub rayon_runtime_mapping: HashMap<Box<str>, Box<str>>,
 }
 
 #[derive(Default, Clone, Debug, Serialize, Deserialize)]
 #[serde(default)]
 pub struct RuntimeManagerConfig {
+    pub native_configs: HashMap<String, NativeConfig>,
+    pub native_runtime_mapping: HashMap<String, String>,
+
+    pub rayon_configs: HashMap<String, RayonConfig>,
+    pub rayon_runtime_mapping: HashMap<String, String>,
+
     pub tokio_configs: HashMap<String, TokioConfig>,
     pub tokio_runtime_mapping: HashMap<String, String>,
-    pub native_runtime_mapping: HashMap<String, String>,
-    pub native_configs: HashMap<String, NativeConfig>,
+
     pub default_core_allocation: CoreAllocation,
 }
@@ -49,6 +49,10 @@ impl RuntimeManager {
         let n = self.native_runtime_mapping.get(name)?;
         self.native_thread_runtimes.get(n)
     }
+    pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
+        let n = self.rayon_runtime_mapping.get(name)?;
+        self.rayon_runtimes.get(n)
+    }
     pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
         let n = self.tokio_runtime_mapping.get(name)?;
         self.tokio_runtimes.get(n)
@@ -62,7 +66,7 @@ impl RuntimeManager {
             }
         };
 
-        if let Err(e) = set_thread_affinity(&chosen_cores_mask) {
+        if let Err(e) = affinity::set_thread_affinity(&chosen_cores_mask) {
             anyhow::bail!(e.to_string())
         }
         Ok(chosen_cores_mask)
@@ -84,6 +88,11 @@ impl RuntimeManager {
                 .native_runtime_mapping
                 .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
         }
+        for (k, v) in config.rayon_runtime_mapping.iter() {
+            manager
+                .rayon_runtime_mapping
+                .insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
+        }
 
         for (name, cfg) in config.native_configs.iter() {
             let nrt = NativeThreadRuntime::new(cfg.clone());
             manager
                 .native_thread_runtimes
                 .insert(name.clone().into_boxed_str(), nrt);
         }
+        for (name, cfg) in config.rayon_configs.iter() {
+            let rrt = RayonRuntime::new(cfg.clone())?;
+            manager
+                .rayon_runtimes
+                .insert(name.clone().into_boxed_str(), rrt);
+        }
 
         for (name, cfg) in config.tokio_configs.iter() {
             let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
@@ -110,7 +125,7 @@ impl RuntimeManager {
 #[cfg(test)]
 mod tests {
     use {
-        crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig},
+        crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManager, RuntimeManagerConfig},
         std::collections::HashMap,
     };
@@ -170,8 +185,19 @@ mod tests {
                     ..Default::default()
                 },
             )]),
+            rayon_configs: HashMap::from([(
+                "rayon1".to_owned(),
+                RayonConfig {
+                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
+                    worker_threads: 3,
+                    priority: 0,
+                    ..Default::default()
+                },
+            )]),
             default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
             native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
+
+            rayon_runtime_mapping: HashMap::from([("test".to_owned(), "rayon1".to_owned())]),
             ..Default::default()
         };
@@ -184,25 +210,19 @@ mod tests {
             assert_eq!(aff, [0, 1, 2, 3], "Managed thread allocation should be 0-3");
         })
         .unwrap();
+        let rrt = rtm.get_rayon("test").unwrap();
 
-        let rayon_pool = rayon::ThreadPoolBuilder::new()
-            .num_threads(3)
-            .start_handler(|idx| {
-                affinity::set_thread_affinity([1, 2, 3]).unwrap();
-            })
-            /*.spawn_handler(|thread| {
-                let mut b = std::thread::Builder::new();
-                if let Some(name) = thread.name() {
-                    b = b.name(name.to_owned());
-                }
-                if let Some(stack_size) = thread.stack_size() {
-                    b = b.stack_size(stack_size);
-                }
-                b.spawn(|| thread.run())?;
-                Ok(())
-            })*/
-            .build()
-            .unwrap();
+        /*.spawn_handler(|thread| {
+            let mut b = std::thread::Builder::new();
+            if let Some(name) = thread.name() {
+                b = b.name(name.to_owned());
+            }
+            if let Some(stack_size) = thread.stack_size() {
+                b = b.stack_size(stack_size);
+            }
+            b.spawn(|| thread.run())?;
+            Ok(())
+        })*/
 
         let t = std::thread::spawn(|| {
             let aff = affinity::get_thread_affinity().unwrap();
@@ -218,7 +238,7 @@ mod tests {
             });
             tt.join().unwrap();
         });
-        let _rr = rayon_pool.broadcast(|ctx| {
+        let _rr = rrt.rayon_pool.broadcast(|ctx| {
             let aff = affinity::get_thread_affinity().unwrap();
             println!("Rayon thread {} reporting", ctx.index());
             assert_eq!(
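The new `get_rayon` getter follows the same two-level lookup as `get_native` and `get_tokio`. A hedged usage sketch; the constructor name `RuntimeManager::new` is an assumption here, since the hunks above only show its body:

```rust
use agave_thread_manager::*;

// Two-level lookup: the caller-facing name ("test") resolves through
// rayon_runtime_mapping to a pool name ("rayon1"), which then indexes
// rayon_runtimes. A miss at either level yields None instead of a panic.
fn lookup_demo(config: RuntimeManagerConfig) -> anyhow::Result<()> {
    let manager = RuntimeManager::new(config)?; // constructor name assumed
    if let Some(rrt) = manager.get_rayon("test") {
        rrt.rayon_pool.broadcast(|ctx| println!("rayon worker {} up", ctx.index()));
    }
    Ok(())
}
```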
diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs
index fad1457347d0f9..a8ce5da516e819 100644
--- a/thread-manager/src/native_thread_runtime.rs
+++ b/thread-manager/src/native_thread_runtime.rs
@@ -1,10 +1,10 @@
 use {
-    crate::policy::CoreAllocation,
+    crate::policy::{apply_policy, CoreAllocation},
     anyhow::bail,
     serde::{Deserialize, Serialize},
     std::sync::{
         atomic::{AtomicUsize, Ordering},
-        Arc,
+        Arc, Mutex,
     },
 };
@@ -13,7 +13,7 @@ use {
 pub struct NativeConfig {
     pub core_allocation: CoreAllocation,
     pub max_threads: usize,
-    pub priority: usize,
+    pub priority: u8,
     pub name_base: String,
     pub stack_size_bytes: usize,
 }
@@ -97,20 +97,16 @@ impl NativeThreadRuntime {
         if spawned >= self.config.max_threads {
             bail!("All allowed threads in this pool are already spawned");
         }
-        let core_set: Vec<_> = match self.config.core_allocation {
-            CoreAllocation::PinnedCores { min: _, max: _ } => {
-                todo!("Need to store pinning mask somewhere");
-            }
-            CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(),
-            CoreAllocation::OsDefault => (0..affinity::get_core_num()).collect(),
-        };
+        let core_alloc = self.config.core_allocation.clone();
+        let priority = self.config.priority;
+        let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
 
         let n = self.id_count.fetch_add(1, Ordering::SeqCst);
         let jh = std::thread::Builder::new()
             .name(format!("{}-{}", &self.config.name_base, n))
             .stack_size(self.config.stack_size_bytes)
             .spawn(move || {
-                affinity::set_thread_affinity(core_set).expect("Can not set thread affinity");
+                apply_policy(&core_alloc, priority, &chosen_cores_mask);
                 f()
             })?;
         self.running_count.fetch_add(1, Ordering::SeqCst);
diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs
index 357b7f2b2a5b40..aa5b3e9aef6a6c 100644
--- a/thread-manager/src/policy.rs
+++ b/thread-manager/src/policy.rs
@@ -1,4 +1,7 @@
-use serde::{Deserialize, Serialize};
+use {
+    serde::{Deserialize, Serialize},
+    thread_priority::ThreadExt,
+};
 
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
 pub enum CoreAllocation {
@@ -23,4 +26,35 @@ impl CoreAllocation {
 }
 
 ///Applies policy to the calling thread
-pub fn apply_policy(alloc: &CoreAllocation, priority: u32) {}
+pub fn apply_policy(
+    alloc: &CoreAllocation,
+    priority: u8,
+    chosen_cores_mask: &std::sync::Mutex<Vec<usize>>,
+) {
+    std::thread::current()
+        .set_priority(thread_priority::ThreadPriority::Crossplatform(
+            (priority).try_into().unwrap(),
+        ))
+        .expect("Can not set thread priority!");
+
+    match alloc {
+        CoreAllocation::PinnedCores { min: _, max: _ } => {
+            let mut lg = chosen_cores_mask
+                .lock()
+                .expect("Can not lock core mask mutex");
+            let core = lg
+                .pop()
+                .expect("Not enough cores provided for pinned allocation");
+            affinity::set_thread_affinity([core])
+                .expect("Can not set thread affinity for runtime worker");
+        }
+        CoreAllocation::DedicatedCoreSet { min: _, max: _ } => {
+            let lg = chosen_cores_mask
+                .lock()
+                .expect("Can not lock core mask mutex");
+            affinity::set_thread_affinity(&(*lg))
+                .expect("Can not set thread affinity for runtime worker");
+        }
+        CoreAllocation::OsDefault => {}
+    }
+}
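To make the `apply_policy` contract concrete, here is a sketch of a caller in the style of `NativeThreadRuntime::spawn` above. Note the `PinnedCores` arm pops one core per call, so the mutex-wrapped mask must hold at least one core for every thread that will run it:

```rust
use agave_thread_manager::policy::{apply_policy, CoreAllocation};
use std::sync::Mutex;

fn spawn_pinned() -> std::thread::JoinHandle<()> {
    let alloc = CoreAllocation::PinnedCores { min: 0, max: 2 };
    // The mask starts as the full core range; each PinnedCores caller
    // pops one entry and pins itself to that core.
    let mask = Mutex::new(alloc.as_core_mask_vector());
    std::thread::spawn(move || {
        apply_policy(&alloc, 0, &mask); // priority 0, pin to popped core
        // ...worker body runs here, pinned and reprioritized...
    })
}
```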
diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs
index bc050be58e8f6b..242cf9f2458f6a 100644
--- a/thread-manager/src/rayon_runtime.rs
+++ b/thread-manager/src/rayon_runtime.rs
@@ -2,13 +2,14 @@ use {
     crate::policy::{apply_policy, CoreAllocation},
     anyhow::Ok,
     serde::{Deserialize, Serialize},
+    std::sync::Mutex,
 };
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[serde(default)]
 pub struct RayonConfig {
     pub worker_threads: usize,
-    pub priority: u32,
+    pub priority: u8,
     pub stack_size_bytes: usize,
     pub core_allocation: CoreAllocation,
 }
@@ -31,13 +32,14 @@ pub struct RayonRuntime {
 }
 
 impl RayonRuntime {
-    fn new(config: RayonConfig) -> anyhow::Result<Self> {
+    pub fn new(config: RayonConfig) -> anyhow::Result<Self> {
         let policy = config.core_allocation.clone();
+        let chosen_cores_mask = Mutex::new(policy.as_core_mask_vector());
         let priority = config.priority;
         let rayon_pool = rayon::ThreadPoolBuilder::new()
             .num_threads(config.worker_threads)
-            .start_handler(move |idx| {
-                apply_policy(&policy, priority);
+            .start_handler(move |_idx| {
+                apply_policy(&policy, priority, &chosen_cores_mask);
             })
             .build()?;
         Ok(Self { rayon_pool, config })
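With `RayonRuntime::new` now `pub`, a pool can be built straight from a config. A small sketch mirroring the `rayon1` test fixture earlier in this diff:

```rust
use agave_thread_manager::*;

// Three workers, all confined to cores 1..4 by apply_policy running in
// rayon's start_handler before any work is scheduled.
fn build_pool() -> anyhow::Result<RayonRuntime> {
    RayonRuntime::new(RayonConfig {
        worker_threads: 3,
        priority: 0,
        core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
        ..Default::default()
    })
}
```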
diff --git a/thread-manager/src/tokio_runtime.rs b/thread-manager/src/tokio_runtime.rs
index e5d2efda70b2e6..ac8eb3d38f1dc3 100644
--- a/thread-manager/src/tokio_runtime.rs
+++ b/thread-manager/src/tokio_runtime.rs
@@ -1,5 +1,5 @@
 use {
-    crate::policy::CoreAllocation,
+    crate::policy::{apply_policy, CoreAllocation},
     serde::{Deserialize, Serialize},
     std::{
         future::Future,
@@ -18,7 +18,7 @@ pub struct TokioConfig {
     pub worker_threads: usize,
     ///max number of blocking threads tokio is allowed to spawn
     pub max_blocking_threads: usize,
-    pub priority: u32,
+    pub priority: u8,
     pub stack_size_bytes: usize,
     pub event_interval: u32,
     pub core_allocation: CoreAllocation,
@@ -83,35 +83,10 @@ impl TokioRuntime {
             let _tid = cur_thread
                 .get_native_id()
                 .expect("Can not get thread id for newly created thread");
-            let tname = cur_thread.name().unwrap();
+            // todo - tracing
+            //let tname = cur_thread.name().unwrap();
             //println!("thread {tname} id {tid} started");
-            std::thread::current()
-                .set_priority(thread_priority::ThreadPriority::Crossplatform(
-                    (c.priority as u8).try_into().unwrap(),
-                ))
-                .expect("Can not set thread priority!");
-
-            match c.core_allocation {
-                CoreAllocation::PinnedCores { min: _, max: _ } => {
-                    let mut lg = chosen_cores_mask
-                        .lock()
-                        .expect("Can not lock core mask mutex");
-                    let core = lg
-                        .pop()
-                        .expect("Not enough cores provided for pinned allocation");
-                    println!("Pinning worker {tname} to core {core}");
-                    affinity::set_thread_affinity([core])
-                        .expect("Can not set thread affinity for runtime worker");
-                }
-                CoreAllocation::DedicatedCoreSet { min: _, max: _ } => {
-                    let lg = chosen_cores_mask
-                        .lock()
-                        .expect("Can not lock core mask mutex");
-                    affinity::set_thread_affinity(&(*lg))
-                        .expect("Can not set thread affinity for runtime worker");
-                }
-                CoreAllocation::OsDefault => {}
-            }
+            apply_policy(&c.core_allocation, c.priority, &chosen_cores_mask);
         });
         Ok(TokioRuntime {
             tokio: builder.build()?,
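Putting the pieces together, a hedged end-to-end sketch (it assumes the `tokio` field of `TokioRuntime` is public, as the struct literal above suggests):

```rust
use agave_thread_manager::*;

// Fetch a named Tokio runtime and drive a future on it; every worker was
// already pinned and reprioritized by the on_thread_start hook at pool
// startup, so spawned tasks inherit that placement.
fn run_on_managed(manager: &RuntimeManager) {
    if let Some(rt) = manager.get_tokio("tokio1") {
        rt.tokio.block_on(async {
            println!("running on a policy-managed tokio worker");
        });
    }
}
```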