Skip to content

Commit

Permalink
limit concurrent_task to executor_cores
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Nov 27, 2024
1 parent 74ca8dd commit 94359f7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ballista/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ doc = "Directory for temporary IPC files"
abbr = "c"
name = "concurrent_tasks"
type = "usize"
default = "0" # defaults to all available cores if left as zero
doc = "Max concurrent tasks."

[[param]]
Expand Down Expand Up @@ -167,6 +166,7 @@ doc = "The number of worker threads for the runtime of caching. Default: 2"
default = "2"

[[param]]
abbr = "e"
name = "executor_cores"
type = "usize"
doc = "The number of worker threads. Default: number of available cores"
6 changes: 1 addition & 5 deletions ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,10 @@ fn main() -> Result<()> {
Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
.unwrap_or_exit();

let executor_cores = opt
.executor_cores
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get());

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("ballista_executor")
.worker_threads(executor_cores)
.worker_threads(opt.executor_cores_or_default())
.build()
.unwrap();

Expand Down
13 changes: 12 additions & 1 deletion ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ use crate::executor_process::ExecutorProcessConfig;
// #[allow(clippy::all)] to silence clippy warnings from the generated code
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));

impl Config {
/// returns executor cores if setup or number of available
/// cpu cores
pub fn executor_cores_or_default(&self) -> usize {
self.executor_cores
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get())
}
}

impl TryFrom<Config> for ExecutorProcessConfig {
type Error = BallistaError;

Expand All @@ -35,6 +44,8 @@ impl TryFrom<Config> for ExecutorProcessConfig {
opt.bind_port
);

let concurrent_tasks = opt.executor_cores_or_default();

Ok(ExecutorProcessConfig {
special_mod_log_level: opt.log_level_setting,
external_host: opt.external_host,
Expand All @@ -44,7 +55,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
scheduler_host: opt.scheduler_host,
scheduler_port: opt.scheduler_port,
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
concurrent_tasks: opt.concurrent_tasks,
concurrent_tasks,
task_scheduling_policy: opt.task_scheduling_policy,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
Expand Down

0 comments on commit 94359f7

Please sign in to comment.