Skip to content

Commit

Permalink
minimal viable version of the thread manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 15, 2024
1 parent 2561944 commit bfaf7e2
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 100 deletions.
3 changes: 3 additions & 0 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
[package]
name = "agave-thread-manager"
description = "Thread pool manager for agave"

version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

publish = false

[dependencies]
affinity = "0.1.2"
Expand Down
8 changes: 7 additions & 1 deletion thread-manager/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# thread-manager
Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention between different parts of the code that may
benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 allocated for the Rayon thread pool, and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches and context switches that would occur if Rayon was entirely unaware it was running side-by-side with Tokio, and each was to spawn as many threads as there are cores.

# Supported threading models
Expand All @@ -18,3 +18,9 @@ Rayon already manages thread pools well enough, all thread_manager does on top i

* Thread pools can only be created at process startup
* Once thread pool is created, its policy can not be modified at runtime

# TODO:

* support tracing
* proper error handling everywhere
* even more tests
36 changes: 20 additions & 16 deletions thread-manager/examples/core_contention_sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ async fn axum_main(port: u16) {
}
use agave_thread_manager::*;
fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
let mut tokio_cfg_1 = TokioConfig::default();
tokio_cfg_1.core_allocation = CoreAllocation::DedicatedCoreSet { min: 0, max: cc };
tokio_cfg_1.worker_threads = cc;
let mut tokio_cfg_2 = TokioConfig::default();
tokio_cfg_2.core_allocation = CoreAllocation::DedicatedCoreSet { min: 0, max: cc };
tokio_cfg_2.worker_threads = cc;
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
worker_threads: cc,
..Default::default()
};
let tokio_cfg_2 = tokio_cfg_1.clone();
RuntimeManagerConfig {
tokio_configs: HashMap::from([
("tokio1".into(), tokio_cfg_1),
Expand All @@ -57,18 +57,22 @@ fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
}
}
fn make_config_dedicated(cc: usize) -> RuntimeManagerConfig {
let mut tokio_cfg_1 = TokioConfig::default();
tokio_cfg_1.core_allocation = CoreAllocation::DedicatedCoreSet {
min: 0,
max: cc / 2,
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet {
min: 0,
max: cc / 2,
},
worker_threads: cc / 2,
..Default::default()
};
tokio_cfg_1.worker_threads = cc / 2;
let mut tokio_cfg_2 = TokioConfig::default();
tokio_cfg_2.core_allocation = CoreAllocation::DedicatedCoreSet {
min: cc / 2,
max: cc,
let tokio_cfg_2 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet {
min: cc / 2,
max: cc,
},
worker_threads: cc / 2,
..Default::default()
};
tokio_cfg_2.worker_threads = cc / 2;
RuntimeManagerConfig {
tokio_configs: HashMap::from([
("tokio1".into(), tokio_cfg_1),
Expand Down
92 changes: 56 additions & 36 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
use {
affinity::*,
anyhow::Ok,
serde::{Deserialize, Serialize},
std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Mutex,
},
},
thread_priority::*,
std::collections::HashMap,
};

mod native_thread_runtime;
mod policy;
mod rayon_runtime;
mod tokio_runtime;
pub mod native_thread_runtime;
pub mod policy;
pub mod rayon_runtime;
pub mod tokio_runtime;

pub use {
native_thread_runtime::{NativeConfig, NativeThreadRuntime},
Expand All @@ -32,15 +24,23 @@ pub struct RuntimeManager {

pub native_thread_runtimes: HashMap<ConstString, NativeThreadRuntime>,
pub native_runtime_mapping: HashMap<ConstString, ConstString>,

pub rayon_runtimes: HashMap<ConstString, RayonRuntime>,
pub rayon_runtime_mapping: HashMap<ConstString, ConstString>,
}

#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RuntimeManagerConfig {
pub native_configs: HashMap<String, NativeConfig>,
pub native_runtime_mapping: HashMap<String, String>,

pub rayon_configs: HashMap<String, RayonConfig>,
pub rayon_runtime_mapping: HashMap<String, String>,

pub tokio_configs: HashMap<String, TokioConfig>,
pub tokio_runtime_mapping: HashMap<String, String>,
pub native_runtime_mapping: HashMap<String, String>,
pub native_configs: HashMap<String, NativeConfig>,

pub default_core_allocation: CoreAllocation,
}

Expand All @@ -49,6 +49,10 @@ impl RuntimeManager {
let n = self.native_runtime_mapping.get(name)?;
self.native_thread_runtimes.get(n)
}
/// Resolves the alias `name` through `rayon_runtime_mapping` and returns
/// the matching rayon runtime, or `None` if either lookup misses.
pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
    self.rayon_runtime_mapping
        .get(name)
        .and_then(|pool_name| self.rayon_runtimes.get(pool_name))
}
pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
let n = self.tokio_runtime_mapping.get(name)?;
self.tokio_runtimes.get(n)
Expand All @@ -62,7 +66,7 @@ impl RuntimeManager {
}
};

if let Err(e) = set_thread_affinity(&chosen_cores_mask) {
if let Err(e) = affinity::set_thread_affinity(&chosen_cores_mask) {
anyhow::bail!(e.to_string())
}
Ok(chosen_cores_mask)
Expand All @@ -84,13 +88,24 @@ impl RuntimeManager {
.native_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
}
for (k, v) in config.rayon_runtime_mapping.iter() {
manager
.rayon_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
}

for (name, cfg) in config.native_configs.iter() {
let nrt = NativeThreadRuntime::new(cfg.clone());
manager
.native_thread_runtimes
.insert(name.clone().into_boxed_str(), nrt);
}
for (name, cfg) in config.rayon_configs.iter() {
let rrt = RayonRuntime::new(cfg.clone())?;
manager
.rayon_runtimes
.insert(name.clone().into_boxed_str(), rrt);
}

for (name, cfg) in config.tokio_configs.iter() {
let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
Expand All @@ -110,7 +125,7 @@ impl RuntimeManager {
#[cfg(test)]
mod tests {
use {
crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig},
crate::{CoreAllocation, NativeConfig, RayonConfig, RuntimeManager, RuntimeManagerConfig},
std::collections::HashMap,
};

Expand Down Expand Up @@ -170,8 +185,19 @@ mod tests {
..Default::default()
},
)]),
rayon_configs: HashMap::from([(
"rayon1".to_owned(),
RayonConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
worker_threads: 3,
priority: 0,
..Default::default()
},
)]),
default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),

