From 725ccb464e856a292ebe891f9935224faa595d29 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 08:39:54 -0700 Subject: [PATCH 01/21] Refactor native planner to build tree of SparkPlan that maps to original Spark plan --- native/core/src/execution/datafusion/mod.rs | 1 + .../core/src/execution/datafusion/planner.rs | 147 +++++++++++++----- .../src/execution/datafusion/spark_plan.rs | 63 ++++++++ native/core/src/execution/jni_api.rs | 11 +- native/core/src/execution/metrics/utils.rs | 19 ++- native/proto/src/proto/operator.proto | 3 + .../apache/comet/serde/QueryPlanSerde.scala | 2 +- 7 files changed, 196 insertions(+), 50 deletions(-) create mode 100644 native/core/src/execution/datafusion/spark_plan.rs diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs index 6f81ee918..ca41fa0aa 100644 --- a/native/core/src/execution/datafusion/mod.rs +++ b/native/core/src/execution/datafusion/mod.rs @@ -21,4 +21,5 @@ pub mod expressions; mod operators; pub mod planner; pub mod shuffle_writer; +pub(crate) mod spark_plan; mod util; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 73541b0a4..4f3aa2559 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -85,6 +85,7 @@ use datafusion::{ use datafusion_functions_nested::concat::ArrayAppend; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; +use crate::execution::datafusion::spark_plan::SparkPlan; use datafusion_comet_proto::{ spark_expression::{ self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, @@ -861,11 +862,11 @@ impl PhysicalPlanner { /// /// Note that we return created `Scan`s which will be kept at JNI API. JNI calls will use it to /// feed in new input batch from Spark JVM side. 
- pub fn create_plan<'a>( + pub(crate) fn create_plan<'a>( &'a self, spark_plan: &'a Operator, inputs: &mut Vec>, - ) -> Result<(Vec, Arc), ExecutionError> { + ) -> Result<(Vec, Arc), ExecutionError> { let children = &spark_plan.children; match spark_plan.op_struct.as_ref().unwrap() { OpStruct::Projection(project) => { @@ -880,7 +881,11 @@ impl PhysicalPlanner { .map(|r| (r, format!("col_{}", idx))) }) .collect(); - Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?))) + let projection = Arc::new(ProjectionExec::try_new(exprs?, child.wrapped.clone())?); + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, projection)), + )) } OpStruct::Filter(filter) => { assert!(children.len() == 1); @@ -888,7 +893,8 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - Ok((scans, Arc::new(FilterExec::try_new(predicate, child)?))) + let filter = Arc::new(FilterExec::try_new(predicate, child.wrapped.clone())?); + Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, filter)))) } OpStruct::HashAgg(agg) => { assert!(children.len() == 1); @@ -926,7 +932,7 @@ impl PhysicalPlanner { group_by, aggr_expr, vec![None; num_agg], // no filter expressions - Arc::clone(&child), + Arc::clone(&child.wrapped), Arc::clone(&schema), )?, ); @@ -940,8 +946,11 @@ impl PhysicalPlanner { }) .collect(); - let exec: Arc = if agg.result_exprs.is_empty() { - aggregate + if agg.result_exprs.is_empty() { + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, aggregate)), + )) } else { // For final aggregation, DF's hash aggregate exec doesn't support Spark's // aggregate result expressions like `COUNT(col) + 1`, but instead relying @@ -950,17 +959,25 @@ impl PhysicalPlanner { // // Note that `result_exprs` should only be set for final aggregation on the // Spark side. - Arc::new(ProjectionExec::try_new(result_exprs?, aggregate)?) - }; - - Ok((scans, exec)) + let projection = + Arc::new(ProjectionExec::try_new(result_exprs?, aggregate.clone())?); + Ok(( + scans, + Arc::new(SparkPlan::new_with_additional( + spark_plan.plan_id, + projection, + vec![aggregate], + )), + )) + } } OpStruct::Limit(limit) => { assert!(children.len() == 1); let num = limit.limit; let (scans, child) = self.create_plan(&children[0], inputs)?; - Ok((scans, Arc::new(LocalLimitExec::new(child, num as usize)))) + let limit = Arc::new(LocalLimitExec::new(child.wrapped.clone(), num as usize)); + Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, limit)))) } OpStruct::Sort(sort) => { assert!(children.len() == 1); @@ -978,11 +995,19 @@ impl PhysicalPlanner { // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and // it would be more efficient if we could avoid that. 
// https://github.com/apache/datafusion-comet/issues/963 - let child = Self::wrap_in_copy_exec(child); + let child_copied = Self::wrap_in_copy_exec(child.wrapped.clone()); + + let sort = Arc::new( + SortExec::new(LexOrdering::new(exprs?), child_copied.clone()).with_fetch(fetch), + ); Ok(( scans, - Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)), + Arc::new(SparkPlan::new_with_additional( + spark_plan.plan_id, + sort, + vec![child.wrapped.clone()], + )), )) } OpStruct::Scan(scan) => { @@ -1008,7 +1033,10 @@ impl PhysicalPlanner { // The `ScanExec` operator will take actual arrays from Spark during execution let scan = ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?; - Ok((vec![scan.clone()], Arc::new(scan))) + Ok(( + vec![scan.clone()], + Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan))), + )) } OpStruct::ShuffleWriter(writer) => { assert!(children.len() == 1); @@ -1017,14 +1045,21 @@ impl PhysicalPlanner { let partitioning = self .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?; + let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( + child.wrapped.clone(), + partitioning, + writer.output_data_file.clone(), + writer.output_index_file.clone(), + )?); + + // TODO this assumes that the child of a shuffle is always a ScanExec Ok(( scans, - Arc::new(ShuffleWriterExec::try_new( - child, - partitioning, - writer.output_data_file.clone(), - writer.output_index_file.clone(), - )?), + Arc::new(SparkPlan::new_with_additional( + spark_plan.plan_id, + shuffle_writer, + vec![child.wrapped.clone()], + )), )) } OpStruct::Expand(expand) => { @@ -1068,16 +1103,28 @@ impl PhysicalPlanner { // the data corruption. Note that we only need to copy the input batch // if the child operator is `ScanExec`, because other operators after `ScanExec` // will create new arrays for the output batch. - let child = if can_reuse_input_batch(&child) { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) + if can_reuse_input_batch(&child.wrapped) { + let child_copied = Arc::new(CopyExec::new( + child.wrapped.clone(), + CopyMode::UnpackOrDeepCopy, + )); + let expand = Arc::new(CometExpandExec::new(projections, child_copied, schema)); + Ok(( + scans, + Arc::new(SparkPlan::new_with_additional( + spark_plan.plan_id, + expand, + vec![child.wrapped.clone()], + )), + )) } else { - child - }; - - Ok(( - scans, - Arc::new(CometExpandExec::new(projections, child, schema)), - )) + let expand = Arc::new(CometExpandExec::new( + projections, + child.wrapped.clone(), + schema, + )); + Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, expand)))) + } } OpStruct::SortMergeJoin(join) => { let (join_params, scans) = self.parse_join_parameters( @@ -1115,7 +1162,8 @@ impl PhysicalPlanner { false, )?); - Ok((scans, join)) + // TODO pass in additional plans (CopyExec, ScanExec) + Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, join)))) } OpStruct::HashJoin(join) => { let (join_params, scans) = self.parse_join_parameters( @@ -1154,7 +1202,11 @@ impl PhysicalPlanner { swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)? 
}; - Ok((scans, hash_join)) + // TODO pass in additional plans (CopyExec, ScanExec) + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, hash_join)), + )) } OpStruct::Window(wnd) => { let (scans, child) = self.create_plan(&children[0], inputs)?; @@ -1187,14 +1239,16 @@ impl PhysicalPlanner { }) .collect(); + let window_agg = Arc::new(BoundedWindowAggExec::try_new( + window_expr?, + child.wrapped.clone(), + partition_exprs.to_vec(), + InputOrderMode::Sorted, + )?); Ok(( scans, - Arc::new(BoundedWindowAggExec::try_new( - window_expr?, - child, - partition_exprs.to_vec(), - InputOrderMode::Sorted, - )?), + // TODO additional metrics? + Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg)), )) } } @@ -1331,8 +1385,8 @@ impl PhysicalPlanner { Ok(( JoinParameters { - left, - right, + left: left.wrapped.clone(), + right: right.wrapped.clone(), join_on, join_type, join_filter, @@ -2207,6 +2261,7 @@ mod tests { #[test] fn test_unpack_dictionary_primitive() { let op_scan = Operator { + plan_id: 0, children: vec![], op_struct: Some(OpStruct::Scan(spark_operator::Scan { fields: vec![spark_expression::DataType { @@ -2232,7 +2287,7 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let mut stream = datafusion_plan.execute(0, task_ctx).unwrap(); + let mut stream = datafusion_plan.wrapped.execute(0, task_ctx).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); let (tx, mut rx) = mpsc::channel(1); @@ -2279,6 +2334,7 @@ mod tests { #[test] fn test_unpack_dictionary_string() { let op_scan = Operator { + plan_id: 0, children: vec![], op_struct: Some(OpStruct::Scan(spark_operator::Scan { fields: vec![spark_expression::DataType { @@ -2315,7 +2371,7 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let mut stream = datafusion_plan.execute(0, task_ctx).unwrap(); + let mut stream = datafusion_plan.wrapped.execute(0, task_ctx).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); let (tx, mut rx) = mpsc::channel(1); @@ -2365,6 +2421,7 @@ mod tests { #[allow(clippy::field_reassign_with_default)] async fn to_datafusion_filter() { let op_scan = spark_operator::Operator { + plan_id: 0, children: vec![], op_struct: Some(spark_operator::operator::OpStruct::Scan( spark_operator::Scan { @@ -2388,7 +2445,10 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let stream = datafusion_plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + let stream = datafusion_plan + .wrapped + .execute(0, Arc::clone(&task_ctx)) + .unwrap(); let output = collect(stream).await.unwrap(); assert!(output.is_empty()); } @@ -2442,6 +2502,7 @@ mod tests { }; Operator { + plan_id: 0, children: vec![child_op], op_struct: Some(OpStruct::Filter(spark_operator::Filter { predicate: Some(expr), diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs new file mode 100644 index 000000000..136d5607c --- /dev/null +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::SchemaRef; +use datafusion::physical_plan::ExecutionPlan; +use std::sync::Arc; + +/// Wrapper around a native plan that maps to a Spark plan and can optionally contain +/// references to other native plans that should contribute to the Spark SQL metrics +///for the root plan (such as CopyExec and ScanExec nodes) +#[derive(Debug, Clone)] +pub(crate) struct SparkPlan { + pub(crate) plan_id: u32, + pub(crate) wrapped: Arc, + pub(crate) children: Vec>, + pub(crate) metrics_plans: Vec>, +} + +impl SparkPlan { + pub(crate) fn new(plan_id: u32, wrapped: Arc) -> Self { + Self { + plan_id, + wrapped, + children: vec![], + metrics_plans: vec![], + } + } + + pub(crate) fn new_with_additional( + plan_id: u32, + wrapped: Arc, + metrics_plans: Vec>, + ) -> Self { + Self { + plan_id, + wrapped, + children: vec![], + metrics_plans, + } + } + + pub(crate) fn schema(&self) -> SchemaRef { + self.wrapped.schema() + } + + pub(crate) fn children(&self) -> &Vec> { + &self.children + } +} diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 448f383c6..1c953128c 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -24,7 +24,7 @@ use datafusion::{ disk_manager::DiskManagerConfig, runtime_env::{RuntimeConfig, RuntimeEnv}, }, - physical_plan::{display::DisplayableExecutionPlan, ExecutionPlan, SendableRecordBatchStream}, + physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, prelude::{SessionConfig, SessionContext}, }; use futures::poll; @@ -59,6 +59,7 @@ use jni::{ }; use tokio::runtime::Runtime; +use crate::execution::datafusion::spark_plan::SparkPlan; use crate::execution::operators::ScanExec; use log::info; @@ -69,7 +70,7 @@ struct ExecutionContext { /// The deserialized Spark plan pub spark_plan: Operator, /// The DataFusion root operator converted from the `spark_plan` - pub root_op: Option>, + pub root_op: Option>, /// The input sources for the DataFusion plan pub scans: Vec, /// The global reference of input sources for the DataFusion plan @@ -360,7 +361,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( if exec_context.explain_native { let formatted_plan_str = - DisplayableExecutionPlan::new(root_op.as_ref()).indent(true); + DisplayableExecutionPlan::new(root_op.wrapped.as_ref()).indent(true); info!("Comet native query plan:\n{formatted_plan_str:}"); } @@ -369,6 +370,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( .root_op .as_ref() .unwrap() + .wrapped .execute(0, task_ctx)?; exec_context.stream = Some(stream); } else { @@ -400,7 +402,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( if exec_context.explain_native { if let Some(plan) = &exec_context.root_op { let formatted_plan_str = - DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true); + DisplayableExecutionPlan::with_metrics(plan.wrapped.as_ref()) + .indent(true); info!( "Comet native query plan with metrics:\ \n[Stage {} Partition {}] plan creation (including CometScans fetching first batches) took {:?}:\ diff 
--git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 9291f32c7..5945f06cf 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::execution::datafusion::spark_plan::SparkPlan; use crate::jvm_bridge::jni_new_global_ref; use crate::{ errors::CometError, jvm_bridge::{jni_call, jni_new_string}, }; -use datafusion::physical_plan::ExecutionPlan; use jni::objects::{GlobalRef, JString}; use jni::{objects::JObject, JNIEnv}; use std::collections::HashMap; @@ -32,13 +32,14 @@ use std::sync::Arc; pub fn update_comet_metric( env: &mut JNIEnv, metric_node: &JObject, - execution_plan: &Arc, + execution_plan: &Arc, metrics_jstrings: &mut HashMap>, ) -> Result<(), CometError> { update_metrics( env, metric_node, &execution_plan + .wrapped .metrics() .unwrap_or_default() .iter() @@ -48,6 +49,20 @@ pub fn update_comet_metric( metrics_jstrings, )?; + if !execution_plan.metrics_plans.is_empty() { + // TODO stop dropping these metrics + println!( + "Dropping the elapsed_compute time of {} for plan {} ({})", + execution_plan + .metrics_plans + .iter() + .map(|p| p.metrics().unwrap().elapsed_compute().unwrap_or(0)) + .sum::(), + execution_plan.wrapped.name(), + execution_plan.plan_id + ); + } + unsafe { for (i, child_plan) in execution_plan.children().iter().enumerate() { let child_metric_node: JObject = jni_call!(env, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 533d504c4..74ec80cb5 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -31,6 +31,9 @@ message Operator { // The child operators of this repeated Operator children = 1; + // Spark plan ID + uint32 plan_id = 2; + oneof op_struct { Scan scan = 100; Projection projection = 101; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f7d5fc91a..2bb467af5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2508,7 +2508,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim */ def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = { val conf = op.conf - val result = OperatorOuterClass.Operator.newBuilder() + val result = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) childOp.foreach(result.addChildren) op match { From 581984e9a7d5795beb14855c4835e4e584c8f501 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 08:55:59 -0700 Subject: [PATCH 02/21] add SparkPlan children --- .../core/src/execution/datafusion/planner.rs | 36 ++++++++++++++----- .../src/execution/datafusion/spark_plan.rs | 11 ++++-- native/core/src/execution/metrics/utils.rs | 33 +++++++++-------- 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 4f3aa2559..b183ed696 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -884,7 +884,7 @@ impl PhysicalPlanner { let projection = Arc::new(ProjectionExec::try_new(exprs?, child.wrapped.clone())?); Ok(( scans, - Arc::new(SparkPlan::new(spark_plan.plan_id, projection)), + 
Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])), )) } OpStruct::Filter(filter) => { @@ -894,7 +894,10 @@ impl PhysicalPlanner { self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; let filter = Arc::new(FilterExec::try_new(predicate, child.wrapped.clone())?); - Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, filter)))) + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, filter, vec![child])), + )) } OpStruct::HashAgg(agg) => { assert!(children.len() == 1); @@ -949,7 +952,7 @@ impl PhysicalPlanner { if agg.result_exprs.is_empty() { Ok(( scans, - Arc::new(SparkPlan::new(spark_plan.plan_id, aggregate)), + Arc::new(SparkPlan::new(spark_plan.plan_id, aggregate, vec![child])), )) } else { // For final aggregation, DF's hash aggregate exec doesn't support Spark's @@ -966,6 +969,7 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, projection, + vec![child], vec![aggregate], )), )) @@ -977,7 +981,10 @@ impl PhysicalPlanner { let (scans, child) = self.create_plan(&children[0], inputs)?; let limit = Arc::new(LocalLimitExec::new(child.wrapped.clone(), num as usize)); - Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, limit)))) + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, limit, vec![child])), + )) } OpStruct::Sort(sort) => { assert!(children.len() == 1); @@ -1006,6 +1013,7 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, sort, + vec![child.clone()], vec![child.wrapped.clone()], )), )) @@ -1035,7 +1043,7 @@ impl PhysicalPlanner { ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?; Ok(( vec![scan.clone()], - Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan))), + Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])), )) } OpStruct::ShuffleWriter(writer) => { @@ -1058,6 +1066,7 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, shuffle_writer, + vec![child.clone()], vec![child.wrapped.clone()], )), )) @@ -1114,6 +1123,7 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, expand, + vec![child.clone()], vec![child.wrapped.clone()], )), )) @@ -1123,7 +1133,10 @@ impl PhysicalPlanner { child.wrapped.clone(), schema, )); - Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, expand)))) + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), + )) } } OpStruct::SortMergeJoin(join) => { @@ -1163,7 +1176,11 @@ impl PhysicalPlanner { )?); // TODO pass in additional plans (CopyExec, ScanExec) - Ok((scans, Arc::new(SparkPlan::new(spark_plan.plan_id, join)))) + // TODO pass in child spark plans + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, join, vec![])), + )) } OpStruct::HashJoin(join) => { let (join_params, scans) = self.parse_join_parameters( @@ -1203,9 +1220,10 @@ impl PhysicalPlanner { }; // TODO pass in additional plans (CopyExec, ScanExec) + // TODO pass in child spark plans Ok(( scans, - Arc::new(SparkPlan::new(spark_plan.plan_id, hash_join)), + Arc::new(SparkPlan::new(spark_plan.plan_id, hash_join, vec![])), )) } OpStruct::Window(wnd) => { @@ -1248,7 +1266,7 @@ impl PhysicalPlanner { Ok(( scans, // TODO additional metrics? 
- Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg)), + Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg, vec![child])), )) } } diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index 136d5607c..6974c7974 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -31,11 +31,15 @@ pub(crate) struct SparkPlan { } impl SparkPlan { - pub(crate) fn new(plan_id: u32, wrapped: Arc) -> Self { + pub(crate) fn new( + plan_id: u32, + wrapped: Arc, + children: Vec>, + ) -> Self { Self { plan_id, wrapped, - children: vec![], + children, metrics_plans: vec![], } } @@ -43,12 +47,13 @@ impl SparkPlan { pub(crate) fn new_with_additional( plan_id: u32, wrapped: Arc, + children: Vec>, metrics_plans: Vec>, ) -> Self { Self { plan_id, wrapped, - children: vec![], + children, metrics_plans, } } diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 5945f06cf..909526360 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -32,13 +32,13 @@ use std::sync::Arc; pub fn update_comet_metric( env: &mut JNIEnv, metric_node: &JObject, - execution_plan: &Arc, + spark_plan: &Arc, metrics_jstrings: &mut HashMap>, ) -> Result<(), CometError> { update_metrics( env, metric_node, - &execution_plan + &spark_plan .wrapped .metrics() .unwrap_or_default() @@ -49,22 +49,25 @@ pub fn update_comet_metric( metrics_jstrings, )?; - if !execution_plan.metrics_plans.is_empty() { - // TODO stop dropping these metrics - println!( - "Dropping the elapsed_compute time of {} for plan {} ({})", - execution_plan - .metrics_plans - .iter() - .map(|p| p.metrics().unwrap().elapsed_compute().unwrap_or(0)) - .sum::(), - execution_plan.wrapped.name(), - execution_plan.plan_id - ); + if !spark_plan.metrics_plans.is_empty() { + for metrics_plan in &spark_plan.metrics_plans { + // TODO stop dropping these metrics! 
+ println!( + "Dropping the {} elapsed_compute time of {} for plan {} (#{})", + metrics_plan.name(), + metrics_plan + .metrics() + .unwrap() + .elapsed_compute() + .unwrap_or(0), + spark_plan.wrapped.name(), + spark_plan.plan_id + ); + } } unsafe { - for (i, child_plan) in execution_plan.children().iter().enumerate() { + for (i, child_plan) in spark_plan.children().iter().enumerate() { let child_metric_node: JObject = jni_call!(env, comet_metric_node(metric_node).get_child_node(i as i32) -> JObject )?; From d25f2d3bee8b386c4bf8290f1f19191500f86f22 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 09:10:25 -0700 Subject: [PATCH 03/21] clippy --- .../core/src/execution/datafusion/planner.rs | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index b183ed696..03972a56c 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -881,7 +881,8 @@ impl PhysicalPlanner { .map(|r| (r, format!("col_{}", idx))) }) .collect(); - let projection = Arc::new(ProjectionExec::try_new(exprs?, child.wrapped.clone())?); + let projection = + Arc::new(ProjectionExec::try_new(exprs?, Arc::clone(&child.wrapped))?); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])), @@ -893,7 +894,7 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - let filter = Arc::new(FilterExec::try_new(predicate, child.wrapped.clone())?); + let filter = Arc::new(FilterExec::try_new(predicate, Arc::clone(&child.wrapped))?); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, filter, vec![child])), @@ -929,7 +930,7 @@ impl PhysicalPlanner { let num_agg = agg.agg_exprs.len(); let aggr_expr = agg_exprs?.into_iter().map(Arc::new).collect(); - let aggregate = Arc::new( + let aggregate: Arc = Arc::new( datafusion::physical_plan::aggregates::AggregateExec::try_new( mode, group_by, @@ -962,8 +963,10 @@ impl PhysicalPlanner { // // Note that `result_exprs` should only be set for final aggregation on the // Spark side. - let projection = - Arc::new(ProjectionExec::try_new(result_exprs?, aggregate.clone())?); + let projection = Arc::new(ProjectionExec::try_new( + result_exprs?, + Arc::clone(&aggregate), + )?); Ok(( scans, Arc::new(SparkPlan::new_with_additional( @@ -980,7 +983,10 @@ impl PhysicalPlanner { let num = limit.limit; let (scans, child) = self.create_plan(&children[0], inputs)?; - let limit = Arc::new(LocalLimitExec::new(child.wrapped.clone(), num as usize)); + let limit = Arc::new(LocalLimitExec::new( + Arc::clone(&child.wrapped), + num as usize, + )); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, limit, vec![child])), @@ -1002,10 +1008,11 @@ impl PhysicalPlanner { // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and // it would be more efficient if we could avoid that. 
// https://github.com/apache/datafusion-comet/issues/963 - let child_copied = Self::wrap_in_copy_exec(child.wrapped.clone()); + let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.wrapped)); let sort = Arc::new( - SortExec::new(LexOrdering::new(exprs?), child_copied.clone()).with_fetch(fetch), + SortExec::new(LexOrdering::new(exprs?), Arc::clone(&child_copied)) + .with_fetch(fetch), ); Ok(( @@ -1013,8 +1020,8 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, sort, - vec![child.clone()], - vec![child.wrapped.clone()], + vec![Arc::clone(&child)], + vec![Arc::clone(&child.wrapped)], )), )) } @@ -1054,7 +1061,7 @@ impl PhysicalPlanner { .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( - child.wrapped.clone(), + Arc::clone(&child.wrapped), partitioning, writer.output_data_file.clone(), writer.output_index_file.clone(), @@ -1066,8 +1073,8 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, shuffle_writer, - vec![child.clone()], - vec![child.wrapped.clone()], + vec![Arc::clone(&child)], + vec![Arc::clone(&child.wrapped)], )), )) } @@ -1114,7 +1121,7 @@ impl PhysicalPlanner { // will create new arrays for the output batch. if can_reuse_input_batch(&child.wrapped) { let child_copied = Arc::new(CopyExec::new( - child.wrapped.clone(), + Arc::clone(&child.wrapped), CopyMode::UnpackOrDeepCopy, )); let expand = Arc::new(CometExpandExec::new(projections, child_copied, schema)); @@ -1123,14 +1130,14 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, expand, - vec![child.clone()], - vec![child.wrapped.clone()], + vec![Arc::clone(&child)], + vec![Arc::clone(&child.wrapped)], )), )) } else { let expand = Arc::new(CometExpandExec::new( projections, - child.wrapped.clone(), + Arc::clone(&child.wrapped), schema, )); Ok(( @@ -1259,7 +1266,7 @@ impl PhysicalPlanner { let window_agg = Arc::new(BoundedWindowAggExec::try_new( window_expr?, - child.wrapped.clone(), + Arc::clone(&child.wrapped), partition_exprs.to_vec(), InputOrderMode::Sorted, )?); @@ -1403,8 +1410,8 @@ impl PhysicalPlanner { Ok(( JoinParameters { - left: left.wrapped.clone(), - right: right.wrapped.clone(), + left: Arc::clone(&left.wrapped), + right: Arc::clone(&right.wrapped), join_on, join_type, join_filter, From 2027dc9ad5939521807a9bd11be88d2ceb11b160 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 09:18:52 -0700 Subject: [PATCH 04/21] add some more documentation --- native/core/src/execution/datafusion/planner.rs | 8 +++++++- native/core/src/execution/datafusion/spark_plan.rs | 5 +++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 03972a56c..59c13c987 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -848,7 +848,13 @@ impl PhysicalPlanner { } } - /// Create a DataFusion physical plan from Spark physical plan. + /// Create a DataFusion physical plan from Spark physical plan. There is a level of + /// abstraction where a tree of SparkPlan nodes is returned. There is a 1:1 mapping from a + /// protobuf Operator (that represents a Spark operator) to a native SparkPlan struct. We + /// need this 1:1 mapping so that we can report metrics back to Spark. 
The native execution + /// plan that is generated for each Operator is sometimes a single ExecutionPlan, but in some + /// cases we generate a tree of ExecutionPlans and we need to collect metrics for all of these + /// plans so we store references to them in the SparkPlan struct. /// /// `inputs` is a vector of input source IDs. It is used to create `ScanExec`s. Each `ScanExec` /// will be assigned a unique ID from `inputs` and the ID will be used to identify the input diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index 6974c7974..b943d8234 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -24,9 +24,14 @@ use std::sync::Arc; ///for the root plan (such as CopyExec and ScanExec nodes) #[derive(Debug, Clone)] pub(crate) struct SparkPlan { + /// Spark plan ID which is passed down in the protobuf pub(crate) plan_id: u32, + /// The root native plan that was generated for this Spark plan pub(crate) wrapped: Arc, + /// Child Spark plans pub(crate) children: Vec>, + /// Additional native plans that were generated for this Spark plan that we need + /// to collect metrics for pub(crate) metrics_plans: Vec>, } From 8c8c9a5d67c0d4a405adb83eb308e2f0d04cd7df Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 11:40:51 -0700 Subject: [PATCH 05/21] aggregate metrics --- .../core/src/execution/datafusion/planner.rs | 43 ++++++------ .../src/execution/datafusion/spark_plan.rs | 28 ++++---- native/core/src/execution/jni_api.rs | 6 +- native/core/src/execution/metrics/utils.rs | 37 +++++------ .../spark/sql/comet/CometMetricNode.scala | 65 ++++++++++--------- 5 files changed, 95 insertions(+), 84 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 59c13c987..ea1abdd4a 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -887,8 +887,10 @@ impl PhysicalPlanner { .map(|r| (r, format!("col_{}", idx))) }) .collect(); - let projection = - Arc::new(ProjectionExec::try_new(exprs?, Arc::clone(&child.wrapped))?); + let projection = Arc::new(ProjectionExec::try_new( + exprs?, + Arc::clone(&child.native_plan), + )?); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])), @@ -900,7 +902,10 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - let filter = Arc::new(FilterExec::try_new(predicate, Arc::clone(&child.wrapped))?); + let filter = Arc::new(FilterExec::try_new( + predicate, + Arc::clone(&child.native_plan), + )?); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, filter, vec![child])), @@ -942,7 +947,7 @@ impl PhysicalPlanner { group_by, aggr_expr, vec![None; num_agg], // no filter expressions - Arc::clone(&child.wrapped), + Arc::clone(&child.native_plan), Arc::clone(&schema), )?, ); @@ -990,7 +995,7 @@ impl PhysicalPlanner { let (scans, child) = self.create_plan(&children[0], inputs)?; let limit = Arc::new(LocalLimitExec::new( - Arc::clone(&child.wrapped), + Arc::clone(&child.native_plan), num as usize, )); Ok(( @@ -1014,7 +1019,7 @@ impl PhysicalPlanner { // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and // it would be more efficient if we could avoid that. 
// https://github.com/apache/datafusion-comet/issues/963 - let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.wrapped)); + let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)); let sort = Arc::new( SortExec::new(LexOrdering::new(exprs?), Arc::clone(&child_copied)) @@ -1027,7 +1032,7 @@ impl PhysicalPlanner { spark_plan.plan_id, sort, vec![Arc::clone(&child)], - vec![Arc::clone(&child.wrapped)], + vec![Arc::clone(&child.native_plan)], )), )) } @@ -1067,7 +1072,7 @@ impl PhysicalPlanner { .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( - Arc::clone(&child.wrapped), + Arc::clone(&child.native_plan), partitioning, writer.output_data_file.clone(), writer.output_index_file.clone(), @@ -1080,7 +1085,7 @@ impl PhysicalPlanner { spark_plan.plan_id, shuffle_writer, vec![Arc::clone(&child)], - vec![Arc::clone(&child.wrapped)], + vec![Arc::clone(&child.native_plan)], )), )) } @@ -1125,9 +1130,9 @@ impl PhysicalPlanner { // the data corruption. Note that we only need to copy the input batch // if the child operator is `ScanExec`, because other operators after `ScanExec` // will create new arrays for the output batch. - if can_reuse_input_batch(&child.wrapped) { + if can_reuse_input_batch(&child.native_plan) { let child_copied = Arc::new(CopyExec::new( - Arc::clone(&child.wrapped), + Arc::clone(&child.native_plan), CopyMode::UnpackOrDeepCopy, )); let expand = Arc::new(CometExpandExec::new(projections, child_copied, schema)); @@ -1137,13 +1142,13 @@ impl PhysicalPlanner { spark_plan.plan_id, expand, vec![Arc::clone(&child)], - vec![Arc::clone(&child.wrapped)], + vec![Arc::clone(&child.native_plan)], )), )) } else { let expand = Arc::new(CometExpandExec::new( projections, - Arc::clone(&child.wrapped), + Arc::clone(&child.native_plan), schema, )); Ok(( @@ -1272,7 +1277,7 @@ impl PhysicalPlanner { let window_agg = Arc::new(BoundedWindowAggExec::try_new( window_expr?, - Arc::clone(&child.wrapped), + Arc::clone(&child.native_plan), partition_exprs.to_vec(), InputOrderMode::Sorted, )?); @@ -1416,8 +1421,8 @@ impl PhysicalPlanner { Ok(( JoinParameters { - left: Arc::clone(&left.wrapped), - right: Arc::clone(&right.wrapped), + left: Arc::clone(&left.native_plan), + right: Arc::clone(&right.native_plan), join_on, join_type, join_filter, @@ -2318,7 +2323,7 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let mut stream = datafusion_plan.wrapped.execute(0, task_ctx).unwrap(); + let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); let (tx, mut rx) = mpsc::channel(1); @@ -2402,7 +2407,7 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let mut stream = datafusion_plan.wrapped.execute(0, task_ctx).unwrap(); + let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); let (tx, mut rx) = mpsc::channel(1); @@ -2477,7 +2482,7 @@ mod tests { let task_ctx = session_ctx.task_ctx(); let stream = datafusion_plan - .wrapped + .native_plan .execute(0, Arc::clone(&task_ctx)) .unwrap(); let output = collect(stream).await.unwrap(); diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index b943d8234..3fecba2ed 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ 
b/native/core/src/execution/datafusion/spark_plan.rs @@ -21,52 +21,56 @@ use std::sync::Arc; /// Wrapper around a native plan that maps to a Spark plan and can optionally contain /// references to other native plans that should contribute to the Spark SQL metrics -///for the root plan (such as CopyExec and ScanExec nodes) +/// for the root plan (such as CopyExec and ScanExec nodes) #[derive(Debug, Clone)] pub(crate) struct SparkPlan { /// Spark plan ID which is passed down in the protobuf pub(crate) plan_id: u32, /// The root native plan that was generated for this Spark plan - pub(crate) wrapped: Arc, + pub(crate) native_plan: Arc, /// Child Spark plans pub(crate) children: Vec>, /// Additional native plans that were generated for this Spark plan that we need - /// to collect metrics for - pub(crate) metrics_plans: Vec>, + /// to collect metrics for (such as CopyExec and ScanExec) + pub(crate) additional_native_plans: Vec>, } impl SparkPlan { + /// Create a SparkPlan that consists of a single native plan pub(crate) fn new( plan_id: u32, - wrapped: Arc, + native_plan: Arc, children: Vec>, ) -> Self { Self { plan_id, - wrapped, + native_plan, children, - metrics_plans: vec![], + additional_native_plans: vec![], } } + /// Create a SparkPlan that consists of more than one native plan pub(crate) fn new_with_additional( plan_id: u32, - wrapped: Arc, + native_plan: Arc, children: Vec>, - metrics_plans: Vec>, + additional_native_plans: Vec>, ) -> Self { Self { plan_id, - wrapped, + native_plan, children, - metrics_plans, + additional_native_plans, } } + /// Get the schema of the native plan pub(crate) fn schema(&self) -> SchemaRef { - self.wrapped.schema() + self.native_plan.schema() } + /// Get the child SparkPlan instances pub(crate) fn children(&self) -> &Vec> { &self.children } diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 1c953128c..d50084627 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -361,7 +361,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( if exec_context.explain_native { let formatted_plan_str = - DisplayableExecutionPlan::new(root_op.wrapped.as_ref()).indent(true); + DisplayableExecutionPlan::new(root_op.native_plan.as_ref()).indent(true); info!("Comet native query plan:\n{formatted_plan_str:}"); } @@ -370,7 +370,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( .root_op .as_ref() .unwrap() - .wrapped + .native_plan .execute(0, task_ctx)?; exec_context.stream = Some(stream); } else { @@ -402,7 +402,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( if exec_context.explain_native { if let Some(plan) = &exec_context.root_op { let formatted_plan_str = - DisplayableExecutionPlan::with_metrics(plan.wrapped.as_ref()) + DisplayableExecutionPlan::with_metrics(plan.native_plan.as_ref()) .indent(true); info!( "Comet native query plan with metrics:\ diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 909526360..585de3085 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -28,19 +28,31 @@ use std::sync::Arc; /// Updates the metrics of a CometMetricNode. This function is called recursively to /// update the metrics of all the children nodes. The metrics are pulled from the -/// DataFusion execution plan and pushed to the Java side through JNI. 
+/// native execution plan and pushed to the Java side through JNI. pub fn update_comet_metric( env: &mut JNIEnv, metric_node: &JObject, spark_plan: &Arc, metrics_jstrings: &mut HashMap>, ) -> Result<(), CometError> { + // combine all metrics from all native plans for this SparkPlan + let metrics = if spark_plan.additional_native_plans.is_empty() { + spark_plan.native_plan.metrics() + } else { + let mut metrics = spark_plan.native_plan.metrics().unwrap_or_default(); + for plan in &spark_plan.additional_native_plans { + let additional_metrics = plan.metrics().unwrap_or_default(); + for c in additional_metrics.iter() { + metrics.push(c.to_owned()); + } + } + Some(metrics.aggregate_by_name()) + }; + update_metrics( env, metric_node, - &spark_plan - .wrapped - .metrics() + &metrics .unwrap_or_default() .iter() .map(|m| m.value()) @@ -49,23 +61,6 @@ pub fn update_comet_metric( metrics_jstrings, )?; - if !spark_plan.metrics_plans.is_empty() { - for metrics_plan in &spark_plan.metrics_plans { - // TODO stop dropping these metrics! - println!( - "Dropping the {} elapsed_compute time of {} for plan {} (#{})", - metrics_plan.name(), - metrics_plan - .metrics() - .unwrap() - .elapsed_compute() - .unwrap_or(0), - spark_plan.wrapped.name(), - spark_plan.plan_id - ); - } - } - unsafe { for (i, child_plan) in spark_plan.children().iter().enumerate() { let child_metric_node: JObject = jni_call!(env, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 47c89d943..f07054e19 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -69,6 +69,11 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM object CometMetricNode { + def defaultMetrics(sc: SparkContext): Map[String, SQLMetric] = { + Map( + "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(sc, "time fetching batches from JVM")) + } + /** * The baseline SQL metrics for DataFusion `BaselineMetrics`. 
*/ @@ -93,41 +98,43 @@ object CometMetricNode { * SQL Metrics for DataFusion HashJoin */ def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "build_time" -> - SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), - "build_input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"), - "build_input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"), - "build_mem_used" -> - SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), - "input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), - "input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), - "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), - "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) + defaultMetrics(sc) ++ + Map( + "build_time" -> + SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), + "build_input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"), + "build_input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"), + "build_mem_used" -> + SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), + "input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), + "input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), + "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), + "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) } /** * SQL Metrics for DataFusion SortMergeJoin */ def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "peak_mem_used" -> - SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), - "input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), - "input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), - "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), - "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), - "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), - "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), - "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) + defaultMetrics(sc) ++ + Map( + "peak_mem_used" -> + SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), + "input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), + "input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), + "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), + "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), + "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), + "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), + "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) } /** From baae1ba250c0813c8a09e77e5289e4f2c965ef21 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 
12:36:45 -0700 Subject: [PATCH 06/21] simplify approach --- .../core/src/execution/datafusion/planner.rs | 38 ++++++------------- .../src/execution/datafusion/spark_plan.rs | 20 +++++++++- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index ea1abdd4a..235d948e5 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1028,11 +1028,10 @@ impl PhysicalPlanner { Ok(( scans, - Arc::new(SparkPlan::new_with_additional( + Arc::new(SparkPlan::new( spark_plan.plan_id, sort, vec![Arc::clone(&child)], - vec![Arc::clone(&child.native_plan)], )), )) } @@ -1078,14 +1077,12 @@ impl PhysicalPlanner { writer.output_index_file.clone(), )?); - // TODO this assumes that the child of a shuffle is always a ScanExec Ok(( scans, - Arc::new(SparkPlan::new_with_additional( + Arc::new(SparkPlan::new( spark_plan.plan_id, shuffle_writer, vec![Arc::clone(&child)], - vec![Arc::clone(&child.native_plan)], )), )) } @@ -1130,32 +1127,19 @@ impl PhysicalPlanner { // the data corruption. Note that we only need to copy the input batch // if the child operator is `ScanExec`, because other operators after `ScanExec` // will create new arrays for the output batch. - if can_reuse_input_batch(&child.native_plan) { - let child_copied = Arc::new(CopyExec::new( + let input = if can_reuse_input_batch(&child.native_plan) { + Arc::new(CopyExec::new( Arc::clone(&child.native_plan), CopyMode::UnpackOrDeepCopy, - )); - let expand = Arc::new(CometExpandExec::new(projections, child_copied, schema)); - Ok(( - scans, - Arc::new(SparkPlan::new_with_additional( - spark_plan.plan_id, - expand, - vec![Arc::clone(&child)], - vec![Arc::clone(&child.native_plan)], - )), )) } else { - let expand = Arc::new(CometExpandExec::new( - projections, - Arc::clone(&child.native_plan), - schema, - )); - Ok(( - scans, - Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), - )) - } + Arc::clone(&child.native_plan) + }; + let expand = Arc::new(CometExpandExec::new(projections, input, schema)); + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), + )) } OpStruct::SortMergeJoin(join) => { let (join_params, scans) = self.parse_join_parameters( diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index 3fecba2ed..2df1cb9e3 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use crate::execution::operators::{CopyExec, ScanExec}; use arrow_schema::SchemaRef; use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; @@ -42,11 +43,15 @@ impl SparkPlan { native_plan: Arc, children: Vec>, ) -> Self { + let mut additional_native_plans: Vec> = vec![]; + for child in &children { + collect_additional_plans(child.native_plan.clone(), &mut additional_native_plans); + } Self { plan_id, native_plan, children, - additional_native_plans: vec![], + additional_native_plans, } } @@ -75,3 +80,16 @@ impl SparkPlan { &self.children } } + +fn collect_additional_plans( + child: Arc, + additional_native_plans: &mut Vec>, +) { + if child.as_any().is::() { + additional_native_plans.push(child.clone()); + // CopyExec may be wrapping a ScanExec + collect_additional_plans(child.children()[0].clone(), additional_native_plans); + } else if child.as_any().is::() { + additional_native_plans.push(child.clone()); + } +} From 68d0fdf71a0ab7aa937b09afedb9b8e0d30bf6c9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 13:24:44 -0700 Subject: [PATCH 07/21] add a unit test --- .../core/src/execution/datafusion/planner.rs | 32 ++++++++++++++++++- .../src/execution/datafusion/spark_plan.rs | 19 ++++++++++- .../spark/sql/comet/CometMetricNode.scala | 21 +++++------- 3 files changed, 57 insertions(+), 15 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 235d948e5..98cf4bd24 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1221,7 +1221,6 @@ impl PhysicalPlanner { swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)? }; - // TODO pass in additional plans (CopyExec, ScanExec) // TODO pass in child spark plans Ok(( scans, @@ -2529,4 +2528,35 @@ mod tests { })), } } + + #[test] + fn spark_plan_metrics() { + let op_scan = spark_operator::Operator { + plan_id: 0, + children: vec![], + op_struct: Some(spark_operator::operator::OpStruct::Scan( + spark_operator::Scan { + fields: vec![spark_expression::DataType { + type_id: 3, + type_info: None, + }], + source: "".to_string(), + }, + )), + }; + + let op = create_filter(op_scan, 0); + let planner = PhysicalPlanner::default(); + + let (mut _scans, filter_exec) = planner.create_plan(&op, &mut vec![]).unwrap(); + + assert_eq!("FilterExec", filter_exec.native_plan.name()); + assert_eq!(1, filter_exec.children.len()); + assert_eq!(1, filter_exec.additional_native_plans.len()); + assert_eq!("ScanExec", filter_exec.additional_native_plans[0].name()); + + let scan_exec = &filter_exec.children()[0]; + assert_eq!("ScanExec", scan_exec.native_plan.name()); + assert_eq!(0, scan_exec.additional_native_plans.len()); + } } diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index 2df1cb9e3..b697cc3b4 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -47,6 +47,16 @@ impl SparkPlan { for child in &children { collect_additional_plans(child.native_plan.clone(), &mut additional_native_plans); } + + println!( + "native_plan={}, additional={:?}", + native_plan.name(), + additional_native_plans + .iter() + .map(|x| x.name()) + .collect::>() + ); + Self { plan_id, native_plan, @@ -62,11 +72,18 @@ impl SparkPlan { children: Vec>, additional_native_plans: Vec>, ) -> Self { + let mut accum: Vec> = vec![native_plan.clone()]; + for plan in &additional_native_plans { + 
accum.push(Arc::clone(plan)); + } + for child in &children { + collect_additional_plans(child.native_plan.clone(), &mut accum); + } Self { plan_id, native_plan, children, - additional_native_plans, + additional_native_plans: accum, } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index f07054e19..d517bf741 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -69,11 +69,6 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM object CometMetricNode { - def defaultMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(sc, "time fetching batches from JVM")) - } - /** * The baseline SQL metrics for DataFusion `BaselineMetrics`. */ @@ -82,23 +77,25 @@ object CometMetricNode { "output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"), "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( sc, - "total time (in ms) spent in this operator")) + "total time (in ms) spent in this operator"), + "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(sc, "time fetching batches from JVM")) } /** * SQL Metrics for Comet native ScanExec */ def scanMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "cast_time" -> - SQLMetrics.createNanoTimingMetric(sc, "Total time for casting columns")) + baselineMetrics(sc) ++ + Map( + "cast_time" -> + SQLMetrics.createNanoTimingMetric(sc, "Total time for casting columns")) } /** * SQL Metrics for DataFusion HashJoin */ def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - defaultMetrics(sc) ++ + baselineMetrics(sc) ++ Map( "build_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), @@ -113,7 +110,6 @@ object CometMetricNode { "input_rows" -> SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) } @@ -121,7 +117,7 @@ object CometMetricNode { * SQL Metrics for DataFusion SortMergeJoin */ def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - defaultMetrics(sc) ++ + baselineMetrics(sc) ++ Map( "peak_mem_used" -> SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), @@ -130,7 +126,6 @@ object CometMetricNode { "input_rows" -> SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), From 2347203b661f9984847b02df34f4a9c1001b6355 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 14:19:54 -0700 Subject: [PATCH 08/21] save progress --- .../core/src/execution/datafusion/planner.rs | 157 ++++++++++++------ .../src/execution/datafusion/spark_plan.rs | 2 +- native/core/src/execution/metrics/utils.rs | 4 + 3 files changed, 111 insertions(+), 52 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs 
b/native/core/src/execution/datafusion/planner.rs index 98cf4bd24..bb73dc062 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -128,8 +128,8 @@ type PhyExprResult = Result, String)>, ExecutionError type PartitionPhyExprResult = Result>, ExecutionError>; struct JoinParameters { - pub left: Arc, - pub right: Arc, + pub left: Arc, + pub right: Arc, pub join_on: Vec<(Arc, Arc)>, pub join_filter: Option, pub join_type: DFJoinType, @@ -1166,8 +1166,8 @@ impl PhysicalPlanner { .collect(); let join = Arc::new(SortMergeJoinExec::try_new( - join_params.left, - join_params.right, + Arc::clone(&join_params.left.native_plan), + Arc::clone(&join_params.right.native_plan), join_params.join_on, join_params.join_filter, join_params.join_type, @@ -1177,11 +1177,16 @@ impl PhysicalPlanner { false, )?); - // TODO pass in additional plans (CopyExec, ScanExec) - // TODO pass in child spark plans Ok(( scans, - Arc::new(SparkPlan::new(spark_plan.plan_id, join, vec![])), + Arc::new(SparkPlan::new( + spark_plan.plan_id, + join, + vec![ + Arc::clone(&join_params.left), + Arc::clone(&join_params.right), + ], + )), )) } OpStruct::HashJoin(join) => { @@ -1198,8 +1203,8 @@ impl PhysicalPlanner { // to copy the input batch to avoid the data corruption from reusing the input // batch. We also need to unpack dictionary arrays, because the join operators // do not support them. - let left = Self::wrap_in_copy_exec(join_params.left); - let right = Self::wrap_in_copy_exec(join_params.right); + let left = Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan)); + let right = Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan)); let hash_join = Arc::new(HashJoinExec::try_new( left, @@ -1215,17 +1220,30 @@ impl PhysicalPlanner { )?); // If the hash join is build right, we need to swap the left and right - let hash_join = if join.build_side == BuildSide::BuildLeft as i32 { - hash_join + if join.build_side == BuildSide::BuildLeft as i32 { + Ok(( + scans, + Arc::new(SparkPlan::new( + spark_plan.plan_id, + hash_join, + vec![join_params.left, join_params.right], + )), + )) } else { - swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)? - }; - - // TODO pass in child spark plans - Ok(( - scans, - Arc::new(SparkPlan::new(spark_plan.plan_id, hash_join, vec![])), - )) + // we insert a projection around the hash join in this case + let projection = + swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?; + let swapped_hash_join = Arc::clone(&projection.children()[0]); + Ok(( + scans, + Arc::new(SparkPlan::new_with_additional( + spark_plan.plan_id, + projection, + vec![join_params.left, join_params.right], + vec![swapped_hash_join], + )), + )) + } } OpStruct::Window(wnd) => { let (scans, child) = self.create_plan(&children[0], inputs)?; @@ -1266,7 +1284,6 @@ impl PhysicalPlanner { )?); Ok(( scans, - // TODO additional metrics? 
Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg, vec![child])), )) } @@ -1404,8 +1421,8 @@ impl PhysicalPlanner { Ok(( JoinParameters { - left: Arc::clone(&left.native_plan), - right: Arc::clone(&right.native_plan), + left: Arc::clone(&left), + right: Arc::clone(&right), join_on, join_type, join_filter, @@ -2272,6 +2289,7 @@ mod tests { use crate::execution::operators::ExecutionError; use datafusion_comet_proto::{ spark_expression::expr::ExprStruct::*, + spark_expression::Expr, spark_expression::{self, literal}, spark_operator, spark_operator::{operator::OpStruct, Operator}, @@ -2439,20 +2457,7 @@ mod tests { #[tokio::test()] #[allow(clippy::field_reassign_with_default)] async fn to_datafusion_filter() { - let op_scan = spark_operator::Operator { - plan_id: 0, - children: vec![], - op_struct: Some(spark_operator::operator::OpStruct::Scan( - spark_operator::Scan { - fields: vec![spark_expression::DataType { - type_id: 3, - type_info: None, - }], - source: "".to_string(), - }, - )), - }; - + let op_scan = create_scan(); let op = create_filter(op_scan, 0); let planner = PhysicalPlanner::default(); @@ -2530,21 +2535,8 @@ mod tests { } #[test] - fn spark_plan_metrics() { - let op_scan = spark_operator::Operator { - plan_id: 0, - children: vec![], - op_struct: Some(spark_operator::operator::OpStruct::Scan( - spark_operator::Scan { - fields: vec![spark_expression::DataType { - type_id: 3, - type_info: None, - }], - source: "".to_string(), - }, - )), - }; - + fn spark_plan_metrics_filter() { + let op_scan = create_scan(); let op = create_filter(op_scan, 0); let planner = PhysicalPlanner::default(); @@ -2559,4 +2551,67 @@ mod tests { assert_eq!("ScanExec", scan_exec.native_plan.name()); assert_eq!(0, scan_exec.additional_native_plans.len()); } + + #[test] + fn spark_plan_metrics_hash_join() { + let op_scan = create_scan(); + let op_join = Operator { + plan_id: 0, + children: vec![op_scan.clone(), op_scan.clone()], + op_struct: Some(spark_operator::operator::OpStruct::HashJoin( + spark_operator::HashJoin { + left_join_keys: vec![create_bound_reference(0)], + right_join_keys: vec![create_bound_reference(0)], + join_type: 0, + condition: None, + build_side: 0, + }, + )), + }; + + let planner = PhysicalPlanner::default(); + + let (_scans, hash_join_exec) = planner.create_plan(&op_join, &mut vec![]).unwrap(); + + assert_eq!("HashJoinExec", hash_join_exec.native_plan.name()); + assert_eq!(2, hash_join_exec.children.len()); + assert_eq!("ScanExec", hash_join_exec.children[0].native_plan.name()); + assert_eq!("ScanExec", hash_join_exec.children[1].native_plan.name()); + + assert_eq!(2, hash_join_exec.additional_native_plans.len()); + assert_eq!("ScanExec", hash_join_exec.additional_native_plans[0].name()); + assert_eq!("ScanExec", hash_join_exec.additional_native_plans[1].name()); + } + + fn create_bound_reference(index: i32) -> Expr { + spark_expression::Expr { + expr_struct: Some(spark_expression::expr::ExprStruct::Bound( + spark_expression::BoundReference { + index, + datatype: Some(create_proto_datatype()), + }, + )), + } + } + + fn create_scan() -> Operator { + let op_scan = spark_operator::Operator { + plan_id: 0, + children: vec![], + op_struct: Some(spark_operator::operator::OpStruct::Scan( + spark_operator::Scan { + fields: vec![create_proto_datatype()], + source: "".to_string(), + }, + )), + }; + op_scan + } + + fn create_proto_datatype() -> spark_expression::DataType { + spark_expression::DataType { + type_id: 3, + type_info: None, + } + } } diff --git 
a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index b697cc3b4..86e2ad6ea 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -49,7 +49,7 @@ impl SparkPlan { } println!( - "native_plan={}, additional={:?}", + "SparkPlan::new() native_plan={}, additional={:?}", native_plan.name(), additional_native_plans .iter() diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 585de3085..9248dd7ca 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -23,6 +23,7 @@ use crate::{ }; use jni::objects::{GlobalRef, JString}; use jni::{objects::JObject, JNIEnv}; +use log::{info, warn}; use std::collections::HashMap; use std::sync::Arc; @@ -35,6 +36,8 @@ pub fn update_comet_metric( spark_plan: &Arc, metrics_jstrings: &mut HashMap>, ) -> Result<(), CometError> { + info!("update_comet_metric for {}", spark_plan.native_plan.name()); + // combine all metrics from all native plans for this SparkPlan let metrics = if spark_plan.additional_native_plans.is_empty() { spark_plan.native_plan.metrics() @@ -67,6 +70,7 @@ pub fn update_comet_metric( comet_metric_node(metric_node).get_child_node(i as i32) -> JObject )?; if child_metric_node.is_null() { + warn!("child_metric_node was null"); continue; } update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?; From d0aeda1fdda385c2315b62ae7dc7ca84de382318 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 15:36:48 -0700 Subject: [PATCH 09/21] remove debug, add specific jvm timer --- .../core/src/execution/datafusion/planner.rs | 14 ++++++-- .../execution/datafusion/shuffle_writer.rs | 6 ---- .../src/execution/datafusion/spark_plan.rs | 12 +------ native/core/src/execution/metrics/utils.rs | 16 ++++++--- native/core/src/execution/operators/scan.rs | 35 +++++++++++-------- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index bb73dc062..eec2a7dd7 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -56,7 +56,7 @@ use datafusion::functions_aggregate::min_max::max_udaf; use datafusion::functions_aggregate::min_max::min_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_plan::windows::BoundedWindowAggExec; -use datafusion::physical_plan::InputOrderMode; +use datafusion::physical_plan::{displayable, InputOrderMode}; use datafusion::{ arrow::{compute::SortOptions, datatypes::SchemaRef}, common::DataFusionError, @@ -1234,13 +1234,23 @@ impl PhysicalPlanner { let projection = swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?; let swapped_hash_join = Arc::clone(&projection.children()[0]); + println!( + "Planner creating SparkPlan for HashJoin: {}", + displayable(projection.as_ref()).indent(true) + ); + let mut additional_native_plans = swapped_hash_join + .children() + .iter() + .map(|p| p.to_owned().clone()) + .collect::>(); + additional_native_plans.push(Arc::clone(&swapped_hash_join)); Ok(( scans, Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, projection, vec![join_params.left, join_params.right], - vec![swapped_hash_join], + additional_native_plans, )), )) } diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs 
index c79eeeb4a..de69a1532 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -139,7 +139,6 @@ impl ExecutionPlan for ShuffleWriterExec {
     ) -> Result<SendableRecordBatchStream> {
         let input = self.input.execute(partition, Arc::clone(&context))?;
         let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
-        let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0);
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
@@ -152,7 +151,6 @@ impl ExecutionPlan for ShuffleWriterExec {
                     self.partitioning.clone(),
                     metrics,
                     context,
-                    jvm_fetch_time,
                 )
                 .map_err(|e| ArrowError::ExternalError(Box::new(e))),
             )
@@ -1094,7 +1092,6 @@ async fn external_shuffle(
     partitioning: Partitioning,
     metrics: ShuffleRepartitionerMetrics,
     context: Arc<TaskContext>,
-    jvm_fetch_time: Time,
 ) -> Result<SendableRecordBatchStream> {
     let schema = input.schema();
     let mut repartitioner = ShuffleRepartitioner::new(
@@ -1109,10 +1106,7 @@ async fn external_shuffle(
     );
 
     loop {
-        let mut timer = jvm_fetch_time.timer();
         let b = input.next().await;
-        timer.stop();
-
         match b {
             Some(batch_result) => {
                 // Block on the repartitioner to insert the batch and shuffle the rows
diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs
index 86e2ad6ea..9aa51f5e4 100644
--- a/native/core/src/execution/datafusion/spark_plan.rs
+++ b/native/core/src/execution/datafusion/spark_plan.rs
@@ -47,16 +47,6 @@ impl SparkPlan {
         for child in &children {
             collect_additional_plans(child.native_plan.clone(), &mut additional_native_plans);
         }
-
-        println!(
-            "SparkPlan::new() native_plan={}, additional={:?}",
-            native_plan.name(),
-            additional_native_plans
-                .iter()
-                .map(|x| x.name())
-                .collect::<Vec<_>>()
-        );
-
         Self {
             plan_id,
             native_plan,
@@ -72,7 +62,7 @@ impl SparkPlan {
         children: Vec<Arc<SparkPlan>>,
         additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Self {
-        let mut accum: Vec<Arc<dyn ExecutionPlan>> = vec![native_plan.clone()];
+        let mut accum: Vec<Arc<dyn ExecutionPlan>> = vec![];
         for plan in &additional_native_plans {
             accum.push(Arc::clone(plan));
         }
diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs
index 9248dd7ca..59b67d188 100644
--- a/native/core/src/execution/metrics/utils.rs
+++ b/native/core/src/execution/metrics/utils.rs
@@ -21,9 +21,9 @@ use crate::{
     errors::CometError,
     jvm_bridge::{jni_call, jni_new_string},
 };
+use datafusion::physical_plan::metrics::MetricValue;
 use jni::objects::{GlobalRef, JString};
 use jni::{objects::JObject, JNIEnv};
-use log::{info, warn};
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -36,8 +36,6 @@ pub fn update_comet_metric(
     spark_plan: &Arc<SparkPlan>,
     metrics_jstrings: &mut HashMap<String, Arc<GlobalRef>>,
 ) -> Result<(), CometError> {
-    info!("update_comet_metric for {}", spark_plan.native_plan.name());
-
     // combine all metrics from all native plans for this SparkPlan
     let metrics = if spark_plan.additional_native_plans.is_empty() {
         spark_plan.native_plan.metrics()
@@ -46,7 +44,16 @@ pub fn update_comet_metric(
         for plan in &spark_plan.additional_native_plans {
             let additional_metrics = plan.metrics().unwrap_or_default();
             for c in additional_metrics.iter() {
-                metrics.push(c.to_owned());
+                match c.value() {
+                    MetricValue::ElapsedCompute(_)
+                    | MetricValue::Time { ..
} + | MetricValue::SpilledRows(_) + | MetricValue::SpillCount(_) + | MetricValue::SpilledBytes(_) => { + metrics.push(c.to_owned()); + } + _ => {} + } } } Some(metrics.aggregate_by_name()) @@ -70,7 +77,6 @@ pub fn update_comet_metric( comet_metric_node(metric_node).get_child_node(i as i32) -> JObject )?; if child_metric_node.is_null() { - warn!("child_metric_node was null"); continue; } update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?; diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2cb8a84d9..69f384666 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -77,6 +77,8 @@ pub struct ScanExec { metrics: ExecutionPlanMetricsSet, /// Baseline metrics baseline_metrics: BaselineMetrics, + /// Timer + jvm_fetch_time: Time, } impl ScanExec { @@ -88,6 +90,7 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); + let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0); // Scan's schema is determined by the input batch, so we need to set it before execution. // Note that we determine if arrays are dictionary-encoded based on the @@ -97,8 +100,12 @@ impl ScanExec { // Dictionary-encoded primitive arrays are always unpacked. let first_batch = if let Some(input_source) = input_source.as_ref() { let mut timer = baseline_metrics.elapsed_compute().timer(); - let batch = - ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?; + let batch = ScanExec::get_next( + exec_context_id, + input_source.as_obj(), + data_types.len(), + &jvm_fetch_time, + )?; timer.stop(); batch } else { @@ -124,6 +131,7 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, + jvm_fetch_time, schema, }) } @@ -171,6 +179,7 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), + &self.jvm_fetch_time, )?; *current_batch = Some(next_batch); } @@ -185,6 +194,7 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, + jvm_fetch_time: &Time, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. 
@@ -197,6 +207,7 @@ impl ScanExec { exec_context_id )))); } + let mut timer = jvm_fetch_time.timer(); let mut env = JVMClasses::get_env()?; @@ -255,6 +266,8 @@ impl ScanExec { } } + timer.stop(); + Ok(InputBatch::new(inputs, Some(num_rows as usize))) } } @@ -324,7 +337,7 @@ impl ExecutionPlan for ScanExec { self.clone(), self.schema(), partition, - self.baseline_metrics.clone(), + self.jvm_fetch_time.clone(), ))) } @@ -365,28 +378,23 @@ struct ScanStream<'a> { scan: ScanExec, /// Schema representing the data schema: SchemaRef, - /// Metrics - baseline_metrics: BaselineMetrics, /// Cast options cast_options: CastOptions<'a>, /// elapsed time for casting columns to different data types during scan cast_time: Time, + /// elapsed time for fetching arrow arrays from JVM + jvm_fetch_time: Time, } impl<'a> ScanStream<'a> { - pub fn new( - scan: ScanExec, - schema: SchemaRef, - partition: usize, - baseline_metrics: BaselineMetrics, - ) -> Self { + pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize, jvm_fetch_time: Time) -> Self { let cast_time = MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition); Self { scan, schema, - baseline_metrics, cast_options: CastOptions::default(), cast_time, + jvm_fetch_time, } } @@ -426,7 +434,7 @@ impl<'a> Stream for ScanStream<'a> { type Item = DataFusionResult; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - let mut timer = self.baseline_metrics.elapsed_compute().timer(); + let mut timer = self.jvm_fetch_time.timer(); let mut scan_batch = self.scan.batch.try_lock().unwrap(); let input_batch = &*scan_batch; @@ -440,7 +448,6 @@ impl<'a> Stream for ScanStream<'a> { let result = match input_batch { InputBatch::EOF => Poll::Ready(None), InputBatch::Batch(columns, num_rows) => { - self.baseline_metrics.record_output(*num_rows); let maybe_batch = self.build_record_batch(columns, *num_rows); Poll::Ready(Some(maybe_batch)) } From 3359738e7c3f6ad39efbf8d946b609cb6ffcd3a9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 15:39:25 -0700 Subject: [PATCH 10/21] fix --- native/core/src/execution/metrics/utils.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 59b67d188..1bf0e220b 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -45,14 +45,10 @@ pub fn update_comet_metric( let additional_metrics = plan.metrics().unwrap_or_default(); for c in additional_metrics.iter() { match c.value() { - MetricValue::ElapsedCompute(_) - | MetricValue::Time { .. 
} - | MetricValue::SpilledRows(_) - | MetricValue::SpillCount(_) - | MetricValue::SpilledBytes(_) => { - metrics.push(c.to_owned()); + MetricValue::OutputRows(_) => { + // we do not want to double count output rows } - _ => {} + _ => metrics.push(c.to_owned()) } } } From 0a8a06bffe4af4417b88ac964ff6c891a07a1eee Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 15:45:31 -0700 Subject: [PATCH 11/21] clippy --- native/core/src/execution/datafusion/planner.rs | 4 ++-- native/core/src/execution/datafusion/spark_plan.rs | 10 +++++----- native/core/src/execution/metrics/utils.rs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index eec2a7dd7..69b106979 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1233,7 +1233,7 @@ impl PhysicalPlanner { // we insert a projection around the hash join in this case let projection = swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?; - let swapped_hash_join = Arc::clone(&projection.children()[0]); + let swapped_hash_join = Arc::clone(projection.children()[0]); println!( "Planner creating SparkPlan for HashJoin: {}", displayable(projection.as_ref()).indent(true) @@ -1241,7 +1241,7 @@ impl PhysicalPlanner { let mut additional_native_plans = swapped_hash_join .children() .iter() - .map(|p| p.to_owned().clone()) + .map(|p| Arc::clone(p)) .collect::>(); additional_native_plans.push(Arc::clone(&swapped_hash_join)); Ok(( diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/datafusion/spark_plan.rs index 9aa51f5e4..356e90ece 100644 --- a/native/core/src/execution/datafusion/spark_plan.rs +++ b/native/core/src/execution/datafusion/spark_plan.rs @@ -45,7 +45,7 @@ impl SparkPlan { ) -> Self { let mut additional_native_plans: Vec> = vec![]; for child in &children { - collect_additional_plans(child.native_plan.clone(), &mut additional_native_plans); + collect_additional_plans(Arc::clone(&child.native_plan), &mut additional_native_plans); } Self { plan_id, @@ -67,7 +67,7 @@ impl SparkPlan { accum.push(Arc::clone(plan)); } for child in &children { - collect_additional_plans(child.native_plan.clone(), &mut accum); + collect_additional_plans(Arc::clone(&child.native_plan), &mut accum); } Self { plan_id, @@ -93,10 +93,10 @@ fn collect_additional_plans( additional_native_plans: &mut Vec>, ) { if child.as_any().is::() { - additional_native_plans.push(child.clone()); + additional_native_plans.push(Arc::clone(&child)); // CopyExec may be wrapping a ScanExec - collect_additional_plans(child.children()[0].clone(), additional_native_plans); + collect_additional_plans(Arc::clone(child.children()[0]), additional_native_plans); } else if child.as_any().is::() { - additional_native_plans.push(child.clone()); + additional_native_plans.push(Arc::clone(&child)); } } diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 1bf0e220b..4bb1c4474 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -48,7 +48,7 @@ pub fn update_comet_metric( MetricValue::OutputRows(_) => { // we do not want to double count output rows } - _ => metrics.push(c.to_owned()) + _ => metrics.push(c.to_owned()), } } } From 40e90d5640b13c6ea30d8b5d3d0fc1a5e1de1589 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 15:50:25 -0700 Subject: [PATCH 12/21] 
clippy --- native/core/src/execution/datafusion/planner.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 69b106979..2b36a228a 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -2568,7 +2568,7 @@ mod tests { let op_join = Operator { plan_id: 0, children: vec![op_scan.clone(), op_scan.clone()], - op_struct: Some(spark_operator::operator::OpStruct::HashJoin( + op_struct: Some(OpStruct::HashJoin( spark_operator::HashJoin { left_join_keys: vec![create_bound_reference(0)], right_join_keys: vec![create_bound_reference(0)], @@ -2594,8 +2594,8 @@ mod tests { } fn create_bound_reference(index: i32) -> Expr { - spark_expression::Expr { - expr_struct: Some(spark_expression::expr::ExprStruct::Bound( + Expr { + expr_struct: Some(Bound( spark_expression::BoundReference { index, datatype: Some(create_proto_datatype()), @@ -2605,17 +2605,16 @@ mod tests { } fn create_scan() -> Operator { - let op_scan = spark_operator::Operator { + Operator { plan_id: 0, children: vec![], - op_struct: Some(spark_operator::operator::OpStruct::Scan( + op_struct: Some(OpStruct::Scan( spark_operator::Scan { fields: vec![create_proto_datatype()], source: "".to_string(), }, )), - }; - op_scan + } } fn create_proto_datatype() -> spark_expression::DataType { From 5bfe33464653d480eeef1585a6948861a6bb7ebf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Nov 2024 15:52:22 -0700 Subject: [PATCH 13/21] format --- .../core/src/execution/datafusion/planner.rs | 36 ++++++++----------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 2b36a228a..79aaf0c0d 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -2568,15 +2568,13 @@ mod tests { let op_join = Operator { plan_id: 0, children: vec![op_scan.clone(), op_scan.clone()], - op_struct: Some(OpStruct::HashJoin( - spark_operator::HashJoin { - left_join_keys: vec![create_bound_reference(0)], - right_join_keys: vec![create_bound_reference(0)], - join_type: 0, - condition: None, - build_side: 0, - }, - )), + op_struct: Some(OpStruct::HashJoin(spark_operator::HashJoin { + left_join_keys: vec![create_bound_reference(0)], + right_join_keys: vec![create_bound_reference(0)], + join_type: 0, + condition: None, + build_side: 0, + })), }; let planner = PhysicalPlanner::default(); @@ -2595,12 +2593,10 @@ mod tests { fn create_bound_reference(index: i32) -> Expr { Expr { - expr_struct: Some(Bound( - spark_expression::BoundReference { - index, - datatype: Some(create_proto_datatype()), - }, - )), + expr_struct: Some(Bound(spark_expression::BoundReference { + index, + datatype: Some(create_proto_datatype()), + })), } } @@ -2608,12 +2604,10 @@ mod tests { Operator { plan_id: 0, children: vec![], - op_struct: Some(OpStruct::Scan( - spark_operator::Scan { - fields: vec![create_proto_datatype()], - source: "".to_string(), - }, - )), + op_struct: Some(OpStruct::Scan(spark_operator::Scan { + fields: vec![create_proto_datatype()], + source: "".to_string(), + })), } } From 0a3044ed7193763fdf301904aea7cd2b573da634 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 24 Nov 2024 08:20:16 -0700 Subject: [PATCH 14/21] Revert some changes, update documentation --- docs/source/user-guide/tuning.md | 14 ++++++++++---- 
native/core/src/execution/operators/scan.rs | 18 ++++++++++++------ .../spark/sql/comet/CometMetricNode.scala | 7 +++---- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index e0d7c1bc9..1bebdf7d9 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -113,8 +113,14 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases: Comet also adds some custom metrics: -### ShuffleWriterExec +### CometScanExec -| Metric | Description | -| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | +| Metric | Description | +|------------|-------------------------------------------------------------------------------------------------------------------------------------------------| +| `scanTime` | Total time to read Parquet batches into JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. | + +### Common to all Comet Executors + +| Metric | Description | +| ---------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `jvm_fetch_time` | Measure the time it takes for an operatior to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 69f384666..ce4cd7ee1 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -337,7 +337,7 @@ impl ExecutionPlan for ScanExec { self.clone(), self.schema(), partition, - self.jvm_fetch_time.clone(), + self.baseline_metrics.clone(), ))) } @@ -378,23 +378,28 @@ struct ScanStream<'a> { scan: ScanExec, /// Schema representing the data schema: SchemaRef, + /// Metrics + baseline_metrics: BaselineMetrics, /// Cast options cast_options: CastOptions<'a>, /// elapsed time for casting columns to different data types during scan cast_time: Time, - /// elapsed time for fetching arrow arrays from JVM - jvm_fetch_time: Time, } impl<'a> ScanStream<'a> { - pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize, jvm_fetch_time: Time) -> Self { + pub fn new( + scan: ScanExec, + schema: SchemaRef, + partition: usize, + baseline_metrics: BaselineMetrics, + ) -> Self { let cast_time = MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition); Self { scan, schema, + baseline_metrics, cast_options: CastOptions::default(), cast_time, - jvm_fetch_time, } } @@ -434,7 +439,7 @@ impl<'a> Stream for ScanStream<'a> { type Item = DataFusionResult; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - let mut timer = self.jvm_fetch_time.timer(); + let mut timer = self.baseline_metrics.elapsed_compute().timer(); let mut scan_batch = self.scan.batch.try_lock().unwrap(); let input_batch = &*scan_batch; @@ -448,6 +453,7 @@ impl<'a> Stream for ScanStream<'a> { let result = match input_batch { InputBatch::EOF => Poll::Ready(None), InputBatch::Batch(columns, num_rows) => { + 
self.baseline_metrics.record_output(*num_rows); let maybe_batch = self.build_record_batch(columns, *num_rows); Poll::Ready(Some(maybe_batch)) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index d517bf741..b40e1016a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -85,10 +85,9 @@ object CometMetricNode { * SQL Metrics for Comet native ScanExec */ def scanMetrics(sc: SparkContext): Map[String, SQLMetric] = { - baselineMetrics(sc) ++ - Map( - "cast_time" -> - SQLMetrics.createNanoTimingMetric(sc, "Total time for casting columns")) + Map( + "cast_time" -> + SQLMetrics.createNanoTimingMetric(sc, "Total time for casting columns")) } /** From d430175f44b267da677647afd6a0e111a7439c36 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 24 Nov 2024 08:20:38 -0700 Subject: [PATCH 15/21] fix typo --- docs/source/user-guide/tuning.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 1bebdf7d9..145d28b28 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -116,11 +116,11 @@ Comet also adds some custom metrics: ### CometScanExec | Metric | Description | -|------------|-------------------------------------------------------------------------------------------------------------------------------------------------| +| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- | | `scanTime` | Total time to read Parquet batches into JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. | ### Common to all Comet Executors -| Metric | Description | -| ---------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `jvm_fetch_time` | Measure the time it takes for an operatior to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | +| Metric | Description | +| ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `jvm_fetch_time` | Measure the time it takes for an operator to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | From ff5076dce135eba987aa168d71d9b4a434f770df Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 24 Nov 2024 08:32:17 -0700 Subject: [PATCH 16/21] fix typo --- docs/source/user-guide/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 145d28b28..6b545a32c 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -119,7 +119,7 @@ Comet also adds some custom metrics: | ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- | | `scanTime` | Total time to read Parquet batches into JVM. 
This does not include the Arrow FFI cost of exporting these batches to native code for processing. | -### Common to all Comet Executors +### Common to all Comet Operators | Metric | Description | | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | From ca660955dce22300ef2cf016fa74cbc1e652f834 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 24 Nov 2024 10:45:30 -0700 Subject: [PATCH 17/21] measure more FFI time --- .../scala/org/apache/comet/vector/NativeUtil.scala | 9 +++++++++ docs/source/user-guide/tuning.md | 6 +++--- native/core/src/execution/operators/scan.rs | 14 +++++++------- .../scala/org/apache/comet/CometExecIterator.scala | 3 +++ .../apache/spark/sql/comet/CometMetricNode.scala | 5 ++++- .../shuffle/CometShuffleExchangeExec.scala | 11 ++++++----- .../org/apache/spark/sql/comet/operators.scala | 6 ++++-- 7 files changed, 36 insertions(+), 18 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 72472a540..028f81796 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -26,6 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException import org.apache.spark.sql.comet.util.Utils +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.comet.CometArrowAllocator @@ -148,12 +149,17 @@ class NativeUtil { */ def getNextBatch( numOutputCols: Int, + arrowFfiMetric: Option[SQLMetric], func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = { + + val start = System.nanoTime() val (arrays, schemas) = allocateArrowStructs(numOutputCols) val arrayAddrs = arrays.map(_.memoryAddress()) val schemaAddrs = schemas.map(_.memoryAddress()) + var arrowFfiTime = System.nanoTime() - start + val result = func(arrayAddrs, schemaAddrs) result match { @@ -161,7 +167,10 @@ class NativeUtil { // EOF None case numRows => + val start = System.nanoTime() val cometVectors = importVector(arrays, schemas) + arrowFfiTime += System.nanoTime() - start + arrowFfiMetric.foreach(_.add(arrowFfiTime)) Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt)) case flag => throw new IllegalStateException(s"Invalid native flag: $flag") diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 6b545a32c..a84ff5a34 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -121,6 +121,6 @@ Comet also adds some custom metrics: ### Common to all Comet Operators -| Metric | Description | -| ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `jvm_fetch_time` | Measure the time it takes for an operator to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | +| Metric | Description | +| ---------------- | ------------------------------------------------------------------------------------------------ | +| `arrow_ffi_time` | Measure the time it takes to transfer Arrow batches between JVM and native code using Arrow FFI. 
| diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index ce4cd7ee1..cc9a1176f 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -78,7 +78,7 @@ pub struct ScanExec { /// Baseline metrics baseline_metrics: BaselineMetrics, /// Timer - jvm_fetch_time: Time, + arrow_ffi_time: Time, } impl ScanExec { @@ -90,7 +90,7 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); - let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0); + let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0); // Scan's schema is determined by the input batch, so we need to set it before execution. // Note that we determine if arrays are dictionary-encoded based on the @@ -104,7 +104,7 @@ impl ScanExec { exec_context_id, input_source.as_obj(), data_types.len(), - &jvm_fetch_time, + &arrow_ffi_time, )?; timer.stop(); batch @@ -131,7 +131,7 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, - jvm_fetch_time, + arrow_ffi_time, schema, }) } @@ -179,7 +179,7 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), - &self.jvm_fetch_time, + &self.arrow_ffi_time, )?; *current_batch = Some(next_batch); } @@ -194,7 +194,7 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, - jvm_fetch_time: &Time, + arrow_ffi_time: &Time, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -207,7 +207,7 @@ impl ScanExec { exec_context_id )))); } - let mut timer = jvm_fetch_time.timer(); + let mut timer = arrow_ffi_time.timer(); let mut env = JVMClasses::get_env()?; diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 8a349bd37..e68a49229 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -21,6 +21,7 @@ package org.apache.comet import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized._ import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} @@ -45,6 +46,7 @@ class CometExecIterator( inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, protobufQueryPlan: Array[Byte], + arrowFfiMetric: Option[SQLMetric], nativeMetrics: CometMetricNode) extends Iterator[ColumnarBatch] { @@ -94,6 +96,7 @@ class CometExecIterator( def getNextBatch(): Option[ColumnarBatch] = { nativeUtil.getNextBatch( numOutputCols, + arrowFfiMetric, (arrayAddrs, schemaAddrs) => { val ctx = TaskContext.get() nativeLib.executePlan(ctx.stageId(), ctx.partitionId(), plan, arrayAddrs, schemaAddrs) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index b40e1016a..551737b52 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -69,6 +69,9 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM object CometMetricNode { + val ARROW_FFI_TIME_KEY = 
"arrow_ffi_time" + val ARROW_FFI_TIME_DESCRIPTION = "Arrow FFI time" + /** * The baseline SQL metrics for DataFusion `BaselineMetrics`. */ @@ -78,7 +81,7 @@ object CometMetricNode { "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( sc, "total time (in ms) spent in this operator"), - "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(sc, "time fetching batches from JVM")) + ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sc, ARROW_FFI_TIME_DESCRIPTION)) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 4c3f994f9..9b3417a4b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -77,9 +77,9 @@ case class CometShuffleExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric( + CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( sparkContext, - "time fetching batches from JVM"), + CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "numPartitions" -> SQLMetrics.createMetric( sparkContext, "number of partitions")) ++ readMetrics ++ writeMetrics @@ -484,10 +484,10 @@ class CometShuffleWriteProcessor( "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) - val nativeMetrics = if (metrics.contains("jvm_fetch_time")) { + val nativeMetrics = if (metrics.contains(CometMetricNode.ARROW_FFI_TIME_KEY)) { CometMetricNode( - nativeSQLMetrics ++ Map("jvm_fetch_time" -> - metrics("jvm_fetch_time"))) + nativeSQLMetrics ++ Map(CometMetricNode.ARROW_FFI_TIME_KEY -> + metrics(CometMetricNode.ARROW_FFI_TIME_KEY))) } else { CometMetricNode(nativeSQLMetrics) } @@ -499,6 +499,7 @@ class CometShuffleWriteProcessor( Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]), outputAttributes.length, nativePlan, + None, nativeMetrics) while (cometIter.hasNext) { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index dd1526d82..eb754f5e6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -121,19 +121,20 @@ object CometExec { inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, nativePlan: Operator): CometExecIterator = { - getCometIterator(inputs, numOutputCols, nativePlan, CometMetricNode(Map.empty)) + getCometIterator(inputs, numOutputCols, nativePlan, None, CometMetricNode(Map.empty)) } def getCometIterator( inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, nativePlan: Operator, + arrowFfiMetric: Option[SQLMetric], nativeMetrics: CometMetricNode): CometExecIterator = { val outputStream = new ByteArrayOutputStream() nativePlan.writeTo(outputStream) outputStream.close() val bytes = outputStream.toByteArray - new CometExecIterator(newIterId, inputs, numOutputCols, bytes, nativeMetrics) + new CometExecIterator(newIterId, inputs, numOutputCols, bytes, arrowFfiMetric, nativeMetrics) } /** @@ -220,6 +221,7 @@ abstract class CometNativeExec extends CometExec { inputs, output.length, serializedPlanCopy, + 
metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY), nativeMetrics) setSubqueries(it.id, this) From 6fd57233525d9ecf0b2b57fecc94d8770e7e7883 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Nov 2024 07:12:59 -0700 Subject: [PATCH 18/21] record FFI time for CollectLimitExec and TakeOrderedAndProject --- .../spark/sql/comet/CometCollectLimitExec.scala | 3 +++ .../sql/comet/CometTakeOrderedAndProjectExec.scala | 3 +++ .../shuffle/CometShuffleExchangeExec.scala | 13 +++---------- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 8ea0b1765..2a698c053 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -54,6 +54,9 @@ case class CometCollectLimitExec( private lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( + CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( + sparkContext, + CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 6220c809d..cc614e23d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -54,6 +54,9 @@ case class CometTakeOrderedAndProjectExec( private lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( + CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( + sparkContext, + CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 9b3417a4b..af4202328 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -480,18 +480,11 @@ class CometShuffleWriteProcessor( // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( + CometMetricNode.ARROW_FFI_TIME_KEY -> metrics(CometMetricNode.ARROW_FFI_TIME_KEY), "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) - val nativeMetrics = if (metrics.contains(CometMetricNode.ARROW_FFI_TIME_KEY)) { - CometMetricNode( - nativeSQLMetrics ++ Map(CometMetricNode.ARROW_FFI_TIME_KEY -> - metrics(CometMetricNode.ARROW_FFI_TIME_KEY))) - } else { - CometMetricNode(nativeSQLMetrics) - } - // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) @@ -499,8 +492,8 @@ class CometShuffleWriteProcessor( 
Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]), outputAttributes.length, nativePlan, - None, - nativeMetrics) + metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY), + CometMetricNode(nativeSQLMetrics)) while (cometIter.hasNext) { cometIter.next() From 56e3eade588e6eb2f680d902e55d1818000848c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Nov 2024 10:32:27 -0700 Subject: [PATCH 19/21] save --- native/core/src/execution/datafusion/planner.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 79aaf0c0d..9b237e5e3 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1070,6 +1070,8 @@ impl PhysicalPlanner { let partitioning = self .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?; + + let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, @@ -1079,10 +1081,11 @@ impl PhysicalPlanner { Ok(( scans, - Arc::new(SparkPlan::new( + Arc::new(SparkPlan::new_with_additional( spark_plan.plan_id, shuffle_writer, vec![Arc::clone(&child)], + vec![Arc::clone(&child.native_plan)] )), )) } From 36e62330225b592923c69768493db9711cf95018 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Nov 2024 10:38:52 -0700 Subject: [PATCH 20/21] Revert "save" This reverts commit 56e3eade588e6eb2f680d902e55d1818000848c0. --- native/core/src/execution/datafusion/planner.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 9b237e5e3..79aaf0c0d 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1070,8 +1070,6 @@ impl PhysicalPlanner { let partitioning = self .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?; - - let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, @@ -1081,11 +1079,10 @@ impl PhysicalPlanner { Ok(( scans, - Arc::new(SparkPlan::new_with_additional( + Arc::new(SparkPlan::new( spark_plan.plan_id, shuffle_writer, vec![Arc::clone(&child)], - vec![Arc::clone(&child.native_plan)] )), )) } From ff26736a5fc50eb749a2164eb63306f3e4be05e8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 30 Nov 2024 08:23:13 -0700 Subject: [PATCH 21/21] revert FFI timing changes, which are now in a separate PR --- .../org/apache/comet/vector/NativeUtil.scala | 9 --- docs/source/user-guide/tuning.md | 14 ++-- .../execution/datafusion/shuffle_writer.rs | 6 ++ native/core/src/execution/operators/scan.rs | 17 +---- .../org/apache/comet/CometExecIterator.scala | 3 - .../sql/comet/CometCollectLimitExec.scala | 3 - .../spark/sql/comet/CometMetricNode.scala | 64 +++++++++---------- .../CometTakeOrderedAndProjectExec.scala | 3 - .../shuffle/CometShuffleExchangeExec.scala | 16 +++-- .../apache/spark/sql/comet/operators.scala | 4 -- 10 files changed, 53 insertions(+), 86 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 028f81796..72472a540 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -26,7 +26,6 @@ import org.apache.arrow.vector.VectorSchemaRoot import 
org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException import org.apache.spark.sql.comet.util.Utils -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.comet.CometArrowAllocator @@ -149,17 +148,12 @@ class NativeUtil { */ def getNextBatch( numOutputCols: Int, - arrowFfiMetric: Option[SQLMetric], func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = { - - val start = System.nanoTime() val (arrays, schemas) = allocateArrowStructs(numOutputCols) val arrayAddrs = arrays.map(_.memoryAddress()) val schemaAddrs = schemas.map(_.memoryAddress()) - var arrowFfiTime = System.nanoTime() - start - val result = func(arrayAddrs, schemaAddrs) result match { @@ -167,10 +161,7 @@ class NativeUtil { // EOF None case numRows => - val start = System.nanoTime() val cometVectors = importVector(arrays, schemas) - arrowFfiTime += System.nanoTime() - start - arrowFfiMetric.foreach(_.add(arrowFfiTime)) Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt)) case flag => throw new IllegalStateException(s"Invalid native flag: $flag") diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index a84ff5a34..e0d7c1bc9 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -113,14 +113,8 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases: Comet also adds some custom metrics: -### CometScanExec +### ShuffleWriterExec -| Metric | Description | -| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- | -| `scanTime` | Total time to read Parquet batches into JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. | - -### Common to all Comet Operators - -| Metric | Description | -| ---------------- | ------------------------------------------------------------------------------------------------ | -| `arrow_ffi_time` | Measure the time it takes to transfer Arrow batches between JVM and native code using Arrow FFI. | +| Metric | Description | +| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. 
| diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index de69a1532..c79eeeb4a 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -139,6 +139,7 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); + let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -151,6 +152,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, + jvm_fetch_time, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -1092,6 +1094,7 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, + jvm_fetch_time: Time, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -1106,7 +1109,10 @@ async fn external_shuffle( ); loop { + let mut timer = jvm_fetch_time.timer(); let b = input.next().await; + timer.stop(); + match b { Some(batch_result) => { // Block on the repartitioner to insert the batch and shuffle the rows diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index cc9a1176f..2cb8a84d9 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -77,8 +77,6 @@ pub struct ScanExec { metrics: ExecutionPlanMetricsSet, /// Baseline metrics baseline_metrics: BaselineMetrics, - /// Timer - arrow_ffi_time: Time, } impl ScanExec { @@ -90,7 +88,6 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); - let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0); // Scan's schema is determined by the input batch, so we need to set it before execution. // Note that we determine if arrays are dictionary-encoded based on the @@ -100,12 +97,8 @@ impl ScanExec { // Dictionary-encoded primitive arrays are always unpacked. let first_batch = if let Some(input_source) = input_source.as_ref() { let mut timer = baseline_metrics.elapsed_compute().timer(); - let batch = ScanExec::get_next( - exec_context_id, - input_source.as_obj(), - data_types.len(), - &arrow_ffi_time, - )?; + let batch = + ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?; timer.stop(); batch } else { @@ -131,7 +124,6 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, - arrow_ffi_time, schema, }) } @@ -179,7 +171,6 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), - &self.arrow_ffi_time, )?; *current_batch = Some(next_batch); } @@ -194,7 +185,6 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, - arrow_ffi_time: &Time, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. 
@@ -207,7 +197,6 @@ impl ScanExec { exec_context_id )))); } - let mut timer = arrow_ffi_time.timer(); let mut env = JVMClasses::get_env()?; @@ -266,8 +255,6 @@ impl ScanExec { } } - timer.stop(); - Ok(InputBatch::new(inputs, Some(num_rows as usize))) } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 860fe4901..bff3e7925 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -21,7 +21,6 @@ package org.apache.comet import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized._ import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} @@ -50,7 +49,6 @@ class CometExecIterator( inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, protobufQueryPlan: Array[Byte], - arrowFfiMetric: Option[SQLMetric], nativeMetrics: CometMetricNode, numParts: Int, partitionIndex: Int) @@ -104,7 +102,6 @@ class CometExecIterator( nativeUtil.getNextBatch( numOutputCols, - arrowFfiMetric, (arrayAddrs, schemaAddrs) => { val ctx = TaskContext.get() nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 2a698c053..8ea0b1765 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -54,9 +54,6 @@ case class CometCollectLimitExec( private lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( - CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( - sparkContext, - CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 551737b52..47c89d943 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -69,9 +69,6 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM object CometMetricNode { - val ARROW_FFI_TIME_KEY = "arrow_ffi_time" - val ARROW_FFI_TIME_DESCRIPTION = "Arrow FFI time" - /** * The baseline SQL metrics for DataFusion `BaselineMetrics`. 
*/ @@ -80,8 +77,7 @@ object CometMetricNode { "output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"), "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( sc, - "total time (in ms) spent in this operator"), - ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sc, ARROW_FFI_TIME_DESCRIPTION)) + "total time (in ms) spent in this operator")) } /** @@ -97,41 +93,41 @@ object CometMetricNode { * SQL Metrics for DataFusion HashJoin */ def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - baselineMetrics(sc) ++ - Map( - "build_time" -> - SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), - "build_input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"), - "build_input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"), - "build_mem_used" -> - SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), - "input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), - "input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), - "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) + Map( + "build_time" -> + SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), + "build_input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"), + "build_input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"), + "build_mem_used" -> + SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), + "input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), + "input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), + "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), + "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) } /** * SQL Metrics for DataFusion SortMergeJoin */ def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - baselineMetrics(sc) ++ - Map( - "peak_mem_used" -> - SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), - "input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), - "input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), - "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), - "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), - "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), - "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) + Map( + "peak_mem_used" -> + SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), + "input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), + "input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), + "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), + "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), + "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), + "spilled_bytes" -> 
SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), + "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 92d996c32..5582f4d68 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -54,9 +54,6 @@ case class CometTakeOrderedAndProjectExec( private lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( - CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( - sparkContext, - CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index bcb8d094b..a7a33c40d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -77,9 +77,9 @@ case class CometShuffleExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( + "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric( sparkContext, - CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), + "time fetching batches from JVM"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, "number of partitions")) ++ readMetrics ++ writeMetrics @@ -482,11 +482,18 @@ class CometShuffleWriteProcessor( // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( - CometMetricNode.ARROW_FFI_TIME_KEY -> metrics(CometMetricNode.ARROW_FFI_TIME_KEY), "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) + val nativeMetrics = if (metrics.contains("jvm_fetch_time")) { + CometMetricNode( + nativeSQLMetrics ++ Map("jvm_fetch_time" -> + metrics("jvm_fetch_time"))) + } else { + CometMetricNode(nativeSQLMetrics) + } + // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) @@ -494,8 +501,7 @@ class CometShuffleWriteProcessor( Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]), outputAttributes.length, nativePlan, - metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY), - CometMetricNode(nativeSQLMetrics), + nativeMetrics, numParts, context.partitionId()) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index d2003b52a..77188312e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -127,7 +127,6 @@ object CometExec { inputs, numOutputCols, nativePlan, - None, CometMetricNode(Map.empty), numParts, partitionIdx) @@ -137,7 +136,6 @@ 
object CometExec { inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, nativePlan: Operator, - arrowFfiMetric: Option[SQLMetric], nativeMetrics: CometMetricNode, numParts: Int, partitionIdx: Int): CometExecIterator = { @@ -150,7 +148,6 @@ object CometExec { inputs, numOutputCols, bytes, - arrowFfiMetric, nativeMetrics, numParts, partitionIdx) @@ -243,7 +240,6 @@ abstract class CometNativeExec extends CometExec { inputs, output.length, serializedPlanCopy, - metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY), nativeMetrics, numParts, partitionIndex)
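
A note on the metric wiring above: with the Arrow FFI timer removed, the shuffle write path now forwards the new "jvm_fetch_time" metric to the native side only when the Spark operator actually registers it. The following is a minimal standalone sketch of that conditional pattern, assuming only the SQLMetric type and the single-map CometMetricNode(...) constructor already used in this patch; the object and method names (MetricWiringSketch, buildNativeMetricNode) and their parameters are illustrative, not part of the change.

import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.execution.metric.SQLMetric

object MetricWiringSketch {
  // Build the metric node handed to the native plan, adding "jvm_fetch_time"
  // only when the operator's metric map defines it (CometShuffleExchangeExec
  // does after this patch; other operators may not).
  def buildNativeMetricNode(
      nativeSQLMetrics: Map[String, SQLMetric],
      operatorMetrics: Map[String, SQLMetric]): CometMetricNode = {
    operatorMetrics.get("jvm_fetch_time") match {
      case Some(fetchTime) =>
        CometMetricNode(nativeSQLMetrics + ("jvm_fetch_time" -> fetchTime))
      case None =>
        CometMetricNode(nativeSQLMetrics)
    }
  }
}

This mirrors the inline metrics.contains("jvm_fetch_time") check in CometShuffleWriteProcessor, presumably so that operators without a JVM fetch path never need to register the timer.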