Skip to content

Commit

Permalink
[GLUTEN-4302][CH] Fixed bugs about rewriting date comparison (#4303)
Browse files Browse the repository at this point in the history
[CH] Fixed bugs about rewriting date comparison
  • Loading branch information
lgbo-ustc authored Jan 9, 2024
1 parent 9e24e03 commit 14a3299
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
* @return
*/
override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] = {
val analyzers = List(spark => new ClickHouseAnalysis(spark, spark.sessionState.conf))
if (GlutenConfig.getConf.enableRewriteDateTimestampComparison) {
analyzers :+ (spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
} else {
analyzers
}
List(
spark => new ClickHouseAnalysis(spark, spark.sessionState.conf),
spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.extension

import io.glutenproject.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -46,7 +48,11 @@ class RewriteDateTimestampComparisonRule(session: SparkSession, conf: SQLConf)
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (plan.resolved) {
if (
plan.resolved &&
GlutenConfig.getConf.enableGluten &&
GlutenConfig.getConf.enableRewriteDateTimestampComparison
) {
visitPlan(plan)
} else {
plan
Expand Down Expand Up @@ -193,6 +199,7 @@ class RewriteDateTimestampComparisonRule(session: SparkSession, conf: SQLConf)
Add(toUnixTimestampExpr, adjustExpr)
}

// rewrite an expressiont that converts unix timestamp to date back to unix timestamp
private def rewriteUnixTimestampToDate(expr: Expression): Expression = {
expr match {
case toDate: ParseToDate =>
Expand Down Expand Up @@ -302,25 +309,15 @@ class RewriteDateTimestampComparisonRule(session: SparkSession, conf: SQLConf)
return cmp
}
val zoneId = getTimeZoneId(cmp.left)
val timestampLeft = rewriteUnixTimestampToDate(cmp.left)
val adjustedOffset = Literal(TimeUnitToSeconds(timeUnit.get), timestampLeft.dataType)
val addjustedOffsetExpr = Remainder(timestampLeft, adjustedOffset)
val newLeft = Subtract(timestampLeft, addjustedOffsetExpr)
val newLeft = rewriteUnixTimestampToDate(cmp.left)
val adjustedOffset = Literal(TimeUnitToSeconds(timeUnit.get), newLeft.dataType)
val newRight = rewriteConstDate(cmp.right, timeUnit.get, zoneId, 0)
EqualTo(newLeft, newRight)
val leftBound = GreaterThanOrEqual(newLeft, newRight)
val rigtBound = LessThan(newLeft, Add(newRight, adjustedOffset))
And(leftBound, rigtBound)
}

private def rewriteEqualNullSafe(cmp: EqualNullSafe): Expression = {
val timeUnit = getDateTimeUnit(cmp.left)
if (timeUnit.isEmpty) {
return cmp
}
val zoneId = getTimeZoneId(cmp.left)
val timestampLeft = rewriteUnixTimestampToDate(cmp.left)
val adjustedOffset = Literal(TimeUnitToSeconds(timeUnit.get), timestampLeft.dataType)
val addjustedOffsetExpr = Remainder(timestampLeft, adjustedOffset)
val newLeft = Subtract(timestampLeft, addjustedOffsetExpr)
val newRight = rewriteConstDate(cmp.right, timeUnit.get, zoneId, 0)
EqualNullSafe(newLeft, newRight)
cmp
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def enableAnsiMode: Boolean = conf.ansiEnabled

def enableGluten: Boolean = conf.getConf(GLUTEN_ENABLED)

// FIXME the option currently controls both JVM and native validation against a Substrait plan.
def enableNativeValidation: Boolean = conf.getConf(NATIVE_VALIDATION_ENABLED)

Expand Down Expand Up @@ -1411,7 +1413,7 @@ object GlutenConfig {
buildConf("spark.gluten.sql.rewrite.dateTimestampComparison")
.internal()
.doc("Rewrite the comparision between date and timestamp to timestamp comparison."
+ "For example `fron_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)`")
+ "For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)`")
.booleanConf
.createWithDefault(true)

Expand Down

0 comments on commit 14a3299

Please sign in to comment.