
[VL] Optimize the performance of hash based shuffle by accumulating batches #5951

Merged

2 commits merged into apache:main from optimize_hash_based_shuffle on Jun 11, 2024

Conversation

XinShuoWang
Contributor

What changes were proposed in this pull request?

I used perf to profile the benchmark and found that the most time-consuming functions were splitFixedWidthValueBuffer and splitBinaryType. However, current columnar compute engines (such as StarRocks) implement their split functions with the same idea of trading random-read memory overhead for sequential-write memory overhead, so I don't think the split function itself leaves much room for optimization.

[Screenshot: perf profile, 2024-06-02 16:39:30]
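To illustrate the pattern being described, here is a minimal sketch with hypothetical names (not the actual Gluten code): the split gathers each partition's rows with random reads from the source column and appends them sequentially to that partition's buffer.

    #include <cstdint>
    #include <vector>

    // Sketch of a gather-style split: random reads from src,
    // sequential writes into each partition's destination buffer.
    template <typename T>
    void splitColumn(
        const T* src,
        const std::vector<std::vector<int32_t>>& partitionRows, // row ids per partition
        std::vector<std::vector<T>>& dst) {                     // one buffer per partition
      for (size_t pid = 0; pid < partitionRows.size(); ++pid) {
        for (int32_t row : partitionRows[pid]) {
          dst[pid].push_back(src[row]); // random read, sequential append
        }
      }
    }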

I found that when ShuffleBatchSize is increased, performance improves significantly. I think the gains come mainly from the following aspects:

  1. It lets the split stage fully exploit sequential memory writes. When PartitionNum is 10000 and ShuffleBatchSize is 4096 (the default in the benchmark), each partition receives at most one row per batch (confirmed by logging statistics in the benchmark; 4096 rows spread across 10000 partitions averages under one row each), so sequential writes clearly cannot pay off at that size.

  2. It reduces the number of function calls and memory allocations.

Therefore, I implemented this PR to cache the data to be shuffled before splitting, which optimizes ShuffleWrite performance. For the test data, see the screenshots below.

I think this PR could also decide whether to cache data based on memory usage, thereby avoiding the ShuffleWrite OOM problem.
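A minimal sketch of the accumulate-then-split idea (hypothetical interface; the actual PR wires this into Gluten's shuffle writer): input batches are buffered until a row threshold is reached, then split in one pass, so each partition receives longer sequential runs per split call.

    #include <cstdint>
    #include <memory>
    #include <vector>

    struct Batch {
      int64_t numRows;
      // column buffers elided
    };

    // Buffers small input batches and splits them together once
    // enough rows have accumulated.
    class AccumulatingSplitter {
     public:
      explicit AccumulatingSplitter(int64_t thresholdRows)
          : thresholdRows_(thresholdRows) {}

      void add(std::shared_ptr<Batch> batch) {
        bufferedRows_ += batch->numRows;
        buffered_.push_back(std::move(batch));
        if (bufferedRows_ >= thresholdRows_) {
          flush();
        }
      }

      void flush() {
        if (buffered_.empty()) {
          return;
        }
        // Concatenate the buffered batches and run the split once over
        // the combined rows (concatenation and split details elided).
        buffered_.clear();
        bufferedRows_ = 0;
      }

     private:
      const int64_t thresholdRows_;
      int64_t bufferedRows_ = 0;
      std::vector<std::shared_ptr<Batch>> buffered_;
    };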

How was this patch tested?

Command

./build/velox/benchmarks/shuffle_split_benchmark --file=/root/shuffleSplitBenchmark/cpp/velox/benchmarks/data/tpch_sf10m/lineitem/part-00000-6c374e0a-7d76-401b-8458-a8e31f8ab704-c000.snappy.parquet --partitions=10000 --iterations=100 --threads=1

Before optimization

[Screenshot: benchmark results before optimization, 2024-06-02 16:13:43]

After optimization

[Screenshot: benchmark results after optimization, 2024-06-02 16:18:28]



github-actions bot commented Jun 2, 2024

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename the commit message and pull request title to the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}


@XinShuoWang XinShuoWang marked this pull request as ready for review June 2, 2024 08:58
@Yohahaha Yohahaha requested a review from kerwin-zk June 2, 2024 13:39
@marin-ma
Contributor

marin-ma commented Jun 3, 2024

Thanks @XinShuoWang! Please fix the code style so that we can run the TPC-H benchmark :)

I think this PR could also decide whether to cache data based on memory usage, thereby avoiding the ShuffleWrite OOM problem.

Is this part already included in the PR?

@XinShuoWang XinShuoWang force-pushed the optimize_hash_based_shuffle branch from 742ecb7 to f669b6a Compare June 3, 2024 07:09
@FelixYBW
Contributor

FelixYBW commented Jun 3, 2024

Thank you for the improvement.

The ideal case for the current split function is an input batch that is as large as possible while all of its columns still fit into the L2 cache. Once the column data no longer fits in L2, split performance drops dramatically.

On the other hand, if the batch size is too small, the overhead of preparing the split can't be ignored. We should have room to improve in that case.

Also, in the current implementation we must flatten the RowVector before splitting. The flattening itself has overhead, which hasn't been analyzed.
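To make the L2 constraint concrete, a back-of-envelope check with illustrative numbers (assumptions, not measurements from this thread):

    #include <cstdint>
    #include <cstdio>

    int main() {
      const int64_t numColumns = 16;      // assumed number of fixed-width columns
      const int64_t batchRows = 4096;     // default ShuffleBatchSize in the benchmark
      const int64_t bytesPerValue = 8;    // e.g. BIGINT or DOUBLE
      const int64_t l2Bytes = 512 << 10;  // a typical per-core L2 size

      // 16 * 4096 * 8 B = 512 KB: the source columns alone already fill
      // such an L2, before counting any destination buffers.
      const int64_t footprint = numColumns * batchRows * bytesPerValue;
      std::printf("batch column data: %lld bytes, L2: %lld bytes\n",
                  static_cast<long long>(footprint),
                  static_cast<long long>(l2Bytes));
      return 0;
    }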

@XinShuoWang
Contributor Author

  template <typename T>
  arrow::Status splitFixedType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) {
    for (auto& pid : partitionUsed_) {
      // Compute this partition's current write cursor in its output buffer.
      auto dstPidBase = (T*)(dstAddrs[pid] + partitionBufferBase_[pid] * sizeof(T));
      // record dstPidBase
      ...

@FelixYBW I added a log at the location above and found that the stride between successive dstPidBase values is about 16 KB. I think this can also be optimized to make the stride between dstPidBase values much smaller, which would help reduce cache misses.
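One plausible reading of the ~16 KB figure (an assumption, not confirmed in the thread): if each partition buffer is pre-allocated for about 4096 rows of a 4-byte type, adjacent write cursors sit 4096 × 4 B = 16 KB apart, so the split's writes scatter across a very wide address range. A sketch of the kind of stride logging described above, reusing the members from the quoted snippet (the exact log used in the experiment isn't shown):

    // Inside splitFixedType<T>: log the distance between successive
    // partitions' write cursors (hypothetical logging; needs <cstdio>).
    uint8_t* prev = nullptr;
    for (auto& pid : partitionUsed_) {
      uint8_t* cur = dstAddrs[pid] + partitionBufferBase_[pid] * sizeof(T);
      if (prev != nullptr) {
        std::printf("stride to pid %d: %td bytes\n", static_cast<int>(pid), cur - prev);
      }
      prev = cur;
    }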

@XinShuoWang XinShuoWang force-pushed the optimize_hash_based_shuffle branch 3 times, most recently from 34f87f0 to 9ffbccb Compare June 4, 2024 09:46
@XinShuoWang XinShuoWang marked this pull request as draft June 4, 2024 11:33
@XinShuoWang XinShuoWang force-pushed the optimize_hash_based_shuffle branch from 9ffbccb to 398a42b Compare June 4, 2024 11:34
@zhztheplayer
Member

Oops. I missed this PR before opening this for a similar purpose...

@zhztheplayer
Member

zhztheplayer commented Jun 7, 2024

The change makes sense to me. I think it's reasonable to merge this and use #6009 as a follow-up, which adds an individual Spark operator controlling this behavior so that it can be reused by other operators in the future (say, joins or aggregations) via some kind of strategy. Let me know if you have any thoughts. @XinShuoWang @FelixYBW @marin-ma

@XinShuoWang XinShuoWang marked this pull request as ready for review June 7, 2024 05:16
@XinShuoWang
Contributor Author

@zhztheplayer @Yohahaha Can you help me fix the failing CI? The error message is java.lang.IllegalStateException: nativeShuffleWriter should not be -1L

@Yohahaha
Contributor

@zhztheplayer @Yohahaha Can you help me fix the failing CI? The error message is java.lang.IllegalStateException: nativeShuffleWriter should not be -1L

I think it's a Uniffle error that has already been fixed on the main branch; try rebasing.

@zhztheplayer zhztheplayer merged commit ec3e92e into apache:main Jun 11, 2024
38 checks passed
@HaoChen-ch

HaoChen-ch commented Sep 25, 2024

The change makes sense to me. I think it's reasonable to merge this and use #6009 as a follow-up, which adds an individual Spark operator controlling this behavior so that it can be reused by other operators in the future (say, joins or aggregations) via some kind of strategy. Let me know if you have any thoughts. @XinShuoWang @FelixYBW @marin-ma

Velox has a feature that merges small vectors in the output of aggregations, filters, and joins (see https://github.com/facebookincubator/velox/pull/7899/files), but that PR has not been merged to master. Shall we continue that PR and get it merged to master?
@zhztheplayer

@zhztheplayer
Member

zhztheplayer commented Sep 25, 2024

It sounds great to me that someone could take on the remaining work of facebookincubator/velox#7899. This batch-resizing work doesn't actually have to live in individual operators, as long as the logic can be reused by both Velox and the Gluten shuffle. It's enough to expose some extra metrics showing the resizing time from the relevant operators.

Until then, we can still use the operator to do optimizations on the Gluten side.
