Skip to content

Commit

Permalink
Enable probe spill with dynamic filter replaced (#10849)
Browse files Browse the repository at this point in the history
Summary:
Enable probe spill with dynamic filter replaced. Now the fuzzer test can pass 10hrs run and the
previous issue in table scan for lazy vector processing might have been fixed.

Pull Request resolved: #10849

Reviewed By: gggrace14, tanjialiang

Differential Revision: D61834373

Pulled By: xiaoxmeng

fbshipit-source-id: bc5361f16eb25ad6867c556b1cd78583be494bd4
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 1, 2024
1 parent ecebbbf commit 6e52cbd
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
4 changes: 1 addition & 3 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1654,9 +1654,7 @@ void HashProbe::ensureOutputFits() {
}

bool HashProbe::canReclaim() const {
// NOTE: we can't spill from a hash probe operator if it has generated dynamic
// filters.
return spillEnabled() && !hasGeneratedDynamicFilters_;
return spillEnabled();
}

void HashProbe::reclaim(
Expand Down
11 changes: 11 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,17 @@ uint64_t Operator::MemoryReclaimer::reclaim(
memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
op_->reclaim(targetBytes, stats);
}
// NOTE: the parallel hash build is running at the background thread
// pool which won't stop during memory reclamation so the operator's
// memory usage might increase in such case. memory usage.
if (op_->operatorType() == "HashBuild") {
reclaimedBytes = std::max<int64_t>(0, reclaimedBytes);
}
VELOX_CHECK_GE(
reclaimedBytes,
0,
"Unexpected memory growth after reclaim from operator memory pool {}",
pool->name());
return reclaimedBytes;
},
stats);
Expand Down
6 changes: 1 addition & 5 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8020,10 +8020,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, spillCheckOnLeftSemiFilterWithDynamicFilters) {
.values(buildVectors)
.project({"c0 AS u_c0", "c1 AS u_c1"})
.planNode();
auto keyOnlyBuildSide = PlanBuilder(planNodeIdGenerator, pool_.get())
.values(keyOnlyBuildVectors)
.project({"c0 AS u_c0"})
.planNode();

// Left semi join.
core::PlanNodeId probeScanId;
Expand Down Expand Up @@ -8070,7 +8066,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, spillCheckOnLeftSemiFilterWithDynamicFilters) {
// Verify spill hasn't triggered.
auto taskStats = exec::toPlanStats(task->taskStats());
auto& planStats = taskStats.at(joinNodeId);
ASSERT_EQ(planStats.spilledBytes, 0);
ASSERT_GT(planStats.spilledBytes, 0);
})
.run();
}
Expand Down

0 comments on commit 6e52cbd

Please sign in to comment.