
[GLUTEN-7028][CH][Part-9] Collecting Delta stats for parquet #7993

Merged: 9 commits, Nov 21, 2024
backends-clickhouse/pom.xml (12 additions, 0 deletions)
@@ -426,6 +426,18 @@
</sources>
</configuration>
</execution>
<execution>
<id>add-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/delta-${delta.binary.version}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
ClickhouseOptimisticTransaction.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.AutoCompact
import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, GlutenWriterColumnarRules, WriteFiles, WriteJobStatsTracker}
@@ -50,6 +49,9 @@ class ClickhouseOptimisticTransaction(
override val snapshot: Snapshot)
extends OptimisticTransaction(deltaLog, catalogTable, snapshot) {

private lazy val writingMergeTree =
ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)

def this(
deltaLog: DeltaLog,
catalogTable: Option[CatalogTable],
@@ -62,119 +64,137 @@
}

override def writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
if (writingMergeTree) {
if (isOptimize) {
throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
}
// TODO: update FallbackByBackendSettings for mergetree always return true
val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
if (onePipeline)
pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
else
writeMergeTree(inputData, writeOptions, additionalConstraints)
} else {
if (isOptimize || !nativeWrite) {
super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints)
} else {
pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
}
}
}
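
A note on the dispatch above: the parquet branch is the one this PR targets. With native writes enabled and `isOptimize == false`, a plain Delta/parquet write now goes through `pipelineWriteFiles`, which is where the per-file Delta stats are collected. Below is a minimal sketch of such a write, assuming Delta Lake and Gluten are already configured on the session; the config key shown mirrors `GlutenConfig.enableNativeWriter` but is an assumption here, and the table path is made up.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: a Delta/parquet write that should take the pipelineWriteFiles branch.
// Assumes Delta Lake and Gluten are on the classpath and configured; the config key
// below is assumed to back GlutenConfig.enableNativeWriter and is not taken from this diff.
val spark = SparkSession
  .builder()
  .appName("delta-parquet-stats-sketch")
  .config("spark.gluten.sql.native.writer.enabled", "true")
  .getOrCreate()

spark
  .range(0, 1000)
  .selectExpr("id AS c1", "id % 10 AS c2")
  .write
  .format("delta")          // parquet-backed Delta table, not the MergeTree engine
  .mode("overwrite")
  .save("/tmp/delta_stats_sketch") // hypothetical path, reused in the check at the end
```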

@deprecated("Use pipelineWriteFiles instead")
private def writeMergeTree(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {

// TODO: update FallbackByBackendSettings for mergetree always return true
val onePipeline = GlutenConfig.getConf.enableNativeWriter.getOrElse(
false) && CHConf.get.enableOnePipelineMergeTreeWrite

if (!onePipeline && ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)) {
hasWritten = true

val spark = inputData.sparkSession
val (data, partitionSchema) = performCDCPartition(inputData)
val outputPath = deltaLog.dataPath

val (queryExecution, output, generatedColumnConstraints, _) =
normalizeData(deltaLog, writeOptions, data)

val tableV2 = ClickHouseTableV2.getTable(deltaLog)
val committer =
new MergeTreeDelayedCommitProtocol(
outputPath.toString,
None,
None,
tableV2.dataBaseName,
tableV2.tableName)

// val (optionalStatsTracker, _) =
// getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
val (optionalStatsTracker, _) = (None, None)

val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val queryPlan = queryExecution.executedPlan
val (newQueryPlan, newOutput) =
MergeTreeWriterInjects.insertFakeRowAdaptor(queryPlan, output)
val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
BasicWriteJobStatsTracker.metrics)
// registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
statsTrackers.append(basicWriteJobStatsTracker)
}
hasWritten = true

// Iceberg spec requires partition columns in data files
val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
var options = (writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
key =>
key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
}.toMap
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

spark.conf.getAll.foreach(
entry => {
if (
CHConf.startWithSettingsPrefix(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
}
})

try {
val format = tableV2.getFileFormat(protocol, metadata)
GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
MergeTreeFileFormatWriter.write(
sparkSession = spark,
plan = newQueryPlan,
fileFormat = format,
// formats.
committer = committer,
outputSpec = outputSpec,
// scalastyle:off deltahadoopconfiguration
hadoopConf = spark.sessionState
.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
// scalastyle:on deltahadoopconfiguration
partitionColumns = partitioningColumns,
bucketSpec =
tableV2.normalizedBucketSpec(output.map(_.name), spark.sessionState.conf.resolver),
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
constraints = constraints
)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
val violationException = ExceptionUtils.getRootCause(s)
if (violationException.isInstanceOf[InvariantViolationException]) {
throw violationException
} else {
throw s
}
} finally {
GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
}
val spark = inputData.sparkSession
val (data, partitionSchema) = performCDCPartition(inputData)
val outputPath = deltaLog.dataPath

val (queryExecution, output, generatedColumnConstraints, _) =
normalizeData(deltaLog, writeOptions, data)

val tableV2 = ClickHouseTableV2.getTable(deltaLog)
val committer =
new MergeTreeDelayedCommitProtocol(
outputPath.toString,
None,
None,
tableV2.dataBaseName,
tableV2.tableName)

// val (optionalStatsTracker, _) =
// getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
val (optionalStatsTracker, _) = (None, None)

val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val queryPlan = queryExecution.executedPlan
val (newQueryPlan, newOutput) =
MergeTreeWriterInjects.insertFakeRowAdaptor(queryPlan, output)
val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
BasicWriteJobStatsTracker.metrics)
// registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
statsTrackers.append(basicWriteJobStatsTracker)
}

// Iceberg spec requires partition columns in data files
val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
var options = (writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
key =>
key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
}.toMap
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

spark.conf.getAll.foreach(
entry => {
if (
CHConf.startWithSettingsPrefix(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
}
})

try {
val format = tableV2.getFileFormat(protocol, metadata)
GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
MergeTreeFileFormatWriter.write(
sparkSession = spark,
plan = newQueryPlan,
fileFormat = format,
// formats.
committer = committer,
outputSpec = outputSpec,
// scalastyle:off deltahadoopconfiguration
hadoopConf = spark.sessionState
.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
// scalastyle:on deltahadoopconfiguration
partitionColumns = partitioningColumns,
bucketSpec =
tableV2.normalizedBucketSpec(output.map(_.name), spark.sessionState.conf.resolver),
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
constraints = constraints
)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
val violationException = ExceptionUtils.getRootCause(s)
if (violationException.isInstanceOf[InvariantViolationException]) {
throw violationException
} else {
throw s
}
} finally {
GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
}
committer.addedStatuses.toSeq ++ committer.changeFiles
} else {
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
committer.addedStatuses.toSeq ++ committer.changeFiles
}

private def shouldOptimizeWrite(
@@ -188,16 +208,21 @@ class ClickhouseOptimisticTransaction(
override protected def getCommitter(outputPath: Path): DelayedCommitProtocol =
new FileDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir)

override def writeFiles(
private def getCommitter2(outputPath: Path): DelayedCommitProtocol = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
new MergeTreeDelayedCommitProtocol2(
outputPath.toString,
None,
deltaDataSubdir,
tableV2.dataBaseName,
tableV2.tableName)
}

private def pipelineWriteFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {

if (isOptimize) {
throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
}

hasWritten = true

val spark = inputData.sparkSession
@@ -229,24 +254,19 @@ class ClickhouseOptimisticTransaction(
WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)

val queryExecution = new QueryExecution(spark, write)
val committer = fileFormat.toString match {
case "MergeTree" =>
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
new MergeTreeDelayedCommitProtocol2(
outputPath.toString,
None,
deltaDataSubdir,
tableV2.dataBaseName,
tableV2.tableName)
case _ => getCommitter(outputPath)
val (committer, collectStats) = fileFormat.toString match {
case "MergeTree" => (getCommitter2(outputPath), false)
case _ => (getCommitter(outputPath), true)
}

// If Statistics Collection is enabled, then create a stats tracker that will be injected during
// the FileFormatWriter.write call below and will collect per-file stats using
// StatisticsCollection
// val (optionalStatsTracker, _) =
// getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
val optionalStatsTracker: Option[DeltaJobStatisticsTracker] = None
val (optionalStatsTracker, _) = if (collectStats) {
getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
} else {
(None, None)
}

val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
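
With `collectStats` now enabled for the parquet committer, each `AddFile` produced by `pipelineWriteFiles` should carry a populated `stats` JSON (numRecords, minValues, maxValues, nullCount, per the Delta protocol). A rough way to eyeball that, as a sketch against the hypothetical table path used in the earlier example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Sketch: read the Delta log of the table written above and print the stats string
// attached to each add action. The path is the made-up one from the earlier sketch.
val spark = SparkSession.builder().getOrCreate()

spark.read
  .json("/tmp/delta_stats_sketch/_delta_log/*.json")
  .where(col("add").isNotNull)
  .select(col("add.path"), col("add.stats"))
  .show(truncate = false)
```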