[GLUTEN-7028][CH][Part-10] Collecting Delta stats for MergeTree (#8029)
* [Refactor] Add SparkRowInfoJNI for later use

* [Minor] RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS and RuntimeConfig.LOGGER_LEVEL

* [Minor] Use one-pipeline write for Spark 3.5; ignore some failing tests for now

* [Fix Bug] Only output NO_PARTITION_ID in collectStats to avoid a naming bug in mergeParts

* [Refactor] mergeParts returns MergeTreeDataPartPtr instead of vector<MergeTreeDataPartPtr>

* [Minor] debug::dumpMemoryUsage and debug::printBlockHeader

* [Feature] SparkMergeTreeSink adds DeltaStats

* [Feature] Collect DeltaStats for MergeTree

* [Feature] One-pipeline write for Parquet OPTIMIZE

* [Feature] One-pipeline write for MergeTree OPTIMIZE

* Fix UT
baibaichen authored Dec 20, 2024
1 parent dcd356c commit 0530292
Showing 48 changed files with 1,949 additions and 459 deletions.
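The bullets above summarize the change: the ClickHouse backend now collects Delta per-file statistics when writing MergeTree parts, and routes Parquet and MergeTree OPTIMIZE through the one-pipeline native write. For orientation, the sketch below is purely illustrative (it is not code from this commit) and shows the shape of the per-file statistics Delta keeps in the stats field of an AddFile action; the DeltaFileStatistics class that appears in the diff wraps a map of this shape, keyed by file name.

object DeltaStatsShapeSketch {
  def main(args: Array[String]): Unit = {
    // One JSON document per written file or MergeTree part, keyed by its path.
    // Field names follow Delta's stats schema: numRecords, minValues, maxValues, nullCount.
    val perFileStats: Map[String, String] = Map(
      "part-00000-example.parquet" ->
        """{"numRecords":1000,"minValues":{"id":1},"maxValues":{"id":1000},"nullCount":{"id":0}}"""
    )
    perFileStats.foreach { case (file, json) => println(s"$file -> $json") }
  }
}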
@@ -39,6 +39,7 @@ import org.apache.spark.util.SerializableConfiguration

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec

import scala.collection.mutable.ListBuffer

@@ -69,20 +70,22 @@ class ClickhouseOptimisticTransaction(
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
if (writingMergeTree) {
if (isOptimize) {
throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
}
// TODO: update FallbackByBackendSettings for mergetree always return true
val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
if (onePipeline)
pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
else
pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints)
else {
if (isOptimize) {
throw new UnsupportedOperationException(
"Optimize is only supported in one pipeline native write mode")
}
writeMergeTree(inputData, writeOptions, additionalConstraints)
}
} else {
if (isOptimize || !nativeWrite) {
super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints)
if (nativeWrite) {
pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints)
} else {
pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints)
}
}
}
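Read as removed lines followed by added lines, the branching above appears to resolve to the dispatch sketched below. This is a reconstruction under that reading, not verbatim repository code; the boolean parameters are stand-ins for GlutenConfig.getConf.enableNativeWriter and CHConf.get.enableOnePipelineMergeTreeWrite, and the returned strings name the methods called in the real class.

object WritePathDispatchSketch {
  // Which write path ClickhouseOptimisticTransaction.writeFiles appears to pick after this commit.
  def choosePath(
      writingMergeTree: Boolean,
      nativeWrite: Boolean,
      onePipelineMergeTreeWrite: Boolean,
      isOptimize: Boolean): String = {
    if (writingMergeTree) {
      if (nativeWrite && onePipelineMergeTreeWrite) {
        "pipelineWriteFiles(isOptimize): one-pipeline MergeTree write, Delta stats collected"
      } else if (isOptimize) {
        "unsupported: OPTIMIZE requires the one-pipeline native write mode"
      } else {
        "writeMergeTree: legacy MergeTree write path"
      }
    } else if (nativeWrite) {
      "pipelineWriteFiles(isOptimize): native Parquet write, now also used for OPTIMIZE"
    } else {
      "super.writeFiles: vanilla Spark/Delta write"
    }
  }

  def main(args: Array[String]): Unit =
    println(
      choosePath(
        writingMergeTree = true,
        nativeWrite = true,
        onePipelineMergeTreeWrite = true,
        isOptimize = true))
}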
@@ -217,6 +220,19 @@ class ClickhouseOptimisticTransaction(
tableV2.tableName)
}

/**
* Writes out the dataframe in pipeline mode after performing schema validation. Returns a list of
* actions to append these files to the reservoir.
*
* @param inputData
* Data to write out.
* @param writeOptions
* Options to decide how to write out the data.
* @param isOptimize
* Whether the operation writing this is Optimize or not.
* @param additionalConstraints
* Additional constraints on the write.
*/
private def pipelineWriteFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
@@ -232,21 +248,13 @@
normalizeData(deltaLog, writeOptions, data)
val partitioningColumns = getPartitioningColumns(partitionSchema, output)

val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.

val (committer, collectStats) = fileFormat.toString match {
case "MergeTree" => (getCommitter2(outputPath), false)
case _ => (getCommitter(outputPath), true)
}
val committer = if (writingMergeTree) getCommitter2(outputPath) else getCommitter(outputPath)

// If Statistics Collection is enabled, then create a stats tracker that will be injected during
// the FileFormatWriter.write call below and will collect per-file stats using
// StatisticsCollection
val (optionalStatsTracker, _) = if (collectStats) {
val (optionalStatsTracker, _) =
getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
} else {
(None, None)
}

val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
@@ -259,18 +267,18 @@
// TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val checkInvariants = empty2NullPlan

// TODO: DeltaOptimizedWriterExec
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
// evenly-balanced data files already.
// TODO: val physicalPlan =
// if (
// !isOptimize &&
// shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
// ) {
// DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
// } else {
// checkInvariants
// }
val physicalPlan = checkInvariants
val physicalPlan =
if (
!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
) {
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
} else {
checkInvariants
}
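The TODO block is replaced by a real decision: the plan is wrapped in DeltaOptimizedWriterExec only for normal writes, since OPTIMIZE already aims to produce evenly-balanced files. A minimal sketch of that guard, with shouldOptimizeWrite standing in for the session-configuration lookup used in the real code:

object OptimizedWriteGuardSketch {
  // Mirror of the condition above: plan an optimized (repartitioning) write only when the
  // command is not OPTIMIZE and optimized writes are enabled.
  def planOptimizedWrite(isOptimize: Boolean, shouldOptimizeWrite: Boolean): Boolean =
    !isOptimize && shouldOptimizeWrite

  def main(args: Array[String]): Unit = {
    assert(!planOptimizedWrite(isOptimize = true, shouldOptimizeWrite = true))
    assert(planOptimizedWrite(isOptimize = false, shouldOptimizeWrite = true))
  }
}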

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

@@ -296,6 +304,7 @@
}.toMap
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.
val executedPlan = DeltaV1Writes(
spark,
physicalPlan,
@@ -39,21 +39,12 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.types._
import org.apache.spark.util.{SystemClock, ThreadUtils}

/**
* Gluten overwrite Delta:
*
* This file is copied from Delta 3.2.1. It is modified in:
* 1. getDeltaTable supports to get ClickHouseTableV2
* 2. runOptimizeBinJobClickhouse
* 3. groupFilesIntoBinsClickhouse
*/
// TODO: Remove this file once we needn't support bucket

/** Base class defining abstract optimize command */
abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -152,10 +143,7 @@ case class OptimizeTableCommand(
copy(child = newChild)(zOrderBy)

override def run(sparkSession: SparkSession): Seq[Row] = {
// --- modified start
val table = OptimizeTableCommandOverwrites.getDeltaTable(child, "OPTIMIZE")
// --- modified end

val table = getDeltaTable(child, "OPTIMIZE")
val txn = table.startTransaction()
if (txn.readVersion == -1) {
throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
@@ -268,12 +256,6 @@ class OptimizeExecutor(

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {

// --- modified start
val isMergeTreeFormat = ClickHouseConfig
.isMergeTreeFormatEngine(txn.deltaLog.unsafeVolatileMetadata.configuration)
// --- modified end

val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize = optimizeContext.maxFileSize.getOrElse(
@@ -288,39 +270,15 @@
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
// --- modified start
val maxThreads =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
val (updates, jobs) = if (isMergeTreeFormat) {
val partitionsToCompact = filesToProcess
.groupBy(file => (file.asInstanceOf[AddMergeTreeParts].bucketNum, file.partitionValues))
.toSeq
val jobs = OptimizeTableCommandOverwrites
.groupFilesIntoBinsClickhouse(partitionsToCompact, maxFileSize)
val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) {
partitionBinGroup =>
// --- modified start
OptimizeTableCommandOverwrites.runOptimizeBinJobClickhouse(
txn,
partitionBinGroup._1._2,
partitionBinGroup._1._1,
partitionBinGroup._2,
maxFileSize)
// --- modified end
}.flatten
// uniform the jobs type
(updates, jobs.map(v => (v._1._2 ++ Map("bucketNum" -> v._1.toString()), v._2)))
} else {
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

val jobs = groupFilesIntoBins(partitionsToCompact)
val jobs = groupFilesIntoBins(partitionsToCompact)

val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
}.flatten
(updates, jobs)
}
// --- modified end
val maxThreads =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
}.flatten

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
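With the ClickHouse-specific branch removed, both formats share the vanilla flow: group candidate files by partition values, pack each group into size-bounded bins, and compact each bin as one parallel job. The sketch below is a simplified, self-contained model of that bin packing, not Delta's groupFilesIntoBins, which applies additional rules.

object BinPackingSketch {
  final case class FileLike(path: String, size: Long)

  // Greedily pack files into bins whose total size stays at or below maxBinSize;
  // each bin then corresponds to one compaction job (run in parallel via parmap above).
  def groupIntoBins(files: Seq[FileLike], maxBinSize: Long): Seq[Seq[FileLike]] = {
    val bins = scala.collection.mutable.ArrayBuffer.empty[Vector[FileLike]]
    var current = Vector.empty[FileLike]
    var currentSize = 0L
    files.foreach { f =>
      if (current.nonEmpty && currentSize + f.size > maxBinSize) {
        bins += current
        current = Vector.empty
        currentSize = 0L
      }
      current :+= f
      currentSize += f.size
    }
    if (current.nonEmpty) bins += current
    bins.toSeq
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(FileLike("a", 40L), FileLike("b", 70L), FileLike("c", 30L))
    // With maxBinSize = 100 this yields two bins: Vector(FileLike(a,40)) and Vector(FileLike(b,70), FileLike(c,30)).
    println(groupIntoBins(files, maxBinSize = 100L))
  }
}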
@@ -26,9 +26,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Projection, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, DeclarativeAggregate}
import org.apache.spark.sql.delta.files.{FileDelayedCommitProtocol, MergeTreeDelayedCommitProtocol2}
import org.apache.spark.sql.delta.stats.DeltaFileStatistics
import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker}
import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.Utils

import scala.collection.mutable.ArrayBuffer
@@ -38,7 +37,7 @@ case class DeltaFileCommitInfo(committer: FileDelayedCommitProtocol)
val addedFiles: ArrayBuffer[(Map[String, String], String)] =
new ArrayBuffer[(Map[String, String], String)]
override def apply(stat: NativeFileWriteResult): Unit = {
if (stat.partition_id == "__NO_PARTITION_ID__") {
if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
addedFiles.append((Map.empty[String, String], stat.filename))
} else {
val partitionValues = committer.parsePartitions(stat.partition_id)
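The sentinel comparison above now uses CHColumnarWrite.EMPTY_PARTITION_ID instead of a repeated string literal. As intuition for the surrounding code, the sketch below models what the committer's parsePartitions step has to accomplish; it assumes the native writer reports partition_id as a Hive-style "col=val/col2=val2" string and takes the sentinel value from the removed literal, both of which are assumptions rather than facts stated by this diff.

object PartitionIdParseSketch {
  // Assumed value of CHColumnarWrite.EMPTY_PARTITION_ID, taken from the removed literal above.
  val EmptyPartitionId = "__NO_PARTITION_ID__"

  // Turn the reported partition_id into the Map[String, String] Delta expects on an AddFile.
  def toPartitionValues(partitionId: String): Map[String, String] =
    if (partitionId == EmptyPartitionId) Map.empty
    else
      partitionId
        .split("/")
        .filter(_.contains("="))
        .map { kv =>
          val Array(key, value) = kv.split("=", 2)
          key -> value
        }
        .toMap

  def main(args: Array[String]): Unit = {
    println(toPartitionValues(EmptyPartitionId))        // Map()
    println(toPartitionValues("dt=2024-12-20/hour=01")) // Map(dt -> 2024-12-20, hour -> 01)
  }
}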
@@ -61,14 +60,15 @@ case class NativeDeltaStats(projection: Projection) extends (InternalRow => Unit)

def result: DeltaFileStatistics = DeltaFileStatistics(results.toMap)
}
case class FileDeltaColumnarWrite(
override val jobTrackerID: String,
override val description: WriteJobDescription,
override val committer: FileDelayedCommitProtocol)
extends CHColumnarWrite[FileDelayedCommitProtocol]
with Logging {

private lazy val nativeDeltaStats: Option[NativeDeltaStats] = {
trait SupportNativeDeltaStats[T <: FileCommitProtocol] extends CHColumnarWrite[T] {

private lazy val deltaWriteJobStatsTracker: Option[DeltaJobStatisticsTracker] =
description.statsTrackers
.find(_.isInstanceOf[DeltaJobStatisticsTracker])
.map(_.asInstanceOf[DeltaJobStatisticsTracker])

lazy val nativeDeltaStats: Option[NativeDeltaStats] = {
deltaWriteJobStatsTracker
.map(
delta => {
Expand All @@ -77,10 +77,7 @@ case class FileDeltaColumnarWrite(
if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
}
val z = Seq(
AttributeReference("filename", StringType, nullable = false)(),
AttributeReference("partition_id", StringType, nullable = false)())
val s =
val vanillaSchema =
delta.statsColExpr
.collect {
case ae: AggregateExpression
Expand All @@ -92,10 +89,24 @@ case class FileDeltaColumnarWrite(
NativeDeltaStats(
UnsafeProjection.create(
exprs = Seq(r),
inputSchema = z :++ s
inputSchema = nativeStatsSchema(vanillaSchema)
))
})
}

def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference]
}
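SupportNativeDeltaStats leaves nativeStatsSchema abstract, and the removed inline code above (the filename and partition_id AttributeReferences prepended via z :++ s) suggests what an implementation supplies: the two string columns the native writer emits first, placed in front of the vanilla Delta stats schema. A sketch under that assumption; the real NativeFileWriteResult.nativeStatsSchema may differ in detail.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

object NativeStatsSchemaSketch {
  // Prepend the native writer's filename and partition_id columns to the vanilla stats schema.
  def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] =
    Seq(
      AttributeReference("filename", StringType, nullable = false)(),
      AttributeReference("partition_id", StringType, nullable = false)()
    ) ++ vanilla
}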

case class FileDeltaColumnarWrite(
override val jobTrackerID: String,
override val description: WriteJobDescription,
override val committer: FileDelayedCommitProtocol)
extends SupportNativeDeltaStats[FileDelayedCommitProtocol]
with Logging {

override def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] =
NativeFileWriteResult.nativeStatsSchema(vanilla)

override def doSetupNativeTask(): Unit = {
assert(description.path == committer.outputPath)
val nameSpec = CreateFileNameSpec(taskAttemptContext, description)