[GLUTEN-6876][CORE] Support Spark-352 (#7138)
This patch adds support for Spark 3.5.2. The notable changes are:
- shuffle write API change
- query plan tag removed
---------

Signed-off-by: Yuan Zhou <[email protected]>
Co-authored-by: Hongze Zhang <[email protected]>
zhouyuan and zhztheplayer authored Oct 15, 2024
1 parent 74c6641 commit 32cd1dc
Showing 15 changed files with 195 additions and 27 deletions.
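Both headline changes are visible in the diff below. On the shuffle side, Spark 3.5.2 adds an explicit ShuffleWriteMetricsReporter parameter to the SortShuffleWriter constructor, so this patch routes writer construction through a per-version SparkSortShuffleWriterUtil shim. A minimal sketch of the two constructor shapes, abridged from the shim files in this commit:

    // Spark <= 3.5.1: the writer takes its write metrics from the TaskContext.
    new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)

    // Spark 3.5.2: the caller passes the metrics reporter explicitly.
    new SortShuffleWriter(handle, mapId, context, writeMetrics, shuffleExecutorComponents)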
20 changes: 10 additions & 10 deletions .github/workflows/util/install_spark_resources.sh
@@ -63,26 +63,26 @@ case "$1" in
3.5)
# Spark-3.5
cd ${INSTALL_DIR} && \
-wget -nv https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
-tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
-rm -rf spark-3.5.1-bin-hadoop3.tgz && \
+wget -nv https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz && \
+tar --strip-components=1 -xf spark-3.5.2-bin-hadoop3.tgz spark-3.5.2-bin-hadoop3/jars/ && \
+rm -rf spark-3.5.2-bin-hadoop3.tgz && \
mkdir -p ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.12 && \
mv jars ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.12 && \
-wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
-tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
+wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.2.tar.gz && \
+tar --strip-components=1 -xf v3.5.2.tar.gz spark-3.5.2/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/
;;
3.5-scala2.13)
# Spark-3.5, scala 2.13
cd ${INSTALL_DIR} && \
-wget -nv https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
-tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
-rm -rf spark-3.5.1-bin-hadoop3.tgz && \
+wget -nv https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz && \
+tar --strip-components=1 -xf spark-3.5.2-bin-hadoop3.tgz spark-3.5.2-bin-hadoop3/jars/ && \
+rm -rf spark-3.5.2-bin-hadoop3.tgz && \
mkdir -p ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.13 && \
mv jars ${INSTALL_DIR}/shims/spark35/spark_home/assembly/target/scala-2.13 && \
-wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
-tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
+wget -nv https://github.com/apache/spark/archive/refs/tags/v3.5.2.tar.gz && \
+tar --strip-components=1 -xf v3.5.2.tar.gz spark-3.5.2/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/
;;
16 changes: 8 additions & 8 deletions .github/workflows/velox_backend.yml
@@ -927,15 +927,15 @@ jobs:
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
-      - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+      - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
-pip3 install pyspark==3.5.1 cython && \
+pip3 install pyspark==3.5.2 cython && \
pip3 install pandas pyarrow
-      - name: Build and Run unit test for Spark 3.5.1 (other tests)
+      - name: Build and Run unit test for Spark 3.5.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
@@ -984,15 +984,15 @@ jobs:
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
-      - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+      - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5-scala2.13
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
-pip3 install pyspark==3.5.1 cython && \
+pip3 install pyspark==3.5.2 cython && \
pip3 install pandas pyarrow
-      - name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
+      - name: Build and Run unit test for Spark 3.5.2 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
@@ -1041,10 +1041,10 @@ jobs:
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
-      - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+      - name: Prepare spark.test.home for Spark 3.5.2 (other tests)
run: |
bash .github/workflows/util/install_spark_resources.sh 3.5
-      - name: Build and Run unit test for Spark 3.5.1 (slow tests)
+      - name: Build and Run unit test for Spark 3.5.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
@@ -137,7 +137,8 @@ class GlutenClickhouseStringFunctionsSuite extends GlutenClickHouseWholeStageTra
}
}

test("base64") {
testSparkVersionLE33("base64") {
// fallback on Spark-352, see https://github.com/apache/spark/pull/47303
val tableName = "base64_table"
withTable(tableName) {
sql(s"create table $tableName(data String) using parquet")
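The base64 case is now gated to Spark 3.3 and earlier: Spark 3.5.2 picks up apache/spark#47303, which changes base64 behavior, so Gluten falls back instead of running the expression natively there. A hypothetical sketch of what a version-gated helper like testSparkVersionLE33 could look like (the real helper lives in Gluten's test base classes; isSparkVersionLE is an assumed utility):

    // Hypothetical sketch only, not Gluten's actual helper.
    def testSparkVersionLE33(name: String)(body: => Unit): Unit = {
      if (isSparkVersionLE("3.3")) {
        test(name)(body) // run normally on Spark <= 3.3
      } else {
        ignore(name + " (Spark <= 3.3 only)")(body) // skipped on newer Spark
      }
    }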
@@ -21,8 +21,9 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.vectorized.NativePartitioning

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.config._
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.spark.util.random.XORShiftRandom

@@ -122,4 +123,17 @@ object GlutenShuffleUtils {
startPartition,
endPartition)
}

+  def getSortShuffleWriter[K, V](
+      handle: ShuffleHandle,
+      mapId: Long,
+      context: TaskContext,
+      metrics: ShuffleWriteMetricsReporter,
+      shuffleExecutorComponents: ShuffleExecutorComponents
+  ): ShuffleWriter[K, V] = {
+    handle match {
+      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
+        SparkSortShuffleWriterUtil.create(other, mapId, context, metrics, shuffleExecutorComponents)
+    }
+  }
}
@@ -107,7 +107,12 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin
metrics,
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-        new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
+        GlutenShuffleUtils.getSortShuffleWriter(
+          other,
+          mapId,
+          context,
+          metrics,
+          shuffleExecutorComponents)
}
}
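With this hunk, ColumnarShuffleManager no longer constructs SortShuffleWriter directly for the row-based path; it delegates to GlutenShuffleUtils.getSortShuffleWriter, which calls the per-version SparkSortShuffleWriterUtil objects added below. That keeps the 3.5.1-versus-3.5.2 constructor difference confined to the shims.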

@@ -195,6 +195,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("test with tab delimiter and double quote")
// Arrow not support corrupt record
.exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord")
+      // varchar
+      .exclude("SPARK-48241: CSV parsing failure with char/varchar type columns")
enableSuite[GlutenCSVv2Suite]
.exclude("Gluten - test for FAILFAST parsing mode")
// Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch
@@ -213,6 +215,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("test with tab delimiter and double quote")
// Arrow not support corrupt record
.exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord")
+      // varchar
+      .exclude("SPARK-48241: CSV parsing failure with char/varchar type columns")
enableSuite[GlutenCSVLegacyTimeParserSuite]
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
@@ -226,6 +230,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("DDL test with tab separated file")
.exclude("DDL test parsing decimal type")
.exclude("test with tab delimiter and double quote")
+      // varchar
+      .exclude("SPARK-48241: CSV parsing failure with char/varchar type columns")
enableSuite[GlutenJsonV1Suite]
// FIXME: Array direct selection fails
.exclude("Complex field and type inferring")
2 changes: 2 additions & 0 deletions package/pom.xml
@@ -303,6 +303,8 @@
<ignoreClass>org.apache.spark.sql.hive.execution.HiveFileFormat</ignoreClass>
<ignoreClass>org.apache.spark.sql.hive.execution.HiveFileFormat$$$$anon$1</ignoreClass>
<ignoreClass>org.apache.spark.sql.hive.execution.HiveOutputWriter</ignoreClass>
<ignoreClass>org.apache.spark.sql.catalyst.plans.QueryPlan</ignoreClass>
<ignoreClass>org.apache.spark.sql.catalyst.plans.QueryPlan*</ignoreClass>
<ignoreClass>org.apache.spark.sql.execution.datasources.BasicWriteTaskStats</ignoreClass>
<ignoreClass>org.apache.spark.sql.execution.datasources.BasicWriteTaskStats$</ignoreClass>
<ignoreClass>org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker</ignoreClass>
2 changes: 1 addition & 1 deletion pom.xml
@@ -341,7 +341,7 @@
<sparkbundle.version>3.5</sparkbundle.version>
<sparkshim.module.name>spark35</sparkshim.module.name>
<sparkshim.artifactId>spark-sql-columnar-shims-spark35</sparkshim.artifactId>
<spark.version>3.5.1</spark.version>
<spark.version>3.5.2</spark.version>
<iceberg.version>1.5.0</iceberg.version>
<delta.package.name>delta-spark</delta.package.name>
<delta.version>3.2.0</delta.version>
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.SortShuffleWriter

object SparkSortShuffleWriterUtil {
def create[K, V, C](
handle: BaseShuffleHandle[K, V, C],
mapId: Long,
context: TaskContext,
writeMetrics: ShuffleWriteMetricsReporter,
shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.SortShuffleWriter

object SparkSortShuffleWriterUtil {
def create[K, V, C](
handle: BaseShuffleHandle[K, V, C],
mapId: Long,
context: TaskContext,
writeMetrics: ShuffleWriteMetricsReporter,
shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.SortShuffleWriter

object SparkSortShuffleWriterUtil {
def create[K, V, C](
handle: BaseShuffleHandle[K, V, C],
mapId: Long,
context: TaskContext,
writeMetrics: ShuffleWriteMetricsReporter,
shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
}
}
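The three identical SparkSortShuffleWriterUtil objects above presumably sit in the shim source trees for the older Spark versions Gluten supports (this view does not show the file paths). Each accepts writeMetrics but ignores it, since the pre-3.5.2 SortShuffleWriter constructor has no metrics parameter and obtains write metrics from the TaskContext internally.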
@@ -526,15 +526,27 @@ class Spark35Shims extends SparkShims {
Seq(expr.srcArrayExpr, expr.posExpr, expr.itemExpr, Literal(expr.legacyNegativeIndex))
}

+  override def withOperatorIdMap[T](idMap: java.util.Map[QueryPlan[_], Int])(body: => T): T = {
+    val prevIdMap = QueryPlan.localIdMap.get()
+    try {
+      QueryPlan.localIdMap.set(idMap)
+      body
+    } finally {
+      QueryPlan.localIdMap.set(prevIdMap)
+    }
+  }

override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
-    plan.getTagValue(QueryPlan.OP_ID_TAG)
+    Option(QueryPlan.localIdMap.get().get(plan))
}

override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
-    plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
+    val map = QueryPlan.localIdMap.get()
+    assert(!map.containsKey(plan))
+    map.put(plan, opId)
}

override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
-    plan.unsetTagValue(QueryPlan.OP_ID_TAG)
+    QueryPlan.localIdMap.get().remove(plan)
}
}
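This hunk is the "query plan tag removed" half of the commit message: Spark 3.5.2 removes QueryPlan.OP_ID_TAG, and operator IDs now live in the thread-local QueryPlan.localIdMap, which the shim scopes through withOperatorIdMap. A hypothetical usage sketch (shims, plan, and the IdentityHashMap construction are assumptions; only the shim methods come from this patch):

    // Hypothetical: assign operator ids for one explain pass, then restore.
    val idMap = new java.util.IdentityHashMap[QueryPlan[_], Int]()
    shims.withOperatorIdMap(idMap) {
      shims.setOperatorId(plan, 1) // puts plan -> 1 into the scoped map
      assert(shims.getOperatorId(plan).contains(1)) // reads it back as Option[Int]
    } // the previous thread-local map is restored here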
@@ -20,7 +20,7 @@ import org.apache.gluten.sql.shims.{SparkShimDescriptor, SparkShims}
import org.apache.gluten.sql.shims.spark35.SparkShimProvider.DESCRIPTOR

object SparkShimProvider {
-  val DESCRIPTOR = SparkShimDescriptor(3, 5, 1)
+  val DESCRIPTOR = SparkShimDescriptor(3, 5, 2)
}

class SparkShimProvider extends org.apache.gluten.sql.shims.SparkShimProvider {
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.SortShuffleWriter

object SparkSortShuffleWriterUtil {
def create[K, V, C](
handle: BaseShuffleHandle[K, V, C],
mapId: Long,
context: TaskContext,
writeMetrics: ShuffleWriteMetricsReporter,
shuffleExecutorComponents: ShuffleExecutorComponents): ShuffleWriter[K, V] = {
new SortShuffleWriter(handle, mapId, context, writeMetrics, shuffleExecutorComponents)
}
}
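Unlike the three copies earlier in the commit, this Spark 3.5 variant forwards writeMetrics to the SortShuffleWriter constructor, matching the new five-argument signature in Spark 3.5.2, while the shared call site in GlutenShuffleUtils stays identical across versions.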
2 changes: 1 addition & 1 deletion tools/gluten-it/pom.xml
@@ -164,7 +164,7 @@
<profile>
<id>spark-3.5</id>
<properties>
<spark.version>3.5.1</spark.version>
<spark.version>3.5.2</spark.version>
<scala.library.version>2.12.18</scala.library.version>
</properties>
</profile>
