fix: Fallback to Spark for unsupported input besides ordering
viirya committed Aug 2, 2024
1 parent 39e030b commit cef4884
Showing 2 changed files with 35 additions and 12 deletions.
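The change in QueryPlanSerde adds one more condition to the native-shuffle support check: besides the partitioning (or ordering) expressions, every input attribute must also have a Comet-supported data type. The following is a minimal, self-contained Scala sketch of that pattern; the stubbed supportedDataType and Attr are illustrative assumptions, not Comet's actual implementations.

import org.apache.spark.sql.types._

// Sketch of the support check this commit extends. `supportedDataType` is a
// hypothetical stand-in for Comet's real helper; `Attr` stands in for
// Catalyst's Attribute.
object PartitioningSupportSketch {
  def supportedDataType(dt: DataType): Boolean = dt match {
    case IntegerType | LongType | StringType => true
    case _ => false // e.g. NullType is not natively supported
  }

  final case class Attr(name: String, dataType: DataType)

  // Before the fix only the partitioning expressions were checked; after it,
  // the inputs are checked too, because the shuffle writes every input
  // column, not just the partitioning keys.
  def nativeShuffleSupported(exprTypes: Seq[DataType], inputs: Seq[Attr]): Boolean =
    exprTypes.forall(supportedDataType) &&
      inputs.forall(attr => supportedDataType(attr.dataType))

  def main(args: Array[String]): Unit = {
    val inputs = Seq(Attr("index", IntegerType), Attr("col", NullType))
    // Partitioning by the supported "index" column alone used to pass the
    // check; the NullType payload column now forces a fallback to Spark.
    println(nativeShuffleSupported(Seq(IntegerType), inputs)) // false
  }
}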
@@ -2926,7 +2926,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case HashPartitioning(expressions, _) =>
         val supported =
           expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-            expressions.forall(e => supportedDataType(e.dataType))
+            expressions.forall(e => supportedDataType(e.dataType)) &&
+            inputs.forall(attr => supportedDataType(attr.dataType))
         if (!supported) {
           msg = s"unsupported Spark partitioning expressions: $expressions"
         }
@@ -2936,7 +2937,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case RangePartitioning(orderings, _) =>
         val supported =
           orderings.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-            orderings.forall(e => supportedDataType(e.dataType))
+            orderings.forall(e => supportedDataType(e.dataType)) &&
+            inputs.forall(attr => supportedDataType(attr.dataType))
         if (!supported) {
           msg = s"unsupported Spark partitioning expressions: $orderings"
         }
@@ -2975,7 +2977,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case HashPartitioning(expressions, _) =>
         val supported =
           expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-            expressions.forall(e => supportedDataType(e.dataType))
+            expressions.forall(e => supportedDataType(e.dataType)) &&
+            inputs.forall(attr => supportedDataType(attr.dataType))
         if (!supported) {
           msg = s"unsupported Spark partitioning expressions: $expressions"
         }
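To see the effect of the extra inputs check, consider a plan whose ordering key is supported but which carries an unsupported NullType column, mirroring the new test in the second file below. A sketch, assuming a local SparkSession; with Comet enabled, such a shuffle now falls back to Spark rather than attempting native execution.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, NullType, StructType}

// The ordering expression ("index", IntegerType) is supported, but the
// NullType payload column is not, so the range-partitioned shuffle that
// orderBy introduces must fall back to Spark.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val schema = new StructType()
  .add("index", IntegerType, nullable = false)
  .add("col", NullType, nullable = true)
val rdd = spark.sparkContext.parallelize((1 to 20).map(i => Row(i, null)))
spark.createDataFrame(rdd, schema).orderBy("index").show()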
@@ -19,12 +19,14 @@

 package org.apache.comet.exec

+import scala.util.Random
+
 import org.scalactic.source.Position
 import org.scalatest.Tag

 import org.apache.hadoop.fs.Path
 import org.apache.spark.{Partitioner, SparkConf}
-import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
+import org.apache.spark.sql.{CometTestBase, DataFrame, RandomDataGenerator, Row}
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -68,17 +70,35 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {

   test("Unsupported types for SinglePartition should fallback to Spark") {
     checkSparkAnswer(spark.sql("""
-        |SELECT
-        | AVG(null),
-        | COUNT(null),
-        | FIRST(null),
-        | LAST(null),
-        | MAX(null),
-        | MIN(null),
-        | SUM(null)
+      |SELECT
+      | AVG(null),
+      | COUNT(null),
+      | FIRST(null),
+      | LAST(null),
+      | MAX(null),
+      | MIN(null),
+      | SUM(null)
       """.stripMargin))
   }

+  test("Fallback to Spark for unsupported input besides ordering") {
+    val dataGenerator = RandomDataGenerator
+      .forType(
+        dataType = NullType,
+        nullable = true,
+        new Random(System.nanoTime()),
+        validJulianDatetime = false)
+      .get
+
+    val schema = new StructType()
+      .add("index", IntegerType, nullable = false)
+      .add("col", NullType, nullable = true)
+    val rdd =
+      spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator())))
+    val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+    checkSparkAnswer(df)
+  }
+
   test("Disable Comet shuffle with AQE coalesce partitions enabled") {
     Seq(true, false).foreach { coalescePartitionsEnabled =>
       withSQLConf(
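For reference, RandomDataGenerator.forType (used by the new test and imported from org.apache.spark.sql in the hunk above) returns an Option of a generator function; for NullType the generator can only ever produce nulls. A standalone usage sketch under that assumption, e.g. in a spark-shell with Spark's test utilities on the classpath:

import scala.util.Random
import org.apache.spark.sql.RandomDataGenerator
import org.apache.spark.sql.types.NullType

// forType yields None when no generator exists for the type; otherwise a
// thunk that produces random values. For nullable NullType every call
// returns null, which is exactly the unsupported input the test shuffles.
val gen = RandomDataGenerator
  .forType(NullType, nullable = true, new Random(42), validJulianDatetime = false)
  .get
val sample = Seq.fill(5)(gen()) // Seq(null, null, null, null, null)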
