Skip to content

Commit

Permalink
[Minor] Use std::thread::available_parallelism instead of num_cpus (a…
Browse files Browse the repository at this point in the history
…pache#13579)

* Use std::thread::available_parallelism

* Use std::thread::available_parallelism

* Use std::thread::available_parallelism

* Use std::thread::available_parallelism

* Use std::thread::available_parallelism

* Use std::thread::available_parallelism
  • Loading branch information
Dandandan authored Nov 28, 2024
1 parent fc49e8d commit 5818732
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 31 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.13"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "53.3.0", default-features = false, features = [
Expand Down
1 change: 0 additions & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = { workspace = true }
parquet = { workspace = true, default-features = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
//! external_aggr binary entrypoint
use std::collections::HashMap;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::thread::available_parallelism;
use structopt::StructOpt;

use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -325,7 +327,11 @@ impl ExternalAggrConfig {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
}

/// Parse memory limit from string to number of bytes
Expand Down
6 changes: 5 additions & 1 deletion benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::util::BenchmarkRun;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;
use structopt::StructOpt;
use tokio::time::Instant;

Expand Down Expand Up @@ -91,7 +93,9 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
.with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
.with_schema(Arc::new(schema));
let csv = ListingTable::try_new(listing_config)?;
let partition_size = num_cpus::get();
let partition_size = available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get();
let memtable =
MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
ctx.register_table("x", Arc::new(memtable))?;
Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES};
use crate::util::{BenchmarkRun, CommonOpt};
Expand Down Expand Up @@ -468,7 +470,11 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
}
}

Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

Expand Down Expand Up @@ -147,7 +149,11 @@ impl RunOpt {
rundata.start_new_case(title);
for i in 0..self.common.iterations {
let config = SessionConfig::new().with_target_partitions(
self.common.partitions.unwrap_or(num_cpus::get()),
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
),
);
let ctx = SessionContext::new_with_config(config);
let (rows, elapsed) =
Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
//! runs end-to-end sort queries and test the performance on multiple CPU cores.
use futures::StreamExt;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;
use structopt::StructOpt;

use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand Down Expand Up @@ -315,6 +317,10 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
}
}
8 changes: 7 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
Expand Down Expand Up @@ -296,7 +298,11 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
}
}

Expand Down
10 changes: 9 additions & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::{num::NonZero, thread::available_parallelism};

use datafusion::prelude::SessionConfig;
use structopt::StructOpt;

Expand Down Expand Up @@ -48,7 +50,13 @@ impl CommonOpt {
/// Modify the existing config appropriately
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
config
.with_target_partitions(self.partitions.unwrap_or(num_cpus::get()))
.with_target_partitions(
self.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
),
)
.with_batch_size(self.batch_size)
}
}
6 changes: 5 additions & 1 deletion benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
collections::HashMap,
num::NonZero,
path::Path,
thread::available_parallelism,
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -68,7 +70,9 @@ impl RunContext {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
num_cpus: num_cpus::get(),
num_cpus: available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
start_time: SystemTime::now(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
}
Expand Down
12 changes: 0 additions & 12 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
libc = "0.2.140"
num_cpus = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
Expand Down
6 changes: 4 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Display};
use std::num::NonZero;
use std::str::FromStr;
use std::thread::available_parallelism;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
Expand Down Expand Up @@ -250,7 +252,7 @@ config_namespace! {
/// concurrency.
///
/// Defaults to the number of CPU cores on the system
pub target_partitions: usize, default = num_cpus::get()
pub target_partitions: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get()

/// The default time zone
///
Expand All @@ -266,7 +268,7 @@ config_namespace! {
/// This is mostly use to plan `UNION` children in parallel.
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = num_cpus::get()
pub planning_concurrency: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get()

/// When set to true, skips verifying that the schema produced by
/// planning the input of `LogicalPlan::Aggregate` exactly matches the
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ glob = "0.3.0"
itertools = { workspace = true }
log = { workspace = true }
num-traits = { version = "0.2", optional = true }
num_cpus = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true, optional = true, default-features = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{cmp, sync::Arc};
use std::{cmp, num::NonZero, sync::Arc, thread::available_parallelism};

use datafusion::{
datasource::MemTable,
Expand Down Expand Up @@ -73,7 +73,9 @@ impl SessionContextGenerator {
];

let max_batch_size = cmp::max(1, dataset_ref.total_rows_num);
let max_target_partitions = num_cpus::get();
let max_target_partitions = available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get();

Self {
dataset: dataset_ref,
Expand Down
9 changes: 8 additions & 1 deletion datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::sync::Arc;
use std::thread::available_parallelism;

use arrow::{
array::*, datatypes::*, record_batch::RecordBatch,
Expand Down Expand Up @@ -259,7 +261,12 @@ impl ExplainNormalizer {

// convert things like partitioning=RoundRobinBatch(16)
// to partitioning=RoundRobinBatch(NUM_CORES)
let needle = format!("RoundRobinBatch({})", num_cpus::get());
let needle = format!(
"RoundRobinBatch({})",
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get()
);
replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string()));

Self { replacements }
Expand Down
1 change: 0 additions & 1 deletion datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ postgres = [

[dev-dependencies]
env_logger = { workspace = true }
num_cpus = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }

[[test]]
Expand Down
8 changes: 7 additions & 1 deletion datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use std::ffi::OsStr;
use std::fs;
use std::num::NonZero;
use std::path::{Path, PathBuf};
use std::thread::available_parallelism;

use clap::Parser;
use datafusion_sqllogictest::{DataFusion, TestContext};
Expand Down Expand Up @@ -112,7 +114,11 @@ async fn run_tests() -> Result<()> {
.join()
})
// run up to num_cpus streams in parallel
.buffer_unordered(num_cpus::get())
.buffer_unordered(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
.flat_map(|result| {
// Filter out any Ok() leaving only the DataFusionErrors
futures::stream::iter(match result {
Expand Down

0 comments on commit 5818732

Please sign in to comment.