diff --git a/Cargo.lock b/Cargo.lock index 5acec2a9028929..8c3a02713a6af8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2773,7 +2773,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.7.0", "slab", "tokio", diff --git a/thread-manager/README.md b/thread-manager/README.md index 7fa25ffc1571f6..f12861621686e1 100644 --- a/thread-manager/README.md +++ b/thread-manager/README.md @@ -1,14 +1,20 @@ # thread-manager -Balances machine resources between multiple Tokio runtimes +Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention between different parts of the code that may +benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 hallocated for Rayon thread pool, and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches and context switches that would occur if Rayon was entirely unaware it was running side-by-side with tokio, and each was to spawn as many threads as there are cores. # Supported threading models ## Tokio Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on. -Number of worker and blocking threads is configurable +Number of worker and blocking threads is configurable, as are thread priorities for the pool. ## Native -Native threads can be spawned from managed pools, this allows them to inheirt a particular affinity from the pool, as well as to +Native threads (std::thread) can be spawned from managed pools, this allows them to inheirt a particular affinity from the pool, as well as to control the total number of threads made in every pool. ## Rayon -Rayon already manages thread pools, all thread_manager does on top is enforce affinity and priority for rayon threads. +Rayon already manages thread pools well enough, all thread_manager does on top is enforce affinity and priority for rayon threads. Normally one would only ever have one rayon pool, but for priority allocations one may want to spawn many rayon pools. + +# Limitations + + * Thread pools can only be created at process startup + * Once thread pool is created, its policy can not be modified at runtime diff --git a/thread-manager/examples/core_contention_basics.rs b/thread-manager/examples/core_contention_basics.rs index ed40dd6918de85..ff1bf7c9fc4279 100644 --- a/thread-manager/examples/core_contention_basics.rs +++ b/thread-manager/examples/core_contention_basics.rs @@ -35,8 +35,7 @@ async fn axum_main(port: u16) { } } } -use affinity::*; -use agave_thread_manager::*; +use {affinity::*, agave_thread_manager::*}; fn main() -> anyhow::Result<()> { println!( diff --git a/thread-manager/src/lib.rs b/thread-manager/src/lib.rs index 08dd6510579821..9f58c70d580e55 100644 --- a/thread-manager/src/lib.rs +++ b/thread-manager/src/lib.rs @@ -14,13 +14,15 @@ use { mod native_thread_runtime; mod policy; -mod tokio_runtime; mod rayon_runtime; +mod tokio_runtime; -pub use rayon_runtime::{RayonConfig, RayonRuntime} -pub use native_thread_runtime::{NativeConfig, NativeThreadRuntime}; -pub use policy::CoreAllocation; -pub use tokio_runtime::{TokioConfig, TokioRuntime}; +pub use { + native_thread_runtime::{NativeConfig, NativeThreadRuntime}, + policy::CoreAllocation, + rayon_runtime::{RayonConfig, RayonRuntime}, + tokio_runtime::{TokioConfig, TokioRuntime}, +}; pub type ConstString = Box; #[derive(Default, Debug)] @@ -93,11 +95,13 @@ impl RuntimeManager { for (name, cfg) in config.tokio_configs.iter() { let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?; - core_allocations.insert(name.clone().into_boxed_str(), cfg.core_allocation.as_core_mask_vector()); - manager.tokio_runtimes.insert( + core_allocations.insert( name.clone().into_boxed_str(), - tokiort + cfg.core_allocation.as_core_mask_vector(), ); + manager + .tokio_runtimes + .insert(name.clone().into_boxed_str(), tokiort); } Ok(manager) } @@ -105,9 +109,10 @@ impl RuntimeManager { #[cfg(test)] mod tests { - use std::collections::HashMap; - - use crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig}; + use { + crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig}, + std::collections::HashMap, + }; #[test] fn process_affinity() { diff --git a/thread-manager/src/native_thread_runtime.rs b/thread-manager/src/native_thread_runtime.rs index 7c405d942d24bb..fad1457347d0f9 100644 --- a/thread-manager/src/native_thread_runtime.rs +++ b/thread-manager/src/native_thread_runtime.rs @@ -2,8 +2,10 @@ use { crate::policy::CoreAllocation, anyhow::bail, serde::{Deserialize, Serialize}, - std::sync::atomic::{AtomicUsize, Ordering}, - std::sync::Arc, + std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/thread-manager/src/policy.rs b/thread-manager/src/policy.rs index 8c5758bbfa7dce..357b7f2b2a5b40 100644 --- a/thread-manager/src/policy.rs +++ b/thread-manager/src/policy.rs @@ -23,4 +23,4 @@ impl CoreAllocation { } ///Applies policy to the calling thread -pub fn apply_policy(alloc: CoreAllocation, priority: u32) {} +pub fn apply_policy(alloc: &CoreAllocation, priority: u32) {} diff --git a/thread-manager/src/rayon_runtime.rs b/thread-manager/src/rayon_runtime.rs index 642711c12b8314..bc050be58e8f6b 100644 --- a/thread-manager/src/rayon_runtime.rs +++ b/thread-manager/src/rayon_runtime.rs @@ -1,5 +1,5 @@ use { - crate::policy::CoreAllocation, + crate::policy::{apply_policy, CoreAllocation}, anyhow::Ok, serde::{Deserialize, Serialize}, }; @@ -32,11 +32,12 @@ pub struct RayonRuntime { impl RayonRuntime { fn new(config: RayonConfig) -> anyhow::Result { - let policy = config.core_allocation; + let policy = config.core_allocation.clone(); + let priority = config.priority; let rayon_pool = rayon::ThreadPoolBuilder::new() .num_threads(config.worker_threads) .start_handler(move |idx| { - affinity::set_thread_affinity([1, 2, 3]).unwrap(); + apply_policy(&policy, priority); }) .build()?; Ok(Self { rayon_pool, config })