[CORE] Improve merge fallback reason #3981

Merged · 2 commits · Dec 13, 2023
@@ -18,7 +18,7 @@ package io.glutenproject.extension
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.execution.BroadcastHashJoinExecTransformer
-import io.glutenproject.extension.columnar.TransformHints
+import io.glutenproject.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -247,7 +247,9 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
     val reason = fallback(plan)
     if (reason.isDefined) {
       val fallbackPlan = fallbackToRowBasedPlan()
-      TransformHints.tagAllNotTransformable(fallbackPlan, reason.get)
+      TransformHints.tagAllNotTransformable(
+        fallbackPlan,
+        TRANSFORM_UNSUPPORTED(reason, appendReasonIfExists = false))
       FallbackNode(fallbackPlan)
     } else {
       plan
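Read together with the `TransformHints` changes in the next file, the new argument shape means the generic whole-stage reason is no longer appended to nodes that already recorded a specific validation failure. A minimal sketch of the hint being constructed, using a simplified stand-in type rather than the project sources:

```scala
// Simplified stand-in for io.glutenproject.extension.columnar.TRANSFORM_UNSUPPORTED.
case class TRANSFORM_UNSUPPORTED(
    reason: Option[String],
    appendReasonIfExists: Boolean = true)

object WholeStageHintSketch {
  // The policy's reason wrapped in a non-appending hint: nodes that already carry a
  // specific validation reason keep it (see the merge rules in the next file's diff).
  val reason: Option[String] = Some("whole-stage fallback") // illustrative reason text
  val hint: TRANSFORM_UNSUPPORTED =
    TRANSFORM_UNSUPPORTED(reason, appendReasonIfExists = false)
}
```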
@@ -54,7 +54,8 @@ trait TransformHint {
 }
 
 case class TRANSFORM_SUPPORTED() extends TransformHint
-case class TRANSFORM_UNSUPPORTED(reason: Option[String]) extends TransformHint
+case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists: Boolean = true)
+  extends TransformHint
 
 object TransformHints {
   val TAG: TreeNodeTag[TransformHint] =
@@ -77,11 +78,19 @@ object TransformHints {
   def tag(plan: SparkPlan, hint: TransformHint): Unit = {
     val mergedHint = getHintOption(plan)
       .map {
-        case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason)) =>
+        case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason), originAppend) =>
           hint match {
-            case TRANSFORM_UNSUPPORTED(Some(newReason)) =>
-              TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
-            case TRANSFORM_UNSUPPORTED(None) =>
+            case TRANSFORM_UNSUPPORTED(Some(newReason), append) =>
+              if (originAppend && append) {
+                TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
+              } else if (originAppend) {
+                TRANSFORM_UNSUPPORTED(Some(originalReason))
+              } else if (append) {
+                TRANSFORM_UNSUPPORTED(Some(newReason))
+              } else {
+                TRANSFORM_UNSUPPORTED(Some(originalReason), false)
+              }
+            case TRANSFORM_UNSUPPORTED(None, _) =>
               originalHint
             case _ =>
               throw new UnsupportedOperationException(
@@ -113,10 +122,10 @@ object TransformHints {
     tag(plan, TRANSFORM_UNSUPPORTED(Some(reason)))
   }
 
-  def tagAllNotTransformable(plan: SparkPlan, reason: String): Unit = {
+  def tagAllNotTransformable(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = {
     plan.foreach {
       case _: GlutenPlan => // ignore
-      case other => tagNotTransformable(other, reason)
+      case other => tag(other, hint)
     }
   }
 
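The four-way branch above is the heart of the change. Below is a self-contained restatement of the merge rules with simplified stand-in types (the reason strings are invented); it mirrors the diff rather than reproducing the Gluten sources:

```scala
// Simplified stand-in for the Gluten hint type (illustration only).
case class TRANSFORM_UNSUPPORTED(
    reason: Option[String],
    appendReasonIfExists: Boolean = true)

object MergeSketch {
  // Mirrors the merge in TransformHints.tag for a node that already has a hint.
  def merge(
      original: TRANSFORM_UNSUPPORTED,
      incoming: TRANSFORM_UNSUPPORTED): TRANSFORM_UNSUPPORTED =
    (original.reason, incoming.reason) match {
      case (Some(o), Some(n)) =>
        if (original.appendReasonIfExists && incoming.appendReasonIfExists) {
          TRANSFORM_UNSUPPORTED(Some(o + "; " + n)) // both sides append: accumulate
        } else if (original.appendReasonIfExists) {
          TRANSFORM_UNSUPPORTED(Some(o)) // incoming opted out: keep the existing reason
        } else if (incoming.appendReasonIfExists) {
          TRANSFORM_UNSUPPORTED(Some(n)) // original opted out: the new reason replaces it
        } else {
          TRANSFORM_UNSUPPORTED(Some(o), appendReasonIfExists = false)
        }
      case (_, None) => original // a hint without a reason never erases one with a reason
      case (None, _) => incoming
    }

  def main(args: Array[String]): Unit = {
    val nodeReason = TRANSFORM_UNSUPPORTED(Some("validation failed: TIMESTAMP filter"))
    val wholeStage =
      TRANSFORM_UNSUPPORTED(Some("whole-stage fallback"), appendReasonIfExists = false)
    // The node keeps its specific reason; the generic whole-stage reason is dropped.
    println(merge(nodeReason, wholeStage).reason)
  }
}
```

Note the asymmetry: a hint tagged with `appendReasonIfExists = false` loses to an existing specific reason, which is exactly how `ExpandFallbackPolicy` keeps per-node validation reasons visible instead of drowning them in the whole-stage message.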
@@ -55,24 +55,41 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio
 
   private def printFallbackReason(plan: SparkPlan): Unit = {
     val validationLogLevel = glutenConfig.validationLogLevel
-    plan.foreach {
+    plan.foreachUp {
       case _: GlutenPlan => // ignore
       case plan: SparkPlan =>
-        plan.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
-          case Some(_) => // We have logged it before, so skip it
-          case _ =>
-            // some SparkPlan nodes do not have a hint, e.g., `ColumnarAQEShuffleRead`
-            TransformHints.getHintOption(plan) match {
-              case Some(TRANSFORM_UNSUPPORTED(reason)) =>
-                logFallbackReason(validationLogLevel, plan.nodeName, reason.getOrElse(""))
-                // Within the next round stage in AQE, the physical plan will be a new
-                // instance that cannot preserve the tag, so we set the fallback reason on
-                // the logical plan to stay aware of the reason for the whole plan.
-                plan.logicalLink.foreach {
-                  logicalPlan => logicalPlan.setTagValue(FALLBACK_REASON_TAG, reason.getOrElse(""))
-                }
-              case _ =>
-            }
-        }
+        // some SparkPlan nodes do not have a hint, e.g., `ColumnarAQEShuffleRead`
+        TransformHints.getHintOption(plan) match {
+          case Some(TRANSFORM_UNSUPPORTED(Some(reason), append)) =>
+            logFallbackReason(validationLogLevel, plan.nodeName, reason)
+            // Within the next round stage in AQE, the physical plan will be a new
+            // instance that cannot preserve the tag, so we set the fallback reason on
+            // the logical plan to stay aware of the reason for the whole plan.
+            // If a logical plan maps to several physical plans, we add all their reasons
+            // to that logical plan to make sure we do not lose any fallback reason.
+            plan.logicalLink.foreach {
+              logicalPlan =>
+                val newReason = logicalPlan
+                  .getTagValue(FALLBACK_REASON_TAG)
+                  .map {
+                    lastReason =>
+                      if (!append) {
+                        lastReason
+                      } else if (lastReason.contains(reason)) {
+                        // keep the last reason, as the new reason is redundant
+                        lastReason
+                      } else if (reason.contains(lastReason)) {
+                        // the new reason subsumes the last one, so overwrite it
+                        reason
+                      } else {
+                        // append the new reason
+                        lastReason + "; " + reason
+                      }
+                  }
+                  .getOrElse(reason)
+                logicalPlan.setTagValue(FALLBACK_REASON_TAG, newReason)
+            }
+          case _ =>
+        }
     }
   }
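The tag merging on the logical plan reduces to a small pure function. A standalone sketch, assumed equivalent to the diff above (names and reason strings are illustrative, not from the PR):

```scala
object ReasonMergeSketch {
  // Mirrors how GlutenFallbackReporter folds a new physical-plan reason into the
  // FALLBACK_REASON_TAG already stored on the shared logical plan.
  def mergeReason(lastReason: Option[String], reason: String, append: Boolean): String =
    lastReason match {
      case Some(last) if !append               => last                 // appending disabled
      case Some(last) if last.contains(reason) => last                 // new reason is redundant
      case Some(last) if reason.contains(last) => reason               // new reason subsumes the old
      case Some(last)                          => last + "; " + reason // accumulate distinct reasons
      case None                                => reason
    }

  def main(args: Array[String]): Unit = {
    // Two physical operators mapping to the same logical node, for example:
    val first = mergeReason(None, "scan: TIMESTAMP filter unsupported", append = true)
    val second = mergeReason(Some(first), "filter: TIMESTAMP not fully supported", append = true)
    println(second) // scan: TIMESTAMP filter unsupported; filter: TIMESTAMP not fully supported
  }
}
```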
@@ -17,13 +17,18 @@
 package org.apache.spark.sql.gluten
 
 import io.glutenproject.{GlutenConfig, VERSION}
+import io.glutenproject.events.GlutenPlanFallbackEvent
+import io.glutenproject.execution.FileSourceScanExecTransformer
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusStore, SparkListenerSQLExecutionStart}
 import org.apache.spark.status.ElementTrackingStore
 
-class GlutenFallbackSuite extends GlutenSQLTestsTrait {
+import scala.collection.mutable.ArrayBuffer
+
+class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
 
   test("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
@@ -101,4 +106,43 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait {
         .contains("Gluten does not touch it or does not support it"))
     }
   }
+
+  test("Improve merge fallback reason") {
+    spark.sql("create table t using parquet as select 1 as c1, timestamp '2023-01-01' as c2")
+    withTable("t") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
+        }
+      }
+      spark.sparkContext.addSparkListener(listener)
+      withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+        try {
+          val df =
+            spark.sql("select c1, count(*) from t where c2 > timestamp '2022-01-01' group by c1")
+          checkAnswer(df, Row(1, 1))
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+
+          // avoid failing when we support transforming the timestamp filter in the future
+          val isFallback = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[FileSourceScanExecTransformer]
+          }.isEmpty
+          if (isFallback) {
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Subfield filters creation not supported for input type 'TIMESTAMP'"))))
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Timestamp is not fully supported in Filter"))))
+          }
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }
@@ -17,13 +17,18 @@
 package org.apache.spark.sql.gluten
 
 import io.glutenproject.{GlutenConfig, VERSION}
+import io.glutenproject.events.GlutenPlanFallbackEvent
+import io.glutenproject.execution.FileSourceScanExecTransformer
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusStore, SparkListenerSQLExecutionStart}
 import org.apache.spark.status.ElementTrackingStore
 
-class GlutenFallbackSuite extends GlutenSQLTestsTrait {
+import scala.collection.mutable.ArrayBuffer
+
+class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
 
   ignore("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
@@ -101,4 +106,43 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait {
         .contains("Gluten does not touch it or does not support it"))
     }
   }
+
+  test("Improve merge fallback reason") {
+    spark.sql("create table t using parquet as select 1 as c1, timestamp '2023-01-01' as c2")
+    withTable("t") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
+        }
+      }
+      spark.sparkContext.addSparkListener(listener)
+      withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+        try {
+          val df =
+            spark.sql("select c1, count(*) from t where c2 > timestamp '2022-01-01' group by c1")
+          checkAnswer(df, Row(1, 1))
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+
+          // avoid failing when we support transforming the timestamp filter in the future
+          val isFallback = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[FileSourceScanExecTransformer]
+          }.isEmpty
+          if (isFallback) {
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Subfield filters creation not supported for input type 'TIMESTAMP'"))))
+            assert(events.exists(
+              _.fallbackNodeToReason.values.exists(
+                _.contains("Timestamp is not fully supported in Filter"))))
+          }
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }