From e3682fdafb505af87ad1c1605908c5ec30463a9e Mon Sep 17 00:00:00 2001 From: Mingliang Zhu Date: Mon, 25 Nov 2024 16:22:40 +0800 Subject: [PATCH] [GLUTEN-8010][CORE] Don't generate native metrics if transformer doesn't generate relNode (#8011) --- .../org/apache/gluten/metrics/MetricsUtil.scala | 3 +++ .../BasicPhysicalOperatorTransformer.scala | 15 ++++++++++----- .../gluten/execution/ExpandExecTransformer.scala | 12 ++++++++---- .../gluten/execution/SortExecTransformer.scala | 12 ++++++++---- .../gluten/execution/WholeStageTransformer.scala | 3 +++ .../gluten/execution/WindowExecTransformer.scala | 12 ++++++++---- .../gluten/substrait/SubstraitContext.scala | 13 ------------- 7 files changed, 40 insertions(+), 30 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index e1e0f7c11a09..7d81467e978f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -38,6 +38,9 @@ object MetricsUtil extends Logging { j.metricsUpdater(), // must put the buildPlan first Seq(treeifyMetricsUpdaters(j.buildPlan), treeifyMetricsUpdaters(j.streamedPlan))) + case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None => + assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator") + treeifyMetricsUpdaters(t.children.head) case t: TransformSupport => MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters)) case _ => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala index dbe667ebb2aa..2830ef404c0e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala +++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala @@ -61,8 +61,13 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP case _ => false } - override def metricsUpdater(): MetricsUpdater = + override def isNoop: Boolean = getRemainingCondition == null + + override def metricsUpdater(): MetricsUpdater = if (isNoop) { + MetricsUpdater.None + } else { BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(metrics) + } def getRelNode( context: SubstraitContext, @@ -149,15 +154,15 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP override protected def doTransform(context: SubstraitContext): TransformContext = { val childCtx = child.asInstanceOf[TransformSupport].transform(context) - val remainingCondition = getRemainingCondition - val operatorId = context.nextOperatorId(this.nodeName) - if (remainingCondition == null) { + if (isNoop) { // The computing for this filter is not needed. - context.registerEmptyRelToOperator(operatorId) // Since some columns' nullability will be removed after this filter, we need to update the // outputAttributes of child context. 
return TransformContext(output, childCtx.root) } + + val operatorId = context.nextOperatorId(this.nodeName) + val remainingCondition = getRemainingCondition val currRel = getRelNode( context, remainingCondition, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala index b600175b2826..c6936daaffe5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala @@ -48,8 +48,13 @@ case class ExpandExecTransformer( AttributeSet.fromAttributeSets(projections.flatten.map(_.references)) } - override def metricsUpdater(): MetricsUpdater = + override def isNoop: Boolean = projections == null || projections.isEmpty + + override def metricsUpdater(): MetricsUpdater = if (isNoop) { + MetricsUpdater.None + } else { BackendsApiManager.getMetricsApiInstance.genExpandTransformerMetricsUpdater(metrics) + } // The GroupExpressions can output data with arbitrary partitioning, so set it // as UNKNOWN partitioning @@ -112,13 +117,12 @@ case class ExpandExecTransformer( override protected def doTransform(context: SubstraitContext): TransformContext = { val childCtx = child.asInstanceOf[TransformSupport].transform(context) - val operatorId = context.nextOperatorId(this.nodeName) - if (projections == null || projections.isEmpty) { + if (isNoop) { // The computing for this Expand is not needed. 
- context.registerEmptyRelToOperator(operatorId) return childCtx } + val operatorId = context.nextOperatorId(this.nodeName) val currRel = getRelNode(context, projections, child.output, operatorId, childCtx.root, validation = false) assert(currRel != null, "Expand Rel should be valid") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala index c62a30b84632..6f9564e6d54f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala @@ -44,8 +44,13 @@ case class SortExecTransformer( @transient override lazy val metrics = BackendsApiManager.getMetricsApiInstance.genSortTransformerMetrics(sparkContext) - override def metricsUpdater(): MetricsUpdater = + override def isNoop: Boolean = sortOrder == null || sortOrder.isEmpty + + override def metricsUpdater(): MetricsUpdater = if (isNoop) { + MetricsUpdater.None + } else { BackendsApiManager.getMetricsApiInstance.genSortTransformerMetricsUpdater(metrics) + } override def output: Seq[Attribute] = child.output @@ -103,13 +108,12 @@ case class SortExecTransformer( override protected def doTransform(context: SubstraitContext): TransformContext = { val childCtx = child.asInstanceOf[TransformSupport].transform(context) - val operatorId = context.nextOperatorId(this.nodeName) - if (sortOrder == null || sortOrder.isEmpty) { + if (isNoop) { // The computing for this project is not needed. 
- context.registerEmptyRelToOperator(operatorId) return childCtx } + val operatorId = context.nextOperatorId(this.nodeName) val currRel = getRelNode(context, sortOrder, child.output, operatorId, childCtx.root, validation = false) assert(currRel != null, "Sort Rel should be valid") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index e8a42883a54f..6414b67a8092 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -98,6 +98,9 @@ trait TransformSupport extends GlutenPlan { Seq(plan.executeColumnar()) } } + + // When true, it will not generate relNode, nor will it generate native metrics. + def isNoop: Boolean = false } trait LeafTransformSupport extends TransformSupport with LeafExecNode { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala index 7b9e2865f883..28d780992492 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala @@ -51,8 +51,13 @@ case class WindowExecTransformer( @transient override lazy val metrics = BackendsApiManager.getMetricsApiInstance.genWindowTransformerMetrics(sparkContext) - override def metricsUpdater(): MetricsUpdater = + override def isNoop: Boolean = windowExpression == null || windowExpression.isEmpty + + override def metricsUpdater(): MetricsUpdater = if (isNoop) { + MetricsUpdater.None + } else { BackendsApiManager.getMetricsApiInstance.genWindowTransformerMetricsUpdater(metrics) + } override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) @@ 
-177,13 +182,12 @@ case class WindowExecTransformer( override protected def doTransform(context: SubstraitContext): TransformContext = { val childCtx = child.asInstanceOf[TransformSupport].transform(context) - val operatorId = context.nextOperatorId(this.nodeName) - if (windowExpression == null || windowExpression.isEmpty) { + if (isNoop) { // The computing for this operator is not needed. - context.registerEmptyRelToOperator(operatorId) return childCtx } + val operatorId = context.nextOperatorId(this.nodeName) val currRel = getWindowRel(context, child.output, operatorId, childCtx.root, validation = false) assert(currRel != null, "Window Rel should be valid") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/substrait/SubstraitContext.scala b/gluten-substrait/src/main/scala/org/apache/gluten/substrait/SubstraitContext.scala index 79148d9f3093..1ceb2d4155ab 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/substrait/SubstraitContext.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/substrait/SubstraitContext.scala @@ -112,19 +112,6 @@ class SubstraitContext extends Serializable { id } - /** - * Register empty rel list to certain operator id. Used when the computing of a Spark transformer - * is omitted. - * @param operatorId - * operator id - */ - def registerEmptyRelToOperator(operatorId: JLong): Unit = { - if (!operatorToRelsMap.containsKey(operatorId)) { - val rels = new JArrayList[JLong]() - operatorToRelsMap.put(operatorId, rels) - } - } - /** * Return the registered map. * @return