
[CORE] Use SortShuffleManager instance in ColumnarShuffleManager #6022

Merged (9 commits) on Jun 13, 2024
@@ -71,6 +71,23 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
    collect(plan) { case v: VeloxColumnarToRowExec => v }.size
  }

  test("fallback with shuffle manager") {
    withSQLConf(GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
      runQueryAndCompare("select c1, count(*) from tmp1 group by c1") {
        df =>
          val plan = df.queryExecution.executedPlan
          val columnarShuffle = find(plan) {
            case _: ColumnarShuffledJoin => true
Contributor:
We should check the number of ColumnarShuffleExchangeExec and ShuffleExchangeExec. You can use a repartition hint to construct the SQL: https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#partitioning-hints

e.g. select /*+ REPARTITION(3, c1) */ * FROM tmp1;

Then the check should be like:

assert(collectColumnarShuffleExchange(plan) == 0)
assert(collectShuffleExchange(plan) == 1)
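
The `collectColumnarShuffleExchange` / `collectShuffleExchange` helpers named above are from the review comment, not confirmed Gluten APIs. As a rough, self-contained sketch of the pattern they imply (counting plan nodes of a given type via a partial function over a tree), using hypothetical toy node classes rather than real Spark operators:

```scala
// Toy plan tree standing in for Spark's physical plan (assumed names,
// not real Spark/Gluten classes).
sealed trait PlanNode { def children: Seq[PlanNode] }
case class ShuffleExchange(children: Seq[PlanNode] = Nil) extends PlanNode
case class ColumnarShuffleExchange(children: Seq[PlanNode] = Nil) extends PlanNode
case class Scan() extends PlanNode { val children: Seq[PlanNode] = Nil }

// Pre-order collect: apply the partial function at every node,
// keeping the results where it is defined.
def collectNodes[T](plan: PlanNode)(pf: PartialFunction[PlanNode, T]): Seq[T] =
  pf.lift(plan).toSeq ++ plan.children.flatMap(c => collectNodes(c)(pf))

// With columnar shuffle disabled, the plan should contain a row-based
// ShuffleExchange and no ColumnarShuffleExchange.
val plan: PlanNode = ShuffleExchange(Seq(Scan()))
val numColumnar = collectNodes(plan) { case c: ColumnarShuffleExchange => c }.size
val numRowShuffle = collectNodes(plan) { case s: ShuffleExchange => s }.size
```

In the real suite the same shape is provided by Spark's `TreeNode.collect` / `find`, as the diff's existing `collect(plan) { case v: VeloxColumnarToRowExec => v }.size` helper shows.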

Contributor Author:

For the current query, this is the plan:

AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(6) HashAggregateTransformer(keys=[c1#14], functions=[count(1)], output=[c1#14, count(1)#26L])
      +- ^(6) InputIteratorTransformer[c1#14, count#30L]
         +- ^(6) InputAdapter
            +- ^(6) RowToVeloxColumnar
               +- ^(6) AQEShuffleRead coalesced
                  +- ^(6) ShuffleQueryStage 0
                     +- Exchange hashpartitioning(c1#14, 5), ENSURE_REQUIREMENTS, [plan_id=281]
                        +- VeloxColumnarToRowExec
                           +- ^(5) HashAggregateTransformer(keys=[c1#14], functions=[partial_count(1)], output=[c1#14, count#30L])
                              +- ^(5) NativeFileScan parquet spark_catalog.default.tmp11111[c1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/anvicto/src/Gluten/spark-warehouse/org.apache.gluten.execut..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>

Can I retain it?

Contributor:

It's OK with me. Thanks.

Contributor Author:

I have raised a small PR, #6055, to enable fallback for shuffle.

            case _ => false
          }
          assert(columnarShuffle.isEmpty)

          val wholeQueryColumnarToRow = collectColumnarToRow(plan)
          assert(wholeQueryColumnarToRow == 2)
      }
    }
  }

  test("fallback with collect") {
    withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
      runQueryAndCompare("SELECT count(*) FROM tmp1") {