[GLUTEN-4424] Explore adding Spark 35 w/ Scala 2.12 only (WIP) #4425

Closed
wants to merge 23 commits

Commits (23)
0d38e60
Use sudo when git clean fails for perms.
holdenk Nov 26, 2023
2bb9a91
Try and build w/3.5 profile.
holdenk Dec 29, 2023
dcb1140
Add advisoryPartitionSize for 3.5
holdenk Dec 29, 2023
624eefb
Add spark-3.5 profile to shims pom.
holdenk Dec 29, 2023
5d7119a
Copy the 34 to 35 shims
holdenk Dec 29, 2023
00b6453
Use simplified partitioned file API
holdenk Nov 13, 2023
c79f4c6
Make some more progress towards 3.5 support.
holdenk Feb 21, 2024
21dcb3a
Work a bit on getting 3.4 working again.
holdenk Nov 17, 2023
cf0cf3f
Add shim for fromAttributes
holdenk Nov 17, 2023
d2f8797
Make the 3.5 substrait based on the previous.
holdenk Nov 11, 2023
b4a10a0
Remove unused imports
holdenk Jan 16, 2024
4da60d4
Remove more unused imports.
holdenk Jan 16, 2024
02c64fa
Add spark 3.5 profile to root build.
holdenk Jan 16, 2024
f38ccc8
Fix spark35 shim artifact name
holdenk Jan 16, 2024
3adb80a
To array breaks in 3.4
holdenk Jan 17, 2024
abec8d5
Try and support local running of tests better.
holdenk Jan 17, 2024
4eb46d4
Move the Spark35Shims to match the naming convention.
holdenk Jan 17, 2024
16514fb
Ignore when we can't make a direct memory buffer and don't test OnHea…
holdenk Jan 17, 2024
129bebc
A little bit of refactoring so the test args are present if needed (e…
holdenk Jan 17, 2024
0b1699a
Bring over getLimit and createTestTaskContext from the 34 to 35 shims.
holdenk Feb 21, 2024
7bc3564
Spotless update 3.4
holdenk Feb 21, 2024
13d0c05
Update 3.5 version to match.
holdenk Feb 21, 2024
19225d8
Port over more 3.4 changes to 3.5. Note: HashPartitioningLike did not…
holdenk Feb 21, 2024
@@ -23,6 +23,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static org.junit.Assume.*;

// FIXME our checkstyle config doesn't allow "Suite" as suffix of Java tests
public class OnHeapFileSystemTest {
private final JniFilesystem fs = OnHeapFileSystem.INSTANCE;
@@ -33,6 +35,13 @@ public void testRoundTrip() {
final String text = "HELLO WORLD";
final long fileSize;
JniFilesystem.WriteFile writeFile = fs.openFileForWrite(path);

try {
ByteBuffer buf = PlatformDependent.allocateDirectNoCleaner(1);
PlatformDependent.freeDirectNoCleaner(buf);
} catch (java.lang.AssertionError e) {
assumeTrue("We are in a JVM which does not support allocateDirectNoCleaner.", false);
}
try {
byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
ByteBuffer buf = PlatformDependent.allocateDirectNoCleaner(bytes.length);
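This hunk probes Netty's PlatformDependent.allocateDirectNoCleaner once and, if the probe throws, converts the failure into a skipped test via org.junit.Assume instead of a red build. Below is a minimal sketch of the same guard pattern, written in Scala to match the rest of the changes on this page; the class, method, and probe used here are illustrative only and are not part of the PR.

import org.junit.Assume.assumeNoException
import org.junit.Test

class DirectBufferProbeTest {
  @Test def roundTrip(): Unit = {
    // Probe the capability once; if the JVM refuses, report an "assumption
    // failure" so the test is skipped rather than failed.
    try {
      java.nio.ByteBuffer.allocateDirect(1)
    } catch {
      case e: Throwable => assumeNoException("direct buffers unavailable on this JVM", e)
    }
    // ...the real round-trip assertions would follow here...
  }
}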
1 change: 1 addition & 0 deletions dev/buildbundle-veloxbe.sh
@@ -7,3 +7,4 @@ cd $GLUTEN_DIR
mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.4 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.5 -DskipTests
6 changes: 4 additions & 2 deletions ep/build-velox/src/build_velox.sh
@@ -207,8 +207,10 @@ function check_commit {
fi
fi
else
# Branch-new build requires all untracked files to be deleted. We only need the source code.
git clean -dffx :/
# On Linux some of the scripts use sudo during the build, so the clean can fail
# on root-owned files. Yes, the right solution is having the build scripts not
# use sudo, but that's a huge change and depends on sub-modules.
git clean -dffx :/ || sudo git clean -dffx :/
fi

if [ -f ${VELOX_HOME}/velox-build.cache ]; then
@@ -155,35 +155,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def otherCopyArgs: Seq[AnyRef] = Seq(transformStageId.asInstanceOf[Integer])

override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit = {
val prefix = if (printNodeId) "^ " else s"^($transformStageId) "
child.generateTreeString(
depth,
lastChildren,
append,
verbose,
prefix,
addSuffix = false,
maxFields,
printNodeId = printNodeId,
indent)
if (verbose && wholeStageTransformerContext.isDefined) {
append(prefix + "Substrait plan:\n")
append(substraitPlanJson)
append("\n")
}
}

// It's misleading with "Codegen" used. But we have to keep "WholeStageCodegen" prefixed to
// make whole stage transformer clearly plotted in UI, like spark's whole stage codegen.
// See buildSparkPlanGraphNode in SparkPlanGraph.scala of Spark.
@@ -192,7 +192,7 @@ object ConverterUtils extends Logging {
typ =>
val (field, nullable) = parseFromSubstraitType(typ)
StructField("", field, nullable)
}
}.asJava
(StructType(fields), isNullable(substraitType.getStruct.getNullability))
case Type.KindCase.LIST =>
val list = substraitType.getList
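The only change in this hunk is appending .asJava to the mapped fields before they reach StructType. A plausible reading (my assumption, not stated in the PR) is that this makes the call resolve against StructType's java.util.List overload the same way on every targeted Spark build. A tiny self-contained illustration of that converter:

import scala.collection.JavaConverters._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object AsJavaSketch {
  // asJava wraps the Scala Seq as a java.util.List without copying, which
  // selects StructType's java.util.List apply overload.
  val fields = Seq(StructField("", StringType, nullable = true)).asJava
  val struct: StructType = StructType(fields)
}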
@@ -21,7 +21,6 @@ import io.glutenproject.sql.shims.SparkShimLoader
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.util.collection.BitSet

@@ -60,7 +59,7 @@ case class InputPartitionsUtil(
val filePath = file.getPath
val isSplitable =
relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
SparkShimLoader.getSparkShims.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
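This hunk and the HivePartitionConverter hunk further down both replace a direct call to PartitionedFileUtil.splitFiles with SparkShimLoader.getSparkShims.splitFiles, presumably because the utility's shape is not identical across the Spark versions this PR targets. A rough sketch of what such a shim indirection can look like; the trait name and parameter list below are assumptions for illustration, not the actual Gluten SparkShims API.

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// A common interface compiled against every supported Spark version; each
// spark-3.x shim module would implement it by delegating to that version's
// own PartitionedFileUtil, so callers never touch the moving API directly.
trait SparkShimsSketch {
  def splitFiles(
      sparkSession: SparkSession,
      file: FileStatus,
      filePath: Path,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile]
}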
@@ -66,6 +66,9 @@ case class ColumnarShuffleExchangeExec(
}
}

// Added in 3.5
def advisoryPartitionSize: Option[Long] = None

/**
* A [[ShuffleDependency]] that will partition rows of its child based on the partitioning scheme
* defined in `newPartitioning`. Those partitions of the returned ShuffleDependency will be the
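The "// Added in 3.5" member mirrors a method that newer Spark versions expect on shuffle-exchange-like nodes. Declaring it as a plain def (without override) is what lets one source file build against both 3.4 and 3.5: Scala does not require the override modifier when implementing an abstract member, so the same line is an extra method on 3.4 and an implementation on 3.5. A toy demonstration of that compilation behavior, with made-up trait names standing in for the real parents:

// Stand-ins for the 3.4 and 3.5 parent traits -- not Spark's real classes.
trait ShuffleLike34 { /* no advisoryPartitionSize member */ }
trait ShuffleLike35 { def advisoryPartitionSize: Option[Long] }

// The exact same member body compiles in both worlds:
class Exec34 extends ShuffleLike34 { def advisoryPartitionSize: Option[Long] = None }
class Exec35 extends ShuffleLike35 { def advisoryPartitionSize: Option[Long] = None }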
@@ -205,8 +205,8 @@ object GlutenImplicits {
FallbackSummary(
totalNumGlutenNodes,
totalNumFallbackNodes,
totalPhysicalPlanDescription,
totalFallbackNodeToReason
totalPhysicalPlanDescription.toSeq,
totalFallbackNodeToReason.toSeq
)
}

@@ -17,12 +17,12 @@
package org.apache.spark.sql.hive

import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.sql.shims.SparkShimLoader

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
@@ -137,13 +137,13 @@ class HivePartitionConverter(hadoopConf: Configuration, session: SparkSession)
partition.files
.flatMap {
f =>
PartitionedFileUtil.splitFiles(
SparkShimLoader.getSparkShims.splitFiles(
session,
f,
f.getPath,
isSplitable = canBeSplit(f.getPath),
maxSplitBytes,
partition.values
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)
}
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
@@ -107,21 +107,7 @@ abstract class GlutenQueryTest extends PlanTest {
}

private def getResult[T](ds: => Dataset[T]): Array[T] = {
val analyzedDS =
try ds
catch {
case ae: AnalysisException =>
if (ae.plan.isDefined) {
fail(s"""
|Failed to analyze query: $ae
|${ae.plan.get}
|
|${stackTraceToString(ae)}
""".stripMargin)
} else {
throw ae
}
}
val analyzedDS = ds
assertEmptyMissingInput(analyzedDS)

try ds.collect()
@@ -148,21 +134,7 @@
* the expected result in a [[Seq]] of [[Row]]s.
*/
protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
val analyzedDF =
try df
catch {
case ae: AnalysisException =>
if (ae.plan.isDefined) {
fail(s"""
|Failed to analyze query: $ae
|${ae.plan.get}
|
|${stackTraceToString(ae)}
|""".stripMargin)
} else {
throw ae
}
}
val analyzedDF = df

assertEmptyMissingInput(analyzedDF)

@@ -20,14 +20,14 @@ import io.glutenproject.columnarbatch.ColumnarBatches
import io.glutenproject.exec.Runtimes
import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
import io.glutenproject.memory.nmm.NativeMemoryManagers
import io.glutenproject.sql.shims._
import io.glutenproject.utils.{ArrowAbiUtil, Iterators}
import io.glutenproject.vectorized.{ColumnarBatchSerializerJniWrapper, NativeColumnarToRowJniWrapper}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkArrowUtil
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.TaskResources
@@ -45,7 +45,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
val allocator = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(allocator)
val arrowSchema = SparkArrowUtil.toArrowSchema(
StructType.fromAttributes(output),
SparkShimLoader.getSparkShims.structFromAttributes(output),
SQLConf.get.sessionLocalTimeZone)
ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
val handle = jniWrapper
@@ -96,7 +96,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
val allocator = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(allocator)
val arrowSchema = SparkArrowUtil.toArrowSchema(
StructType.fromAttributes(output),
SparkShimLoader.getSparkShims.structFromAttributes(output),
SQLConf.get.sessionLocalTimeZone)
ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
val handle = serializerJniWrapper
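Both hunks in this file swap StructType.fromAttributes(output) for SparkShimLoader.getSparkShims.structFromAttributes(output), which suggests the helper is not reachable in the same place on every targeted Spark version. If a shim needs a version-independent body, one safe way to write it is to rebuild the struct from the attributes directly. A minimal sketch under that assumption; the method name mirrors the shim call above, but the body is mine, not the PR's:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.{StructField, StructType}

object StructShimSketch {
  // Equivalent to what StructType.fromAttributes produces, built only from
  // stable public pieces so the same body compiles across the Spark versions
  // in the build matrix.
  def structFromAttributes(attrs: Seq[Attribute]): StructType =
    StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
}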