perf: Improve query planning to more reliably fall back to columnar shuffle when native shuffle is not supported (apache#1209)
andygrove authored Jan 9, 2025
1 parent be48839 commit d15d051
Showing 356 changed files with 30,950 additions and 28,502 deletions.
7 changes: 7 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -254,6 +254,13 @@ object CometConf extends ShimCometConf {
     .checkValues(Set("native", "jvm", "auto"))
     .createWithDefault("auto")

+  val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
+      .doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
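For context, a minimal sketch of how this flag might be enabled from user code. It assumes COMET_EXEC_CONFIG_PREFIX resolves to "spark.comet.exec", so the full key would be "spark.comet.exec.shuffle.fallbackToColumnar"; the session setup below is hypothetical, and the flag itself is internal and off by default.

import org.apache.spark.sql.SparkSession

object CometFallbackFlagExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical session setup; assumes Comet is on the classpath and enabled.
    val spark = SparkSession
      .builder()
      .appName("comet-columnar-fallback-example")
      .getOrCreate()

    // Assumed full key: COMET_EXEC_CONFIG_PREFIX + ".shuffle.fallbackToColumnar",
    // i.e. "spark.comet.exec.shuffle.fallbackToColumnar". Internal flag, off by
    // default; when set, planning may fall back to Comet's columnar shuffle.
    spark.conf.set("spark.comet.exec.shuffle.fallbackToColumnar", "true")
  }
}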
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

@@ -788,68 +788,93 @@ class CometSparkSessionExtensions
 }

 // Native shuffle for Comet operators
-case s: ShuffleExchangeExec
-    if isCometShuffleEnabled(conf) &&
-      isCometNativeShuffleMode(conf) &&
-      QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 =>
-  logInfo("Comet extension enabled for Native Shuffle")
-
-  val newOp = transform1(s)
-  newOp match {
-    case Some(nativeOp) =>
-      // Switch to use Decimal128 regardless of precision, since Arrow native execution
-      // doesn't support Decimal32 and Decimal64 yet.
-      conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
-      val cometOp = CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
-      CometSinkPlaceHolder(nativeOp, s, cometOp)
-    case None =>
-      s
-  }
-
-// Columnar shuffle for regular Spark operators (not Comet) and Comet operators
-// (if configured).
-// If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
-// convert it to CometColumnarShuffle,
-case s: ShuffleExchangeExec
-    if isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
-      QueryPlanSerde.supportPartitioningTypes(s.child.output, s.outputPartitioning)._1 &&
-      !isShuffleOperator(s.child) =>
-  logInfo("Comet extension enabled for JVM Columnar Shuffle")
-
-  val newOp = QueryPlanSerde.operator2Proto(s)
-  newOp match {
-    case Some(nativeOp) =>
-      s.child match {
-        case n if n.isInstanceOf[CometNativeExec] || !n.supportsColumnar =>
-          val cometOp = CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
-          CometSinkPlaceHolder(nativeOp, s, cometOp)
-        case _ =>
-          s
-      }
-    case None =>
-      s
-  }
-
-case s: ShuffleExchangeExec =>
-  val isShuffleEnabled = isCometShuffleEnabled(conf)
-  val outputPartitioning = s.outputPartitioning
-  val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
-  val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason")
-  val columnarShuffleEnabled = isCometJVMShuffleMode(conf)
-  val msg2 = createMessage(
-    isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
-      .supportPartitioning(s.child.output, outputPartitioning)
-      ._1,
-    "Native shuffle: " +
-      s"${QueryPlanSerde.supportPartitioning(s.child.output, outputPartitioning)._2}")
-  val msg3 = createMessage(
-    isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde
-      .supportPartitioningTypes(s.child.output, outputPartitioning)
-      ._1,
-    "JVM shuffle: " +
-      s"${QueryPlanSerde.supportPartitioningTypes(s.child.output, outputPartitioning)._2}")
-  withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
-  s
+case s: ShuffleExchangeExec =>
+  val nativePrecondition = isCometShuffleEnabled(conf) &&
+    isCometNativeShuffleMode(conf) &&
+    QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1
+
+  val nativeShuffle: Option[SparkPlan] =
+    if (nativePrecondition) {
+      val newOp = transform1(s)
+      newOp match {
+        case Some(nativeOp) =>
+          // Switch to use Decimal128 regardless of precision, since Arrow native execution
+          // doesn't support Decimal32 and Decimal64 yet.
+          conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+          val cometOp = CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+          Some(CometSinkPlaceHolder(nativeOp, s, cometOp))
+        case None =>
+          None
+      }
+    } else {
+      None
+    }
+
+  // this is a temporary workaround because some Spark SQL tests fail
+  // when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs
+  // that we had not previously seen
+  val tryColumnarNext =
+    !nativePrecondition || (nativePrecondition && nativeShuffle.isEmpty &&
+      COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf))
+
+  val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) {
+    nativeShuffle
+  } else if (tryColumnarNext) {
+    // Columnar shuffle for regular Spark operators (not Comet) and Comet operators
+    // (if configured).
+    // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
+    // convert it to CometColumnarShuffle,
+    if (isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
+      QueryPlanSerde.supportPartitioningTypes(s.child.output, s.outputPartitioning)._1 &&
+      !isShuffleOperator(s.child)) {
+
+      val newOp = QueryPlanSerde.operator2Proto(s)
+      newOp match {
+        case Some(nativeOp) =>
+          s.child match {
+            case n if n.isInstanceOf[CometNativeExec] || !n.supportsColumnar =>
+              val cometOp =
+                CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+              Some(CometSinkPlaceHolder(nativeOp, s, cometOp))
+            case _ =>
+              None
+          }
+        case None =>
+          None
+      }
+    } else {
+      None
+    }
+  } else {
+    None
+  }
+
+  if (nativeOrColumnarShuffle.isDefined) {
+    nativeOrColumnarShuffle.get
+  } else {
+    val isShuffleEnabled = isCometShuffleEnabled(conf)
+    val outputPartitioning = s.outputPartitioning
+    val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
+    val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason")
+    val columnarShuffleEnabled = isCometJVMShuffleMode(conf)
+    val msg2 = createMessage(
+      isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
+        .supportPartitioning(s.child.output, outputPartitioning)
+        ._1,
+      "Native shuffle: " +
+        s"${QueryPlanSerde.supportPartitioning(s.child.output, outputPartitioning)._2}")
+    val typeInfo = QueryPlanSerde
+      .supportPartitioningTypes(s.child.output, outputPartitioning)
+      ._2
+    val msg3 = createMessage(
+      isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde
+        .supportPartitioningTypes(s.child.output, outputPartitioning)
+        ._1,
+      "JVM shuffle: " +
+        s"$typeInfo")
+    withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
+    s
+  }

 case op =>
   op match {
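Taken together, the rewrite folds what used to be three separate case arms into a single decision chain: try native shuffle first, optionally fall back to columnar shuffle, and otherwise keep Spark's row-based shuffle while recording why. A simplified, self-contained sketch of that control flow, with Comet's planner calls replaced by hypothetical boolean stand-ins (not the real APIs):

object ShuffleFallbackSketch {
  sealed trait Choice
  case object NativeShuffle extends Choice // CometShuffleExchangeExec(CometNativeShuffle)
  case object ColumnarShuffle extends Choice // CometShuffleExchangeExec(CometColumnarShuffle)
  case object SparkShuffle extends Choice // leave the ShuffleExchangeExec unchanged

  def planShuffle(
      nativeSupported: Boolean, // stand-in for nativePrecondition
      nativeConverted: Boolean, // stand-in for transform1(s) returning Some(...)
      fallbackToColumnar: Boolean, // stand-in for COMET_SHUFFLE_FALLBACK_TO_COLUMNAR
      columnarSupported: Boolean // stand-in for the JVM columnar shuffle checks
  ): Choice = {
    val native: Option[Choice] =
      if (nativeSupported && nativeConverted) Some(NativeShuffle) else None
    // Columnar is tried when native was never eligible, or when it was eligible
    // but conversion failed and the new fallback flag is enabled.
    val tryColumnarNext =
      !nativeSupported || (nativeSupported && native.isEmpty && fallbackToColumnar)
    val columnar: Option[Choice] =
      if (tryColumnarNext && columnarSupported) Some(ColumnarShuffle) else None
    // Otherwise fall through to Spark's row-based shuffle (the withInfo branch).
    native.orElse(columnar).getOrElse(SparkShuffle)
  }
}

With the default fallbackToColumnar = false, a shuffle that was eligible for native conversion but failed to convert stays on Spark's shuffle, which is the pre-existing behavior the temporary-workaround comment preserves.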
@@ -1,49 +1,50 @@
== Physical Plan ==
-TakeOrderedAndProject (45)
-+- * HashAggregate (44)
-+- Exchange (43)
-+- * HashAggregate (42)
-+- * Project (41)
-+- * BroadcastHashJoin Inner BuildRight (40)
-:- * Project (35)
-: +- * BroadcastHashJoin Inner BuildRight (34)
-: :- * Project (28)
-: : +- * Filter (27)
-: : +- * BroadcastHashJoin ExistenceJoin(exists#1) BuildRight (26)
-: : :- * BroadcastHashJoin ExistenceJoin(exists#2) BuildRight (19)
-: : : :- * CometColumnarToRow (12)
-: : : : +- CometBroadcastHashJoin (11)
-: : : : :- CometFilter (2)
-: : : : : +- CometScan parquet spark_catalog.default.customer (1)
-: : : : +- CometBroadcastExchange (10)
-: : : : +- CometProject (9)
-: : : : +- CometBroadcastHashJoin (8)
-: : : : :- CometScan parquet spark_catalog.default.store_sales (3)
-: : : : +- CometBroadcastExchange (7)
-: : : : +- CometProject (6)
-: : : : +- CometFilter (5)
-: : : : +- CometScan parquet spark_catalog.default.date_dim (4)
-: : : +- BroadcastExchange (18)
-: : : +- * CometColumnarToRow (17)
-: : : +- CometProject (16)
-: : : +- CometBroadcastHashJoin (15)
-: : : :- CometScan parquet spark_catalog.default.web_sales (13)
-: : : +- ReusedExchange (14)
-: : +- BroadcastExchange (25)
-: : +- * CometColumnarToRow (24)
-: : +- CometProject (23)
-: : +- CometBroadcastHashJoin (22)
-: : :- CometScan parquet spark_catalog.default.catalog_sales (20)
-: : +- ReusedExchange (21)
-: +- BroadcastExchange (33)
-: +- * CometColumnarToRow (32)
-: +- CometProject (31)
-: +- CometFilter (30)
-: +- CometScan parquet spark_catalog.default.customer_address (29)
-+- BroadcastExchange (39)
-+- * CometColumnarToRow (38)
-+- CometFilter (37)
-+- CometScan parquet spark_catalog.default.customer_demographics (36)
+TakeOrderedAndProject (46)
++- * HashAggregate (45)
++- * CometColumnarToRow (44)
++- CometColumnarExchange (43)
++- * HashAggregate (42)
++- * Project (41)
++- * BroadcastHashJoin Inner BuildRight (40)
+:- * Project (35)
+: +- * BroadcastHashJoin Inner BuildRight (34)
+: :- * Project (28)
+: : +- * Filter (27)
+: : +- * BroadcastHashJoin ExistenceJoin(exists#1) BuildRight (26)
+: : :- * BroadcastHashJoin ExistenceJoin(exists#2) BuildRight (19)
+: : : :- * CometColumnarToRow (12)
+: : : : +- CometBroadcastHashJoin (11)
+: : : : :- CometFilter (2)
+: : : : : +- CometScan parquet spark_catalog.default.customer (1)
+: : : : +- CometBroadcastExchange (10)
+: : : : +- CometProject (9)
+: : : : +- CometBroadcastHashJoin (8)
+: : : : :- CometScan parquet spark_catalog.default.store_sales (3)
+: : : : +- CometBroadcastExchange (7)
+: : : : +- CometProject (6)
+: : : : +- CometFilter (5)
+: : : : +- CometScan parquet spark_catalog.default.date_dim (4)
+: : : +- BroadcastExchange (18)
+: : : +- * CometColumnarToRow (17)
+: : : +- CometProject (16)
+: : : +- CometBroadcastHashJoin (15)
+: : : :- CometScan parquet spark_catalog.default.web_sales (13)
+: : : +- ReusedExchange (14)
+: : +- BroadcastExchange (25)
+: : +- * CometColumnarToRow (24)
+: : +- CometProject (23)
+: : +- CometBroadcastHashJoin (22)
+: : :- CometScan parquet spark_catalog.default.catalog_sales (20)
+: : +- ReusedExchange (21)
+: +- BroadcastExchange (33)
+: +- * CometColumnarToRow (32)
+: +- CometProject (31)
+: +- CometFilter (30)
+: +- CometScan parquet spark_catalog.default.customer_address (29)
++- BroadcastExchange (39)
++- * CometColumnarToRow (38)
++- CometFilter (37)
++- CometScan parquet spark_catalog.default.customer_demographics (36)


(1) CometScan parquet spark_catalog.default.customer
@@ -243,50 +244,53 @@ Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#31]
Results [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]

-(43) Exchange
+(43) CometColumnarExchange
Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
-Arguments: hashpartitioning(cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, 5), ENSURE_REQUIREMENTS, [plan_id=5]
+Arguments: hashpartitioning(cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5]

-(44) HashAggregate [codegen id : 6]
+(44) CometColumnarToRow [codegen id : 6]
Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
+
+(45) HashAggregate [codegen id : 6]
+Input [9]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30, count#32]
Keys [8]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#33]
Results [14]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, count(1)#33 AS cnt1#34, cd_purchase_estimate#26, count(1)#33 AS cnt2#35, cd_credit_rating#27, count(1)#33 AS cnt3#36, cd_dep_count#28, count(1)#33 AS cnt4#37, cd_dep_employed_count#29, count(1)#33 AS cnt5#38, cd_dep_college_count#30, count(1)#33 AS cnt6#39]

-(45) TakeOrderedAndProject
+(46) TakeOrderedAndProject
Input [14]: [cd_gender#23, cd_marital_status#24, cd_education_status#25, cnt1#34, cd_purchase_estimate#26, cnt2#35, cd_credit_rating#27, cnt3#36, cd_dep_count#28, cnt4#37, cd_dep_employed_count#29, cnt5#38, cd_dep_college_count#30, cnt6#39]
Arguments: 100, [cd_gender#23 ASC NULLS FIRST, cd_marital_status#24 ASC NULLS FIRST, cd_education_status#25 ASC NULLS FIRST, cd_purchase_estimate#26 ASC NULLS FIRST, cd_credit_rating#27 ASC NULLS FIRST, cd_dep_count#28 ASC NULLS FIRST, cd_dep_employed_count#29 ASC NULLS FIRST, cd_dep_college_count#30 ASC NULLS FIRST], [cd_gender#23, cd_marital_status#24, cd_education_status#25, cnt1#34, cd_purchase_estimate#26, cnt2#35, cd_credit_rating#27, cnt3#36, cd_dep_count#28, cnt4#37, cd_dep_employed_count#29, cnt5#38, cd_dep_college_count#30, cnt6#39]

===== Subqueries =====

Subquery:1 Hosting operator id = 3 Hosting Expression = ss_sold_date_sk#7 IN dynamicpruning#8
-BroadcastExchange (50)
-+- * CometColumnarToRow (49)
-+- CometProject (48)
-+- CometFilter (47)
-+- CometScan parquet spark_catalog.default.date_dim (46)
+BroadcastExchange (51)
++- * CometColumnarToRow (50)
++- CometProject (49)
++- CometFilter (48)
++- CometScan parquet spark_catalog.default.date_dim (47)


-(46) CometScan parquet spark_catalog.default.date_dim
+(47) CometScan parquet spark_catalog.default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), EqualTo(d_year,2002), GreaterThanOrEqual(d_moy,1), LessThanOrEqual(d_moy,4), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int,d_moy:int>

-(47) CometFilter
+(48) CometFilter
Input [3]: [d_date_sk#9, d_year#10, d_moy#11]
Condition : (((((isnotnull(d_year#10) AND isnotnull(d_moy#11)) AND (d_year#10 = 2002)) AND (d_moy#11 >= 1)) AND (d_moy#11 <= 4)) AND isnotnull(d_date_sk#9))

-(48) CometProject
+(49) CometProject
Input [3]: [d_date_sk#9, d_year#10, d_moy#11]
Arguments: [d_date_sk#9], [d_date_sk#9]

-(49) CometColumnarToRow [codegen id : 1]
+(50) CometColumnarToRow [codegen id : 1]
Input [1]: [d_date_sk#9]

-(50) BroadcastExchange
+(51) BroadcastExchange
Input [1]: [d_date_sk#9]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=6]
