From 41029fdca77cbc54a6f6eabc8f2242631fc97069 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 6 Nov 2023 19:46:15 +0530
Subject: [PATCH] Pre resolve prefixes for remote query (#552)

---
 server/src/query.rs                  | 113 +++++++++++++++++++++++++--
 server/src/storage/localfs.rs        |  17 ++++-
 server/src/storage/object_storage.rs |   2 +
 server/src/storage/s3.rs             |   9 +++
 4 files changed, 130 insertions(+), 11 deletions(-)

diff --git a/server/src/query.rs b/server/src/query.rs
index 528f3d117..f7b2102e5 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -29,10 +29,15 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::prelude::*;
+use futures_util::stream::FuturesUnordered;
+use futures_util::{future, Future, TryStreamExt};
 use itertools::Itertools;
+use object_store::path::Path as StorePath;
+use object_store::{ObjectMeta, ObjectStore};
 use serde_json::Value;
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::Arc;
 use sysinfo::{System, SystemExt};
 
@@ -76,16 +81,14 @@ impl Query {
     }
 
     /// Return prefixes, each per day/hour/minutes as necessary
-    fn _get_prefixes(&self) -> Vec<String> {
+    fn generate_prefixes(&self) -> Vec<String> {
         TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes()
     }
 
-    pub fn get_prefixes(&self) -> Vec<String> {
-        self._get_prefixes()
+    fn get_prefixes(&self) -> Vec<String> {
+        self.generate_prefixes()
             .into_iter()
             .map(|key| format!("{}/{}", self.stream_name, key))
-            // latest first
-            .rev()
             .collect()
     }
 
@@ -129,7 +132,15 @@ impl Query {
         storage: Arc<dyn ObjectStorage + Send>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
         let ctx = self.create_session_context();
-        let remote_listing_table = self._remote_query(storage)?;
+        let unresolved_prefixes = self.get_prefixes();
+        let client = ctx
+            .runtime_env()
+            .object_store(Box::new(storage.store_url()))
+            .unwrap();
+        let prefixes =
+            resolve_paths(client, storage.normalize_prefixes(unresolved_prefixes)).await?;
+
+        let remote_listing_table = self.remote_query(prefixes, storage)?;
 
         let current_minute = Utc::now()
             .with_second(0)
@@ -164,11 +175,12 @@ impl Query {
         Ok((results, fields))
     }
 
-    fn _remote_query(
+    fn remote_query(
         &self,
+        prefixes: Vec<String>,
         storage: Arc<dyn ObjectStorage + Send>,
     ) -> Result<Option<Arc<ListingTable>>, ExecuteError> {
-        let prefixes = storage.query_prefixes(self.get_prefixes());
+        let prefixes = storage.query_prefixes(prefixes);
         if prefixes.is_empty() {
             return Ok(None);
         }
@@ -231,6 +243,91 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
         .unwrap()
 }
 
+/// Resolves listing prefixes into the concrete object paths stored under them.
+///
+/// Prefixes whose last component starts with `minute` are grouped by the part
+/// of the prefix preceding `minute` (the hour prefix) so each hour is listed
+/// once; that listing is then filtered down to objects matching one of the
+/// requested minute prefixes. Every other prefix is listed directly.
+///
+/// Returns the object locations sorted in descending order.
+async fn resolve_paths(
+    client: Arc<dyn ObjectStore>,
+    prefixes: Vec<String>,
+) -> Result<Vec<String>, ObjectStorageError> {
+    let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
+    let mut all_resolve = Vec::new();
+
+    for prefix in prefixes {
+        let last_component = prefix.split_terminator('/').next_back();
+        if last_component.is_some_and(|x| x.starts_with("minute")) {
+            let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
+            // `or_default().push(..)` also records the first prefix seen for an hour;
+            // `and_modify(..).or_default()` only inserted an empty list for it.
+            minute_resolve
+                .entry(hour_prefix.to_owned())
+                .or_default()
+                .push(prefix);
+        } else {
+            all_resolve.push(prefix)
+        }
+    }
+
+    type ResolveFuture = Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, ObjectStorageError>>>>;
+
+    let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
+
+    for (listing_prefix, prefix) in minute_resolve {
+        let client = Arc::clone(&client);
+        tasks.push(Box::pin(async move {
+            let mut list = client
+                .list(Some(&StorePath::from(listing_prefix)))
+                .await?
+                .try_collect::<Vec<_>>()
+                .await?;
+
+            list.retain(|object| {
+                prefix.iter().any(|prefix| {
+                    object
+                        .location
+                        .prefix_matches(&StorePath::from(prefix.as_ref()))
+                })
+            });
+
+            Ok(list)
+        }));
+    }
+
+    for prefix in all_resolve {
+        let client = Arc::clone(&client);
+        tasks.push(Box::pin(async move {
+            client
+                .list(Some(&StorePath::from(prefix)))
+                .await?
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(Into::into)
+        }));
+    }
+
+    let res: Vec<Vec<String>> = tasks
+        .and_then(|res| {
+            future::ok(
+                res.into_iter()
+                    .map(|res| res.location.to_string())
+                    .collect_vec(),
+            )
+        })
+        .try_collect()
+        .await?;
+
+    let mut res = res.into_iter().flatten().collect_vec();
+    res.sort();
+    res.reverse();
+
+    Ok(res)
+}
+
 pub mod error {
     use datafusion::error::DataFusionError;
 
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index 7d6267e36..8f0572430 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -194,15 +194,26 @@ impl ObjectStorage for LocalFS {
         Ok(())
     }
 
-    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
+    fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String> {
         prefixes
             .into_iter()
-            .filter_map(|prefix| {
+            .map(|prefix| {
                 let path = self.root.join(prefix);
-                ListingTableUrl::parse(path.to_str().unwrap()).ok()
+                format!("{}", path.display())
             })
             .collect()
     }
+
+    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
+        prefixes
+            .into_iter()
+            .filter_map(|prefix| ListingTableUrl::parse(format!("/{}", prefix)).ok())
+            .collect()
+    }
+
+    fn store_url(&self) -> url::Url {
+        url::Url::parse("file:///").unwrap()
+    }
 }
 
 async fn dir_with_stream(
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index bede20745..d1ce0bd67 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -67,7 +67,9 @@ pub trait ObjectStorage: Sync + 'static {
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
+    fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String>;
     fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl>;
+    fn store_url(&self) -> url::Url;
 
     async fn put_schema(
         &self,
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 68379c55e..1c0ed53a5 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -444,6 +444,11 @@ impl ObjectStorage for S3 {
         Ok(())
     }
 
+    // no op on s3
+    fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String> {
+        prefixes
+    }
+
     fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
         prefixes
             .into_iter()
@@ -453,6 +458,10 @@ impl ObjectStorage for S3 {
             })
             .collect()
     }
+
+    fn store_url(&self) -> url::Url {
+        url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
+    }
 }
 
 impl From<object_store::Error> for ObjectStorageError {