[GLUTEN-5901][CH] Support CH backend parquet + delta (#5902)
Support the CH backend reading/writing parquet with Delta:

1. Native read of parquet from the Delta catalog;
2. Fallback write of parquet to the Delta catalog (the DeltaInvariantCheckerExec operator and DeltaTaskStatisticsTracker are not supported);
3. Use ClickHouseSparkCatalog as the uniform catalog.

Close #5901.
zzcclp authored May 31, 2024
1 parent 2fc808d commit 2c89fb1
Showing 32 changed files with 4,241 additions and 2,108 deletions.
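
Before the file-by-file diffs, here is a minimal, hypothetical sketch of how the capability described above might be exercised from a Spark session. It is not taken from this commit: the ClickHouseSparkCatalog class path is assumed from the v2.clickhouse package that appears in the imports below, and the table name and settings are illustrative only.

import org.apache.spark.sql.SparkSession

object ParquetDeltaOnCHBackend {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("parquet-delta-on-ch-backend")
      // Item 3: one uniform catalog for both MergeTree and parquet Delta tables
      // (class path assumed, not confirmed by this commit).
      .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog")
      .getOrCreate()

    // Item 2: writes to a parquet-backed Delta table fall back to vanilla Spark execution.
    spark.sql("CREATE TABLE IF NOT EXISTS events (id BIGINT, name STRING) USING delta")
    spark.sql("INSERT INTO events VALUES (1, 'a'), (2, 'b')")

    // Item 1: reads of the same table go through the native parquet scan.
    spark.sql("SELECT count(*) FROM events WHERE id > 0").show()

    spark.stop()
  }
}

Tables created with the MergeTree engine keep the existing ClickHouse-specific code paths; the diffs below gate that behaviour on ClickHouseConfig.isMergeTreeFormatEngine.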
6 changes: 6 additions & 0 deletions backends-clickhouse/pom.xml
@@ -270,6 +270,12 @@
<include>src/main/delta-${delta.binary.version}/**/*.scala</include>
<include>src/test/delta-${delta.binary.version}/**/*.scala</include>
</includes>
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/files/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
</excludes>
</scala>
</configuration>
</plugin>
DeltaLog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStoreProvider
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -214,7 +215,9 @@ class DeltaLog private (
*/
def startTransaction(): OptimisticTransaction = {
update()
// --- modified start
new ClickhouseOptimisticTransaction(this, None)
// --- modified end
}

/**
@@ -443,7 +446,13 @@

val fileIndex =
TahoeLogFileIndex(spark, this, dataPath, snapshotToUse, partitionFilters, isTimeTravelQuery)
val bucketSpec: Option[BucketSpec] = ClickHouseTableV2.getTable(this).bucketOption
// --- modified start
val bucketSpec: Option[BucketSpec] =
if (ClickHouseConfig.isMergeTreeFormatEngine(snapshotToUse.metadata.configuration)) {
ClickHouseTableV2.getTable(this).bucketOption
} else {
None
}
new DeltaHadoopFsRelation(
fileIndex,
partitionSchema =
@@ -464,20 +473,28 @@
spark,
this
)
// --- modified end
}

override def fileFormat(metadata: Metadata = metadata): FileFormat =
ClickHouseTableV2.getTable(this).getFileFormat(metadata)

override def fileFormat(metadata: Metadata = metadata): FileFormat = {
// --- modified start
if (ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)) {
ClickHouseTableV2.getTable(this).getFileFormat(metadata)
} else {
super.fileFormat(metadata)
}
// --- modified end
}
}

object DeltaLog extends DeltaLogging {
// --- modified start
@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
private class DeltaHadoopFsRelation(
location: FileIndex,
partitionSchema: StructType,
// The top-level columns in `dataSchema` should match the actual physical file schema, otherwise
// the ORC data source may not work with the by-ordinal mode.
// The top-level columns in `dataSchema` should match the actual physical file schema,
// otherwise the ORC data source may not work with the by-ordinal mode.
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
@@ -502,6 +519,7 @@ object DeltaLog extends DeltaLogging {
).run(sparkSession)
}
}
// --- modified end

/**
* The key type of `DeltaLog` cache. It's a pair of the canonicalized table path and the file
Snapshot.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.{DataSkippingReader, DeltaScan, FileSizeHistogram, StatisticsCollection}
import org.apache.spark.sql.delta.util.StateCache
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -404,6 +405,7 @@ class Snapshot(
s"${getClass.getSimpleName}(path=$path, version=$version, metadata=$metadata, " +
s"logSegment=$logSegment, checksumOpt=$checksumOpt)"

// --- modified start
override def filesForScan(
projection: Seq[Attribute],
filters: Seq[Expression],
@@ -418,31 +420,36 @@
}

private def replaceWithAddMergeTreeParts(deltaScan: DeltaScan) = {
DeltaScan.apply(
deltaScan.version,
deltaScan.files
.map(
addFile => {
val addFileAsKey = AddFileAsKey(addFile)

val ret = ClickhouseSnapshot.addFileToAddMTPCache.get(addFileAsKey)
// this is for later use
ClickhouseSnapshot.pathToAddMTPCache.put(ret.fullPartPath(), ret)
ret
}),
deltaScan.total,
deltaScan.partition,
deltaScan.scanned
)(
deltaScan.scannedSnapshot,
deltaScan.partitionFilters,
deltaScan.dataFilters,
deltaScan.unusedFilters,
deltaScan.projection,
deltaScan.scanDurationMs,
deltaScan.dataSkippingType
)
if (ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)) {
DeltaScan.apply(
deltaScan.version,
deltaScan.files
.map(
addFile => {
val addFileAsKey = AddFileAsKey(addFile)

val ret = ClickhouseSnapshot.addFileToAddMTPCache.get(addFileAsKey)
// this is for later use
ClickhouseSnapshot.pathToAddMTPCache.put(ret.fullPartPath(), ret)
ret
}),
deltaScan.total,
deltaScan.partition,
deltaScan.scanned
)(
deltaScan.scannedSnapshot,
deltaScan.partitionFilters,
deltaScan.dataFilters,
deltaScan.unusedFilters,
deltaScan.projection,
deltaScan.scanDurationMs,
deltaScan.dataSkippingType
)
} else {
deltaScan
}
}
// --- modified end

logInfo(s"Created snapshot $this")
init()
DeleteCommand.scala
@@ -216,6 +216,7 @@ case class DeleteCommand(deltaLog: DeltaLog, target: LogicalPlan, condition: Opt
if (candidateFiles.isEmpty) {
Array.empty[String]
} else {
// --- modified start
data
.filter(new Column(cond))
.select(input_file_name().as("input_files"))
@@ -224,6 +225,7 @@
.distinct()
.as[String]
.collect()
// --- modified end
}
}

MergeIntoCommand.scala
@@ -407,7 +407,9 @@ case class MergeIntoCommand(
val recordTouchedFileName = udf {
(fileName: String) =>
{
// --- modified start
fileName.split(",").foreach(name => touchedFilesAccum.add(name))
// --- modified end
1
}
}.asNondeterministic()
OptimizeTableCommand.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.skipping.MultiDimClustering
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -127,8 +128,10 @@ case class OptimizeTableCommand(
override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil

override def run(sparkSession: SparkSession): Seq[Row] = {
// --- modified start
CHDataSourceUtils.ensureClickHouseTableV2(tableId, sparkSession)
val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE")
// --- modified end

val partitionColumns = deltaLog.snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
@@ -177,6 +180,10 @@ class OptimizeExecutor(

def optimize(): Seq[Row] = {
recordDeltaOperation(deltaLog, "delta.optimize") {
// --- modified start
val isMergeTreeFormat = ClickHouseConfig
.isMergeTreeFormatEngine(deltaLog.snapshot.metadata.configuration)
// --- modified end
val minFileSize =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
val maxFileSize =
@@ -194,37 +201,59 @@

// select all files in case of multi-dimensional clustering
val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
val partitionsToCompact = filesToProcess
.groupBy(file => (file.asInstanceOf[AddMergeTreeParts].bucketNum, file.partitionValues))
.toSeq

val jobs = groupFilesIntoBinsClickhouse(partitionsToCompact, maxFileSize)

val parallelJobCollection = new ParVector(jobs.toVector)

// --- modified start
// Create a task pool to parallelize the submission of optimization jobs to Spark.
val threadPool = ThreadUtils.newForkJoinPool(
"OptimizeJob",
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS))
val (updates, jobs) = if (isMergeTreeFormat) {
val partitionsToCompact = filesToProcess
.groupBy(file => (file.asInstanceOf[AddMergeTreeParts].bucketNum, file.partitionValues))
.toSeq

val jobs = groupFilesIntoBinsClickhouse(partitionsToCompact, maxFileSize)

val parallelJobCollection = new ParVector(jobs.toVector)

val updates =
try {
val forkJoinPoolTaskSupport = new ForkJoinTaskSupport(threadPool)
parallelJobCollection.tasksupport = forkJoinPoolTaskSupport

parallelJobCollection
.flatMap(
partitionBinGroup =>
runOptimizeBinJobClickhouse(
txn,
partitionBinGroup._1._2,
partitionBinGroup._1._1,
partitionBinGroup._2,
maxFileSize))
.seq
} finally {
threadPool.shutdownNow()
}
(updates, jobs)
} else {
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)

val updates =
try {
val parallelJobCollection = new ParVector(jobs.toVector)

val updates = try {
val forkJoinPoolTaskSupport = new ForkJoinTaskSupport(threadPool)
parallelJobCollection.tasksupport = forkJoinPoolTaskSupport

parallelJobCollection
.flatMap(
partitionBinGroup =>
runOptimizeBinJobClickhouse(
txn,
partitionBinGroup._1._2,
partitionBinGroup._1._1,
partitionBinGroup._2,
maxFileSize))
.seq
parallelJobCollection.flatMap(partitionBinGroup =>
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)).seq
} finally {
threadPool.shutdownNow()
}
(updates, jobs)
}
// --- modified end

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
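
Aside, not part of the commit: both branches of the OptimizeExecutor change above submit their bin-compaction jobs the same way, through a parallel collection bound to a dedicated ForkJoinPool that is shut down once the jobs finish. A self-contained sketch of that pattern (Scala 2.12-style parallel collections; the pool size and job signature are assumptions):

import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ParallelBins {
  // Runs `job` over every bin on a dedicated ForkJoinPool and collects the results
  // back into a sequential Seq, always shutting the pool down afterwards.
  def runAll[A, B](bins: Seq[A], parallelism: Int)(job: A => Seq[B]): Seq[B] = {
    val pool = new ForkJoinPool(parallelism)
    val parBins = new ParVector(bins.toVector)
    try {
      parBins.tasksupport = new ForkJoinTaskSupport(pool)
      parBins.flatMap(job).seq
    } finally {
      pool.shutdownNow()
    }
  }
}

For example, ParallelBins.runAll(bins, parallelism = 4)(bin => compactBin(bin)) would mirror the runOptimizeBinJob calls above, with compactBin standing in for the real compaction job.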
UpdateCommand.scala
@@ -144,6 +144,7 @@ case class UpdateCommand(
}.asNondeterministic()
val pathsToRewrite =
withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
// --- modified start
data
.filter(new Column(updateCondition))
.filter(updatedRowUdf())
Expand All @@ -152,6 +153,7 @@ case class UpdateCommand(
.distinct()
.as[String]
.collect()
// --- modified end
}

scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000