Skip to content

Commit

Permalink
introduce binary_as_string parquet option
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal authored and alamb committed Oct 25, 2024
1 parent 128dd14 commit 470b3a8
Show file tree
Hide file tree
Showing 16 changed files with 511 additions and 174 deletions.
15 changes: 9 additions & 6 deletions benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,15 @@ impl RunOpt {
None => queries.min_query_id()..=queries.max_query_id(),
};

// configure parquet options
let mut config = self.common.config();
config
.options_mut()
.execution
.parquet
.schema_force_view_types = self.common.force_view_types;
{
let parquet_options = &mut config.options_mut().execution.parquet;
parquet_options.schema_force_view_types = self.common.force_view_types;
// The hits_partitioned dataset specifies string columns
// as binary due to how it was written. Force it to strings
parquet_options.binary_as_string = true;
}

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
Expand Down Expand Up @@ -148,7 +151,7 @@ impl RunOpt {
Ok(())
}

/// Registrs the `hits.parquet` as a table named `hits`
/// Registers the `hits.parquet` as a table named `hits`
async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
let options = Default::default();
let path = self.path.as_os_str().to_str().unwrap();
Expand Down
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ config_namespace! {
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = false

/// (reading) If true, parquet reader will read columns of
/// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
///
/// Parquet files generated by some legacy writers do not correctly set
/// the UTF8 flag for strings, causing string columns to be loaded as
/// BLOB instead.
pub binary_as_string: bool, default = false

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl ParquetOptions {
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
binary_as_string: _, // not used for writer props
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -442,6 +443,7 @@ mod tests {
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
}
}

Expand Down Expand Up @@ -543,6 +545,7 @@ mod tests {
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
},
column_specific_options,
key_value_metadata,
Expand Down
112 changes: 95 additions & 17 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_schema::{DataType, Field, Schema};
use arrow_schema::{DataType, Field, FieldRef, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
Expand Down Expand Up @@ -235,20 +235,26 @@ pub fn file_type_to_format(
}
}

/// Create a new field with the specified data type, copying the other
/// properties from the input field
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
Arc::new(field.as_ref().clone().with_data_type(new_type))
}

/// Transform a schema to use view types for Utf8 and Binary
///
/// See [parquet::ParquetFormat::force_view_types] for details
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
Field::new(field.name(), DataType::Utf8View, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::Binary | DataType::LargeBinary => Arc::new(
Field::new(field.name(), DataType::BinaryView, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::Utf8 | DataType::LargeUtf8 => {
field_with_new_type(field, DataType::Utf8View)
}
DataType::Binary | DataType::LargeBinary => {
field_with_new_type(field, DataType::BinaryView)
}
_ => field.clone(),
})
.collect();
Expand All @@ -274,6 +280,7 @@ pub(crate) fn coerce_file_schema_to_view_type(
(f.name(), dt)
})
.collect();

if !transform {
return None;
}
Expand All @@ -283,14 +290,13 @@ pub(crate) fn coerce_file_schema_to_view_type(
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
(Some(DataType::Utf8View), DataType::Utf8)
| (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
),
(Some(DataType::BinaryView), DataType::Binary)
| (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
),
(Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
field_with_new_type(field, DataType::Utf8View)
}
(
Some(DataType::BinaryView),
DataType::Binary | DataType::LargeBinary,
) => field_with_new_type(field, DataType::BinaryView),
_ => field.clone(),
},
)
Expand All @@ -302,6 +308,78 @@ pub(crate) fn coerce_file_schema_to_view_type(
))
}

