[GLUTEN-7028][CH][Part-6] Introduce MergeTreeDelayedCommitProtocol (#7506)

* DEFAULT_USE_DATASOURCE_V2
* Remove unnecessary parameters
* Remove MergeTreeWriteTaskResult
  Remove MergeTreeExecutedWriteSummary
  Remove MergeTreeWriterBucketSpec and MergeTreeWriteJobDescription, using Spark's definitions

* Replace MergeTreeCommitProtocol with MergeTreeDelayedCommitProtocol (sketched below)

* Replace MergeTreeFileFormatWriter with FileFormatWriter

* MergeTreeFileCommitProtocol trait 1

* MergeTreeFileCommitProtocol trait spark 32

* MergeTreeFileCommitProtocol trait spark 35

* FakeRowOutput

* MergeTreeFileFormat trait

* Add job_task_attempt_id as a debugging aid

* ClickHouseTableV2Base.normalizedBucketSpec
  MergeTreeWriterInjects.insertFakeRowAdaptor
  (both sketched after the ClickhouseOptimisticTransaction hunks below)
baibaichen authored Oct 22, 2024
1 parent 6521ffa commit 19090da
Showing 44 changed files with 718 additions and 1,421 deletions.
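
The headline change: the writer-side committer is now built from the Delta log's table metadata. The new class's definition is not among the hunks shown below, so the following is only a hedged sketch of its shape, inferred from the call site in ClickhouseOptimisticTransaction; the parameter names and the DelayedCommitProtocol parent (with the three-argument constructor of older Delta versions) are assumptions, not the code from this PR.

package org.apache.spark.sql.delta.files

// Sketch only: shape inferred from
//   new MergeTreeDelayedCommitProtocol(outputPath.toString, None, tableV2.dataBaseName, tableV2.tableName)
// The "delta-mergetree" job id is carried over from the old call site and is an assumption here.
class MergeTreeDelayedCommitProtocol(
    val outputPath: String,
    randomPrefixLength: Option[Int],
    val database: String,
    val tableName: String)
  extends DelayedCommitProtocol("delta-mergetree", outputPath, randomPrefixLength)

Unlike the old MergeTreeCommitProtocol, the database and table name travel with the committer, which appears to be what lets the generic FileFormatWriter replace MergeTreeFileFormatWriter further down.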
1 change: 0 additions & 1 deletion backends-clickhouse/pom.xml
@@ -368,7 +368,6 @@
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
- <exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/files/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
</excludes>
ClickhouseOptimisticTransaction.scala
@@ -17,20 +17,18 @@
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.clickhouse.CHConf
- import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
- import org.apache.spark.sql.delta.files.MergeTreeCommitProtocol
+ import org.apache.spark.sql.delta.files.MergeTreeDelayedCommitProtocol
import org.apache.spark.sql.delta.schema.InvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf
- import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
- import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
- import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, GlutenWriterColumnarRules, WriteJobStatsTracker}
- import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
+ import org.apache.spark.sql.execution.SQLExecution
+ import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, GlutenWriterColumnarRules, WriteJobStatsTracker}
+ import org.apache.spark.sql.execution.datasources.v1.MergeTreeWriterInjects
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.util.{Clock, SerializableConfiguration}

@@ -51,26 +49,6 @@ class ClickhouseOptimisticTransaction(
)
}

- def insertFakeRowAdaptor(queryPlan: SparkPlan): SparkPlan = queryPlan match {
- // if the child is columnar, we can just wrap&transfer the columnar data
- case c2r: ColumnarToRowExecBase =>
- FakeRowAdaptor(c2r.child)
- // If the child is aqe, we make aqe "support columnar",
- // then aqe itself will guarantee to generate columnar outputs.
- // So FakeRowAdaptor will always consumes columnar data,
- // thus avoiding the case of c2r->aqe->r2c->writer
- case aqe: AdaptiveSparkPlanExec =>
- FakeRowAdaptor(
- AdaptiveSparkPlanExec(
- aqe.inputPlan,
- aqe.context,
- aqe.preprocessingRules,
- aqe.isSubquery,
- supportsColumnar = true
- ))
- case other => FakeRowAdaptor(other)
- }
-
override def writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
@@ -84,9 +62,14 @@

val (queryExecution, output, generatedColumnConstraints, _) =
normalizeData(deltaLog, data)
- val partitioningColumns = getPartitioningColumns(partitionSchema, output)

- val committer = new MergeTreeCommitProtocol("delta-mergetree", outputPath.toString, None)
+ val tableV2 = ClickHouseTableV2.getTable(deltaLog)
+ val committer =
+ new MergeTreeDelayedCommitProtocol(
+ outputPath.toString,
+ None,
+ tableV2.dataBaseName,
+ tableV2.tableName)

// val (optionalStatsTracker, _) =
// getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
@@ -96,11 +79,11 @@
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
- val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)
-
val queryPlan = queryExecution.executedPlan
- val newQueryPlan = insertFakeRowAdaptor(queryPlan)
-
+ val (newQueryPlan, newOutput) =
+ MergeTreeWriterInjects.insertFakeRowAdaptor(queryPlan, output)
+ val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
+ val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
@@ -128,18 +111,17 @@
spark.conf.getAll.foreach(
entry => {
if (
- CHConf.startWithSettings(entry._1)
+ CHConf.startWithSettingsPrefix(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
}
})

try {
- val tableV2 = ClickHouseTableV2.getTable(deltaLog)
val format = tableV2.getFileFormat(metadata)
GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
- MergeTreeFileFormatWriter.write(
+ FileFormatWriter.write(
sparkSession = spark,
plan = newQueryPlan,
fileFormat = format,
@@ -150,17 +132,11 @@
hadoopConf = spark.sessionState
.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
// scalastyle:on deltahadoopconfiguration
- orderByKeyOption = tableV2.orderByKeyOption,
- lowCardKeyOption = tableV2.lowCardKeyOption,
- minmaxIndexKeyOption = tableV2.minmaxIndexKeyOption,
- bfIndexKeyOption = tableV2.bfIndexKeyOption,
- setIndexKeyOption = tableV2.setIndexKeyOption,
- primaryKeyOption = tableV2.primaryKeyOption,
partitionColumns = partitioningColumns,
- bucketSpec = tableV2.bucketOption,
+ bucketSpec =
+ tableV2.normalizedBucketSpec(output.map(_.name), spark.sessionState.conf.resolver),
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
- options = options,
- constraints = constraints
+ options = options
)
} catch {
case s: SparkException =>
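
The insertFakeRowAdaptor helper removed from this file now lives on MergeTreeWriterInjects and also returns the write output, as the new call site val (newQueryPlan, newOutput) = MergeTreeWriterInjects.insertFakeRowAdaptor(queryPlan, output) shows. Below is a hedged sketch of the relocated helper that reuses the removed method body; treating the second element of the returned pair as a pass-through of output is an assumption (the actual PR may rewrite it, for example via the new FakeRowOutput).

// Sketch based on the method removed above and on the new call site; not the exact code from this PR.
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.FakeRowAdaptor

object MergeTreeWriterInjects {
  def insertFakeRowAdaptor(
      queryPlan: SparkPlan,
      output: Seq[Attribute]): (SparkPlan, Seq[Attribute]) = {
    val adapted = queryPlan match {
      // If the child is already columnar, wrap and transfer the columnar data directly.
      case c2r: ColumnarToRowExecBase =>
        FakeRowAdaptor(c2r.child)
      // If the child is AQE, mark it as columnar-capable so AQE itself guarantees columnar
      // output; FakeRowAdaptor then always consumes columnar data, avoiding c2r -> aqe -> r2c -> writer.
      case aqe: AdaptiveSparkPlanExec =>
        FakeRowAdaptor(
          AdaptiveSparkPlanExec(
            aqe.inputPlan,
            aqe.context,
            aqe.preprocessingRules,
            aqe.isSubquery,
            supportsColumnar = true))
      case other => FakeRowAdaptor(other)
    }
    // Assumption: the write schema is passed through unchanged.
    (adapted, output)
  }
}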
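Similarly, the bucketSpec argument now goes through the new ClickHouseTableV2Base.normalizedBucketSpec mentioned in the commit message. Its implementation is not shown in these hunks; the following is a hedged sketch of what such a helper plausibly does, normalizing the configured bucket and sort column names against the actual write schema with Spark's resolver (names and logic are assumptions, not this PR's code):

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.BucketSpec

trait ClickHouseTableV2Base {
  // Bucket spec configured on the MergeTree table, if any (already present before this PR).
  def bucketOption: Option[BucketSpec]

  // Hypothetical sketch: resolve the configured bucket/sort column names against the
  // write output columns (case-insensitively under the default resolver) so the
  // generic FileFormatWriter sees canonical attribute names.
  def normalizedBucketSpec(outputColumns: Seq[String], resolver: Resolver): Option[BucketSpec] =
    bucketOption.map { spec =>
      def normalize(col: String): String = outputColumns.find(resolver(_, col)).getOrElse(col)
      spec.copy(
        bucketColumnNames = spec.bucketColumnNames.map(normalize),
        sortColumnNames = spec.sortColumnNames.map(normalize))
    }
}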
OptimizeTableCommandOverwrites.scala
@@ -30,9 +30,8 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.errors.QueryExecutionErrors
- import org.apache.spark.sql.execution.datasources.CHDatasourceJniWrapper
+ import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v1.CHMergeTreeWriterInjects
- import org.apache.spark.sql.execution.datasources.v1.clickhouse._
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.{AddFileTags, AddMergeTreeParts}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.internal.SQLConf
@@ -73,7 +72,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkStageId: Int,
sparkPartitionId: Int,
sparkAttemptNumber: Int
- ): MergeTreeWriteTaskResult = {
+ ): WriteTaskResult = {
CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -135,7 +134,7 @@
// val summary = MergeTreeExecutedWriteSummary(
// updatedPartitions = updatedPartitions.toSet,
// stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
- MergeTreeWriteTaskResult(taskCommitMessage, null)
+ WriteTaskResult(taskCommitMessage, null)
} else {
throw new IllegalStateException()
}
@@ -172,7 +171,7 @@
sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)

val jobIdInstant = new Date().getTime
- val ret = new Array[MergeTreeWriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
+ val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)

val serializableHadoopConf = new SerializableConfiguration(
sparkSession.sessionState.newHadoopConfWithOptions(
@@ -221,7 +220,7 @@
)
},
rddWithNonEmptyPartitions.partitions.indices,
- (index, res: MergeTreeWriteTaskResult) => {
+ (index, res: WriteTaskResult) => {
ret(index) = res
}
)
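
With the MergeTree-specific result types gone, this object reuses the task-result types that Spark already ships in org.apache.spark.sql.execution.datasources. For reference, they look roughly like the following in Spark 3.x (approximate shapes; check the Spark source for the exact definitions). Since summary is a plain case-class member, passing null for it works just as it did with the removed MergeTreeWriteTaskResult.

// Approximate Spark 3.x definitions, shown for context only; not part of this PR's diff.
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources.WriteTaskStats

case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
  extends WriterCommitMessage

case class ExecutedWriteSummary(updatedPartitions: Set[String], stats: Seq[WriteTaskStats])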