[GLUTEN-4424][CORE] Upgrade spark version to 3.5.1 in Gluten (apache#4822)

* Upgrade spark version to 3.5

---------

Co-authored-by: Holden Karau <[email protected]>
JkSelf and holdenk authored Mar 4, 2024
1 parent bca0325 commit 6b0f346
Showing 40 changed files with 3,145 additions and 49 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/velox_be.yml
@@ -312,6 +312,40 @@ jobs:
        if: ${{ always() }}
        run: |
          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/clean.sh
  ubuntu2004-test-spark35:
    runs-on: velox-self-hosted
    env:
      OS_IMAGE_NAME: ubuntu
      OS_IMAGE_TAG: 20.04
    steps:
      - uses: actions/checkout@v4
      - name: Setup docker container
        run: |
          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/checkout.sh
      - name: Build Gluten velox third party
        run: |
          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
          cd /opt/gluten/ep/build-velox/src && \
          ./get_velox.sh && \
          ./build_velox.sh --run_setup_script=ON --enable_ep_cache=OFF'
      - name: Build Gluten CPP library
        run: |
          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh '
          cd /opt/gluten/cpp && \
          ./compile.sh --build_velox_backend=ON '
      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.5
        run: |
          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh 'cd /opt/gluten/tools/gluten-it && \
          mvn clean install -Pspark-3.5 \
          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
          --local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
          --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1'
      - name: Exit docker container
        if: ${{ always() }}
        run: |
          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/clean.sh
  ubuntu2204-test-spark33-spark34:
    runs-on: velox-self-hosted
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution

import io.glutenproject.columnarbatch.ColumnarBatches
import io.glutenproject.memory.nmm.NativeMemoryManagers
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.vectorized.{ColumnarBatchSerializeResult, ColumnarBatchSerializerJniWrapper}

import org.apache.spark.SparkContext
@@ -104,7 +105,9 @@ object BroadcastUtils {
case result: ColumnarBatchSerializeResult =>
Array(result.getSerialized)
}
ColumnarBuildSideRelation(schema.toAttributes, serialized)
ColumnarBuildSideRelation(
SparkShimLoader.getSparkShims.attributesFromStruct(schema),
serialized)
}
// Rebroadcast Velox relation.
context.broadcast(toRelation).asInstanceOf[Broadcast[T]]
@@ -120,7 +123,9 @@
case result: ColumnarBatchSerializeResult =>
Array(result.getSerialized)
}
ColumnarBuildSideRelation(schema.toAttributes, serialized)
ColumnarBuildSideRelation(
SparkShimLoader.getSparkShims.attributesFromStruct(schema),
serialized)
}
// Rebroadcast Velox relation.
context.broadcast(toRelation).asInstanceOf[Broadcast[T]]
@@ -96,9 +96,16 @@ trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {

case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = false)(
val transformStageId: Int
) extends UnaryTransformSupport {
) extends GenerateTreeStringShim
with UnaryTransformSupport {
assert(child.isInstanceOf[TransformSupport])

def stageId: Int = transformStageId

def substraitPlanJsonValue: String = substraitPlanJson

def wholeStageTransformerContextDefined: Boolean = wholeStageTransformerContext.isDefined

// For WholeStageCodegen-like operator, only pipeline time will be handled in graph plotting.
// See SparkPlanGraph.scala:205 for reference.
// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -158,34 +165,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

override def otherCopyArgs: Seq[AnyRef] = Seq(transformStageId.asInstanceOf[Integer])

override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit = {
val prefix = if (printNodeId) "^ " else s"^($transformStageId) "
child.generateTreeString(
depth,
lastChildren,
append,
verbose,
prefix,
addSuffix = false,
maxFields,
printNodeId = printNodeId,
indent)
if (verbose && wholeStageTransformerContext.isDefined) {
append(prefix + "Substrait plan:\n")
append(substraitPlanJson)
append("\n")
}
}

// It's misleading with "Codegen" used. But we have to keep "WholeStageCodegen" prefixed to
// make whole stage transformer clearly plotted in UI, like spark's whole stage codegen.
// See buildSparkPlanGraphNode in SparkPlanGraph.scala of Spark.
@@ -21,7 +21,6 @@ import io.glutenproject.sql.shims.SparkShimLoader
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.util.collection.BitSet

@@ -54,13 +53,13 @@ case class InputPartitionsUtil(
val splitFiles = selectedPartitions
.flatMap {
partition =>
partition.files.flatMap {
SparkShimLoader.getSparkShims.getFileStatus(partition).flatMap {
file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable =
relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
SparkShimLoader.getSparkShims.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
@@ -41,7 +41,8 @@ case class ColumnarShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS,
projectOutputAttributes: Seq[Attribute])
projectOutputAttributes: Seq[Attribute],
advisoryPartitionSize: Option[Long] = None)
extends ShuffleExchangeLike
with GlutenPlan {
private[sql] lazy val writeMetrics =
@@ -17,12 +17,12 @@
package org.apache.spark.sql.hive

import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.sql.shims.SparkShimLoader

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
@@ -134,10 +134,11 @@ class HivePartitionConverter(hadoopConf: Configuration, session: SparkSession)
val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
val splitFiles = selectedPartitions.flatMap {
partition =>
partition.files
SparkShimLoader.getSparkShims
.getFileStatus(partition)
.flatMap {
f =>
PartitionedFileUtil.splitFiles(
SparkShimLoader.getSparkShims.splitFiles(
session,
f,
f.getPath,
@@ -22,6 +22,7 @@ package org.apache.spark.sql
*/
import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort, Subquery, SubqueryAlias}
@@ -110,7 +111,7 @@ abstract class GlutenQueryTest extends PlanTest {
val analyzedDS =
try ds
catch {
case ae: AnalysisException =>
case ae: ExtendedAnalysisException =>
if (ae.plan.isDefined) {
fail(s"""
|Failed to analyze query: $ae
@@ -151,7 +152,7 @@ abstract class GlutenQueryTest extends PlanTest {
val analyzedDF =
try df
catch {
case ae: AnalysisException =>
case ae: ExtendedAnalysisException =>
if (ae.plan.isDefined) {
fail(s"""
|Failed to analyze query: $ae
@@ -20,14 +20,14 @@ import io.glutenproject.columnarbatch.ColumnarBatches
import io.glutenproject.exec.Runtimes
import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
import io.glutenproject.memory.nmm.NativeMemoryManagers
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.{ArrowAbiUtil, Iterators}
import io.glutenproject.vectorized.{ColumnarBatchSerializerJniWrapper, NativeColumnarToRowJniWrapper}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkArrowUtil
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.TaskResources
@@ -45,7 +45,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
val allocator = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(allocator)
val arrowSchema = SparkArrowUtil.toArrowSchema(
StructType.fromAttributes(output),
SparkShimLoader.getSparkShims.structFromAttributes(output),
SQLConf.get.sessionLocalTimeZone)
ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
val handle = jniWrapper
@@ -96,7 +96,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
val allocator = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(allocator)
val arrowSchema = SparkArrowUtil.toArrowSchema(
StructType.fromAttributes(output),
SparkShimLoader.getSparkShims.structFromAttributes(output),
SQLConf.get.sessionLocalTimeZone)
ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
val handle = serializerJniWrapper
10 changes: 10 additions & 0 deletions pom.xml
@@ -138,6 +138,16 @@
<delta.binary.version>24</delta.binary.version>
</properties>
</profile>
<profile>
<id>spark-3.5</id>
<properties>
<sparkbundle.version>3.5</sparkbundle.version>
<sparkshim.artifactId>spark-sql-columnar-shims-spark35</sparkshim.artifactId>
<spark.version>3.5.1</spark.version>
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
</properties>
</profile>
<profile>
<id>hadoop-2.7.4</id>
<properties>
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReader, ShuffleReadMetric
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Table
@@ -38,6 +38,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}

import org.apache.hadoop.fs.{FileStatus, Path}

sealed abstract class ShimDescriptor

case class SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends ShimDescriptor {
@@ -126,4 +128,18 @@ trait SparkShims {

// Because above, this feature is only supported after spark 3.3
def supportDuplicateReadingTracking: Boolean

def getFileStatus(partition: PartitionDirectory): Seq[FileStatus]

def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile]

def structFromAttributes(attrs: Seq[Attribute]): StructType

def attributesFromStruct(structType: StructType): Seq[Attribute]
}
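
The four methods added to SparkShims above wrap APIs whose shape changed in Spark 3.5. As a rough illustration only (the trait name below is hypothetical, and the real per-version shim classes in the repository may differ), a pre-3.5 shim could satisfy them with the direct calls this commit removes from the call sites: partition.files, PartitionedFileUtil.splitFiles, StructType.fromAttributes and StructType.toAttributes.

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch of a pre-3.5 shim; not the actual spark32/33/34 shim code.
trait PreSpark35ShimsSketch extends SparkShims {

  // Before 3.5, PartitionDirectory exposes plain FileStatus values directly.
  override def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] =
    partition.files

  // Before 3.5, PartitionedFileUtil.splitFiles still accepts the file path
  // explicitly, so the shim can simply forward its arguments.
  override def splitFiles(
      sparkSession: SparkSession,
      file: FileStatus,
      filePath: Path,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] =
    PartitionedFileUtil.splitFiles(
      sparkSession, file, filePath, isSplitable, maxSplitBytes, partitionValues)

  // Before 3.5, the struct/attribute conversions live on StructType itself.
  override def structFromAttributes(attrs: Seq[Attribute]): StructType =
    StructType.fromAttributes(attrs)

  override def attributesFromStruct(structType: StructType): Seq[Attribute] =
    structType.toAttributes
}

A Spark 3.5 shim would implement the same four methods against the relocated 3.5 APIs, keeping call sites such as InputPartitionsUtil and BroadcastUtils version-agnostic.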
6 changes: 6 additions & 0 deletions shims/pom.xml
@@ -88,5 +88,11 @@
<module>spark34</module>
</modules>
</profile>
<profile>
<id>spark-3.5</id>
<modules>
<module>spark35</module>
</modules>
</profile>
</profiles>
</project>
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

/**
* Spark 3.5 changed the parameter type of the generateTreeString API in TreeNode. To support
* multiple Spark versions, we cannot directly override the generateTreeString method in
* WholeStageTransformer. Instead, the GenerateTreeStringShim trait is defined in the shim layer so
* that each Spark version can provide its own generateTreeString override.
*/

trait GenerateTreeStringShim extends UnaryExecNode {

def stageId: Int

def substraitPlanJsonValue: String

def wholeStageTransformerContextDefined: Boolean

override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit = {
val prefix = if (printNodeId) "^ " else s"^($stageId) "
child.generateTreeString(
depth,
lastChildren,
append,
verbose,
prefix,
addSuffix = false,
maxFields,
printNodeId = printNodeId,
indent)

if (verbose && wholeStageTransformerContextDefined) {
append(prefix + "Substrait plan:\n")
append(substraitPlanJsonValue)
append("\n")
}
}
}
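
The file above uses the pre-3.5 Seq[Boolean] signature for lastChildren. For contrast, a hedged sketch of what the spark35-side override of the same trait might look like follows; the only intended difference is the lastChildren parameter, and its java.util.ArrayList[Boolean] type is an assumption about the Spark 3.5 signature change mentioned in the comment, not something taken from this diff.

package io.glutenproject.execution

import java.util.{ArrayList => JArrayList}

import org.apache.spark.sql.execution.UnaryExecNode

// Hypothetical spark35-module variant; it would compile only against Spark 3.5, and the
// JArrayList parameter type is assumed rather than copied from this commit.
trait GenerateTreeStringShim extends UnaryExecNode {

  def stageId: Int

  def substraitPlanJsonValue: String

  def wholeStageTransformerContextDefined: Boolean

  override def generateTreeString(
      depth: Int,
      lastChildren: JArrayList[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = {
    // Same rendering logic as the pre-3.5 variant: delegate to the child plan and,
    // when verbose, append the Substrait plan JSON exposed by WholeStageTransformer.
    val stagePrefix = if (printNodeId) "^ " else s"^($stageId) "
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      stagePrefix,
      addSuffix = false,
      maxFields,
      printNodeId = printNodeId,
      indent)
    if (verbose && wholeStageTransformerContextDefined) {
      append(stagePrefix + "Substrait plan:\n")
      append(substraitPlanJsonValue)
      append("\n")
    }
  }
}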