Skip to content

Commit

Permalink
Add more debug info for flaky aggregation spill stats check (#9434)
Browse files Browse the repository at this point in the history
Summary:
The aggregation test is flaky on CircleCI, where it has reported unexpected spill stats with zero partial
input rows: #9292
Since we can't reproduce this offline, add one more log message that prints the number of input rows
for the final aggregation. If the final aggregation has a non-zero number of input rows, then we need to
look into whether task stats collection is unreliable. Eventually, we should check spill stats based on
whether spill injections have been triggered, which is more reliable. But let's first figure out whether
anything else in the test is wrong.

Pull Request resolved: #9434

Reviewed By: mbasmanova

Differential Revision: D55987679

Pulled By: xiaoxmeng

fbshipit-source-id: 8789f077adc877d1762f20b391c77ab87f4e6a61
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Apr 10, 2024
1 parent b5ea2d7 commit a3b4849
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ void AggregationTestBase::testAggregationsWithCompanion(
// Spilling needs at least 2 batches of input. Use round-robin
// repartitioning to split input into multiple batches.
core::PlanNodeId partialNodeId;
core::PlanNodeId finalNodeId;
builder.localPartitionRoundRobinRow()
.partialAggregation(groupingKeysWithPartialKey, paritialAggregates)
.capturePlanNodeId(partialNodeId)
Expand All @@ -390,6 +391,7 @@ void AggregationTestBase::testAggregationsWithCompanion(
.capturePlanNodeId(partialNodeId)
.localPartition(groupingKeys)
.finalAggregation()
.capturePlanNodeId(finalNodeId)
.project(extractExpressions);

if (!postAggregationProjections.empty()) {
Expand All @@ -409,18 +411,22 @@ void AggregationTestBase::testAggregationsWithCompanion(
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
const auto inputRows =
const auto partialInputRows =
toPlanStats(task->taskStats()).at(partialNodeId).inputRows;
if (inputRows > 1) {
const auto finalInputRows =
toPlanStats(task->taskStats()).at(finalNodeId).inputRows;
if (partialInputRows > 1) {
EXPECT_LT(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
<< " injectedSpills: " << exec::injectedSpillCount();
} else {
EXPECT_EQ(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
Expand Down Expand Up @@ -816,11 +822,13 @@ void AggregationTestBase::testAggregationsImpl(
// Spilling needs at least 2 batches of input. Use round-robin
// repartitioning to split input into multiple batches.
core::PlanNodeId partialNodeId;
core::PlanNodeId finalNodeId;
builder.localPartitionRoundRobinRow()
.partialAggregation(groupingKeys, aggregates)
.capturePlanNodeId(partialNodeId)
.localPartition(groupingKeys)
.finalAggregation();
.finalAggregation()
.capturePlanNodeId(finalNodeId);

if (!postAggregationProjections.empty()) {
builder.project(postAggregationProjections);
Expand All @@ -842,10 +850,14 @@ void AggregationTestBase::testAggregationsImpl(
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
auto inputRows = toPlanStats(task->taskStats()).at(partialNodeId).inputRows;
if (inputRows > 1) {
const auto partialInputRows =
toPlanStats(task->taskStats()).at(partialNodeId).inputRows;
const auto finalInputRows =
toPlanStats(task->taskStats()).at(finalNodeId).inputRows;
if (partialInputRows > 1) {
EXPECT_LT(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
Expand All @@ -856,7 +868,8 @@ void AggregationTestBase::testAggregationsImpl(
memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage);
} else {
EXPECT_EQ(0, spilledBytes(*task))
<< "inputRows: " << inputRows
<< "partial inputRows: " << partialInputRows
<< " final inputRows: " << finalInputRows
<< " spilledRows: " << spilledRows(*task)
<< " spilledInputBytes: " << spilledInputBytes(*task)
<< " spilledFiles: " << spilledFiles(*task)
Expand Down

0 comments on commit a3b4849

Please sign in to comment.