/// Transform a schema so that any binary types are strings
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Binary => field_with_new_type(field, DataType::Utf8),
DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
_ => field.clone(),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// If the table schema uses a string type, coerce the file schema to use a string type.
///
/// See [parquet::ParquetFormat::binary_as_string] for details
pub(crate) fn coerce_file_schema_to_string_type(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut transform = false;
let table_fields: HashMap<_, _> = table_schema
.fields
.iter()
.map(|f| (f.name(), f.data_type()))
.collect();
let transformed_fields: Vec<Arc<Field>> = file_schema
.fields
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
// table schema uses string type, coerce the file schema to use string type
(
Some(DataType::Utf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
field_with_new_type(field, DataType::Utf8)
}
// table schema uses large string type, coerce the file schema to use large string type
(
Some(DataType::LargeUtf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
field_with_new_type(field, DataType::LargeUtf8)
}
// table schema uses string view type, coerce the file schema to use view type
(
Some(DataType::Utf8View),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
field_with_new_type(field, DataType::Utf8View)
}
_ => field.clone(),
},
)
.collect();

if !transform {
None
} else {
Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
37 changes: 32 additions & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
FileFormatFactory, FilePushdownSupport, FileScanConfig,
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
FilePushdownSupport, FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
Expand Down Expand Up @@ -253,13 +254,29 @@ impl ParquetFormat {
self.options.global.schema_force_view_types
}

/// If true, will use view types (StringView and BinaryView).
///
/// Refer to [`Self::force_view_types`].
/// If true, will use view types. See [`Self::force_view_types`] for details
pub fn with_force_view_types(mut self, use_views: bool) -> Self {
self.options.global.schema_force_view_types = use_views;
self
}

/// Return `true` if binary types will be read as strings.
///
/// If this returns true, DataFusion will instruct the parquet reader
/// to read binary columns such as `Binary` or `BinaryView` as the
/// corresponding string type such as `Utf8` or `LargeUtf8`.
/// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
/// validation, and such queries are significantly faster than reading
/// binary columns and then casting to string columns.
pub fn binary_as_string(&self) -> bool {
self.options.global.binary_as_string
}

/// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
self.options.global.binary_as_string = binary_as_string;
self
}
}

/// Clears all metadata (Schema level and field level) on an iterator
Expand Down Expand Up @@ -350,6 +367,12 @@ impl FileFormat for ParquetFormat {
Schema::try_merge(schemas)
}?;

let schema = if self.binary_as_string() {
transform_binary_to_string(&schema)
} else {
schema
};

let schema = if self.force_view_types() {
transform_schema_to_view(&schema)
} else {
Expand Down Expand Up @@ -552,6 +575,10 @@ pub fn statistics_from_parquet_meta_calc(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
file_schema = merged;
}

if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
file_schema = merged;
}
Expand Down
23 changes: 16 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

//! [`ParquetOpener`] for opening Parquet files
use crate::datasource::file_format::coerce_file_schema_to_view_type;
use crate::datasource::file_format::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
};
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::{
Expand Down Expand Up @@ -80,7 +82,7 @@ pub(super) struct ParquetOpener {
}

impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
Expand Down Expand Up @@ -121,7 +123,14 @@ impl FileOpener for ParquetOpener {
let mut metadata_timer = file_metrics.metadata_load_time.timer();
let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let mut schema = metadata.schema().clone();
let mut schema = Arc::clone(metadata.schema());

if let Some(merged) =
coerce_file_schema_to_string_type(&table_schema, &schema)
{
schema = Arc::new(merged);
}

// read with view types
if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
{
Expand All @@ -130,16 +139,16 @@ impl FileOpener for ParquetOpener {

let options = ArrowReaderOptions::new()
.with_page_index(enable_page_index)
.with_schema(schema.clone());
.with_schema(Arc::clone(&schema));
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?;

metadata_timer.stop();

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

let file_schema = builder.schema().clone();
let file_schema = Arc::clone(builder.schema());

let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(&file_schema)?;
Expand Down Expand Up @@ -177,7 +186,7 @@ impl FileOpener for ParquetOpener {

// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let file_metadata = builder.metadata().clone();
let file_metadata = Arc::clone(builder.metadata());
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ message ParquetOptions {
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false
bool schema_force_view_types = 28; // default = false
bool binary_as_string = 29; // default = false

oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
Expand Down
Loading

0 comments on commit 470b3a8

Please sign in to comment.