diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index e2014e5b8b84..b035d7a04fb0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -65,7 +65,7 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric]) } } } catch { - case e: Throwable => + case e: Exception => logError(s"Updating native metrics failed due to ${e.getCause}.") throw e } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala index 3c35286c1c13..ca891bac27c6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala @@ -104,7 +104,7 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) } } } catch { - case e: Throwable => + case e: Exception => logError(s"Updating native metrics failed due to ${e.getCause}.") throw e } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index a6dfb3dbcb1f..1376dc6a82d1 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -41,7 +41,7 @@ object MetricsUtil extends Logging { case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -107,7 +107,7 @@ object MetricsUtil extends Logging { s"Updating native metrics failed due to the wrong size of metrics data: " + s"$numNativeMetrics") () - } else if (mutNode.updater == NoopMetricsUpdater) { + } else if (mutNode.updater == MetricsUpdater.Terminate) { () } else { updateTransformerMetricsInternal( @@ -159,7 +159,7 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { + if (child.updater != MetricsUpdater.Terminate) { val result = updateTransformerMetricsInternal( child, relMap, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala index c2d12415c78b..01c89bee217b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala @@ -19,7 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter} import org.apache.gluten.extension.ValidationResult -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.`type`.TypeBuilder import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder @@ -114,5 +114,5 @@ case class TopNTransformer( } } - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater // TODO + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.Todo // TODO } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index ed691fc09613..7dfa0563d743 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenException import org.apache.gluten.expression._ import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater} import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode} import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode} @@ -350,7 +350,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f override def metricsUpdater(): MetricsUpdater = { child match { case transformer: TransformSupport => transformer.metricsUpdater() - case _ => NoopMetricsUpdater + case _ => MetricsUpdater.None } } @@ -361,7 +361,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f case _ => false } .map(_.asInstanceOf[TransformSupport].metricsUpdater()) - .getOrElse(NoopMetricsUpdater) + .getOrElse(MetricsUpdater.None) } override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer = diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala index b980c24227d5..413ab1bc8527 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala @@ -17,7 +17,7 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.execution._ -import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.ras.path.Pattern._ import org.apache.gluten.ras.path.Pattern.Matchers._ import org.apache.gluten.ras.rule.{RasRule, Shape} @@ -71,7 +71,7 @@ object RemoveFilter extends RasRule[SparkPlan] { // spark.sql.adaptive.logLevel=ERROR. case class NoopFilter(override val child: SparkPlan, override val output: Seq[Attribute]) extends UnaryTransformSupport { - override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater + override def metricsUpdater(): MetricsUpdater = MetricsUpdater.None override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(newChild) override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering diff --git a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala index 0a622ba0b37d..5201df3b3472 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.metrics -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper /** @@ -26,16 +25,34 @@ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper * TODO: place it to some other where since it's used not only by whole stage facilities */ trait MetricsUpdater extends Serializable { + def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {} + def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} +} - def metrics: Map[String, SQLMetric] +object MetricsUpdater { + // An empty metrics updater. Used when the operator generates native metrics but + // it's yet unwanted to update the metrics in JVM side. + object Todo extends MetricsUpdater {} - def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {} + // Used when the operator doesn't generate native metrics. It could be because + // the operator doesn't generate any native query plan. + object None extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } - def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {} + // Indicates a branch of a MetricsUpdaterTree is terminated. It's not bound to + // any operators. + object Terminate extends MetricsUpdater { + override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = + throw new UnsupportedOperationException() + override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = + throw new UnsupportedOperationException() + } } final case class MetricsUpdaterTree(updater: MetricsUpdater, children: Seq[MetricsUpdaterTree]) -object NoopMetricsUpdater extends MetricsUpdater { - override def metrics: Map[String, SQLMetric] = Map.empty -} +object MetricsUpdaterTree {} diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index f11800b89c31..98ff9eddccc9 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -54,10 +54,13 @@ object MetricsUtil extends Logging { MetricsUpdaterTree( smj.metricsUpdater(), Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan))) + case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None => + assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator") + treeifyMetricsUpdaters(t.children.head) case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => - MetricsUpdaterTree(NoopMetricsUpdater, Seq()) + MetricsUpdaterTree(MetricsUpdater.Terminate, Seq()) } } @@ -180,6 +183,8 @@ object MetricsUtil extends Logging { ) } + // FIXME: Metrics updating code is too magical to maintain. Tree-walking algorithm should be made + // more declarative than by counting down some kind of counters that don't have fixed definition. /** * @return * operator index and metrics index @@ -192,6 +197,9 @@ object MetricsUtil extends Logging { metricsIdx: Int, joinParamsMap: JMap[JLong, JoinParams], aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = { + if (mutNode.updater == MetricsUpdater.Terminate) { + return (operatorIdx, metricsIdx) + } val operatorMetrics = new JArrayList[OperatorMetrics]() var curMetricsIdx = metricsIdx relMap @@ -245,18 +253,16 @@ object MetricsUtil extends Logging { mutNode.children.foreach { child => - if (child.updater != NoopMetricsUpdater) { - val result = updateTransformerMetricsInternal( - child, - relMap, - newOperatorIdx, - metrics, - newMetricsIdx, - joinParamsMap, - aggParamsMap) - newOperatorIdx = result._1 - newMetricsIdx = result._2 - } + val result = updateTransformerMetricsInternal( + child, + relMap, + newOperatorIdx, + metrics, + newMetricsIdx, + joinParamsMap, + aggParamsMap) + newOperatorIdx = result._1 + newMetricsIdx = result._2 } (newOperatorIdx, newMetricsIdx) @@ -292,8 +298,6 @@ object MetricsUtil extends Logging { val numNativeMetrics = metrics.inputRows.length if (numNativeMetrics == 0) { () - } else if (mutNode.updater == NoopMetricsUpdater) { - () } else { updateTransformerMetricsInternal( mutNode, @@ -305,7 +309,7 @@ object MetricsUtil extends Logging { aggParamsMap) } } catch { - case e: Throwable => + case e: Exception => logWarning(s"Updating native metrics failed due to ${e.getCause}.") () }