diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index a641b74bccfc..ebc5bcf91c94 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -45,7 +45,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Check dependencies run: | cd dev/depcheck diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4920b373c258..39b7b2b17857 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -50,7 +50,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Cache Cargo uses: actions/cache@v4 @@ -144,7 +144,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run tests (excluding doctests) run: cargo test --lib --tests --bins --features avro,json,backtrace - name: Verify Working Directory Clean @@ -163,7 +163,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run tests (excluding doctests) run: | cd datafusion-cli @@ -184,7 +184,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run examples run: | # test datafusion-sql examples @@ -210,7 +210,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run doctests run: | cargo test --doc --features avro,json @@ -231,7 +231,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run cargo doc run: ci/scripts/rust_docs.sh @@ -245,7 +245,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Install wasm-pack run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh - name: Build with wasm-pack @@ -266,7 +266,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Generate benchmark data and expected query results run: | mkdir -p datafusion/sqllogictest/test_files/tpch/data @@ -384,7 +384,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run datafusion-common tests run: cargo test -p datafusion-common --features=pyarrow @@ -413,7 +413,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run run: | echo '' > datafusion/proto/src/generated/datafusion.rs @@ -474,7 +474,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Install Clippy run: rustup component add clippy - name: Run clippy @@ -494,7 +494,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Run tests run: | cd datafusion @@ -513,7 +513,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - name: Install taplo run: cargo +stable install taplo-cli --version ^0.9 --locked # if you encounter an error, try running 'taplo format' to fix the formatting automatically. @@ -533,7 +533,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - rust-version: 1.82 + rust-version: stable - uses: actions/setup-node@v4 with: node-version: "20" diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 31eafc744390..984d8ca267d5 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -18,13 +18,12 @@ //! FunctionalDependencies keeps track of functional dependencies //! inside DFSchema. -use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::vec::IntoIter; use crate::utils::{merge_and_order_indices, set_difference}; -use crate::{DFSchema, JoinType}; +use crate::{DFSchema, HashSet, JoinType}; /// This object defines a constraint on a table. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 8bd646626e06..92498e42810a 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -322,7 +322,11 @@ fn hash_fixed_list_array( ) -> Result<()> { let values = Arc::clone(array.values()); let value_len = array.value_length(); - let offset_size = value_len as usize / array.len(); + let offset_size = if array.len() == 0 { + 0 + } else { + value_len as usize / array.len() + }; let nulls = array.nulls(); let mut values_hashes = vec![0u64; values.len()]; create_hashes(&[values], random_state, &mut values_hashes)?; @@ -462,6 +466,16 @@ mod tests { Ok(()) } + #[test] + fn create_hashes_for_empty_fixed_size_lit() -> Result<()> { + let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish(); + let random_state = RandomState::with_seeds(0, 0, 0, 0); + let hashes_buff = &mut vec![0; 0]; + let hashes = create_hashes(&[Arc::new(empty_array)], &random_state, hashes_buff)?; + assert_eq!(hashes, &Vec::::new()); + Ok(()) + } + #[test] fn create_hashes_for_float_arrays() -> Result<()> { let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7])); diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 08431a36e82f..618e88fb8382 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -66,6 +66,7 @@ pub use functional_dependencies::{ get_target_functional_dependencies, Constraint, Constraints, Dependency, FunctionalDependence, FunctionalDependencies, }; +use hashbrown::hash_map::DefaultHashBuilder; pub use join_type::{JoinConstraint, JoinSide, JoinType}; pub use param_value::ParamValues; pub use scalar::{ScalarType, ScalarValue}; @@ -87,6 +88,10 @@ pub use error::{ _substrait_datafusion_err, }; +// The HashMap and HashSet implementations that should be used as the uniform defaults +pub type HashMap = hashbrown::HashMap; +pub type HashSet = hashbrown::HashSet; + /// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is /// not possible. In normal usage of DataFusion the downcast should always succeed. /// diff --git a/datafusion/core/src/bin/print_functions_docs.rs b/datafusion/core/src/bin/print_functions_docs.rs index 3aedcbc2aa63..7f3990c53427 100644 --- a/datafusion/core/src/bin/print_functions_docs.rs +++ b/datafusion/core/src/bin/print_functions_docs.rs @@ -16,12 +16,11 @@ // under the License. use datafusion::execution::SessionStateDefaults; -use datafusion_common::{not_impl_err, Result}; +use datafusion_common::{not_impl_err, HashSet, Result}; use datafusion_expr::{ aggregate_doc_sections, scalar_doc_sections, window_doc_sections, AggregateUDF, DocSection, Documentation, ScalarUDF, WindowUDF, }; -use hashbrown::HashSet; use itertools::Itertools; use std::env::args; use std::fmt::Write as _; diff --git a/datafusion/core/src/catalog_common/listing_schema.rs b/datafusion/core/src/catalog_common/listing_schema.rs index 665ea58c5f75..67952770f41c 100644 --- a/datafusion/core/src/catalog_common/listing_schema.rs +++ b/datafusion/core/src/catalog_common/listing_schema.rs @@ -18,14 +18,16 @@ //! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically use std::any::Any; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::path::Path; use std::sync::{Arc, Mutex}; use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; -use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference}; +use datafusion_common::{ + Constraints, DFSchema, DataFusionError, HashMap, TableReference, +}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index b3f54e0773fd..e27a13b6e735 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -63,7 +63,7 @@ use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; use bytes::Bytes; -use hashbrown::HashMap; +use datafusion_common::HashMap; use log::debug; use object_store::buffered::BufWriter; use parquet::arrow::arrow_writer::{ diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 47012f777ad1..1b3588d9a22d 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -17,7 +17,6 @@ //! Helper functions for the table implementation -use std::collections::HashMap; use std::mem; use std::sync::Arc; @@ -25,7 +24,7 @@ use super::ListingTableUrl; use super::PartitionedFile; use crate::execution::context::SessionState; use datafusion_common::internal_err; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::{HashMap, Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Operator}; use arrow::{ diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 9eb200f534db..1a53077b1fd5 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -32,7 +32,7 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use datafusion_common::tree_node::{ ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{plan_err, JoinSide, Result}; +use datafusion_common::{plan_err, HashSet, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; @@ -41,8 +41,6 @@ use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexOrderingRef, LexRequirement, }; -use hashbrown::HashSet; - /// This is a "data class" we use within the [`EnforceSorting`] rule to push /// down [`SortExec`] in the plan. In some cases, we can reduce the total /// computational cost by pushing down `SortExec`s through some executors. The diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 21f604e6c60f..4cb2b1bfbc5c 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -42,8 +42,8 @@ use test_utils::{add_empty_batches, StringBatchGenerator}; use crate::fuzz_cases::aggregation_fuzzer::{ AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder, }; +use datafusion_common::HashMap; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use hashbrown::HashMap; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use tokio::task::JoinSet; diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 5bfb4d97ed70..e883207f7bfa 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -45,10 +45,10 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use test_utils::add_empty_batches; use datafusion::functions_window::row_number::row_number_udwf; +use datafusion_common::HashMap; use datafusion_functions_window::lead_lag::{lag_udwf, lead_udwf}; use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use hashbrown::HashMap; use rand::distributions::Alphanumeric; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 6817969580da..c431cd6303db 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{ }; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_plan::spill::get_record_batch_memory_size; use futures::StreamExt; use std::any::Any; use std::num::NonZeroUsize; @@ -265,6 +266,10 @@ async fn sort_spill_reservation() { // This test case shows how sort_spill_reservation works by // purposely sorting data that requires non trivial memory to // sort/merge. + + // Merge operation needs extra memory to do row conversion, so make the + // memory limit larger. + let mem_limit = partition_size * 2; let test = TestCase::new() // This query uses a different order than the input table to // force a sort. It also needs to have multiple columns to @@ -272,7 +277,7 @@ async fn sort_spill_reservation() { // substantial memory .with_query("select * from t ORDER BY a , b DESC") // enough memory to sort if we don't try to merge it all at once - .with_memory_limit(partition_size) + .with_memory_limit(mem_limit) // use a single partition so only a sort is needed .with_scenario(scenario) .with_disk_manager_config(DiskManagerConfig::NewOs) @@ -311,7 +316,7 @@ async fn sort_spill_reservation() { // reserve sufficient space up front for merge and this time, // which will force the spills to happen with less buffered // input and thus with enough to merge. - .with_sort_spill_reservation_bytes(partition_size / 2); + .with_sort_spill_reservation_bytes(mem_limit / 2); test.with_config(config).with_expected_success().run().await; } @@ -774,7 +779,7 @@ fn make_dict_batches() -> Vec { // How many bytes does the memory from dict_batches consume? fn batches_byte_size(batches: &[RecordBatch]) -> usize { - batches.iter().map(|b| b.get_array_memory_size()).sum() + batches.iter().map(get_record_batch_memory_size).sum() } #[derive(Debug)] diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index f1b172862399..8453a360cd25 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -16,7 +16,6 @@ // under the License. use std::any::Any; -use std::collections::HashMap; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; @@ -39,7 +38,8 @@ use datafusion_common::cast::{as_float64_array, as_int32_array}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, internal_err, - not_impl_err, plan_err, DFSchema, DataFusionError, ExprSchema, Result, ScalarValue, + not_impl_err, plan_err, DFSchema, DataFusionError, ExprSchema, HashMap, Result, + ScalarValue, }; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index e169c1f319cc..c2ec42d0df1e 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -16,8 +16,8 @@ // under the License. use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; +use datafusion_common::HashMap; use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; -use hashbrown::HashMap; use log::debug; use parking_lot::Mutex; use std::{ diff --git a/datafusion/expr/src/conditional_expressions.rs b/datafusion/expr/src/conditional_expressions.rs index 23cc88f1c0ff..9cb51612d0ca 100644 --- a/datafusion/expr/src/conditional_expressions.rs +++ b/datafusion/expr/src/conditional_expressions.rs @@ -19,8 +19,7 @@ use crate::expr::Case; use crate::{expr_schema::ExprSchemable, Expr}; use arrow::datatypes::DataType; -use datafusion_common::{plan_err, DFSchema, Result}; -use std::collections::HashSet; +use datafusion_common::{plan_err, DFSchema, HashSet, Result}; /// Helper struct for building [Expr::Case] pub struct CaseBuilder { diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 3401a94b2736..d672bd1acc46 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -18,7 +18,7 @@ use crate::var_provider::{VarProvider, VarType}; use chrono::{DateTime, TimeZone, Utc}; use datafusion_common::alias::AliasGenerator; -use std::collections::HashMap; +use datafusion_common::HashMap; use std::sync::Arc; /// Holds per-query execution properties and data (such as statement diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index a9c183952fc7..d3a3852a1eaa 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -17,7 +17,7 @@ //! Logical Expressions: [`Expr`] -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::fmt::{self, Display, Formatter, Write}; use std::hash::{Hash, Hasher}; use std::mem; @@ -39,7 +39,7 @@ use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; use datafusion_common::{ - plan_err, Column, DFSchema, Result, ScalarValue, TableReference, + plan_err, Column, DFSchema, HashMap, Result, ScalarValue, TableReference, }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use sqlparser::ast::{ diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 6d3457f70d4c..4eb49710bcf8 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -20,8 +20,8 @@ use crate::expr_rewriter::FunctionRewrite; use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; -use datafusion_common::{not_impl_err, plan_datafusion_err, Result}; -use std::collections::{HashMap, HashSet}; +use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result}; +use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 29c62440abb1..c22ee244fe28 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -18,7 +18,7 @@ //! Expression utilities use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::ops::Deref; use std::sync::Arc; @@ -36,7 +36,7 @@ use datafusion_common::tree_node::{ use datafusion_common::utils::get_at_indices; use datafusion_common::{ internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, - DataFusionError, Result, TableReference, + DataFusionError, HashMap, Result, TableReference, }; use indexmap::IndexSet; diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index ff0a930d490b..a7114bb68bfd 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use std::mem::{size_of, size_of_val}; use std::sync::{Arc, OnceLock}; @@ -33,7 +32,7 @@ use arrow::array::Array; use arrow::array::ArrowNativeTypeOp; use arrow::datatypes::ArrowNativeType; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{DataFusionError, HashSet, Result, ScalarValue}; use datafusion_expr::aggregate_doc_sections::DOC_SECTION_GENERAL; use datafusion_expr::function::StateFieldsArgs; use datafusion_expr::{ diff --git a/datafusion/functions-aggregate/src/regr.rs b/datafusion/functions-aggregate/src/regr.rs index bf1e81949d23..9dd13634ff2d 100644 --- a/datafusion/functions-aggregate/src/regr.rs +++ b/datafusion/functions-aggregate/src/regr.rs @@ -24,8 +24,10 @@ use arrow::{ datatypes::DataType, datatypes::Field, }; -use datafusion_common::{downcast_value, plan_err, unwrap_or_internal_err, ScalarValue}; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{ + downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, HashMap, Result, + ScalarValue, +}; use datafusion_expr::aggregate_doc_sections::DOC_SECTION_STATISTICAL; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::NUMERICS; @@ -34,7 +36,6 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, }; use std::any::Any; -use std::collections::HashMap; use std::fmt::Debug; use std::mem::size_of_val; use std::sync::OnceLock; diff --git a/datafusion/functions-nested/src/except.rs b/datafusion/functions-nested/src/except.rs index 947d3c018221..100fb587d642 100644 --- a/datafusion/functions-nested/src/except.rs +++ b/datafusion/functions-nested/src/except.rs @@ -23,13 +23,12 @@ use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait}; use arrow_buffer::OffsetBuffer; use arrow_schema::{DataType, FieldRef}; -use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_common::{exec_err, internal_err, HashSet, Result}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use std::any::Any; -use std::collections::HashSet; use std::sync::{Arc, OnceLock}; make_udf_expr_and_func!( diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index d7dce3bacbe1..cad193910cee 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::sync::{Arc, OnceLock}; use arrow::array::ArrayData; @@ -25,7 +25,7 @@ use arrow_buffer::{Buffer, ToByteSlice}; use arrow_schema::{DataType, Field, SchemaBuilder}; use datafusion_common::utils::{fixed_size_list_to_arrays, list_to_arrays}; -use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_common::{exec_err, HashSet, Result, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::scalar_doc_sections::DOC_SECTION_MAP; use datafusion_expr::{ diff --git a/datafusion/functions/src/core/named_struct.rs b/datafusion/functions/src/core/named_struct.rs index b2c7f06d5868..d53dd2277f84 100644 --- a/datafusion/functions/src/core/named_struct.rs +++ b/datafusion/functions/src/core/named_struct.rs @@ -17,11 +17,10 @@ use arrow::array::StructArray; use arrow::datatypes::{DataType, Field, Fields}; -use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; +use datafusion_common::{exec_err, internal_err, HashSet, Result, ScalarValue}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRUCT; use datafusion_expr::{ColumnarValue, Documentation, Expr, ExprSchemable}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; -use hashbrown::HashSet; use std::any::Any; use std::sync::{Arc, OnceLock}; diff --git a/datafusion/functions/src/unicode/translate.rs b/datafusion/functions/src/unicode/translate.rs index fa626b396b3b..845d34c708d4 100644 --- a/datafusion/functions/src/unicode/translate.rs +++ b/datafusion/functions/src/unicode/translate.rs @@ -22,7 +22,7 @@ use arrow::array::{ ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, OffsetSizeTrait, }; use arrow::datatypes::DataType; -use hashbrown::HashMap; +use datafusion_common::HashMap; use unicode_segmentation::UnicodeSegmentation; use crate::utils::{make_scalar_function, utf8_to_str_type}; diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 6aa59b77f7f9..b5726d999137 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -17,7 +17,7 @@ //! [`PullUpCorrelatedExpr`] converts correlated subqueries to `Joins` -use std::collections::{BTreeSet, HashMap}; +use std::collections::BTreeSet; use std::ops::Deref; use std::sync::Arc; @@ -27,7 +27,7 @@ use crate::utils::collect_subquery_cols; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; -use datafusion_common::{plan_err, Column, DFSchemaRef, Result, ScalarValue}; +use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result, ScalarValue}; use datafusion_expr::expr::Alias; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index ec2225bbc042..67d888abda52 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -19,7 +19,7 @@ mod required_indices; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use crate::optimizer::ApplyOrder; @@ -27,7 +27,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{ get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column, - JoinType, Result, + HashMap, JoinType, Result, }; use datafusion_expr::expr::Alias; use datafusion_expr::Unnest; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 90a790a0e841..975150cd6122 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -17,7 +17,6 @@ //! [`Optimizer`] and [`OptimizerRule`] -use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; @@ -29,7 +28,7 @@ use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; -use datafusion_common::{internal_err, DFSchema, DataFusionError, Result}; +use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result}; use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 01875349c922..c8f3a4bc7859 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -22,7 +22,9 @@ use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{internal_err, tree_node::Transformed, DataFusionError, Result}; +use datafusion_common::{ + internal_err, tree_node::Transformed, DataFusionError, HashSet, Result, +}; use datafusion_expr::builder::project; use datafusion_expr::{ col, @@ -31,8 +33,6 @@ use datafusion_expr::{ Expr, }; -use hashbrown::HashSet; - /// single distinct to group by optimizer rule /// ```text /// Before: diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index c6768a19d30e..e131ad8f5085 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -393,7 +393,7 @@ where #[cfg(test)] mod tests { use arrow::array::{BinaryViewArray, GenericByteViewArray, StringViewArray}; - use hashbrown::HashMap; + use datafusion_common::HashMap; use super::*; diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index cf57ce3e0e21..1a3cd7600bbb 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -44,8 +44,8 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::datum::compare_with_eq; use ahash::RandomState; +use datafusion_common::HashMap; use hashbrown::hash_map::RawEntryMut; -use hashbrown::HashMap; /// InList pub struct InListExpr { diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs index fbb59cc92fa0..2c37c4d8b394 100644 --- a/datafusion/physical-expr/src/utils/guarantee.rs +++ b/datafusion/physical-expr/src/utils/guarantee.rs @@ -20,9 +20,9 @@ use crate::utils::split_disjunction; use crate::{split_conjunction, PhysicalExpr}; -use datafusion_common::{Column, ScalarValue}; +use datafusion_common::{Column, HashMap, ScalarValue}; use datafusion_expr::Operator; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::fmt::{self, Display, Formatter}; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index c3d1b1425b7f..73d744b4b614 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -17,10 +17,8 @@ mod guarantee; pub use guarantee::{Guarantee, LiteralGuarantee}; -use hashbrown::HashSet; use std::borrow::Borrow; -use std::collections::HashMap; use std::sync::Arc; use crate::expressions::{BinaryExpr, Column}; @@ -32,7 +30,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; -use datafusion_common::Result; +use datafusion_common::{HashMap, HashSet, Result}; use datafusion_expr::Operator; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 3ad892c880f6..2f6dc5fa0bc1 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -43,8 +43,8 @@ use arrow::error::ArrowError; use arrow::ipc::reader::FileReader; use arrow_array::types::UInt64Type; use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType, - Result, + exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide, + JoinType, Result, }; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -54,7 +54,6 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::{Stream, StreamExt}; -use hashbrown::HashSet; use crate::expressions::PhysicalSortExpr; use crate::joins::utils::{ diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 5ccdd9b40dee..f08ce0ea2f0f 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -32,7 +32,7 @@ use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder}; use arrow_schema::{Schema, SchemaRef}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ - arrow_datafusion_err, DataFusionError, JoinSide, Result, ScalarValue, + arrow_datafusion_err, DataFusionError, HashSet, JoinSide, Result, ScalarValue, }; use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::expressions::Column; @@ -42,7 +42,6 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use hashbrown::raw::RawTable; -use hashbrown::HashSet; /// Implementation of `JoinHashMapType` for `PruningJoinHashMap`. impl JoinHashMapType for PruningJoinHashMap { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 5b6dc2cd2ae9..f082bdbdd3f9 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -64,7 +64,7 @@ use arrow::record_batch::RecordBatch; use arrow_buffer::ArrowNativeType; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::bisect; -use datafusion_common::{internal_err, plan_err, JoinSide, JoinType, Result}; +use datafusion_common::{internal_err, plan_err, HashSet, JoinSide, JoinType, Result}; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_expr::interval_arithmetic::Interval; @@ -77,7 +77,6 @@ use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexOrderingRef, LexRequirement, }; use futures::{ready, Stream, StreamExt}; -use hashbrown::HashSet; use parking_lot::Mutex; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index ead0ca336938..4712729bdaf5 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -28,7 +28,7 @@ use std::{ sync::Arc, }; -use hashbrown::HashMap; +use datafusion_common::HashMap; // public exports pub use baseline::{BaselineMetrics, RecordOutput}; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index bc65b251561b..4d0dbc75d40a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -50,10 +50,10 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use crate::execution_plan::CardinalityEffect; +use datafusion_common::HashMap; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; -use hashbrown::HashMap; use log::trace; use parking_lot::Mutex; diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index d32c60697ec8..9b2fa968222c 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::spill::get_record_batch_memory_size; use arrow::compute::interleave; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -69,7 +70,8 @@ impl BatchBuilder { /// Append a new batch in `stream_idx` pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> { - self.reservation.try_grow(batch.get_array_memory_size())?; + self.reservation + .try_grow(get_record_batch_memory_size(&batch))?; let batch_idx = self.batches.len(); self.batches.push((stream_idx, batch)); self.cursors[stream_idx] = BatchCursor { @@ -141,7 +143,7 @@ impl BatchBuilder { stream_cursor.batch_idx = retained; retained += 1; } else { - self.reservation.shrink(batch.get_array_memory_size()); + self.reservation.shrink(get_record_batch_memory_size(batch)); } retain }); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index d90d0f64ceb4..3206d6dd4b9e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -31,7 +31,9 @@ use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; use crate::sorts::streaming_merge::StreamingMergeBuilder; -use crate::spill::{read_spill_as_stream, spill_record_batches}; +use crate::spill::{ + get_record_batch_memory_size, read_spill_as_stream, spill_record_batches, +}; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ @@ -288,10 +290,12 @@ impl ExternalSorter { } self.reserve_memory_for_merge()?; - let size = input.get_array_memory_size(); + let size = get_record_batch_memory_size(&input); + if self.reservation.try_grow(size).is_err() { let before = self.reservation.size(); self.in_mem_sort().await?; + // Sorting may have freed memory, especially if fetch is `Some` // // As such we check again, and if the memory usage has dropped by @@ -426,7 +430,7 @@ impl ExternalSorter { let size: usize = self .in_mem_batches .iter() - .map(|x| x.get_array_memory_size()) + .map(get_record_batch_memory_size) .sum(); // Reserve headroom for next sort/merge @@ -521,7 +525,8 @@ impl ExternalSorter { // Concatenate memory batches together and sort let batch = concat_batches(&self.schema, &self.in_mem_batches)?; self.in_mem_batches.clear(); - self.reservation.try_resize(batch.get_array_memory_size())?; + self.reservation + .try_resize(get_record_batch_memory_size(&batch))?; let reservation = self.reservation.take(); return self.sort_batch_stream(batch, metrics, reservation); } @@ -530,7 +535,8 @@ impl ExternalSorter { .into_iter() .map(|batch| { let metrics = self.metrics.baseline.intermediate(); - let reservation = self.reservation.split(batch.get_array_memory_size()); + let reservation = + self.reservation.split(get_record_batch_memory_size(&batch)); let input = self.sort_batch_stream(batch, metrics, reservation)?; Ok(spawn_buffered(input, 1)) }) @@ -557,7 +563,7 @@ impl ExternalSorter { metrics: BaselineMetrics, reservation: MemoryReservation, ) -> Result { - assert_eq!(batch.get_array_memory_size(), reservation.size()); + assert_eq!(get_record_batch_memory_size(&batch), reservation.size()); let schema = batch.schema(); let fetch = self.fetch; @@ -1187,9 +1193,9 @@ mod tests { assert_eq!(metrics.output_rows().unwrap(), 10000); assert!(metrics.elapsed_compute().unwrap() > 0); - assert_eq!(metrics.spill_count().unwrap(), 4); - assert_eq!(metrics.spilled_bytes().unwrap(), 38784); - assert_eq!(metrics.spilled_rows().unwrap(), 9600); + assert_eq!(metrics.spill_count().unwrap(), 3); + assert_eq!(metrics.spilled_bytes().unwrap(), 36000); + assert_eq!(metrics.spilled_rows().unwrap(), 9000); let columns = result[0].columns(); diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs index de85a7c6f098..dee2d09862d3 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill.rs @@ -20,14 +20,16 @@ use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; +use std::ptr::NonNull; +use arrow::array::ArrayData; use arrow::datatypes::SchemaRef; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use log::debug; use tokio::sync::mpsc::Sender; -use datafusion_common::{exec_datafusion_err, Result}; +use datafusion_common::{exec_datafusion_err, HashSet, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::human_readable_size; use datafusion_execution::SendableRecordBatchStream; @@ -109,10 +111,83 @@ pub fn spill_record_batch_by_size( Ok(()) } +/// Calculate total used memory of this batch. +/// +/// This function is used to estimate the physical memory usage of the `RecordBatch`. +/// It only counts the memory of large data `Buffer`s, and ignores metadata like +/// types and pointers. +/// The implementation will add up all unique `Buffer`'s memory +/// size, due to: +/// - The data pointer inside `Buffer` are memory regions returned by global memory +/// allocator, those regions can't have overlap. +/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have overlap +/// or reuse the same `Buffer`. For example: taking a slice from `Array`. +/// +/// Example: +/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are pointing +/// to a sub-region of the same buffer. +/// +/// {xxxxxxxxxxxxxxxxxxx} <--- buffer +/// ^ ^ ^ ^ +/// | | | | +/// col1->{ } | | +/// col2--------->{ } +/// +/// In the above case, `get_record_batch_memory_size` will return the size of +/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size. +/// +/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the +/// buffer memory size if multiple arrays within the batch are sharing the same +/// `Buffer`. This method provides temporary fix until the issue is resolved: +/// +pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize { + // Store pointers to `Buffer`'s start memory address (instead of actual + // used data region's pointer represented by current `Array`) + let mut counted_buffers: HashSet> = HashSet::new(); + let mut total_size = 0; + + for array in batch.columns() { + let array_data = array.to_data(); + count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size); + } + + total_size +} + +/// Count the memory usage of `array_data` and its children recursively. +fn count_array_data_memory_size( + array_data: &ArrayData, + counted_buffers: &mut HashSet>, + total_size: &mut usize, +) { + // Count memory usage for `array_data` + for buffer in array_data.buffers() { + if counted_buffers.insert(buffer.data_ptr()) { + *total_size += buffer.capacity(); + } // Otherwise the buffer's memory is already counted + } + + if let Some(null_buffer) = array_data.nulls() { + if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) { + *total_size += null_buffer.inner().inner().capacity(); + } + } + + // Count all children `ArrayData` recursively + for child in array_data.child_data() { + count_array_data_memory_size(child, counted_buffers, total_size); + } +} + #[cfg(test)] mod tests { + use super::*; use crate::spill::{spill_record_batch_by_size, spill_record_batches}; use crate::test::build_table_i32; + use arrow::array::{Float64Array, Int32Array}; + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use arrow::record_batch::RecordBatch; + use arrow_array::ListArray; use datafusion_common::Result; use datafusion_execution::disk_manager::DiskManagerConfig; use datafusion_execution::DiskManager; @@ -147,7 +222,7 @@ mod tests { assert_eq!(cnt.unwrap(), num_rows); let file = BufReader::new(File::open(spill_file.path())?); - let reader = arrow::ipc::reader::FileReader::try_new(file, None)?; + let reader = FileReader::try_new(file, None)?; assert_eq!(reader.num_batches(), 2); assert_eq!(reader.schema(), schema); @@ -175,11 +250,138 @@ mod tests { )?; let file = BufReader::new(File::open(spill_file.path())?); - let reader = arrow::ipc::reader::FileReader::try_new(file, None)?; + let reader = FileReader::try_new(file, None)?; assert_eq!(reader.num_batches(), 4); assert_eq!(reader.schema(), schema); Ok(()) } + + #[test] + fn test_get_record_batch_memory_size() { + // Create a simple record batch with two columns + let schema = Arc::new(Schema::new(vec![ + Field::new("ints", DataType::Int32, true), + Field::new("float64", DataType::Float64, false), + ])); + + let int_array = + Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]); + let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(int_array), Arc::new(float64_array)], + ) + .unwrap(); + + let size = get_record_batch_memory_size(&batch); + assert_eq!(size, 60); + } + + #[test] + fn test_get_record_batch_memory_size_with_null() { + // Create a simple record batch with two columns + let schema = Arc::new(Schema::new(vec![ + Field::new("ints", DataType::Int32, true), + Field::new("float64", DataType::Float64, false), + ])); + + let int_array = Int32Array::from(vec![None, Some(2), Some(3)]); + let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(int_array), Arc::new(float64_array)], + ) + .unwrap(); + + let size = get_record_batch_memory_size(&batch); + assert_eq!(size, 100); + } + + #[test] + fn test_get_record_batch_memory_size_empty() { + // Test with empty record batch + let schema = Arc::new(Schema::new(vec![Field::new( + "ints", + DataType::Int32, + false, + )])); + + let int_array: Int32Array = Int32Array::from(vec![] as Vec); + let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap(); + + let size = get_record_batch_memory_size(&batch); + assert_eq!(size, 0, "Empty batch should have 0 memory size"); + } + + #[test] + fn test_get_record_batch_memory_size_shared_buffer() { + // Test with slices that share the same underlying buffer + let original = Int32Array::from(vec![1, 2, 3, 4, 5]); + let slice1 = original.slice(0, 3); + let slice2 = original.slice(2, 3); + + // `RecordBatch` with `original` array + // ---- + let schema_origin = Arc::new(Schema::new(vec![Field::new( + "origin_col", + DataType::Int32, + false, + )])); + let batch_origin = + RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap(); + + // `RecordBatch` with all columns are reference to `original` array + // ---- + let schema = Arc::new(Schema::new(vec![ + Field::new("slice1", DataType::Int32, false), + Field::new("slice2", DataType::Int32, false), + ])); + + let batch_sliced = + RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)]) + .unwrap(); + + // Two sizes should all be only counting the buffer in `original` array + let size_origin = get_record_batch_memory_size(&batch_origin); + let size_sliced = get_record_batch_memory_size(&batch_sliced); + + assert_eq!(size_origin, size_sliced); + } + + #[test] + fn test_get_record_batch_memory_size_nested_array() { + let schema = Arc::new(Schema::new(vec![ + Field::new( + "nested_int", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + false, + ), + Field::new( + "nested_int2", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + false, + ), + ])); + + let int_list_array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + ]); + + let int_list_array2 = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(4), Some(5), Some(6)]), + ]); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(int_list_array), Arc::new(int_list_array2)], + ) + .unwrap(); + + let size = get_record_batch_memory_size(&batch); + assert_eq!(size, 8320); + } } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 14469ab6c0d9..27bb3b2b36b9 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -27,6 +27,7 @@ use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow_array::{Array, ArrayRef, RecordBatch}; use arrow_schema::SchemaRef; +use datafusion_common::HashMap; use datafusion_common::Result; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, @@ -34,7 +35,6 @@ use datafusion_execution::{ }; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use hashbrown::HashMap; use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index b7b9f17eb1b6..06288a1f7041 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -18,7 +18,6 @@ //! Define a plan for unnesting values in columns that contain a list type. use std::cmp::{self, Ordering}; -use std::collections::HashMap; use std::{any::Any, sync::Arc}; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; @@ -40,14 +39,13 @@ use arrow::record_batch::RecordBatch; use arrow_array::{Int64Array, Scalar, StructArray}; use arrow_ord::cmp::lt; use datafusion_common::{ - exec_datafusion_err, exec_err, internal_err, Result, UnnestOptions, + exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions, }; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; use futures::{Stream, StreamExt}; -use hashbrown::HashSet; use log::trace; /// Unnest the given columns (either with type struct or list) diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 8c0331f94570..c3e0a4e3897c 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -22,7 +22,7 @@ use std::any::Any; use std::cmp::{min, Ordering}; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -51,7 +51,9 @@ use datafusion_common::stats::Precision; use datafusion_common::utils::{ evaluate_partition_ranges, get_at_indices, get_row_at_idx, }; -use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common::{ + arrow_datafusion_err, exec_err, DataFusionError, HashMap, Result, +}; use datafusion_execution::TaskContext; use datafusion_expr::window_state::{PartitionBatchState, WindowAggState}; use datafusion_expr::ColumnarValue; diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 57d700f86955..6b3b999ba04b 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -15,15 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::{collections::HashSet, sync::Arc}; use arrow_schema::Schema; use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, - Column, Result, TableReference, + Column, HashMap, Result, TableReference, }; use datafusion_expr::{expr::Alias, tree_node::transform_sort_vec}; use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr}; diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 14436de01843..e479bdbacd83 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,7 +17,6 @@ //! SQL Utility Functions -use std::collections::HashMap; use std::vec; use arrow_schema::{ @@ -27,8 +26,8 @@ use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; use datafusion_common::{ - exec_err, internal_err, plan_err, Column, DFSchemaRef, DataFusionError, Result, - ScalarValue, + exec_err, internal_err, plan_err, Column, DFSchemaRef, DataFusionError, HashMap, + Result, ScalarValue, }; use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; diff --git a/datafusion/sqllogictest/test_files/string/string_view.slt b/datafusion/sqllogictest/test_files/string/string_view.slt index 43b08cb25f3f..78a377471893 100644 --- a/datafusion/sqllogictest/test_files/string/string_view.slt +++ b/datafusion/sqllogictest/test_files/string/string_view.slt @@ -93,8 +93,14 @@ select octet_length(column1_utf8view) from test; 0 NULL -query error DataFusion error: Arrow error: Compute error: bit_length not supported for Utf8View +query I select bit_length(column1_utf8view) from test; +---- +48 +72 +56 +0 +NULL query T select btrim(column1_large_utf8) from test; diff --git a/datafusion/substrait/src/extensions.rs b/datafusion/substrait/src/extensions.rs index 459d0e0c5ae5..c74061f2c9f3 100644 --- a/datafusion/substrait/src/extensions.rs +++ b/datafusion/substrait/src/extensions.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::common::{plan_err, DataFusionError}; -use std::collections::HashMap; +use datafusion::common::{plan_err, DataFusionError, HashMap}; use substrait::proto::extensions::simple_extension_declaration::{ ExtensionFunction, ExtensionType, ExtensionTypeVariation, MappingType, };