Add support for sum(decimal) Spark aggregate function #5372

Closed

Conversation

Contributor

@liujiayi771 liujiayi771 commented Jun 23, 2023

Resolves #5226.
Spark SQL's decimal sum aggregate needs an isEmpty attribute, so we need to implement
a new decimal sum aggregate with isEmpty semantics.
Currently, Gluten only supports non-ANSI mode, so we need to return null on
overflow. Spark detects overflow by checking for isEmpty=false together with a
null sum: isEmpty works with sum to indicate that an overflow occurred in
the intermediate result.
Spark's implementation:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
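
To make the (sum, isEmpty) semantics above concrete, here is a minimal sketch of a non-ANSI accumulator. It is not the Velox class added by this PR: the names DecimalSumState, addInput, merge, overflowed and finalResult are hypothetical, and the sketch assumes unscaled values of precision 18 at most so that int64_t is wide enough (Spark decimals go up to precision 38).

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical accumulator for illustration only.
// Assumption: unscaled decimal values have precision <= 18.
constexpr int64_t kMaxUnscaled = 999'999'999'999'999'999LL; // 10^18 - 1

struct DecimalSumState {
  std::optional<int64_t> sum{0}; // nullopt means an overflow happened
  bool isEmpty{true};            // true until a non-null input is seen
};

// Overflow is encoded as "not empty, but sum is null".
inline bool overflowed(const DecimalSumState& s) {
  return !s.isEmpty && !s.sum.has_value();
}

// Adds two in-range values; returns nullopt if either side is already
// null or the result leaves the precision-18 range. The raw addition
// cannot wrap because 2 * (10^18 - 1) fits in int64_t.
inline std::optional<int64_t> addInRange(
    std::optional<int64_t> a, std::optional<int64_t> b) {
  if (!a || !b) {
    return std::nullopt;
  }
  const int64_t r = *a + *b;
  if (r > kMaxUnscaled || r < -kMaxUnscaled) {
    return std::nullopt;
  }
  return r;
}

// Null inputs are ignored, so the state stays unchanged for them.
inline void addInput(DecimalSumState& s, std::optional<int64_t> value) {
  if (!value) {
    return;
  }
  assert(*value <= kMaxUnscaled && *value >= -kMaxUnscaled);
  s.sum = addInRange(s.sum, value);
  s.isEmpty = false;
}

// Combines a partial state into s; an overflow on either side sticks.
inline void merge(DecimalSumState& s, const DecimalSumState& other) {
  if (other.isEmpty) {
    return;
  }
  s.sum = addInRange(s.sum, other.sum);
  s.isEmpty = false;
}

// Non-ANSI result: null for "no rows" and null for "overflow".
inline std::optional<int64_t> finalResult(const DecimalSumState& s) {
  return s.isEmpty ? std::nullopt : s.sum;
}
```

Both "no input" and "overflow" produce a null final result in non-ANSI mode, but the intermediate state keeps them apart, which is why a partial aggregation has to carry isEmpty alongside sum.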

@netlify

netlify bot commented Jun 23, 2023

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit e6297bd
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/65dfdfc8c2795a00089e9680

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jun 23, 2023
@liujiayi771 liujiayi771 force-pushed the spark-decimal-sum branch 2 times, most recently from dc8a517 to 2fa780b Compare June 23, 2023 14:03
@liujiayi771 liujiayi771 reopened this Jun 23, 2023
@liujiayi771 liujiayi771 marked this pull request as draft June 23, 2023 14:45
@liujiayi771 liujiayi771 marked this pull request as ready for review June 24, 2023 14:15
@liujiayi771
Contributor Author

@majetideepak Could you help review this PR?

@liujiayi771
Contributor Author

@majetideepak Please take a look.

Contributor

@mbasmanova mbasmanova left a comment


@majetideepak Deepak, would you help review this PR?

@mbasmanova mbasmanova requested a review from majetideepak July 24, 2023 12:29
@liujiayi771
Contributor Author

liujiayi771 commented Aug 2, 2023

Hi @mbasmanova, I think it would be better for Spark's decimal sum and decimal avg to inherit from DecimalAggregate. When this PR was proposed, AverageAggregateBase had not yet been moved to functions/lib. I plan to re-implement this PR. Do you have any suggestions?
cc @rui-mo

Collaborator

@rui-mo rui-mo left a comment


Currently, gluten only support no ansi mode, so we need to return null when overflow.

Can we rephrase this part to describe Spark's behavior more precisely? ANSI is a Spark config, and this PR only supports ANSI OFF mode. For example:

Currently, only ANSI OFF mode is supported in this PR, which means null is returned on overflow.

As quoted from Spark:
For decimal type, the initial value of sum is 0. We need to keep sum unchanged if the input is null, as SUM function ignores null input. The sum can only be null if overflow happens under non-ansi mode.

velox/functions/sparksql/aggregates/DecimalSumAggregate.h (8 outdated review threads, resolved)
Collaborator

@rui-mo rui-mo left a comment


Can we add a link to Spark's Sum implementation to the PR description? That would help readers understand this issue.
Also, we may need to document this behavior at https://github.com/facebookincubator/velox/blob/main/velox/docs/functions/spark/aggregate.rst.

@liujiayi771 liujiayi771 force-pushed the spark-decimal-sum branch 2 times, most recently from b806f70 to 53645c0 Compare December 23, 2023 12:05
marin-ma pushed a commit to oap-project/velox that referenced this pull request Jan 4, 2024
marin-ma pushed a commit to oap-project/velox that referenced this pull request Jan 5, 2024
@liujiayi771
Contributor Author

@kagamiori Thank you for your patience. I have addressed or made changes in response to the latest comments.

Contributor

@kagamiori kagamiori left a comment


LGTM. Thank you for adding this function!

velox/docs/develop/aggregate-functions.rst (2 outdated review threads, resolved)
@facebook-github-bot
Contributor

@kagamiori has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@liujiayi771
Contributor Author

@kagamiori Thank you very much. I will also reimplement Spark decimal average based on the Simple Aggregate Function Interface.

@liujiayi771
Contributor Author

Hi @kagamiori @mbasmanova. Did anything go wrong in Facebook Internal tests?

@kagamiori
Contributor

Hi @kagamiori @mbasmanova. Did anything go wrong in Facebook Internal tests?

No worries. We were just waiting for internal tests to pass. Landing it now.

@facebook-github-bot
Contributor

@kagamiori merged this pull request in a38891a.

@kagamiori
Contributor

Hi @liujiayi771, I found that velox/functions/sparksql/aggregates/tests/SumAggregationTest.cpp throws errors when running with UBSAN in two unit tests, SumAggregationTest.hookLimits and SumAggregationTest.overflow. Could you help fix them? I attached the error log below.

[ RUN      ] SumAggregationTest.hookLimits

W0304 11:18:21.697316 1652297 MemoryArbitrator.cpp:83] Query memory capacity[8.00GB] is set for NOOP arbitrator which has no capacity enforcement
I0304 11:18:22.716148 1652297 HiveConnector.cpp:69] Hive connector test-hive created with maximum of 20000 cached file handles.
buck-out/v2/gen/fbcode/0155eabfa767915a/velox/exec/__velox_exec_lib__/buck-headers/velox/exec/AggregationHook.h:106:12: runtime error: signed integer overflow: -9223372036854775808 + -1 cannot be represented in type 'long'
    #0 0xb53140 in void facebook::velox::aggregate::(anonymous namespace)::updateSingleValue<long, true>(long&, long) fbcode/velox/exec/AggregationHook.h:106
    #1 0xb5093a in facebook::velox::aggregate::SumHook<long, long, true>::addValue(int, void const*) fbcode/velox/exec/AggregationHook.h:149
    #2 0xb4f6bf in void facebook::velox::functions::aggregate::test::testHookLimits<long, long, true>(bool) fbcode/velox/functions/lib/aggregates/tests/SumTestBase.h:59
    #3 0xb4ddb2 in facebook::velox::functions::aggregate::sparksql::test::(anonymous namespace)::SumAggregationTest_hookLimits_Test::TestBody() fbcode/velox/functions/sparksql/aggregates/tests/SumAggregationTest.cpp:136
    #4 0x7f95f44c389e in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) fbsource/src/gtest.cc:2675
    #5 0x7f95f44c3124 in testing::Test::Run() fbsource/src/gtest.cc:2692
    #6 0x7f95f44c8d5f in testing::TestInfo::Run() fbsource/src/gtest.cc:2841
    #7 0x7f95f44d0d16 in testing::TestSuite::Run() fbsource/src/gtest.cc:3020
    #8 0x7f95f450c67b in testing::internal::UnitTestImpl::RunAllTests() fbsource/src/gtest.cc:5925
    #9 0x7f95f450b6db in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) fbsource/src/gtest.cc:2675
    #10 0x7f95f450ac19 in testing::UnitTest::Run() fbsource/src/gtest.cc:5489
    #11 0xaee620 in RUN_ALL_TESTS() fbsource/gtest/gtest.h:2317
    #12 0xaee4ec in main fbcode/velox/functions/sparksql/aggregates/tests/Main.cpp:22
    #13 0x7f95ebe2c656 in __libc_start_call_main /home/engshare/third-party2/glibc/2.34/src/glibc-2.34/csu/../sysdeps/nptl/libc_start_call_main.h:58:16
    #14 0x7f95ebe2c717 in __libc_start_main@GLIBC_2.2.5 /home/engshare/third-party2/glibc/2.34/src/glibc-2.34/csu/../csu/libc-start.c:409:3
    #15 0x45c1a0 in _start /home/engshare/third-party2/glibc/2.34/src/glibc-2.34/csu/../sysdeps/x86_64/start.S:116

SUMMARY: UndefinedBehaviorSanitizer: signed-integer-overflow buck-out/v2/gen/fbcode/0155eabfa767915a/velox/exec/__velox_exec_lib__/buck-headers/velox/exec/AggregationHook.h:106:12 in 
[ RUN      ] SumAggregationTest.overflow

W0304 11:18:21.659759 1652318 MemoryArbitrator.cpp:83] Query memory capacity[8.00GB] is set for NOOP arbitrator which has no capacity enforcement
I0304 11:18:22.747507 1652318 HiveConnector.cpp:69] Hive connector test-hive created with maximum of 20000 cached file handles.
buck-out/v2/gen/fbcode/0155eabfa767915a/velox/functions/lib/aggregates/__aggregates__/buck-headers/velox/functions/lib/aggregates/SumAggregateBase.h:159:14: runtime error: signed integer overflow: 9223372036854775807 + 1 cannot be represented in type 'long'
    #0 0x7f4371b7c980 in void facebook::velox::functions::aggregate::SumAggregateBase<long, long, long, true>::updateSingleValue<long>(long&, long) fbcode/velox/functions/lib/aggregates/SumAggregateBase.h:159
    #1 0x7f4371b80b4a in void facebook::velox::functions::aggregate::SimpleNumericAggregate<long, long, long>::updateNonNullValue<true, long, void (*)(long&, long)>(char*, long, void (*)(long&, long)) fbcode/velox/functions/lib/aggregates/SimpleNumericAggregate.h:240
    #2 0x7f4371b8fff6 in void facebook::velox::functions::aggregate::SimpleNumericAggregate<long, long, long>::updateOneGroup<long, long, void (*)(long&, long), void (*)(long&, long, int)>(char*, facebook::velox::SelectivityVector const&, std::shared_ptr<facebook::velox::BaseVector> const&, void (*)(long&, long), void (*)(long&, long, int), bool, long)::'lambda0'(int)::operator()(int) const fbcode/velox/functions/lib/aggregates/SimpleNumericAggregate.h:184
    #3 0x7f4371b8e0f6 in void facebook::velox::SelectivityVector::applyToSelected<void facebook::velox::functions::aggregate::SimpleNumericAggregate<long, long, long>::updateOneGroup<long, long, void (*)(long&, long), void (*)(long&, long, int)>(char*, facebook::velox::SelectivityVector const&, std::shared_ptr<facebook::velox::BaseVector> const&, void (*)(long&, long), void (*)(long&, long, int), bool, long)::'lambda0'(int)>(long) const fbcode/velox/vector/SelectivityVector.h:429
    #4 0x7f4371b8d68b in void facebook::velox::functions::aggregate::SimpleNumericAggregate<long, long, long>::updateOneGroup<long, long, void (*)(long&, long), void (*)(long&, long, int)>(char*, facebook::velox::SelectivityVector const&, std::shared_ptr<facebook::velox::BaseVector> const&, void (*)(long&, long), void (*)(long&, long, int), bool, long) fbcode/velox/functions/lib/aggregates/SimpleNumericAggregate.h:183
    #5 0x7f4371b7ac37 in facebook::velox::functions::aggregate::SumAggregateBase<long, long, long, true>::addSingleGroupRawInput(char*, facebook::velox::SelectivityVector const&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector>>> const&, bool) fbcode/velox/functions/lib/aggregates/SumAggregateBase.h:95
    #6 0x7f436522ca43 in facebook::velox::exec::GroupingSet::addGlobalAggregationInput(std::shared_ptr<facebook::velox::RowVector> const&, bool) fbcode/velox/exec/GroupingSet.cpp:539
    #7 0x7f436522bf8f in facebook::velox::exec::GroupingSet::addInput(std::shared_ptr<facebook::velox::RowVector> const&, bool) fbcode/velox/exec/GroupingSet.cpp:159
    #8 0x7f43652b38ff in facebook::velox::exec::HashAggregation::addInput(std::shared_ptr<facebook::velox::RowVector>) fbcode/velox/exec/HashAggregation.cpp:130
    #9 0x7f43650b52b5 in facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<facebook::velox::RowVector>&) fbcode/velox/exec/Driver.cpp:619
    #10 0x7f43650a89ed in facebook::velox::exec::Driver::run(std::shared_ptr<facebook::velox::exec::Driver>) fbcode/velox/exec/Driver.cpp:762
    #11 0x7f43650a864e in facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::Driver>)::$_1::operator()() const fbcode/velox/exec/Driver.cpp:261
    #12 0x7f43650a84fc in void folly::detail::function::FunctionTraits<void ()>::callSmall<facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::Driver>)::$_1>(folly::detail::function::Data&) fbcode/folly/Function.h:346
    #13 0x7f434d1aac16 in folly::detail::function::FunctionTraits<void ()>::operator()() fbcode/folly/Function.h:368
    #14 0x7f434d1a8b9d in catch_exception<folly::Function<void ()> &, void (&)(const char *) noexcept, const char *&, void> fbcode/folly/lang/Exception.h:286
    #15 0x7f434d1a8b9d in invokeCatchingExns<folly::Function<void ()> > fbcode/folly/Executor.h:234
    #16 0x7f434d1a8b9d in folly::ThreadPoolExecutor::runTask(std::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&) fbcode/folly/executors/ThreadPoolExecutor.cpp:102
    #17 0x7f436f2ea14e in folly::CPUThreadPoolExecutor::threadRun(std::shared_ptr<folly::ThreadPoolExecutor::Thread>) fbcode/folly/executors/CPUThreadPoolExecutor.cpp:333
    #18 0x7f434d1b5f83 in void std::__invoke_impl<void, void (folly::ThreadPoolExecutor::*&)(std::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::shared_ptr<folly::ThreadPoolExecutor::Thread>&>(std::__invoke_memfun_deref, void (folly::ThreadPoolExecutor::*&)(std::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::shared_ptr<folly::ThreadPoolExecutor::Thread>&) fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/invoke.h:74
    #19 0x7f434d1b5d44 in std::__invoke_result<void (folly::ThreadPoolExecutor::*&)(std::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::shared_ptr<folly::ThreadPoolExecutor::Thread>&>::type std::__invoke<void (folly::ThreadPoolExecutor::*&)(std::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::shared_ptr<folly::ThreadPoolExecutor::Thread>&>(void (folly::ThreadPoolExecutor::*&)(std::shared_ptr<folly::ThreadPoolExecutor::Thread>), folly::ThreadPoolExecutor*&, std::shared_ptr<folly::ThreadPoolExecutor::Thread>&) fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/invoke.h:96
    #20 0x7f434d1b5ca3 in void std::_Bind<void (folly::ThreadPoolExecutor::* (folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)>::__call<void, 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/functional:420
    #21 0x7f434d1b5b0d in void std::_Bind<void (folly::ThreadPoolExecutor::* (folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)>::operator()<void>() fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/functional:503
    #22 0x7f434d1b582c in void folly::detail::function::FunctionTraits<void ()>::callSmall<std::_Bind<void (folly::ThreadPoolExecutor::* (folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)>>(folly::detail::function::Data&) fbcode/folly/Function.h:346
    #23 0x7f437629c2c6 in folly::detail::function::FunctionTraits<void ()>::operator()() fbcode/folly/Function.h:368
    #24 0x7f437629c114 in folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()::operator()() fbcode/folly/executors/thread_factory/NamedThreadFactory.h:40
    #25 0x7f437629bfd4 in void std::__invoke_impl<void, folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>(std::__invoke_other, folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()&&) fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/invoke.h:61
    #26 0x7f437629bf94 in std::__invoke_result<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>::type std::__invoke<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>(folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()&&) fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/invoke.h:96
    #27 0x7f437629bf6c in void std::thread::_Invoker<std::tuple<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>>::_M_invoke<0ul>(std::_Index_tuple<0ul>) fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/std_thread.h:253
    #28 0x7f437629bf44 in std::thread::_Invoker<std::tuple<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>>::operator()() fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/std_thread.h:260
    #29 0x7f437629be08 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::'lambda'()>>>::_M_run() fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/std_thread.h:211
    #30 0x7f436fadf4e4 in execute_native_thread_routine /home/engshare/third-party2/libgcc/11.x/src/gcc-11.x/x86_64-facebook-linux/libstdc++-v3/src/c++11/../../../.././libstdc++-v3/src/c++11/thread.cc:82:18
    #31 0x7f436fe9abae in start_thread /home/engshare/third-party2/glibc/2.34/src/glibc-2.34/nptl/pthread_create.c:434:8
    #32 0x7f436ff2d17b in __GI___clone3 /home/engshare/third-party2/glibc/2.34/src/glibc-2.34/misc/../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

SUMMARY: UndefinedBehaviorSanitizer: signed-integer-overflow buck-out/v2/gen/fbcode/0155eabfa767915a/velox/functions/lib/aggregates/__aggregates__/buck-headers/velox/functions/lib/aggregates/SumAggregateBase.h:159:14 in 
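
Both reports above come from a plain `+` on 64-bit signed integers, which is undefined behavior in C++ when the result does not fit. As a rough sketch of two common ways to avoid that (not necessarily the fix that was applied in Velox), the hypothetical helpers below either wrap around explicitly or report the overflow. `__builtin_add_overflow` is a GCC/Clang builtin, so this assumes one of those compilers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: two's-complement wrap-around without UB.
// Unsigned addition is well defined modulo 2^64. The conversion back to
// int64_t is modular as of C++20 and implementation-defined (modular on
// mainstream compilers) before that.
inline int64_t wrappingAdd(int64_t a, int64_t b) {
  return static_cast<int64_t>(
      static_cast<uint64_t>(a) + static_cast<uint64_t>(b));
}

// Hypothetical helper: returns false on overflow and true otherwise.
// On success, out holds a + b.
inline bool checkedAdd(int64_t a, int64_t b, int64_t& out) {
  return !__builtin_add_overflow(a, b, &out);
}
```

Which variant fits depends on the semantics the aggregate is meant to have: wrappingAdd matches an engine that silently wraps on overflow, while checkedAdd lets the caller turn the overflow into an error or a null.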

@liujiayi771
Contributor Author

@kagamiori OK. I will take a look at it later this week.

@kagamiori
Contributor

@kagamiori OK. I will take a look at it later this week.

Thanks!

@liujiayi771 liujiayi771 deleted the spark-decimal-sum branch March 12, 2024 01:31
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. Merged
Development

Successfully merging this pull request may close these issues.

Spark sql sum agg function support decimal
7 participants