diff --git a/.github/workflows/util/install_spark_resources.sh b/.github/workflows/util/install_spark_resources.sh
index 0afa69958217..dd2afec821d4 100755
--- a/.github/workflows/util/install_spark_resources.sh
+++ b/.github/workflows/util/install_spark_resources.sh
@@ -63,26 +63,26 @@ case "$1" in
 3.5)
     # Spark-3.5
     cd ${INSTALL_DIR} && \
-    wget -nv https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
-    tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
-    rm -rf spark-3.5.1-bin-hadoop3.tgz && \
+    wget -nv https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz && \
+    tar --strip-components=1 -xf spark-3.5.2-bin-hadoop3.tgz spark-3.5.2-bin-hadoop3/jars/ && \
+    rm -rf spark-3.5.2-bin-hadoop3.tgz && \
     mkdir -p ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.12 && \
     mv jars ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.12 && \
-    wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
-    tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
+    wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.2.tar.gz && \
+    tar --strip-components=1 -xf v3.5.2.tar.gz spark-3.5.2/sql/core/src/test/resources/ && \
     mkdir -p shims/spark35/spark_home/ && \
     mv sql shims/spark35/spark_home/
     ;;
 3.5-scala2.13)
     # Spark-3.5, scala 2.13
     cd ${INSTALL_DIR} && \
-    wget -nv https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
-    tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
-    rm -rf spark-3.5.1-bin-hadoop3.tgz && \
+    wget -nv https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz && \
+    tar --strip-components=1 -xf spark-3.5.2-bin-hadoop3.tgz spark-3.5.2-bin-hadoop3/jars/ && \
+    rm -rf spark-3.5.2-bin-hadoop3.tgz && \
     mkdir -p ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.13 && \
     mv jars ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.13 && \
-    wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
-    tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
+    wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.2.tar.gz && \
+    tar --strip-components=1 -xf v3.5.2.tar.gz spark-3.5.2/sql/core/src/test/resources/ && \
     mkdir -p shims/spark35/spark_home/ && \
     mv sql shims/spark35/spark_home/
     ;;
diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml
index b444d992cd7d..3d1399c3a68b 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -927,15 +927,15 @@ jobs:
         working-directory: ${{ github.workspace }}
         run: |
           mkdir -p '${{ env.CCACHE_DIR }}'
-      - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+      - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
         run: |
           bash .github/workflows/util/install_spark_resources.sh 3.5
           dnf module -y install python39 && \
           alternatives --set python3 /usr/bin/python3.9 && \
           pip3 install setuptools && \
-          pip3 install pyspark==3.5.1 cython && \
+          pip3 install pyspark==3.5.2 cython && \
           pip3 install pandas pyarrow
-      - name: Build and Run unit test for Spark 3.5.1 (other tests)
+      - name: Build and Run unit test for Spark 3.5.2 (other tests)
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
@@ -984,15 +984,15 @@ jobs:
         working-directory: ${{ github.workspace }}
         run: |
           mkdir -p '${{ env.CCACHE_DIR }}'
-      - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+      - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
         run: |
           bash .github/workflows/util/install_spark_resources.sh 3.5-scala2.13
           dnf module -y install python39 && \
           alternatives --set python3 /usr/bin/python3.9 && \
           pip3 install setuptools && \
-          pip3 install pyspark==3.5.1 cython && \
+          pip3 install pyspark==3.5.2 cython && \
           pip3 install pandas pyarrow
-      - name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
+      - name: Build and Run unit test for Spark 3.5.2 with scala-2.13 (other tests)
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.13
@@ -1041,10 +1041,10 @@ jobs:
         working-directory: ${{ github.workspace }}
         run: |
           mkdir -p '${{ env.CCACHE_DIR }}'
-      - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+      - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
         run: |
           bash .github/workflows/util/install_spark_resources.sh 3.5
-      - name: Build and Run unit test for Spark 3.5.1 (slow tests)
+      - name: Build and Run unit test for Spark 3.5.2 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
           $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseStringFunctionsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseStringFunctionsSuite.scala
index e40b293ea9d2..ffb9ef57aa08 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseStringFunctionsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseStringFunctionsSuite.scala
@@ -137,7 +137,8 @@ class GlutenClickhouseStringFunctionsSuite extends GlutenClickHouseWholeStageTra
     }
   }

-  test("base64") {
+  testSparkVersionLE33("base64") {
+    // fallback on Spark-352, see https://github.com/apache/spark/pull/47303
     val tableName = "base64_table"
     withTable(tableName) {
       sql(s"create table $tableName(data String) using parquet")
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
index a65211d86a3f..29443b59c5f6 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
@@ -21,8 +21,9 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.vectorized.NativePartitioning

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.config._
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 import org.apache.spark.util.random.XORShiftRandom

@@ -122,4 +123,17 @@ object GlutenShuffleUtils {
       startPartition,
       endPartition)
   }
+
+  def getSortShuffleWriter[K, V](
+      handle: ShuffleHandle,
+      mapId: Long,
+      context: TaskContext,
+      metrics: ShuffleWriteMetricsReporter,
+      shuffleExecutorComponents: ShuffleExecutorComponents
+  ): ShuffleWriter[K, V] = {
+    handle match {
+      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
+        SparkSortShuffleWriterUtil.create(other, mapId, context, metrics, shuffleExecutorComponents)
+    }
+  }
 }
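
Context for the new helper: as the per-version `SparkSortShuffleWriterUtil` objects later in this patch show, `SortShuffleWriter`'s constructor takes an extra `ShuffleWriteMetricsReporter` argument on Spark 3.5.2, so the writer can no longer be constructed the same way against every supported Spark version. A minimal, self-contained sketch of that compile-time shim pattern follows; the names `Handle`, `Writer*`, `WriterUtil`, and `Demo` are invented for illustration and are not Spark or Gluten APIs:

```scala
// Stand-in types only: this sketches the shim pattern, not Spark's real classes.
final case class Handle(id: Int)
trait Writer { def write(record: String): Unit }

// Shape of the writer on Spark <= 3.5.1: no metrics reporter argument.
final class WriterNoMetrics(h: Handle) extends Writer {
  def write(record: String): Unit = println(s"${h.id}: $record")
}

// Shape on Spark 3.5.2: the constructor also accepts a metrics callback.
final class WriterWithMetrics(h: Handle, metrics: String => Unit) extends Writer {
  def write(record: String): Unit = { metrics(record); println(s"${h.id}: $record") }
}

// One such object is compiled per shims/sparkXX module. Callers always pass
// `metrics`; each shim decides whether its Spark version's constructor wants it.
object WriterUtil {
  def create(h: Handle, metrics: String => Unit): Writer =
    new WriterWithMetrics(h, metrics) // the spark35 variant; older shims would drop `metrics`
}

object Demo extends App {
  WriterUtil.create(Handle(7), _ => ()).write("row")
}
```
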
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala
index d8ba78cb98fd..18e8b985b893 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala
@@ -107,7 +107,12 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin
           metrics,
           shuffleExecutorComponents)
       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-        new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
+        GlutenShuffleUtils.getSortShuffleWriter(
+          other,
+          mapId,
+          context,
+          metrics,
+          shuffleExecutorComponents)
     }
   }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index ed4939595b22..03f56b46010a 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -195,6 +195,8 @@ class VeloxTestSettings extends BackendTestSettings {
       .exclude("test with tab delimiter and double quote")
       // Arrow not support corrupt record
       .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord")
+      // varchar
+      .exclude("SPARK-48241: CSV parsing failure with char/varchar type columns")
     enableSuite[GlutenCSVv2Suite]
       .exclude("Gluten - test for FAILFAST parsing mode")
       // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch
@@ -213,6 +215,8 @@ class VeloxTestSettings extends BackendTestSettings {
       .exclude("test with tab delimiter and double quote")
       // Arrow not support corrupt record
       .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord")
+      // varchar
+      .exclude("SPARK-48241: CSV parsing failure with char/varchar type columns")
     enableSuite[GlutenCSVLegacyTimeParserSuite]
       // file cars.csv include null string, Arrow not support to read
       .exclude("DDL test with schema")
@@ -226,6 +230,8 @@ class VeloxTestSettings extends BackendTestSettings {
       .exclude("DDL test with tab separated file")
       .exclude("DDL test parsing decimal type")
       .exclude("test with tab delimiter and double quote")
+      // varchar
+      .exclude("SPARK-48241: CSV parsing failure with char/varchar type columns")
     enableSuite[GlutenJsonV1Suite]
       // FIXME: Array direct selection fails
      .exclude("Complex field and type inferring")
diff --git a/package/pom.xml b/package/pom.xml
index fc72fe93de84..f4cc8d6f7015 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -303,6 +303,8 @@
             org.apache.spark.sql.hive.execution.HiveFileFormat
             org.apache.spark.sql.hive.execution.HiveFileFormat$$$$anon$1
             org.apache.spark.sql.hive.execution.HiveOutputWriter
+            org.apache.spark.sql.catalyst.plans.QueryPlan
+            org.apache.spark.sql.catalyst.plans.QueryPlan*
             org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
             org.apache.spark.sql.execution.datasources.BasicWriteTaskStats$
             org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker
diff --git a/pom.xml b/pom.xml
index 439f12b453c9..a5f39b9cc253 100644
--- a/pom.xml
+++ b/pom.xml
@@ -341,7 +341,7 @@
         3.5
         spark35
         spark-sql-columnar-shims-spark35
-        3.5.1
+        3.5.2
         1.5.0
         delta-spark
         3.2.0
diff --git a/shims/spark32/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala b/shims/spark32/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
new file mode 100644
index 000000000000..9e684c2afdd4
--- /dev/null
+++ b/shims/spark32/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark.TaskContext
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents
+import org.apache.spark.shuffle.sort.SortShuffleWriter
+
+object SparkSortShuffleWriterUtil {
+  def create[K, V, C](
+      handle: BaseShuffleHandle[K, V, C],
+      mapId: Long,
+      context: TaskContext,
+      writeMetrics: ShuffleWriteMetricsReporter,
+      shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
+    new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
+  }
+}
diff --git a/shims/spark33/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala b/shims/spark33/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
new file mode 100644
index 000000000000..9e684c2afdd4
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark.TaskContext
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents
+import org.apache.spark.shuffle.sort.SortShuffleWriter
+
+object SparkSortShuffleWriterUtil {
+  def create[K, V, C](
+      handle: BaseShuffleHandle[K, V, C],
+      mapId: Long,
+      context: TaskContext,
+      writeMetrics: ShuffleWriteMetricsReporter,
+      shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
+    new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
+  }
+}
diff --git a/shims/spark34/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala b/shims/spark34/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
new file mode 100644
index 000000000000..9e684c2afdd4
--- /dev/null
+++ b/shims/spark34/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark.TaskContext
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents
+import org.apache.spark.shuffle.sort.SortShuffleWriter
+
+object SparkSortShuffleWriterUtil {
+  def create[K, V, C](
+      handle: BaseShuffleHandle[K, V, C],
+      mapId: Long,
+      context: TaskContext,
+      writeMetrics: ShuffleWriteMetricsReporter,
+      shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
+    new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
+  }
+}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index d130864a9fed..43ed51579a1b 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -526,15 +526,27 @@ class Spark35Shims extends SparkShims {
     Seq(expr.srcArrayExpr, expr.posExpr, expr.itemExpr, Literal(expr.legacyNegativeIndex))
   }

+  override def withOperatorIdMap[T](idMap: java.util.Map[QueryPlan[_], Int])(body: => T): T = {
+    val prevIdMap = QueryPlan.localIdMap.get()
+    try {
+      QueryPlan.localIdMap.set(idMap)
+      body
+    } finally {
+      QueryPlan.localIdMap.set(prevIdMap)
+    }
+  }
+
   override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
-    plan.getTagValue(QueryPlan.OP_ID_TAG)
+    Option(QueryPlan.localIdMap.get().get(plan))
   }

   override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
-    plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
+    val map = QueryPlan.localIdMap.get()
+    assert(!map.containsKey(plan))
+    map.put(plan, opId)
   }

   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
-    plan.unsetTagValue(QueryPlan.OP_ID_TAG)
+    QueryPlan.localIdMap.get().remove(plan)
   }
 }
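
The hunk above tracks a Spark 3.5.2 change in `QueryPlan`: operator IDs move from per-plan tags (`OP_ID_TAG`) to a thread-local identity map (`QueryPlan.localIdMap`), and `withOperatorIdMap` scopes that map around a block, restoring the previous map even on failure. A self-contained sketch of the same save/set/restore pattern over a plain `ThreadLocal` follows; `OperatorIdScope`, `withIdMap`, and `ScopeDemo` are illustrative names, with `AnyRef` standing in for `QueryPlan[_]`:

```scala
import java.util.{IdentityHashMap, Map => JMap}

// Generic save/set/restore scoping over a ThreadLocal, mirroring withOperatorIdMap.
object OperatorIdScope {
  val localIdMap: ThreadLocal[JMap[AnyRef, Int]] =
    ThreadLocal.withInitial(() => new IdentityHashMap[AnyRef, Int]())

  def withIdMap[T](idMap: JMap[AnyRef, Int])(body: => T): T = {
    val prev = localIdMap.get()
    try {
      localIdMap.set(idMap)
      body // everything in this scope (and nested calls) sees idMap
    } finally {
      localIdMap.set(prev) // restored even if body throws, so scopes compose
    }
  }
}

object ScopeDemo extends App {
  val plan = new AnyRef
  OperatorIdScope.withIdMap(new IdentityHashMap[AnyRef, Int]()) {
    OperatorIdScope.localIdMap.get().put(plan, 1)
    assert(OperatorIdScope.localIdMap.get().get(plan) == 1)
  }
  assert(!OperatorIdScope.localIdMap.get().containsKey(plan)) // outer map restored
}
```
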
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/SparkShimProvider.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/SparkShimProvider.scala
index 52bbf4299d44..eab32ab9d0b9 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/SparkShimProvider.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/SparkShimProvider.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.sql.shims.{SparkShimDescriptor, SparkShims}
 import org.apache.gluten.sql.shims.spark35.SparkShimProvider.DESCRIPTOR

 object SparkShimProvider {
-  val DESCRIPTOR = SparkShimDescriptor(3, 5, 1)
+  val DESCRIPTOR = SparkShimDescriptor(3, 5, 2)
 }

 class SparkShimProvider extends org.apache.gluten.sql.shims.SparkShimProvider {
diff --git a/shims/spark35/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala b/shims/spark35/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
new file mode 100644
index 000000000000..95b15f04e7cb
--- /dev/null
+++ b/shims/spark35/src/main/scala/org/apache/spark/shuffle/SparkSortShuffleWriterUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark.TaskContext
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents
+import org.apache.spark.shuffle.sort.SortShuffleWriter
+
+object SparkSortShuffleWriterUtil {
+  def create[K, V, C](
+      handle: BaseShuffleHandle[K, V, C],
+      mapId: Long,
+      context: TaskContext,
+      writeMetrics: ShuffleWriteMetricsReporter,
+      shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
+    new SortShuffleWriter(handle, mapId, context, writeMetrics, shuffleExecutorComponents)
+  }
+}
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index b8930dd4a4f1..bad4d6087f11 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -164,7 +164,7 @@
       spark-3.5
-        3.5.1
+        3.5.2
         2.12.18
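
Finally, the bumped `SparkShimDescriptor(3, 5, 2)` is what ties the spark35 shim module to the Spark version found at runtime. A toy sketch of descriptor-style version matching follows; `ShimDescriptor` and `MatchDemo` are invented here, and Gluten's actual provider and loader logic may differ:

```scala
// Toy version matcher, illustrative only.
final case class ShimDescriptor(major: Int, minor: Int, patch: Int) {
  override def toString: String = s"$major.$minor.$patch"
  // This toy matcher keys on major.minor only.
  def matches(sparkVersion: String): Boolean =
    sparkVersion.split("\\.").toList match {
      case maj :: min :: _ => maj.toInt == major && min.toInt == minor
      case _               => false
    }
}

object MatchDemo extends App {
  val descriptor = ShimDescriptor(3, 5, 2)
  assert(descriptor.matches("3.5.2"))
  assert(!descriptor.matches("3.4.3"))
}
```
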