Skip to content

Commit

Permalink
feature flag ExecDriver implementations (#2005)
Browse files Browse the repository at this point in the history
1. Feature flag tokio/rayon based `ExecDriver` implementation
2. If tokio is available AND running within the context of a runtime, use it.
  • Loading branch information
AdamGS authored Jan 17, 2025
1 parent dd09060 commit 77e22f0
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 8 deletions.
2 changes: 1 addition & 1 deletion vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
vortex-file = { workspace = true, features = ["object_store"] }
vortex-file = { workspace = true, features = ["object_store", "tokio"] }
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
vortex-scan = { workspace = true }

Expand Down
4 changes: 2 additions & 2 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ futures-util = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
pin-project-lite = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
rayon = { workspace = true, optional = true }
tokio = { workspace = true, features = ["rt"], optional = true }
tracing = { workspace = true, optional = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions vortex-file/src/exec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod inline;

#[cfg(feature = "tokio")]
pub mod tokio;

use futures_util::future::BoxFuture;
Expand Down
24 changes: 20 additions & 4 deletions vortex-file/src/open/exec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::sync::Arc;

#[cfg(feature = "tokio")]
use tokio::runtime::Handle;

use crate::exec::inline::InlineDriver;
#[cfg(feature = "tokio")]
use crate::exec::tokio::TokioDriver;
use crate::exec::ExecDriver;

Expand All @@ -11,20 +15,32 @@ pub enum ExecutionMode {
/// [`vortex_array::stream::ArrayStream`]. In other words, uses the same runtime.
Inline,
/// Spawns the tasks onto a provided Rayon thread pool.
// TODO(ngates): feature-flag this dependency.
#[cfg(feature = "rayon")]
RayonThreadPool(Arc<rayon::ThreadPool>),
/// Spawns the tasks onto a provided Tokio runtime.
// TODO(ngates): feature-flag this dependency.
TokioRuntime(tokio::runtime::Handle),
#[cfg(feature = "tokio")]
TokioRuntime(Handle),
}

impl ExecutionMode {
pub fn into_driver(self) -> Arc<dyn ExecDriver> {
match self {
ExecutionMode::Inline => Arc::new(InlineDriver),
ExecutionMode::Inline => {
// Default to tokio-specific behavior if its enabled and there's a runtime running.
#[cfg(feature = "tokio")]
match Handle::try_current() {
Ok(h) => Arc::new(TokioDriver(h)),
Err(_) => Arc::new(InlineDriver),
}

#[cfg(not(feature = "tokio"))]
Arc::new(InlineDriver)
}
#[cfg(feature = "rayon")]
ExecutionMode::RayonThreadPool(_) => {
todo!()
}
#[cfg(feature = "tokio")]
ExecutionMode::TokioRuntime(handle) => Arc::new(TokioDriver(handle)),
}
}
Expand Down
1 change: 1 addition & 0 deletions vortex-file/src/read/record_batch_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub trait AsyncRuntime {
fn block_on<F: Future>(&self, fut: F) -> F::Output;
}

#[cfg(feature = "tokio")]
impl AsyncRuntime for tokio::runtime::Runtime {
fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.block_on(fut)
Expand Down
2 changes: 1 addition & 1 deletion vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ vortex-zigzag = { workspace = true }
vortex-roaring = { workspace = true }

[features]
tokio = ["vortex-io/tokio"]
tokio = ["vortex-io/tokio", "vortex-file/tokio"]
object_store = ["vortex-file/object_store"]
parquet = ["vortex-error/parquet"]
python = ["vortex-error/python"]

0 comments on commit 77e22f0

Please sign in to comment.