Commit
Merge branch 'main' into gluten_6860
taiyang-li authored Aug 15, 2024
2 parents 26ddf7f + 5897d34 commit 97edf84
Showing 514 changed files with 7,393 additions and 3,596 deletions.
72 changes: 70 additions & 2 deletions .github/workflows/velox_docker.yml
@@ -336,7 +336,7 @@ jobs:
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory, memory isolation on # Disabled as error https://gist.github.com/zhztheplayer/abd5e83ccdc48730678ae7ebae479fcc
- name: TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory, memory isolation on
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
@@ -346,7 +346,7 @@ jobs:
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 || true
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
run: |
cd tools/gluten-it \
@@ -1083,6 +1083,74 @@ jobs:
name: golden-files-spark35
path: /tmp/tpch-approved-plan/**

run-spark-test-spark35-scala213:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
container: ghcr.io/facebookincubator/velox-dev:centos8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-8-${{github.sha}}
path: ./cpp/build/releases
- name: Download UDF Example Lib
uses: actions/download-artifact@v2
with:
name: udf-example-lib-centos-8-${{github.sha}}
path: ./cpp/build/velox/udf/examples/
- name: Download Arrow Jars
uses: actions/download-artifact@v2
with:
name: arrow-jars-centos-8-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
- name: Setup build dependency
run: |
yum install sudo patch java-1.8.0-openjdk-devel wget -y
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
- name: Get Ccache
uses: actions/cache/restore@v3
with:
path: '${{ env.CCACHE_DIR }}'
key: ccache-centos-release-default
- name: Ensure Cache Dirs Exists
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
- name: Prepare spark.test.home for Spark 3.5.1 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
rm -rf spark-3.5.1-bin-hadoop3.tgz && \
mkdir -p $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
mv jars $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
cd $GITHUB_WORKSPACE// && \
wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/ && \
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
pip3 install pyspark==3.5.1 cython && \
pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
$MVN_CMD clean install -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
$MVN_CMD test -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
run-spark-test-spark35-slow:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,7 +1,7 @@
runner.dialect = scala212

# Version is required to make sure IntelliJ picks the right version
version = 3.5.9
version = 3.8.3
preset = default

# Max column
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.exception.GlutenNotSupportException

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.datasources.WriteJobDescription

object CHDeltaColumnarWrite {
def apply(
jobTrackerID: String,
description: WriteJobDescription,
committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.2")
}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.exception.GlutenNotSupportException

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.datasources.WriteJobDescription

object CHDeltaColumnarWrite {
def apply(
jobTrackerID: String,
description: WriteJobDescription,
committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.3")
}
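
The two new shim sources above (one for Spark 3.2, one for Spark 3.3) are identical stubs: the native Delta columnar write path is not wired up for these Spark versions, so the factory unconditionally throws `GlutenNotSupportException`. As a rough illustration of how a caller might treat that exception as a fallback signal, here is a minimal sketch; `CHDeltaColumnarWriteExample` and `tryNativeDeltaWrite` are hypothetical names, not part of this commit, and the snippet assumes the Gluten ClickHouse backend and Spark classes referenced in the diff are on the classpath.

```scala
// Hypothetical caller sketch: not part of this commit. Assumes the Gluten
// ClickHouse backend and Spark jars referenced in the diff are on the classpath.
import org.apache.gluten.exception.GlutenNotSupportException

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.{CHColumnarWrite, CHDeltaColumnarWrite}
import org.apache.spark.sql.execution.datasources.WriteJobDescription

import scala.util.{Failure, Success, Try}

object CHDeltaColumnarWriteExample {
  // Returns Some(write) when a native Delta columnar write is available for the
  // running Spark version, or None when the shim throws GlutenNotSupportException
  // (as the Spark 3.2 and 3.3 stubs above always do).
  def tryNativeDeltaWrite(
      jobTrackerID: String,
      description: WriteJobDescription,
      committer: FileCommitProtocol): Option[CHColumnarWrite[FileCommitProtocol]] =
    Try(CHDeltaColumnarWrite(jobTrackerID, description, committer)) match {
      case Success(write) => Some(write)
      case Failure(_: GlutenNotSupportException) => None // fall back to the vanilla write path
      case Failure(other) => throw other
    }
}
```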
@@ -25,17 +25,21 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
import org.apache.spark.sql.delta.files.MergeTreeCommitProtocol
import org.apache.spark.sql.delta.schema.InvariantViolationException
import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeCommitProtocol, TransactionalWrite}
import org.apache.spark.sql.delta.hooks.AutoCompact
import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
import org.apache.spark.sql.execution.{CHDelayedCommitProtocol, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteFiles, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path

import scala.collection.mutable.ListBuffer

@@ -190,4 +194,158 @@ class ClickhouseOptimisticTransaction(
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
}

private def shouldOptimizeWrite(
writeOptions: Option[DeltaOptions],
sessionConf: SQLConf): Boolean = {
writeOptions
.flatMap(_.optimizeWrite)
.getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
}

override protected def getCommitter(outputPath: Path): DelayedCommitProtocol =
new CHDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir)

override def writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {

if (isOptimize)
throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")

hasWritten = true

val spark = inputData.sparkSession
val (data, partitionSchema) = performCDCPartition(inputData)
val outputPath = deltaLog.dataPath

val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.

// Iceberg spec requires partition columns in data files
val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
val options = (writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
key =>
key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
}.toMap
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

val (normalQueryExecution, output, generatedColumnConstraints, _) =
normalizeData(deltaLog, writeOptions, data)
val partitioningColumns = getPartitioningColumns(partitionSchema, output)

val logicalPlan = normalQueryExecution.optimizedPlan
val write =
WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)

val queryExecution = new QueryExecution(spark, write)
val committer = getCommitter(outputPath)

// If Statistics Collection is enabled, then create a stats tracker that will be injected during
// the FileFormatWriter.write call below and will collect per-file stats using
// StatisticsCollection
// val (optionalStatsTracker, _) =
// getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
val optionalStatsTracker: Option[DeltaJobStatisticsTracker] = None

val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)

val physicalPlan = materializeAdaptiveSparkPlan(queryExecution.executedPlan)
// convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
/* val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
// evenly-balanced data files already.
val physicalPlan =
if (
!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
) {
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
} else {
checkInvariants
}*/
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
statsTrackers.append(basicWriteJobStatsTracker)
}

try {
DeltaFileFormatWriter.write(
sparkSession = spark,
plan = physicalPlan,
fileFormat = fileFormat,
committer = committer,
outputSpec = outputSpec,
// scalastyle:off deltahadoopconfiguration
hadoopConf =
spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
// scalastyle:on deltahadoopconfiguration
partitionColumns = partitioningColumns,
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq
++ statsTrackers,
options = options
)
} catch {
case InnerInvariantViolationException(violationException) =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
throw violationException
}
}

var resultFiles =
(if (optionalStatsTracker.isDefined) {
committer.addedStatuses.map {
a =>
a.copy(stats =
optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
} else {
committer.addedStatuses
})
.filter {
// In some cases, we can write out an empty `inputData`. Some examples of this (though, they
// may be fixed in the future) are the MERGE command when you delete with empty source, or
// empty target, or on disjoint tables. This is hard to catch before the write without
// collecting the DF ahead of time. Instead, we can return only the AddFiles that
// a) actually add rows, or
// b) don't have any stats so we don't know the number of rows at all
case a: AddFile => a.numLogicalRecords.forall(_ > 0)
case _ => true
}

// add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles
if (IcebergCompatV2.isEnabled(metadata)) {
resultFiles = resultFiles.map {
addFile =>
val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String]
addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2"))
}
}

if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact)

resultFiles.toSeq ++ committer.changeFiles
}

private def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
case p: SparkPlan => p
}
}
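
One subtle point in the new `writeFiles` above is the `resultFiles` filter: it relies on `Option.forall`, so an `AddFile` with no recorded row count (`numLogicalRecords == None`) passes the filter, while a file known to hold zero rows is dropped. The self-contained sketch below demonstrates that behavior with a simplified stand-in case class (`FileEntry` and `AddFileFilterSketch` are hypothetical names, not Delta's real `AddFile`).

```scala
// Self-contained illustration of the resultFiles filter semantics; FileEntry is a
// hypothetical stand-in for Delta's AddFile, used only to show Option.forall behavior.
final case class FileEntry(path: String, numLogicalRecords: Option[Long])

object AddFileFilterSketch extends App {
  val candidates = Seq(
    FileEntry("part-0.parquet", numLogicalRecords = Some(42L)), // has rows        -> kept
    FileEntry("part-1.parquet", numLogicalRecords = Some(0L)),  // known empty     -> dropped
    FileEntry("part-2.parquet", numLogicalRecords = None)       // no stats at all -> kept
  )

  // Mirrors `case a: AddFile => a.numLogicalRecords.forall(_ > 0)` from writeFiles:
  // Option.forall returns true for None, so files without stats are retained.
  val kept = candidates.filter(_.numLogicalRecords.forall(_ > 0))

  println(kept.map(_.path)) // List(part-0.parquet, part-2.parquet)
}
```

Running it prints only the first and last paths, matching the comment in the commit: files whose stats are missing are kept because their row count is unknown, while files proven empty are excluded from the transaction log.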