diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 20a9a362536c..82be7f91151c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -38,10 +38,13 @@ object MetricsUtil extends Logging { j.metricsUpdater(), // must put the buildPlan first Seq(treeifyMetricsUpdaters(j.buildPlan), treeifyMetricsUpdaters(j.streamedPlan))) + case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None => + assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator") + treeifyMetricsUpdaters(t.children.head) case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -107,8 +110,6 @@ object MetricsUtil extends Logging { s"Updating native metrics failed due to the wrong size of metrics data: " + s"$numNativeMetrics") () - } else if (mutNode.updater == NoopMetricsUpdater) { - () } else { updateTransformerMetricsInternal( mutNode, @@ -138,6 +139,9 @@ object MetricsUtil extends Logging { metricsIdx: Int, joinParamsMap: JMap[JLong, JoinParams], aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = { + if (mutNode.updater == MetricsUpdater.Terminate) { + return (operatorIdx, metricsIdx) + } val nodeMetricsList = new JArrayList[MetricsData]() var curMetricsIdx = metricsIdx relMap @@ -159,18 +163,16 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { - val result = updateTransformerMetricsInternal( - child, - relMap, - newOperatorIdx, - metrics, - curMetricsIdx, - joinParamsMap, - aggParamsMap) - newOperatorIdx = result._1 - curMetricsIdx = result._2 - } + val result = updateTransformerMetricsInternal( + child, + relMap, + newOperatorIdx, + metrics, + curMetricsIdx, + joinParamsMap, + aggParamsMap) + newOperatorIdx = result._1 + curMetricsIdx = result._2 } (newOperatorIdx, curMetricsIdx) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala index c2d12415c78b..01c89bee217b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala @@ -19,7 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter} import org.apache.gluten.extension.ValidationResult -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.`type`.TypeBuilder import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder @@ -114,5 +114,5 @@ case class TopNTransformer( } } - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater // TODO + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.Todo // TODO } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index ed691fc09613..7dfa0563d743 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenException import org.apache.gluten.expression._ import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater} import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode} import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode} @@ -350,7 +350,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f override def metricsUpdater(): MetricsUpdater = { child match { case transformer: TransformSupport => transformer.metricsUpdater() - case _ => NoopMetricsUpdater + case _ => MetricsUpdater.None } } @@ -361,7 +361,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f case _ => false } .map(_.asInstanceOf[TransformSupport].metricsUpdater()) - .getOrElse(NoopMetricsUpdater) + .getOrElse(MetricsUpdater.None) } override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer = diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala index b980c24227d5..413ab1bc8527 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala @@ -17,7 +17,7 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.execution._ -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.ras.path.Pattern._ import org.apache.gluten.ras.path.Pattern.Matchers._ import org.apache.gluten.ras.rule.{RasRule, Shape} @@ -71,7 +71,7 @@ object RemoveFilter extends RasRule[SparkPlan] { // spark.sql.adaptive.logLevel=ERROR. case class NoopFilter(override val child: SparkPlan, override val output: Seq[Attribute]) extends UnaryTransformSupport { - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.None override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(newChild) override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering diff --git a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala index 8fca4f201dba..5797a65755c4 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala @@ -29,11 +29,22 @@ trait MetricsUpdater extends Serializable { def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} } -object NoopMetricsUpdater extends MetricsUpdater {} +object MetricsUpdater { + // An empty metrics updater. Used when the operator generates native metrics but + // it's unwanted to update the metrics in JVM side. + object Todo extends MetricsUpdater {} -final case class MetricsUpdaterTree(updater: MetricsUpdater, children: Seq[MetricsUpdaterTree]) + // Used when the operator doesn't generate native metrics. It's could because + // the operator doesn't generate any native query plan. + object None extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } -object MetricsUpdaterTree { + // Indicates a branch of a MetricsUpdaterTree is terminated. It's not bound to + // any operators. object Terminate extends MetricsUpdater { override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = throw new UnsupportedOperationException() @@ -41,3 +52,7 @@ object MetricsUpdaterTree { throw new UnsupportedOperationException() } } + +final case class MetricsUpdaterTree(updater: MetricsUpdater, children: Seq[MetricsUpdaterTree]) + +object MetricsUpdaterTree {} diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index a1ce3ec780ae..98ff9eddccc9 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -54,10 +54,13 @@ object MetricsUtil extends Logging { MetricsUpdaterTree( smj.metricsUpdater(), Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan))) + case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None => + assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator") + treeifyMetricsUpdaters(t.children.head) case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(MetricsUpdaterTree.Terminate, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -194,7 +197,7 @@ object MetricsUtil extends Logging { metricsIdx: Int, joinParamsMap: JMap[JLong, JoinParams], aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = { - if (mutNode.updater == MetricsUpdaterTree.Terminate) { + if (mutNode.updater == MetricsUpdater.Terminate) { return (operatorIdx, metricsIdx) } val operatorMetrics = new JArrayList[OperatorMetrics]()