downgrade Spark to 3.3
sfc-gh-bli committed Feb 22, 2024
1 parent 3a26f61 commit 07daebb
Showing 11 changed files with 21 additions and 21 deletions.
4 changes: 2 additions & 2 deletions .github/docker/build_image.sh
@@ -33,8 +33,8 @@ cd ../..

# Build docker image
docker build \
- --build-arg SPARK_URL=https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz \
- --build-arg SPARK_BINARY_NAME=spark-3.4.0-bin-hadoop3.tgz \
+ --build-arg SPARK_URL=https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz \
+ --build-arg SPARK_BINARY_NAME=spark-3.3.2-bin-hadoop3.tgz \
--build-arg JDBC_URL=https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/${TEST_JDBC_VERSION}/$JDBC_JAR_NAME \
--build-arg JDBC_BINARY_NAME=$JDBC_JAR_NAME \
--build-arg SPARK_CONNECTOR_LOCATION=target/scala-${TEST_SCALA_VERSION}/$SPARK_CONNECTOR_JAR_NAME \
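With this change the CI base image is built from the Spark 3.3.2 binary distribution rather than 3.4.0; SPARK_URL and SPARK_BINARY_NAME are passed to the Docker build as build args and presumably must point at the same release tarball.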
6 changes: 3 additions & 3 deletions .github/workflows/ClusterTest.yml
@@ -13,13 +13,13 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
- spark_version: [ '3.4.0' ]
+ spark_version: [ '3.3.2' ]
use_copy_unload: [ 'true' ]
cloud_provider: [ 'gcp' ]
env:
SNOWFLAKE_TEST_CONFIG_SECRET: ${{ secrets.SNOWFLAKE_TEST_CONFIG_SECRET }}
- TEST_SPARK_VERSION: '3.4'
- DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.4.0'
+ TEST_SPARK_VERSION: '3.3'
+ DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.3.2'
TEST_SCALA_VERSION: '2.12'
TEST_COMPILE_SCALA_VERSION: '2.12.11'
TEST_SPARK_CONNECTOR_VERSION: '2.15.0'
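In this workflow the Spark pin appears in three places that have to move together: the spark_version matrix entry, the TEST_SPARK_VERSION major/minor string, and the DOCKER_IMAGE_TAG. The four integration-test workflows that follow only need the one-line matrix change.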
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_2.12.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
- spark_version: [ '3.4.0' ]
+ spark_version: [ '3.3.2' ]
use_copy_unload: [ 'true', 'false' ]
cloud_provider: [ 'aws', 'azure' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_2.13.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.13.9' ]
- spark_version: [ '3.4.0' ]
+ spark_version: [ '3.3.2' ]
use_copy_unload: [ 'true', 'false' ]
cloud_provider: [ 'aws', 'azure' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_gcp_2.12.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.12.11' ]
- spark_version: [ '3.4.0' ]
+ spark_version: [ '3.3.2' ]
use_copy_unload: [ 'false' ]
cloud_provider: [ 'gcp' ]
# run_query_in_async can be removed after async mode is stable
2 changes: 1 addition & 1 deletion .github/workflows/IntegrationTest_gcp_2.13.yml
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
scala_version: [ '2.13.9' ]
- spark_version: [ '3.4.0' ]
+ spark_version: [ '3.3.2' ]
use_copy_unload: [ 'false' ]
cloud_provider: [ 'gcp' ]
# run_query_in_async can be removed after async mode is stable
4 changes: 2 additions & 2 deletions ClusterTest/build.sbt
@@ -16,8 +16,8 @@

val sparkConnectorVersion = "2.15.0"
val scalaVersionMajor = "2.12"
val sparkVersionMajor = "3.4"
val sparkVersion = s"${sparkVersionMajor}.0"
val sparkVersionMajor = "3.3"
val sparkVersion = s"${sparkVersionMajor}.2"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse(sparkVersion)

unmanagedJars in Compile += file(s"../target/scala-${scalaVersionMajor}/" +
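In the cluster-test build the patch version is derived from the major/minor string, so the downgrade touches both lines, while the effective test version can still be overridden through the spark.testVersion JVM system property. A minimal, standalone sketch of that resolution logic, using the values from this diff:

    object SparkVersionResolution extends App {
      // Values taken from this diff.
      val sparkVersionMajor = "3.3"
      val sparkVersion = s"${sparkVersionMajor}.2" // "3.3.2"

      // Falls back to the pinned version unless -Dspark.testVersion=... is set on the JVM.
      val testSparkVersion: String = sys.props.get("spark.testVersion").getOrElse(sparkVersion)

      println(testSparkVersion)
    }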
6 changes: 3 additions & 3 deletions build.sbt
@@ -16,8 +16,8 @@

import scala.util.Properties

val sparkVersion = "3.4"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.4.0")
val sparkVersion = "3.3"
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.3.2")

/*
* Don't change the variable name "sparkConnectorVersion" because
@@ -41,7 +41,7 @@ lazy val root = project.withId("spark-snowflake").in(file("."))
.settings(
name := "spark-snowflake",
organization := "net.snowflake",
- version := s"${sparkConnectorVersion}-spark_3.4",
+ version := s"${sparkConnectorVersion}-spark_3.3",
scalaVersion := sys.props.getOrElse("SPARK_SCALA_VERSION", default = "2.12.11"),
// Spark 3.2 supports scala 2.12 and 2.13
crossScalaVersions := Seq("2.12.11", "2.13.9"),
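The published artifact version carries the supported Spark line as a suffix, so with this change the connector coordinate becomes 2.15.0-spark_3.3 instead of 2.15.0-spark_3.4. A small sketch of how that string is assembled, with values taken from this diff (in the real build.sbt the "3.3" suffix is written out literally):

    object ArtifactVersion extends App {
      val sparkConnectorVersion = "2.15.0"
      val supportedSparkLine = "3.3"

      // Mirrors `version := s"${sparkConnectorVersion}-spark_3.3"` from build.sbt.
      val artifactVersion = s"$sparkConnectorVersion-spark_$supportedSparkLine"

      println(artifactVersion) // 2.15.0-spark_3.3
    }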
SnowflakeConnectorUtils.scala
@@ -34,7 +34,7 @@ object SnowflakeConnectorUtils {
* Check the Spark version; if it matches SUPPORT_SPARK_VERSION, enable pushdown;
* otherwise disable it.
*/
val SUPPORT_SPARK_VERSION = "3.4"
val SUPPORT_SPARK_VERSION = "3.3"

def checkVersionAndEnablePushdown(session: SparkSession): Boolean =
if (session.version.startsWith(SUPPORT_SPARK_VERSION)) {
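Pushdown is gated by a simple prefix match on the running session's Spark version, so after this change only 3.3.x sessions keep pushdown enabled. A self-contained illustration of that check (not the connector's actual checkVersionAndEnablePushdown, which takes a SparkSession rather than a version string):

    object PushdownGate extends App {
      val SUPPORT_SPARK_VERSION = "3.3"

      // Same prefix test as checkVersionAndEnablePushdown, applied to a plain version string.
      def pushdownSupported(sparkVersion: String): Boolean =
        sparkVersion.startsWith(SUPPORT_SPARK_VERSION)

      println(pushdownSupported("3.3.2")) // true
      println(pushdownSupported("3.4.0")) // false
    }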
MiscStatement.scala
@@ -8,7 +8,6 @@ import net.snowflake.spark.snowflake.{
SnowflakePushdownUnsupportedException,
SnowflakeSQLStatement
}
- import org.apache.spark.sql.catalyst.expressions.EvalMode.LEGACY
import org.apache.spark.sql.catalyst.expressions.{
Alias,
Ascending,
@@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{
}
import org.apache.spark.sql.types.{Decimal, _}
import org.apache.spark.unsafe.types.UTF8String
+ import net.snowflake.spark.snowflake.Utils

/** Extractors for everything else. */
private[querygeneration] object MiscStatement {
@@ -54,7 +54,7 @@ private[querygeneration] object MiscStatement {
// - New Type: evalMode: EvalMode.Value = EvalMode.fromSQLConf(SQLConf.get)
// Currently, there are 3 modes: LEGACY, ANSI, TRY
// Pushdown is supported only if the mode is LEGACY.
- case Cast(child, t, _, evalMode) if evalMode == LEGACY =>
+ case Cast(child, t, _, ansiEnabled) if !ansiEnabled =>
getCastType(t) match {
case Some(cast) =>
// For known unsupported data conversion, raise exception to break the
@@ -122,7 +122,7 @@
// Spark 3.4 introduced join hints. The join hint doesn't affect correctness,
// so it can be ignored in the pushdown process.
// https://github.com/apache/spark/commit/0fa9c554fc0b3940a47c3d1c6a5a17ca9a8cee8e
- case ScalarSubquery(subquery, _, _, joinCond, _) if joinCond.isEmpty =>
+ case ScalarSubquery(subquery, _, _, joinCond) if joinCond.isEmpty =>
blockStatement(new QueryBuilder(subquery).statement)

case UnscaledValue(child) =>
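Both reverted matches track constructor changes between Spark lines: in 3.3, Cast still carries a Boolean ansiEnabled flag (3.4 replaced it with evalMode), and ScalarSubquery has one fewer field because the join-hint parameter only arrived in 3.4, so the extractor patterns must use the 3.3 arities to compile. The sketch below uses a hypothetical stand-in case class, not Spark's real Cast, to show the non-ANSI guard the connector relies on:

    object CastPushdownSketch extends App {
      // Hypothetical stand-in for Spark 3.3's Cast(child, dataType, timeZoneId, ansiEnabled);
      // it is not Spark's real expression class.
      final case class Cast33(child: String, dataType: String, timeZoneId: Option[String], ansiEnabled: Boolean)

      // Mirrors `case Cast(child, t, _, ansiEnabled) if !ansiEnabled =>` from this diff:
      // only legacy (non-ANSI) casts are considered for pushdown.
      def pushdownEligible(cast: Cast33): Boolean = cast match {
        case Cast33(_, _, _, ansiEnabled) if !ansiEnabled => true
        case _                                            => false
      }

      println(pushdownEligible(Cast33("C1", "DECIMAL(10,2)", None, ansiEnabled = false))) // true
      println(pushdownEligible(Cast33("C1", "DECIMAL(10,2)", None, ansiEnabled = true)))  // false
    }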
NumericStatement.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{
Pi,
Pow,
// PromotePrecision is removed from Spark 3.4
- // PromotePrecision,
+ PromotePrecision,
Rand,
Round,
Sin,
@@ -78,7 +78,7 @@ private[querygeneration] object NumericStatement {

// PromotePrecision is removed from Spark 3.4
// https://github.com/apache/spark/pull/36698
- // case PromotePrecision(child) => convertStatement(child, fields)
+ case PromotePrecision(child) => convertStatement(child, fields)

case CheckOverflow(child, t, _) =>
MiscStatement.getCastType(t) match {
@@ -101,7 +101,7 @@
// Spark 3.4 adds a new argument: ansiEnabled
// https://github.com/apache/spark/commit/42721120f3c7206a9fc22db5d0bb7cf40f0cacfd
// The pushdown is supported for non-ANSI mode.
- case Round(child, scale, ansiEnabled) if !ansiEnabled =>
+ case Round(child, scale) =>
ConstantString("ROUND") + blockStatement(
convertStatements(fields, child, scale)
)
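Spark 3.3 still ships PromotePrecision (removed upstream in 3.4 via apache/spark#36698) and its Round has only two arguments, so the import and both match arms are restored to the 3.3 shapes. A stand-in sketch of the two-argument Round translation, assuming the connector renders it as a Snowflake ROUND(...) call (the real code goes through the connector's statement builders):

    object RoundPushdownSketch extends App {
      // Hypothetical stand-in for Spark 3.3's two-argument Round(child, scale);
      // Spark 3.4 added a third ansiEnabled argument.
      final case class Round33(child: String, scale: Int)

      // Mirrors `case Round(child, scale) =>`, which emits ROUND(<child>, <scale>).
      def toSnowflakeSql(round: Round33): String =
        s"ROUND(${round.child}, ${round.scale})"

      println(toSnowflakeSql(Round33("AMOUNT", 2))) // ROUND(AMOUNT, 2)
    }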
