Commit 704dfac: Address review comments
rdettai committed Sep 29, 2023
1 parent ae417ff commit 704dfac
Showing 2 changed files with 36 additions and 24 deletions.
quickwit/quickwit-storage/src/cache/storage_with_cache.rs (12 changes: 5 additions & 7 deletions)
@@ -18,7 +18,6 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::fmt;
-use std::io::Cursor;
 use std::ops::Range;
 use std::path::Path;
 use std::sync::Arc;
@@ -76,13 +75,12 @@ impl Storage for StorageWithCache {
     async fn get_slice_stream(
         &self,
         path: &Path,
-        range: Range<usize>,
+        _range: Range<usize>,
     ) -> StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
-        // TODO: using `get_slice` implies that we need to download the whole
-        // slice before being able to stream it. We might look for a mechanism
-        // to start the streaming right away and tee its output into the cache.
-        let bytes = self.get_slice(path, range).await?;
-        Ok(Box::new(Cursor::new(bytes)))
+        unimplemented!(
+            "StorageWithCache does not support streamed read yet. Failed to get {:?}",
+            path
+        )
     }
 
     async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> {
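For context, the implementation removed above turned the fully buffered result of `get_slice` into an `AsyncRead` through `std::io::Cursor`, for which tokio provides an `AsyncRead` implementation. A minimal standalone sketch of that buffered pattern, assuming the `tokio` crate (`buffered_reader` is an illustrative name, not Quickwit's API):

```rust
use std::io::Cursor;

use tokio::io::{AsyncRead, AsyncReadExt};

// Wrap a fully downloaded payload in a Cursor so it can be consumed through
// the AsyncRead interface. The whole slice must be in memory before the first
// byte can be "streamed", which is the drawback the removed TODO pointed out.
// Illustrative helper, not Quickwit's API.
fn buffered_reader(bytes: Vec<u8>) -> Box<dyn AsyncRead + Send + Unpin> {
    // tokio implements AsyncRead for std::io::Cursor<T> where T: AsRef<[u8]>.
    Box::new(Cursor::new(bytes))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut reader = buffered_reader(b"cached slice".to_vec());
    let mut out = Vec::new();
    reader.read_to_end(&mut out).await?;
    assert_eq!(out, b"cached slice".to_vec());
    Ok(())
}
```

Replacing this with `unimplemented!` is arguably more honest than keeping an API that advertises streaming but silently downloads the entire slice first.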
quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs (48 changes: 31 additions & 17 deletions)
@@ -433,23 +433,37 @@ impl Storage for AzureBlobStorage {
         path: &Path,
         range: Range<usize>,
     ) -> StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
-        let name = self.blob_name(path);
-        let page_stream = self
-            .container_client
-            .blob_client(name)
-            .get()
-            .range(range)
-            .into_stream();
-        let bytes_stream = page_stream
-            .map(|page_res| {
-                page_res
-                    .map(|page| page.data)
-                    .map_err(|err| FutureError::new(FutureErrorKind::Other, err))
-            })
-            .try_flatten()
-            .map(|e| e.map_err(|err| FutureError::new(FutureErrorKind::Other, err)));
-        let stream_reader = StreamReader::new(bytes_stream);
-        Ok(Box::new(stream_reader))
+        retry(&self.retry_params, || async {
+            let range = range.clone();
+            let name = self.blob_name(path);
+            let page_stream = self
+                .container_client
+                .blob_client(name)
+                .get()
+                .range(range)
+                .into_stream();
+            let mut bytes_stream = page_stream
+                .map(|page_res| {
+                    page_res
+                        .map(|page| page.data)
+                        .map_err(|err| FutureError::new(FutureErrorKind::Other, err))
+                })
+                .try_flatten()
+                .map(|e| e.map_err(|err| FutureError::new(FutureErrorKind::Other, err)));
+            // Peek into the stream so that any early error can be retried.
+            let first_chunk = bytes_stream.next().await;
+            let reader: Box<dyn AsyncRead + Send + Unpin> = if let Some(res) = first_chunk {
+                let first_chunk = res.map_err(AzureErrorWrapper::from)?;
+                let reconstructed_stream =
+                    Box::pin(futures::stream::once(async { Ok(first_chunk) }).chain(bytes_stream));
+                Box::new(StreamReader::new(reconstructed_stream))
+            } else {
+                Box::new(tokio::io::empty())
+            };
+            Result::<Box<dyn AsyncRead + Send + Unpin>, AzureErrorWrapper>::Ok(reader)
+        })
+        .await
+        .map_err(|e| e.into())
     }
 
     #[instrument(level = "debug", skip(self), fields(fetched_bytes_len))]
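The key idea in the new implementation is to eagerly pull the first chunk inside the `retry` closure: connection and authorization errors typically surface on the first read, so peeking makes them visible while the whole request can still be re-issued, after which the consumed chunk is spliced back in front of the untouched remainder. Below is a standalone sketch of this peek-and-rechain pattern, assuming the `futures` and `tokio` crates; the `peek_first` helper and the fixed-count retry loop are illustrative stand-ins, not Quickwit's `retry` API.

```rust
use std::time::Duration;

use futures::stream::{self, Stream, StreamExt};

/// Pull the first item off a fallible stream so that an early error surfaces
/// before any data reaches the caller, then splice the item back in front of
/// the untouched remainder. Illustrative helper, not Quickwit's API.
async fn peek_first<T, E>(
    mut inner: impl Stream<Item = Result<T, E>> + Send + Unpin + 'static,
) -> Result<impl Stream<Item = Result<T, E>>, E>
where
    T: Send + 'static,
    E: Send + 'static,
{
    match inner.next().await {
        // First chunk failed: report it while a retry is still cheap.
        Some(Err(err)) => Err(err),
        // First chunk succeeded: replay it ahead of the rest of the stream.
        Some(Ok(first)) => Ok(stream::once(async move { Ok(first) }).chain(inner).boxed()),
        // Empty stream: nothing to peek, nothing to replay.
        None => Ok(stream::empty().boxed()),
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for the Azure page stream.
    let make_stream =
        || stream::iter(vec![Ok::<_, String>(b"page1".to_vec()), Ok(b"page2".to_vec())]);

    // Minimal stand-in for Quickwit's `retry` helper: re-run the request
    // (including the first-chunk peek) a fixed number of times.
    let mut attempts_left = 3;
    let replayable = loop {
        match peek_first(make_stream()).await {
            Ok(stream) => break stream,
            Err(err) if attempts_left > 1 => {
                attempts_left -= 1;
                eprintln!("first chunk failed ({err}), retrying");
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
            Err(err) => panic!("giving up: {err}"),
        }
    };

    let pages: Vec<_> = replayable.collect().await;
    assert_eq!(pages.len(), 2);
}
```

Note the trade-off: only a failure on the first chunk is retryable. An error that occurs mid-stream still reaches the caller, because by then part of the data has already been handed out.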
