diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index e2014e5b8b844..b035d7a04fb0a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -65,7 +65,7 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric]) } } } catch { - case e: Throwable => + case e: Exception => logError(s"Updating native metrics failed due to ${e.getCause}.") throw e } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala index 3c35286c1c13f..ca891bac27c63 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala @@ -104,7 +104,7 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) } } } catch { - case e: Throwable => + case e: Exception => logError(s"Updating native metrics failed due to ${e.getCause}.") throw e } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index a6dfb3dbcb1fd..1376dc6a82d1d 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -41,7 +41,7 @@ object MetricsUtil extends Logging { case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -107,7 
+107,7 @@ object MetricsUtil extends Logging { s"Updating native metrics failed due to the wrong size of metrics data: " + s"$numNativeMetrics") () - } else if (mutNode.updater == NoopMetricsUpdater) { + } else if (mutNode.updater == MetricsUpdater.Terminate) { () } else { updateTransformerMetricsInternal( @@ -159,7 +159,7 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { + if (child.updater != MetricsUpdater.Terminate) { val result = updateTransformerMetricsInternal( child, relMap, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala index c2d12415c78b0..01c89bee217b8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala @@ -19,7 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter} import org.apache.gluten.extension.ValidationResult -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.`type`.TypeBuilder import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder @@ -114,5 +114,5 @@ case class TopNTransformer( } } - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater // TODO + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.Todo // TODO } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index ce8450fea4231..63008f6477379 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -19,7 +19,8 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.spark.sql.execution.CommandResultExec +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.{CommandResultExec, InputIteratorTransformer} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf @@ -52,6 +53,11 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa super.afterAll() } + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + } + test("test sort merge join metrics") { withSQLConf( GlutenConfig.COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", @@ -164,4 +170,18 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } } + + test("Metrics of top-n's children") { + runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 order by c2 limit 5") { + df => + val iit = find(df.queryExecution.executedPlan) { + case _: InputIteratorTransformer => true + case _ => false + } + assert(iit.isDefined) + val metrics = iit.get.metrics + assert(metrics("numOutputRows").value == 5) + assert(metrics("outputVectors").value == 1) + } + } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index ed691fc096138..7dfa0563d7439 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenException import org.apache.gluten.expression._ 
import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater} import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode} import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode} @@ -350,7 +350,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f override def metricsUpdater(): MetricsUpdater = { child match { case transformer: TransformSupport => transformer.metricsUpdater() - case _ => NoopMetricsUpdater + case _ => MetricsUpdater.None } } @@ -361,7 +361,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f case _ => false } .map(_.asInstanceOf[TransformSupport].metricsUpdater()) - .getOrElse(NoopMetricsUpdater) + .getOrElse(MetricsUpdater.None) } override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer = diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala index b980c24227d56..413ab1bc8527f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala @@ -17,7 +17,7 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.execution._ -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.ras.path.Pattern._ import org.apache.gluten.ras.path.Pattern.Matchers._ import org.apache.gluten.ras.rule.{RasRule, Shape} @@ -71,7 +71,7 @@ object RemoveFilter extends RasRule[SparkPlan] { // spark.sql.adaptive.logLevel=ERROR. 
case class NoopFilter(override val child: SparkPlan, override val output: Seq[Attribute]) extends UnaryTransformSupport { - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.None override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(newChild) override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering diff --git a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala index 0a622ba0b37d4..5797a65755c40 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.metrics -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper /** @@ -26,16 +25,34 @@ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper * TODO: place it to some other where since it's used not only by whole stage facilities */ trait MetricsUpdater extends Serializable { + def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {} + def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} +} - def metrics: Map[String, SQLMetric] +object MetricsUpdater { + // An empty metrics updater. Used when the operator generates native metrics but + // updating the metrics on the JVM side is not wanted. + object Todo extends MetricsUpdater {} - def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {} + // Used when the operator doesn't generate native metrics. This could be because + // the operator doesn't generate any native query plan. 
+ object None extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } - def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} + // Indicates a branch of a MetricsUpdaterTree is terminated. It's not bound to + // any operators. + object Terminate extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } } final case class MetricsUpdaterTree(updater: MetricsUpdater, children: Seq[MetricsUpdaterTree]) -object NoopMetricsUpdater extends MetricsUpdater { - override def metrics: Map[String, SQLMetric] = Map.empty -} +object MetricsUpdaterTree {} diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index f11800b89c31f..98ff9eddccc94 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -54,10 +54,13 @@ object MetricsUtil extends Logging { MetricsUpdaterTree( smj.metricsUpdater(), Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan))) + case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None => + assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator") + treeifyMetricsUpdaters(t.children.head) case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -180,6 +183,8 @@ object 
MetricsUtil extends Logging { ) } + // FIXME: Metrics updating code is too magical to maintain. The tree-walking algorithm should be + // made more declarative, rather than counting down counters that have no fixed definition. /** * @return * operator index and metrics index @@ -192,6 +197,9 @@ object MetricsUtil extends Logging { metricsIdx: Int, joinParamsMap: JMap[JLong, JoinParams], aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = { + if (mutNode.updater == MetricsUpdater.Terminate) { + return (operatorIdx, metricsIdx) + } val operatorMetrics = new JArrayList[OperatorMetrics]() var curMetricsIdx = metricsIdx relMap @@ -245,18 +253,16 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { - val result = updateTransformerMetricsInternal( - child, - relMap, - newOperatorIdx, - metrics, - newMetricsIdx, - joinParamsMap, - aggParamsMap) - newOperatorIdx = result._1 - newMetricsIdx = result._2 - } + val result = updateTransformerMetricsInternal( + child, + relMap, + newOperatorIdx, + metrics, + newMetricsIdx, + joinParamsMap, + aggParamsMap) + newOperatorIdx = result._1 + newMetricsIdx = result._2 } (newOperatorIdx, newMetricsIdx) @@ -292,8 +298,6 @@ object MetricsUtil extends Logging { val numNativeMetrics = metrics.inputRows.length if (numNativeMetrics == 0) { () - } else if (mutNode.updater == NoopMetricsUpdater) { - () } else { updateTransformerMetricsInternal( mutNode, @@ -305,7 +309,7 @@ object MetricsUtil extends Logging { aggParamsMap) } } catch { - case e: Throwable => + case e: Exception => logWarning(s"Updating native metrics failed due to ${e.getCause}.") () }