From 3229367b77520bb4898b2547a1fdb7ca133636ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Fri, 8 Nov 2024 21:58:14 +0000 Subject: [PATCH 1/2] Update datafusion to v43 --- Cargo.toml | 8 ++++---- ballista/client/tests/setup.rs | 2 +- ballista/core/src/utils.rs | 9 +++++++++ ballista/scheduler/src/test_utils.rs | 3 ++- benchmarks/src/bin/tpch.rs | 6 +++--- 5 files changed, 19 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4e88716dc..e17a9ca0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,10 +27,10 @@ clap = { version = "3", features = ["derive", "cargo"] } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } # bump directly to datafusion v43 to avoid the serde bug on v42 (https://github.com/apache/datafusion/pull/12626) -datafusion = "42.0.0" -datafusion-cli = "42.0.0" -datafusion-proto = "42.0.0" -datafusion-proto-common = "42.0.0" +datafusion = "43" +datafusion-cli = "43" +datafusion-proto = "43" +datafusion-proto-common = "43" object_store = "0.11" prost = "0.13" prost-types = "0.13" diff --git a/ballista/client/tests/setup.rs b/ballista/client/tests/setup.rs index 10b482906..db7c131fe 100644 --- a/ballista/client/tests/setup.rs +++ b/ballista/client/tests/setup.rs @@ -386,7 +386,7 @@ mod standalone { } } - #[derive(Default)] + #[derive(Default, Debug)] struct BadPlanner {} #[async_trait::async_trait] diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 3f8f6bfea..d4c202254 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -55,6 +55,7 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_proto::protobuf::LogicalPlanNode; use futures::StreamExt; use log::error; +use std::fmt::Debug; use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -509,6 +510,14 @@ pub struct BallistaQueryPlanner { plan_repr: PhantomData, } +impl Debug for BallistaQueryPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BallistaQueryPlanner") + .field("scheduler_url", &self.scheduler_url) + .finish() + } +} + impl BallistaQueryPlanner { pub fn new(scheduler_url: String, config: BallistaConfig) -> Self { Self { diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index f9eae3156..c39076b91 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -71,6 +71,7 @@ const TEST_SCHEDULER_NAME: &str = "localhost:50050"; /// Sometimes we need to construct logical plans that will produce errors /// when we try and create physical plan. A scan using `ExplodingTableProvider` /// will do the trick +#[derive(Debug)] pub struct ExplodingTableProvider; #[async_trait] @@ -139,7 +140,7 @@ pub async fn datafusion_test_context(path: &str) -> Result { .has_header(false) .file_extension(".tbl"); let dir = format!("{path}/{table}"); - ctx.register_csv(table, &dir, options).await?; + ctx.register_csv(*table, &dir, options).await?; } Ok(ctx) } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index ac35b3f14..ef0385682 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -589,7 +589,7 @@ async fn register_tables( "Registering table '{table}' using TBL files at path {path}" ); } - ctx.register_csv(table, &path, options) + ctx.register_csv(*table, &path, options) .await .map_err(|e| DataFusionError::Plan(format!("{e:?}")))?; } @@ -602,7 +602,7 @@ async fn register_tables( "Registering table '{table}' using CSV files at path {path}" ); } - ctx.register_csv(table, &path, options) + ctx.register_csv(*table, &path, options) .await .map_err(|e| DataFusionError::Plan(format!("{e:?}")))?; } @@ -613,7 +613,7 @@ async fn register_tables( "Registering table '{table}' using Parquet files at path {path}" ); } - ctx.register_parquet(table, &path, ParquetReadOptions::default()) + ctx.register_parquet(*table, &path, ParquetReadOptions::default()) .await .map_err(|e| DataFusionError::Plan(format!("{e:?}")))?; } From f3769a6179554978edfebb27dbc6869395e22910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Fri, 8 Nov 2024 22:10:25 +0000 Subject: [PATCH 2/2] replace deprecations --- ballista/core/src/utils.rs | 8 ++++---- ballista/executor/src/executor_process.rs | 2 +- ballista/executor/src/standalone.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index d4c202254..0745f9d09 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -71,7 +71,7 @@ pub fn default_session_builder(config: SessionConfig) -> SessionState { .with_default_features() .with_config(config) .with_runtime_env(Arc::new( - RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) + RuntimeEnv::try_new(with_object_store_registry(RuntimeConfig::default())) .unwrap(), )) .build() @@ -268,7 +268,7 @@ pub fn create_df_ctx_with_ballista_query_planner( .with_default_features() .with_config(session_config) .with_runtime_env(Arc::new( - RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) + RuntimeEnv::try_new(with_object_store_registry(RuntimeConfig::default())) .unwrap(), )) .with_query_planner(planner) @@ -319,7 +319,7 @@ impl SessionStateExt for SessionState { .with_round_robin_repartition(false); let runtime_config = RuntimeConfig::default(); - let runtime_env = RuntimeEnv::new(runtime_config)?; + let runtime_env = RuntimeEnv::try_new(runtime_config)?; let session_state = SessionStateBuilder::new() .with_default_features() .with_config(session_config) @@ -709,7 +709,7 @@ mod test { use crate::utils::{LocalRun, SessionStateExt}; fn context() -> SessionContext { - let runtime_environment = RuntimeEnv::new(RuntimeConfig::new()).unwrap(); + let runtime_environment = RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(); let session_config = SessionConfig::new().with_information_schema(true); diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index e6f034f46..8e2e2ebe8 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -203,7 +203,7 @@ pub async fn start_executor_process(opt: Arc) -> Result<( let wd = work_dir.clone(); let runtime_producer: RuntimeProducer = Arc::new(move |_| { let config = RuntimeConfig::new().with_temp_file_path(wd.clone()); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) }); let logical = opt diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 28efe70fa..40f1088ec 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -170,7 +170,7 @@ pub async fn new_standalone_executor< let config = with_object_store_registry( RuntimeConfig::new().with_temp_file_path(wd.clone()), ); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) }); let executor = Arc::new(Executor::new_basic(