cache for parquet reader factory
XiangpengHao committed Sep 12, 2024
1 parent 78c1ca1 · commit 6c0174f
Showing 2 changed files with 91 additions and 20 deletions.
9 changes: 3 additions & 6 deletions benchmarks/src/tpch/run.rs
@@ -277,11 +277,8 @@ impl RunOpt {
             }
             "parquet7" => {
                 let path = format!("{path}/{table}");
-
-                // let os = ctx.runtime_env().object_store(Url::new(path) ).unwrap();
-                let object_url = ObjectStoreUrl::parse(&path).unwrap();
-                let object_store =
-                    ctx.runtime_env().object_store(&object_url).unwrap();
+                let url = ObjectStoreUrl::local_filesystem();
+                let object_store = ctx.runtime_env().object_store(url).unwrap();
                 let factory = Parquet7FileReaderFactory::new(object_store);
                 let format = ParquetFormat::default()
                     .with_options(ctx.state().table_options().parquet.clone())
@@ -302,7 +299,7 @@
         let config = ListingTableConfig::new(table_path).with_listing_options(options);

         let config = match table_format {
-            "parquet" => config.infer_schema(&state).await?,
+            "parquet" | "parquet7" => config.infer_schema(&state).await?,
             "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
             "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
             _ => unreachable!(),
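Aside: the run.rs fix above replaces URL-parsing of the raw table path with the pre-registered local filesystem store. A minimal sketch of why, assuming DataFusion's public ObjectStoreUrl API (this snippet is illustrative, not part of the commit): a bare benchmark path has no URL scheme, so ObjectStoreUrl::parse rejects it, while ObjectStoreUrl::local_filesystem() always names the built-in file:// store that the runtime env can resolve.

use datafusion::datasource::object_store::ObjectStoreUrl;

fn main() {
    // A relative filesystem path has no scheme, so URL parsing fails.
    assert!(ObjectStoreUrl::parse("./data/lineitem").is_err());
    // The built-in local store is always addressable as file:///.
    let url = ObjectStoreUrl::local_filesystem();
    println!("{url:?}");
}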
102 changes: 88 additions & 14 deletions datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
@@ -13,22 +13,27 @@ use std::sync::{Arc, LazyLock, RwLock};

 use super::ParquetFileReaderFactory;

-static META_DATA_CACHE: LazyLock<RwLock<MetadataCache>> =
-    LazyLock::new(|| RwLock::new(MetadataCache::new()));
+static CACHE: LazyLock<Cache> = LazyLock::new(|| Cache::new());

-pub struct MetadataCache {
-    map: HashMap<Path, Arc<ParquetMetaData>>,
+pub struct Cache {
+    metadata_map: RwLock<HashMap<Path, Arc<ParquetMetaData>>>,
+    bytes_map: RwLock<HashMap<(Path, Range<usize>), Arc<Bytes>>>,
 }

-impl MetadataCache {
+impl Cache {
     fn new() -> Self {
         Self {
-            map: HashMap::new(),
+            metadata_map: RwLock::new(HashMap::new()),
+            bytes_map: RwLock::new(HashMap::new()),
         }
     }

-    fn get() -> &'static RwLock<MetadataCache> {
-        &*META_DATA_CACHE
+    pub fn meta_cache() -> &'static RwLock<HashMap<Path, Arc<ParquetMetaData>>> {
+        &CACHE.metadata_map
+    }
+
+    pub fn bytes_cache() -> &'static RwLock<HashMap<(Path, Range<usize>), Arc<Bytes>>> {
+        &CACHE.bytes_map
     }
 }

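The Cache introduced above generalizes the old MetadataCache: a single process-wide LazyLock static now holds two independently RwLock-protected maps, one for parquet footers and one for byte ranges. A standalone, runnable sketch of the locking pattern the reader methods below follow (names here are illustrative, not from the commit): take the read lock on the hot path, and only on a miss take the write lock and insert through entry().or_insert_with, so a value inserted by a racing thread between the two lock acquisitions is kept rather than overwritten.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, RwLock};

static EXAMPLE_CACHE: LazyLock<RwLock<HashMap<String, Arc<Vec<u8>>>>> =
    LazyLock::new(|| RwLock::new(HashMap::new()));

fn get_or_load(key: &str, load: impl FnOnce() -> Vec<u8>) -> Arc<Vec<u8>> {
    // Fast path: shared read lock; the guard drops when the if-let ends.
    if let Some(v) = EXAMPLE_CACHE.read().unwrap().get(key) {
        return v.clone();
    }
    // Slow path: exclusive write lock. or_insert_with keeps whichever value
    // landed first if another thread raced us between the two locks.
    EXAMPLE_CACHE
        .write()
        .unwrap()
        .entry(key.to_string())
        .or_insert_with(|| Arc::new(load()))
        .clone()
}

fn main() {
    let a = get_or_load("footer", || b"expensive fetch".to_vec()); // miss: computes
    let b = get_or_load("footer", || unreachable!()); // hit: no refetch
    assert!(Arc::ptr_eq(&a, &b));
}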
@@ -87,34 +92,103 @@ impl AsyncFileReader for Parquet7FileReader {
     ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
         let total = ranges.iter().map(|r| r.end - r.start).sum();
         self.file_metrics.bytes_scanned.add(total);
-        self.inner.get_byte_ranges(ranges)
+
+        let cache = Cache::bytes_cache().read().unwrap();
+        let path = self.inner.meta.location.clone();
+
+        let mut cached_bytes = Vec::new();
+        let mut missing_ranges = Vec::new();
+        let mut missing_indices = Vec::new();
+
+        for (i, range) in ranges.iter().enumerate() {
+            let key = (path.clone(), range.clone());
+            if let Some(bytes) = cache.get(&key) {
+                cached_bytes.push((i, bytes.clone()));
+            } else {
+                missing_ranges.push(range.clone());
+                missing_indices.push(i);
+            }
+        }
+
+        drop(cache); // Release the read lock
+
+        if missing_ranges.is_empty() {
+            cached_bytes.sort_by_key(|&(i, _)| i);
+            let result = cached_bytes.into_iter().map(|(_, bytes)| (*bytes).clone()).collect();
+            return async move { Ok(result) }.boxed();
+        }
+
+        let get_bytes = self.inner.get_byte_ranges(missing_ranges);
+        async move {
+            let bytes = get_bytes.await?;
+            let mut cache = Cache::bytes_cache().write().unwrap();
+
+            for (i, byte) in missing_indices.iter().zip(bytes.iter()) {
+                let key = (path.clone(), ranges[*i].clone());
+                cache.entry(key).or_insert_with(|| Arc::new(byte.clone()));
+            }
+
+            drop(cache); // Release the write lock
+
+            let mut result = vec![Bytes::new(); ranges.len()];
+            for (i, bytes) in cached_bytes {
+                result[i] = (*bytes).clone();
+            }
+            for (i, byte) in missing_indices.into_iter().zip(bytes) {
+                result[i] = byte;
+            }
+            Ok(result)
+        }
+        .boxed()
     }

     fn get_bytes(
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         self.file_metrics.bytes_scanned.add(range.end - range.start);
-        self.inner.get_bytes(range)
+
+        let cache = Cache::bytes_cache().read().unwrap();
+        let path = self.inner.meta.location.clone();
+        let key = (path.clone(), range.clone());
+
+        if let Some(bytes) = cache.get(&key) {
+            let bytes = bytes.clone();
+            return async move { Ok((*bytes).clone()) }.boxed();
+        }
+
+        drop(cache); // Release the read lock
+
+        let get_bytes = self.inner.get_bytes(range.clone());
+        async move {
+            let bytes = get_bytes.await?;
+            let bytes = Arc::new(bytes);
+            let mut cache = Cache::bytes_cache().write().unwrap();
+            cache.entry(key).or_insert(bytes.clone());
+            Ok((*bytes).clone())
+        }
+        .boxed()
     }

     fn get_metadata(
         &mut self,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        let cache = MetadataCache::get().read().unwrap();
+        let cache = Cache::meta_cache().read().unwrap();
         let path = &self.inner.meta.location;

-        if let Some(meta) = cache.map.get(path) {
+        if let Some(meta) = cache.get(path) {
             let meta = meta.clone();
             return async move { Ok(meta) }.boxed();
         }

         drop(cache);

         let path = self.inner.meta.location.clone();
         let get_meta = self.inner.get_metadata();
         async move {
             let meta = get_meta.await?;
-            let mut meta_cache = MetadataCache::get().write().unwrap();
-            meta_cache.map.entry(path).or_insert(meta.clone());
+            let mut cache = Cache::meta_cache().write().unwrap();
+            cache.entry(path).or_insert(meta.clone());
             Ok(meta)
         }
         .boxed()

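The partial-hit branch of get_byte_ranges above fetches only the missing ranges and then stitches cached and fetched chunks back together in request order. A stripped-down, runnable sketch of that index-preserving merge (a hypothetical helper using plain Vec<u8> in place of Bytes):

fn merge_by_index(
    len: usize,
    cached: Vec<(usize, Vec<u8>)>,
    missing_indices: Vec<usize>,
    fetched: Vec<Vec<u8>>,
) -> Vec<Vec<u8>> {
    // Start with placeholders, then write each chunk back to its original slot.
    let mut result = vec![Vec::new(); len];
    for (i, bytes) in cached {
        result[i] = bytes;
    }
    for (i, bytes) in missing_indices.into_iter().zip(fetched) {
        result[i] = bytes;
    }
    result
}

fn main() {
    // Ranges 0 and 2 were cache hits; range 1 had to be fetched.
    let out = merge_by_index(
        3,
        vec![(0, b"aa".to_vec()), (2, b"cc".to_vec())],
        vec![1],
        vec![b"bb".to_vec()],
    );
    assert_eq!(out, vec![b"aa".to_vec(), b"bb".to_vec(), b"cc".to_vec()]);
}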