feat: alternate backing stores for parquet cache #25959

Draft · wants to merge 2 commits into base: main
53 changes: 50 additions & 3 deletions Cargo.lock


3 changes: 3 additions & 0 deletions Cargo.toml
@@ -70,18 +70,21 @@ csv = "1.3.0"
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ae0a57b05895ccf4d2febb9c91bbb0956cf7e863" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ae0a57b05895ccf4d2febb9c91bbb0956cf7e863" }
dashmap = "6.1.0"
dhat = "0.3.3"
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.31"
futures-util = "0.3.31"
hashbrown = { version = "0.15.1", features = ["serde"] }
hashlink = "0.10.0"
hex = "0.4.3"
http = "0.2.9"
humantime = "2.1.0"
hyper = "0.14"
insta = { version = "1.39", features = ["json", "redactions", "yaml"] }
indexmap = { version = "2.2.6" }
itertools = "0.13.0"
jemalloc_pprof = "0.6.0"
libc = { version = "0.2" }
mime = "0.3.17"
mockito = { version = "1.4.0", default-features = false }
4 changes: 4 additions & 0 deletions influxdb3_cache/Cargo.toml
@@ -27,6 +27,8 @@ chrono.workspace = true
dashmap.workspace = true
datafusion.workspace = true
futures.workspace = true
hashbrown.workspace = true
hashlink.workspace = true
indexmap.workspace = true
parking_lot.workspace = true
object_store.workspace = true
@@ -45,6 +47,8 @@ influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }

# crates.io deps
bimap.workspace = true
criterion.workspace = true
dhat.workspace = true
insta.workspace = true
object_store.workspace = true
pretty_assertions.workspace = true
25 changes: 25 additions & 0 deletions influxdb3_cache/benches/parquet_cache_benchmark.rs
@@ -0,0 +1,25 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

pub fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("parquet_cache_t5");
    group.bench_function(BenchmarkId::new("dashmap", 0), |b| {
        b.iter(|| {
            dashmap_based_cache_read_write();
        })
    });

    // NOTE: all three benchmarks currently call the same empty placeholder;
    // the linked-hashmap variants are not wired up yet in this draft.
    group.bench_function(BenchmarkId::new("linked_hashmap_safe", 0), |b| {
        b.iter(|| {
            dashmap_based_cache_read_write();
        })
    });

    group.bench_function(BenchmarkId::new("linked_hashmap_unsafe", 0), |b| {
        b.iter(|| {
            dashmap_based_cache_read_write();
        })
    });
    group.finish();
}

fn dashmap_based_cache_read_write() {}

// Required for criterion to generate a `main` for the bench target
// (the bench target must also set `harness = false` in Cargo.toml).
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
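The benchmark body above is an empty stub in this draft. A workload of the shape the benchmark name suggests — concurrent reads and writes against a shared map keyed by object-store paths — could look like the following std-only sketch. It uses `Mutex<HashMap>` in place of `dashmap` so the example stands alone; the function name and parameters are illustrative, not from the PR (`DashMap` would shard the lock instead of serializing every operation behind one mutex):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Illustrative read/write workload of the kind a criterion benchmark body
/// might time. Each thread inserts entries under unique keys and immediately
/// reads them back; returns the final entry count.
fn shared_map_read_write(threads: usize, ops_per_thread: usize) -> usize {
    let map = Arc::new(Mutex::new(HashMap::<String, Vec<u8>>::new()));
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let map = Arc::clone(&map);
            thread::spawn(move || {
                for i in 0..ops_per_thread {
                    let key = format!("file/{t}/{i}.parquet");
                    // Write, then immediately read back a clone so no
                    // reference into the map is held across iterations.
                    map.lock().unwrap().insert(key.clone(), vec![0u8; 64]);
                    let _ = map.lock().unwrap().get(&key).cloned();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let len = map.lock().unwrap().len();
    len
}
```

Swapping the `Mutex<HashMap>` for `dashmap::DashMap` removes the global lock, which is exactly the contention difference a benchmark like this would measure.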
50 changes: 50 additions & 0 deletions influxdb3_cache/src/parquet_cache/data_store/mod.rs
@@ -0,0 +1,50 @@
use std::{fmt::Debug, sync::Arc, time::Duration};

use iox_time::TimeProvider;
use object_store::path::Path;

use crate::parquet_cache::{CacheEntryState, CacheValue, SharedCacheValueFuture};

pub(crate) mod safe_linked_map_cache;
pub(crate) mod sharded_map_cache;

pub(crate) trait CacheProvider: Debug + Send + Sync + 'static {
/// Get an entry from the cache, or `None` if there is no entry for the path
///
/// This updates the hit time of the entry and returns a cloned copy of the entry state so that
/// the reference into the map is dropped
fn get(&self, path: &Path) -> Option<CacheEntryState>;

fn get_used(&self) -> usize;

fn get_capacity(&self) -> usize;

fn get_query_cache_duration(&self) -> Duration;

fn get_time_provider(&self) -> Arc<dyn TimeProvider>;

/// Check if an entry in the cache is in process of being fetched or if it was already fetched
/// successfully
///
/// This does not update the hit time of the entry
fn path_already_fetched(&self, path: &Path) -> bool;

/// Insert a `Fetching` entry to the cache along with the shared future for polling the value
/// being fetched
fn set_fetching(&self, path: &Path, fut: SharedCacheValueFuture);

/// When the parquet bytes are already in hand, this method can be used to set the cache value
/// directly without going through the `Fetching` -> `Success` lifecycle
fn set_cache_value_directly(&self, path: &Path, cache_value: Arc<CacheValue>);

/// Update a `Fetching` entry to a `Success` entry in the cache
fn set_success(&self, path: &Path, value: Arc<CacheValue>) -> Result<(), anyhow::Error>;

/// Remove an entry from the cache, as well as its associated size from the used capacity
fn remove(&self, path: &Path);

/// Prune least recently hit entries from the cache
///
/// This is a no-op unless the `used` amount on the cache is at least its `capacity`
fn prune(&self) -> Option<usize>;
}
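The trait above is what the two backing stores in this PR (`sharded_map_cache` and `safe_linked_map_cache`) implement. As a rough illustration of the LRU idea behind a linked-map store, here is a minimal std-only sketch; the type `MiniLruCache` and its methods are invented for this example and collapse several `CacheProvider` methods (the real code also tracks entry state, shared fetch futures, and byte sizes rather than entry counts):

```rust
use std::collections::HashMap;

/// Minimal LRU cache sketch: a HashMap for lookup plus a recency list.
/// (hashlink's LinkedHashMap fuses these two structures into one; this
/// sketch keeps them separate to stay dependency-free.)
struct MiniLruCache {
    map: HashMap<String, Vec<u8>>,
    order: Vec<String>, // least recently used at the front
    capacity: usize,    // max number of entries
}

impl MiniLruCache {
    fn new(capacity: usize) -> Self {
        Self { map: HashMap::new(), order: Vec::new(), capacity }
    }

    /// Like `CacheProvider::get`: returns a clone (so no reference into the
    /// map is held) and refreshes the entry's recency.
    fn get(&mut self, key: &str) -> Option<Vec<u8>> {
        let value = self.map.get(key).cloned()?;
        self.order.retain(|k| k != key);
        self.order.push(key.to_string());
        Some(value)
    }

    /// Like `set_cache_value_directly` followed by `prune`.
    fn put(&mut self, key: &str, value: Vec<u8>) {
        self.map.insert(key.to_string(), value);
        self.order.retain(|k| k != key);
        self.order.push(key.to_string());
        // Prune least recently used entries once over capacity.
        while self.map.len() > self.capacity {
            let lru = self.order.remove(0);
            self.map.remove(&lru);
        }
    }
}
```

With `capacity = 2`, inserting `a`, `b`, reading `a`, then inserting `c` evicts `b`: the read of `a` moved it to the back of the recency list, leaving `b` as the least recently used entry.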