From 99d615ed2bbe767795ea425d3cac18ee0ae692bf Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Sun, 29 Sep 2024 21:20:19 +0800 Subject: [PATCH 1/8] Broadcast ReadRel.ExtensionTable call ClickhouseMetaSerializer.forWrite on the driver side org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil => org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil ClickhouseMetaSerializer.forWrite => get parameters from clickhouseTableConfigs Directly call ClickhouseMetaSerializer in CHMergeTreeWriterInjects Simplify ExtensionTableNode Minor refactor: use a functional style to create collections --- .../sql/delta/catalog/ClickHouseTableV2.scala | 2 +- .../sql/delta/catalog/ClickHouseTableV2.scala | 2 +- .../org/apache/spark/sql/delta/DeltaLog.scala | 3 - .../sql/delta/catalog/ClickHouseTableV2.scala | 20 +- .../source/DeltaMergeTreeFileFormat.scala | 80 ++---- .../clickhouse/ExtensionTableBuilder.java | 45 ++-- .../clickhouse/ExtensionTableNode.java | 174 ++----------- .../clickhouse/CHIteratorApi.scala | 32 +-- .../spark/sql/delta/ClickhouseSnapshot.scala | 7 +- .../delta/catalog/ClickHouseTableV2Base.scala | 42 ++- .../commands/GlutenCHCacheDataCommand.scala | 14 +- .../clickhouse/ClickhouseMetaSerializer.scala | 240 ++++++++++++++++++ .../utils/MergeTreeDeltaUtil.scala | 23 +- .../utils/MergeTreePartsPartitionsUtil.scala | 31 +-- .../v1/CHFormatWriterInjects.scala | 20 -- .../v1/CHMergeTreeWriterInjects.scala | 91 ++++--- .../v1/clickhouse/MergeTreeOutputWriter.scala | 2 +- ...ClickhouseMergetreeSoftAffinitySuite.scala | 2 +- 18 files changed, 414 insertions(+), 416 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala rename backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/{ => clickhouse}/utils/MergeTreeDeltaUtil.scala (75%) rename backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/{ => clickhouse}/utils/MergeTreePartsPartitionsUtil.scala (96%) diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 90370f0b1d99..e654bbf71b7d 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.collection.BitSet diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 90370f0b1d99..e654bbf71b7d 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.collection.BitSet diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala index dca14d7fb1fb..bac5231309b8 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala @@ -784,9 +784,6 @@ object DeltaLog extends DeltaLogging { FileSourceOptions.IGNORE_CORRUPT_FILES -> "false", FileSourceOptions.IGNORE_MISSING_FILES -> "false" ) - // --- modified start - // Don't need to add the bucketOption here, it handles the delta log meta json file - // --- modified end val fsRelation = HadoopFsRelation( index, index.partitionSchema, schema, None, index.format, allOptions)(spark) LogicalRelation(fsRelation) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 8b4a13a30a69..303b80ee1387 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, De import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -99,27 +99,20 @@ class ClickHouseTableV2( new DeltaMergeTreeFileFormat( protocol, meta, - dataBaseName, - tableName, ClickhouseSnapshot.genSnapshotId(initialSnapshot), - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, + deltaLog.dataPath.toString, clickhouseTableConfigs, partitionColumns ) } - override def deltaProperties(): ju.Map[String, String] = properties() + override def deltaProperties: ju.Map[String, String] = properties() - override def deltaCatalog(): Option[CatalogTable] = catalogTable + override def deltaCatalog: Option[CatalogTable] = catalogTable - override def deltaPath(): Path = path + override def deltaPath: Path = path - override def deltaSnapshot(): Snapshot = 
initialSnapshot + override def deltaSnapshot: Snapshot = initialSnapshot def cacheThis(): Unit = { ClickHouseTableV2.deltaLog2Table.put(deltaLog, this) @@ -133,7 +126,6 @@ class TempClickHouseTableV2( override val spark: SparkSession, override val catalogTable: Option[CatalogTable] = None) extends ClickHouseTableV2(spark, null, catalogTable) { - import collection.JavaConverters._ override def properties(): ju.Map[String, String] = catalogTable.get.properties.asJava override lazy val partitionColumns: Seq[String] = catalogTable.get.partitionColumnNames override def cacheThis(): Unit = {} diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 36ab3fb6c138..2da9ea7d4fcf 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,57 +20,22 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass")) -class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) +class DeltaMergeTreeFileFormat( + protocol: Protocol, + metadata: Metadata, + val snapshotId: String, + val deltaPath: String, + @transient val clickhouseTableConfigs: Map[String, String], + val partitionColumns: Seq[String]) extends DeltaParquetFileFormat(protocol, metadata) { - protected var database = "" - protected var tableName = "" - protected var snapshotId = "" - protected var orderByKeyOption: Option[Seq[String]] = None - protected var lowCardKeyOption: Option[Seq[String]] = None - protected var minmaxIndexKeyOption: Option[Seq[String]] = None - protected var bfIndexKeyOption: Option[Seq[String]] = None - protected var setIndexKeyOption: Option[Seq[String]] = None - protected var primaryKeyOption: Option[Seq[String]] = None - protected var partitionColumns: Seq[String] = Seq.empty[String] - protected var clickhouseTableConfigs: Map[String, String] = Map.empty - - // scalastyle:off argcount - def this( - protocol: Protocol, - metadata: Metadata, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - clickhouseTableConfigs: Map[String, String], - partitionColumns: Seq[String]) = { - this(protocol, metadata) - this.database = database - this.tableName = tableName - this.snapshotId = snapshotId - this.orderByKeyOption = orderByKeyOption - this.lowCardKeyOption = lowCardKeyOption - this.minmaxIndexKeyOption = minmaxIndexKeyOption - 
this.bfIndexKeyOption = bfIndexKeyOption - this.setIndexKeyOption = setIndexKeyOption - this.primaryKeyOption = primaryKeyOption - this.clickhouseTableConfigs = clickhouseTableConfigs - this.partitionColumns = partitionColumns - } - // scalastyle:on argcount - override def shortName(): String = "mergetree" override def toString(): String = "MergeTree" @@ -99,6 +64,18 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) .getInstance() .nativeConf(options, "") + val database = clickhouseTableConfigs("storage_db") + val tableName = clickhouseTableConfigs("storage_table") + val extensionTableBC = sparkSession.sparkContext.broadcast( + ClickhouseMetaSerializer + .forWrite( + snapshotId, + deltaPath, + metadata.schema, + clickhouseTableConfigs + ) + .toByteArray) + new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { ".mergetree" @@ -108,25 +85,18 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + require(path == deltaPath) GlutenMergeTreeWriterInjects .getInstance() .asInstanceOf[CHMergeTreeWriterInjects] .createOutputWriter( path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, metadata.schema, - clickhouseTableConfigs, context, - nativeConf + nativeConf, + database, + tableName, + extensionTableBC.value ) } } diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java index c43775d85cbd..9d6ed6868ec1 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.clickhouse; -import org.apache.gluten.expression.ConverterUtils; - import java.util.List; import java.util.Map; @@ -25,8 +23,6 @@ public class ExtensionTableBuilder { private ExtensionTableBuilder() {} public static ExtensionTableNode makeExtensionTable( - Long minPartsNum, - Long maxPartsNum, String database, String tableName, String snapshotId, @@ -38,31 +34,28 @@ public static ExtensionTableNode makeExtensionTable( String bfIndexKey, String setIndexKey, String primaryKey, - List partList, - List starts, - List lengths, + ClickhousePartSerializer partSerializer, String tableSchemaJson, Map clickhouseTableConfigs, List preferredLocations) { + + String result = + ClickhouseMetaSerializer.apply( + database, + tableName, + snapshotId, + relativeTablePath, + absoluteTablePath, + orderByKey, + lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, + primaryKey, + partSerializer, + tableSchemaJson, + clickhouseTableConfigs); return new ExtensionTableNode( - minPartsNum, - maxPartsNum, - database, - tableName, - snapshotId, - relativeTablePath, - absoluteTablePath, - ConverterUtils.normalizeColName(orderByKey), - ConverterUtils.normalizeColName(lowCardKey), - ConverterUtils.normalizeColName(minmaxIndexKey), - ConverterUtils.normalizeColName(bfIndexKey), - ConverterUtils.normalizeColName(setIndexKey), - ConverterUtils.normalizeColName(primaryKey), - partList, - starts, - lengths, - tableSchemaJson, 
- clickhouseTableConfigs, - preferredLocations); + preferredLocations, result, partSerializer.pathList(absoluteTablePath)); } } diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java index 69629c8a09af..bb04652be440 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java @@ -19,158 +19,25 @@ import org.apache.gluten.backendsapi.BackendsApiManager; import org.apache.gluten.substrait.rel.SplitInfo; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.StringValue; import io.substrait.proto.ReadRel; -import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Map; public class ExtensionTableNode implements SplitInfo { - private static final String MERGE_TREE = "MergeTree;"; - private Long minPartsNum; - private Long maxPartsNum; - private String database; - private String tableName; - private String snapshotId; - private String relativePath; - private String absolutePath; - private String tableSchemaJson; - private StringBuffer extensionTableStr = new StringBuffer(MERGE_TREE); - private StringBuffer partPathList = new StringBuffer(""); private final List preferredLocations = new ArrayList<>(); - - private String orderByKey; - - private String primaryKey; - - private String lowCardKey; - private String minmaxIndexKey; - private String bfIndexKey; - private String setIndexKey; - - private List partList; - private List starts; - private List lengths; - - private Map clickhouseTableConfigs; + private final String serializerResult; + private final scala.collection.Seq pathList; ExtensionTableNode( - Long minPartsNum, - Long maxPartsNum, - String database, - String tableName, - String snapshotId, - String relativePath, - String absolutePath, - String orderByKey, - String lowCardKey, - String minmaxIndexKey, - String bfIndexKey, - String setIndexKey, - String primaryKey, - List partList, - List starts, - List lengths, - String tableSchemaJson, - Map clickhouseTableConfigs, - List preferredLocations) { - this.minPartsNum = minPartsNum; - this.maxPartsNum = maxPartsNum; - this.database = database; - this.tableName = tableName; - this.snapshotId = snapshotId; - URI table_uri = URI.create(relativePath); - if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx - this.relativePath = table_uri.getPath().substring(1); - } else { - this.relativePath = table_uri.getPath(); - } - this.absolutePath = absolutePath; - this.tableSchemaJson = tableSchemaJson; - this.orderByKey = orderByKey; - this.lowCardKey = lowCardKey; - this.minmaxIndexKey = minmaxIndexKey; - this.bfIndexKey = bfIndexKey; - this.setIndexKey = setIndexKey; - this.primaryKey = primaryKey; - this.partList = partList; - this.starts = starts; - this.lengths = lengths; - this.clickhouseTableConfigs = clickhouseTableConfigs; + List preferredLocations, + String serializerResult, + scala.collection.Seq pathList) { + this.pathList = pathList; this.preferredLocations.addAll(preferredLocations); - - // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n - // {part_path1}\n{part_path2}\n... 
- long end = 0; - for (int i = 0; i < this.partList.size(); i++) { - end = this.starts.get(i) + this.lengths.get(i); - partPathList - .append(this.partList.get(i)) - .append("\n") - .append(this.starts.get(i)) - .append("\n") - .append(end) - .append("\n"); - } - - extensionTableStr - .append(this.database) - .append("\n") - .append(this.tableName) - .append("\n") - .append(this.snapshotId) - .append("\n") - .append(this.tableSchemaJson) - .append("\n") - .append(this.orderByKey) - .append("\n"); - - if (!this.orderByKey.isEmpty() && !this.orderByKey.equals("tuple()")) { - extensionTableStr.append(this.primaryKey).append("\n"); - } - extensionTableStr.append(this.lowCardKey).append("\n"); - extensionTableStr.append(this.minmaxIndexKey).append("\n"); - extensionTableStr.append(this.bfIndexKey).append("\n"); - extensionTableStr.append(this.setIndexKey).append("\n"); - extensionTableStr.append(this.relativePath).append("\n"); - extensionTableStr.append(this.absolutePath).append("\n"); - - if (this.clickhouseTableConfigs != null && !this.clickhouseTableConfigs.isEmpty()) { - ObjectMapper objectMapper = new ObjectMapper(); - try { - String clickhouseTableConfigsJson = - objectMapper - .writeValueAsString(this.clickhouseTableConfigs) - .replaceAll("\\\n", "") - .replaceAll(" ", ""); - extensionTableStr.append(clickhouseTableConfigsJson).append("\n"); - } catch (Exception e) { - extensionTableStr.append("").append("\n"); - } - } else { - extensionTableStr.append("").append("\n"); - } - extensionTableStr.append(partPathList); - /* old format - if (!this.partList.isEmpty()) { - } else { - // Old: MergeTree;{database}\n{table}\n{relative_path}\n{min_part}\n{max_part}\n - extensionTableStr - .append(database) - .append("\n") - .append(tableName) - .append("\n") - .append(relativePath) - .append("\n") - .append(this.minPartsNum) - .append("\n") - .append(this.maxPartsNum) - .append("\n"); - } */ + this.serializerResult = serializerResult; } @Override @@ -180,27 +47,22 @@ public List preferredLocations() { @Override public ReadRel.ExtensionTable toProtobuf() { - ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder(); - StringValue extensionTable = - StringValue.newBuilder().setValue(extensionTableStr.toString()).build(); - extensionTableBuilder.setDetail( - BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable)); - return extensionTableBuilder.build(); + return toProtobuf(serializerResult); } - public String getRelativePath() { - return relativePath; + public scala.collection.Seq getPartList() { + return pathList; } - public String getAbsolutePath() { - return absolutePath; - } - - public List getPartList() { - return partList; + public String getExtensionTableStr() { + return serializerResult; } - public String getExtensionTableStr() { - return extensionTableStr.toString(); + public static ReadRel.ExtensionTable toProtobuf(String result) { + ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder(); + StringValue extensionTable = StringValue.newBuilder().setValue(result).build(); + extensionTableBuilder.setDetail( + BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable)); + return extensionTableBuilder.build(); } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 9585d9d5f235..0a3dbc3f5a37 100644 --- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.shuffle.CHColumnarShuffleWriter import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition -import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode} +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper @@ -131,20 +131,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { properties: Map[String, String]): SplitInfo = { partition match { case p: GlutenMergeTreePartition => - val partLists = new JArrayList[String]() - val starts = new JArrayList[JLong]() - val lengths = new JArrayList[JLong]() - p.partList - .foreach( - parts => { - partLists.add(parts.name) - starts.add(parts.start) - lengths.add(parts.length) - }) ExtensionTableBuilder .makeExtensionTable( - -1L, - -1L, p.database, p.table, p.snapshotId, @@ -156,9 +144,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { p.bfIndexKey, p.setIndexKey, p.primaryKey, - partLists, - starts, - lengths, + ClickhousePartSerializer.fromMergeTreePartSplits(p.partList.toSeq), p.tableSchemaJson, p.clickhouseTableConfigs.asJava, CHAffinity.getNativeMergeTreePartitionLocations(p).toList.asJava @@ -222,27 +208,23 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val planByteArray = wsCtx.root.toProtobuf.toByteArray splitInfos.zipWithIndex.map { case (splits, index) => - val files = ArrayBuffer[String]() - val splitInfosByteArray = splits.zipWithIndex.map { + val (splitInfosByteArray, files) = splits.zipWithIndex.map { case (split, i) => split match { case filesNode: LocalFilesNode => setFileSchemaForLocalFiles(filesNode, scans(i)) - filesNode.getPaths.forEach(f => files += f) - filesNode.toProtobuf.toByteArray + (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq) case extensionTableNode: ExtensionTableNode => - extensionTableNode.getPartList.forEach( - name => files += extensionTableNode.getAbsolutePath + "/" + name) - extensionTableNode.toProtobuf.toByteArray + (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList) } - } + }.unzip GlutenPartition( index, planByteArray, splitInfosByteArray.toArray, locations = splits.flatMap(_.preferredLocations().asScala).toArray, - files.toArray + files.flatten.toArray ) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala index f28e9e8fe468..e3f643046671 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala @@ -125,11 +125,6 @@ object ClickhouseSnapshot { // use timestamp + version as the snapshot id for ch backend def genSnapshotId(snapshot: Snapshot): String = { // When CTAS, there is no latest timestamp in the Snapshot - val ts = if (snapshot.metadata.createdTime.isDefined) { - 
snapshot.metadata.createdTime.get - } else { - System.currentTimeMillis() - } - ts.toString + "_" + snapshot.version.toString + s"${snapshot.metadata.createdTime.getOrElse(System.currentTimeMillis())}_${snapshot.version}" } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 4d01c3798d51..8321db6f4efb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -16,12 +16,11 @@ */ package org.apache.spark.sql.delta.catalog -import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.expression.ConverterUtils.normalizeColName import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} import org.apache.spark.sql.delta.Snapshot -import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil import org.apache.hadoop.fs.Path @@ -31,19 +30,20 @@ trait ClickHouseTableV2Base { val DEFAULT_DATABASE = "clickhouse_db" - def deltaProperties(): ju.Map[String, String] + def deltaProperties: ju.Map[String, String] - def deltaCatalog(): Option[CatalogTable] - def deltaPath(): Path + def deltaCatalog: Option[CatalogTable] - def deltaSnapshot(): Snapshot + def deltaPath: Path - lazy val dataBaseName = deltaCatalog + def deltaSnapshot: Snapshot + + lazy val dataBaseName: String = deltaCatalog .map(_.identifier.database.getOrElse("default")) .getOrElse(DEFAULT_DATABASE) - lazy val tableName = deltaCatalog + lazy val tableName: String = deltaCatalog .map(_.identifier.table) .getOrElse(deltaPath.toUri.getPath) @@ -84,7 +84,7 @@ trait ClickHouseTableV2Base { val keys = tableProperties .get(keyName) .split(",") - .map(n => ConverterUtils.normalizeColName(n.trim)) + .map(n => normalizeColName(n.trim)) .toSeq keys.foreach( s => { @@ -143,13 +143,27 @@ trait ClickHouseTableV2Base { } } - lazy val partitionColumns = deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq + lazy val partitionColumns: Seq[String] = + deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq lazy val clickhouseTableConfigs: Map[String, String] = { - val tableProperties = deltaProperties() - val configs = scala.collection.mutable.Map[String, String]() - configs += ("storage_policy" -> tableProperties.getOrDefault("storage_policy", "default")) - configs.toMap + val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( + orderByKeyOption, + primaryKeyOption + ) + Map( + "storage_policy" -> deltaProperties.getOrDefault("storage_policy", "default"), + "storage_db" -> dataBaseName, + "storage_table" -> tableName, + "storage_orderByKey" -> orderByKey0, + "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_minmaxIndexKey" -> minmaxIndexKeyOption + .map(MergeTreeDeltaUtil.columnsToStr) + .getOrElse(""), + "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_primaryKey" -> primaryKey0 + ) } def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption) diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala index 5337e4d31388..341018fb590b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.delta._ import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.commands.GlutenCacheBase._ -import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder -import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder} +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.types.{BooleanType, StringType} @@ -169,13 +169,7 @@ case class GlutenCHCacheDataCommand( val executorId = value._1 if (parts.nonEmpty) { val onePart = parts(0) - val partNameList = parts.map(_.name).toSeq - // starts and lengths is useless for write - val partRanges = Seq.range(0L, partNameList.length).map(_ => long2Long(0L)).asJava - val extensionTableNode = ExtensionTableBuilder.makeExtensionTable( - -1, - -1, onePart.database, onePart.table, ClickhouseSnapshot.genSnapshotId(snapshot), @@ -188,9 +182,7 @@ case class GlutenCHCacheDataCommand( snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""), snapshot.metadata.configuration.getOrElse("setIndexKey", ""), snapshot.metadata.configuration.getOrElse("primaryKey", ""), - partNameList.asJava, - partRanges, - partRanges, + ClickhousePartSerializer.fromPartNames(parts.map(_.name).toSeq), ConverterUtils.convertNamedStructJson(snapshot.metadata.schema), snapshot.metadata.configuration.asJava, new JList[String]() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala new file mode 100644 index 000000000000..99c03e2adf78 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.clickhouse + +import org.apache.gluten.execution.MergeTreePartSplit +import org.apache.gluten.expression.ConverterUtils + +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts +import org.apache.spark.sql.types.StructType + +import com.fasterxml.jackson.databind.ObjectMapper +import io.substrait.proto.ReadRel + +import java.net.URI +import java.util.{Map => jMap} + +import scala.collection.JavaConverters._ + +case class ClickhousePartSerializer( + partList: Seq[String], + starts: Seq[Long], + lengths: Seq[Long] +) { + def apply(): StringBuilder = { + val partPathList = new StringBuilder + for (i <- partList.indices) { + val end = starts(i) + lengths(i) + partPathList + .append(partList(i)) + .append("\n") + .append(starts(i)) + .append("\n") + .append(end) + .append("\n") + } + partPathList + } + + // TODO: remove pathList + def pathList(absolutePath: String): Seq[String] = { + partList.map(name => absolutePath + "/" + name) + } +} + +object ClickhousePartSerializer { + def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): ClickhousePartSerializer = { + val partList = partLists.map(_.name) + val starts = partLists.map(_.start) + val lengths = partLists.map(_.length) + ClickhousePartSerializer(partList, starts, lengths) + } + + def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): ClickhousePartSerializer = { + val partList = parts.map(_.name) + val starts = parts.map(_ => 0L) + val lengths = parts.map(_.marks) + ClickhousePartSerializer(partList, starts, lengths) + } + + def fromPartNames(partNames: Seq[String]): ClickhousePartSerializer = { + // starts and lengths is useless for writing + val partRanges = Seq.range(0L, partNames.length) + ClickhousePartSerializer(partNames, partRanges, partRanges) + } +} + +object ClickhouseMetaSerializer { + private val MERGE_TREE = "MergeTree;" + + def forWrite( + snapshotId: String, + path: String, + dataSchema: StructType, + clickhouseTableConfigs: Map[String, String]): ReadRel.ExtensionTable = { + + val database = clickhouseTableConfigs("storage_db") + val tableName = clickhouseTableConfigs("storage_table") + val orderByKey = clickhouseTableConfigs("storage_orderByKey") + val lowCardKey = clickhouseTableConfigs("storage_lowCardKey") + val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey") + val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey") + val setIndexKey = clickhouseTableConfigs("storage_setIndexKey") + val primaryKey = clickhouseTableConfigs("storage_primaryKey") + + val result = apply( + database, + tableName, + snapshotId, + path, + "", + orderByKey, + lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, + primaryKey, + ClickhousePartSerializer.fromPartNames(Seq()), + ConverterUtils.convertNamedStructJson(dataSchema), + clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava + ) + ExtensionTableNode.toProtobuf(result) + + } + // scalastyle:off argcount + def apply1( + database: String, + tableName: String, + snapshotId: String, + relativePath: String, + absolutePath: String, + orderByKeyOption: Option[Seq[String]], + lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], + primaryKeyOption: Option[Seq[String]], + partSerializer: ClickhousePartSerializer, + tableSchemaJson: String, + 
clickhouseTableConfigs: jMap[String, String]): ReadRel.ExtensionTable = { + + val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( + orderByKeyOption, + primaryKeyOption + ) + + val result = apply( + database, + tableName, + snapshotId, + relativePath, + absolutePath, + orderByKey0, + lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + minmaxIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + primaryKey0, + partSerializer, + tableSchemaJson, + clickhouseTableConfigs + ) + ExtensionTableNode.toProtobuf(result) + } + + def apply( + database: String, + tableName: String, + snapshotId: String, + relativePath: String, + absolutePath: String, + orderByKey0: String, + lowCardKey0: String, + minmaxIndexKey0: String, + bfIndexKey0: String, + setIndexKey0: String, + primaryKey0: String, + partSerializer: ClickhousePartSerializer, + tableSchemaJson: String, + clickhouseTableConfigs: jMap[String, String]): String = { + // scalastyle:on argcount + + // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n + // {part_path1}\n{part_path2}\n... + val extensionTableStr = new StringBuilder(MERGE_TREE) + + val orderByKey = ConverterUtils.normalizeColName(orderByKey0) + val lowCardKey = ConverterUtils.normalizeColName(lowCardKey0) + val minmaxIndexKey = ConverterUtils.normalizeColName(minmaxIndexKey0) + val bfIndexKey = ConverterUtils.normalizeColName(bfIndexKey0) + val setIndexKey = ConverterUtils.normalizeColName(setIndexKey0) + val primaryKey = ConverterUtils.normalizeColName(primaryKey0) + + extensionTableStr + .append(database) + .append("\n") + .append(tableName) + .append("\n") + .append(snapshotId) + .append("\n") + .append(tableSchemaJson) + .append("\n") + .append(orderByKey) + .append("\n") + + if (orderByKey.nonEmpty && !(orderByKey == "tuple()")) { + extensionTableStr.append(primaryKey).append("\n") + } + + extensionTableStr.append(lowCardKey).append("\n") + extensionTableStr.append(minmaxIndexKey).append("\n") + extensionTableStr.append(bfIndexKey).append("\n") + extensionTableStr.append(setIndexKey).append("\n") + extensionTableStr.append(normalizeRelativePath(relativePath)).append("\n") + extensionTableStr.append(absolutePath).append("\n") + appendConfigs(extensionTableStr, clickhouseTableConfigs) + extensionTableStr.append(partSerializer()) + + extensionTableStr.toString() + } + + private def normalizeRelativePath(relativePath: String): String = { + val table_uri = URI.create(relativePath) + if (table_uri.getPath.startsWith("/")) { + table_uri.getPath.substring(1) + } else table_uri.getPath + } + + private def appendConfigs( + extensionTableStr: StringBuilder, + clickhouseTableConfigs: jMap[String, String]): Unit = { + if (clickhouseTableConfigs != null && !clickhouseTableConfigs.isEmpty) { + val objectMapper: ObjectMapper = new ObjectMapper + try { + val clickhouseTableConfigsJson: String = objectMapper + .writeValueAsString(clickhouseTableConfigs) + .replaceAll("\n", "") + .replaceAll(" ", "") + extensionTableStr.append(clickhouseTableConfigsJson).append("\n") + } catch { + case e: Exception => + extensionTableStr.append("\n") + } + } else extensionTableStr.append("\n") + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala similarity index 75% rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala index 6b2af0953f00..854c6f91c917 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.execution.datasources.utils +package org.apache.spark.sql.execution.datasources.clickhouse.utils import org.apache.gluten.expression.ConverterUtils.normalizeColName @@ -25,18 +25,13 @@ object MergeTreeDeltaUtil { def genOrderByAndPrimaryKeyStr( orderByKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]]): (String, String) = { + val orderByKey = - if (orderByKeyOption.isDefined && orderByKeyOption.get.nonEmpty) { - columnsToStr(orderByKeyOption) - } else DEFAULT_ORDER_BY_KEY - - val primaryKey = - if ( - !orderByKey.equals(DEFAULT_ORDER_BY_KEY) && primaryKeyOption.isDefined && - primaryKeyOption.get.nonEmpty - ) { - columnsToStr(primaryKeyOption) - } else "" + orderByKeyOption.filter(_.nonEmpty).map(columnsToStr).getOrElse(DEFAULT_ORDER_BY_KEY) + val primaryKey = primaryKeyOption + .filter(p => orderByKey != DEFAULT_ORDER_BY_KEY && p.nonEmpty) + .map(columnsToStr) + .getOrElse("") (orderByKey, primaryKey) } @@ -45,4 +40,8 @@ object MergeTreeDeltaUtil { case Some(keys) => keys.map(normalizeColName).mkString(",") case None => "" } + + def columnsToStr(keys: Seq[String]): String = { + keys.map(normalizeColName).mkString(",") + } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala similarity index 96% rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala index c2eb338261fc..ee53d1c4168f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.sql.execution.datasources.utils +package org.apache.spark.sql.execution.datasources.clickhouse.utils import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConf} import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartRange, MergeTreePartSplit} @@ -35,7 +35,7 @@ import org.apache.spark.sql.delta.ClickhouseSnapshot import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, MergeTreePartFilterReturnedRange} +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, MergeTreePartFilterReturnedRange} import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.types.BooleanType @@ -48,7 +48,6 @@ import com.google.protobuf.{Any, StringValue} import io.substrait.proto.NamedStruct import io.substrait.proto.Plan -import java.lang.{Long => JLong} import java.util import java.util.{ArrayList => JArrayList} @@ -78,7 +77,8 @@ object MergeTreePartsPartitionsUtil extends Logging { val fileIndex = relation.location.asInstanceOf[TahoeFileIndex] // when querying, use deltaLog.update(true) to get the staleness acceptable snapshot - val snapshotId = ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(true)) + val snapshotId = + ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(stalenessAcceptable = true)) val partitions = new ArrayBuffer[InputPartition] val (database, tableName) = if (table.catalogTable.isDefined) { @@ -457,9 +457,10 @@ object MergeTreePartsPartitionsUtil extends Logging { val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path) if (ret == null) { val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet() - val keySample = keys.isEmpty() match { - case true => "" - case false => keys.iterator().next() + val keySample = if (keys.isEmpty) { + "" + } else { + keys.iterator().next() } throw new IllegalStateException( "Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " + @@ -574,20 +575,8 @@ object MergeTreePartsPartitionsUtil extends Logging { case (l1, l2) => l1.sum / l2.sum } - val partLists = new JArrayList[String]() - val starts = new JArrayList[JLong]() - val lengths = new JArrayList[JLong]() - selectPartsFiles.foreach( - part => { - partLists.add(part.name) - starts.add(0) - lengths.add(part.marks) - }) - val extensionTableNode = ExtensionTableBuilder .makeExtensionTable( - -1L, - -1L, database, tableName, snapshotId, @@ -599,9 +588,7 @@ object MergeTreePartsPartitionsUtil extends Logging { table.bfIndexKey(), table.setIndexKey(), table.primaryKey(), - partLists, - starts, - lengths, + ClickhousePartSerializer.fromAddMergeTreeParts(selectPartsFiles), tableSchemaJson, clickhouseTableConfigs.asJava, new JArrayList[String]() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala index 36b2a44d13b4..69c001e461d8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala @@ -83,26 +83,6 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase { // TODO: parquet and mergetree OrcUtils.inferSchema(sparkSession, files, options) } - - // scalastyle:off argcount - /** For CH MergeTree format */ - def createOutputWriter( - path: String, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - partitionColumns: Seq[String], - tableSchema: StructType, - clickhouseTableConfigs: Map[String, String], - context: TaskAttemptContext, - nativeConf: java.util.Map[String, String]): OutputWriter = null - // scalastyle:on argcount } class CHRowSplitter extends GlutenRowSplitter { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index cb7dfa8f3835..835d9bb1ba28 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.v1 import org.apache.gluten.expression.ConverterUtils -import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.`type`.ColumnTypeNode import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder @@ -27,8 +26,7 @@ import org.apache.gluten.utils.ConfigUtil import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter} -import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder -import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer} import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter import org.apache.spark.sql.types.StructType @@ -37,7 +35,7 @@ import com.google.protobuf.{Any, StringValue} import io.substrait.proto.NamedStruct import org.apache.hadoop.mapreduce.TaskAttemptContext -import java.util.{ArrayList => JList, Map => JMap, UUID} +import java.util.{Map => JMap, UUID} import scala.collection.JavaConverters._ @@ -58,7 +56,7 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { nativeConf: JMap[String, String]): OutputWriter = null // scalastyle:off argcount - override def createOutputWriter( + def createOutputWriter( path: String, database: String, tableName: String, @@ -75,32 +73,49 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { context: TaskAttemptContext, nativeConf: JMap[String, String]): OutputWriter = { - val uuid = UUID.randomUUID.toString - - val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel( - path, + val extensionTable = ClickhouseMetaSerializer.apply1( database, tableName, snapshotId, + path, + "", orderByKeyOption, lowCardKeyOption, minmaxIndexKeyOption, bfIndexKeyOption, setIndexKeyOption, primaryKeyOption, - partitionColumns, - 
Seq(), + ClickhousePartSerializer.fromPartNames(Seq()), ConverterUtils.convertNamedStructJson(tableSchema), - clickhouseTableConfigs, - // use table schema instead of data schema - SparkShimLoader.getSparkShims.attributesFromStruct(tableSchema) + clickhouseTableConfigs.asJava ) + createOutputWriter( + path, + tableSchema, + context, + nativeConf, + database, + tableName, + extensionTable.toByteArray) + } + // scalastyle:on argcount + + override val formatName: String = "mergetree" + + def createOutputWriter( + path: String, + dataSchema: StructType, + context: TaskAttemptContext, + nativeConf: JMap[String, String], + database: String, + tableName: String, + splitInfo: Array[Byte]): OutputWriter = { val datasourceJniWrapper = new CHDatasourceJniWrapper() val instance = datasourceJniWrapper.nativeInitMergeTreeWriterWrapper( - planWithSplitInfo.plan, - planWithSplitInfo.splitInfo, - uuid, + null, + splitInfo, + UUID.randomUUID.toString, context.getTaskAttemptID.getTaskID.getId.toString, context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"), context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"), @@ -109,10 +124,6 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path) } - // scalastyle:on argcount - - override val formatName: String = "mergetree" - } object CHMergeTreeWriterInjects { @@ -146,39 +157,23 @@ object CHMergeTreeWriterInjects { } }.asJava - val (orderByKey, primaryKey) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( - orderByKeyOption, - primaryKeyOption - ) - - val lowCardKey = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption) - val minmaxIndexKey = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption) - val bfIndexKey = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption) - val setIndexKey = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption) - val substraitContext = new SubstraitContext - val extensionTableNode = ExtensionTableBuilder.makeExtensionTable( - -1, - -1, + + val extensionTable = ClickhouseMetaSerializer.apply1( database, tableName, snapshotId, path, "", - orderByKey, - lowCardKey, - minmaxIndexKey, - bfIndexKey, - setIndexKey, - primaryKey, - scala.collection.JavaConverters.seqAsJavaList(partList), - scala.collection.JavaConverters.seqAsJavaList( - Seq.range(0L, partList.length).map(long2Long) - ), // starts and lengths is useless for write - scala.collection.JavaConverters.seqAsJavaList(Seq.range(0L, partList.length).map(long2Long)), + orderByKeyOption, + lowCardKeyOption, + minmaxIndexKeyOption, + bfIndexKeyOption, + setIndexKeyOption, + primaryKeyOption, + ClickhousePartSerializer.fromPartNames(partList), tableSchemaJson, - clickhouseTableConfigs.asJava, - new JList[String]() + clickhouseTableConfigs.asJava ) val optimizationContent = "isMergeTree=1\n" @@ -197,6 +192,6 @@ object CHMergeTreeWriterInjects { val plan = PlanBuilder.makePlan(substraitContext, Lists.newArrayList(relNode), nameList).toProtobuf - PlanWithSplitInfo(plan.toByteArray, extensionTableNode.toProtobuf.toByteArray) + PlanWithSplitInfo(plan.toByteArray, extensionTable.toByteArray) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala index 06e2b98bc8b2..52593d7c1795 100644 --- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala @@ -43,7 +43,7 @@ class MergeTreeOutputWriter( if (nextBatch.numRows > 0) { val col = nextBatch.column(0).asInstanceOf[CHColumnVector] datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress) - } // else just ignore this empty block + } // else ignore this empty block } override def close(): Unit = { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala index 97418c670f71..aead0bf47fa3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution.{GlutenClickHouseTPCHAbstractSuite, GlutenMer import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil import org.apache.hadoop.fs.Path From a0f4d46c3d9236c3e929095b40c8ee4b24683a7d Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Wed, 25 Sep 2024 16:44:00 +0800 Subject: [PATCH 2/8] Remove partitionColumns from DeltaMergeTreeFileFormat --- .../org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala | 3 +-- .../v2/clickhouse/source/DeltaMergeTreeFileFormat.scala | 3 +-- .../apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala | 1 - 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 303b80ee1387..33cfbd593e86 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -101,8 +101,7 @@ class ClickHouseTableV2( meta, ClickhouseSnapshot.genSnapshotId(initialSnapshot), deltaLog.dataPath.toString, - clickhouseTableConfigs, - partitionColumns + clickhouseTableConfigs ) } diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 2da9ea7d4fcf..229d49fe673b 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -32,8 +32,7 @@ class DeltaMergeTreeFileFormat( metadata: Metadata, val snapshotId: String, val deltaPath: String, - @transient val clickhouseTableConfigs: Map[String, String], - val partitionColumns: Seq[String]) + 
@transient val clickhouseTableConfigs: Map[String, String]) extends DeltaParquetFileFormat(protocol, metadata) { override def shortName(): String = "mergetree" diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 8321db6f4efb..4da792a3267d 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -32,7 +32,6 @@ trait ClickHouseTableV2Base { def deltaProperties: ju.Map[String, String] - def deltaCatalog: Option[CatalogTable] def deltaPath: Path From 80119f7897c5f909683662bf1e44164052f631f7 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Fri, 27 Sep 2024 17:13:56 +0800 Subject: [PATCH 3/8] TablePropertiesReader ? --- .../sql/delta/catalog/ClickHouseTableV2.scala | 2 +- .../sql/delta/catalog/ClickHouseTableV2.scala | 2 +- .../sql/delta/catalog/ClickHouseTableV2.scala | 2 +- .../delta/catalog/ClickHouseTableV2Base.scala | 115 ++------------- .../mergetree/TableProperties.scala | 131 ++++++++++++++++++ 5 files changed, 144 insertions(+), 108 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index e654bbf71b7d..03d38934b74a 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -110,7 +110,7 @@ class ClickHouseTableV2( ) } - override def deltaProperties(): ju.Map[String, String] = properties() + override def deltaProperties(): Map[String, String] = properties().asScala.toMap override def deltaCatalog(): Option[CatalogTable] = catalogTable diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index e654bbf71b7d..03d38934b74a 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -110,7 +110,7 @@ class ClickHouseTableV2( ) } - override def deltaProperties(): ju.Map[String, String] = properties() + override def deltaProperties(): Map[String, String] = properties().asScala.toMap override def deltaCatalog(): Option[CatalogTable] = catalogTable diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 33cfbd593e86..915058b95170 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -105,7 +105,7 @@ class ClickHouseTableV2( ) } - override def deltaProperties: ju.Map[String, String] = properties() + override def deltaProperties: Map[String, String] = properties().asScala.toMap override def deltaCatalog: 
Option[CatalogTable] = catalogTable diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 4da792a3267d..608df9ee2a8c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -18,19 +18,19 @@ package org.apache.spark.sql.delta.catalog import org.apache.gluten.expression.ConverterUtils.normalizeColName -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.delta.Snapshot +import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.mergetree.TablePropertiesReader import org.apache.hadoop.fs.Path -import java.{util => ju} - -trait ClickHouseTableV2Base { +trait ClickHouseTableV2Base extends TablePropertiesReader { val DEFAULT_DATABASE = "clickhouse_db" - def deltaProperties: ju.Map[String, String] + def deltaProperties: Map[String, String] def deltaCatalog: Option[CatalogTable] @@ -38,6 +38,10 @@ trait ClickHouseTableV2Base { def deltaSnapshot: Snapshot + def configuration: Map[String, String] = deltaProperties + + def metadata: Metadata = deltaSnapshot.metadata + lazy val dataBaseName: String = deltaCatalog .map(_.identifier.database.getOrElse("default")) .getOrElse(DEFAULT_DATABASE) @@ -46,112 +50,13 @@ trait ClickHouseTableV2Base { .map(_.identifier.table) .getOrElse(deltaPath.toUri.getPath) - lazy val bucketOption: Option[BucketSpec] = { - val tableProperties = deltaProperties - if (tableProperties.containsKey("numBuckets")) { - val numBuckets = tableProperties.get("numBuckets").trim.toInt - val bucketColumnNames: Seq[String] = - getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String]) - val sortColumnNames: Seq[String] = - getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String]) - Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) - } else { - None - } - } - - lazy val lowCardKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("lowCardKey") - } - - lazy val minmaxIndexKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("minmaxIndexKey") - } - - lazy val bfIndexKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("bloomfilterIndexKey") - } - - lazy val setIndexKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("setIndexKey") - } - - private def getCommaSeparatedColumns(keyName: String) = { - val tableProperties = deltaProperties - if (tableProperties.containsKey(keyName)) { - if (tableProperties.get(keyName).nonEmpty) { - val keys = tableProperties - .get(keyName) - .split(",") - .map(n => normalizeColName(n.trim)) - .toSeq - keys.foreach( - s => { - if (s.contains(".")) { - throw new IllegalStateException( - s"$keyName $s can not contain '.' 
(not support nested column yet)") - } - }) - Some(keys) - } else { - None - } - } else { - None - } - } - - lazy val orderByKeyOption: Option[Seq[String]] = { - if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) { - val orderByKeys = bucketOption.get.sortColumnNames.map(normalizeColName).toSeq - val invalidKeys = orderByKeys.intersect(partitionColumns) - if (invalidKeys.nonEmpty) { - throw new IllegalStateException( - s"partition cols $invalidKeys can not be in the order by keys.") - } - Some(orderByKeys) - } else { - val orderByKeys = getCommaSeparatedColumns("orderByKey") - if (orderByKeys.isDefined) { - val invalidKeys = orderByKeys.get.intersect(partitionColumns) - if (invalidKeys.nonEmpty) { - throw new IllegalStateException( - s"partition cols $invalidKeys can not be in the order by keys.") - } - orderByKeys - } else { - None - } - } - } - - lazy val primaryKeyOption: Option[Seq[String]] = { - if (orderByKeyOption.isDefined) { - val primaryKeys = getCommaSeparatedColumns("primaryKey") - if ( - primaryKeys.isDefined && !orderByKeyOption.get - .mkString(",") - .startsWith(primaryKeys.get.mkString(",")) - ) { - throw new IllegalStateException( - s"Primary key $primaryKeys must be a prefix of the sorting key") - } - primaryKeys - } else { - None - } - } - - lazy val partitionColumns: Seq[String] = - deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq - lazy val clickhouseTableConfigs: Map[String, String] = { val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( orderByKeyOption, primaryKeyOption ) Map( - "storage_policy" -> deltaProperties.getOrDefault("storage_policy", "default"), + "storage_policy" -> deltaProperties.getOrElse("storage_policy", "default"), "storage_db" -> dataBaseName, "storage_table" -> tableName, "storage_orderByKey" -> orderByKey0, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala new file mode 100644 index 000000000000..0c321a27c46b --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.mergetree + +import org.apache.gluten.expression.ConverterUtils.normalizeColName + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.delta.actions.Metadata + +/** Reserved table property for MergeTree table. 
*/ +object TableProperties { + val Provider: String = "clickhouse" + val DEFAULT_FILE_FORMAT: String = "write.format.default" + val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree" + + // Storage properties + val DefaultStorageDB: String = "default" + val STORAGE_DB: String = "storage_db" + val STORAGE_TABLE: String = "storage_table" + + val SERIALIZER_HEADER: String = "MergeTree;" +} + +trait TablePropertiesReader { + + def configuration: Map[String, String] + + /** delta */ + def metadata: Metadata + + def storageDB: String = + configuration.getOrElse(TableProperties.STORAGE_DB, TableProperties.DefaultStorageDB) + + def storageTable: String = + configuration.getOrElse(TableProperties.STORAGE_TABLE, "") + + private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = { + configuration.get(keyName).map { + v => + val keys = v.split(",").map(n => normalizeColName(n.trim)).toSeq + keys.foreach { + s => + if (s.contains(".")) { + throw new IllegalStateException( + s"$keyName $s can not contain '.' (not support nested column yet)") + } + } + keys + } + } + + lazy val bucketOption: Option[BucketSpec] = { + val tableProperties = configuration + if (tableProperties.contains("numBuckets")) { + val numBuckets = tableProperties("numBuckets").trim.toInt + val bucketColumnNames: Seq[String] = + getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String]) + val sortColumnNames: Seq[String] = + getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String]) + Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) + } else { + None + } + } + + lazy val lowCardKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("lowCardKey") + } + + lazy val minmaxIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("minmaxIndexKey") + } + + lazy val bfIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("bloomfilterIndexKey") + } + + lazy val setIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("setIndexKey") + } + + lazy val partitionColumns: Seq[String] = + metadata.partitionColumns.map(normalizeColName) + + lazy val orderByKeyOption: Option[Seq[String]] = { + val orderByKeys = + if (bucketOption.exists(_.sortColumnNames.nonEmpty)) { + bucketOption.map(_.sortColumnNames.map(normalizeColName)) + } else { + getCommaSeparatedColumns("orderByKey") + } + orderByKeys + .map(_.intersect(partitionColumns)) + .filter(_.nonEmpty) + .foreach { + invalidKeys => + throw new IllegalStateException( + s"partition cols $invalidKeys can not be in the order by keys.") + } + orderByKeys + } + + lazy val primaryKeyOption: Option[Seq[String]] = { + orderByKeyOption.map(_.mkString(",")).flatMap { + orderBy => + val primaryKeys = getCommaSeparatedColumns("primaryKey") + primaryKeys + .map(_.mkString(",")) + .filterNot(orderBy.startsWith) + .foreach( + primaryKey => + throw new IllegalStateException( + s"Primary key $primaryKey must be a prefix of the sorting key $orderBy")) + primaryKeys + } + } +} From d21038420351cf18d39a735cccc522d63eccee75 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Sun, 29 Sep 2024 17:03:30 +0800 Subject: [PATCH 4/8] StorageMeta --- .../sql/delta/catalog/ClickHouseTableV2.scala | 12 +++-- .../source/DeltaMergeTreeFileFormat.scala | 23 ++++----- .../delta/catalog/ClickHouseTableV2Base.scala | 21 ++------- .../clickhouse/ClickhouseMetaSerializer.scala | 23 ++++----- .../mergetree/DeltaMetaReader.scala | 36 ++++++++++++++ ...ableProperties.scala => StorageMeta.scala} | 47 ++++++++++++++++--- 6 files changed, 105 
insertions(+), 57 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala rename backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/{TableProperties.scala => StorageMeta.scala} (73%) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 915058b95170..f730b42e4db0 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -98,11 +99,12 @@ class ClickHouseTableV2( def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( protocol, - meta, - ClickhouseSnapshot.genSnapshotId(initialSnapshot), - deltaLog.dataPath.toString, - clickhouseTableConfigs - ) + StorageMeta.withMoreStorageInfo( + meta, + ClickhouseSnapshot.genSnapshotId(initialSnapshot), + deltaLog.dataPath, + dataBaseName, + tableName)) } override def deltaProperties: Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 229d49fe673b..1489ae4dbf49 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -21,18 +21,14 @@ import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer +import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass")) -class DeltaMergeTreeFileFormat( - protocol: Protocol, - metadata: Metadata, - val snapshotId: String, - val deltaPath: String, - @transient val clickhouseTableConfigs: Map[String, String]) +class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) extends DeltaParquetFileFormat(protocol, metadata) { override def shortName(): String = 
"mergetree" @@ -63,16 +59,15 @@ class DeltaMergeTreeFileFormat( .getInstance() .nativeConf(options, "") - val database = clickhouseTableConfigs("storage_db") - val tableName = clickhouseTableConfigs("storage_table") + @transient val deltaMetaReader = DeltaMetaReader(metadata) + + val database = deltaMetaReader.storageDB + val tableName = deltaMetaReader.storageTable + val deltaPath = deltaMetaReader.storagePath + val extensionTableBC = sparkSession.sparkContext.broadcast( ClickhouseMetaSerializer - .forWrite( - snapshotId, - deltaPath, - metadata.schema, - clickhouseTableConfigs - ) + .forWrite(deltaMetaReader, metadata.schema) .toByteArray) new OutputWriterFactory { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 608df9ee2a8c..06bbb3e4820b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -51,23 +51,10 @@ trait ClickHouseTableV2Base extends TablePropertiesReader { .getOrElse(deltaPath.toUri.getPath) lazy val clickhouseTableConfigs: Map[String, String] = { - val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( - orderByKeyOption, - primaryKeyOption - ) - Map( - "storage_policy" -> deltaProperties.getOrElse("storage_policy", "default"), - "storage_db" -> dataBaseName, - "storage_table" -> tableName, - "storage_orderByKey" -> orderByKey0, - "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - "storage_minmaxIndexKey" -> minmaxIndexKeyOption - .map(MergeTreeDeltaUtil.columnsToStr) - .getOrElse(""), - "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - "storage_primaryKey" -> primaryKey0 - ) + deltaProperties.get("storage_policy") match { + case Some(_) => deltaProperties + case None => deltaProperties ++ Seq("storage_policy" -> "default") + } } def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala index 99c03e2adf78..5c863d76c947 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala @@ -20,6 +20,7 @@ import org.apache.gluten.execution.MergeTreePartSplit import org.apache.gluten.expression.ConverterUtils import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta} import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.types.StructType @@ -80,16 +81,10 @@ object ClickhousePartSerializer { } object ClickhouseMetaSerializer { - private val MERGE_TREE = "MergeTree;" - def forWrite( - snapshotId: String, - path: String, - dataSchema: StructType, - clickhouseTableConfigs: Map[String, String]): 
ReadRel.ExtensionTable = { + def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = { + val clickhouseTableConfigs = deltaMetaReader.writeConfiguration - val database = clickhouseTableConfigs("storage_db") - val tableName = clickhouseTableConfigs("storage_table") val orderByKey = clickhouseTableConfigs("storage_orderByKey") val lowCardKey = clickhouseTableConfigs("storage_lowCardKey") val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey") @@ -98,11 +93,11 @@ object ClickhouseMetaSerializer { val primaryKey = clickhouseTableConfigs("storage_primaryKey") val result = apply( - database, - tableName, - snapshotId, - path, - "", + deltaMetaReader.storageDB, + deltaMetaReader.storageTable, + deltaMetaReader.storageSnapshotId, + deltaMetaReader.storagePath, + "", // absolutePath orderByKey, lowCardKey, minmaxIndexKey, @@ -176,7 +171,7 @@ object ClickhouseMetaSerializer { // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n // {part_path1}\n{part_path2}\n... - val extensionTableStr = new StringBuilder(MERGE_TREE) + val extensionTableStr = new StringBuilder(StorageMeta.SERIALIZER_HEADER) val orderByKey = ConverterUtils.normalizeColName(orderByKey0) val lowCardKey = ConverterUtils.normalizeColName(lowCardKey0) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala new file mode 100644 index 000000000000..de322b65dd8e --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.mergetree + +import org.apache.spark.sql.delta.actions.Metadata + +class DeltaMetaReader( + override val metadata: Metadata, + override val configuration: Map[String, String]) + extends TablePropertiesReader { + + def storageDB: String = configuration(StorageMeta.STORAGE_DB) + def storageTable: String = configuration(StorageMeta.STORAGE_TABLE) + def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID) + def storagePath: String = configuration(StorageMeta.STORAGE_PATH) +} + +object DeltaMetaReader { + def apply(metadata: Metadata): DeltaMetaReader = { + new DeltaMetaReader(metadata, metadata.configuration) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala similarity index 73% rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala index 0c321a27c46b..fc5970224816 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/TableProperties.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala @@ -20,9 +20,12 @@ import org.apache.gluten.expression.ConverterUtils.normalizeColName import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.delta.actions.Metadata +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil + +import org.apache.hadoop.fs.Path /** Reserved table property for MergeTree table. 
*/ -object TableProperties { +object StorageMeta { val Provider: String = "clickhouse" val DEFAULT_FILE_FORMAT: String = "write.format.default" val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree" @@ -31,8 +34,26 @@ object TableProperties { val DefaultStorageDB: String = "default" val STORAGE_DB: String = "storage_db" val STORAGE_TABLE: String = "storage_table" + val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id" + val STORAGE_PATH: String = "storage_path" val SERIALIZER_HEADER: String = "MergeTree;" + + def withMoreStorageInfo( + metadata: Metadata, + snapshotId: String, + deltaPath: Path, + database: String, + tableName: String): Metadata = { + val newOptions = + metadata.configuration ++ Seq( + STORAGE_DB -> database, + STORAGE_SNAPSHOT_ID -> snapshotId, + STORAGE_TABLE -> tableName, + STORAGE_PATH -> deltaPath.toString + ) + metadata.copy(configuration = newOptions) + } } trait TablePropertiesReader { @@ -42,12 +63,6 @@ trait TablePropertiesReader { /** delta */ def metadata: Metadata - def storageDB: String = - configuration.getOrElse(TableProperties.STORAGE_DB, TableProperties.DefaultStorageDB) - - def storageTable: String = - configuration.getOrElse(TableProperties.STORAGE_TABLE, "") - private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = { configuration.get(keyName).map { v => @@ -128,4 +143,22 @@ trait TablePropertiesReader { primaryKeys } } + + lazy val writeConfiguration: Map[String, String] = { + val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( + orderByKeyOption, + primaryKeyOption + ) + Map( + "storage_policy" -> configuration.getOrElse("storage_policy", "default"), + "storage_orderByKey" -> orderByKey0, + "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_minmaxIndexKey" -> minmaxIndexKeyOption + .map(MergeTreeDeltaUtil.columnsToStr) + .getOrElse(""), + "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_primaryKey" -> primaryKey0 + ) + } } From 0b95ecd405380eb60e128902fba6d83e148527f5 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Sun, 29 Sep 2024 22:41:27 +0800 Subject: [PATCH 5/8] spark 33 --- .../sql/delta/catalog/ClickHouseTableV2.scala | 28 +++----- .../source/DeltaMergeTreeFileFormat.scala | 71 +++++-------------- 2 files changed, 29 insertions(+), 70 deletions(-) diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 03d38934b74a..ae8ec32bd0a4 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.collection.BitSet 
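A note on the pattern shared by the delta-20, delta-23 and delta-32 variants of DeltaMergeTreeFileFormat in this series: buildWriter now rebuilds the storage properties from the Delta Metadata through DeltaMetaReader, serializes the extension table once on the driver with ClickhouseMetaSerializer.forWrite, broadcasts the resulting byte array, and lets every task's createOutputWriter reuse extensionTableBC.value instead of re-serializing per writer. The sketch below shows only that serialize-once-then-broadcast shape in isolation; MetaSerializer and its key/value payload are made-up stand-ins, not the Gluten classes.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

// Stand-in for a driver-side serializer whose output is identical for every task,
// as ClickhouseMetaSerializer.forWrite(deltaMetaReader, schema).toByteArray is.
object MetaSerializer {
  def serialize(storageConf: Map[String, String]): Array[Byte] =
    storageConf.map { case (k, v) => s"$k=$v" }.mkString("\n").getBytes("UTF-8")
}

object BroadcastMetaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("broadcast-meta").getOrCreate()

    // Serialize once on the driver ...
    val metaBytes: Array[Byte] =
      MetaSerializer.serialize(Map("storage_db" -> "default", "storage_table" -> "lineitem"))
    // ... broadcast the bytes ...
    val metaBC: Broadcast[Array[Byte]] = spark.sparkContext.broadcast(metaBytes)

    // ... and let each task read the broadcast value instead of redoing the work,
    // mirroring how each OutputWriter consumes extensionTableBC.value in the patch.
    val perTaskSizes = spark.sparkContext
      .parallelize(1 to 4, numSlices = 4)
      .map(_ => metaBC.value.length)
      .collect()
    println(perTaskSizes.mkString(","))

    spark.stop()
  }
}

The payoff is that the MergeTree write metadata, which is the same for every partition of a write, is computed exactly once per job rather than once per output writer.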
@@ -95,28 +96,21 @@ class ClickHouseTableV2( def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( - meta, - dataBaseName, - tableName, - ClickhouseSnapshot.genSnapshotId(snapshot), - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - clickhouseTableConfigs, - partitionColumns - ) + StorageMeta.withMoreStorageInfo( + meta, + ClickhouseSnapshot.genSnapshotId(snapshot), + deltaLog.dataPath, + dataBaseName, + tableName)) } - override def deltaProperties(): Map[String, String] = properties().asScala.toMap + override def deltaProperties: Map[String, String] = properties().asScala.toMap - override def deltaCatalog(): Option[CatalogTable] = catalogTable + override def deltaCatalog: Option[CatalogTable] = catalogTable - override def deltaPath(): Path = path + override def deltaPath: Path = path - override def deltaSnapshot(): Snapshot = snapshot + override def deltaSnapshot: Snapshot = snapshot def cacheThis(): Unit = { deltaLog2Table.put(deltaLog, this) diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index d2c7cf5dfe32..c2d1ec47b9ca 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer +import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType @@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass")) class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata) { - protected var database = "" - protected var tableName = "" - protected var snapshotId = "" - protected var orderByKeyOption: Option[Seq[String]] = None - protected var lowCardKeyOption: Option[Seq[String]] = None - protected var minmaxIndexKeyOption: Option[Seq[String]] = None - protected var bfIndexKeyOption: Option[Seq[String]] = None - protected var setIndexKeyOption: Option[Seq[String]] = None - protected var primaryKeyOption: Option[Seq[String]] = None - protected var partitionColumns: Seq[String] = Seq.empty[String] - protected var clickhouseTableConfigs: Map[String, String] = Map.empty - - // scalastyle:off argcount - def this( - metadata: Metadata, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - clickhouseTableConfigs: Map[String, String], - partitionColumns: 
Seq[String]) { - this(metadata) - this.database = database - this.tableName = tableName - this.snapshotId = snapshotId - this.orderByKeyOption = orderByKeyOption - this.lowCardKeyOption = lowCardKeyOption - this.minmaxIndexKeyOption = minmaxIndexKeyOption - this.bfIndexKeyOption = bfIndexKeyOption - this.setIndexKeyOption = setIndexKeyOption - this.primaryKeyOption = primaryKeyOption - this.clickhouseTableConfigs = clickhouseTableConfigs - this.partitionColumns = partitionColumns - } - // scalastyle:on argcount - override def shortName(): String = "mergetree" override def toString(): String = "MergeTree" @@ -98,6 +59,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma .getInstance() .nativeConf(options, "") + @transient val deltaMetaReader = DeltaMetaReader(metadata) + + val database = deltaMetaReader.storageDB + val tableName = deltaMetaReader.storageTable + val deltaPath = deltaMetaReader.storagePath + + val extensionTableBC = sparkSession.sparkContext.broadcast( + ClickhouseMetaSerializer + .forWrite(deltaMetaReader, metadata.schema) + .toByteArray) + new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { ".mergetree" @@ -107,25 +79,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + require(path == deltaPath) GlutenMergeTreeWriterInjects .getInstance() .asInstanceOf[CHMergeTreeWriterInjects] .createOutputWriter( path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, metadata.schema, - clickhouseTableConfigs, context, - nativeConf + nativeConf, + database, + tableName, + extensionTableBC.value ) } } From 28fe202670666bfeb6b5006f7e5cf1501063750d Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Sun, 29 Sep 2024 22:52:24 +0800 Subject: [PATCH 6/8] spark 32 --- .../sql/delta/catalog/ClickHouseTableV2.scala | 20 ++---- .../source/DeltaMergeTreeFileFormat.scala | 71 +++++-------------- 2 files changed, 25 insertions(+), 66 deletions(-) diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 03d38934b74a..57ef66b51edc 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.collection.BitSet @@ -95,19 +96,12 @@ class ClickHouseTableV2( def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( - meta, - dataBaseName, - tableName, - ClickhouseSnapshot.genSnapshotId(snapshot), - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - 
bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - clickhouseTableConfigs, - partitionColumns - ) + StorageMeta.withMoreStorageInfo( + meta, + ClickhouseSnapshot.genSnapshotId(snapshot), + deltaLog.dataPath, + dataBaseName, + tableName)) } override def deltaProperties(): Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 994329ccf17c..19b3d396bd6f 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer +import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType @@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata.columnMappingMode, metadata.schema) { - protected var database = "" - protected var tableName = "" - protected var snapshotId = "" - protected var orderByKeyOption: Option[Seq[String]] = None - protected var lowCardKeyOption: Option[Seq[String]] = None - protected var minmaxIndexKeyOption: Option[Seq[String]] = None - protected var bfIndexKeyOption: Option[Seq[String]] = None - protected var setIndexKeyOption: Option[Seq[String]] = None - protected var primaryKeyOption: Option[Seq[String]] = None - protected var partitionColumns: Seq[String] = Seq.empty[String] - protected var clickhouseTableConfigs: Map[String, String] = Map.empty - - // scalastyle:off argcount - def this( - metadata: Metadata, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - clickhouseTableConfigs: Map[String, String], - partitionColumns: Seq[String]) { - this(metadata) - this.database = database - this.tableName = tableName - this.snapshotId = snapshotId - this.orderByKeyOption = orderByKeyOption - this.lowCardKeyOption = lowCardKeyOption - this.minmaxIndexKeyOption = minmaxIndexKeyOption - this.bfIndexKeyOption = bfIndexKeyOption - this.setIndexKeyOption = setIndexKeyOption - this.primaryKeyOption = primaryKeyOption - this.clickhouseTableConfigs = clickhouseTableConfigs - this.partitionColumns = partitionColumns - } - // scalastyle:on argcount - override def shortName(): String = "mergetree" override def toString(): String = "MergeTree" @@ -95,6 +56,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) .getInstance() .nativeConf(options, "") + @transient val deltaMetaReader = DeltaMetaReader(metadata) + + val 
database = deltaMetaReader.storageDB + val tableName = deltaMetaReader.storageTable + val deltaPath = deltaMetaReader.storagePath + + val extensionTableBC = sparkSession.sparkContext.broadcast( + ClickhouseMetaSerializer + .forWrite(deltaMetaReader, metadata.schema) + .toByteArray) + new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { ".mergetree" @@ -104,25 +76,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + require(path == deltaPath) GlutenMergeTreeWriterInjects .getInstance() .asInstanceOf[CHMergeTreeWriterInjects] .createOutputWriter( path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, metadata.schema, - clickhouseTableConfigs, context, - nativeConf + nativeConf, + database, + tableName, + extensionTableBC.value ) } } From af2c3de349492059c4f0ed3219eb42903c7cea32 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Sun, 29 Sep 2024 22:59:27 +0800 Subject: [PATCH 7/8] remove unnecessary createOutputWriter --- .../v1/CHMergeTreeWriterInjects.scala | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index 835d9bb1ba28..521b59d60e29 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -55,51 +55,6 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { context: TaskAttemptContext, nativeConf: JMap[String, String]): OutputWriter = null - // scalastyle:off argcount - def createOutputWriter( - path: String, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - partitionColumns: Seq[String], - tableSchema: StructType, - clickhouseTableConfigs: Map[String, String], - context: TaskAttemptContext, - nativeConf: JMap[String, String]): OutputWriter = { - - val extensionTable = ClickhouseMetaSerializer.apply1( - database, - tableName, - snapshotId, - path, - "", - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - ClickhousePartSerializer.fromPartNames(Seq()), - ConverterUtils.convertNamedStructJson(tableSchema), - clickhouseTableConfigs.asJava - ) - createOutputWriter( - path, - tableSchema, - context, - nativeConf, - database, - tableName, - extensionTable.toByteArray) - } - // scalastyle:on argcount - override val formatName: String = "mergetree" def createOutputWriter( From 9b0197460d26f04ab2dea54f1145be4bb859ee1c Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Sun, 29 Sep 2024 18:54:16 +0800 Subject: [PATCH 8/8] AddStorageInfo --- backends-clickhouse/pom.xml | 2 + .../sql/delta/catalog/ClickHouseTableV2.scala | 8 +-- .../sql/delta/catalog/ClickHouseTableV2.scala | 4 +- .../backendsapi/clickhouse/CHRuleApi.scala | 11 ++-- 
.../delta/catalog/ClickHouseTableV2Base.scala | 13 ++-- .../utils/MergeTreePartsPartitionsUtil.scala | 3 +- .../datasources/mergetree/StorageMeta.scala | 36 ++++++++--- .../backendsapi/clickhouse/CHExtendRule.scala | 23 +++++++ .../backendsapi/clickhouse/CHExtendRule.scala | 23 +++++++ .../backendsapi/clickhouse/CHExtendRule.scala | 27 ++++++++ .../spark/sql/catalyst/AddStorageInfo.scala | 50 +++++++++++++++ ...ickHouseMergeTreePathBasedWriteSuite.scala | 63 ++++++++++--------- 12 files changed, 203 insertions(+), 60 deletions(-) create mode 100644 backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala create mode 100644 backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala create mode 100644 backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala create mode 100644 backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 130fe88552e7..241cdd84519a 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -338,6 +338,7 @@ src/test/scala/**/*.scala src/main/delta-${delta.binary.version}/**/*.scala src/test/delta-${delta.binary.version}/**/*.scala + src/main/${sparkshim.module.name}/**/*.scala src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala @@ -397,6 +398,7 @@ src/main/delta-${delta.binary.version} + src/main/${sparkshim.module.name} diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 57ef66b51edc..ae8ec32bd0a4 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -104,13 +104,13 @@ class ClickHouseTableV2( tableName)) } - override def deltaProperties(): Map[String, String] = properties().asScala.toMap + override def deltaProperties: Map[String, String] = properties().asScala.toMap - override def deltaCatalog(): Option[CatalogTable] = catalogTable + override def deltaCatalog: Option[CatalogTable] = catalogTable - override def deltaPath(): Path = path + override def deltaPath: Path = path - override def deltaSnapshot(): Snapshot = snapshot + override def deltaSnapshot: Snapshot = snapshot def cacheThis(): Unit = { deltaLog2Table.put(deltaLog, this) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index f730b42e4db0..f5f1668f60b5 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -102,9 +102,7 @@ class ClickHouseTableV2( StorageMeta.withMoreStorageInfo( meta, ClickhouseSnapshot.genSnapshotId(initialSnapshot), - deltaLog.dataPath, - dataBaseName, - tableName)) + deltaLog.dataPath)) } override def deltaProperties: Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 
2cf1f4fcc45b..ba9d859bc9cd 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.backendsapi.RuleApi import org.apache.gluten.extension._ import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides} +import org.apache.gluten.extension.columnar.MiscColumnarRules._ import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector} @@ -28,7 +28,7 @@ import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlP import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.PhysicalPlanSelector -import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} +import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter} import org.apache.spark.util.SparkPlanRules @@ -44,7 +44,7 @@ class CHRuleApi extends RuleApi { } private object CHRuleApi { - def injectSpark(injector: SparkInjector): Unit = { + private def injectSpark(injector: SparkInjector): Unit = { // Inject the regular Spark rules directly. injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply) injector.injectQueryStagePrepRule(spark => CHAQEPropagateEmptyRelation(spark)) @@ -61,9 +61,10 @@ private object CHRuleApi { injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark)) injector.injectOptimizerRule(_ => CountDistinctWithoutExpand) injector.injectOptimizerRule(_ => EqualToRewrite) + CHExtendRule.injectSpark(injector) } - def injectLegacy(injector: LegacyInjector): Unit = { + private def injectLegacy(injector: LegacyInjector): Unit = { // Gluten columnar: Transform rules. injector.injectTransform(_ => RemoveTransitions) injector.injectTransform(_ => PushDownInputFileExpression.PreOffload) @@ -107,7 +108,7 @@ private object CHRuleApi { injector.injectFinal(_ => RemoveFallbackTagRule()) } - def injectRas(injector: RasInjector): Unit = { + private def injectRas(injector: RasInjector): Unit = { // CH backend doesn't work with RAS at the moment. Inject a rule that aborts any // execution calls. 
injector.inject( diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 06bbb3e4820b..062e96962297 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -22,14 +22,12 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.delta.Snapshot import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil -import org.apache.spark.sql.execution.datasources.mergetree.TablePropertiesReader +import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader} import org.apache.hadoop.fs.Path trait ClickHouseTableV2Base extends TablePropertiesReader { - val DEFAULT_DATABASE = "clickhouse_db" - def deltaProperties: Map[String, String] def deltaCatalog: Option[CatalogTable] @@ -43,18 +41,15 @@ trait ClickHouseTableV2Base extends TablePropertiesReader { def metadata: Metadata = deltaSnapshot.metadata lazy val dataBaseName: String = deltaCatalog - .map(_.identifier.database.getOrElse("default")) - .getOrElse(DEFAULT_DATABASE) + .map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE)) + .getOrElse(StorageMeta.DEFAULT_PATH_BASED_DATABASE) lazy val tableName: String = deltaCatalog .map(_.identifier.table) .getOrElse(deltaPath.toUri.getPath) lazy val clickhouseTableConfigs: Map[String, String] = { - deltaProperties.get("storage_policy") match { - case Some(_) => deltaProperties - case None => deltaProperties ++ Seq("storage_policy" -> "default") - } + Map("storage_policy" -> deltaProperties.getOrElse("storage_policy", "default")) } def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala index ee53d1c4168f..dc31822fd73e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, MergeTreePartFilterReturnedRange} +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.types.BooleanType @@ -85,7 +86,7 @@ object MergeTreePartsPartitionsUtil extends Logging { (table.catalogTable.get.identifier.database.get, table.catalogTable.get.identifier.table) } else { // for file_format.`file_path` - (table.DEFAULT_DATABASE, table.deltaPath.toUri.getPath) + 
(StorageMeta.DEFAULT_PATH_BASED_DATABASE, table.deltaPath.toUri.getPath) } val engine = "MergeTree" val relativeTablePath = fileIndex.deltaLog.dataPath.toUri.getPath.substring(1) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala index fc5970224816..e08f91450ec2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDelt import org.apache.hadoop.fs.Path +import scala.collection.mutable.ListBuffer + /** Reserved table property for MergeTree table. */ object StorageMeta { val Provider: String = "clickhouse" @@ -31,12 +33,12 @@ object StorageMeta { val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree" // Storage properties - val DefaultStorageDB: String = "default" + val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db" + val DEFAULT_CREATE_TABLE_DATABASE: String = "default" val STORAGE_DB: String = "storage_db" val STORAGE_TABLE: String = "storage_table" val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id" val STORAGE_PATH: String = "storage_path" - val SERIALIZER_HEADER: String = "MergeTree;" def withMoreStorageInfo( @@ -45,14 +47,28 @@ object StorageMeta { deltaPath: Path, database: String, tableName: String): Metadata = { - val newOptions = - metadata.configuration ++ Seq( - STORAGE_DB -> database, - STORAGE_SNAPSHOT_ID -> snapshotId, - STORAGE_TABLE -> tableName, - STORAGE_PATH -> deltaPath.toString - ) - metadata.copy(configuration = newOptions) + val moreOptions = Seq( + STORAGE_DB -> database, + STORAGE_SNAPSHOT_ID -> snapshotId, + STORAGE_TABLE -> tableName, + STORAGE_PATH -> deltaPath.toString) + withMoreOptions(metadata, moreOptions) + } + def withMoreStorageInfo(metadata: Metadata, snapshotId: String, deltaPath: Path): Metadata = { + val moreOptions = + ListBuffer(STORAGE_SNAPSHOT_ID -> snapshotId, STORAGE_PATH -> deltaPath.toString) + // Path-based create table statement does not have storage_db and storage_table + if (!metadata.configuration.contains(STORAGE_DB)) { + moreOptions += STORAGE_DB -> DEFAULT_PATH_BASED_DATABASE + } + if (!metadata.configuration.contains(STORAGE_TABLE)) { + moreOptions += STORAGE_TABLE -> deltaPath.toUri.getPath + } + withMoreOptions(metadata, moreOptions.toSeq) + } + + private def withMoreOptions(metadata: Metadata, newOptions: Seq[(String, String)]): Metadata = { + metadata.copy(configuration = metadata.configuration ++ newOptions) } } diff --git a/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala new file mode 100644 index 000000000000..234954386adb --- /dev/null +++ b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.clickhouse + +import org.apache.gluten.extension.injector.SparkInjector + +object CHExtendRule { + def injectSpark(injector: SparkInjector): Unit = {} +} diff --git a/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala new file mode 100644 index 000000000000..234954386adb --- /dev/null +++ b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.clickhouse + +import org.apache.gluten.extension.injector.SparkInjector + +object CHExtendRule { + def injectSpark(injector: SparkInjector): Unit = {} +} diff --git a/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala new file mode 100644 index 000000000000..fb3a854ef98c --- /dev/null +++ b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gluten.backendsapi.clickhouse
+
+import org.apache.gluten.extension.injector.SparkInjector
+
+import org.apache.spark.sql.catalyst.AddStorageInfo
+
+object CHExtendRule {
+  def injectSpark(injector: SparkInjector): Unit = {
+    injector.injectOptimizerRule(_ => AddStorageInfo)
+  }
+}
diff --git a/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala
new file mode 100644
index 000000000000..760241f840f2
--- /dev/null
+++ b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, TableSpec}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
+
+/** Adds the storage database and table name to CreateTable commands for MergeTree tables. */
+
+object AddStorageInfo extends Rule[LogicalPlan] {
+
+  private def createMergeTreeTable(tableSpec: TableSpec): Boolean = {
+    tableSpec.provider.contains(StorageMeta.Provider) ||
+    tableSpec.properties
+      .get(StorageMeta.DEFAULT_FILE_FORMAT)
+      .contains(StorageMeta.DEFAULT_FILE_FORMAT_DEFAULT)
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan.transformWithPruning(_.containsAnyPattern(COMMAND)) {
+      case create @ CreateTable(ResolvedIdentifier(_, ident), _, _, tableSpec: TableSpec, _)
+          if createMergeTreeTable(tableSpec) =>
+        val newTableSpec = tableSpec.copy(
+          properties = tableSpec.properties ++ Seq(
+            StorageMeta.STORAGE_DB -> ident
+              .namespace()
+              .lastOption
+              .getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE),
+            StorageMeta.STORAGE_TABLE -> ident.name())
+        )
+        create.copy(tableSpec = newTableSpec)
+    }
+}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index a1dd5d8687a0..62b9ee3bcb31 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.execution.mergetree
 
 import org.apache.gluten.execution._
+import org.apache.gluten.utils.Arm
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
@@ -24,6 +25,7 @@
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.functions._
@@ -162,7 +164,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       val planNodeJson = wholeStageTransformer.substraitPlanJson
       assert(
         !planNodeJson
-          .replaceAll("\\\n", "")
+          .replaceAll("\n", "")
           .replaceAll(" ", "")
           .contains("\"input\":{\"filter\":{"))
     }
@@ -269,7 +271,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       val planNodeJson = wholeStageTransformer.substraitPlanJson
       assert(
         !planNodeJson
-          .replaceAll("\\\n", "")
+          .replaceAll("\n", "")
           .replaceAll(" ", "")
           .contains("\"input\":{\"filter\":{"))
     }
@@ -1006,7 +1008,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
       val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
       val columnsFile = new File(partDir, "columns.txt")
-      val columns = Source.fromFile(columnsFile).getLines().mkString
+      val columns = Arm.withResource(Source.fromFile(columnsFile))(_.getLines().mkString)
 
       assert(columns.contains("`l_returnflag` LowCardinality(Nullable(String))"))
       assert(columns.contains("`l_linestatus` LowCardinality(Nullable(String))"))
@@ -1366,34 +1368,39 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .option("clickhouse.lowCardKey", "l_returnflag,l_linestatus")
       .save(dataPath1)
 
-    val df = spark.read
-      .format("clickhouse")
-      .load(dataPath)
-    val result = df.collect()
-    assertResult(600572)(result.size)
+    {
+      val df = spark.read
+        .format("clickhouse")
+        .load(dataPath)
+      val result = df.collect()
+      assertResult(600572)(result.length)
 
-    val plans = collect(df.queryExecution.executedPlan) {
-      case f: FileSourceScanExecTransformer => f
+      val plans = collect(df.queryExecution.executedPlan) {
+        case f: FileSourceScanExecTransformer => f
+      }
+      val partitions = plans.head.getPartitions
+      assert(partitions.nonEmpty)
+      assert(partitions.head.isInstanceOf[GlutenMergeTreePartition])
+      val mergeTreePartition = partitions.head.asInstanceOf[GlutenMergeTreePartition]
+      assertResult(mergeTreePartition.database)(StorageMeta.DEFAULT_PATH_BASED_DATABASE)
+      assertResult(mergeTreePartition.table)(dataPath)
     }
-    val partitions = plans(0).getPartitions
-    assert(partitions.nonEmpty)
-    assert(partitions(0).isInstanceOf[GlutenMergeTreePartition])
-    assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db"))
-    assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath))
-
-    val df1 = spark.read
-      .format("clickhouse")
-      .load(dataPath1)
-    val result1 = df1.collect()
-    assertResult(600572)(result.size)
+    {
+      val df1 = spark.read
+        .format("clickhouse")
+        .load(dataPath1)
+      val result1 = df1.collect()
+      assertResult(600572)(result1.length)
 
-    val plans1 = collect(df1.queryExecution.executedPlan) {
-      case f: FileSourceScanExecTransformer => f
+      val plans1 = collect(df1.queryExecution.executedPlan) {
+        case f: FileSourceScanExecTransformer => f
+      }
+      val partitions1 = plans1.head.getPartitions
+      assert(partitions1.nonEmpty)
+      assert(partitions1.head.isInstanceOf[GlutenMergeTreePartition])
+      val mergeTreePartition1 = partitions1.head.asInstanceOf[GlutenMergeTreePartition]
+      assertResult(mergeTreePartition1.database)(StorageMeta.DEFAULT_PATH_BASED_DATABASE)
+      assertResult(mergeTreePartition1.table)(dataPath1)
     }
-    val partitions1 = plans1(0).getPartitions
-    assert(partitions1.nonEmpty)
-    assert(partitions1(0).isInstanceOf[GlutenMergeTreePartition])
-    assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db"))
-    assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath1))
   }
 }
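Note on the AddStorageInfo rule introduced above: it only derives two extra table properties from the resolved identifier and leaves every other CreateTable untouched. The following self-contained sketch illustrates that derivation outside of Spark; TableIdent and TableSpecLike are hypothetical stand-ins for ResolvedIdentifier and TableSpec, and the literal key and default values are assumptions standing in for the StorageMeta constants referenced by the patch.

// Illustrative sketch only; not part of the patch.
object AddStorageInfoSketch {
  final case class TableIdent(namespace: Seq[String], name: String)
  final case class TableSpecLike(provider: Option[String], properties: Map[String, String])

  private val Provider = "clickhouse"       // assumed value of StorageMeta.Provider
  private val StorageDb = "storage_db"      // assumed value of StorageMeta.STORAGE_DB
  private val StorageTable = "storage_table" // assumed value of StorageMeta.STORAGE_TABLE
  private val DefaultDb = "default"         // assumed StorageMeta.DEFAULT_CREATE_TABLE_DATABASE

  // MergeTree tables get the storage database/table recorded in their properties;
  // everything else passes through unchanged.
  def addStorageInfo(ident: TableIdent, spec: TableSpecLike): TableSpecLike =
    if (!spec.provider.contains(Provider)) spec
    else
      spec.copy(properties = spec.properties ++ Map(
        StorageDb -> ident.namespace.lastOption.getOrElse(DefaultDb),
        StorageTable -> ident.name))

  def main(args: Array[String]): Unit = {
    val spec = TableSpecLike(Some(Provider), Map.empty)
    val enriched = addStorageInfo(TableIdent(Seq("spark_catalog", "tpch"), "lineitem"), spec)
    // Prints: storage_db -> tpch and storage_table -> lineitem
    enriched.properties.foreach { case (k, v) => println(s"$k -> $v") }
  }
}

The apparent design point is to record the logical database/table pair at table-creation time, presumably so that later scans no longer rely on a hard-coded database name such as the "clickhouse_db" literal removed from the test expectations above.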