rayon_runtime_mapping: HashMap::from([("test".to_owned(), "rayon1".to_owned())]),
..Default::default()
};

Expand All @@ -184,25 +210,19 @@ mod tests {
assert_eq!(aff, [0, 1, 2, 3], "Managed thread allocation should be 0-3");
})
.unwrap();
let rrt = rtm.get_rayon("test").unwrap();

let rayon_pool = rayon::ThreadPoolBuilder::new()
.num_threads(3)
.start_handler(|idx| {
affinity::set_thread_affinity([1, 2, 3]).unwrap();
})
/*.spawn_handler(|thread| {
let mut b = std::thread::Builder::new();
if let Some(name) = thread.name() {
b = b.name(name.to_owned());
}
if let Some(stack_size) = thread.stack_size() {
b = b.stack_size(stack_size);
}
b.spawn(|| thread.run())?;
Ok(())
})*/
.build()
.unwrap();
/*.spawn_handler(|thread| {
let mut b = std::thread::Builder::new();
if let Some(name) = thread.name() {
b = b.name(name.to_owned());
}
if let Some(stack_size) = thread.stack_size() {
b = b.stack_size(stack_size);
}
b.spawn(|| thread.run())?;
Ok(())
})*/

let t = std::thread::spawn(|| {
let aff = affinity::get_thread_affinity().unwrap();
Expand All @@ -218,7 +238,7 @@ mod tests {
});
tt.join().unwrap();
});
let _rr = rayon_pool.broadcast(|ctx| {
let _rr = rrt.rayon_pool.broadcast(|ctx| {
let aff = affinity::get_thread_affinity().unwrap();
println!("Rayon thread {} reporting", ctx.index());
assert_eq!(
Expand Down
18 changes: 7 additions & 11 deletions thread-manager/src/native_thread_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use {
crate::policy::CoreAllocation,
crate::policy::{apply_policy, CoreAllocation},
anyhow::bail,
serde::{Deserialize, Serialize},
std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Arc, Mutex,
},
};

Expand All @@ -13,7 +13,7 @@ use {
pub struct NativeConfig {
pub core_allocation: CoreAllocation,
pub max_threads: usize,
pub priority: usize,
pub priority: u8,
pub name_base: String,
pub stack_size_bytes: usize,
}
Expand Down Expand Up @@ -97,20 +97,16 @@ impl NativeThreadRuntime {
if spawned >= self.config.max_threads {
bail!("All allowed threads in this pool are already spawned");
}
let core_set: Vec<_> = match self.config.core_allocation {
CoreAllocation::PinnedCores { min: _, max: _ } => {
todo!("Need to store pinning mask somewhere");
}
CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(),
CoreAllocation::OsDefault => (0..affinity::get_core_num()).collect(),
};

let core_alloc = self.config.core_allocation.clone();
let priority = self.config.priority;
let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
let n = self.id_count.fetch_add(1, Ordering::SeqCst);
let jh = std::thread::Builder::new()
.name(format!("{}-{}", &self.config.name_base, n))
.stack_size(self.config.stack_size_bytes)
.spawn(move || {
affinity::set_thread_affinity(core_set).expect("Can not set thread affinity");
apply_policy(&core_alloc, priority, &chosen_cores_mask);
f()
})?;
self.running_count.fetch_add(1, Ordering::SeqCst);
Expand Down
38 changes: 36 additions & 2 deletions thread-manager/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use serde::{Deserialize, Serialize};
use {
serde::{Deserialize, Serialize},
thread_priority::ThreadExt,
};

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub enum CoreAllocation {
Expand All @@ -23,4 +26,35 @@ impl CoreAllocation {
}

/// Applies the core-allocation policy and scheduling priority to the
/// calling thread. Intended to be invoked from each worker thread's
/// start handler.
///
/// * `PinnedCores` — pops one core index off `chosen_cores_mask` and pins
///   this thread to it exclusively; panics if the mask is exhausted
///   (i.e. more workers than cores were provided).
/// * `DedicatedCoreSet` — pins this thread to the entire remaining mask,
///   sharing the set with its sibling workers.
/// * `OsDefault` — leaves affinity to the OS scheduler.
///
/// # Panics
/// Panics if the priority cannot be converted/applied, if the mutex is
/// poisoned, or if setting affinity fails.
pub fn apply_policy(
    alloc: &CoreAllocation,
    priority: u8,
    chosen_cores_mask: &std::sync::Mutex<Vec<usize>>,
) {
    // NOTE(review): ThreadPriority::Crossplatform expects a bounded value
    // (try_into validates the range) — callers must pass a valid priority
    // or this panics.
    std::thread::current()
        .set_priority(thread_priority::ThreadPriority::Crossplatform(
            (priority).try_into().unwrap(),
        ))
        .expect("Can not set thread priority!");

    match alloc {
        CoreAllocation::PinnedCores { min: _, max: _ } => {
            // Each call claims one core for exclusive use by this thread.
            let mut lg = chosen_cores_mask
                .lock()
                .expect("Can not lock core mask mutex");
            let core = lg
                .pop()
                .expect("Not enough cores provided for pinned allocation");
            affinity::set_thread_affinity([core])
                .expect("Can not set thread affinity for runtime worker");
        }
        CoreAllocation::DedicatedCoreSet { min: _, max: _ } => {
            // All workers in the pool share the same core set.
            let lg = chosen_cores_mask
                .lock()
                .expect("Can not lock core mask mutex");
            affinity::set_thread_affinity(&(*lg))
                .expect("Can not set thread affinity for runtime worker");
        }
        CoreAllocation::OsDefault => {}
    }
}
10 changes: 6 additions & 4 deletions thread-manager/src/rayon_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use {
crate::policy::{apply_policy, CoreAllocation},
anyhow::Ok,
serde::{Deserialize, Serialize},
std::sync::Mutex,
};

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RayonConfig {
pub worker_threads: usize,
pub priority: u32,
pub priority: u8,
pub stack_size_bytes: usize,
pub core_allocation: CoreAllocation,
}
Expand All @@ -31,13 +32,14 @@ pub struct RayonRuntime {
}

impl RayonRuntime {
fn new(config: RayonConfig) -> anyhow::Result<Self> {
pub fn new(config: RayonConfig) -> anyhow::Result<Self> {
let policy = config.core_allocation.clone();
let chosen_cores_mask = Mutex::new(policy.as_core_mask_vector());
let priority = config.priority;
let rayon_pool = rayon::ThreadPoolBuilder::new()
.num_threads(config.worker_threads)
.start_handler(move |idx| {
apply_policy(&policy, priority);
.start_handler(move |_idx| {
apply_policy(&policy, priority, &chosen_cores_mask);
})
.build()?;
Ok(Self { rayon_pool, config })
Expand Down
Loading

0 comments on commit bfaf7e2

Please sign in to comment.