From cb24fc062f88e72fbbad02f7f1a154f8fdfe4fee Mon Sep 17 00:00:00 2001
From: Xiangpeng Hao
Date: Fri, 26 Jul 2024 10:47:49 -0400
Subject: [PATCH] move config to parquet-only

---
 benchmarks/src/clickbench.rs                  |  6 +++-
 benchmarks/src/tpch/run.rs                    |  6 +++-
 datafusion/common/src/config.rs               |  8 +++---
 .../common/src/file_options/parquet_writer.rs |  1 +
 datafusion/core/example.parquet               | Bin 0 -> 976 bytes
 .../core/src/datasource/file_format/mod.rs    | 23 +++++++++++++++
 .../src/datasource/file_format/parquet.rs     | 13 ++++++++-
 .../core/src/datasource/listing/table.rs      | 27 ++----------------
 .../datasource/physical_plan/parquet/mod.rs   |  4 +++
 .../physical_plan/parquet/opener.rs           | 25 ++++++++++++----
 .../proto/datafusion_common.proto             |  1 +
 datafusion/proto-common/src/from_proto/mod.rs |  2 +-
 .../proto-common/src/generated/pbjson.rs      | 18 ++++++++++++
 .../proto-common/src/generated/prost.rs       |  3 ++
 datafusion/proto-common/src/to_proto/mod.rs   |  1 +
 .../test_files/information_schema.slt         |  4 +--
 docs/source/user-guide/configs.md             |  4 +--
 17 files changed, 104 insertions(+), 42 deletions(-)
 create mode 100644 datafusion/core/example.parquet

diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index 34004d9fb1c80..a0f051d176234 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -117,7 +117,11 @@ impl RunOpt {
         };
 
         let mut config = self.common.config();
-        config.options_mut().execution.schema_force_string_view = self.common.string_view;
+        config
+            .options_mut()
+            .execution
+            .parquet
+            .schema_force_string_view = self.common.string_view;
         let ctx = SessionContext::new_with_config(config);
         self.register_hits(&ctx).await?;
 
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 3a31f48a4bd43..a72dfaa0f58ca 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -120,7 +120,11 @@ impl RunOpt {
             .config()
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
-        config.options_mut().execution.schema_force_string_view = self.common.string_view;
+        config
+            .options_mut()
+            .execution
+            .parquet
+            .schema_force_string_view = self.common.string_view;
         let ctx = SessionContext::new_with_config(config);
 
         // register tables
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 5f5b441bd6066..98461bc6c8c4a 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -311,10 +311,6 @@ config_namespace! {
 
         /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
         pub keep_partition_by_columns: bool, default = false
-
-        /// If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`,
-        /// and `Binary/BinaryLarge` with `BinaryView`.
-        pub schema_force_string_view: bool, default = false
     }
 }
 
@@ -455,6 +451,10 @@ config_namespace! {
 
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+        /// If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
+        /// and `Binary/BinaryLarge` with `BinaryView`.
+        pub schema_force_string_view: bool, default = false
     }
 }
 
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 009164a29e348..24ccda3e47cbd 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -81,6 +81,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
             maximum_parallel_row_group_writers: _,
             maximum_buffered_record_batches_per_stream: _,
             bloom_filter_on_read: _,
+            schema_force_string_view: _,
         } = &parquet_options.global;
 
         let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
diff --git a/datafusion/core/example.parquet b/datafusion/core/example.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..94de10394b33d26a23a9888e88faa1fa90f14043
GIT binary patch
literal 976
zcmb7@y-UMD7{=dR+8CijD}7&bkU<0w2k`?GGL%A>;?SWub(6GKRM0|Ob#>@0PW}lF
zj)D#j4jufP929)7>E!~g7L&j|d7ryqp4>;XcDRc@6Mv}ynjBo~6V
zH`y+thhEt7jT5A*RN}trNWm|{0f9NW4_;9QPK*T-bm!2sqpAu*JJPBsrP-S1{t{1r
zL|?PhFl5S2s;{?7@i{f==;+c__5v4R+(_C2+(#~f_
zqL5^4^S5jpnYGQ=*fw%%hg8QQV?c&9a#K0ZCz5+L4hnI<-@7>)bWXb$F?zh7>w(ctf)69oWF1C%Kz8Cp)ViHG+x3gs82To&93&yRUd+}=C`ei=G63r()}_L
z-TE5)>SeImRT}5jD9>0kT~Se9KA*kUVhCXR^s>`3E!aTC)HE

literal 0
HcmV?d00001

diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 1aa93a106aff0..bf6de496506c4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -42,6 +42,7 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
+use arrow_schema::{DataType, Field, Schema};
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{internal_err, not_impl_err, GetExt};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -194,6 +195,28 @@ pub fn file_type_to_format(
     }
 }
 
+/// Transform a schema to use view types for Utf8 and Binary
+pub fn transform_schema_to_view(schema: &Schema) -> Schema {
+    let transformed_fields: Vec<Arc<Field>> = schema
+        .fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
+                field.name(),
+                DataType::Utf8View,
+                field.is_nullable(),
+            )),
+            DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
+                field.name(),
+                DataType::BinaryView,
+                field.is_nullable(),
+            )),
+            _ => field.clone(),
+        })
+        .collect();
+    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 4f95368592896..520be1b5a202a 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
-use super::{FileFormat, FileFormatFactory, FileScanConfig};
+use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -305,6 +305,17 @@ impl FileFormat for ParquetFormat {
             Schema::try_merge(schemas)
         }?;
 
+        let schema = if state
+            .config_options()
+            .execution
+            .parquet
+            .schema_force_string_view
+        {
+            transform_schema_to_view(&schema)
+        } else {
+            schema
+        };
+
         Ok(Arc::new(schema))
     }
 
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0142d42767721..b498f64f9f501 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -410,31 +410,8 @@ impl ListingOptions {
             .try_collect()
             .await?;
 
-        let mut schema = self.format.infer_schema(state, &store, &files).await?;
-
-        if state.config_options().execution.schema_force_string_view {
-            let transformed_fields: Vec<Arc<Field>> = schema
-                .fields
-                .iter()
-                .map(|field| match field.data_type() {
-                    DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
-                        field.name(),
-                        DataType::Utf8View,
-                        field.is_nullable(),
-                    )),
-                    DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
-                        field.name(),
-                        DataType::BinaryView,
-                        field.is_nullable(),
-                    )),
-                    _ => field.clone(),
-                })
-                .collect();
-            schema = Arc::new(Schema::new_with_metadata(
-                transformed_fields,
-                schema.metadata.clone(),
-            ));
-        }
+        let schema = self.format.infer_schema(state, &store, &files).await?;
+
         Ok(schema)
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ed0fc5f0169ee..e221ff6a8a0cb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -720,6 +720,10 @@ impl ExecutionPlan for ParquetExec {
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             schema_adapter_factory,
+            schema_force_string_view: self
+                .table_parquet_options
+                .global
+                .schema_force_string_view,
         };
 
         let stream =
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 2d66d2c6c5f4f..e35af46a81fa4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -17,6 +17,7 @@
 
 //! [`ParquetOpener`] for opening Parquet files
 
+use crate::datasource::file_format::transform_schema_to_view;
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
@@ -33,7 +34,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use std::sync::Arc;
@@ -56,6 +57,7 @@ pub(super) struct ParquetOpener {
     pub enable_page_index: bool,
     pub enable_bloom_filter: bool,
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    pub schema_force_string_view: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -66,7 +68,7 @@ impl FileOpener for ParquetOpener {
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
 
-        let reader: Box<dyn AsyncFileReader> =
+        let mut reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
@@ -90,14 +92,27 @@ impl FileOpener for ParquetOpener {
         );
         let enable_bloom_filter = self.enable_bloom_filter;
         let limit = self.limit;
+        let schema_force_string_view = self.schema_force_string_view;
 
         Ok(Box::pin(async move {
+            let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+
+            let metadata =
+                ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
+            let mut schema = metadata.schema().clone();
+
+            if schema_force_string_view {
+                schema = Arc::new(transform_schema_to_view(&schema));
+            }
+
             let options = ArrowReaderOptions::new()
                 .with_page_index(enable_page_index)
-                .with_schema(table_schema.clone());
+                .with_schema(schema.clone());
+            let metadata =
+                ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
+
             let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
-                    .await?;
+                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
 
             let file_schema = builder.schema().clone();
 
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index e2a405595fb74..f94d9e66528a9 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -478,6 +478,7 @@ message ParquetOptions {
   uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
   bool bloom_filter_on_read = 26; // default = true
   bool bloom_filter_on_write = 27; // default = false
+  bool schema_force_string_view = 28; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index df673de4e1191..a538ee1110612 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -950,7 +950,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             allow_single_file_parallelism: value.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
-
+            schema_force_string_view: value.schema_force_string_view,
         })
     }
 }
 
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index be3cc58b23dfe..025cddb5601cc 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4634,6 +4634,9 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_on_write {
             len += 1;
         }
+        if self.schema_force_string_view {
+            len += 1;
+        }
         if self.dictionary_page_size_limit != 0 {
             len += 1;
         }
@@ -4717,6 +4720,9 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_on_write {
             struct_ser.serialize_field("bloomFilterOnWrite", &self.bloom_filter_on_write)?;
         }
+        if self.schema_force_string_view {
+            struct_ser.serialize_field("schemaForceStringView", &self.schema_force_string_view)?;
+        }
         if self.dictionary_page_size_limit != 0 {
             #[allow(clippy::needless_borrow)]
             struct_ser.serialize_field("dictionaryPageSizeLimit", ToString::to_string(&self.dictionary_page_size_limit).as_str())?;
@@ -4834,6 +4840,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "bloomFilterOnRead",
             "bloom_filter_on_write",
             "bloomFilterOnWrite",
+            "schema_force_string_view",
+            "schemaForceStringView",
             "dictionary_page_size_limit",
             "dictionaryPageSizeLimit",
             "data_page_row_count_limit",
@@ -4875,6 +4883,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             MaximumBufferedRecordBatchesPerStream,
             BloomFilterOnRead,
             BloomFilterOnWrite,
+            SchemaForceStringView,
             DictionaryPageSizeLimit,
             DataPageRowCountLimit,
             MaxRowGroupSize,
@@ -4922,6 +4931,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream),
                             "bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead),
                             "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite),
+                            "schemaForceStringView" | "schema_force_string_view" => Ok(GeneratedField::SchemaForceStringView),
                             "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
                             "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
                             "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -4967,6 +4977,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut maximum_buffered_record_batches_per_stream__ = None;
                 let mut bloom_filter_on_read__ = None;
                 let mut bloom_filter_on_write__ = None;
+                let mut schema_force_string_view__ = None;
                 let mut dictionary_page_size_limit__ = None;
                 let mut data_page_row_count_limit__ = None;
                 let mut max_row_group_size__ = None;
@@ -5068,6 +5079,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                             bloom_filter_on_write__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::SchemaForceStringView => {
+                            if schema_force_string_view__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("schemaForceStringView"));
+                            }
+                            schema_force_string_view__ = Some(map_.next_value()?);
+                        }
                         GeneratedField::DictionaryPageSizeLimit => {
                             if dictionary_page_size_limit__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
@@ -5168,6 +5185,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     maximum_buffered_record_batches_per_stream: maximum_buffered_record_batches_per_stream__.unwrap_or_default(),
                     bloom_filter_on_read: bloom_filter_on_read__.unwrap_or_default(),
                     bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(),
+                    schema_force_string_view: schema_force_string_view__.unwrap_or_default(),
                     dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
                     data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
                     max_row_group_size: max_row_group_size__.unwrap_or_default(),
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index b0674ff28d754..961ff3abcd3ae 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -786,6 +786,9 @@ pub struct ParquetOptions {
     /// default = false
     #[prost(bool, tag = "27")]
     pub bloom_filter_on_write: bool,
+    /// default = false
+    #[prost(bool, tag = "28")]
+    pub schema_force_string_view: bool,
     #[prost(uint64, tag = "12")]
     pub dictionary_page_size_limit: u64,
     #[prost(uint64, tag = "18")]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index 705a479e01787..2d5b3f291f3a0 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -825,6 +825,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             allow_single_file_parallelism: value.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
+            schema_force_string_view: value.schema_force_string_view,
         })
     }
 }
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 9be3e9fd4146d..2a42e8db555e2 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -200,12 +200,12 @@ datafusion.execution.parquet.metadata_size_hint NULL
 datafusion.execution.parquet.pruning true
 datafusion.execution.parquet.pushdown_filters false
 datafusion.execution.parquet.reorder_filters false
+datafusion.execution.parquet.schema_force_string_view false
 datafusion.execution.parquet.skip_metadata true
 datafusion.execution.parquet.statistics_enabled NULL
 datafusion.execution.parquet.write_batch_size 1024
 datafusion.execution.parquet.writer_version 1.0
 datafusion.execution.planning_concurrency 13
-datafusion.execution.schema_force_string_view false
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -285,12 +285,12 @@ datafusion.execution.parquet.metadata_size_hint NULL If specified, the parquet r
 datafusion.execution.parquet.pruning true If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
 datafusion.execution.parquet.pushdown_filters false If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
 datafusion.execution.parquet.reorder_filters false If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
+datafusion.execution.parquet.schema_force_string_view false If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
 datafusion.execution.parquet.skip_metadata true If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
 datafusion.execution.parquet.statistics_enabled NULL Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in bytes
 datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version valid values are "1.0" and "2.0"
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
-datafusion.execution.schema_force_string_view false If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 106f4369f970e..d3c18a00fe7e4 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -75,6 +75,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.allow_single_file_parallelism | true | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
 | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
+| datafusion.execution.parquet.schema_force_string_view | false | If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
@@ -87,8 +88,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
 | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
 | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches |
-| datafusion.execution.schema_force_string_view | false | If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
 | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
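---

Usage sketch (editor's addition, not part of the applied diff; assumes the
`SessionConfig`/`SessionContext` API already used in the benchmark changes
above):

    use datafusion::prelude::*;

    // The option now lives under the parquet namespace. When enabled,
    // Utf8/LargeUtf8 parquet columns are read as Utf8View and
    // Binary/LargeBinary columns as BinaryView.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.schema_force_string_view = true;
    let ctx = SessionContext::new_with_config(config);

The same setting can be toggled from SQL, e.g. in datafusion-cli:

    SET datafusion.execution.parquet.schema_force_string_view = true;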