Skip to content

Commit

Permalink
Merge branch 'main' into 2024_12_17
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyuan authored Dec 17, 2024
2 parents 34e1c54 + ea0e175 commit 1b14e74
Show file tree
Hide file tree
Showing 233 changed files with 1,683 additions and 1,094 deletions.
29 changes: 29 additions & 0 deletions backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,35 @@
<name>Gluten Backends ClickHouse</name>

<profiles>
<profile>
<id>celeborn</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-celeborn</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client-spark-${spark.major.version}-shaded_${scala.binary.version}</artifactId>
<version>${celeborn.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client-spark-${spark.major.version}_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-spark-${spark.major.version}-columnar-shuffle_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<profile>
<id>iceberg</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class GlutenClickHouseRSSColumnarMemorySortShuffleSuite
override protected val tablesPath: String = basePath + "/tpch-data-ch"
override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
override protected val queriesResults: String =
rootPath + "../../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"
rootPath + "../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"

override protected val parquetTableDataPath: String =
"../../../../../gluten-core/src/test/resources/tpch-data"
"../../../../gluten-core/src/test/resources/tpch-data"

/** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */
override protected def sparkConf: SparkConf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ class GlutenClickHouseRSSColumnarShuffleAQESuite
override protected val tablesPath: String = basePath + "/tpch-data-ch"
override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
override protected val queriesResults: String =
rootPath + "../../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"
rootPath + "../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"

override protected val parquetTableDataPath: String =
"../../../../../gluten-core/src/test/resources/tpch-data"
"../../../../gluten-core/src/test/resources/tpch-data"

/** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */
override protected def sparkConf: SparkConf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.AutoCompact
import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, GlutenWriterColumnarRules, WriteFiles, WriteJobStatsTracker}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DeltaV1Writes, FileFormatWriter, GlutenWriterColumnarRules, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.v1.MergeTreeWriterInjects
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
Expand Down Expand Up @@ -229,31 +228,12 @@ class ClickhouseOptimisticTransaction(
val (data, partitionSchema) = performCDCPartition(inputData)
val outputPath = deltaLog.dataPath

val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.

// Iceberg spec requires partition columns in data files
val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
val options = (writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
key =>
key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
}.toMap
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

val (normalQueryExecution, output, generatedColumnConstraints, _) =
val (queryExecution, output, generatedColumnConstraints, _) =
normalizeData(deltaLog, writeOptions, data)
val partitioningColumns = getPartitioningColumns(partitionSchema, output)

val logicalPlan = normalQueryExecution.optimizedPlan
val write =
WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)
val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.

