From 8dff659c66f44f758ff166a4946504b3623f5a23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Sep 2024 17:27:47 -0600 Subject: [PATCH 1/8] Use DataFusion's projection pushdown rule --- .../core/src/execution/datafusion/planner.rs | 38 +++++++++++++------ native/core/src/execution/jni_api.rs | 16 +++++--- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 122a24ed3..2c38c51a8 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -56,6 +56,7 @@ use datafusion::functions_aggregate::min_max::min_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_plan::windows::BoundedWindowAggExec; use datafusion::physical_plan::InputOrderMode; +use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion::{ arrow::{compute::SortOptions, datatypes::SchemaRef}, common::DataFusionError, @@ -138,18 +139,6 @@ pub struct PhysicalPlanner { session_ctx: Arc, } -impl Default for PhysicalPlanner { - fn default() -> Self { - let session_ctx = Arc::new(SessionContext::new()); - let execution_props = ExecutionProps::new(); - Self { - exec_context_id: TEST_EXEC_CONTEXT_ID, - execution_props, - session_ctx, - } - } -} - impl PhysicalPlanner { pub fn new(session_ctx: Arc) -> Self { let execution_props = ExecutionProps::new(); @@ -1078,6 +1067,17 @@ impl PhysicalPlanner { } } + pub fn optimize_plan( + &self, + plan: Arc, + ) -> Result, ExecutionError> { + // optimize the physical plan + let datafusion_planner = DefaultPhysicalPlanner::default(); + datafusion_planner + .optimize_physical_plan(plan, &self.session_ctx.state(), |_, _| {}) + .map_err(|e| e.into()) + } + fn parse_join_parameters( &self, inputs: &mut Vec>, @@ -1935,10 +1935,12 @@ mod tests { use arrow_array::{DictionaryArray, Int32Array, StringArray}; use arrow_schema::DataType; use datafusion::{physical_plan::common::collect, prelude::SessionContext}; + use datafusion_expr::execution_props::ExecutionProps; use tokio::sync::mpsc; use crate::execution::{datafusion::planner::PhysicalPlanner, operators::InputBatch}; + use crate::execution::datafusion::planner::TEST_EXEC_CONTEXT_ID; use crate::execution::operators::ExecutionError; use datafusion_comet_proto::{ spark_expression::expr::ExprStruct::*, @@ -1947,6 +1949,18 @@ mod tests { spark_operator::{operator::OpStruct, Operator}, }; + impl Default for PhysicalPlanner { + fn default() -> Self { + let session_ctx = Arc::new(SessionContext::default()); + let execution_props = ExecutionProps::new(); + Self { + exec_context_id: TEST_EXEC_CONTEXT_ID, + execution_props, + session_ctx, + } + } + } + #[test] fn test_unpack_dictionary_primitive() { let op_scan = Operator { diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 587defaaa..120242f20 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -17,11 +17,14 @@ //! Define JNI APIs which can be called from Java/Scala. +use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; use arrow::{ datatypes::DataType as ArrowDataType, ffi::{FFI_ArrowArray, FFI_ArrowSchema}, }; use arrow_array::RecordBatch; +use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown; use datafusion::{ execution::{ disk_manager::DiskManagerConfig, @@ -42,8 +45,6 @@ use jni::{ }; use std::{collections::HashMap, sync::Arc, task::Poll}; -use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; - use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ @@ -249,11 +250,14 @@ fn prepare_datafusion_session_context( let runtime = RuntimeEnv::new(rt_config).unwrap(); - let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); - - datafusion_functions_nested::register_all(&mut session_ctx)?; + let state = SessionStateBuilder::new() + .with_config(session_config) + .with_runtime_env(Arc::new(runtime)) + .with_default_features() + .with_physical_optimizer_rules(vec![Arc::new(ProjectionPushdown::new())]) + .build(); - Ok(session_ctx) + Ok(SessionContext::new_with_state(state)) } fn parse_bool(conf: &HashMap, name: &str) -> CometResult { From f2dca6314af7a40a3c8f56fdb50aa7bddb081b2b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Sep 2024 17:32:02 -0600 Subject: [PATCH 2/8] call optimizer --- native/core/src/execution/jni_api.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 120242f20..6bf39c98e 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -359,6 +359,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( &mut exec_context.input_sources.clone(), )?; + // optimize the physical plan + let root_op = planner.optimize_plan(root_op)?; + exec_context.root_op = Some(root_op.clone()); exec_context.scans = scans; From 913eb15aeedfaeb432223bab1991598f546893a1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Sep 2024 21:50:35 -0600 Subject: [PATCH 3/8] fix --- native/core/src/execution/operators/copy.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index d6c095a77..5350747ed 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -116,10 +116,9 @@ impl ExecutionPlan for CopyExec { self: Arc, children: Vec>, ) -> DataFusionResult> { - let input = Arc::clone(&self.input); - let new_input = input.with_new_children(children)?; + assert!(children.len() == 1); Ok(Arc::new(CopyExec { - input: new_input, + input: Arc::clone(&children[0]), schema: Arc::clone(&self.schema), cache: self.cache.clone(), metrics: self.metrics.clone(), From a70cff6f441f95f33862dd53586cfcda9074d860 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Sep 2024 21:56:27 -0600 Subject: [PATCH 4/8] add config --- common/src/main/scala/org/apache/comet/CometConf.scala | 7 +++++++ native/core/src/execution/jni_api.rs | 10 +++++++++- .../scala/org/apache/comet/CometExecIterator.scala | 3 ++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 039538814..4ed88f003 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -347,6 +347,13 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_NATIVE_OPTIMIZER_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.exec.optimizer.enabled") + .internal() + .doc("Enable DataFusion physical optimizer for native plans.") + .booleanConf + .createWithDefault(true) + val COMET_WORKER_THREADS: ConfigEntry[Int] = conf("spark.comet.workerThreads") .internal() diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index d93da06d5..90a573377 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -93,6 +93,8 @@ struct ExecutionContext { pub debug_native: bool, /// Whether to write native plans with metrics to stdout pub explain_native: bool, + /// Whether to enable physical optimizer + pub enable_optimizer: bool, } /// Accept serialized query plan and return the address of the native query plan. @@ -133,6 +135,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( // Whether we've enabled additional debugging on the native side let debug_native = parse_bool(&configs, "debug_native")?; let explain_native = parse_bool(&configs, "explain_native")?; + let enable_optimizer = parse_bool(&configs, "native_optimizer")?; let worker_threads = configs .get("worker_threads") @@ -185,6 +188,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( session_ctx: Arc::new(session), debug_native, explain_native, + enable_optimizer, }); Ok(Box::into_raw(exec_context) as i64) @@ -360,7 +364,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( )?; // optimize the physical plan - let root_op = planner.optimize_plan(root_op)?; + let root_op = if exec_context.enable_optimizer { + planner.optimize_plan(root_op)? + } else { + root_op + }; exec_context.root_op = Some(Arc::clone(&root_op)); exec_context.scans = scans; diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index dcdc8ae92..d79099f40 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -23,7 +23,7 @@ import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ -import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} +import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_NATIVE_OPTIMIZER_ENABLED, COMET_WORKER_THREADS} import org.apache.comet.vector.NativeUtil /** @@ -86,6 +86,7 @@ class CometExecIterator( result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get())) result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get())) result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get())) + result.put("native_optimizer", String.valueOf(COMET_NATIVE_OPTIMIZER_ENABLED.get())) result.put("worker_threads", String.valueOf(COMET_WORKER_THREADS.get())) result.put("blocking_threads", String.valueOf(COMET_BLOCKING_THREADS.get())) From c51ced6831279f981b770c2005dc73b53d08c6a9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Sep 2024 22:06:08 -0600 Subject: [PATCH 5/8] machete --- native/Cargo.lock | 23 ----------------------- native/core/Cargo.toml | 1 - 2 files changed, 24 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index a3f6f6d30..7e06bc753 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -892,7 +892,6 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", - "datafusion-functions-nested", "datafusion-physical-expr", "flate2", "futures", @@ -1094,28 +1093,6 @@ dependencies = [ "rand", ] -[[package]] -name = "datafusion-functions-nested" -version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=dff590b#dff590bfd2bb9993b2c8ce6f76a3bdd973e520a8" -dependencies = [ - "arrow", - "arrow-array", - "arrow-buffer", - "arrow-ord", - "arrow-schema", - "datafusion-common", - "datafusion-execution", - "datafusion-expr", - "datafusion-functions", - "datafusion-functions-aggregate", - "datafusion-physical-expr-common", - "itertools 0.13.0", - "log", - "paste", - "rand", -] - [[package]] name = "datafusion-functions-window" version = "41.0.0" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 58fe00e75..85c6e88f6 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -67,7 +67,6 @@ itertools = "0.11.0" paste = "1.0.14" datafusion-common = { workspace = true } datafusion = { workspace = true } -datafusion-functions-nested = { workspace = true } datafusion-expr = { workspace = true } datafusion-execution = { workspace = true } datafusion-physical-expr = { workspace = true } From bdc81445c72e27ee2decd6c07829928104b5e0cb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Sep 2024 22:26:24 -0600 Subject: [PATCH 6/8] revert a change --- native/Cargo.lock | 23 +++++++++++++++++++++++ native/core/Cargo.toml | 1 + native/core/src/execution/jni_api.rs | 6 +++++- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 7e06bc753..a3f6f6d30 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -892,6 +892,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions-nested", "datafusion-physical-expr", "flate2", "futures", @@ -1093,6 +1094,28 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-nested" +version = "41.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=dff590b#dff590bfd2bb9993b2c8ce6f76a3bdd973e520a8" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "datafusion-physical-expr-common", + "itertools 0.13.0", + "log", + "paste", + "rand", +] + [[package]] name = "datafusion-functions-window" version = "41.0.0" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 85c6e88f6..58fe00e75 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -67,6 +67,7 @@ itertools = "0.11.0" paste = "1.0.14" datafusion-common = { workspace = true } datafusion = { workspace = true } +datafusion-functions-nested = { workspace = true } datafusion-expr = { workspace = true } datafusion-execution = { workspace = true } datafusion-physical-expr = { workspace = true } diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 90a573377..a79c3127b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -261,7 +261,11 @@ fn prepare_datafusion_session_context( .with_physical_optimizer_rules(vec![Arc::new(ProjectionPushdown::new())]) .build(); - Ok(SessionContext::new_with_state(state)) + let mut session_ctx = SessionContext::new_with_state(state); + + datafusion_functions_nested::register_all(&mut session_ctx)?; + + Ok(session_ctx) } fn parse_bool(conf: &HashMap, name: &str) -> CometResult { From 4c5a3ef3e05ea95c835a6b184cfd76be730d57b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 4 Sep 2024 09:44:13 -0600 Subject: [PATCH 7/8] remove default features from session state --- native/core/src/execution/jni_api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a79c3127b..804c95231 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -257,7 +257,6 @@ fn prepare_datafusion_session_context( let state = SessionStateBuilder::new() .with_config(session_config) .with_runtime_env(Arc::new(runtime)) - .with_default_features() .with_physical_optimizer_rules(vec![Arc::new(ProjectionPushdown::new())]) .build(); From 14c32c5ee4bd7c598e8cf4efbb26197741931ab7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 4 Sep 2024 10:14:30 -0600 Subject: [PATCH 8/8] Revert "remove default features from session state" This reverts commit 4c5a3ef3e05ea95c835a6b184cfd76be730d57b8. --- native/core/src/execution/jni_api.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 804c95231..a79c3127b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -257,6 +257,7 @@ fn prepare_datafusion_session_context( let state = SessionStateBuilder::new() .with_config(session_config) .with_runtime_env(Arc::new(runtime)) + .with_default_features() .with_physical_optimizer_rules(vec![Arc::new(ProjectionPushdown::new())]) .build();