Skip to content

Commit

Permalink
support inequal join
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed May 14, 2024
1 parent 182a029 commit 6d96819
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.utils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Not, Or}
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType

/**
Expand Down Expand Up @@ -61,53 +61,6 @@ object CHJoinValidateUtil extends Logging {
return true
}
}
if (condition.isDefined) {
condition.get.transform {
case Or(l, r) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
Or(l, r)
case Not(EqualTo(l, r)) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
Not(EqualTo(l, r))
case LessThan(l, r) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
LessThan(l, r)
case LessThanOrEqual(l, r) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
LessThanOrEqual(l, r)
case GreaterThan(l, r) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
GreaterThan(l, r)
case GreaterThanOrEqual(l, r) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
GreaterThanOrEqual(l, r)
case In(l, r) =>
r.foreach(
e => {
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, e)) {
shouldFallback = true
}
})
In(l, r)
case EqualTo(l, r) =>
if (hasTwoTableColumn(leftOutputSet, rightOutputSet, l, r)) {
shouldFallback = true
}
EqualTo(l, r)
}
}
shouldFallback
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2550,5 +2550,23 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
spark.sql("drop table test_tbl_5096")
}

// Checks that a left join whose ON clause combines an equality on `key` with an
// inequality on `value` (both comparing columns of the two tables) returns the
// same result as vanilla Spark.
test("Inequal join support") {
  // A threshold of -1 turns off automatic broadcast joins for this test.
  withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
    try {
      spark.sql("create table ineq_join_t1 (key bigint, value bigint) using parquet")
      spark.sql("create table ineq_join_t2 (key bigint, value bigint) using parquet")
      spark.sql("insert into ineq_join_t1 values(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
      spark.sql("insert into ineq_join_t2 values(2, 2), (2, 1), (3, 3), (4, 6), (5, 3)")
      val sql =
        """
          | select t1.key, t1.value, t2.key, t2.value from ineq_join_t1 as t1
          | left join ineq_join_t2 as t2
          | on t1.key = t2.key and t1.value > t2.value
          |""".stripMargin
      compareResultsAgainstVanillaSpark(sql, true, { _ => })
    } finally {
      // Clean up even when setup or the comparison throws; otherwise the tables
      // would be left behind and the next run's `create table` would fail.
      // `if exists` keeps the cleanup safe when a table was never created.
      spark.sql("drop table if exists ineq_join_t1")
      spark.sql("drop table if exists ineq_join_t2")
    }
  }
}
}
// scalastyle:on line.size.limit
Loading

0 comments on commit 6d96819

Please sign in to comment.