val queryExecution = new QueryExecution(spark, write)
val (committer, collectStats) = fileFormat.toString match {
case "MergeTree" => (getCommitter2(outputPath), false)
case _ => (getCommitter(outputPath), true)
Expand All @@ -274,20 +254,24 @@ class ClickhouseOptimisticTransaction(
SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)

val physicalPlan = materializeAdaptiveSparkPlan(queryExecution.executedPlan)
// convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
/* val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val empty2NullPlan =
convertEmptyToNullIfNeeded(queryExecution.sparkPlan, partitioningColumns, constraints)
// TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val checkInvariants = empty2NullPlan

// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
// evenly-balanced data files already.
val physicalPlan =
if (
!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
) {
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
} else {
checkInvariants
} */
// TODO: val physicalPlan =
// if (
// !isOptimize &&
// shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
// ) {
// DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
// } else {
// checkInvariants
// }
val physicalPlan = checkInvariants

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
Expand All @@ -298,10 +282,33 @@ class ClickhouseOptimisticTransaction(
statsTrackers.append(basicWriteJobStatsTracker)
}

// Iceberg spec requires partition columns in data files
val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
val options = (writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
key =>
key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
}.toMap
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

val executedPlan = DeltaV1Writes(
spark,
physicalPlan,
fileFormat,
partitioningColumns,
None,
options
).executedPlan

try {
DeltaFileFormatWriter.write(
sparkSession = spark,
plan = physicalPlan,
plan = executedPlan,
fileFormat = fileFormat,
committer = committer,
outputSpec = outputSpec,
Expand Down Expand Up @@ -358,8 +365,4 @@ class ClickhouseOptimisticTransaction(
resultFiles.toSeq ++ committer.changeFiles
}

private def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
case p: SparkPlan => p
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ case class FileDeltaColumnarWrite(
// stats.map(row => x.apply(row).getString(0)).foreach(println)
// process stats
val commitInfo = DeltaFileCommitInfo(committer)
val basicNativeStat = NativeBasicWriteTaskStatsTracker(description, basicWriteJobStatsTracker)
val basicNativeStat =
NativeBasicWriteTaskStatsTracker(description.path, basicWriteJobStatsTracker)
val basicNativeStats = Seq(commitInfo, basicNativeStat)
NativeStatCompute(stats)(basicNativeStats, nativeDeltaStats)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import org.apache.gluten.backendsapi.BackendsApiManager

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.V1WritesUtils.isOrderingMatched

case class DeltaV1Writes(
spark: SparkSession,
query: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec = Map.empty) {

require(fileFormat != null, "FileFormat is required to write files.")
require(BackendsApiManager.getSettings.enableNativeWriteFiles())

private lazy val requiredOrdering: Seq[SortOrder] =
V1WritesUtils.getSortOrder(
query.output,
partitionColumns,
bucketSpec,
options,
staticPartitions.size)

lazy val sortPlan: SparkPlan = {
val outputOrdering = query.outputOrdering
val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
if (orderingMatched) {
query
} else {
SortExec(requiredOrdering, global = false, query)
}
}

lazy val writePlan: SparkPlan =
WriteFilesExec(
sortPlan,
fileFormat = fileFormat,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
options = options,
staticPartitions = staticPartitions)

lazy val executedPlan: SparkPlan =
CallTransformer(spark, writePlan).executedPlan
}

case class CallTransformer(spark: SparkSession, physicalPlan: SparkPlan)
extends QueryExecution(spark, LocalRelation()) {
override lazy val sparkPlan: SparkPlan = physicalPlan
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, GlutenPlan, SortExecTransformer}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

class DeltaV1WritesSuite extends GlutenClickHouseWholeStageTransformerSuite {

import testImplicits._

override protected def sparkConf: SparkConf = {
super.sparkConf
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
}

override def beforeAll(): Unit = {
super.beforeAll()
(0 to 20)
.map(i => (i, i % 5, (i % 10).toString))
.toDF("i", "j", "k")
.write
.saveAsTable("t0")
}

override def afterAll(): Unit = {
sql("drop table if exists t0")
super.afterAll()
}

val format = new ParquetFileFormat
def getSort(child: SparkPlan): Option[SortExecTransformer] = {
child.collectFirst { case w: SortExecTransformer => w }
}
test("don't add sort when the required ordering is empty") {
val df = sql("select * from t0")
val plan = df.queryExecution.sparkPlan
val writes = DeltaV1Writes(spark, plan, format, Nil, None, Map.empty)
assert(writes.sortPlan === plan)
assert(writes.writePlan != null)
assert(writes.executedPlan.isInstanceOf[GlutenPlan])
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(writes.executedPlan)
assert(writeFilesOpt.isDefined)
val sortExec = getSort(writes.executedPlan)
assert(sortExec.isEmpty)
}

test("don't add sort when the required ordering is already satisfied") {
val df = sql("select * from t0")
def check(plan: SparkPlan): Unit = {
val partitionColumns = plan.output.find(_.name == "k").toSeq
val writes = DeltaV1Writes(spark, plan, format, partitionColumns, None, Map.empty)
assert(writes.sortPlan === plan)
assert(writes.writePlan != null)
assert(writes.executedPlan.isInstanceOf[GlutenPlan])
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(writes.executedPlan)
assert(writeFilesOpt.isDefined)
val sortExec = getSort(writes.executedPlan)
assert(sortExec.isDefined)
}
check(df.orderBy("k").queryExecution.sparkPlan)
check(df.orderBy("k", "j").queryExecution.sparkPlan)
}

test("add sort when the required ordering is not satisfied") {
val df = sql("select * from t0")
def check(plan: SparkPlan): Unit = {
val partitionColumns = plan.output.find(_.name == "k").toSeq
val writes = DeltaV1Writes(spark, plan, format, partitionColumns, None, Map.empty)
val sort = writes.sortPlan.asInstanceOf[SortExec]
assert(sort.child === plan)
assert(writes.writePlan != null)
assert(writes.executedPlan.isInstanceOf[GlutenPlan])
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(writes.executedPlan)
assert(writeFilesOpt.isDefined)
val sortExec = getSort(writes.executedPlan)
assert(sortExec.isDefined, s"writes.executedPlan: ${writes.executedPlan}")
}
check(df.queryExecution.sparkPlan)
check(df.orderBy("j", "k").queryExecution.sparkPlan)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ message Write {
message Common {
string format = 1;
string job_task_attempt_id = 2; // currently used in mergetree format

// Describes the partition index in the WriteRel.table_schema.
repeated int32 partition_col_index = 3;
}
message ParquetWrite{}
message OrcWrite{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ object CHRuleApi {

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(c => intercept(RewriteTransformer.apply(c.session)))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
Expand Down
Loading

0 comments on commit 1b14e74

Please sign in to comment.