MergeTreeFileCommitProtocol trait (Spark 3.5)
baibaichen committed Oct 21, 2024
1 parent d1030fd commit c110840
Showing 5 changed files with 160 additions and 463 deletions.
@@ -21,11 +21,11 @@ import org.apache.gluten.execution.ColumnarToRowExecBase

 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
-import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeCommitProtocol, TransactionalWrite}
+import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeDelayedCommitProtocol, TransactionalWrite}
 import org.apache.spark.sql.delta.hooks.AutoCompact
 import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -95,10 +95,27 @@ class ClickhouseOptimisticTransaction(

     val (queryExecution, output, generatedColumnConstraints, _) =
       normalizeData(deltaLog, writeOptions, data)
-    val partitioningColumns = getPartitioningColumns(partitionSchema, output)
 
+    val tableV2 = ClickHouseTableV2.getTable(deltaLog)
+    val bukSpec = if (tableV2.catalogTable.isDefined) {
+      tableV2.bucketOption
+    } else {
+      tableV2.bucketOption.map {
+        bucketSpec =>
+          CatalogUtils.normalizeBucketSpec(
+            tableV2.tableName,
+            output.map(_.name),
+            bucketSpec,
+            spark.sessionState.conf.resolver)
+      }
+    }
     val committer =
-      new MergeTreeCommitProtocol("delta-mergetree", outputPath.toString, None, None)
+      new MergeTreeDelayedCommitProtocol(
+        outputPath.toString,
+        None,
+        None,
+        tableV2.dataBaseName,
+        tableV2.tableName)
 
     // val (optionalStatsTracker, _) =
     //   getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
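
This hunk normalizes the bucket specification only when the table has no catalog entry: a catalog-backed table already carries canonical column names, while a path-based table may have user-cased bucket columns that must be matched against the write schema before bucketing. A minimal sketch of what `CatalogUtils.normalizeBucketSpec` achieves, using a hypothetical helper `normalizeSketch` (illustrative only, not Spark's implementation):

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// Rewrite each bucket/sort column to its canonical name in the write
// schema, using the session resolver (case-insensitive by default).
def normalizeSketch(
    tableName: String,
    outputCols: Seq[String],
    spec: BucketSpec,
    resolver: (String, String) => Boolean): BucketSpec = {
  def canonical(col: String): String =
    outputCols.find(resolver(_, col)).getOrElse {
      throw new IllegalArgumentException(
        s"Bucket column '$col' not found in table '$tableName'")
    }
  spec.copy(
    bucketColumnNames = spec.bucketColumnNames.map(canonical),
    sortColumnNames = spec.sortColumnNames.map(canonical))
}

// normalizeSketch("t", Seq("id", "ts"), BucketSpec(4, Seq("ID"), Nil), _ equalsIgnoreCase _)
// returns BucketSpec(4, Seq("id"), Nil).
```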
@@ -108,10 +125,15 @@ class ClickhouseOptimisticTransaction(
       Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
 
     SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
-      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)
-
       val queryPlan = queryExecution.executedPlan
       val newQueryPlan = insertFakeRowAdaptor(queryPlan)
+      assert(output.size == newQueryPlan.output.size)
+      val newOutput = newQueryPlan.output.zip(output).map {
+        case (newAttr, oldAttr) =>
+          oldAttr.withExprId(newAttr.exprId)
+      }
+      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
+      val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)
 
       val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
 
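
`insertFakeRowAdaptor` rewrites the executed plan, and the rewritten plan's output attributes receive fresh expression IDs. The zip above re-binds each original attribute to the new ID so that `outputSpec` and the partitioning columns resolve against the adapted plan while keeping the original names and metadata. A self-contained illustration of `withExprId` (the column name and type are arbitrary here):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Two attributes with the same name but distinct auto-allocated exprIds,
// standing in for the pre- and post-rewrite plan output.
val oldAttr = AttributeReference("value", LongType)()
val newAttr = AttributeReference("value", LongType)()
assert(oldAttr.exprId != newAttr.exprId)

// Re-bind: keep the old attribute's name, type and metadata; adopt the new id.
val rebound = oldAttr.withExprId(newAttr.exprId)
assert(rebound.exprId == newAttr.exprId && rebound.name == oldAttr.name)
```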
@@ -148,7 +170,6 @@
       })
 
       try {
-        val tableV2 = ClickHouseTableV2.getTable(deltaLog)
         val format = tableV2.getFileFormat(protocol, metadata)
         GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
         MergeTreeFileFormatWriter.write(
@@ -163,7 +184,7 @@
             .newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
           // scalastyle:on deltahadoopconfiguration
           partitionColumns = partitioningColumns,
-          bucketSpec = tableV2.bucketOption,
+          bucketSpec = bukSpec,
           statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
           options = options,
           constraints = constraints
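
The writer now receives `bukSpec`, the possibly-normalized specification, instead of the raw `tableV2.bucketOption`. The distinction matters because bucketing hashes rows by column name: under Spark's default case-insensitive resolution, a spec naming `ID` must first be rewritten to the schema's `id`. A toy check of the resolver semantics assumed above (Spark's `Resolver` is just `(String, String) => Boolean`):

```scala
// The default analyzer resolver behaves like equalsIgnoreCase.
val caseInsensitive: (String, String) => Boolean = _ equalsIgnoreCase _
assert(caseInsensitive("ID", "id"))
assert(!caseInsensitive("id", "ids"))
```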

(One of the five changed files was deleted; its contents are not shown here.)

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.files
+
+class MergeTreeDelayedCommitProtocol(
+    val outputPath: String,
+    randomPrefixLength: Option[Int],
+    subdir: Option[String],
+    val database: String,
+    val tableName: String)
+  extends DelayedCommitProtocol("delta-mergetree", outputPath, randomPrefixLength, subdir)
+  with MergeTreeFileCommitProtocol {}
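
The new class threads the MergeTree target's database and table names through Delta's delayed-commit machinery: it reuses `DelayedCommitProtocol`'s file tracking (job ID fixed to `"delta-mergetree"`) and mixes in `MergeTreeFileCommitProtocol`, exposing `database` and `tableName` as vals for the mixin to consume. A construction sketch mirroring the call site above, with placeholder path and names:

```scala
import org.apache.spark.sql.delta.files.MergeTreeDelayedCommitProtocol

// Placeholder values; at the real call site these come from the Delta
// log's output path and ClickHouseTableV2's dataBaseName/tableName.
val committer = new MergeTreeDelayedCommitProtocol(
  outputPath = "/data/warehouse/db/tbl",
  randomPrefixLength = None, // no random directory prefix
  subdir = None,             // write at the table root
  database = "db",
  tableName = "tbl")
```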