[GLUTEN-8108] fix: Update logic to throw on failure for cast #8107

Status: Open — wants to merge 9 commits into base: main
@@ -1457,4 +1457,42 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
checkGlutenOperatorMatch[FilterExecTransformer](df)
}
}

testWithSpecifiedSparkVersion("Test try_cast", Some("3.4")) {
withTempView("try_cast_table") {
withTempPath {
path =>
Seq[(String)](("123456"), ("000A1234"))
.toDF("str")
.write
.parquet(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("try_cast_table")
runQueryAndCompare("select try_cast(str as bigint) from try_cast_table") {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
runQueryAndCompare("select try_cast(str as double) from try_cast_table") {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
}
}
}

test("Test cast") {
withTempView("cast_table") {
withTempPath {
path =>
Seq[(String)](("123456"), ("000A1234"))
.toDF("str")
.write
.parquet(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("cast_table")
runQueryAndCompare("select cast(str as bigint) from cast_table") {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
runQueryAndCompare("select cast(str as double) from cast_table") {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
}
}
}
}
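For context on the semantics these suites validate: with ANSI enabled, `cast` throws on an unparsable string, while `try_cast` catches the failure and returns NULL. A minimal non-Spark sketch of that contract (plain Java, illustrative only — this is not Gluten or Spark code):

```java
import java.util.Optional;

public class TryCastDemo {
    // try_cast semantics: catch the conversion failure and return empty (NULL)
    // instead of propagating the exception, as ANSI cast would.
    static Optional<Long> tryCastToLong(String s) {
        try {
            return Optional.of(Long.parseLong(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCastToLong("123456"));   // Optional[123456]
        System.out.println(tryCastToLong("000A1234")); // Optional.empty
    }
}
```

The two input strings mirror the test data above: "123456" casts cleanly; "000A1234" is the failure case.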
@@ -26,20 +26,20 @@ public class CastNode implements ExpressionNode, Serializable {
private final TypeNode typeNode;
private final ExpressionNode expressionNode;

-  public final boolean ansiEnabled;
+  public final boolean throwOnFailure;

-  CastNode(TypeNode typeNode, ExpressionNode expressionNode, boolean ansiEnabled) {
+  CastNode(TypeNode typeNode, ExpressionNode expressionNode, boolean throwOnFailure) {
this.typeNode = typeNode;
this.expressionNode = expressionNode;
-    this.ansiEnabled = ansiEnabled;
+    this.throwOnFailure = throwOnFailure;
}

@Override
public Expression toProtobuf() {
Expression.Cast.Builder castBuilder = Expression.Cast.newBuilder();
castBuilder.setType(typeNode.toProtobuf());
castBuilder.setInput(expressionNode.toProtobuf());
-    if (ansiEnabled) {
+    if (throwOnFailure) {
// Throw exception on failure.
castBuilder.setFailureBehaviorValue(2);
} else {
@@ -263,8 +263,8 @@ public static AggregateFunctionNode makeAggregateFunction(
}

public static CastNode makeCast(
-      TypeNode typeNode, ExpressionNode expressionNode, boolean ansiEnabled) {
-    return new CastNode(typeNode, expressionNode, ansiEnabled);
+      TypeNode typeNode, ExpressionNode expressionNode, boolean throwOnFailure) {
+    return new CastNode(typeNode, expressionNode, throwOnFailure);
}
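For reference, the integer passed to `setFailureBehaviorValue` in `CastNode.toProtobuf` corresponds to Substrait's `Expression.Cast.FailureBehavior` enum. A small sketch of the mapping (enum wire values per the Substrait spec; the helper name is ours, and the RETURN_NULL branch is an assumption since that part of the diff is collapsed):

```java
public class CastFailureBehavior {
    // Substrait Expression.Cast.FailureBehavior wire values (per the Substrait spec):
    //   0 = FAILURE_BEHAVIOR_UNSPECIFIED
    //   1 = FAILURE_BEHAVIOR_RETURN_NULL
    //   2 = FAILURE_BEHAVIOR_THROW_EXCEPTION
    static int failureBehaviorValue(boolean throwOnFailure) {
        // Throw on failure for a true ANSI cast; return NULL otherwise
        // (try_cast and non-ANSI cast).
        return throwOnFailure ? 2 : 1;
    }

    public static void main(String[] args) {
        System.out.println(failureBehaviorValue(true));  // 2
        System.out.println(failureBehaviorValue(false)); // 1
    }
}
```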

public static StringMapNode makeStringMap(Map<String, String> values) {
@@ -18,6 +18,7 @@ package org.apache.gluten.expression

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.`type`.ListNode
import org.apache.gluten.substrait.`type`.MapNode
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, StructLiteralNode}
@@ -43,7 +44,10 @@ case class CastTransformer(substraitExprName: String, child: ExpressionTransform
extends UnaryExpressionTransformer {
override def doTransform(args: java.lang.Object): ExpressionNode = {
val typeNode = ConverterUtils.getTypeNode(dataType, original.nullable)
-    ExpressionBuilder.makeCast(typeNode, child.doTransform(args), original.ansiEnabled)
+    ExpressionBuilder.makeCast(
+      typeNode,
+      child.doTransform(args),
+      SparkShimLoader.getSparkShims.withAnsiEvalMode(original))
Contributor:

Yes, I note Spark also sets ansiEnabled = true for EvalMode.TRY, in order to reuse the code logic of EvalMode.ANSI. The difference is that in TRY mode any exception is caught and NULL is returned.
Velox has a different implementation, which requires us to set this flag simply according to whether EvalMode is ANSI or not.

@acvictor, please leave some comments here to clarify, which should be helpful for future code maintenance.

https://github.com/apache/spark/blob/v3.5.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L471
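To illustrate the distinction described in the comment above, here is a sketch with a hypothetical local enum (the real check lives in the Spark shims' `withAnsiEvalMode`, not shown in this diff): Spark reports ansiEnabled = true for both ANSI and TRY, but Velox should throw only for true ANSI mode.

```java
public class EvalModeDemo {
    // Mirrors Spark 3.4+'s Cast eval modes (local enum for illustration only).
    enum EvalMode { LEGACY, ANSI, TRY }

    // Spark's Cast reports ansiEnabled = true for both ANSI and TRY, because
    // TRY reuses the ANSI code path and catches the exception internally.
    static boolean ansiEnabled(EvalMode m) {
        return m == EvalMode.ANSI || m == EvalMode.TRY;
    }

    // What Gluten must pass to Velox as throwOnFailure: only true ANSI throws;
    // for TRY, Velox itself returns NULL instead of throwing.
    static boolean throwOnFailure(EvalMode m) {
        return m == EvalMode.ANSI;
    }

    public static void main(String[] args) {
        System.out.println(ansiEnabled(EvalMode.TRY));    // true
        System.out.println(throwOnFailure(EvalMode.TRY)); // false
    }
}
```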

}
}

@@ -20,7 +20,7 @@ import org.apache.gluten.utils.{BackendTestSettings, SQLQueryTestSettings}

import org.apache.spark.GlutenSortShuffleSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenDecimalPrecisionSuite, GlutenHashExpressionsSuite, GlutenHigherOrderFunctionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryEvalSuite}
+import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenDecimalPrecisionSuite, GlutenHashExpressionsSuite, GlutenHigherOrderFunctionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryCastSuite, GlutenTryEvalSuite}
import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuiteV1Filter, GlutenDataSourceV2SQLSuiteV2Filter, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenDeltaBasedDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenGroupBasedDeleteFromTableSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapabilityCheckSuite, GlutenWriteDistributionAndOrderingSuite}
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
@@ -91,6 +91,16 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude(
"Process Infinity, -Infinity, NaN in case insensitive manner" // +inf not supported in folly.
)
enableSuite[GlutenTryCastSuite]
.exclude(
"Process Infinity, -Infinity, NaN in case insensitive manner" // +inf not supported in folly.
)
.exclude("ANSI mode: Throw exception on casting out-of-range value to byte type")
.exclude("ANSI mode: Throw exception on casting out-of-range value to short type")
.exclude("ANSI mode: Throw exception on casting out-of-range value to int type")
.exclude("ANSI mode: Throw exception on casting out-of-range value to long type")
.exclude("cast from invalid string to numeric should throw NumberFormatException")
.exclude("SPARK-26218: Fix the corner case of codegen when casting float to Integer")
enableSuite[GlutenCollectionExpressionsSuite]
// Rewrite in Gluten to replace Seq with Array
.exclude("Shuffle")
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.GlutenTestsTrait

class GlutenTryCastSuite extends TryCastSuite with GlutenTestsTrait {}
@@ -20,7 +20,7 @@ import org.apache.gluten.utils.{BackendTestSettings, SQLQueryTestSettings}

import org.apache.spark.GlutenSortShuffleSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenDecimalPrecisionSuite, GlutenHashExpressionsSuite, GlutenHigherOrderFunctionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryEvalSuite}
+import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenDecimalPrecisionSuite, GlutenHashExpressionsSuite, GlutenHigherOrderFunctionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryCastSuite, GlutenTryEvalSuite}
import org.apache.spark.sql.connector._
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution._
@@ -92,6 +92,16 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude(
"Process Infinity, -Infinity, NaN in case insensitive manner" // +inf not supported in folly.
)
enableSuite[GlutenTryCastSuite]
.exclude(
"Process Infinity, -Infinity, NaN in case insensitive manner" // +inf not supported in folly.
)
.exclude("ANSI mode: Throw exception on casting out-of-range value to byte type")
Contributor:
Would you add some comments on why these tests are excluded? I remember ANSI ON causes execution fallback; can the tests produce the result Spark expects?

Contributor (Author):
These tests come from https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala. When the suite is run separately with SQLConf.get.setConf(SQLConf.ANSI_ENABLED, true), it falls back; but when run as part of GlutenTryCastSuite, where the conf is not set, it does not give the expected results. (Also, the tests fail without the code changes in this PR.)

Contributor:
> the conf is not set does not give expected results

Thanks for providing the details. These tests are for ANSI OFF cases, right?

Contributor (@PHILO-HE, Dec 3, 2024):
These tests are for ANSI enabled (Spark views ANSI as enabled for TRY mode).
@acvictor, for these tests triggered from TryCastSuite, exceptions are not thrown but null results are returned? That should be the expected behavior for Velox cast.
I cannot figure out how these exception checks can pass in vanilla Spark, assuming the exception is handled internally in TRY mode.

Contributor (Author):
@PHILO-HE I don't fully understand what you mean. Are you saying Velox behaviour is incorrect?

Contributor:
@acvictor, no. I am just curious why these tests of TryCastSuite can pass for vanilla Spark. I am assuming the exceptions are handled internally by Spark, with a null result returned by catching them internally. But it seems that in these tests an exception can still be caught from the outside in vanilla Spark.

Contributor:
@acvictor, please rebase the code. The core change looks good to me.

.exclude("ANSI mode: Throw exception on casting out-of-range value to short type")
.exclude("ANSI mode: Throw exception on casting out-of-range value to int type")
.exclude("ANSI mode: Throw exception on casting out-of-range value to long type")
.exclude("cast from invalid string to numeric should throw NumberFormatException")
.exclude("SPARK-26218: Fix the corner case of codegen when casting float to Integer")
enableSuite[GlutenCollectionExpressionsSuite]
// Rewrite in Gluten to replace Seq with Array
.exclude("Shuffle")
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.GlutenTestsTrait

class GlutenTryCastSuite extends TryCastSuite with GlutenTestsTrait {}