Add GlutenImplicits to get FallbackSummary easily
ulysses-you committed Nov 6, 2023
1 parent 87b3c2e commit a17f88d
Showing 7 changed files with 402 additions and 29 deletions.
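Before the diffs, a minimal usage sketch of the new API (assuming a SparkSession `spark` with Gluten enabled and an existing table `t`; this mirrors the scaladoc example added in GlutenImplicits below):

import org.apache.spark.sql.execution.GlutenImplicits._

val df = spark.sql("SELECT * FROM t")
// Materializing first makes the summary reflect the final AQE plan;
// otherwise the query is re-planned with AQE disabled (see the scaladoc note below).
df.collect()
val summary = df.fallbackSummary()
println(s"Gluten nodes: ${summary.numGlutenNodes}, fallback nodes: ${summary.numFallbackNodes}")
summary.fallbackNodeToReason.flatten.foreach { case (node, reason) => println(s"$node: $reason") }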
[File 1/7]
@@ -17,11 +17,11 @@
package org.apache.spark.shuffle

import io.glutenproject.GlutenConfig
+import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
import io.glutenproject.memory.memtarget.MemoryTarget
import io.glutenproject.memory.memtarget.Spiller
import io.glutenproject.vectorized._
-import io.glutenproject.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
[File 2/7]
@@ -23,8 +23,10 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.GlutenFallbackReporter.FALLBACK_REASON_TAG
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

import java.util.{IdentityHashMap, Set}
@@ -38,25 +40,52 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}
// 2. remove `plan.verboseStringWithOperatorId`
// 3. remove codegen id
object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
-private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = {
-var numGlutenNodes = 0
-val fallbackNodeToReason = new mutable.HashMap[String, String]
+type FallbackInfo = (Int, Map[String, String])

-def addFallbackNodeWithReason(p: SparkPlan, reason: String): Unit = {
-p.getTagValue(QueryPlan.OP_ID_TAG).foreach {
-opId =>
-// e.g., `002 project`; the numeric prefix lets analysis recover the node name via `substring(4)`
-val formattedNodeName = f"$opId%03d ${p.nodeName}"
-fallbackNodeToReason.put(formattedNodeName, reason)
-}
+def addFallbackNodeWithReason(
+p: SparkPlan,
+reason: String,
+fallbackNodeToReason: mutable.HashMap[String, String]): Unit = {
+p.getTagValue(QueryPlan.OP_ID_TAG).foreach {
+opId =>
+// e.g., `002 project`; the numeric prefix lets analysis recover the node name via `substring(4)`
+val formattedNodeName = f"$opId%03d ${p.nodeName}"
+fallbackNodeToReason.put(formattedNodeName, reason)
+}
+}

+def handleVanillaSparkPlan(
+p: SparkPlan,
+fallbackNodeToReason: mutable.HashMap[String, String]
+): Unit = {
+p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
+case Some(reason) => addFallbackNodeWithReason(p, reason, fallbackNodeToReason)
+case _ =>
+// If the SparkPlan has no fallback reason, there are two possibilities:
+// 1. Gluten ignored the plan, which is itself a kind of fallback
+// 2. Gluten does not support it and recorded no fallback reason
+addFallbackNodeWithReason(
+p,
+"Gluten does not touch it or does not support it",
+fallbackNodeToReason)
+}
+}

+private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
+var numGlutenNodes = 0
+val fallbackNodeToReason = new mutable.HashMap[String, String]

def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
+case _: V2CommandExec =>
+case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
case _: WholeStageTransformer =>
case _: InputAdapter =>
case p: AdaptiveSparkPlanExec => collect(p.executedPlan)
case _: ColumnarToRowTransition =>
case _: RowToColumnarTransition =>
case p: QueryStageExec => collect(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
@@ -65,17 +94,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
if (InMemoryTableScanHelper.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
-addFallbackNodeWithReason(i, "Columnar table cache is disabled")
+addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
}
+case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
-p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
-case Some(reason) => addFallbackNodeWithReason(p, reason)
-case _ =>
-// If the SparkPlan has no fallback reason, there are two possibilities:
-// 1. Gluten ignored the plan, which is itself a kind of fallback
-// 2. Gluten does not support it and recorded no fallback reason
-addFallbackNodeWithReason(p, "Gluten does not touch it or does not support it")
-}
+handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
case _ =>
}
@@ -120,7 +143,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
*/
def processPlan[T <: QueryPlan[T]](
plan: T,
-append: String => Unit): (Int, Map[String, String]) = {
+append: String => Unit,
+collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): FallbackInfo = {
try {
// Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
// intentional overwriting of IDs generated in previous AQE iteration
@@ -186,7 +210,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
append("\n")
}

-collectFallbackNodes(plan)
+if (collectFallbackFunc.isEmpty) {
+collectFallbackNodes(plan)
+} else {
+collectFallbackFunc.get.apply(plan)
+}
} finally {
removeTags(plan)
}
@@ -310,11 +338,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
* Returns the operator identifier for the supplied plan by retrieving the `operationId` tag
* value.
*/
-def getOpId(plan: QueryPlan[_]): String = {
+private def getOpId(plan: QueryPlan[_]): String = {
plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown")
}

-def removeTags(plan: QueryPlan[_]): Unit = {
+private def removeTags(plan: QueryPlan[_]): Unit = {
def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
p.unsetTagValue(QueryPlan.OP_ID_TAG)
children.foreach(removeTags)
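The new optional `collectFallbackFunc` parameter to `processPlan` (above) lets a caller plug in its own fallback collector while reusing the operator-id assignment and plan printing; GlutenImplicits (next file) uses it to recurse into table caches. A hypothetical sketch of the hook, assuming a SparkPlan `executedPlan` in scope (the custom collector here is illustrative, not part of the commit):

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.execution.GlutenExplainUtils

// A trivial collector that reports no Gluten nodes and one catch-all reason.
val naiveCollector: QueryPlan[_] => GlutenExplainUtils.FallbackInfo =
  p => (0, Map(p.nodeName -> "illustrative reason"))

val concat = new PlanStringConcat()
val (numGlutenNodes, fallbackNodeToReason) =
  GlutenExplainUtils.processPlan(executedPlan, concat.append, Some(naiveCollector))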
[File 3/7 — new file]
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import io.glutenproject.execution.WholeStageTransformer
import io.glutenproject.extension.{GlutenPlan, InMemoryTableScanHelper}

import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.execution.GlutenExplainUtils._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.internal.SQLConf

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// spotless:off
/**
* A helper class to get the Gluten fallback summary from a Spark [[Dataset]].
*
* Note that, if AQE is enabled but the query is not materialized, this method re-plans
* the query execution with AQE disabled. It is a workaround to get the final plan, and the
* result may be inconsistent with that of a materialized query; however, there is no better choice.
*
* For example:
*
* {{{
* import org.apache.spark.sql.execution.GlutenImplicits._
* val df = spark.sql("SELECT * FROM t")
* df.fallbackSummary
* }}}
*/
// spotless:on
object GlutenImplicits {

case class FallbackSummary(
numGlutenNodes: Int,
numFallbackNodes: Int,
physicalPlanDescription: Seq[String],
fallbackNodeToReason: Seq[Map[String, String]])

private[sql] def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map {
key =>
if (conf.contains(key)) {
Some(conf.getConfString(key))
} else {
None
}
}
keys.zip(values).foreach {
case (k, v) =>
if (SQLConf.isStaticConfigKey(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
conf.setConfString(k, v)
}
try f
finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConfString(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

implicit class DatasetTransformer[T](dataset: Dataset[T]) {
private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
val args = p.argString(Int.MaxValue)
val index = args.indexOf("isFinalPlan=")
assert(index >= 0)
args.substring(index + "isFinalPlan=".length).trim.toBoolean
}

private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]

def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
case _: WholeStageTransformer =>
case _: InputAdapter =>
case _: ColumnarToRowTransition =>
case _: RowToColumnarTransition =>
case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
collect(p.executedPlan)
case p: AdaptiveSparkPlanExec =>
// If we are here, it means we are inside a table cache.
val (innerNumGlutenNodes, innerFallbackNodeToReason) =
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// re-plan manually to skip cached data
val newSparkPlan = QueryExecution.createSparkPlan(
dataset.sparkSession,
dataset.sparkSession.sessionState.planner,
p.inputPlan.logicalLink.get)
val newExecutedPlan = QueryExecution.prepareExecutedPlan(
dataset.sparkSession,
newSparkPlan
)
processPlan(
newExecutedPlan,
new PlanStringConcat().append,
Some(plan => collectFallbackNodes(plan)))
}
numGlutenNodes += innerNumGlutenNodes
fallbackNodeToReason ++= innerFallbackNodeToReason
case p: QueryStageExec => collect(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
p.innerChildren.foreach(collect)
case i: InMemoryTableScanExec =>
if (InMemoryTableScanHelper.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
}
collect(i.relation.cachedPlan)
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
case _ =>
}
}

collect(plan)
(numGlutenNodes, fallbackNodeToReason.toMap)
}

private def collectQueryExecutionFallbackSummary(qe: QueryExecution): FallbackSummary = {
var totalNumGlutenNodes = 0
var totalNumFallbackNodes = 0
val totalPhysicalPlanDescription = new ArrayBuffer[String]()
val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()

def handlePlanWithAQEAndTableCache(
plan: SparkPlan,
logicalPlan: LogicalPlan,
isMaterialized: Boolean): Unit = {
val concat = new PlanStringConcat()
val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// AQE is not materialized, so the columnar rules have not been applied.
// For this case, we apply the columnar rules manually with AQE disabled.
val qe = dataset.sparkSession.sessionState.executePlan(logicalPlan)
processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
}
} else {
processPlan(plan, concat.append, collectFallbackFunc)
}
totalNumGlutenNodes += numGlutenNodes
totalNumFallbackNodes += fallbackNodeToReason.size
totalPhysicalPlanDescription.append(concat.toString())
totalFallbackNodeToReason.append(fallbackNodeToReason)
}

// For command-like query, e.g., `INSERT INTO TABLE ...`
qe.commandExecuted.foreach {
case r: CommandResult =>
handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true)
case _ => // ignore
}

// For query, e.g., `SELECT * FROM ...`
if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
val isMaterialized = qe.executedPlan.find {
case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
case _ => false
}.isDefined
handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized)
}

FallbackSummary(
totalNumGlutenNodes,
totalNumFallbackNodes,
totalPhysicalPlanDescription,
totalFallbackNodeToReason
)
}

def fallbackSummary(): FallbackSummary = {
collectQueryExecutionFallbackSummary(dataset.queryExecution)
}
}
}
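A note on the private `withSQLConf` helper above: like Spark's test utility of the same name, it sets the given session confs for the duration of the block, then restores (or unsets) the previous values, and refuses to modify static confs. A small sketch of these semantics, runnable only from within the `org.apache.spark.sql` package since the helper is `private[sql]` (the conf key choice is illustrative):

import org.apache.spark.sql.execution.GlutenImplicits
import org.apache.spark.sql.internal.SQLConf

val key = SQLConf.ADAPTIVE_EXECUTION_ENABLED.key
val before = SQLConf.get.getConfString(key)
GlutenImplicits.withSQLConf(key -> "false") {
  // Inside the block the override is visible ...
  assert(SQLConf.get.getConfString(key) == "false")
}
// ... and afterwards the original value is back.
assert(SQLConf.get.getConfString(key) == before)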
[File 4/7]
@@ -2014,6 +2014,9 @@ class ClickHouseTestSettings extends BackendTestSettings {
"SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where int_Field=1")
.exclude("SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema")
enableSuite[SparkFunctionStatistics]

+enableSuite[GlutenImplicitsTest]
+.exclude("fallbackSummary with shuffle")
+.exclude("fallbackSummary with cache")
+.exclude("fallbackSummary with cached data and shuffle")
}
// scalastyle:on line.size.limit
[File 5/7]
@@ -1164,5 +1164,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenHiveSQLQuerySuite]
// ReaderFactory is not registered for format orc.
.exclude("hive orc scan")
+enableSuite[GlutenImplicitsTest]
}
// scalastyle:on line.size.limit
(2 remaining changed files not shown.)