Skip to content

Commit

Permalink
[CORE] Fix non-deterministic filter executed twice when push down to scan (#6296)
Browse files Browse the repository at this point in the history

Co-authored-by: wangguangxin.cn <[email protected]>
  • Loading branch information
zml1206 and WangGuangxin authored Jul 2, 2024
1 parent 457898b commit 264ff2e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1892,4 +1892,23 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
}
}
}

test("fix non-deterministic filter executed twice when push down to scan") {
  // rand() makes the WHERE predicate non-deterministic.
  val query = sql("select * from lineitem where rand() <= 0.5")

  // Plan check: expect exactly one scan transformer and one filter transformer,
  // with the scan carrying a single data filter.
  val executed = query.queryExecution.executedPlan
  val scanNodes = executed.collect { case s: FileSourceScanExecTransformer => s }
  val filterNodes = executed.collect { case f: FilterExecTransformer => f }
  assert(scanNodes.size == 1)
  assert(filterNodes.size == 1)
  val scanNode = scanNodes.head
  val pushedFilters = scanNode.dataFilters
  assert(pushedFilters.size == 1)

  // After subtracting the scan's data filters from the filter node's conjuncts,
  // nothing should be left for the filter node to evaluate.
  val filterConjuncts = splitConjunctivePredicates(filterNodes.head.condition)
  val leftover = FilterHandler.getRemainingFilters(pushedFilters, filterConjuncts)
  assert(leftover.size == 0)

  // Result length check: table lineitem has 60,000 rows, so applying the
  // `rand() <= 0.5` predicate once should keep roughly half of them.
  val rowCount = query.collect().length
  assert(rowCount > 25000 && rowCount < 35000)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ object FilterHandler extends PredicateHelper {
* the filter conditions not pushed down into Scan.
*/
def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
  // Use a plain Set difference rather than ExpressionSet. ExpressionSet never
  // treats a non-deterministic expression as contained, so a predicate such as
  // `rand() <= 0.5` that was already pushed into the scan would not be
  // subtracted here and would be evaluated a second time by the filter.
  // NOTE(review): Set difference does not preserve the order of `filters`
  // and collapses duplicates.
  (filters.toSet -- scanFilters.toSet).toSeq

// Separate and compare the filter conditions in Scan and Filter.
// Try to push down the remaining conditions in Filter into Scan.
Expand Down

0 comments on commit 264ff2e

Please sign in to comment.