From 7a6a276a38b199aded03028054dafcab58eee30b Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 16:20:50 -0800 Subject: [PATCH] fix(11770): fix parallel ParquetSink to encode arrow schema into the file metadata, based on the ParquetOptions --- .../src/datasource/file_format/parquet.rs | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 43de91eeb2b08..7575e91b1c33f 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -45,7 +45,6 @@ use crate::physical_plan::{ use arrow::compute::sum; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; -use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -792,7 +791,16 @@ impl DataSink for ParquetSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?; + let parquet_props = if !self.parquet_options.global.skip_arrow_metadata { + let schema = self.config.output_schema(); + self.parquet_options + .into_writer_properties_builder_with_arrow_schema(Some(schema))? + .build() + } else { + self.parquet_options + .into_writer_properties_builder()? + .build() + }; let object_store = context .runtime_env() @@ -836,7 +844,7 @@ impl DataSink for ParquetSink { .create_async_arrow_writer( &path, Arc::clone(&object_store), - parquet_props.writer_options().clone(), + parquet_props.clone(), ) .await?; let mut reservation = @@ -871,7 +879,7 @@ impl DataSink for ParquetSink { writer, rx, schema, - props.writer_options(), + &props, parallel_options_clone, pool, ) @@ -2437,6 +2445,32 @@ mod tests { let (_, file_metadata) = get_written(parquet_sink)?; assert_file_metadata(file_metadata, expected_with.clone()); + // multithreaded write, skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + skip_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_without); + + // multithreaded write, do not skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + skip_arrow_metadata: false, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_with); + Ok(()) }