Skip to content

Commit

Permalink
[GLUTEN-8018][CORE] Introduce ApplyResourceProfileExec to apply resou…
Browse files Browse the repository at this point in the history
…rce profile for query stage (apache#8195)
  • Loading branch information
zjuwangg authored Dec 17, 2024
1 parent 36f0a8f commit e94e01d
Showing 1 changed file with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Used to apply specified resource profile for the whole stage.
* @param child
* @param resourceProfile
* resource profile specified for child belong stage.
*/
@Experimental
case class ApplyResourceProfileExec(child: SparkPlan, resourceProfile: ResourceProfile)
extends UnaryExecNode
with GlutenPlan {

override def batchType(): Convention.BatchType = {
Convention.get(child).batchType
}

override def rowType0(): Convention.RowType = {
Convention.get(child).rowType
}

override def outputPartitioning: Partitioning = {
child.outputPartitioning
}

override def requiredChildDistribution: scala.Seq[Distribution] = {
child.requiredChildDistribution
}

override def outputOrdering: scala.Seq[SortOrder] = {
child.outputOrdering
}

override def requiredChildOrdering: scala.Seq[scala.Seq[SortOrder]] = {
child.requiredChildOrdering
}

override protected def doExecute(): RDD[InternalRow] = {
log.info(s"Apply $resourceProfile for plan ${child.nodeName}")
child.execute.withResources(resourceProfile)
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
log.info(s"Apply $resourceProfile for columnar plan ${child.nodeName}")
child.executeColumnar.withResources(resourceProfile)
}

override def output: scala.Seq[Attribute] = child.output

override protected def withNewChildInternal(newChild: SparkPlan): ApplyResourceProfileExec =
copy(child = newChild)
}

0 comments on commit e94e01d

Please sign in to comment.