[VL] Port #6746, #6627, #6318, #6397, #6326, #6363 to branch-1.2 #6773

Merged

Changes from all commits

70 changes: 69 additions & 1 deletion .github/workflows/velox_docker.yml
@@ -52,7 +52,7 @@ concurrency:
jobs:
build-native-lib-centos-7:
runs-on: ubuntu-20.04
container: apache/gluten:gluten-vcpkg-builder_2024_05_29 # centos7 with dependencies installed
container: apache/gluten:gluten-vcpkg-builder_2024_08_05 # centos7 with dependencies installed
steps:
- uses: actions/checkout@v2
- name: Generate cache key
@@ -1097,6 +1097,74 @@ jobs:
name: golden-files-spark35
path: /tmp/tpch-approved-plan/**

run-spark-test-spark35-scala213:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
container: ghcr.io/facebookincubator/velox-dev:centos8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-8-${{github.sha}}
path: ./cpp/build/releases
- name: Download UDF Example Lib
uses: actions/download-artifact@v2
with:
name: udf-example-lib-centos-8-${{github.sha}}
path: ./cpp/build/velox/udf/examples/
- name: Download Arrow Jars
uses: actions/download-artifact@v2
with:
name: arrow-jars-centos-8-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
- name: Setup build dependency
run: |
yum install sudo patch java-1.8.0-openjdk-devel wget -y
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
- name: Get Ccache
uses: actions/cache/restore@v3
with:
path: '${{ env.CCACHE_DIR }}'
key: ccache-centos-release-default
- name: Ensure Cache Dirs Exists
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
- name: Prepare spark.test.home for Spark 3.5.1 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
rm -rf spark-3.5.1-bin-hadoop3.tgz && \
mkdir -p $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
mv jars $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
cd $GITHUB_WORKSPACE// && \
wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/ && \
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
pip3 install pyspark==3.5.1 cython && \
pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
$MVN_CMD clean install -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
$MVN_CMD test -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest

run-spark-test-spark35-slow:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,7 +1,7 @@
runner.dialect = scala212

# Version is required to make sure IntelliJ picks the right version
version = 3.5.9
version = 3.8.3
preset = default

# Max column
@@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -370,8 +370,9 @@ case class CHHashAggregateExecTransformer(
// Use approxPercentile.nullable as the nullable of the struct type
// to make sure it returns null when input is empty
fields = fields :+ (approxPercentile.child.dataType, approxPercentile.nullable)
fields = fields :+ (approxPercentile.percentageExpression.dataType,
approxPercentile.percentageExpression.nullable)
fields = fields :+ (
approxPercentile.percentageExpression.dataType,
approxPercentile.percentageExpression.nullable)
(makeStructType(fields), attr.nullable)
case _ =>
(makeStructTypeSingleOne(attr.dataType, attr.nullable), attr.nullable)
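
The wrapped append above makes the layout of the partial approx_percentile result easier to read: the intermediate struct carries both the value column's type/nullability and the percentage expression's type/nullability. A rough illustration in Spark's type API (the concrete field names and element types here are assumptions, not taken from CHHashAggregateExecTransformer):

```scala
import org.apache.spark.sql.types._

object ApproxPercentileIntermediateType {
  // (dataType, nullable) pairs, mirroring how `fields` is built in the transformer.
  val fields: Seq[(DataType, Boolean)] = Seq(
    (DoubleType, true),            // approxPercentile.child.dataType and its nullability
    (ArrayType(DoubleType), false) // percentageExpression.dataType and its nullability
  )

  // A makeStructType-style helper: turn the pairs into positional struct fields.
  def makeStructType(fields: Seq[(DataType, Boolean)]): StructType =
    StructType(fields.zipWithIndex.map {
      case ((dt, nullable), i) => StructField(s"col$i", dt, nullable)
    })

  val intermediate: StructType = makeStructType(fields)
}
```
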
@@ -391,13 +391,13 @@ class GlutenParquetFilterSuite
'p_size.int >= 1,
'p_partkey.long.isNotNull,
('p_brand.string === "Brand#12" &&
('p_container.string in ("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
('p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
'p_size.int <= 5) ||
('p_brand.string === "Brand#23" &&
('p_container.string in ("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
('p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
'p_size.int <= 10) ||
('p_brand.string === "Brand#34" &&
('p_container.string in ("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
('p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
'p_size.int <= 15)
)
),
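
The switch from infix `in (...)` to an explicit `.in(...)` call is presumably a Scala 2.13 cross-building concern: calling an alphanumeric method infix with several arguments trips the 2.13 `multiarg-infix` lint, which breaks builds that treat warnings as errors. A minimal sketch of the pattern with a hypothetical `ColumnPredicate` helper (not part of the Spark/Gluten test DSL):

```scala
// Hypothetical DSL class, for illustration only.
class ColumnPredicate(name: String) {
  def in(values: String*): String = s"$name IN (${values.mkString(", ")})"
}

object InfixExample {
  val p = new ColumnPredicate("p_container")

  // Infix with multiple arguments: flagged by -Xlint:multiarg-infix on Scala 2.13.
  // val infixForm = p in ("SM CASE", "SM BOX", "SM PACK", "SM PKG")

  // Explicit call: same meaning, no lint warning.
  val dottedForm: String = p.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")
}
```
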
@@ -97,7 +97,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
_numRows += batch.numRows
}
Iterator((_numRows, blockNativeWriter.collectAsByteArray()))
// Iterator((_numRows, new Array[Byte](0)))
// Iterator((_numRows, new Array[Byte](0)))
}
.collect
val count0 = countsAndBytes.map(_._1).sum
@@ -27,7 +27,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Count, Sum}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -430,7 +430,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
expr match {
// Block directly falling back the below functions by FallbackEmptySchemaRelation.
case alias: Alias => checkExpr(alias.child)
case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID => true
case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: EulerNumber => true
case _ => false
}
}
@@ -228,7 +228,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
}
}

newProjections += Alias(CreateArray(fieldArray), generatePreAliasName)()
newProjections += Alias(CreateArray(fieldArray.toSeq), generatePreAliasName)()
}

// Plug in a Project between Generate and its child.
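
The added `.toSeq` here, and in the hunks that follow, looks like the standard Scala 2.13 cross-compilation fix: on 2.13 the default `Seq` alias points at `scala.collection.immutable.Seq`, so mutable collections such as `ArrayBuffer` no longer satisfy a `Seq[T]` parameter without an explicit conversion. A minimal sketch of the pattern, using illustrative names rather than Gluten's actual APIs:

```scala
import scala.collection.mutable.ArrayBuffer

object ToSeqExample {
  // The parameter type is plain `Seq`, which is immutable.Seq on Scala 2.13.
  def makeArrayExpr(children: Seq[String]): String =
    children.mkString("array(", ", ", ")")

  val fieldArray = ArrayBuffer("a", "b", "c")

  // On 2.12, `makeArrayExpr(fieldArray)` compiles because Seq is collection.Seq;
  // on 2.13 it does not, hence the explicit conversion below.
  val expr: String = makeArrayExpr(fieldArray.toSeq)
}
```
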
@@ -396,7 +396,8 @@ abstract class HashAggregateExecTransformer(
childNodes.add(expressionNode)
}
}
exprNodes.add(getRowConstructNode(args, childNodes, newInputAttributes, aggFunc))
exprNodes.add(
getRowConstructNode(args, childNodes, newInputAttributes.toSeq, aggFunc))
case other =>
throw new GlutenNotSupportException(s"$other is not supported.")
}
@@ -44,7 +44,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.{mutable, Seq}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class ColumnarArrowPythonRunner(
@@ -54,7 +54,7 @@ class ColumnarArrowPythonRunner(
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
extends BasePythonRunnerShim(funcs, evalType, argOffsets) {
extends BasePythonRunnerShim(funcs.toSeq, evalType, argOffsets) {

override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback

@@ -239,7 +239,7 @@ case class ColumnarArrowEvalPythonExec(
val arrowSafeTypeCheck = Seq(
SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key ->
conf.arrowSafeTypeConversion.toString)
Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*)
Map(timeZoneConf.toSeq ++ pandasColsByName.toSeq ++ arrowSafeTypeCheck: _*)
}

private val pythonRunnerConf = getPythonRunnerConfMap(conf)
@@ -280,7 +280,7 @@ case class ColumnarArrowEvalPythonExec(
case children =>
// There should not be any other UDFs, or the children can't be evaluated directly.
assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
(ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}

@@ -410,7 +410,7 @@ object PullOutArrowEvalPythonPreProjectHelper extends PullOutProjectHelper {
val (chained, children) = collectFunctions(u)
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
(ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}

@@ -116,12 +116,12 @@ case class UDFExpression(
object UDFResolver extends Logging {
private val UDFNames = mutable.HashSet[String]()
// (udf_name, arg1, arg2, ...) => return type
private val UDFMap = mutable.HashMap[String, mutable.MutableList[UDFSignature]]()
private val UDFMap = mutable.HashMap[String, mutable.ListBuffer[UDFSignature]]()

private val UDAFNames = mutable.HashSet[String]()
// (udaf_name, arg1, arg2, ...) => return type, intermediate attributes
private val UDAFMap =
mutable.HashMap[String, mutable.MutableList[UDAFSignature]]()
mutable.HashMap[String, mutable.ListBuffer[UDAFSignature]]()

private val LIB_EXTENSION = ".so"

@@ -145,7 +145,7 @@ object UDFResolver extends Logging {
variableArity: Boolean): Unit = {
assert(argTypes.dataType.isInstanceOf[StructType])
val v =
UDFMap.getOrElseUpdate(name, mutable.MutableList[UDFSignature]())
UDFMap.getOrElseUpdate(name, mutable.ListBuffer[UDFSignature]())
v += UDFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
@@ -189,7 +189,7 @@ object UDFResolver extends Logging {
}

val v =
UDAFMap.getOrElseUpdate(name, mutable.MutableList[UDAFSignature]())
UDAFMap.getOrElseUpdate(name, mutable.ListBuffer[UDAFSignature]())
v += UDAFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
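
`mutable.MutableList` was dropped in the Scala 2.13 collections redesign, and `ListBuffer` is the usual drop-in replacement since it supports the same `+=` appends used here. A minimal sketch of the registration pattern with simplified signature types (plain strings instead of UDFSignature/UDAFSignature):

```scala
import scala.collection.mutable

object SignatureRegistry {
  // Simplified stand-in for UDFMap: udf name -> registered signatures.
  private val udfMap = mutable.HashMap[String, mutable.ListBuffer[String]]()

  def register(name: String, signature: String): Unit = {
    // getOrElseUpdate + += works with ListBuffer exactly as it did with MutableList.
    val signatures = udfMap.getOrElseUpdate(name, mutable.ListBuffer[String]())
    signatures += signature
  }

  def lookup(name: String): Seq[String] =
    udfMap.get(name).map(_.toSeq).getOrElse(Seq.empty)
}
```
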
@@ -672,6 +672,16 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}
}

test("Test E function") {
runQueryAndCompare("""SELECT E() from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
runQueryAndCompare("""SELECT E(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
}

test("Test spark_partition_id function") {
runQueryAndCompare("""SELECT spark_partition_id(), l_orderkey
| from lineitem limit 100""".stripMargin) {
@@ -1458,9 +1458,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
path =>
(0 to 3).toDF("x").write.parquet(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
runQueryAndCompare(
"SELECT x FROM view WHERE cast(x as timestamp) " +
"IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')")(_)
runQueryAndCompare(s"""
|SELECT x FROM view
|WHERE cast(x as timestamp)
|IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')
|""".stripMargin)(_ => ())
}
}

1 change: 0 additions & 1 deletion ep/build-velox/src/get_velox.sh
@@ -200,7 +200,6 @@ function process_setup_alinux3 {
sed -i 's/python39 python39-devel python39-pip //g' scripts/setup-centos8.sh
sed -i "s/.*pip.* install/#&/" scripts/setup-centos8.sh
sed -i 's/ADDITIONAL_FLAGS=""/ADDITIONAL_FLAGS="-Wno-stringop-overflow"/g' scripts/setup-helper-functions.sh
sed -i "s/\${CMAKE_INSTALL_LIBDIR}/lib64/" third_party/CMakeLists.txt
}

function process_setup_tencentos32 {
@@ -104,7 +104,7 @@ private NamedStruct buildNamedStruct() {
for (StructField field : fileSchema.fields()) {
structBuilder.addTypes(
ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf());
namedStructBuilder.addNames(field.name());
namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name()));
}
namedStructBuilder.setStruct(structBuilder.build());
}
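
Running the field name through `ConverterUtils.normalizeColName` before it is written into the Substrait NamedStruct presumably keeps the scan's field names consistent with how Gluten resolves columns elsewhere (for example, lower-casing under case-insensitive analysis). The diff does not show that helper's implementation; the sketch below is a hypothetical illustration of the general idea, not the real ConverterUtils code:

```scala
// Hypothetical normalization helper, for illustration only.
object ColNameNormalization {
  def normalizeColName(name: String, caseSensitive: Boolean = false): String =
    if (caseSensitive) name else name.toLowerCase(java.util.Locale.ROOT)
}
```
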
@@ -682,6 +682,8 @@ object ExpressionConverter extends SQLConfHelper with Logging {
t.children.map(replaceWithExpressionTransformerInternal(_, attributeSeq, expressionsMap)),
t
)
case e: EulerNumber =>
LiteralTransformer(Literal(Math.E))
case expr =>
GenericExpressionTransformer(
substraitExprName,
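
Mapping `EulerNumber` straight to a literal means the backend does not need its own `e()` implementation: by the time the plan reaches Substrait, `e()` is just the double constant `Math.E`. A simplified sketch of that case, with the ExpressionTransformer/LiteralTransformer plumbing stripped away:

```scala
import org.apache.spark.sql.catalyst.expressions.{EulerNumber, Expression, Literal}

// Simplified stand-in for the converter's EulerNumber case: rewrite the
// expression into a foldable literal instead of a native function call.
object FoldEulerNumber {
  def fold(expr: Expression): Expression = expr match {
    case _: EulerNumber => Literal(Math.E) // ends up as a double literal in the plan
    case other => other
  }
}
```
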
@@ -163,27 +163,25 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
// These properties can be set by the same thread in last query submission.
session.sparkContext.setLocalProperty("isNativeApplicable", null)
session.sparkContext.setLocalProperty("nativeFormat", null)
session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
// FIXME: We should only use context property if having no other approaches.
// Should see if there is another way to pass these options.
session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
if (format.isDefined) {
injectFakeRowAdaptor(rc, child)
} else {
rc.withNewChildren(rc.children.map(apply))
}
} else {
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
session.sparkContext.setLocalProperty("isNativeAppliable", "false")
session.sparkContext.setLocalProperty("nativeFormat", "")

rc.withNewChildren(rc.children.map(apply))
}
case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
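
Resetting the three local properties up front matters because `SparkContext` local properties are per-thread and survive across query submissions (they are also inherited by child threads), so values set by an earlier native-write query could leak into the next command on the same thread; the hunk also completes the `isNativeAppliable` → `isNativeApplicable` rename. A minimal sketch of the reset-then-set pattern (simplified: it ignores the rule's staticPartitionWriteOnly branching), assuming an active SparkSession:

```scala
import org.apache.spark.sql.SparkSession

object NativeWriteLocalProperties {
  def mark(session: SparkSession, nativeFormat: Option[String]): Unit = {
    val sc = session.sparkContext

    // Clear first: these thread-local values may linger from the previous
    // query submitted on the same thread.
    sc.setLocalProperty("isNativeApplicable", null)
    sc.setLocalProperty("nativeFormat", null)
    sc.setLocalProperty("staticPartitionWriteOnly", null)

    // Then set them only for the current write command.
    sc.setLocalProperty("isNativeApplicable", nativeFormat.isDefined.toString)
    sc.setLocalProperty("nativeFormat", nativeFormat.getOrElse(""))
  }
}
```
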