From a7baebcb4bc6f21eafdc92050a830cb00033bc8b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 31 May 2024 10:22:44 +0800 Subject: [PATCH 1/5] [VL] Do not skip updating children's metrics while visiting an operator with NoopMetricsUpdater --- .../metrics/HashAggregateMetricsUpdater.scala | 2 +- .../metrics/HashJoinMetricsUpdater.scala | 2 +- .../apache/gluten/metrics/MetricsUtil.scala | 6 ++-- .../gluten/execution/TopNTransformer.scala | 4 +-- .../execution/WholeStageTransformer.scala | 6 ++-- .../columnar/enumerated/RemoveFilter.scala | 4 +-- .../gluten/metrics/MetricsUpdater.scala | 31 ++++++++++++---- .../apache/gluten/metrics/MetricsUtil.scala | 36 ++++++++++--------- 8 files changed, 56 insertions(+), 35 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index e2014e5b8b84..b035d7a04fb0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -65,7 +65,7 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric]) } } } catch { - case e: Throwable => + case e: Exception => logError(s"Updating native metrics failed due to ${e.getCause}.") throw e } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala index 3c35286c1c13..ca891bac27c6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala @@ -104,7 +104,7 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) } } } catch { - case e: Throwable => + case e: Exception => logError(s"Updating native metrics failed due to ${e.getCause}.") throw e } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index a6dfb3dbcb1f..1376dc6a82d1 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -41,7 +41,7 @@ object MetricsUtil extends Logging { case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -107,7 +107,7 @@ object MetricsUtil extends Logging { s"Updating native metrics failed due to the wrong size of metrics data: " + s"$numNativeMetrics") () - } else if (mutNode.updater == NoopMetricsUpdater) { + } else if (mutNode.updater == MetricsUpdater.Terminate) { () } else { updateTransformerMetricsInternal( @@ -159,7 +159,7 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { + if (child.updater != MetricsUpdater.Terminate) { val result = updateTransformerMetricsInternal( child, relMap, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala index c2d12415c78b..01c89bee217b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala @@ -19,7 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter} import org.apache.gluten.extension.ValidationResult -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.`type`.TypeBuilder import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder @@ -114,5 +114,5 @@ case class TopNTransformer( } } - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater // TODO + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.Todo // TODO } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index ed691fc09613..7dfa0563d743 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenException import org.apache.gluten.expression._ import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater} import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode} import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode} @@ -350,7 +350,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f override def metricsUpdater(): MetricsUpdater = { child match { case transformer: TransformSupport => transformer.metricsUpdater() - case _ => NoopMetricsUpdater + case _ => MetricsUpdater.None } } @@ -361,7 +361,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f case _ => false } .map(_.asInstanceOf[TransformSupport].metricsUpdater()) - .getOrElse(NoopMetricsUpdater) + .getOrElse(MetricsUpdater.None) } override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer = diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala index b980c24227d5..413ab1bc8527 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala @@ -17,7 +17,7 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.execution._ -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.ras.path.Pattern._ import org.apache.gluten.ras.path.Pattern.Matchers._ import org.apache.gluten.ras.rule.{RasRule, Shape} @@ -71,7 +71,7 @@ object RemoveFilter extends RasRule[SparkPlan] { // spark.sql.adaptive.logLevel=ERROR. case class NoopFilter(override val child: SparkPlan, override val output: Seq[Attribute]) extends UnaryTransformSupport { - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.None override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(newChild) override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering diff --git a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala index 0a622ba0b37d..5201df3b3472 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.metrics -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper /** @@ -26,16 +25,34 @@ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper * TODO: place it to some other where since it's used not only by whole stage facilities */ trait MetricsUpdater extends Serializable { + def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {} + def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} +} - def metrics: Map[String, SQLMetric] +object MetricsUpdater { + // An empty metrics updater. Used when the operator generates native metrics but + // it's yet unwanted to update the metrics in JVM side. + object Todo extends MetricsUpdater {} - def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {} + // Used when the operator doesn't generate native metrics. It could be because + // the operator doesn't generate any native query plan. + object None extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } - def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} + // Indicates a branch of a MetricsUpdaterTree is terminated. It's not bound to + // any operators. + object Terminate extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } } final case class MetricsUpdaterTree(updater: MetricsUpdater, children: Seq[MetricsUpdaterTree]) -object NoopMetricsUpdater extends MetricsUpdater { - override def metrics: Map[String, SQLMetric] = Map.empty -} +object MetricsUpdaterTree {} diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index f11800b89c31..98ff9eddccc9 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -54,10 +54,13 @@ object MetricsUtil extends Logging { MetricsUpdaterTree( smj.metricsUpdater(), Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan))) + case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None => + assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator") + treeifyMetricsUpdaters(t.children.head) case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -180,6 +183,8 @@ object MetricsUtil extends Logging { ) } + // FIXME: Metrics updating code is too magical to maintain. Tree-walking algorithm should be made + // more declarative than by counting down some kind of counters that don't have fixed definition. /** * @return * operator index and metrics index @@ -192,6 +197,9 @@ object MetricsUtil extends Logging { metricsIdx: Int, joinParamsMap: JMap[JLong, JoinParams], aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = { + if (mutNode.updater == MetricsUpdater.Terminate) { + return (operatorIdx, metricsIdx) + } val operatorMetrics = new JArrayList[OperatorMetrics]() var curMetricsIdx = metricsIdx relMap @@ -245,18 +253,16 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { - val result = updateTransformerMetricsInternal( - child, - relMap, - newOperatorIdx, - metrics, - newMetricsIdx, - joinParamsMap, - aggParamsMap) - newOperatorIdx = result._1 - newMetricsIdx = result._2 - } + val result = updateTransformerMetricsInternal( + child, + relMap, + newOperatorIdx, + metrics, + newMetricsIdx, + joinParamsMap, + aggParamsMap) + newOperatorIdx = result._1 + newMetricsIdx = result._2 } (newOperatorIdx, newMetricsIdx) @@ -292,8 +298,6 @@ object MetricsUtil extends Logging { val numNativeMetrics = metrics.inputRows.length if (numNativeMetrics == 0) { () - } else if (mutNode.updater == NoopMetricsUpdater) { - () } else { updateTransformerMetricsInternal( mutNode, @@ -305,7 +309,7 @@ object MetricsUtil extends Logging { aggParamsMap) } } catch { - case e: Throwable => + case e: Exception => logWarning(s"Updating native metrics failed due to ${e.getCause}.") () } From 4cd484bb78b26be766ffbe80bbca6f1586323ffb Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 4 Jun 2024 15:36:09 +0800 Subject: [PATCH 2/5] Test cases --- .../gluten/execution/VeloxMetricsSuite.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index ce8450fea423..3cbbed9512e0 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -19,6 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.spark.SparkConf import org.apache.spark.sql.execution.CommandResultExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf @@ -52,6 +53,11 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa super.afterAll() } + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + } + test("test sort merge join metrics") { withSQLConf( GlutenConfig.COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", @@ -164,4 +170,34 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } } + + test("Metrics of window") { + runQueryAndCompare("SELECT c1, c2, sum(c2) over (partition by c1) as s FROM metrics_t1") { + df => + val window = find(df.queryExecution.executedPlan) { + case _: WindowExecTransformer => true + case _ => false + } + assert(window.isDefined) + val metrics = window.get.metrics + assert(metrics("numOutputRows").value == 100) + assert(metrics("outputVectors").value == 2) + } + } + + test("Metrics of noop filter's children") { + withSQLConf("spark.gluten.ras.enabled" -> "true") { + runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 where c1 < 50") { + df => + val scan = find(df.queryExecution.executedPlan) { + case _: FileSourceScanExecTransformer => true + case _ => false + } + assert(scan.isDefined) + val metrics = scan.get.metrics + assert(metrics("rawInputRows").value == 100) + assert(metrics("outputVectors").value == 1) + } + } + } } From 33f9f77c8b0fe6944d9a9ec05a49670ad4996506 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 4 Jun 2024 15:43:52 +0800 Subject: [PATCH 3/5] fixup --- .../gluten/extension/columnar/enumerated/RemoveFilter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala index 413ab1bc8527..5d7209dfbfb4 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala @@ -54,7 +54,7 @@ object RemoveFilter extends RasRule[SparkPlan] { leaf(clazz(classOf[BasicScanExecTransformer])) ).build()) - // A noop filter placeholder that indicates that all conditions are pushed down to scan. + // A noop filter placeholder that indicates that all conditions were pushed down to scan. // // This operator has zero cost in cost model to avoid planner from choosing the // original filter-scan that doesn't have all conditions pushed down to scan. From 929b499b30f3e15733f8f2fb3c62694a5ea3941a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 4 Jun 2024 16:09:18 +0800 Subject: [PATCH 4/5] fixup --- .../src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 98ff9eddccc9..0c387b429212 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -184,7 +184,7 @@ object MetricsUtil extends Logging { } // FIXME: Metrics updating code is too magical to maintain. Tree-walking algorithm should be made - // more declarative than by counting down some kind of counters that don't have fixed definition. + // more declarative than by counting down these counters that don't have fixed definition. /** * @return * operator index and metrics index From 2625f12217dec1ba92f511518a64c5284922c913 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 5 Jun 2024 09:10:49 +0800 Subject: [PATCH 5/5] fixup --- .../gluten/execution/VeloxMetricsSuite.scala | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index 3cbbed9512e0..468f26259219 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -149,28 +149,6 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } - test("Write metrics") { - if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { - withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) { - runQueryAndCompare( - "Insert into table metrics_t1 values(1 , 2)" - ) { - df => - val plan = - df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan - val write = find(plan) { - case _: WriteFilesExecTransformer => true - case _ => false - } - assert(write.isDefined) - val metrics = write.get.metrics - assert(metrics("physicalWrittenBytes").value > 0) - assert(metrics("numWrittenFiles").value == 1) - } - } - } - } - test("Metrics of window") { runQueryAndCompare("SELECT c1, c2, sum(c2) over (partition by c1) as s FROM metrics_t1") { df => @@ -200,4 +178,26 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } } + + test("Write metrics") { + if (SparkShimLoader.getSparkVersion.startsWith("3.4")) { + withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) { + runQueryAndCompare( + "Insert into table metrics_t1 values(1 , 2)" + ) { + df => + val plan = + df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan + val write = find(plan) { + case _: WriteFilesExecTransformer => true + case _ => false + } + assert(write.isDefined) + val metrics = write.get.metrics + assert(metrics("physicalWrittenBytes").value > 0) + assert(metrics("numWrittenFiles").value == 1) + } + } + } + } }