Skip to content

Commit

Permalink
object_store: Add enabled-by-default "fs" feature (#6636)
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 authored Jan 2, 2025
1 parent fbee05f commit 4a0bdde
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 10 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ jobs:
# targets.
- name: Run clippy with default features
run: cargo clippy -- -D warnings
- name: Run clippy without default features
run: cargo clippy --no-default-features -- -D warnings
- name: Run clippy with fs features
run: cargo clippy --no-default-features --features fs -- -D warnings
- name: Run clippy with aws feature
run: cargo clippy --features aws -- -D warnings
- name: Run clippy with gcp feature
Expand Down
4 changes: 3 additions & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ percent-encoding = "2.1"
snafu = { version = "0.8", default-features = false, features = ["std", "rust_1_61"] }
tracing = { version = "0.1" }
url = "2.2"
walkdir = "2"
walkdir = { version = "2", optional = true }

# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
Expand All @@ -61,8 +61,10 @@ httparse = { version = "1.8.0", default-features = false, features = ["std"], op
nix = { version = "0.29.0", features = ["fs"] }

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud", "httparse"]
fs = ["walkdir"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud", "md-5"]
http = ["cloud"]
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl ObjectStore for ChunkedStore {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let r = self.inner.get_opts(location, options).await?;
let stream = match r.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
}
Expand Down Expand Up @@ -178,7 +179,9 @@ impl ObjectStore for ChunkedStore {
mod tests {
use futures::StreamExt;

#[cfg(feature = "fs")]
use crate::integration::*;
#[cfg(feature = "fs")]
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::path::Path;
Expand Down Expand Up @@ -209,6 +212,7 @@ mod tests {
}
}

#[cfg(feature = "fs")]
#[tokio::test]
async fn test_chunked() {
let temporary = tempfile::tempdir().unwrap();
Expand Down
17 changes: 11 additions & 6 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@
//! By default, this crate provides the following implementations:
//!
//! * Memory: [`InMemory`](memory::InMemory)
//! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
//!
//! Feature flags are used to enable support for other implementations:
//!
#![cfg_attr(
feature = "fs",
doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
)]
#![cfg_attr(
feature = "gcp",
doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
Expand Down Expand Up @@ -513,7 +516,7 @@ pub mod gcp;
#[cfg(feature = "http")]
pub mod http;
pub mod limit;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
pub mod local;
pub mod memory;
pub mod path;
Expand Down Expand Up @@ -557,15 +560,15 @@ pub use upload::*;
pub use util::{coalesce_ranges, collect_bytes, GetRange, OBJECT_STORE_COALESCE_DEFAULT};

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use crate::util::maybe_spawn_blocking;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -1028,6 +1031,7 @@ pub struct GetResult {
/// be able to optimise the case of a file already present on local disk
pub enum GetResultPayload {
/// The file, path
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
File(std::fs::File, std::path::PathBuf),
/// An opaque stream of bytes
Stream(BoxStream<'static, Result<Bytes>>),
Expand All @@ -1036,6 +1040,7 @@ pub enum GetResultPayload {
impl Debug for GetResultPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
Self::File(_, _) => write!(f, "GetResultPayload(File)"),
Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
}
Expand All @@ -1047,7 +1052,7 @@ impl GetResult {
pub async fn bytes(self) -> Result<Bytes> {
let len = self.range.end - self.range.start;
match self.payload {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(mut file, path) => {
maybe_spawn_blocking(move || {
file.seek(SeekFrom::Start(self.range.start as _))
Expand Down Expand Up @@ -1087,7 +1092,7 @@ impl GetResult {
/// no additional complexity or overheads
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self.payload {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
Expand Down
1 change: 1 addition & 0 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {

fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult {
let payload = match r.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
v @ GetResultPayload::File(_, _) => v,
GetResultPayload::Stream(s) => {
GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed())
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::path::Path;
Expand Down Expand Up @@ -179,7 +179,7 @@ where
let path = Path::parse(path)?;

let store = match scheme {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()) as _,
ObjectStoreScheme::Memory => Box::new(InMemory::new()) as _,
#[cfg(feature = "aws")]
Expand Down
2 changes: 2 additions & 0 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,10 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
}

fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
#[allow(clippy::infallible_destructuring_match)]
let s = match result.payload {
GetResultPayload::Stream(s) => s,
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(_, _) => unimplemented!(),
};

Expand Down
2 changes: 1 addition & 1 deletion object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
}
}

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
/// Takes a function and spawns it to a tokio blocking pool if available
pub(crate) async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
Expand Down

0 comments on commit 4a0bdde

Please sign in to comment.