Skip to content

Commit

Permalink
Change --string-view to only apply to parquet formats (#11663)
Browse files Browse the repository at this point in the history
* use inferenced schema, don't load schema again

* move config to parquet-only

* update

* update

* better format

* format

* update
  • Loading branch information
XiangpengHao authored Jul 27, 2024
1 parent 5690712 commit 322c3d2
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 40 deletions.
6 changes: 5 additions & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ impl RunOpt {
};

let mut config = self.common.config();
config.options_mut().execution.schema_force_string_view = self.common.string_view;
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
Expand Down
6 changes: 5 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().execution.schema_force_string_view = self.common.string_view;
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down
8 changes: 4 additions & 4 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,6 @@ config_namespace! {

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

Expand Down Expand Up @@ -473,6 +469,10 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl ParquetOptions {
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_string_view: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -440,6 +441,7 @@ mod tests {
maximum_buffered_record_batches_per_stream: defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_string_view: defaults.schema_force_string_view,
}
}

Expand Down Expand Up @@ -540,6 +542,8 @@ mod tests {
maximum_buffered_record_batches_per_stream: global_options_defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_string_view: global_options_defaults
.schema_force_string_view,
},
column_specific_options,
key_value_metadata,
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
Expand Down Expand Up @@ -204,6 +205,28 @@ pub fn file_type_to_format(
}
}

/// Transform a schema to use view types for Utf8 and Binary
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
)),
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
field.name(),
DataType::BinaryView,
field.is_nullable(),
)),
_ => field.clone(),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileFormatFactory, FileScanConfig};
use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
Expand Down Expand Up @@ -316,6 +316,17 @@ impl FileFormat for ParquetFormat {
Schema::try_merge(schemas)
}?;

let schema = if state
.config_options()
.execution
.parquet
.schema_force_string_view
{
transform_schema_to_view(&schema)
} else {
schema
};

Ok(Arc::new(schema))
}

Expand Down
27 changes: 2 additions & 25 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,31 +410,8 @@ impl ListingOptions {
.try_collect()
.await?;

let mut schema = self.format.infer_schema(state, &store, &files).await?;

if state.config_options().execution.schema_force_string_view {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
)),
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
field.name(),
DataType::BinaryView,
field.is_nullable(),
)),
_ => field.clone(),
})
.collect();
schema = Arc::new(Schema::new_with_metadata(
transformed_fields,
schema.metadata.clone(),
));
}
let schema = self.format.infer_schema(state, &store, &files).await?;

Ok(schema)
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,10 @@ impl ExecutionPlan for ParquetExec {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
schema_force_string_view: self
.table_parquet_options
.global
.schema_force_string_view,
};

let stream =
Expand Down
25 changes: 21 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! [`ParquetOpener`] for opening Parquet files
use crate::datasource::file_format::transform_schema_to_view;
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
Expand All @@ -33,7 +34,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use std::sync::Arc;
Expand All @@ -56,6 +57,7 @@ pub(super) struct ParquetOpener {
pub enable_page_index: bool,
pub enable_bloom_filter: bool,
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
pub schema_force_string_view: bool,
}

