[VL] Add Scala 2.13 support (apache#6326)
* [VL] Add Scala 2.13 support

* Fix scalaStyle issues

* Fix Scala Style issues

* Add Spark 3.5.1 and Scala 2.13 test in workflow

* Add run-spark-test-spark35-scala213 job

* Add Spark 3.5.1 and Scala 2.13 test in workflow

* Fix test failures

* Fix test failures

* ScalaStyle fix

* Fix SoftAffinitySuite

* Fix ArrowUtil error

* Fix backend-velox scala issues

* Fix ColumnarArrowEvalPythonExec issues

* Fix ColumnarArrowEvalPythonExec issues

* Fix TestOperator.scala for style issues

* Fix TestOperator.scala for style issues

* Fix issues in DeltaRewriteTransformerRules.scala

* DeltaRewriteTransformerRules fix

* Fix style issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

---------

Co-authored-by: Hongze Zhang <[email protected]>
2 people authored and weiting-chen committed Aug 10, 2024
1 parent 10ac7e7 commit 2671425
Showing 26 changed files with 205 additions and 113 deletions.
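Most of the Scala-side changes in this commit follow one pattern: on Scala 2.13 the default `Seq` alias points to `scala.collection.immutable.Seq`, so values built as mutable buffers (or obtained from Java collections) must be converted with `.toSeq` before being passed to APIs that expect a plain `Seq`. A minimal sketch of that pattern, with illustrative names not taken from this patch:

```scala
import scala.collection.mutable.ArrayBuffer

object SeqCompatExample {
  // An API taking the default Seq, which is immutable.Seq on Scala 2.13.
  def makeStruct(fields: Seq[(String, Boolean)]): String =
    fields.map { case (name, nullable) => s"$name:$nullable" }.mkString(", ")

  def main(args: Array[String]): Unit = {
    val fields = ArrayBuffer.empty[(String, Boolean)]
    fields += ("a" -> true)
    fields += ("b" -> false)

    // makeStruct(fields)             // compiles on 2.12, fails on 2.13
    println(makeStruct(fields.toSeq)) // compiles on both
  }
}
```

The same idea accounts for most of the `.toSeq` additions in the hunks below.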
68 changes: 68 additions & 0 deletions .github/workflows/velox_docker.yml
@@ -1097,6 +1097,74 @@ jobs:
name: golden-files-spark35
path: /tmp/tpch-approved-plan/**

run-spark-test-spark35-scala213:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
container: ghcr.io/facebookincubator/velox-dev:centos8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-8-${{github.sha}}
path: ./cpp/build/releases
- name: Download UDF Example Lib
uses: actions/download-artifact@v2
with:
name: udf-example-lib-centos-8-${{github.sha}}
path: ./cpp/build/velox/udf/examples/
- name: Download Arrow Jars
uses: actions/download-artifact@v2
with:
name: arrow-jars-centos-8-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
- name: Setup build dependency
run: |
yum install sudo patch java-1.8.0-openjdk-devel wget -y
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
- name: Get Ccache
uses: actions/cache/restore@v3
with:
path: '${{ env.CCACHE_DIR }}'
key: ccache-centos-release-default
- name: Ensure Cache Dirs Exists
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
- name: Prepare spark.test.home for Spark 3.5.1 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
rm -rf spark-3.5.1-bin-hadoop3.tgz && \
mkdir -p $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
mv jars $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
cd $GITHUB_WORKSPACE// && \
wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/ && \
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
pip3 install pyspark==3.5.1 cython && \
pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
$MVN_CMD clean install -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
$MVN_CMD test -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
run-spark-test-spark35-slow:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,7 +1,7 @@
runner.dialect = scala212

# Version is required to make sure IntelliJ picks the right version
version = 3.5.9
version = 3.8.3
preset = default

# Max column
@@ -370,8 +370,9 @@ case class CHHashAggregateExecTransformer(
// Use approxPercentile.nullable as the nullable of the struct type
// to make sure it returns null when input is empty
fields = fields :+ (approxPercentile.child.dataType, approxPercentile.nullable)
fields = fields :+ (approxPercentile.percentageExpression.dataType,
approxPercentile.percentageExpression.nullable)
fields = fields :+ (
approxPercentile.percentageExpression.dataType,
approxPercentile.percentageExpression.nullable)
(makeStructType(fields), attr.nullable)
case _ =>
(makeStructTypeSingleOne(attr.dataType, attr.nullable), attr.nullable)
@@ -391,13 +391,13 @@ class GlutenParquetFilterSuite
'p_size.int >= 1,
'p_partkey.long.isNotNull,
('p_brand.string === "Brand#12" &&
('p_container.string in ("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
('p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
'p_size.int <= 5) ||
('p_brand.string === "Brand#23" &&
('p_container.string in ("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
('p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
'p_size.int <= 10) ||
('p_brand.string === "Brand#34" &&
('p_container.string in ("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
('p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
'p_size.int <= 15)
)
),
@@ -97,7 +97,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
_numRows += batch.numRows
}
Iterator((_numRows, blockNativeWriter.collectAsByteArray()))
// Iterator((_numRows, new Array[Byte](0)))
// Iterator((_numRows, new Array[Byte](0)))
}
.collect
val count0 = countsAndBytes.map(_._1).sum
@@ -228,7 +228,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
}
}

newProjections += Alias(CreateArray(fieldArray), generatePreAliasName)()
newProjections += Alias(CreateArray(fieldArray.toSeq), generatePreAliasName)()
}

// Plug in a Project between Generate and its child.
@@ -396,7 +396,8 @@ abstract class HashAggregateExecTransformer(
childNodes.add(expressionNode)
}
}
exprNodes.add(getRowConstructNode(args, childNodes, newInputAttributes, aggFunc))
exprNodes.add(
getRowConstructNode(args, childNodes, newInputAttributes.toSeq, aggFunc))
case other =>
throw new GlutenNotSupportException(s"$other is not supported.")
}
@@ -44,7 +44,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.{mutable, Seq}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class ColumnarArrowPythonRunner(
@@ -54,7 +54,7 @@ class ColumnarArrowPythonRunner(
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
extends BasePythonRunnerShim(funcs, evalType, argOffsets) {
extends BasePythonRunnerShim(funcs.toSeq, evalType, argOffsets) {

override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback

@@ -239,7 +239,7 @@ case class ColumnarArrowEvalPythonExec(
val arrowSafeTypeCheck = Seq(
SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key ->
conf.arrowSafeTypeConversion.toString)
Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*)
Map(timeZoneConf.toSeq ++ pandasColsByName.toSeq ++ arrowSafeTypeCheck: _*)
}

private val pythonRunnerConf = getPythonRunnerConfMap(conf)
@@ -280,7 +280,7 @@ case class ColumnarArrowEvalPythonExec(
case children =>
// There should not be any other UDFs, or the children can't be evaluated directly.
assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
(ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}

@@ -410,7 +410,7 @@ object PullOutArrowEvalPythonPreProjectHelper extends PullOutProjectHelper {
val (chained, children) = collectFunctions(u)
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
(ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}

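The hunks above address two related 2.13 issues in `ColumnarArrowEvalPythonExec`: the old `import scala.collection.{mutable, Seq}` shadowed the default `Seq` with the general `scala.collection.Seq`, and varargs expansion with `: _*` (as in `Map(... : _*)`) expects an immutable `Seq` on 2.13, hence the added `.toSeq` calls. A small sketch of the varargs case, assuming illustrative names:

```scala
object VarargsCompatExample {
  // A varargs API, similar in shape to Map.apply (illustrative).
  def makeConf(entries: (String, String)*): Map[String, String] = entries.toMap

  def main(args: Array[String]): Unit = {
    // A value typed as the general collection.Seq, e.g. built from a mutable buffer.
    val defaults: scala.collection.Seq[(String, String)] =
      scala.collection.mutable.ArrayBuffer("spark.sql.session.timeZone" -> "UTC")

    // On 2.13 the `xs: _*` expansion expects an immutable Seq, so .toSeq is
    // required; on 2.12 the collection.Seq was accepted directly.
    // makeConf(defaults: _*)              // 2.12 only
    println(makeConf(defaults.toSeq: _*))  // both 2.12 and 2.13
  }
}
```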
@@ -116,12 +116,12 @@ case class UDFExpression(
object UDFResolver extends Logging {
private val UDFNames = mutable.HashSet[String]()
// (udf_name, arg1, arg2, ...) => return type
private val UDFMap = mutable.HashMap[String, mutable.MutableList[UDFSignature]]()
private val UDFMap = mutable.HashMap[String, mutable.ListBuffer[UDFSignature]]()

private val UDAFNames = mutable.HashSet[String]()
// (udaf_name, arg1, arg2, ...) => return type, intermediate attributes
private val UDAFMap =
mutable.HashMap[String, mutable.MutableList[UDAFSignature]]()
mutable.HashMap[String, mutable.ListBuffer[UDAFSignature]]()

private val LIB_EXTENSION = ".so"

@@ -145,7 +145,7 @@ object UDFResolver extends Logging {
variableArity: Boolean): Unit = {
assert(argTypes.dataType.isInstanceOf[StructType])
val v =
UDFMap.getOrElseUpdate(name, mutable.MutableList[UDFSignature]())
UDFMap.getOrElseUpdate(name, mutable.ListBuffer[UDFSignature]())
v += UDFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
@@ -189,7 +189,7 @@ object UDFResolver extends Logging {
}

val v =
UDAFMap.getOrElseUpdate(name, mutable.MutableList[UDAFSignature]())
UDAFMap.getOrElseUpdate(name, mutable.ListBuffer[UDAFSignature]())
v += UDAFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
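`scala.collection.mutable.MutableList` no longer exists in Scala 2.13, so the UDF/UDAF registries above switch to `mutable.ListBuffer`, which supports the same getOrElseUpdate-and-append usage on both versions. A minimal sketch of that pattern with hypothetical names:

```scala
import scala.collection.mutable

object UdfRegistryExample {
  // mutable.MutableList is gone on 2.13; ListBuffer offers the same
  // append-based registry pattern and works on both 2.12 and 2.13.
  private val signatures = mutable.HashMap[String, mutable.ListBuffer[String]]()

  def register(name: String, signature: String): Unit = {
    val entries = signatures.getOrElseUpdate(name, mutable.ListBuffer[String]())
    entries += signature
  }

  def main(args: Array[String]): Unit = {
    register("my_udf", "(int, int) -> int")
    register("my_udf", "(string) -> string")
    println(signatures("my_udf").mkString("; "))
  }
}
```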
@@ -1458,9 +1458,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
path =>
(0 to 3).toDF("x").write.parquet(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
runQueryAndCompare(
"SELECT x FROM view WHERE cast(x as timestamp) " +
"IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')")(_)
runQueryAndCompare(s"""
|SELECT x FROM view
|WHERE cast(x as timestamp)
|IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')
|""".stripMargin)(_ => ())
}
}

@@ -39,6 +39,8 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2")

val scalaVersion = scala.util.Properties.versionNumberString

def generateNativePartition1(): Unit = {
val partition = FilePartition(
0,
@@ -97,7 +99,13 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate

val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)

assertResult(Set("host-1", "host-4", "host-5")) {
val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
Set("host-1", "host-4", "host-5")
} else if (scalaVersion.startsWith("2.13")) {
Set("host-5", "host-4", "host-2")
}

assertResult(affinityResultSet) {
nativePartition.preferredLocations().toSet
}
}
@@ -184,7 +192,13 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate

val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)

assertResult(Set("host-1", "host-5", "host-6")) {
val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
Set("host-1", "host-5", "host-6")
} else if (scalaVersion.startsWith("2.13")) {
Set("host-6", "host-5", "host-2")
}

assertResult(affinityResultSet) {
nativePartition.preferredLocations().toSet
}
}
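The expected preferred-location sets in `SoftAffinitySuite` now depend on the Scala version, presumably because hash- and ordering-sensitive behavior of the underlying collections differs between 2.12 and 2.13, so the soft-affinity selection lands on a different (but equally valid) subset of hosts. A sketch of the version-branching idiom, with illustrative host names:

```scala
object ScalaVersionBranchExample {
  // versionNumberString returns e.g. "2.12.18" or "2.13.14".
  val scalaVersion: String = scala.util.Properties.versionNumberString

  // Hash-based selection can legitimately pick different hosts on 2.12 vs 2.13,
  // so the expectation is branched per Scala version (host names are illustrative).
  val expectedHosts: Set[String] =
    if (scalaVersion.startsWith("2.12")) Set("host-1", "host-4", "host-5")
    else Set("host-5", "host-4", "host-2")

  def main(args: Array[String]): Unit = println(expectedHosts)
}
```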
@@ -119,7 +119,7 @@ object ArrowAbiUtil {
}
}

def exportField(allocator: BufferAllocator, field: Field, out: ArrowSchema) {
def exportField(allocator: BufferAllocator, field: Field, out: ArrowSchema): Unit = {
val dictProvider = new CDataDictionaryProvider
try {
Data.exportField(allocator, field, dictProvider, out)
@@ -128,7 +128,7 @@ object ArrowAbiUtil {
}
}

def exportSchema(allocator: BufferAllocator, schema: Schema, out: ArrowSchema) {
def exportSchema(allocator: BufferAllocator, schema: Schema, out: ArrowSchema): Unit = {
val dictProvider = new CDataDictionaryProvider
try {
Data.exportSchema(allocator, schema, dictProvider, out)
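The `ArrowAbiUtil` change replaces procedure syntax (`def f(...) { ... }`), which is deprecated in Scala 2.13 and flagged under strict compiler settings, with an explicit `: Unit =` result type. For example:

```scala
object ProcedureSyntaxExample {
  // Deprecated procedure syntax; warns on 2.13 (and errors with fatal warnings):
  // def report(msg: String) { println(msg) }

  // Portable form with an explicit result type:
  def report(msg: String): Unit = {
    println(msg)
  }

  def main(args: Array[String]): Unit = report("hello")
}
```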
@@ -134,7 +134,7 @@ object SparkArrowUtil {
val dt = fromArrowField(child)
StructField(child.getName, dt, child.isNullable)
}
StructType(fields)
StructType(fields.toSeq)
case arrowType => fromArrowType(arrowType)
}
}
@@ -147,7 +147,7 @@ object SparkArrowUtil {
}

def fromArrowSchema(schema: Schema): StructType = {
StructType(schema.getFields.asScala.map {
StructType(schema.getFields.asScala.toSeq.map {
field =>
val dt = fromArrowField(field)
StructField(field.getName, dt, field.isNullable)
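In `SparkArrowUtil`, `schema.getFields.asScala` yields a mutable `Buffer` view over the Java list; on 2.13 that is not an immutable `Seq`, so a `.toSeq` is needed before constructing the `StructType`. A self-contained sketch of the same conversion (the `describe` API is illustrative):

```scala
import scala.jdk.CollectionConverters._

object AsScalaToSeqExample {
  // A Seq-taking API, similar in shape to StructType.apply (illustrative).
  def describe(names: Seq[String]): String = names.mkString("[", ", ", "]")

  def main(args: Array[String]): Unit = {
    val javaList = java.util.Arrays.asList("id", "name", "price")

    // asScala produces a mutable Buffer view over the Java list; on 2.13 that
    // is not an immutable Seq, so convert before calling the Seq-taking API.
    // (On 2.12 the equivalent import is scala.collection.JavaConverters._.)
    println(describe(javaList.asScala.toSeq.map(_.toUpperCase)))
  }
}
```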
@@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, N
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FileFormat

import scala.collection._
import scala.collection.mutable.ListBuffer

class DeltaRewriteTransformerRules extends RewriteTransformerRules {
override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil
@@ -87,8 +87,8 @@ object DeltaRewriteTransformerRules {
)(SparkSession.active)
// transform output's name into physical name so Reader can read data correctly
// should keep the columns order the same as the origin output
val originColumnNames = mutable.ListBuffer.empty[String]
val transformedAttrs = mutable.ListBuffer.empty[Attribute]
val originColumnNames = ListBuffer.empty[String]
val transformedAttrs = ListBuffer.empty[Attribute]
def mapAttribute(attr: Attribute) = {
val newAttr = if (!plan.isMetadataColumn(attr)) {
DeltaColumnMapping
@@ -142,7 +142,7 @@ object DeltaRewriteTransformerRules {
val expr = (transformedAttrs, originColumnNames).zipped.map {
(attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId)
}
val projectExecTransformer = ProjectExecTransformer(expr, scanExecTransformer)
val projectExecTransformer = ProjectExecTransformer(expr.toSeq, scanExecTransformer)
projectExecTransformer
case _ => plan
}
@@ -262,18 +262,18 @@ class PathFinderSuite extends AnyFunSuite {

assert(path.plan() == Binary(n1, Group(1), Group(2)))
assert(
path.dive(state, 1).map(_.plan()) == List(
path.dive(state, 1).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Group(3)), Unary(n3, Group(4)))))
assert(
path.dive(state, 2).map(_.plan()) == List(
path.dive(state, 2).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))),
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1)))))
assert(
path.dive(state, 3).map(_.plan()) == List(
path.dive(state, 3).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))),
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1)))))
assert(
path.dive(state, RasPath.INF_DEPTH).map(_.plan()) == List(
path.dive(state, RasPath.INF_DEPTH).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))),
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1)))))
}
@@ -338,13 +338,13 @@ class PathFinderSuite extends AnyFunSuite {
path.dive(state, 1).map(_.plan()).toSeq == List(
Binary(n1, Binary(n2, Group(3), Group(4)), Leaf(n3, 1))))
assert(
path.dive(state, 2).map(_.plan()) == List(
path.dive(state, 2).map(_.plan()).toList == List(
Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1))))
assert(
path.dive(state, 3).map(_.plan()) == List(
path.dive(state, 3).map(_.plan()).toList == List(
Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1))))
assert(
path.dive(state, RasPath.INF_DEPTH).map(_.plan()) == List(
path.dive(state, RasPath.INF_DEPTH).map(_.plan()).toList == List(
Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1))))
}
}
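The `PathFinderSuite` assertions now call `.toList` before comparing against a `List`, so the equality no longer depends on the concrete collection type returned by `dive`, which can differ between the 2.12 and 2.13 collection libraries (for example, a lazy view is not a `Seq` on 2.13). A small illustration:

```scala
object StrictComparisonExample {
  def main(args: Array[String]): Unit = {
    // Stand-in for an API that returns a general Iterable (here a lazy view);
    // the concrete type of such results shifted between 2.12 and 2.13.
    val results: Iterable[Int] = List(1, 2, 3).view

    val mapped = results.map(_ * 10)
    // On 2.13 a view is not a Seq, so comparing `mapped` to a List directly
    // would fail; .toList pins the comparison down on both Scala versions.
    assert(mapped.toList == List(10, 20, 30))
    println("ok")
  }
}
```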
@@ -65,11 +65,12 @@ abstract class CyclicSearchSpaceSuite extends AnyFunSuite {
PathFinder.builder(ras, mockState).depth(depth).build().find(can)
}

assert(find(node1, 1).map(p => p.plan()) == List(Unary("node1", Group(0))))
assert(find(node1, 2).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1))))
assert(find(node1, 3).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1))))
assert(find(node1, 1).map(p => p.plan()).toList == List(Unary("node1", Group(0))))
assert(find(node1, 2).map(p => p.plan()).toList == List(Unary("node1", Leaf("node2", 1))))
assert(find(node1, 3).map(p => p.plan()).toList == List(Unary("node1", Leaf("node2", 1))))
assert(
find(node1, RasPath.INF_DEPTH).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1))))
find(node1, RasPath.INF_DEPTH).map(p => p.plan()).toList == List(
Unary("node1", Leaf("node2", 1))))
}

test("Cyclic - find best, simple self cycle") {
