diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 1a1f51f700651..9a1874be66cb7 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -31,8 +31,10 @@ use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
+use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::Result;
+use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
@@ -273,6 +275,17 @@ impl RunOpt {
                 (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
             }
+            "parquet7" => {
+                let path = format!("{path}/{table}");
+                let object_url = ObjectStoreUrl::parse(&path).unwrap();
+                let object_store =
+                    ctx.runtime_env().object_store(&object_url).unwrap();
+                let factory = Parquet7FileReaderFactory::new(object_store);
+                let format = ParquetFormat::default()
+                    .with_options(ctx.state().table_options().parquet.clone())
+                    .with_reader(Arc::new(factory));
+                (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
+            }
             other => {
                 unimplemented!("Invalid file format '{}'", other);
             }
         };
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 2a862dd6dcb37..12a232b592dd1 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -31,7 +31,9 @@ use super::{
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
+use crate::datasource::physical_plan::{
+    FileGroupDisplay, FileSinkConfig, ParquetFileReaderFactory,
+};
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
@@ -168,6 +170,7 @@ impl fmt::Debug for ParquetFormatFactory {
 #[derive(Debug, Default)]
 pub struct ParquetFormat {
     options: TableParquetOptions,
+    reader: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 impl ParquetFormat {
@@ -253,6 +256,12 @@ impl ParquetFormat {
         self.options.global.schema_force_view_types = use_views;
         self
     }
+
+    /// Set a custom file reader factory to be used by scans created from this format
+    pub fn with_reader(mut self, reader: Arc<dyn ParquetFileReaderFactory>) -> Self {
+        self.reader = Some(reader);
+        self
+    }
 }
 
 /// Clears all metadata (Schema level and field level) on an iterator
@@ -378,6 +387,10 @@ impl FileFormat for ParquetFormat {
         let mut builder =
             ParquetExecBuilder::new_with_options(conf, self.options.clone());
 
+        if let Some(reader) = &self.reader {
+            builder = builder.with_parquet_file_reader_factory(reader.clone());
+        }
+
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
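Note (not part of the patch): outside the benchmark, the new `with_reader` hook composes with the usual listing-table registration. A minimal sketch, assuming an object store has already been registered for the s3 URL below; the table name and paths are illustrative:

    let url = ObjectStoreUrl::parse("s3://bucket/")?;
    let store = ctx.runtime_env().object_store(&url)?;
    let format = ParquetFormat::default()
        .with_reader(Arc::new(Parquet7FileReaderFactory::new(store)));
    let options = ListingOptions::new(Arc::new(format));
    ctx.register_listing_table("lineitem", "s3://bucket/lineitem/", options, None, None)
        .await?;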
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 54d4d7262a8e6..52f3a6b97c38b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -50,6 +50,7 @@ mod metrics;
 mod opener;
 mod page_filter;
 mod reader;
+mod reader7;
 mod row_filter;
 mod row_group_filter;
 mod writer;
@@ -61,6 +62,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use reader7::{Parquet7FileReader, Parquet7FileReaderFactory};
 pub use writer::plan_to_parquet;
 
 /// Execution plan for reading one or more Parquet files.
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
new file mode 100644
index 0000000000000..0d756ddda661b
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
@@ -0,0 +1,135 @@
+use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
+use bytes::Bytes;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use hashbrown::HashMap;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::ParquetMetaData;
+use std::ops::Range;
+use std::sync::{Arc, LazyLock, RwLock};
+
+use super::ParquetFileReaderFactory;
+
+/// Process-wide cache of Parquet footer metadata, keyed by object store path
+static META_DATA_CACHE: LazyLock<RwLock<MetadataCache>> =
+    LazyLock::new(|| RwLock::new(MetadataCache::new()));
+
+pub struct MetadataCache {
+    map: HashMap<Path, Arc<ParquetMetaData>>,
+}
+
+impl MetadataCache {
+    fn new() -> Self {
+        Self {
+            map: HashMap::new(),
+        }
+    }
+
+    fn get() -> &'static RwLock<MetadataCache> {
+        &*META_DATA_CACHE
+    }
+}
+
+/// Factory for [`Parquet7FileReader`]s that share a process-wide cache of
+/// Parquet footer metadata
+#[derive(Debug)]
+pub struct Parquet7FileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+}
+
+impl Parquet7FileReaderFactory {
+    /// Create a factory that reads from the given object store
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self { store }
+    }
+}
+
+impl ParquetFileReaderFactory for Parquet7FileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        // Remember the location before `object_meta` is moved into the
+        // underlying reader; it is the key into the metadata cache.
+        let path = file_meta.object_meta.location.clone();
+        let store = Arc::clone(&self.store);
+        let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(Parquet7FileReader {
+            inner,
+            path,
+            file_metrics,
+        }))
+    }
+}
+
+/// Reader that records bytes scanned in the scan metrics and caches parsed
+/// footer metadata across all readers of the same file
+pub struct Parquet7FileReader {
+    /// Per-file scan metrics
+    pub file_metrics: ParquetFileMetrics,
+    /// Object store location of the file, used as the cache key
+    pub path: Path,
+    /// Underlying object store reader
+    pub inner: ParquetObjectReader,
+}
+
+impl AsyncFileReader for Parquet7FileReader {
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let total: usize = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.file_metrics.bytes_scanned.add(range.end - range.start);
+        self.inner.get_bytes(range)
+    }
+
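+    // Metadata requests are served from the process-wide cache when
+    // possible: the fast path takes a read lock, and on a miss the footer
+    // is fetched and stored under a write lock. `or_insert` keeps the
+    // first entry if two readers race to populate the same path.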
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let cache = MetadataCache::get().read().unwrap();
+        let path = &self.path;
+
+        if let Some(meta) = cache.map.get(path) {
+            let meta = meta.clone();
+            return async move { Ok(meta) }.boxed();
+        }
+
+        let path = self.path.clone();
+        let get_meta = self.inner.get_metadata();
+        async move {
+            let meta = get_meta.await?;
+            let mut meta_cache = MetadataCache::get().write().unwrap();
+            meta_cache.map.entry(path).or_insert(meta.clone());
+            Ok(meta)
+        }
+        .boxed()
+    }
+}
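Note (not part of the patch): the same factory can also be attached directly when building a ParquetExec, which is what ParquetFormat::create_physical_plan now does internally when a reader is set. A minimal sketch, assuming `conf: FileScanConfig` and `store: Arc<dyn ObjectStore>` are in scope:

    let factory = Parquet7FileReaderFactory::new(store);
    let exec = ParquetExecBuilder::new(conf)
        .with_parquet_file_reader_factory(Arc::new(factory))
        .build();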