Skip to content

Commit

Permalink
Return BoxStream with 'static lifetime from ObjectStore::list (#…
Browse files Browse the repository at this point in the history
…6619)

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
kylebarron and alamb authored Jan 8, 2025
1 parent f18dadd commit 74499c0
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 72 deletions.
2 changes: 1 addition & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ impl GetClient for S3Client {
}

#[async_trait]
impl ListClient for S3Client {
impl ListClient for Arc<S3Client> {
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
async fn list_request(
&self,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,15 @@ impl ObjectStore for AmazonS3 {
.boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
if self.client.config.is_s3_express() {
let offset = offset.clone();
// S3 Express does not support start-after
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ impl GetClient for AzureClient {
}

#[async_trait]
impl ListClient for AzureClient {
impl ListClient for Arc<AzureClient> {
/// Make an Azure List request <https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs>
async fn list_request(
&self,
Expand Down
7 changes: 3 additions & 4 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ impl ObjectStore for MicrosoftAzure {
self.client.delete_request(location, &()).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
Expand All @@ -139,10 +142,6 @@ impl ObjectStore for MicrosoftAzure {
.boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list(prefix)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.client.list_with_delimiter(prefix).await
}
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ impl ObjectStore for ChunkedStore {
self.inner.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}

Expand Down
19 changes: 10 additions & 9 deletions object_store/src/client/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,38 @@ pub(crate) trait ListClientExt {
prefix: Option<&Path>,
delimiter: bool,
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>>;
) -> BoxStream<'static, Result<ListResult>>;

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;

#[allow(unused)]
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>>;
) -> BoxStream<'static, Result<ObjectMeta>>;

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
}

#[async_trait]
impl<T: ListClient> ListClientExt for T {
impl<T: ListClient + Clone> ListClientExt for T {
fn list_paginated(
&self,
prefix: Option<&Path>,
delimiter: bool,
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>> {
) -> BoxStream<'static, Result<ListResult>> {
let offset = offset.map(|x| x.to_string());
let prefix = prefix
.filter(|x| !x.as_ref().is_empty())
.map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER));

stream_paginated(
self.clone(),
(prefix, offset),
move |(prefix, offset), token| async move {
let (r, next_token) = self
move |client, (prefix, offset), token| async move {
let (r, next_token) = client
.list_request(
prefix.as_deref(),
delimiter,
Expand All @@ -88,7 +89,7 @@ impl<T: ListClient> ListClientExt for T {
.boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.list_paginated(prefix, false, None)
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
Expand All @@ -99,7 +100,7 @@ impl<T: ListClient> ListClientExt for T {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
Expand Down
50 changes: 29 additions & 21 deletions object_store/src/client/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ use std::future::Future;
/// finish, otherwise it will continue to call `op(state, token)` with the values returned by the
/// previous call to `op`, until a continuation token of `None` is returned
///
pub(crate) fn stream_paginated<F, Fut, S, T>(state: S, op: F) -> impl Stream<Item = Result<T>>
pub(crate) fn stream_paginated<F, Fut, S, T, C>(
client: C,
state: S,
op: F,
) -> impl Stream<Item = Result<T>>
where
F: Fn(S, Option<String>) -> Fut + Copy,
C: Clone,
F: Fn(C, S, Option<String>) -> Fut + Copy,
Fut: Future<Output = Result<(T, S, Option<String>)>>,
{
enum PaginationState<T> {
Expand All @@ -46,27 +51,30 @@ where
Done,
}

futures::stream::unfold(PaginationState::Start(state), move |state| async move {
let (s, page_token) = match state {
PaginationState::Start(s) => (s, None),
PaginationState::HasMore(s, page_token) if !page_token.is_empty() => {
(s, Some(page_token))
}
_ => {
return None;
}
};
futures::stream::unfold(PaginationState::Start(state), move |state| {
let client = client.clone();
async move {
let (s, page_token) = match state {
PaginationState::Start(s) => (s, None),
PaginationState::HasMore(s, page_token) if !page_token.is_empty() => {
(s, Some(page_token))
}
_ => {
return None;
}
};

let (resp, s, continuation) = match op(s, page_token).await {
Ok(resp) => resp,
Err(e) => return Some((Err(e), PaginationState::Done)),
};
let (resp, s, continuation) = match op(client, s, page_token).await {
Ok(resp) => resp,
Err(e) => return Some((Err(e), PaginationState::Done)),
};

let next_state = match continuation {
Some(token) => PaginationState::HasMore(s, token),
None => PaginationState::Done,
};
let next_state = match continuation {
Some(token) => PaginationState::HasMore(s, token),
None => PaginationState::Done,
};

Some((Ok(resp), next_state))
Some((Ok(resp), next_state))
}
})
}
2 changes: 1 addition & 1 deletion object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ impl GetClient for GoogleCloudStorageClient {
}

#[async_trait]
impl ListClient for GoogleCloudStorageClient {
impl ListClient for Arc<GoogleCloudStorageClient> {
/// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
async fn list_request(
&self,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ impl ObjectStore for GoogleCloudStorage {
self.client.delete_request(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list_with_offset(prefix, offset)
}

Expand Down
15 changes: 9 additions & 6 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -79,7 +81,7 @@ impl From<Error> for crate::Error {
/// See [`crate::http`] for more information
#[derive(Debug)]
pub struct HttpStore {
client: Client,
client: Arc<Client>,
}

impl std::fmt::Display for HttpStore {
Expand Down Expand Up @@ -130,19 +132,20 @@ impl ObjectStore for HttpStore {
self.client.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let prefix = prefix.cloned();
let client = Arc::clone(&self.client);
futures::stream::once(async move {
let status = self.client.list(prefix.as_ref(), "infinity").await?;
let status = client.list(prefix.as_ref(), "infinity").await?;

let iter = status
.response
.into_iter()
.filter(|r| !r.is_dir())
.map(|response| {
.map(move |response| {
response.check_ok()?;
response.object_meta(self.client.base_url())
response.object_meta(client.base_url())
})
// Filter out exact prefix matches
.filter_ok(move |r| r.location.as_ref().len() > prefix_len);
Expand Down Expand Up @@ -238,7 +241,7 @@ impl HttpBuilder {
let parsed = Url::parse(&url).map_err(|source| Error::UnableToParseUrl { url, source })?;

Ok(HttpStore {
client: Client::new(parsed, self.client_options, self.retry_config)?,
client: Arc::new(Client::new(parsed, self.client_options, self.retry_config)?),
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;

/// List all the objects with the given prefix and a location greater than `offset`
///
Expand All @@ -734,7 +734,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
let offset = offset.clone();
self.list(prefix)
.try_filter(move |f| futures::future::ready(f.location > offset))
Expand Down Expand Up @@ -847,15 +847,15 @@ macro_rules! as_ref_impl {
self.as_ref().delete_stream(locations)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.as_ref().list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.as_ref().list_with_offset(prefix, offset)
}

Expand Down
14 changes: 8 additions & 6 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
///
#[derive(Debug)]
pub struct LimitStore<T: ObjectStore> {
inner: T,
inner: Arc<T>,
max_requests: usize,
semaphore: Arc<Semaphore>,
}
Expand All @@ -56,7 +56,7 @@ impl<T: ObjectStore> LimitStore<T> {
/// `max_requests`
pub fn new(inner: T, max_requests: usize) -> Self {
Self {
inner,
inner: Arc::new(inner),
max_requests,
semaphore: Arc::new(Semaphore::new(max_requests)),
}
Expand Down Expand Up @@ -144,12 +144,13 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.delete_stream(locations)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix = prefix.cloned();
let inner = Arc::clone(&self.inner);
let fut = Arc::clone(&self.semaphore)
.acquire_owned()
.map(move |permit| {
let s = self.inner.list(prefix.as_ref());
let s = inner.list(prefix.as_ref());
PermitWrapper::new(s, permit.unwrap())
});
fut.into_stream().flatten().boxed()
Expand All @@ -159,13 +160,14 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix = prefix.cloned();
let offset = offset.clone();
let inner = Arc::clone(&self.inner);
let fut = Arc::clone(&self.semaphore)
.acquire_owned()
.map(move |permit| {
let s = self.inner.list_with_offset(prefix.as_ref(), &offset);
let s = inner.list_with_offset(prefix.as_ref(), &offset);
PermitWrapper::new(s, permit.unwrap())
});
fut.into_stream().flatten().boxed()
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ impl ObjectStore for LocalFileSystem {
.await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let config = Arc::clone(&self.config);

let root_path = match prefix {
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl ObjectStore for InMemory {
Ok(())
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let root = Path::default();
let prefix = prefix.unwrap_or(&root);

Expand Down
Loading

0 comments on commit 74499c0

Please sign in to comment.