Skip to content

Commit

Permalink
[CORE] Fix non-deterministic filter executed twice when push down to …
Browse files Browse the repository at this point in the history
…scan
  • Loading branch information
zml1206 committed Jul 1, 2024
1 parent f80c068 commit e0ddb59
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1892,4 +1892,18 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
}
}
}

test("fix non-deterministic filter executed twice when push down to scan") {
val df = sql("select * from lineitem where rand() <= 0.5")
val plan = df.queryExecution.executedPlan
val scans = plan.collect { case scan: FileSourceScanExecTransformer => scan }
val filters = plan.collect { case filter: FilterExecTransformer => filter }
assert(scans.size == 1)
assert(filters.size == 1)
assert(scans(0).dataFilters.size == 1)
val remainingFilters = FilterHandler.getRemainingFilters(
scans(0).dataFilters,
splitConjunctivePredicates(filters(0).condition))
assert(remainingFilters.size == 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ object FilterHandler extends PredicateHelper {
* the filter conditions not pushed down into Scan.
*/
def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
(ExpressionSet(filters) -- ExpressionSet(scanFilters)).toSeq
(filters.toSet -- scanFilters.toSet).toSeq

// Separate and compare the filter conditions in Scan and Filter.
// Try to push down the remaining conditions in Filter into Scan.
Expand Down

0 comments on commit e0ddb59

Please sign in to comment.