Skip to content

Commit

Permalink
Introduce ApplyResourceProfileExec to apply resource profile for the …
Browse files Browse the repository at this point in the history
…stage
  • Loading branch information
zjuwangg committed Dec 13, 2024
1 parent 8716bc4 commit 6bfa7e6
Showing 1 changed file with 99 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OrderPreservingUnaryNode}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch

private case class ApplyResourceProfileExecAdaptor(child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: LogicalPlan): ApplyResourceProfileExecAdaptor =
copy(child = newChild)
}

/**
* Used to apply specified resource profile for the whole stage.
* @param child
* @param resourceProfile
*/
case class ApplyResourceProfileExec(child: SparkPlan, resourceProfile: ResourceProfile)
extends UnaryExecNode
with GlutenPlan {

if (child.logicalLink.isDefined) {
setLogicalLink(ApplyResourceProfileExecAdaptor(child.logicalLink.get))
}

override def batchType(): Convention.BatchType = {
child match {
case plan: GlutenPlan =>
plan.batchType()
case _ if child.supportsColumnar => Convention.BatchType.VanillaBatch
case _ => Convention.BatchType.None
}
}

override def rowType0(): Convention.RowType = {
child match {
case plan: GlutenPlan =>
plan.rowType0()
case _ => Convention.RowType.VanillaRow
}
}

override def outputPartitioning: Partitioning = {
child.outputPartitioning
}

override def requiredChildDistribution: scala.Seq[Distribution] = {
child.requiredChildDistribution
}

override def outputOrdering: scala.Seq[SortOrder] = {
child.outputOrdering
}

override def requiredChildOrdering: scala.Seq[scala.Seq[SortOrder]] = {
child.requiredChildOrdering
}

override protected def doExecute(): RDD[InternalRow] = {
log.info(s"Apply $resourceProfile for plan ${child.nodeName}")
child.doExecute().withResources(resourceProfile)
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
log.info(s"Apply $resourceProfile for columnar plan ${child.nodeName}")
child.doExecuteColumnar().withResources(resourceProfile)
}

override def output: scala.Seq[Attribute] = child.output

override protected def withNewChildInternal(newChild: SparkPlan): ApplyResourceProfileExec =
copy(child = newChild)
}

0 comments on commit 6bfa7e6

Please sign in to comment.