
[VL] Allow specifying maximum batch size for batch resizing #6670

Merged: 6 commits into apache:main on Aug 2, 2024

Conversation

@zhztheplayer (Member) commented Aug 1, 2024

This is a rework of #6009.

  1. Rename the operator VeloxAppendBatchesExec to VeloxResizeBatchesExec, and have it support batch-splitting as well as batch-appending.
  2. Remove the old config options spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle and spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle.
  3. Add new config options:
    • spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput=true
      Enables batch resizing for shuffle input.
    • spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range=500~1000
      Specifies the batch resizing target size range: [500, 1000].
  4. Default range for shuffle batch resizing: [0.25 * 4096, 4 * 4096] (changed from [0.8 * 4096, +∞)). A sketch of the implied policy follows.
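
For illustration, here is a minimal Scala sketch of the policy such a range implies. ResizeRange, decide, and sliceLengths are hypothetical names for this sketch only, not this PR's actual API; the real logic lives in VeloxResizeBatchesExec and its C++ backing.

// Hypothetical sketch: a [min, max] target range drives three outcomes.
final case class ResizeRange(min: Int, max: Int)

// Split an oversized row count into slice lengths of at most range.max,
// mirroring sliceLength = min(maxOutputBatchSize, remaining) on the C++ side.
def sliceLengths(numRows: Int, range: ResizeRange): Seq[Int] =
  Iterator.iterate(numRows)(_ - range.max).takeWhile(_ > 0)
    .map(remaining => math.min(range.max, remaining)).toSeq

def decide(numRows: Int, range: ResizeRange): String =
  if (numRows < range.min) "append"     // merge with neighboring batches until >= min
  else if (numRows > range.max) "split" // slice into chunks of at most max rows
  else "keep"                           // already within [min, max]

// With the default range [0.25 * 4096, 4 * 4096] = [1024, 16384]:
// decide(500, ResizeRange(1024, 16384))         == "append"
// decide(40000, ResizeRange(1024, 16384))       == "split"
// sliceLengths(40000, ResizeRange(1024, 16384)) == Seq(16384, 16384, 7232)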


github-actions bot commented Aug 1, 2024

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename the commit message and pull request title to the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:


github-actions bot commented Aug 1, 2024

Run Gluten Clickhouse CI


@Yohahaha (Contributor) commented Aug 1, 2024

Would you consider implementing such logic as a Velox operator, like ValueStream? Then it could be reused in these places: before shuffle write, before agg, and after scan. cc @zhli1142015

And lots of Java/Scala changes could be removed.

@zhztheplayer (Member, Author) replied

> Would you consider implementing such logic as a Velox operator, like ValueStream?

It will be the next step on this topic. See a related comment in another PR.

The major advantage of doing it in Velox is avoiding breaking one Velox task into several.

> And lots of Java/Scala changes could be removed.

Regarding the code, it's expected that most of the C++ code can be removed then. However, a JVM-side plan node might still be needed to map to Velox's corresponding plan node.


@Yohahaha (Contributor) commented Aug 1, 2024

> Regarding the code, it's expected that most of the C++ code can be removed then. However, a JVM-side plan node might still be needed to map to Velox's corresponding plan node.

Yeah, indeed. I have done a POC of the above ideas, with just one rule, one transformer, and one Velox operator added.

@zhztheplayer (Member, Author) replied

> Yeah, indeed. I have done a POC of the above ideas, with just one rule, one transformer, and one Velox operator added.

That sounds great. Do you intend to contribute it to upstream Velox? If so, please feel free to open a PR to refresh this feature in Gluten once the Velox change has landed.

@Yohahaha (Contributor) commented Aug 1, 2024

> That sounds great. Do you intend to contribute it to upstream Velox? If so, please feel free to open a PR to refresh this feature in Gluten once the Velox change has landed.

Yes, I'm preparing the code.

github-actions bot added the labels CORE (works for Gluten Core) and VELOX on Aug 2, 2024

zhztheplayer marked this pull request as ready for review on Aug 2, 2024, 06:08
// No rows remain in the current input batch: signal end of output.
if (remainingLength == 0) {
  return nullptr;
}
// Cap each output slice at the configured maximum output batch size.
int32_t sliceLength = std::min(maxOutputBatchSize_, remainingLength);
Contributor:

Why use maxOutputBatchSize_ rather than the default batch size (4096)?

Member (Author):

That's a good point. I was worried that batch slicing might add extra overhead when the size is only slightly larger than 4096, say 5000 (which would be sliced into batches of 4096 and 904 rows).

I can raise a PR later to change the value to a more reasonable one. Until then, I think it's OK to use a large value, since that also aligns with the current code (which doesn't split at all).

// Expect exactly one '~' separator, e.g. "500~1000".
assert(pattern.count(_ == '~') == 1, s"Invalid range pattern for batch resizing: $pattern")
val splits = pattern.split('~')
assert(splits.length == 2)
// Parse "min~max" into the inclusive resize target range.
ResizeRange(splits(0).toInt, splits(1).toInt)
Contributor:

Should we check that the min size is always < COLUMNAR_MAX_BATCH_SIZE and the max size is > COLUMNAR_MAX_BATCH_SIZE?

Member (Author):

Is there a reason for doing such a check?

To me, it's not necessary to create tight coupling between this range and COLUMNAR_MAX_BATCH_SIZE; the former's default value was merely derived from the latter.
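
For completeness, here is a minimal sketch of what the suggested check could look like if one did want the coupling. parseResizeRange and its maxBatchSize parameter are hypothetical names for this sketch, and this PR deliberately does not implement such a check.

// Hypothetical sketch of the reviewer's suggested validation; the PR keeps the
// range decoupled from COLUMNAR_MAX_BATCH_SIZE, so this is illustrative only.
final case class ResizeRange(min: Int, max: Int)

def parseResizeRange(pattern: String, maxBatchSize: Int): ResizeRange = {
  val splits = pattern.split('~')
  require(splits.length == 2, s"Invalid range pattern for batch resizing: $pattern")
  val range = ResizeRange(splits(0).trim.toInt, splits(1).trim.toInt)
  require(range.min <= range.max, s"Empty range for batch resizing: $pattern")
  // The proposed coupling: the range should bracket the configured batch size.
  require(range.min < maxBatchSize && range.max > maxBatchSize,
    s"Range $pattern does not bracket the configured batch size $maxBatchSize")
  range
}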

@marin-ma (Contributor) left a comment:

LGTM. Thanks!

zhztheplayer merged commit 94b79c7 into apache:main on Aug 2, 2024
50 checks passed
@FelixYBW (Contributor) commented Aug 2, 2024

Sorry, I just noticed this PR.
The config spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range=500~1000 is too complex. The maximum batch size should always honor the configured batch size. Slicing has little overhead; we just need to define the threshold for merging batches.

@FelixYBW (Contributor) commented Aug 2, 2024

> Yes, I'm preparing the code.

We can merge small batches into a large one, but the slicing shouldn't happen; instead, we should fix the operators that don't honor the configured maximum batch size. @jinchengchenghh will fix the generator operator. The other operator I know of is hash join. A sketch of the merge-only approach follows.
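
To make the merge-only idea concrete, here is a minimal Scala sketch under stated assumptions: Batch(numRows) stands in for a real columnar batch, flush() for real columnar concatenation, and coalesce is a hypothetical name, not Gluten's actual API.

// Hypothetical sketch of a merge-only coalescer with a single threshold:
// small batches are buffered and concatenated, large batches pass through
// untouched, and no slicing ever happens.
import scala.collection.mutable.ArrayBuffer

final case class Batch(numRows: Int)

def coalesce(input: Iterator[Batch], mergeThreshold: Int): Iterator[Batch] =
  new Iterator[Batch] {
    private val pending = ArrayBuffer.empty[Batch]
    private var pendingRows = 0
    private var readyLarge: Option[Batch] = None

    private def flush(): Batch = {
      val merged = Batch(pendingRows) // placeholder for concatenating `pending`
      pending.clear(); pendingRows = 0
      merged
    }

    def hasNext: Boolean = readyLarge.isDefined || pending.nonEmpty || input.hasNext

    def next(): Batch = {
      if (readyLarge.isDefined) { val b = readyLarge.get; readyLarge = None; return b }
      while (input.hasNext) {
        val b = input.next()
        if (b.numRows >= mergeThreshold) {
          if (pending.isEmpty) return b // already large enough: pass through
          readyLarge = Some(b)          // emit the merged buffer first
          return flush()
        }
        pending += b; pendingRows += b.numRows
        if (pendingRows >= mergeThreshold) return flush()
      }
      flush() // input exhausted: emit whatever is buffered
    }
  }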

Labels: CORE (works for Gluten Core), VELOX