[GLUTEN-6425][CH] Support day time interval #6456

Merged: 1 commit, Jul 17, 2024
Changes from all commits
@@ -897,6 +897,14 @@ class CHSparkPlanExecApi extends SparkPlanExecApi
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

override def genDateAddTransformer(
attributeSeq: Seq[Attribute],
substraitExprName: String,
children: Seq[Expression],
expr: Expression): ExpressionTransformer = {
DateAddTransformer(attributeSeq, substraitExprName, children, expr).doTransform()
}

override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.expression

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * Rewrites DateAdd/TimeAdd whose second argument is a day-time interval into a
 * form the backend can execute directly; any shape it does not recognize falls
 * back to the generic child-by-child transform.
 */
case class DateAddTransformer(
    attributeSeq: Seq[Attribute],
    substraitExprName: String,
    children: Seq[Expression],
    expr: Expression)
  extends Logging {

  def doTransform(): ExpressionTransformer = {
    expr match {
      case _: DateAdd => doDateAddTransform()
      case _: TimeAdd => doTimeAddTransform()
      case _ => doDefaultTransform()
    }
  }

  // Spark resolves `date_col + interval 'n' day` to
  // DateAdd(date_col, ExtractANSIIntervalDays(interval_literal)). When the
  // interval is a day-only literal, unwrap it into a plain integer day count;
  // otherwise return null so callers fall back to the generic transform.
  private def unwrapExtractANSIIntervalDays(extractExpr: Expression): Literal = {
    extractExpr match {
      case extractDays: ExtractANSIIntervalDays =>
        extractDays.child match {
          case literal: Literal if literal.dataType.isInstanceOf[DayTimeIntervalType] =>
            val (intVal, unitVal) = parseDayIntervalType(literal)
            if ("DAY".equals(unitVal)) {
              Literal(intVal, IntegerType)
            } else {
              null
            }
          case _ => null
        }
      case _ => null
    }
  }

  private def doDateAddTransform(): ExpressionTransformer = {
    children(1) match {
      case _: ExtractANSIIntervalDays =>
        val daysLiteral = unwrapExtractANSIIntervalDays(children(1))
        if (daysLiteral != null) {
          val daysExpr = LiteralTransformer(daysLiteral)
          val dateExpr =
            ExpressionConverter.replaceWithExpressionTransformer(children(0), attributeSeq)
          GenericExpressionTransformer(substraitExprName, Seq(dateExpr, daysExpr), expr)
        } else {
          doDefaultTransform()
        }
      case minus: UnaryMinus =>
        // date_col - interval arrives as DateAdd(date_col, UnaryMinus(...)).
        val child = minus.child match {
          case extractDays: ExtractANSIIntervalDays =>
            unwrapExtractANSIIntervalDays(extractDays)
          case e => e
        }
        if (child != null) {
          val newMinus = minus.withNewChildren(Seq(child))
          val dateExpr =
            ExpressionConverter.replaceWithExpressionTransformer(children(0), attributeSeq)
          val minusExpr =
            ExpressionConverter.replaceWithExpressionTransformer(newMinus, attributeSeq)
          GenericExpressionTransformer(substraitExprName, Seq(dateExpr, minusExpr), expr)
        } else {
          doDefaultTransform()
        }
      case _ => doDefaultTransform()
    }
  }

  private def doTimeAddTransform(): ExpressionTransformer = {
    children(1) match {
      case literal: Literal if literal.dataType.isInstanceOf[DayTimeIntervalType] =>
        val (intVal, unitVal) = parseDayIntervalType(literal)
        if (unitVal != null) {
          val timeExpr =
            ExpressionConverter.replaceWithExpressionTransformer(children(0), attributeSeq)
          val intExpr = Literal(intVal, IntegerType)
          val unitExpr = Literal(UTF8String.fromString(unitVal), StringType)
          val newExpr = expr.withNewChildren(Seq(unitExpr, intExpr))
          GenericExpressionTransformer(
            substraitExprName,
            Seq(LiteralTransformer(unitExpr), LiteralTransformer(intExpr), timeExpr),
            newExpr)
        } else {
          doDefaultTransform()
        }
      case minus: UnaryMinus =>
        minus.child match {
          case literal: Literal if literal.dataType.isInstanceOf[DayTimeIntervalType] =>
            val (intVal, unitVal) = parseDayIntervalType(literal)
            if (unitVal != null) {
              val timeExpr =
                ExpressionConverter.replaceWithExpressionTransformer(children(0), attributeSeq)
              // ts - interval arrives as TimeAdd(ts, UnaryMinus(interval));
              // fold the minus into the literal magnitude, and pass the
              // timestamp operand too: the backend expects (unit, magnitude, timestamp).
              val intExpr = Literal(-intVal, IntegerType)
              val unitExpr = Literal(UTF8String.fromString(unitVal), StringType)
              val newExpr = expr.withNewChildren(Seq(unitExpr, intExpr))
              GenericExpressionTransformer(
                substraitExprName,
                Seq(LiteralTransformer(unitExpr), LiteralTransformer(intExpr), timeExpr),
                newExpr)
            } else {
              doDefaultTransform()
            }
          case _ => doDefaultTransform()
        }
      case _ => doDefaultTransform()
    }
  }

  private def doDefaultTransform(): ExpressionTransformer = {
    // Transform every child generically, without rewriting the expression.
    val childrenTransformers =
      children.map(ExpressionConverter.replaceWithExpressionTransformer(_, attributeSeq))
    GenericExpressionTransformer(substraitExprName, childrenTransformers, expr)
  }

  // A DayTimeIntervalType literal stores the whole interval as a microsecond
  // count in a Long; startField/endField (0 = DAY, 1 = HOUR, 2 = MINUTE,
  // 3 = SECOND) identify the single unit it was written in. Multi-unit
  // intervals such as INTERVAL '1 02' DAY TO HOUR yield (0, null) so callers
  // fall back to the generic transform.
  private def parseDayIntervalType(literal: Literal): (Int, String) = {
    literal.dataType match {
      case dayIntervalType: DayTimeIntervalType =>
        val micros = literal.value.asInstanceOf[Long]
        (dayIntervalType.startField, dayIntervalType.endField) match {
          case (0, 0) => ((micros / 1000000L / 3600 / 24).toInt, "DAY")
          case (1, 1) => ((micros / 1000000L / 3600).toInt, "HOUR")
          case (2, 2) => ((micros / 1000000L / 60).toInt, "MINUTE")
          case (3, 3) => ((micros / 1000000L).toInt, "SECOND")
          case _ => (0, null)
        }
      case _ => (0, null)
    }
  }
}
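For reference, the decoding in parseDayIntervalType can be checked in isolation: a day-time interval literal carries its whole value as a microsecond Long, and only the type's start/end fields reveal the unit it was written in. A minimal standalone sketch (names are illustrative, not part of the patch):

object DayIntervalDecodeSketch {
  // Mirrors parseDayIntervalType: micros is the literal value, the field pair
  // selects the unit (0 = DAY, 1 = HOUR, 2 = MINUTE, 3 = SECOND).
  def decode(micros: Long, startField: Int, endField: Int): (Int, String) =
    (startField, endField) match {
      case (0, 0) => ((micros / 1000000L / 3600 / 24).toInt, "DAY")
      case (1, 1) => ((micros / 1000000L / 3600).toInt, "HOUR")
      case (2, 2) => ((micros / 1000000L / 60).toInt, "MINUTE")
      case (3, 3) => ((micros / 1000000L).toInt, "SECOND")
      case _ => (0, null)
    }

  def main(args: Array[String]): Unit = {
    // INTERVAL '2' DAY and INTERVAL '48' HOUR share the same microsecond value.
    assert(decode(172800000000L, 0, 0) == (2, "DAY"))
    assert(decode(172800000000L, 1, 1) == (48, "HOUR"))
  }
}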
@@ -2727,5 +2727,47 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
spark.sql("drop table test_tbl_4451")
}

test("array functions date add") {
spark.sql("create table tb_date(day Date) using parquet")
spark.sql("""
|insert into tb_date values
|(cast('2024-06-01' as Date)),
|(cast('2024-06-02' as Date)),
|(cast('2024-06-03' as Date)),
|(cast('2024-06-04' as Date)),
|(cast('2024-06-05' as Date))
|""".stripMargin)
val sql1 = """
|select * from tb_date where day between
|'2024-06-01' and
|cast('2024-06-01' as Date) + interval 2 day
|order by day
|""".stripMargin
compareResultsAgainstVanillaSpark(sql1, true, { _ => })
val sql2 = """
|select * from tb_date where day between
|'2024-06-01' and
|cast('2024-06-01' as Date) + interval 48 hour
|order by day
|""".stripMargin
compareResultsAgainstVanillaSpark(sql2, true, { _ => })
val sql3 = """
|select * from tb_date where day between
|'2024-06-01' and
|cast('2024-06-05' as Date) - interval 2 day
|order by day
|""".stripMargin
compareResultsAgainstVanillaSpark(sql3, true, { _ => })
val sql4 = """
|select * from tb_date where day between
|'2024-06-01' and
|cast('2024-06-05' as Date) - interval 48 hour
|order by day
|""".stripMargin
compareResultsAgainstVanillaSpark(sql4, true, { _ => })

spark.sql("drop table tb_date")
}
}
// scalastyle:on line.size.limit
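As background for the tests above: Spark's analyzer rewrites date/timestamp arithmetic over ANSI intervals into the expressions DateAddTransformer matches on. A day-only interval added to a date resolves to DateAdd(date, ExtractANSIIntervalDays(...)), while a timestamp plus an interval resolves to TimeAdd. A small sketch for inspecting this locally (illustrative, not part of the patch; exact plan text varies across Spark 3.x):

import org.apache.spark.sql.SparkSession

object IntervalDesugarSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    // Expected to resolve to DateAdd(date, extractansiintervaldays(INTERVAL '2' DAY)).
    println(spark.sql("select date'2024-06-01' + interval 2 day").queryExecution.analyzed)
    // Expected to resolve to TimeAdd(ts, INTERVAL '48' HOUR).
    println(spark.sql("select timestamp'2024-06-01 00:00:00' + interval 48 hour").queryExecution.analyzed)
    spark.stop()
  }
}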
@@ -47,23 +47,27 @@ class FunctionParserTimestampAdd : public FunctionParser
 const ActionsDAG::Node * parse(const substrait::Expression_ScalarFunction & substrait_func, ActionsDAGPtr & actions_dag) const override
 {
     auto parsed_args = parseFunctionArguments(substrait_func, actions_dag);
-    if (parsed_args.size() != 4)
-        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires exactly four arguments", getName());
+    if (parsed_args.size() < 3)
+        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires at least three arguments", getName());

     const auto & unit_field = substrait_func.arguments().at(0);
     if (!unit_field.value().has_literal() || !unit_field.value().literal().has_string())
         throw Exception(
             ErrorCodes::BAD_ARGUMENTS, "Unsupported unit argument, should be a string literal, but: {}", unit_field.DebugString());

-    const auto & timezone_field = substrait_func.arguments().at(3);
-    if (!timezone_field.value().has_literal() || !timezone_field.value().literal().has_string())
-        throw Exception(
-            ErrorCodes::BAD_ARGUMENTS,
-            "Unsupported timezone_field argument, should be a string literal, but: {}",
-            timezone_field.DebugString());
+    String timezone;
+    if (parsed_args.size() == 4)
+    {
+        const auto & timezone_field = substrait_func.arguments().at(3);
+        if (!timezone_field.value().has_literal() || !timezone_field.value().literal().has_string())
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS,
+                "Unsupported timezone_field argument, should be a string literal, but: {}",
+                timezone_field.DebugString());
+        timezone = timezone_field.value().literal().string();
+    }

     const auto & unit = Poco::toUpper(unit_field.value().literal().string());
-    auto timezone = timezone_field.value().literal().string();

     std::string ch_function_name;
     if (unit == "MICROSECOND")
@@ -322,6 +322,18 @@ trait SparkPlanExecApi {
throw new GlutenNotSupportException("PreciseTimestampConversion is not supported")
}

// For expressions like date_add(cast('2001-01-01' as Date), interval 1 day),
// backends may handle the day-time interval argument differently, so each
// backend can override this hook; the default transforms the children as-is.
def genDateAddTransformer(
attributeSeq: Seq[Attribute],
substraitExprName: String,
children: Seq[Expression],
expr: Expression): ExpressionTransformer = {
val childrenTransformers =
children.map(ExpressionConverter.replaceWithExpressionTransformer(_, attributeSeq))
GenericExpressionTransformer(substraitExprName, childrenTransformers, expr)
}

/**
* Generate ShuffleDependency for ColumnarShuffleExchangeExec.
*
@@ -686,6 +686,20 @@ object ExpressionConverter extends SQLConfHelper with Logging {
LiteralTransformer(Literal(Math.E))
case p: Pi =>
LiteralTransformer(Literal(Math.PI))
case dateAdd: DateAdd =>
BackendsApiManager.getSparkPlanExecApiInstance.genDateAddTransformer(
attributeSeq,
substraitExprName,
dateAdd.children,
dateAdd
)
case timeAdd: TimeAdd =>
BackendsApiManager.getSparkPlanExecApiInstance.genDateAddTransformer(
attributeSeq,
substraitExprName,
timeAdd.children,
timeAdd
)
case expr =>
GenericExpressionTransformer(
substraitExprName,
@@ -177,6 +177,7 @@ object ExpressionMappings {
Sig[Second](EXTRACT),
Sig[FromUnixTime](FROM_UNIXTIME),
Sig[DateAdd](DATE_ADD),
Sig[TimeAdd](TIMESTAMP_ADD),
Sig[DateSub](DATE_SUB),
Sig[DateDiff](DATE_DIFF),
Sig[ToUnixTimestamp](TO_UNIX_TIMESTAMP),