
Commit

move config to parquet-only
XiangpengHao committed Jul 26, 2024
1 parent 8d960bf commit 4805e12
Showing 15 changed files with 156 additions and 94 deletions.
6 changes: 5 additions & 1 deletion benchmarks/src/clickbench.rs
@@ -117,7 +117,11 @@ impl RunOpt {
};

let mut config = self.common.config();
config.options_mut().execution.schema_force_string_view = self.common.string_view;
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
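
Both benchmark drivers now reach the flag through the `parquet` sub-namespace of the execution options. As a minimal sketch (standalone names, not the benchmark's `RunOpt`; `force_view` stands in for `self.common.string_view`), the relocated option is set like this on a plain `SessionConfig`:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Minimal sketch: the option now lives under `execution.parquet` rather than
// directly under `execution`.
fn context_with_string_view(force_view: bool) -> SessionContext {
    let mut config = SessionConfig::new();
    config
        .options_mut()
        .execution
        .parquet
        .schema_force_string_view = force_view;
    SessionContext::new_with_config(config)
}
```
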
6 changes: 5 additions & 1 deletion benchmarks/src/tpch/run.rs
@@ -120,7 +120,11 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().execution.schema_force_string_view = self.common.string_view;
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
let ctx = SessionContext::new_with_config(config);

// register tables
8 changes: 4 additions & 4 deletions datafusion/common/src/config.rs
@@ -324,10 +324,6 @@ config_namespace! {

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

@@ -473,6 +469,10 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

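
With the option declared inside the `parquet` config namespace, the runtime key it maps to changes accordingly (assuming the usual namespace-to-key derivation): `datafusion.execution.parquet.schema_force_string_view` rather than `datafusion.execution.schema_force_string_view`. A hedged sketch of toggling it on an existing session through SQL:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Sketch only: flips the relocated option via a SQL `SET` statement; the key
// path mirrors the config namespace shown above.
async fn enable_string_view(ctx: &SessionContext) -> Result<()> {
    ctx.sql("SET datafusion.execution.parquet.schema_force_string_view = true")
        .await?;
    Ok(())
}
```
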
23 changes: 23 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -42,6 +42,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -204,6 +205,28 @@ pub fn file_type_to_format(
}
}

/// Transform a schema to use view types for Utf8 and Binary
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
)),
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
field.name(),
DataType::BinaryView,
field.is_nullable(),
)),
_ => field.clone(),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
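
The new `transform_schema_to_view` helper centralizes the rewrite that previously lived inline in the listing table. A small usage sketch (field names are illustrative; the import path assumes the function stays public where it is declared above):

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::file_format::transform_schema_to_view;

// Illustrative schema: string and binary columns become view types,
// everything else is left untouched.
fn demo() {
    let schema = Schema::new(vec![
        Field::new("name", DataType::Utf8, true),
        Field::new("payload", DataType::LargeBinary, false),
        Field::new("count", DataType::Int64, false),
    ]);
    let viewed = transform_schema_to_view(&schema);
    assert_eq!(viewed.field(0).data_type(), &DataType::Utf8View);
    assert_eq!(viewed.field(1).data_type(), &DataType::BinaryView);
    assert_eq!(viewed.field(2).data_type(), &DataType::Int64);
}
```
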
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileFormatFactory, FileScanConfig};
use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -316,6 +316,17 @@ impl FileFormat for ParquetFormat {
Schema::try_merge(schemas)
}?;

let schema = if state
.config_options()
.execution
.parquet
.schema_force_string_view
{
transform_schema_to_view(&schema)
} else {
schema
};

Ok(Arc::new(schema))
}

27 changes: 2 additions & 25 deletions datafusion/core/src/datasource/listing/table.rs
@@ -410,31 +410,8 @@ impl ListingOptions {
.try_collect()
.await?;

let mut schema = self.format.infer_schema(state, &store, &files).await?;

if state.config_options().execution.schema_force_string_view {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
)),
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
field.name(),
DataType::BinaryView,
field.is_nullable(),
)),
_ => field.clone(),
})
.collect();
schema = Arc::new(Schema::new_with_metadata(
transformed_fields,
schema.metadata.clone(),
));
}
let schema = self.format.infer_schema(state, &store, &files).await?;

Ok(schema)
}

4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -711,6 +711,10 @@ impl ExecutionPlan for ParquetExec {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
schema_force_string_view: self
.table_parquet_options
.global
.schema_force_string_view,
};

let stream =
26 changes: 21 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -18,6 +18,8 @@
//! [`ParquetOpener`] for opening Parquet files
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::file_format::transform_schema_to_view;
use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
row_filter, should_enable_page_index, ParquetAccessPlan,
@@ -33,7 +35,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use std::sync::Arc;
@@ -56,6 +58,7 @@ pub(super) struct ParquetOpener {
pub enable_page_index: bool,
pub enable_bloom_filter: bool,
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
pub schema_force_string_view: bool,
}

impl FileOpener for ParquetOpener {
@@ -66,7 +69,7 @@ impl FileOpener for ParquetOpener {
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let reader: Box<dyn AsyncFileReader> =
let mut reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
@@ -90,14 +93,27 @@
);
let enable_bloom_filter = self.enable_bloom_filter;
let limit = self.limit;
let schema_force_string_view = self.schema_force_string_view;

Ok(Box::pin(async move {
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let mut schema = metadata.schema().clone();

if schema_force_string_view {
schema = Arc::new(transform_schema_to_view(&schema));
}

let options = ArrowReaderOptions::new()
.with_page_index(enable_page_index)
.with_schema(table_schema.clone());
.with_schema(schema.clone());
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

let file_schema = builder.schema().clone();

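
Putting the opener changes together, the read path becomes a two-phase open: load the Arrow reader metadata once, optionally rewrite the inferred file schema to view types, then rebuild the metadata around that schema before constructing the stream builder, so batches decode directly into `Utf8View`/`BinaryView` arrays. A condensed sketch of that flow (simplified names, no pruning, projection, or metrics; the `transform_schema_to_view` import path is assumed):

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::transform_schema_to_view;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;

// Condensed sketch of the opener's new flow: metadata is loaded once and, when
// `force_string_view` is set, re-wrapped around a view-typed schema.
async fn open_builder(
    mut reader: Box<dyn AsyncFileReader>,
    force_string_view: bool,
    enable_page_index: bool,
) -> Result<ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>> {
    let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
    let metadata =
        ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;

    let mut schema = metadata.schema().clone();
    if force_string_view {
        schema = Arc::new(transform_schema_to_view(&schema));
    }

    let options = options.with_schema(schema);
    let metadata = ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
    Ok(ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata))
}
```
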
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -486,6 +486,7 @@ message ParquetOptions {
uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false
bool schema_force_string_view = 28; // default = false

oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
2 changes: 1 addition & 1 deletion datafusion/proto-common/src/from_proto/mod.rs
@@ -955,7 +955,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
allow_single_file_parallelism: value.allow_single_file_parallelism,
maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,

schema_force_string_view: value.schema_force_string_view,
})
}
}
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
@@ -4780,6 +4780,9 @@ impl serde::Serialize for ParquetOptions {
if self.bloom_filter_on_write {
len += 1;
}
if self.schema_force_string_view {
len += 1;
}
if self.dictionary_page_size_limit != 0 {
len += 1;
}
@@ -4863,6 +4866,9 @@ impl serde::Serialize for ParquetOptions {
if self.bloom_filter_on_write {
struct_ser.serialize_field("bloomFilterOnWrite", &self.bloom_filter_on_write)?;
}
if self.schema_force_string_view {
struct_ser.serialize_field("schemaForceStringView", &self.schema_force_string_view)?;
}
if self.dictionary_page_size_limit != 0 {
#[allow(clippy::needless_borrow)]
struct_ser.serialize_field("dictionaryPageSizeLimit", ToString::to_string(&self.dictionary_page_size_limit).as_str())?;
@@ -4980,6 +4986,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"bloomFilterOnRead",
"bloom_filter_on_write",
"bloomFilterOnWrite",
"schema_force_string_view",
"schemaForceStringView",
"dictionary_page_size_limit",
"dictionaryPageSizeLimit",
"data_page_row_count_limit",
@@ -5021,6 +5029,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
MaximumBufferedRecordBatchesPerStream,
BloomFilterOnRead,
BloomFilterOnWrite,
SchemaForceStringView,
DictionaryPageSizeLimit,
DataPageRowCountLimit,
MaxRowGroupSize,
@@ -5068,6 +5077,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream),
"bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead),
"bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite),
"schemaForceStringView" | "schema_force_string_view" => Ok(GeneratedField::SchemaForceStringView),
"dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
"dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
"maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -5113,6 +5123,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut maximum_buffered_record_batches_per_stream__ = None;
let mut bloom_filter_on_read__ = None;
let mut bloom_filter_on_write__ = None;
let mut schema_force_string_view__ = None;
let mut dictionary_page_size_limit__ = None;
let mut data_page_row_count_limit__ = None;
let mut max_row_group_size__ = None;
@@ -5214,6 +5225,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
bloom_filter_on_write__ = Some(map_.next_value()?);
}
GeneratedField::SchemaForceStringView => {
if schema_force_string_view__.is_some() {
return Err(serde::de::Error::duplicate_field("schemaForceStringView"));
}
schema_force_string_view__ = Some(map_.next_value()?);
}
GeneratedField::DictionaryPageSizeLimit => {
if dictionary_page_size_limit__.is_some() {
return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
@@ -5314,6 +5331,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
maximum_buffered_record_batches_per_stream: maximum_buffered_record_batches_per_stream__.unwrap_or_default(),
bloom_filter_on_read: bloom_filter_on_read__.unwrap_or_default(),
bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(),
schema_force_string_view: schema_force_string_view__.unwrap_or_default(),
dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
max_row_group_size: max_row_group_size__.unwrap_or_default(),
3 changes: 3 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
@@ -799,6 +799,9 @@ pub struct ParquetOptions {
/// default = false
#[prost(bool, tag = "27")]
pub bloom_filter_on_write: bool,
/// default = false
#[prost(bool, tag = "28")]
pub schema_force_string_view: bool,
#[prost(uint64, tag = "12")]
pub dictionary_page_size_limit: u64,
#[prost(uint64, tag = "18")]
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
@@ -826,6 +826,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
allow_single_file_parallelism: value.allow_single_file_parallelism,
maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
schema_force_string_view: value.schema_force_string_view,
})
}
}