diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index e97cc48fd191..89ea2ac08fbb 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -90,7 +90,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { bool verifyWindow( const std::vector& partitionKeys, const std::vector& sortingKeys, - const std::vector& aggregates, + const std::string& aggregate, const std::vector& input, bool customVerification, bool enableWindowVerification); @@ -102,14 +102,16 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::vector& masks, const std::vector& input, bool customVerification, - const std::vector>& customVerifiers); + const std::shared_ptr& customVerifier); // Return 'true' if query plans failed. bool verifySortedAggregation( const std::vector& groupingKeys, - const std::vector& aggregates, + const std::string& aggregate, const std::vector& masks, - const std::vector& input); + const std::vector& input, + bool customVerification, + const std::shared_ptr& customVerifier); void verifyAggregation(const std::vector& plans); @@ -119,18 +121,18 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::vector& plans, bool customVerification, const std::vector& input, - const std::vector>& customVerifiers, + const std::shared_ptr& customVerifier, int32_t maxDrivers = 2, bool testWithSpilling = true); // Return 'true' if query plans failed. bool verifyDistinctAggregation( const std::vector& groupingKeys, - const std::vector& aggregates, + const std::string& aggregate, const std::vector& masks, const std::vector& input, bool customVerification, - const std::vector>& customVerifiers); + const std::shared_ptr& customVerifier); static bool hasPartialGroupBy(const core::PlanNodePtr& plan) { auto partialAgg = core::PlanNode::findFirstNode( @@ -364,7 +366,7 @@ void AggregationFuzzer::go() { bool failed = verifyWindow( partitionKeys, sortingKeys, - {call}, + call, input, customVerification, FLAGS_enable_window_reference_verification); @@ -372,12 +374,8 @@ void AggregationFuzzer::go() { signatureWithStats.second.numFailed++; } } else { - // Exclude approx_xxx aggregations since their results differ - // between Velox and reference DB even when input is sorted. const bool sortedInputs = FLAGS_enable_sorted_aggregations && - canSortInputs(signature) && - (signature.name.find("approx_") == std::string::npos) && - vectorFuzzer_.coinToss(0.2); + canSortInputs(signature) && vectorFuzzer_.coinToss(0.2); // Exclude approx_xxx aggregations since their verifiers may not be able // to verify the results. The approx_percentile verifier would discard @@ -419,8 +417,13 @@ void AggregationFuzzer::go() { if (sortedInputs) { ++stats_.numSortedInputs; - bool failed = - verifySortedAggregation(groupingKeys, {call}, masks, input); + bool failed = verifySortedAggregation( + groupingKeys, + call, + masks, + input, + customVerification, + customVerifier); if (failed) { signatureWithStats.second.numFailed++; } @@ -428,11 +431,11 @@ void AggregationFuzzer::go() { ++stats_.numDistinctInputs; bool failed = verifyDistinctAggregation( groupingKeys, - {call}, + call, masks, input, customVerification, - {customVerifier}); + customVerifier); if (failed) { signatureWithStats.second.numFailed++; } @@ -443,7 +446,7 @@ void AggregationFuzzer::go() { masks, input, customVerification, - {customVerifier}); + customVerifier); if (failed) { signatureWithStats.second.numFailed++; } @@ -646,7 +649,7 @@ void makeStreamingPlansWithTableScan( bool AggregationFuzzer::verifyWindow( const std::vector& partitionKeys, const std::vector& sortingKeys, - const std::vector& aggregates, + const std::string& aggregate, const std::vector& input, bool customVerification, bool enableWindowVerification) { @@ -658,11 +661,10 @@ bool AggregationFuzzer::verifyWindow( frame << " order by " << folly::join(", ", sortingKeys); } - auto plan = - PlanBuilder() - .values(input) - .window({fmt::format("{} over ({})", aggregates[0], frame.str())}) - .planNode(); + auto plan = PlanBuilder() + .values(input) + .window({fmt::format("{} over ({})", aggregate, frame.str())}) + .planNode(); if (persistAndRunOnce_) { persistReproInfo({{plan, {}}}, reproPersistPath_); } @@ -700,58 +702,32 @@ bool AggregationFuzzer::verifyWindow( } } -namespace { -void resetCustomVerifiers( - const std::vector>& customVerifiers) { - for (auto& verifier : customVerifiers) { - if (verifier != nullptr) { - verifier->reset(); - } - } -} - -void initializeVerifiers( - const core::PlanNodePtr& plan, - const std::vector>& customVerifiers, - const std::vector& input, - const std::vector& groupingKeys) { - const auto& aggregationNode = - std::dynamic_pointer_cast(plan); - - for (auto i = 0; i < customVerifiers.size(); ++i) { - auto& verifier = customVerifiers[i]; - if (verifier == nullptr) { - continue; - } - - verifier->initialize( - input, - groupingKeys, - aggregationNode->aggregates()[i], - aggregationNode->aggregateNames()[i]); - } -} -} // namespace - bool AggregationFuzzer::verifyAggregation( const std::vector& groupingKeys, const std::vector& aggregates, const std::vector& masks, const std::vector& input, bool customVerification, - const std::vector>& customVerifiers) { + const std::shared_ptr& customVerifier) { auto firstPlan = PlanBuilder() .values(input) .singleAggregation(groupingKeys, aggregates, masks) .planNode(); - if (customVerification) { - initializeVerifiers(firstPlan, customVerifiers, input, groupingKeys); + if (customVerification && customVerifier != nullptr) { + const auto& aggregationNode = + std::dynamic_pointer_cast(firstPlan); + + customVerifier->initialize( + input, + groupingKeys, + aggregationNode->aggregates()[0], + aggregationNode->aggregateNames()[0]); } SCOPE_EXIT { - if (customVerification) { - resetCustomVerifiers(customVerifiers); + if (customVerification && customVerifier != nullptr) { + customVerifier->reset(); } }; @@ -817,39 +793,50 @@ bool AggregationFuzzer::verifyAggregation( } return compareEquivalentPlanResults( - plans, customVerification, input, customVerifiers); + plans, customVerification, input, customVerifier); } bool AggregationFuzzer::verifySortedAggregation( const std::vector& groupingKeys, - const std::vector& aggregates, + const std::string& aggregate, const std::vector& masks, - const std::vector& input) { + const std::vector& input, + bool customVerification, + const std::shared_ptr& customVerifier) { auto firstPlan = PlanBuilder() .values(input) - .singleAggregation(groupingKeys, aggregates, masks) + .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(); - auto resultOrError = execute(firstPlan); - if (resultOrError.exceptionPtr) { - ++stats_.numFailed; - } + bool aggregateOrderSensitive = false; - auto referenceResult = computeReferenceResults(firstPlan, input); - stats_.updateReferenceQueryStats(referenceResult.second); - auto expectedResult = referenceResult.first; - if (expectedResult && resultOrError.result) { - ++stats_.numVerified; - VELOX_CHECK( - assertEqualResults( - expectedResult.value(), - firstPlan->outputType(), - {resultOrError.result}), - "Velox and reference DB results don't match"); - LOG(INFO) << "Verified results against reference DB"; + if (customVerification && customVerifier != nullptr) { + const auto& aggregationNode = + std::dynamic_pointer_cast(firstPlan); + const auto& aggregateFunctionCall = aggregationNode->aggregates()[0]; + const std::string& aggregateFunctionName = + aggregateFunctionCall.call->name(); + + customVerifier->initialize( + input, + groupingKeys, + aggregateFunctionCall, + aggregationNode->aggregateNames()[0]); + + auto* aggregateFunctionEntry = + getAggregateFunctionEntry(aggregateFunctionName); + aggregateOrderSensitive = aggregateFunctionEntry->metadata.orderSensitive; } + SCOPE_EXIT { + if (customVerification && customVerifier != nullptr) { + customVerifier->reset(); + } + }; + std::vector plans; + plans.push_back({firstPlan, {}}); + if (!groupingKeys.empty()) { plans.push_back( {PlanBuilder() @@ -857,7 +844,7 @@ bool AggregationFuzzer::verifySortedAggregation( .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, - aggregates, + {aggregate}, masks, core::AggregationNode::Step::kSingle, false) @@ -874,7 +861,7 @@ bool AggregationFuzzer::verifySortedAggregation( plans.push_back( {PlanBuilder() .tableScan(inputRowType) - .singleAggregation(groupingKeys, aggregates, masks) + .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(), splits}); @@ -885,7 +872,7 @@ bool AggregationFuzzer::verifySortedAggregation( .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, - aggregates, + {aggregate}, masks, core::AggregationNode::Step::kSingle, false) @@ -894,11 +881,25 @@ bool AggregationFuzzer::verifySortedAggregation( } } - // Set customVerification to false to trigger direct result comparison. - // TODO Figure out how to enable custom verify(), but not compare(). - testPlans(plans, false, {}, resultOrError, 1); - - return resultOrError.exceptionPtr != nullptr; + if (customVerification && + (!aggregateOrderSensitive || customVerifier == nullptr || + customVerifier->supportsVerify())) { + // We have custom verification enabled and: + // 1) the aggregate function is not order sensitive (sorting the input won't + // have an effect on the output) or + // 2) the custom verifier is null (we've deliberately turned off + // verification of this aggregation) or + // 3) the custom verifier supports verification (it can't compare the + // results of the aggregation with the reference DB) + // keep the custom verifier enabled. + return compareEquivalentPlanResults( + plans, customVerification, input, customVerifier, 1); + } else { + // If custom verification is not enabled or the custom verifier is used for + // compare and the aggregation is order sensitive (the result shoudl be + // deterministic if the input is sorted), then compare the results directly. + return compareEquivalentPlanResults(plans, false, input, nullptr, 1); + } } // verifyAggregation(std::vector plans) is tied to plan @@ -1029,7 +1030,7 @@ bool AggregationFuzzer::compareEquivalentPlanResults( const std::vector& plans, bool customVerification, const std::vector& input, - const std::vector>& customVerifiers, + const std::shared_ptr& customVerifier, int32_t maxDrivers, bool testWithSpilling) { try { @@ -1078,7 +1079,7 @@ bool AggregationFuzzer::compareEquivalentPlanResults( mergeRowVectors(referenceResult.first.value(), pool_.get()); compare( - resultOrError, customVerification, customVerifiers, expected); + resultOrError, customVerification, {customVerifier}, expected); ++stats_.numVerified; } } @@ -1088,7 +1089,7 @@ bool AggregationFuzzer::compareEquivalentPlanResults( testPlans( plans, customVerification, - customVerifiers, + {customVerifier}, resultOrError, maxDrivers, testWithSpilling); @@ -1104,23 +1105,33 @@ bool AggregationFuzzer::compareEquivalentPlanResults( bool AggregationFuzzer::verifyDistinctAggregation( const std::vector& groupingKeys, - const std::vector& aggregates, + const std::string& aggregate, const std::vector& masks, const std::vector& input, bool customVerification, - const std::vector>& customVerifiers) { - const auto firstPlan = PlanBuilder() - .values(input) - .singleAggregation(groupingKeys, aggregates, masks) - .planNode(); + const std::shared_ptr& customVerifier) { + const auto firstPlan = + PlanBuilder() + .values(input) + .singleAggregation(groupingKeys, {aggregate}, masks) + .planNode(); if (customVerification) { - initializeVerifiers(firstPlan, customVerifiers, input, groupingKeys); + if (customVerification && customVerifier != nullptr) { + const auto& aggregationNode = + std::dynamic_pointer_cast(firstPlan); + + customVerifier->initialize( + input, + groupingKeys, + aggregationNode->aggregates()[0], + aggregationNode->aggregateNames()[0]); + } } SCOPE_EXIT { - if (customVerification) { - resetCustomVerifiers(customVerifiers); + if (customVerification && customVerifier != nullptr) { + customVerifier->reset(); } }; @@ -1135,7 +1146,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, - aggregates, + {aggregate}, masks, core::AggregationNode::Step::kSingle, false) @@ -1154,7 +1165,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( plans.push_back( {PlanBuilder() .tableScan(inputRowType) - .singleAggregation(groupingKeys, aggregates, masks) + .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(), splits}); @@ -1165,7 +1176,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, - aggregates, + {aggregate}, masks, core::AggregationNode::Step::kSingle, false) @@ -1181,7 +1192,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( // Distinct aggregation must run single-threaded or data must be partitioned // on group-by keys among threads. return compareEquivalentPlanResults( - plans, customVerification, input, customVerifiers, 1, false); + plans, customVerification, input, customVerifier, 1, false); } } // namespace