impl FileOpener for ParquetOpener {
Expand All @@ -66,7 +68,7 @@ impl FileOpener for ParquetOpener {
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let reader: Box<dyn AsyncFileReader> =
let mut reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
Expand All @@ -90,12 +92,27 @@ impl FileOpener for ParquetOpener {
);
let enable_bloom_filter = self.enable_bloom_filter;
let limit = self.limit;
let schema_force_string_view = self.schema_force_string_view;

Ok(Box::pin(async move {
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let mut schema = metadata.schema().clone();

if schema_force_string_view {
schema = Arc::new(transform_schema_to_view(&schema));
}

let options = ArrowReaderOptions::new()
.with_page_index(enable_page_index)
.with_schema(schema.clone());
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

let file_schema = builder.schema().clone();

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ message ParquetOptions {
uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false
bool schema_force_string_view = 28; // default = false

oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
allow_single_file_parallelism: value.allow_single_file_parallelism,
maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,

schema_force_string_view: value.schema_force_string_view,
})
}
}
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4780,6 +4780,9 @@ impl serde::Serialize for ParquetOptions {
if self.bloom_filter_on_write {
len += 1;
}
if self.schema_force_string_view {
len += 1;
}
if self.dictionary_page_size_limit != 0 {
len += 1;
}
Expand Down Expand Up @@ -4863,6 +4866,9 @@ impl serde::Serialize for ParquetOptions {
if self.bloom_filter_on_write {
struct_ser.serialize_field("bloomFilterOnWrite", &self.bloom_filter_on_write)?;
}
if self.schema_force_string_view {
struct_ser.serialize_field("schemaForceStringView", &self.schema_force_string_view)?;
}
if self.dictionary_page_size_limit != 0 {
#[allow(clippy::needless_borrow)]
struct_ser.serialize_field("dictionaryPageSizeLimit", ToString::to_string(&self.dictionary_page_size_limit).as_str())?;
Expand Down Expand Up @@ -4980,6 +4986,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"bloomFilterOnRead",
"bloom_filter_on_write",
"bloomFilterOnWrite",
"schema_force_string_view",
"schemaForceStringView",
"dictionary_page_size_limit",
"dictionaryPageSizeLimit",
"data_page_row_count_limit",
Expand Down Expand Up @@ -5021,6 +5029,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
MaximumBufferedRecordBatchesPerStream,
BloomFilterOnRead,
BloomFilterOnWrite,
SchemaForceStringView,
DictionaryPageSizeLimit,
DataPageRowCountLimit,
MaxRowGroupSize,
Expand Down Expand Up @@ -5068,6 +5077,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream),
"bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead),
"bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite),
"schemaForceStringView" | "schema_force_string_view" => Ok(GeneratedField::SchemaForceStringView),
"dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
"dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
"maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
Expand Down Expand Up @@ -5113,6 +5123,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut maximum_buffered_record_batches_per_stream__ = None;
let mut bloom_filter_on_read__ = None;
let mut bloom_filter_on_write__ = None;
let mut schema_force_string_view__ = None;
let mut dictionary_page_size_limit__ = None;
let mut data_page_row_count_limit__ = None;
let mut max_row_group_size__ = None;
Expand Down Expand Up @@ -5214,6 +5225,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
bloom_filter_on_write__ = Some(map_.next_value()?);
}
GeneratedField::SchemaForceStringView => {
if schema_force_string_view__.is_some() {
return Err(serde::de::Error::duplicate_field("schemaForceStringView"));
}
schema_force_string_view__ = Some(map_.next_value()?);
}
GeneratedField::DictionaryPageSizeLimit => {
if dictionary_page_size_limit__.is_some() {
return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
Expand Down Expand Up @@ -5314,6 +5331,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
maximum_buffered_record_batches_per_stream: maximum_buffered_record_batches_per_stream__.unwrap_or_default(),
bloom_filter_on_read: bloom_filter_on_read__.unwrap_or_default(),
bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(),
schema_force_string_view: schema_force_string_view__.unwrap_or_default(),
dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
max_row_group_size: max_row_group_size__.unwrap_or_default(),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "27")]
pub bloom_filter_on_write: bool,
/// default = false
#[prost(bool, tag = "28")]
pub schema_force_string_view: bool,
#[prost(uint64, tag = "12")]
pub dictionary_page_size_limit: u64,
#[prost(uint64, tag = "18")]
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
allow_single_file_parallelism: value.allow_single_file_parallelism,
maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
schema_force_string_view: value.schema_force_string_view,
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "27")]
pub bloom_filter_on_write: bool,
/// default = false
#[prost(bool, tag = "28")]
pub schema_force_string_view: bool,
#[prost(uint64, tag = "12")]
pub dictionary_page_size_limit: u64,
#[prost(uint64, tag = "18")]
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ datafusion.execution.parquet.metadata_size_hint NULL
datafusion.execution.parquet.pruning true
datafusion.execution.parquet.pushdown_filters false
datafusion.execution.parquet.reorder_filters false
datafusion.execution.parquet.schema_force_string_view false
datafusion.execution.parquet.skip_metadata true
datafusion.execution.parquet.statistics_enabled NULL
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
datafusion.execution.schema_force_string_view false
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
Expand Down Expand Up @@ -288,12 +288,12 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the
datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
datafusion.execution.parquet.statistics_enabled NULL (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes
datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
datafusion.execution.schema_force_string_view false If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
Expand Down
Loading

0 comments on commit 322c3d2

Please sign in to comment.