Commit

Revert "Provided interface to inject ResourceProfile for Columnar stage"
This reverts commit 228429d.
zjuwangg committed Dec 13, 2024
1 parent 228429d commit 8716bc4
Showing 6 changed files with 95 additions and 137 deletions.
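
For readers skimming the revert: the removed feature let a caller attach a Spark ResourceProfile to a columnar stage so that the stage's RDDs could use stage-level scheduling. A minimal sketch of the deleted hook, reconstructed from the removed lines below (the trait and the if-block match the diff; the surrounding plumbing is elided):

import org.apache.spark.resource.ResourceProfile

/** Mixin removed by this revert: lets callers inject a ResourceProfile into an exec node. */
trait WithResourceProfileSupport {
  private var resourceProfile: Option[ResourceProfile] = None

  def withResourceProfile(resourceProfile: ResourceProfile): Unit = {
    this.resourceProfile = Some(resourceProfile)
  }

  def getResourceProfile: Option[ResourceProfile] = resourceProfile
}

// In the removed code paths, the RDD produced by doExecuteColumnar was then tagged with it:
//   if (getResourceProfile.isDefined) {
//     finalRdd = finalRdd.withResources(getResourceProfile.get)
//   }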
@@ -32,7 +32,6 @@ import org.apache.gluten.vectorized.CHColumnarBatchSerializer
import org.apache.spark.ShuffleDependency
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
@@ -470,8 +469,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric,
resourceProfile: Option[ResourceProfile] = None): BuildSideRelation = {
dataSize: SQLMetric): BuildSideRelation = {

val (buildKeys, isNullAware) = mode match {
case mode1: HashedRelationBroadcastMode =>
@@ -29,7 +29,6 @@ import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSeria
import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, PullOutArrowEvalPythonPreProjectHelper}
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper}
import org.apache.spark.shuffle.utils.ShuffleUtil
@@ -621,23 +620,12 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric,
resourceProfile: Option[ResourceProfile] = None): BuildSideRelation = {
val serialized: Array[ColumnarBatchSerializeResult] =
if (resourceProfile.isDefined) {
child
.executeColumnar()
.withResources(resourceProfile.get)
.mapPartitions(itr => Iterator(BroadcastUtils.serializeStream(itr)))
.filter(_.getNumRows != 0)
.collect
} else {
child
.executeColumnar()
.mapPartitions(itr => Iterator(BroadcastUtils.serializeStream(itr)))
.filter(_.getNumRows != 0)
.collect
}
dataSize: SQLMetric): BuildSideRelation = {
val serialized: Array[ColumnarBatchSerializeResult] = child
.executeColumnar()
.mapPartitions(itr => Iterator(BroadcastUtils.serializeStream(itr)))
.filter(_.getNumRows != 0)
.collect
val rawSize = serialized.map(_.getSerialized.length).sum
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
throw new SparkException(
@@ -24,7 +24,6 @@ import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -358,8 +357,7 @@ trait SparkPlanExecApi {
mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric,
resourceProfile: Option[ResourceProfile] = None): BuildSideRelation
dataSize: SQLMetric): BuildSideRelation

def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
mode.canonicalized
@@ -33,7 +33,6 @@ import org.apache.gluten.utils.SubstraitPlanPrinterUtil

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
@@ -195,22 +194,10 @@ trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
}
}

/** Base interface for a query plan that can be used to set ResourceProfile. */
trait WithResourceProfileSupport {
private var resourceProfile: Option[ResourceProfile] = None

def withResourceProfile(resourceProfile: ResourceProfile): Unit = {
this.resourceProfile = Some(resourceProfile)
}

def getResourceProfile: Option[ResourceProfile] = resourceProfile
}

case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = false)(
val transformStageId: Int
) extends WholeStageTransformerGenerateTreeStringShim
with UnaryTransformSupport
with WithResourceProfileSupport {
with UnaryTransformSupport {

def stageId: Int = transformStageId

@@ -379,99 +366,93 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
// Check if BatchScan exists.
val basicScanExecTransformers = findAllScanTransformers()

    var finalRdd =
      if (basicScanExecTransformers.nonEmpty) {

        /**
         * If containing scan exec transformer this "whole stage" generates a RDD which itself takes
         * care of SCAN there won't be any other RDD for SCAN. As a result, genFirstStageIterator
         * rather than genFinalStageIterator will be invoked
         */
        val allScanPartitions = basicScanExecTransformers.map(_.getPartitions.toIndexedSeq)
        val allScanSplitInfos =
          getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
        if (GlutenConfig.getConf.enableHdfsViewfs) {
          allScanSplitInfos.foreach {
            splitInfos =>
              splitInfos.foreach {
                case splitInfo: LocalFilesNode =>
                  val paths = splitInfo.getPaths.asScala
                  if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
                    // Convert the viewfs path into hdfs
                    val newPaths = paths.map {
                      viewfsPath =>
                        val viewPath = new Path(viewfsPath)
                        val viewFileSystem =
                          FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
                        viewFileSystem.resolvePath(viewPath).toString
                    }
                    splitInfo.setPaths(newPaths.asJava)
                  }
              }
          }
        }

        val inputPartitions =
          BackendsApiManager.getIteratorApiInstance.genPartitions(
            wsCtx,
            allScanSplitInfos,
            basicScanExecTransformers)

        val rdd = new GlutenWholeStageColumnarRDD(
          sparkContext,
          inputPartitions,
          inputRDDs,
          pipelineTime,
          leafInputMetricsUpdater(),
          BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction(
            child,
            wsCtx.substraitContext.registeredRelMap,
            wsCtx.substraitContext.registeredJoinParams,
            wsCtx.substraitContext.registeredAggregationParams
          )
        )
        (0 until allScanPartitions.head.size).foreach(
          i => {
            val currentPartitions = allScanPartitions.map(_(i))
            currentPartitions.indices.foreach(
              i =>
                currentPartitions(i) match {
                  case f: FilePartition =>
                    SoftAffinity.updateFilePartitionLocations(f, rdd.id)
                  case _ =>
                })
          })
        rdd
      } else {

        /**
         * the whole stage contains NO BasicScanExecTransformer. this the default case for:
         *   1. SCAN with clickhouse backend (check
         *      ColumnarCollapseTransformStages#separateScanRDD()) 2. test case where query plan is
         *      constructed from simple dataframes (e.g. GlutenDataFrameAggregateSuite) in these
         *      cases, separate RDDs takes care of SCAN as a result, genFinalStageIterator rather
         *      than genFirstStageIterator will be invoked
         */
        new WholeStageZippedPartitionsRDD(
          sparkContext,
          inputRDDs,
          numaBindingInfo,
          sparkConf,
          wsCtx,
          pipelineTime,
          BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction(
            child,
            wsCtx.substraitContext.registeredRelMap,
            wsCtx.substraitContext.registeredJoinParams,
            wsCtx.substraitContext.registeredAggregationParams
          ),
          materializeInput
        )
      }
    if (getResourceProfile.isDefined) {
      finalRdd = finalRdd.withResources(getResourceProfile.get)
    }
    finalRdd
    if (basicScanExecTransformers.nonEmpty) {

      /**
       * If containing scan exec transformer this "whole stage" generates a RDD which itself takes
       * care of SCAN there won't be any other RDD for SCAN. As a result, genFirstStageIterator
       * rather than genFinalStageIterator will be invoked
       */
      val allScanPartitions = basicScanExecTransformers.map(_.getPartitions.toIndexedSeq)
      val allScanSplitInfos =
        getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
      if (GlutenConfig.getConf.enableHdfsViewfs) {
        allScanSplitInfos.foreach {
          splitInfos =>
            splitInfos.foreach {
              case splitInfo: LocalFilesNode =>
                val paths = splitInfo.getPaths.asScala
                if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
                  // Convert the viewfs path into hdfs
                  val newPaths = paths.map {
                    viewfsPath =>
                      val viewPath = new Path(viewfsPath)
                      val viewFileSystem =
                        FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
                      viewFileSystem.resolvePath(viewPath).toString
                  }
                  splitInfo.setPaths(newPaths.asJava)
                }
            }
        }
      }

      val inputPartitions =
        BackendsApiManager.getIteratorApiInstance.genPartitions(
          wsCtx,
          allScanSplitInfos,
          basicScanExecTransformers)

      val rdd = new GlutenWholeStageColumnarRDD(
        sparkContext,
        inputPartitions,
        inputRDDs,
        pipelineTime,
        leafInputMetricsUpdater(),
        BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction(
          child,
          wsCtx.substraitContext.registeredRelMap,
          wsCtx.substraitContext.registeredJoinParams,
          wsCtx.substraitContext.registeredAggregationParams
        )
      )
      (0 until allScanPartitions.head.size).foreach(
        i => {
          val currentPartitions = allScanPartitions.map(_(i))
          currentPartitions.indices.foreach(
            i =>
              currentPartitions(i) match {
                case f: FilePartition =>
                  SoftAffinity.updateFilePartitionLocations(f, rdd.id)
                case _ =>
              })
        })
      rdd
    } else {

      /**
       * the whole stage contains NO BasicScanExecTransformer. this the default case for:
       *   1. SCAN with clickhouse backend (check ColumnarCollapseTransformStages#separateScanRDD())
       *   2. test case where query plan is constructed from simple dataframes (e.g.
       *      GlutenDataFrameAggregateSuite) in these cases, separate RDDs takes care of SCAN as a
       *      result, genFinalStageIterator rather than genFirstStageIterator will be invoked
       */
      new WholeStageZippedPartitionsRDD(
        sparkContext,
        inputRDDs,
        numaBindingInfo,
        sparkConf,
        wsCtx,
        pipelineTime,
        BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction(
          child,
          wsCtx.substraitContext.registeredRelMap,
          wsCtx.substraitContext.registeredJoinParams,
          wsCtx.substraitContext.registeredAggregationParams
        ),
        materializeInput
      )
    }
}

override def metricsUpdater(): MetricsUpdater = {
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{ValidatablePlan, WithResourceProfileSupport}
import org.apache.gluten.execution.ValidatablePlan
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -42,8 +42,7 @@ import scala.util.control.NonFatal

case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
extends BroadcastExchangeLike
with ValidatablePlan
with WithResourceProfileSupport {
with ValidatablePlan {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@transient override lazy val metrics: Map[String, SQLMetric] =
@@ -76,8 +75,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
mode,
child,
longMetric("numOutputRows"),
longMetric("dataSize"),
getResourceProfile)
longMetric("dataSize"))
}

val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) {
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{ValidatablePlan, WithResourceProfileSupport}
import org.apache.gluten.execution.ValidatablePlan
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
Expand Down Expand Up @@ -47,8 +47,7 @@ case class ColumnarShuffleExchangeExec(
projectOutputAttributes: Seq[Attribute],
advisoryPartitionSize: Option[Long] = None)
extends ShuffleExchangeLike
with ValidatablePlan
with WithResourceProfileSupport {
with ValidatablePlan {
private[sql] lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)

@@ -162,10 +161,6 @@
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
if (cachedShuffleRDD == null) {
cachedShuffleRDD = new ShuffledColumnarBatchRDD(columnarShuffleDependency, readMetrics)
if (getResourceProfile.isDefined) {
log.info(s"Set resource profile for $child to ${getResourceProfile.get}")
cachedShuffleRDD.withResources(getResourceProfile.get)
}
}
cachedShuffleRDD
}
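
For context, the resourceProfile parameter deleted above carried a standard Spark stage-level scheduling profile. A hedged sketch of how such a profile is built and applied with stock Spark APIs (the resource amounts are illustrative and not taken from this commit):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}

// Illustrative only: request larger executors for a heavy columnar stage.
def columnarStageProfile(): ResourceProfile = {
  val executorReqs = new ExecutorResourceRequests()
    .cores(4) // illustrative values, not from this commit
    .memory("8g")
    .memoryOverhead("2g")
  val taskReqs = new TaskResourceRequests().cpus(1)
  new ResourceProfileBuilder()
    .require(executorReqs)
    .require(taskReqs)
    .build()
}

// rdd.withResources(columnarStageProfile()) pins the profile on every stage computed
// from that RDD (stage-level scheduling; normally requires dynamic allocation to be enabled).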