Skip to content

Commit

Permalink
organizing policy code
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 15, 2024
1 parent b293f6e commit 2561944
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions thread-manager/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
# thread-manager
Balances machine resources between multiple Tokio runtimes
Balances machine resources between multiple threaded runtimes. The purpose is to manage thread contention between different parts of the code that may
benefit from a diverse set of management options. For example, we may want to have cores 1-4 handling networking via Tokio, core 5 handling file IO via Tokio, cores 9-16 hallocated for Rayon thread pool, and cores 6-8 available for general use by std::thread. This will minimize contention for CPU caches and context switches that would occur if Rayon was entirely unaware it was running side-by-side with tokio, and each was to spawn as many threads as there are cores.

# Supported threading models
## Tokio
Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on.
Number of worker and blocking threads is configurable
Number of worker and blocking threads is configurable, as are thread priorities for the pool.

## Native
Native threads can be spawned from managed pools, this allows them to inheirt a particular affinity from the pool, as well as to
Native threads (std::thread) can be spawned from managed pools, this allows them to inheirt a particular affinity from the pool, as well as to
control the total number of threads made in every pool.

## Rayon
Rayon already manages thread pools, all thread_manager does on top is enforce affinity and priority for rayon threads.
Rayon already manages thread pools well enough, all thread_manager does on top is enforce affinity and priority for rayon threads. Normally one would only ever have one rayon pool, but for priority allocations one may want to spawn many rayon pools.

# Limitations

* Thread pools can only be created at process startup
* Once thread pool is created, its policy can not be modified at runtime
3 changes: 1 addition & 2 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ async fn axum_main(port: u16) {
}
}
}
use affinity::*;
use agave_thread_manager::*;
use {affinity::*, agave_thread_manager::*};

fn main() -> anyhow::Result<()> {
println!(
Expand Down
27 changes: 16 additions & 11 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use {

mod native_thread_runtime;
mod policy;
mod tokio_runtime;
mod rayon_runtime;
mod tokio_runtime;

pub use rayon_runtime::{RayonConfig, RayonRuntime}
pub use native_thread_runtime::{NativeConfig, NativeThreadRuntime};
pub use policy::CoreAllocation;
pub use tokio_runtime::{TokioConfig, TokioRuntime};
pub use {
native_thread_runtime::{NativeConfig, NativeThreadRuntime},
policy::CoreAllocation,
rayon_runtime::{RayonConfig, RayonRuntime},
tokio_runtime::{TokioConfig, TokioRuntime},
};
pub type ConstString = Box<str>;

#[derive(Default, Debug)]
Expand Down Expand Up @@ -93,21 +95,24 @@ impl RuntimeManager {
for (name, cfg) in config.tokio_configs.iter() {
let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;

core_allocations.insert(name.clone().into_boxed_str(), cfg.core_allocation.as_core_mask_vector());
manager.tokio_runtimes.insert(
core_allocations.insert(
name.clone().into_boxed_str(),
tokiort
cfg.core_allocation.as_core_mask_vector(),
);
manager
.tokio_runtimes
.insert(name.clone().into_boxed_str(), tokiort);
}
Ok(manager)
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig};
use {
crate::{CoreAllocation, NativeConfig, RuntimeManager, RuntimeManagerConfig},
std::collections::HashMap,
};

#[test]
fn process_affinity() {
Expand Down
6 changes: 4 additions & 2 deletions thread-manager/src/native_thread_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use {
crate::policy::CoreAllocation,
anyhow::bail,
serde::{Deserialize, Serialize},
std::sync::atomic::{AtomicUsize, Ordering},
std::sync::Arc,
std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion thread-manager/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ impl CoreAllocation {
}

///Applies policy to the calling thread
pub fn apply_policy(alloc: CoreAllocation, priority: u32) {}
pub fn apply_policy(alloc: &CoreAllocation, priority: u32) {}
7 changes: 4 additions & 3 deletions thread-manager/src/rayon_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::policy::CoreAllocation,
crate::policy::{apply_policy, CoreAllocation},
anyhow::Ok,
serde::{Deserialize, Serialize},
};
Expand Down Expand Up @@ -32,11 +32,12 @@ pub struct RayonRuntime {

impl RayonRuntime {
fn new(config: RayonConfig) -> anyhow::Result<Self> {
let policy = config.core_allocation;
let policy = config.core_allocation.clone();
let priority = config.priority;
let rayon_pool = rayon::ThreadPoolBuilder::new()
.num_threads(config.worker_threads)
.start_handler(move |idx| {
affinity::set_thread_affinity([1, 2, 3]).unwrap();
apply_policy(&policy, priority);
})
.build()?;
Ok(Self { rayon_pool, config })
Expand Down

0 comments on commit 2561944

Please sign in to comment.