From c9260da9305dc18e353ec6a37178fa44f53c5a42 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Fri, 13 Dec 2024 16:40:43 +0800 Subject: [PATCH 1/4] Introduce ApplyResourceProfileExec to apply resource profile for the stage --- .../execution/ApplyResourceProfileExec.scala | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala new file mode 100644 index 000000000000..70ca501dee01 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.GlutenPlan +import org.apache.gluten.extension.columnar.transition.Convention + +import org.apache.spark.rdd.RDD +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OrderPreservingUnaryNode} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning} +import org.apache.spark.sql.vectorized.ColumnarBatch + +private case class ApplyResourceProfileExecAdaptor(child: LogicalPlan) + extends OrderPreservingUnaryNode { + override def output: Seq[Attribute] = child.output + + // For spark 3.2. + protected def withNewChildInternal(newChild: LogicalPlan): ApplyResourceProfileExecAdaptor = + copy(child = newChild) +} + +/** + * Used to apply specified resource profile for the whole stage. + * @param child + * @param resourceProfile + * resource profile specified for child belong stage. + */ +case class ApplyResourceProfileExec(child: SparkPlan, resourceProfile: ResourceProfile) + extends UnaryExecNode + with GlutenPlan { + + if (child.logicalLink.isDefined) { + setLogicalLink(ApplyResourceProfileExecAdaptor(child.logicalLink.get)) + } + + override def batchType(): Convention.BatchType = { + Convention.get(child).batchType + } + + override def rowType0(): Convention.RowType = { + Convention.get(child).rowType + } + + override def outputPartitioning: Partitioning = { + child.outputPartitioning + } + + override def requiredChildDistribution: scala.Seq[Distribution] = { + child.requiredChildDistribution + } + + override def outputOrdering: scala.Seq[SortOrder] = { + child.outputOrdering + } + + override def requiredChildOrdering: scala.Seq[scala.Seq[SortOrder]] = { + child.requiredChildOrdering + } + + override protected def doExecute(): RDD[InternalRow] = { + log.info(s"Apply $resourceProfile for plan ${child.nodeName}") + child.execute.withResources(resourceProfile) + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + log.info(s"Apply $resourceProfile for columnar plan ${child.nodeName}") + child.executeColumnar.withResources(resourceProfile) + } + + override def output: scala.Seq[Attribute] = child.output + + override protected def withNewChildInternal(newChild: SparkPlan): ApplyResourceProfileExec = + copy(child = newChild) +} From 58e7535d87ecbfee28f196518c7ab4c675a1084f Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 17 Dec 2024 14:18:42 +0800 Subject: [PATCH 2/4] remove logical adaptor --- .../sql/execution/ApplyResourceProfileExec.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala index 70ca501dee01..aa5b1c1d1f7f 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala @@ -23,19 +23,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OrderPreservingUnaryNode} import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning} import org.apache.spark.sql.vectorized.ColumnarBatch -private case class ApplyResourceProfileExecAdaptor(child: LogicalPlan) - extends OrderPreservingUnaryNode { - override def output: Seq[Attribute] = child.output - - // For spark 3.2. - protected def withNewChildInternal(newChild: LogicalPlan): ApplyResourceProfileExecAdaptor = - copy(child = newChild) -} - /** * Used to apply specified resource profile for the whole stage. * @param child @@ -46,10 +36,6 @@ case class ApplyResourceProfileExec(child: SparkPlan, resourceProfile: ResourceP extends UnaryExecNode with GlutenPlan { - if (child.logicalLink.isDefined) { - setLogicalLink(ApplyResourceProfileExecAdaptor(child.logicalLink.get)) - } - override def batchType(): Convention.BatchType = { Convention.get(child).batchType } From 1376cfb596f4d6e8c0f309b4367936f8f49bb410 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 17 Dec 2024 14:46:11 +0800 Subject: [PATCH 3/4] trigger ci From 7e78c90cfcb23b93dd68756833f4871ccf2ac1dd Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 17 Dec 2024 15:25:09 +0800 Subject: [PATCH 4/4] add Experimental marker --- .../apache/spark/sql/execution/ApplyResourceProfileExec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala index aa5b1c1d1f7f..17640f461213 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ApplyResourceProfileExec.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.execution.GlutenPlan import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.resource.ResourceProfile import org.apache.spark.sql.catalyst.InternalRow @@ -32,6 +33,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * @param resourceProfile * resource profile specified for child belong stage. */ +@Experimental case class ApplyResourceProfileExec(child: SparkPlan, resourceProfile: ResourceProfile) extends UnaryExecNode with GlutenPlan {