diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 3769c0ee624..4d014adc8eb 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -175,16 +175,16 @@ impl TypedSourceFactory for FileSourceFactory { } } -pub fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { +pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { let dir_uri: Uri = filepath .parent() - .ok_or_else(|| anyhow::anyhow!("Parent directory could not be resolved"))? + .context("Parent directory could not be resolved")? .to_str() - .ok_or_else(|| anyhow::anyhow!("Path cannot be turned to string"))? + .context("Path cannot be turned to string")? .parse()?; let file_name = filepath .file_name() - .ok_or_else(|| anyhow::anyhow!("Path does not appear to be a file"))?; + .context("Path does not appear to be a file")?; Ok((dir_uri, file_name.as_ref())) } diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 924755ccd2a..b451c59833c 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -149,6 +149,7 @@ pub(crate) mod test_suite { use std::path::Path; use anyhow::Context; + use tokio::io::AsyncReadExt; use crate::{Storage, StorageErrorKind}; @@ -174,6 +175,21 @@ pub(crate) mod test_suite { Ok(()) } + async fn test_write_and_get_slice_stream(storage: &mut dyn Storage) -> anyhow::Result<()> { + let test_path = Path::new("write_and_read_slice_stream"); + storage + .put( + test_path, + Box::new(b"abcdefghiklmnopqrstuvxyz"[..].to_vec()), + ) + .await?; + let mut reader = storage.get_slice_stream(test_path, 3..6).await?; + let mut buf = vec![0; 3]; + reader.read_exact(&mut buf).await?; + assert_eq!(&buf[..], b"def"); + Ok(()) + } + async fn test_write_get_all(storage: &mut dyn Storage) -> anyhow::Result<()> { let test_path = Path::new("write_and_read_all"); storage @@ -294,6 +310,9 @@ pub(crate) mod test_suite { test_write_and_get_slice(storage) .await .context("write_and_get_slice")?; + test_write_and_get_slice_stream(storage) + .await + .context("write_and_get_slice_stream")?; test_write_get_all(storage) .await .context("write_and_get_all")?; @@ -320,8 +339,6 @@ pub(crate) mod test_suite { pub async fn storage_test_single_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> { use std::ops::Range; - use tokio::io::AsyncReadExt; - let test_path = Path::new("hello_small.txt"); let data = b"hello, happy tax payer!"; let data_size = data.len() as u64; diff --git a/quickwit/quickwit-storage/src/local_file_storage.rs b/quickwit/quickwit-storage/src/local_file_storage.rs index 8d35431ca19..c637d4b579d 100644 --- a/quickwit/quickwit-storage/src/local_file_storage.rs +++ b/quickwit/quickwit-storage/src/local_file_storage.rs @@ -30,7 +30,7 @@ use futures::StreamExt; use quickwit_common::ignore_error_kind; use quickwit_common::uri::Uri; use quickwit_config::StorageBackend; -use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tracing::warn; use crate::storage::SendableAsync; @@ -239,7 +239,7 @@ impl Storage for LocalFileStorage { let full_path = self.full_path(path)?; let mut file = tokio::fs::File::open(&full_path).await?; file.seek(SeekFrom::Start(range.start as u64)).await?; - Ok(Box::new(file)) + Ok(Box::new(file.take(range.len() as u64))) } async fn delete(&self, path: &Path) -> StorageResult<()> {