Skip to content

Commit

Permalink
fix(hashjoin): Turn off dynamic filter push downs for null aware righ…
Browse files Browse the repository at this point in the history
…t semi porject join (facebookincubator#11781)

Summary:

Currently, when there are no matches for a null aware right semi project join but the build side has nulls, the dynamic push down filter filters out all the rows from the probe side. This causes the probe side to be considered empty and therefore sets the entire match column to falses, even in rows where the match value should be null.

Reviewed By: Yuhta

Differential Revision: D66903863
  • Loading branch information
Daniel Hunte authored and facebook-github-bot committed Dec 14, 2024
1 parent b652a99 commit 2bbec57
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 48 deletions.
9 changes: 3 additions & 6 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ void HashProbe::asyncWaitForHashTable() {
}
} else if (
(isInnerJoin(joinType_) || isLeftSemiFilterJoin(joinType_) ||
isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) &&
isRightSemiFilterJoin(joinType_) ||
(isRightSemiProjectJoin(joinType_) && !nullAware_)) &&
table_->hashMode() != BaseHashTable::HashMode::kHash && !isSpillInput() &&
!hasMoreSpillData()) {
// Find out whether there are any upstream operators that can accept dynamic
Expand All @@ -443,13 +444,9 @@ void HashProbe::asyncWaitForHashTable() {
const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters(
this, keyChannels_);

// Null aware Right Semi Project join needs to know whether there are any
// nulls on the probe side. Hence, cannot filter these out.
const auto nullAllowed = isRightSemiProjectJoin(joinType_) && nullAware_;

for (auto i = 0; i < keyChannels_.size(); ++i) {
if (channels.find(keyChannels_[i]) != channels.end()) {
if (auto filter = buildHashers[i]->getFilter(nullAllowed)) {
if (auto filter = buildHashers[i]->getFilter(/*nullAllowed=*/false)) {
dynamicFilters_.emplace(keyChannels_[i], std::move(filter));
}
}
Expand Down
102 changes: 60 additions & 42 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3294,62 +3294,80 @@ TEST_P(MultiThreadedHashJoinTest, noSpillLevelLimit) {
.run();
}

// Verify that dynamic filter pushed down from null-aware right semi project
// join into table scan doesn't filter out nulls.
// Verify that dynamic filter pushed down is turned off for null-aware right
// semi project join.
TEST_F(HashJoinTest, nullAwareRightSemiProjectOverScan) {
auto probe = makeRowVector(
std::vector<RowVectorPtr> probes;
std::vector<RowVectorPtr> builds;
// Matches present:
probes.push_back(makeRowVector(
{"t0"},
{
makeNullableFlatVector<int32_t>({1, std::nullopt, 2}),
});
}));
builds.push_back(makeRowVector(
{"u0"},
{
makeNullableFlatVector<int32_t>({1, 2, 3, std::nullopt}),
}));

auto build = makeRowVector(
// No matches present:
probes.push_back(makeRowVector(
{"t0"},
{
makeFlatVector<int32_t>({5, 6}),
}));
builds.push_back(makeRowVector(
{"u0"},
{
makeNullableFlatVector<int32_t>({1, 2, 3, std::nullopt}),
});
}));

std::shared_ptr<TempFilePath> probeFile = TempFilePath::create();
writeToFile(probeFile->getPath(), {probe});
for (int i = 0; i < probes.size(); i++) {
RowVectorPtr& probe = probes[i];
RowVectorPtr& build = builds[i];
std::shared_ptr<TempFilePath> probeFile = TempFilePath::create();
writeToFile(probeFile->getPath(), {probe});

std::shared_ptr<TempFilePath> buildFile = TempFilePath::create();
writeToFile(buildFile->getPath(), {build});
std::shared_ptr<TempFilePath> buildFile = TempFilePath::create();
writeToFile(buildFile->getPath(), {build});

createDuckDbTable("t", {probe});
createDuckDbTable("u", {build});
createDuckDbTable("t", {probe});
createDuckDbTable("u", {build});

core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(probe->type()))
.capturePlanNodeId(probeScanId)
.hashJoin(
{"t0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(build->type()))
.capturePlanNodeId(buildScanId)
.planNode(),
"",
{"u0", "match"},
core::JoinType::kRightSemiProject,
true /*nullAware*/)
.planNode();
core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(probe->type()))
.capturePlanNodeId(probeScanId)
.hashJoin(
{"t0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(build->type()))
.capturePlanNodeId(buildScanId)
.planNode(),
"",
{"u0", "match"},
core::JoinType::kRightSemiProject,
true /*nullAware*/)
.planNode();

SplitInput splitInput = {
{probeScanId,
{exec::Split(makeHiveConnectorSplit(probeFile->getPath()))}},
{buildScanId,
{exec::Split(makeHiveConnectorSplit(buildFile->getPath()))}},
};
SplitInput splitInput = {
{probeScanId,
{exec::Split(makeHiveConnectorSplit(probeFile->getPath()))}},
{buildScanId,
{exec::Split(makeHiveConnectorSplit(buildFile->getPath()))}},
};

HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.planNode(plan)
.inputSplits(splitInput)
.checkSpillStats(false)
.referenceQuery("SELECT u0, u0 IN (SELECT t0 FROM t) FROM u")
.run();
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.planNode(plan)
.inputSplits(splitInput)
.checkSpillStats(false)
.referenceQuery("SELECT u0, u0 IN (SELECT t0 FROM t) FROM u")
.run();
}
}

TEST_F(HashJoinTest, duplicateJoinKeys) {
Expand Down

0 comments on commit 2bbec57

Please sign in to comment.