-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE] Use SortShuffleManager instance in ColumnarShuffleManager #6022
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/apache/incubator-gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
/Benchmark Velox |
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
@marin-ma can you please review this? Thank you! |
@rui-mo can you too please review this? Thanks! |
Run Gluten Clickhouse CI |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { | ||
|
||
import ColumnarShuffleManager._ | ||
|
||
private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf) | ||
private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lazy
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { | ||
mapTaskIds => | ||
mapTaskIds.iterator.foreach { | ||
mapId => shuffleBlockResolver.removeDataByMap(shuffleId, mapId) | ||
} | ||
} | ||
true | ||
sortShuffleManager.unregisterShuffle(shuffleId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it same with before ? The taskIdMapsForShuffle
is maintained by ColumnarShuffleManager
so sortShuffleManager. unregisterShuffle
will do nothing since the taskIdMapsForShuffle
is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed this thanks, updated shuffleBlockResolver related flow.
} | ||
|
||
/** Shut down this ShuffleManager. */ | ||
override def stop(): Unit = { | ||
shuffleBlockResolver.stop() | ||
sortShuffleManager.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The shuffleBlockResolver is hold in ColumnarShuffleManager
, how can SortShuffleManager
stop it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, thanks!
…r/gluten into acvictor/columnarShuffle
Run Gluten Clickhouse CI |
@ulysses-you can you please review again? Thank you! |
mapTaskIds.synchronized { | ||
mapTaskIds.add(context.taskAttemptId()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we move this code in case columnarShuffleHandle: ColumnarShuffleHandle
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to case columnarShuffleHandle: ColumnarShuffleHandle
gluten-core/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala
Show resolved
Hide resolved
shall we call
|
Run Gluten Clickhouse CI |
Updated |
Thank you @ulysses-you for the comments! Updated the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ulysses-you Thanks for review! @acvictor Could you help to add a UT to test the fallback behavior for columnar shuffle? Perhaps, add one test case in |
Run Gluten Clickhouse CI |
df => | ||
val plan = df.queryExecution.executedPlan | ||
val columnarShuffle = find(plan) { | ||
case _: ColumnarShuffledJoin => true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check the number ofColumnarShuffleExchangeExec
and ShuffleExchangeExec
. You can use repartition hint to construct the sql https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#partitioning-hints
e.g. select /*+ REPARTITION(3, c1) */ * FROM tmp1;
Then the check should be like:
assert(collectColumnarShuffleExchange(plan) == 0)
assert(collectShuffleExchange(plan) == 1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the current query this is the plan
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
VeloxColumnarToRowExec
+- ^(6) HashAggregateTransformer(keys=[c1#14], functions=[count(1)], output=[c1#14, count(1)#26L])
+- ^(6) InputIteratorTransformer[c1#14, count#30L]
+- ^(6) InputAdapter
+- ^(6) RowToVeloxColumnar
+- ^(6) AQEShuffleRead coalesced
+- ^(6) ShuffleQueryStage 0
+- Exchange hashpartitioning(c1#14, 5), ENSURE_REQUIREMENTS, [plan_id=281]
+- VeloxColumnarToRowExec
+- ^(5) HashAggregateTransformer(keys=[c1#14], functions=[partial_count(1)], output=[c1#14, count#30L])
+- ^(5) NativeFileScan parquet spark_catalog.default.tmp11111[c1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/anvicto/src/Gluten/spark-warehouse/org.apache.gluten.execut..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
Can I retain it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok to me. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have raised one small PR here #6055 to enable fallback for shuffle.
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
@marin-ma added a test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
What changes were proposed in this pull request?
This PR makes a change to introduce an object of SortShuffleManager in ColumnarShuffleManager to reduce code overlap.
How was this patch tested?
SortShuffleManagerSuite has already been enabled against ColumnarShuffleManager #5816