diff --git a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp index 65d6a9a1bca6..7a04de032494 100644 --- a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp +++ b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp @@ -381,6 +381,7 @@ void AggregationTestBase::testAggregationsWithCompanion( // Spilling needs at least 2 batches of input. Use round-robin // repartitioning to split input into multiple batches. core::PlanNodeId partialNodeId; + core::PlanNodeId finalNodeId; builder.localPartitionRoundRobinRow() .partialAggregation(groupingKeysWithPartialKey, paritialAggregates) .capturePlanNodeId(partialNodeId) @@ -390,6 +391,7 @@ void AggregationTestBase::testAggregationsWithCompanion( .capturePlanNodeId(partialNodeId) .localPartition(groupingKeys) .finalAggregation() + .capturePlanNodeId(finalNodeId) .project(extractExpressions); if (!postAggregationProjections.empty()) { @@ -409,18 +411,22 @@ void AggregationTestBase::testAggregationsWithCompanion( auto task = assertResults(queryBuilder); // Expect > 0 spilled bytes unless there was no input. - const auto inputRows = + const auto partialInputRows = toPlanStats(task->taskStats()).at(partialNodeId).inputRows; - if (inputRows > 1) { + const auto finalInputRows = + toPlanStats(task->taskStats()).at(finalNodeId).inputRows; + if (partialInputRows > 1) { EXPECT_LT(0, spilledBytes(*task)) - << "inputRows: " << inputRows + << "partial inputRows: " << partialInputRows + << " final inputRows: " << finalInputRows << " spilledRows: " << spilledRows(*task) << " spilledInputBytes: " << spilledInputBytes(*task) << " spilledFiles: " << spilledFiles(*task) << " injectedSpills: " << exec::injectedSpillCount(); } else { EXPECT_EQ(0, spilledBytes(*task)) - << "inputRows: " << inputRows + << "partial inputRows: " << partialInputRows + << " final inputRows: " << finalInputRows << " spilledRows: " << spilledRows(*task) << " spilledInputBytes: " << spilledInputBytes(*task) << " spilledFiles: " << spilledFiles(*task) @@ -816,11 +822,13 @@ void AggregationTestBase::testAggregationsImpl( // Spilling needs at least 2 batches of input. Use round-robin // repartitioning to split input into multiple batches. core::PlanNodeId partialNodeId; + core::PlanNodeId finalNodeId; builder.localPartitionRoundRobinRow() .partialAggregation(groupingKeys, aggregates) .capturePlanNodeId(partialNodeId) .localPartition(groupingKeys) - .finalAggregation(); + .finalAggregation() + .capturePlanNodeId(finalNodeId); if (!postAggregationProjections.empty()) { builder.project(postAggregationProjections); @@ -842,10 +850,14 @@ void AggregationTestBase::testAggregationsImpl( auto task = assertResults(queryBuilder); // Expect > 0 spilled bytes unless there was no input. - auto inputRows = toPlanStats(task->taskStats()).at(partialNodeId).inputRows; - if (inputRows > 1) { + const auto partialInputRows = + toPlanStats(task->taskStats()).at(partialNodeId).inputRows; + const auto finalInputRows = + toPlanStats(task->taskStats()).at(finalNodeId).inputRows; + if (partialInputRows > 1) { EXPECT_LT(0, spilledBytes(*task)) - << "inputRows: " << inputRows + << "partial inputRows: " << partialInputRows + << " final inputRows: " << finalInputRows << " spilledRows: " << spilledRows(*task) << " spilledInputBytes: " << spilledInputBytes(*task) << " spilledFiles: " << spilledFiles(*task) @@ -856,7 +868,8 @@ void AggregationTestBase::testAggregationsImpl( memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); } else { EXPECT_EQ(0, spilledBytes(*task)) - << "inputRows: " << inputRows + << "partial inputRows: " << partialInputRows + << " final inputRows: " << finalInputRows << " spilledRows: " << spilledRows(*task) << " spilledInputBytes: " << spilledInputBytes(*task) << " spilledFiles: " << spilledFiles(*task)