[CORE][VL] Enable from_unixtime function and fix test failure (#4520)
PHILO-HE authored Jan 26, 2024
1 parent 6cdb55b commit 9b5525a
Showing 12 changed files with 219 additions and 50 deletions.
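
For orientation (not part of this commit's diff): a minimal sketch of the kind of query this change targets. With from_unixtime removed from the Velox validator blacklist in the first changed file below, a projection like the one here is expected to be offloaded to the Velox backend instead of falling back to vanilla Spark, with the timezone taken from the session config that is passed to the backend. The session setup and column name are illustrative.

  // Assumes a SparkSession `spark` with the Gluten plugin and Velox backend enabled.
  import org.apache.spark.sql.functions.{col, from_unixtime}

  spark.conf.set("spark.sql.session.timeZone", "Asia/Hong_Kong")
  spark.range(3)
    .withColumnRenamed("id", "epoch_sec")
    // After this change, from_unixtime should pass validation and run in Velox.
    .select(from_unixtime(col("epoch_sec"), "yyyy-MM-dd HH:mm:ss").as("ts"))
    .show()
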
1 change: 0 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -64,7 +64,6 @@ static const std::unordered_set<std::string> kBlackList = {
"concat_ws",
"from_json",
"json_array_length",
"from_unixtime",
"repeat",
"trunc",
"sequence",
@@ -87,40 +87,6 @@ case class DateDiffTransformer(
}
}

case class FromUnixTimeTransformer(
substraitExprName: String,
sec: ExpressionTransformer,
format: ExpressionTransformer,
timeZoneId: Option[String] = None,
original: FromUnixTime)
extends ExpressionTransformer {

override def doTransform(args: java.lang.Object): ExpressionNode = {
val secNode = sec.doTransform(args)
val formatNode = format.doTransform(args)

val dataTypes = if (timeZoneId.isDefined) {
Seq(original.sec.dataType, original.format.dataType, StringType)
} else {
Seq(original.sec.dataType, original.format.dataType)
}
val functionMap = args.asInstanceOf[JHashMap[String, JLong]]
val functionId = ExpressionBuilder.newScalarFunction(
functionMap,
ConverterUtils.makeFuncName(substraitExprName, dataTypes))

val expressionNodes = new JArrayList[ExpressionNode]()
expressionNodes.add(secNode)
expressionNodes.add(formatNode)
if (timeZoneId.isDefined) {
expressionNodes.add(ExpressionBuilder.makeStringLiteral(timeZoneId.get))
}

val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode)
}
}

/**
* The failOnError depends on the config for ANSI. ANSI is not supported currently. And timeZoneId
* is passed to backend config.
@@ -198,14 +198,6 @@ object ExpressionConverter extends SQLConfHelper with Logging {
BoundReferenceTransformer(b.ordinal, b.dataType, b.nullable)
case l: Literal =>
LiteralTransformer(l)
case f: FromUnixTime =>
FromUnixTimeTransformer(
substraitExprName,
replaceWithExpressionTransformerInternal(f.sec, attributeSeq, expressionsMap),
replaceWithExpressionTransformerInternal(f.format, attributeSeq, expressionsMap),
f.timeZoneId,
f
)
case d: DateDiff =>
DateDiffTransformer(
substraitExprName,
@@ -704,6 +704,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("TruncTimestamp")
.exclude("unsupported fmt fields for trunc/date_trunc results null")
.exclude("from_unixtime")
.exclude(GLUTEN_TEST + "from_unixtime")
.exclude("unix_timestamp")
.exclude("to_unix_timestamp")
.exclude("to_utc_timestamp")
@@ -215,6 +215,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("DateFormat")
// Legacy mode is not supported, assuming this mode is not commonly used.
.exclude("to_timestamp exception mode")
// Replaced by a gluten test to pass timezone through config.
.exclude("from_unixtime")
enableSuite[GlutenDecimalExpressionSuite]
enableSuite[GlutenStringFunctionsSuite]
enableSuite[GlutenRegexpExpressionsSuite]
@@ -16,14 +16,15 @@
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.{GlutenTestConstants, GlutenTestsTrait}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.GlutenTestsTrait
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, TimeZoneUTC}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

import java.sql.{Date, Timestamp}
@@ -60,7 +61,7 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
// checkResult(Int.MinValue.toLong - 100)
}

test(GlutenTestConstants.GLUTEN_TEST + "TIMESTAMP_MICROS") {
test(GLUTEN_TEST + "TIMESTAMP_MICROS") {
def testIntegralFunc(value: Number): Unit = {
checkEvaluation(MicrosToTimestamp(Literal(value)), value.longValue())
}
@@ -77,7 +78,7 @@ }
}

val outstandingTimezonesIds: Seq[String] = Seq(
// Velox doesn't support timezones like UTC.
// Velox doesn't support timezones like "UTC".
// "UTC",
PST.getId,
CET.getId,
@@ -88,7 +89,7 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
"Europe/Brussels")
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(getZoneId)

test(GlutenTestConstants.GLUTEN_TEST + "unix_timestamp") {
test(GLUTEN_TEST + "unix_timestamp") {
withDefaultTimeZone(UTC) {
for (zid <- outstandingZoneIds) {
Seq("legacy", "corrected").foreach {
@@ -189,7 +190,7 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
UnixTimestamp(Literal("2015-07-24"), Literal("\""), UTC_OPT) :: Nil)
}

test(GlutenTestConstants.GLUTEN_TEST + "to_unix_timestamp") {
test(GLUTEN_TEST + "to_unix_timestamp") {
withDefaultTimeZone(UTC) {
for (zid <- outstandingZoneIds) {
Seq("legacy", "corrected").foreach {
@@ -287,7 +288,7 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
}

// Modified based on vanilla spark to explicitly set timezone in config.
test(GlutenTestConstants.GLUTEN_TEST + "DateFormat") {
test(GLUTEN_TEST + "DateFormat") {
val PST_OPT = Option("America/Los_Angeles")
val JST_OPT = Option("Asia/Tokyo")

@@ -343,4 +344,71 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
}
}
}

test(GLUTEN_TEST + "from_unixtime") {
val outstandingTimezonesIds: Seq[String] = Seq(
// Velox doesn't support timezones like "UTC".
// "UTC",
// Not supported in velox.
// PST.getId,
// CET.getId,
"Africa/Dakar",
LA.getId,
"Asia/Urumqi",
"Asia/Hong_Kong",
"Europe/Brussels"
)
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(getZoneId)
Seq("legacy", "corrected").foreach {
legacyParserPolicy =>
for (zid <- outstandingZoneIds) {
withSQLConf(
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy,
SQLConf.SESSION_LOCAL_TIMEZONE.key -> zid.getId) {
val fmt1 = "yyyy-MM-dd HH:mm:ss"
val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
val timeZoneId = Option(zid.getId)
val tz = TimeZone.getTimeZone(zid)
sdf1.setTimeZone(tz)
sdf2.setTimeZone(tz)

checkEvaluation(
FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
sdf1.format(new Timestamp(0)))
checkEvaluation(
FromUnixTime(Literal(1000L), Literal(fmt1), timeZoneId),
sdf1.format(new Timestamp(1000000)))
checkEvaluation(
FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
sdf2.format(new Timestamp(-1000000)))
checkEvaluation(
FromUnixTime(
Literal.create(null, LongType),
Literal.create(null, StringType),
timeZoneId),
null)
checkEvaluation(
FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId),
null)
checkEvaluation(
FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId),
null)

// SPARK-28072 The codegen path for non-literal input should also work
checkEvaluation(
expression = FromUnixTime(
BoundReference(ordinal = 0, dataType = LongType, nullable = true),
BoundReference(ordinal = 1, dataType = StringType, nullable = true),
timeZoneId),
expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
inputRow = InternalRow(0L, UTF8String.fromString(fmt1))
)
}
}
}
// Test escaping of format
GenerateUnsafeProjection.generate(FromUnixTime(Literal(0L), Literal("\""), UTC_OPT) :: Nil)
}
}
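
To make the expected values above concrete, here is a small standalone check (not part of the test suite) of what from_unixtime(0, 'yyyy-MM-dd HH:mm:ss') should format to in one of the zones the test exercises; it mirrors the SimpleDateFormat-based expectations using java.time.

  // Epoch second 0 rendered in Asia/Hong_Kong (UTC+8 on 1970-01-01).
  import java.time.{Instant, ZoneId}
  import java.time.format.DateTimeFormatter

  val hkFormatter = DateTimeFormatter
    .ofPattern("yyyy-MM-dd HH:mm:ss")
    .withZone(ZoneId.of("Asia/Hong_Kong"))
  println(hkFormatter.format(Instant.ofEpochSecond(0L))) // 1970-01-01 08:00:00
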
@@ -745,6 +745,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("TruncTimestamp")
.exclude("unsupported fmt fields for trunc/date_trunc results null")
.exclude("from_unixtime")
.exclude(GLUTEN_TEST + "from_unixtime")
.exclude("unix_timestamp")
.exclude("to_unix_timestamp")
.exclude("to_utc_timestamp")
@@ -149,6 +149,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("DateFormat")
// Legacy mode is not supported, assuming this mode is not commonly used.
.exclude("to_timestamp exception mode")
// Replaced by a gluten test to pass timezone through config.
.exclude("from_unixtime")
enableSuite[GlutenDecimalExpressionSuite]
enableSuite[GlutenHashExpressionsSuite]
enableSuite[GlutenIntervalExpressionsSuite]
@@ -343,4 +343,71 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
}
}
}

test(GLUTEN_TEST + "from_unixtime") {
val outstandingTimezonesIds: Seq[String] = Seq(
// Velox doesn't support timezones like "UTC".
// "UTC",
// Not supported in velox.
// PST.getId,
// CET.getId,
"Africa/Dakar",
LA.getId,
"Asia/Urumqi",
"Asia/Hong_Kong",
"Europe/Brussels"
)
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(getZoneId)
Seq("legacy", "corrected").foreach {
legacyParserPolicy =>
for (zid <- outstandingZoneIds) {
withSQLConf(
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy,
SQLConf.SESSION_LOCAL_TIMEZONE.key -> zid.getId) {
val fmt1 = "yyyy-MM-dd HH:mm:ss"
val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
val timeZoneId = Option(zid.getId)
val tz = TimeZone.getTimeZone(zid)
sdf1.setTimeZone(tz)
sdf2.setTimeZone(tz)

checkEvaluation(
FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
sdf1.format(new Timestamp(0)))
checkEvaluation(
FromUnixTime(Literal(1000L), Literal(fmt1), timeZoneId),
sdf1.format(new Timestamp(1000000)))
checkEvaluation(
FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
sdf2.format(new Timestamp(-1000000)))
checkEvaluation(
FromUnixTime(
Literal.create(null, LongType),
Literal.create(null, StringType),
timeZoneId),
null)
checkEvaluation(
FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId),
null)
checkEvaluation(
FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId),
null)

// SPARK-28072 The codegen path for non-literal input should also work
checkEvaluation(
expression = FromUnixTime(
BoundReference(ordinal = 0, dataType = LongType, nullable = true),
BoundReference(ordinal = 1, dataType = StringType, nullable = true),
timeZoneId),
expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
inputRow = InternalRow(0L, UTF8String.fromString(fmt1))
)
}
}
}
// Test escaping of format
GenerateUnsafeProjection.generate(FromUnixTime(Literal(0L), Literal("\""), UTC_OPT) :: Nil)
}
}
@@ -582,6 +582,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("TruncTimestamp")
.exclude("unsupported fmt fields for trunc/date_trunc results null")
.exclude("from_unixtime")
.exclude(GLUTEN_TEST + "from_unixtime")
.exclude("unix_timestamp")
.exclude("to_unix_timestamp")
.exclude("to_utc_timestamp")
@@ -128,6 +128,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("DateFormat")
// Legacy mode is not supported, assuming this mode is not commonly used.
.exclude("to_timestamp exception mode")
// Replaced by a gluten test to pass timezone through config.
.exclude("from_unixtime")
enableSuite[GlutenDecimalExpressionSuite]
enableSuite[GlutenHashExpressionsSuite]
enableSuite[GlutenIntervalExpressionsSuite]
@@ -343,4 +343,72 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
}
}
}

test(GLUTEN_TEST + "from_unixtime") {
val outstandingTimezonesIds: Seq[String] = Seq(
// Velox doesn't support timezones like "UTC".
// "UTC",
// Not supported in velox.
// PST.getId,
// CET.getId,
"Africa/Dakar",
LA.getId,
"Asia/Urumqi",
"Asia/Hong_Kong",
"Europe/Brussels"
)
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(getZoneId)
Seq("legacy", "corrected").foreach {
legacyParserPolicy =>
for (zid <- outstandingZoneIds) {
withSQLConf(
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy,
SQLConf.SESSION_LOCAL_TIMEZONE.key -> zid.getId) {
val fmt1 = "yyyy-MM-dd HH:mm:ss"
val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
val timeZoneId = Option(zid.getId)
val tz = TimeZone.getTimeZone(zid)
sdf1.setTimeZone(tz)
sdf2.setTimeZone(tz)

checkEvaluation(
FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
sdf1.format(new Timestamp(0)))
checkEvaluation(
FromUnixTime(Literal(1000L), Literal(fmt1), timeZoneId),
sdf1.format(new Timestamp(1000000)))
checkEvaluation(
FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
sdf2.format(new Timestamp(-1000000)))
checkEvaluation(
FromUnixTime(
Literal.create(null, LongType),
Literal.create(null, StringType),
timeZoneId),
null)
checkEvaluation(
FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId),
null)
checkEvaluation(
FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId),
null)

// SPARK-28072 The codegen path for non-literal input should also work
checkEvaluation(
expression = FromUnixTime(
BoundReference(ordinal = 0, dataType = LongType, nullable = true),
BoundReference(ordinal = 1, dataType = StringType, nullable = true),
timeZoneId),
expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
inputRow = InternalRow(0L, UTF8String.fromString(fmt1))
)
}
}
}
// Test escaping of format
GenerateUnsafeProjection.generate(FromUnixTime(Literal(0L), Literal("\""), UTC_OPT) :: Nil)
}

}
