Skip to content

Commit

Permalink
reader
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Sep 12, 2024
1 parent db548ee commit 78c1ca1
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 1 deletion.
15 changes: 15 additions & 0 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
Expand Down Expand Up @@ -273,6 +275,19 @@ impl RunOpt {

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
"parquet7" => {
let path = format!("{path}/{table}");

// let os = ctx.runtime_env().object_store(Url::new(path) ).unwrap();
let object_url = ObjectStoreUrl::parse(&path).unwrap();
let object_store =
ctx.runtime_env().object_store(&object_url).unwrap();
let factory = Parquet7FileReaderFactory::new(object_store);
let format = ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone())
.with_reader(Arc::new(factory));
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
}
Expand Down
15 changes: 14 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use super::{
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::physical_plan::{
FileGroupDisplay, FileSinkConfig, ParquetFileReaderFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
Expand Down Expand Up @@ -168,6 +170,7 @@ impl fmt::Debug for ParquetFormatFactory {
#[derive(Debug, Default)]
pub struct ParquetFormat {
    /// Parquet-specific table options (compression, pruning, etc.).
    options: TableParquetOptions,
    /// Optional custom factory for creating Parquet file readers.
    /// When `None`, the `ParquetExecBuilder` default reader factory is used
    /// (see `create_physical_plan`, which only overrides the factory when
    /// this is `Some`).
    reader: Option<Arc<dyn ParquetFileReaderFactory>>,
}

impl ParquetFormat {
Expand Down Expand Up @@ -253,6 +256,12 @@ impl ParquetFormat {
self.options.global.schema_force_view_types = use_views;
self
}

    /// Set a custom [`ParquetFileReaderFactory`] to be used when reading
    /// Parquet files, returning `self` in builder style.
    ///
    /// When set, the physical plan construction passes this factory to the
    /// `ParquetExecBuilder` instead of using the default reader.
    pub fn with_reader(mut self, reader: Arc<dyn ParquetFileReaderFactory>) -> Self {
        self.reader = Some(reader);
        self
    }
}

/// Clears all metadata (Schema level and field level) on an iterator
Expand Down Expand Up @@ -378,6 +387,10 @@ impl FileFormat for ParquetFormat {
let mut builder =
ParquetExecBuilder::new_with_options(conf, self.options.clone());

if let Some(reader) = &self.reader {
builder = builder.with_parquet_file_reader_factory(reader.clone());
}

// If enable pruning then combine the filters to build the predicate.
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod metrics;
mod opener;
mod page_filter;
mod reader;
mod reader7;
mod row_filter;
mod row_group_filter;
mod writer;
Expand All @@ -61,6 +62,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use reader7::{Parquet7FileReader, Parquet7FileReaderFactory};
pub use writer::plan_to_parquet;

/// Execution plan for reading one or more Parquet files.
Expand Down
122 changes: 122 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
use bytes::Bytes;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use hashbrown::HashMap;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::ops::Range;
use std::sync::{Arc, LazyLock, RwLock};

use super::ParquetFileReaderFactory;

/// Process-wide, lazily-initialized cache of parsed Parquet footers,
/// guarded by an `RwLock` so concurrent readers do not block each other.
static META_DATA_CACHE: LazyLock<RwLock<MetadataCache>> =
    LazyLock::new(|| RwLock::new(MetadataCache::new()));

/// In-memory map from an object-store [`Path`] to the parsed
/// [`ParquetMetaData`] of that file.
pub struct MetadataCache {
    map: HashMap<Path, Arc<ParquetMetaData>>,
}

impl MetadataCache {
    /// Build an empty cache.
    fn new() -> Self {
        MetadataCache { map: HashMap::new() }
    }

    /// Hand out a reference to the shared, process-wide cache.
    fn get() -> &'static RwLock<MetadataCache> {
        // `&LazyLock<T>` deref-coerces to `&T`, initializing on first use.
        &META_DATA_CACHE
    }
}

/// A [`ParquetFileReaderFactory`] that creates [`Parquet7FileReader`]s,
/// which serve Parquet footer metadata from a shared process-wide cache.
#[derive(Debug)]
pub struct Parquet7FileReaderFactory {
    /// Object store from which the Parquet files are read.
    store: Arc<dyn ObjectStore>,
}

impl Parquet7FileReaderFactory {
    /// Create a factory that reads Parquet files from `store`.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }
}

impl ParquetFileReaderFactory for Parquet7FileReaderFactory {
    /// Build a [`Parquet7FileReader`] for the given file, wiring up per-file
    /// metrics and an optional footer-size hint for metadata prefetching.
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        // Metrics are registered per partition and per file location.
        let file_metrics =
            ParquetFileMetrics::new(partition_index, file_meta.location().as_ref(), metrics);

        // Underlying object-store reader; apply the footer size hint when the
        // caller provided one.
        let mut reader =
            ParquetObjectReader::new(Arc::clone(&self.store), file_meta.object_meta);
        if let Some(hint) = metadata_size_hint {
            reader = reader.with_footer_size_hint(hint);
        }

        Ok(Box::new(Parquet7FileReader {
            inner: reader,
            file_metrics,
        }))
    }
}

/// An [`AsyncFileReader`] wrapper that records bytes-scanned metrics on every
/// read and serves footer metadata from the shared [`MetadataCache`].
pub struct Parquet7FileReader {
    /// Per-file metrics; `bytes_scanned` is incremented on each byte-range read.
    pub file_metrics: ParquetFileMetrics,
    /// Underlying object-store-backed reader that performs the actual I/O.
    pub inner: ParquetObjectReader,
}

impl AsyncFileReader for Parquet7FileReader {
    /// Fetch several byte ranges, first adding their combined length to the
    /// `bytes_scanned` metric, then delegating to the inner reader.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<usize>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        // Count the requested sizes up front; the metric is recorded even if
        // the read itself later fails.
        let total = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total);
        self.inner.get_byte_ranges(ranges)
    }

    /// Fetch a single byte range, recording its length in `bytes_scanned`
    /// before delegating to the inner reader.
    fn get_bytes(
        &mut self,
        range: Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.file_metrics.bytes_scanned.add(range.end - range.start);
        self.inner.get_bytes(range)
    }

    /// Return the file's Parquet footer metadata, consulting the shared
    /// process-wide cache before falling back to reading it from storage.
    fn get_metadata(
        &mut self,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        // Fast path: look up the footer under a read lock. `unwrap` panics
        // only if the lock was poisoned by a panicking writer.
        let cache = MetadataCache::get().read().unwrap();
        let path = &self.inner.meta.location;

        if let Some(meta) = cache.map.get(path) {
            // Cache hit: return a ready future with a cheap Arc clone.
            let meta = meta.clone();
            return async move { Ok(meta) }.boxed();
        }

        // Slow path: fetch the footer via the inner reader, then populate the
        // cache. The future only starts the fetch when polled; the read guard
        // above is dropped when this function returns, BEFORE the future can
        // take the write lock, so there is no lock-order deadlock.
        let path = self.inner.meta.location.clone();
        let get_meta = self.inner.get_metadata();
        async move {
            let meta = get_meta.await?;
            let mut meta_cache = MetadataCache::get().write().unwrap();
            // If another task fetched the same footer concurrently, keep the
            // first inserted value; the Arc clone here is refcount-only.
            meta_cache.map.entry(path).or_insert(meta.clone());
            Ok(meta)
        }
        .boxed()
    }
}

0 comments on commit 78c1ca1

Please sign in to comment.