Add support for sum(decimal) Spark aggregate function #5372
Great! Thank you for helping enhance the aggregation function interface! Could you also update the documentation about this new flag? (https://facebookincubator.github.io/velox/develop/aggregate-functions.html, source code is in velox/docs/develop/aggregate-functions.rst.)
Why is this false? Would you add a comment to explain?
@mbasmanova The main reason is that Spark's decimal sum doesn't have the concept of a null group; each group is initialized with sum = 0 and isEmpty = true. Therefore, to stay consistent, I need to use the nonNullGroup parameter in writeIntermediateResult (this parameter is only available when non-default-null behavior is enabled) to output a null group as sum = 0, isEmpty = true.
I'm not sure I understand. What is a "null group"? Are you trying to match intermediate results to Spark's? If so, does this matter only when the query uses companion functions? Why can't the intermediate result for all-null inputs be sum = NULL? Do you need isEmpty = true to allow Spark to distinguish between all-NULL inputs and decimal overflow?
"null group" means all input values for this group are null, we never call clearNull for this group.
Yes. Not only for companion functions. In Spark decimal sum agg, sum is initialized to 0, not null. sum=0, isEmpty=true means all input values are null. Spark use isEmpty=true to distinguish between all-NULL inputs and input values' sum just equal to 0.
We need to ensure that the data output by the operator is consistent with Spark, because the final aggregation of the decimal sum may fall back to being executed in Spark for some reason. We need to make sure that the meaning of the intermediate data is consistent with Spark's. Spark uses isEmpty and sum together to distinguish many different situations involving all-null inputs and overflow.
Got it. Makes sense. Perhaps, clarify this in the comments.
Is this implementation correct? Shouldn't we produce a non-null struct for every row, where sum = x and isEmpty = isNull(x)? How do you test this code path to ensure Spark can process the results correctly?
Yes, we need to set sum = x, isEmpty = false for non-null input, and sum = 0, isEmpty = true for null input. I will add a test case for decimal sum in Gluten that sets kAbandonPartialAggregationMinPct and kAbandonPartialAggregationMinRows to very small values to trigger abandoning partial aggregation.
This is not needed, is it?
Is this done to prevent the direct use of the default no-argument constructor to create a struct? I'm not sure about the main purpose here. I've seen other implementations using the simple agg interface include this line of code, so I added it as well.
This is not required since another constructor with HashStringAllocator* argument is defined. But I think deleting the default constructor explicitly is still helpful to clarify the purpose of the code.
nit: we only need to set isEmpty if it's true.
This will introduce an if branch, which may increase overhead.
We have seen some use cases where the function (1) adds raw inputs, (2) extracts intermediate results, (3) adds intermediate results back to the accumulator, and (4) continues adding raw inputs to the accumulator (see the test code in AggregationTestBase::testStreaming()).
Suppose the intermediate result added at step (3) is {null, false}; the code here at step (4) would overwrite it. Should this method also check whether the current accumulator is in the "overflow" status?
Yes, I was not aware of this scenario before. Thus, we need to verify sum.has_value() here, and if it is false, we should ignore the input data and return true directly.
nit: You can also return false for if (isEmpty || otherIsEmpty) upfront here.
I don't think so. If the current accumulator's isEmpty is true and otherIsEmpty is false, the combined accumulator's isEmpty will be false, and the sum will equal otherSum. But we can return false for if (isEmpty && otherIsEmpty).
Oh yes, sorry, I meant if (isEmpty && otherIsEmpty). My typo.