Skip to content

Commit

Permalink
[GLUTEN-8010][CORE] Don't generate native metrics if transformer don'…
Browse files Browse the repository at this point in the history
…t generate relNode (apache#8011)
  • Loading branch information
zml1206 authored Nov 25, 2024
1 parent e5b4b4c commit e3682fd
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ object MetricsUtil extends Logging {
j.metricsUpdater(),
// must put the buildPlan first
Seq(treeifyMetricsUpdaters(j.buildPlan), treeifyMetricsUpdaters(j.streamedPlan)))
case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None =>
assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator")
treeifyMetricsUpdaters(t.children.head)
case t: TransformSupport =>
MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters))
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
case _ => false
}

override def metricsUpdater(): MetricsUpdater =
// The filter is a no-op when no condition remains to be evaluated by this operator.
override def isNoop: Boolean = Option(getRemainingCondition).isEmpty

// A no-op filter does not generate a rel node, so it has no native metrics to update.
override def metricsUpdater(): MetricsUpdater =
  if (isNoop) MetricsUpdater.None
  else BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(metrics)

def getRelNode(
context: SubstraitContext,
Expand Down Expand Up @@ -149,15 +154,15 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP

override protected def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val remainingCondition = getRemainingCondition
val operatorId = context.nextOperatorId(this.nodeName)
if (remainingCondition == null) {
if (isNoop) {
// The computing for this filter is not needed.
context.registerEmptyRelToOperator(operatorId)
// Since some columns' nullability will be removed after this filter, we need to update the
// outputAttributes of child context.
return TransformContext(output, childCtx.root)
}

val operatorId = context.nextOperatorId(this.nodeName)
val remainingCondition = getRemainingCondition
val currRel = getRelNode(
context,
remainingCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ case class ExpandExecTransformer(
AttributeSet.fromAttributeSets(projections.flatten.map(_.references))
}

override def metricsUpdater(): MetricsUpdater =
// The expand is a no-op when there are no projections (null or empty).
override def isNoop: Boolean = Option(projections).forall(_.isEmpty)

// A no-op expand does not generate a rel node, so it has no native metrics to update.
override def metricsUpdater(): MetricsUpdater =
  if (isNoop) MetricsUpdater.None
  else BackendsApiManager.getMetricsApiInstance.genExpandTransformerMetricsUpdater(metrics)

// The GroupExpressions can output data with arbitrary partitioning, so set it
// as UNKNOWN partitioning
Expand Down Expand Up @@ -112,13 +117,12 @@ case class ExpandExecTransformer(

override protected def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if (projections == null || projections.isEmpty) {
if (isNoop) {
// The computing for this Expand is not needed.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}

val operatorId = context.nextOperatorId(this.nodeName)
val currRel =
getRelNode(context, projections, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Expand Rel should be valid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ case class SortExecTransformer(
@transient override lazy val metrics =
BackendsApiManager.getMetricsApiInstance.genSortTransformerMetrics(sparkContext)

override def metricsUpdater(): MetricsUpdater =
// The sort is a no-op when there is no sort order (null or empty).
override def isNoop: Boolean = Option(sortOrder).forall(_.isEmpty)

// A no-op sort does not generate a rel node, so it has no native metrics to update.
override def metricsUpdater(): MetricsUpdater =
  if (isNoop) MetricsUpdater.None
  else BackendsApiManager.getMetricsApiInstance.genSortTransformerMetricsUpdater(metrics)

override def output: Seq[Attribute] = child.output

Expand Down Expand Up @@ -103,13 +108,12 @@ case class SortExecTransformer(

override protected def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if (sortOrder == null || sortOrder.isEmpty) {
if (isNoop) {
// The computing for this sort is not needed.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}

val operatorId = context.nextOperatorId(this.nodeName)
val currRel =
getRelNode(context, sortOrder, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Sort Rel should be valid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ trait TransformSupport extends GlutenPlan {
Seq(plan.executeColumnar())
}
}

/**
 * Whether this transformer is a no-op. When true, it will not generate relNode, nor will it
 * generate native metrics. Defaults to false.
 */
def isNoop: Boolean = false
}

trait LeafTransformSupport extends TransformSupport with LeafExecNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ case class WindowExecTransformer(
@transient override lazy val metrics =
BackendsApiManager.getMetricsApiInstance.genWindowTransformerMetrics(sparkContext)

override def metricsUpdater(): MetricsUpdater =
// The window is a no-op when there are no window expressions (null or empty).
override def isNoop: Boolean = Option(windowExpression).forall(_.isEmpty)

// A no-op window does not generate a rel node, so it has no native metrics to update.
override def metricsUpdater(): MetricsUpdater =
  if (isNoop) MetricsUpdater.None
  else BackendsApiManager.getMetricsApiInstance.genWindowTransformerMetricsUpdater(metrics)

override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute)

Expand Down Expand Up @@ -177,13 +182,12 @@ case class WindowExecTransformer(

override protected def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if (windowExpression == null || windowExpression.isEmpty) {
if (isNoop) {
// The computing for this operator is not needed.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}

val operatorId = context.nextOperatorId(this.nodeName)
val currRel =
getWindowRel(context, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Window Rel should be valid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,6 @@ class SubstraitContext extends Serializable {
id
}

/**
 * Associate an empty rel list with the given operator id, unless the id already has an entry.
 * Used when the computing of a Spark transformer is omitted.
 * @param operatorId
 *   operator id
 */
def registerEmptyRelToOperator(operatorId: JLong): Unit = {
  val alreadyRegistered = operatorToRelsMap.containsKey(operatorId)
  if (!alreadyRegistered) {
    // Only insert when absent so an existing rel list is never overwritten.
    operatorToRelsMap.put(operatorId, new JArrayList[JLong]())
  }
}

/**
* Return the registered map.
* @return
Expand Down

0 comments on commit e3682fd

Please sign in to comment.