diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index 130fe88552e7..241cdd84519a 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -338,6 +338,7 @@
src/test/scala/**/*.scala
src/main/delta-${delta.binary.version}/**/*.scala
src/test/delta-${delta.binary.version}/**/*.scala
+ src/main/${sparkshim.module.name}/**/*.scala
src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala
@@ -397,6 +398,7 @@
+
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 90370f0b1d99..ae8ec32bd0a4 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet
@@ -95,28 +96,21 @@ class ClickHouseTableV2(
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
- meta,
- dataBaseName,
- tableName,
- ClickhouseSnapshot.genSnapshotId(snapshot),
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- clickhouseTableConfigs,
- partitionColumns
- )
+ StorageMeta.withMoreStorageInfo(
+ meta,
+ ClickhouseSnapshot.genSnapshotId(snapshot),
+ deltaLog.dataPath,
+ dataBaseName,
+ tableName))
}
- override def deltaProperties(): ju.Map[String, String] = properties()
+ override def deltaProperties: Map[String, String] = properties().asScala.toMap
- override def deltaCatalog(): Option[CatalogTable] = catalogTable
+ override def deltaCatalog: Option[CatalogTable] = catalogTable
- override def deltaPath(): Path = path
+ override def deltaPath: Path = path
- override def deltaSnapshot(): Snapshot = snapshot
+ override def deltaSnapshot: Snapshot = snapshot
def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
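
(For orientation, a minimal sketch of what StorageMeta.withMoreStorageInfo is assumed to do here: fold the snapshot id, data path, database and table name into the Delta Metadata configuration so the file format can later recover them through DeltaMetaReader. The configuration key names below are illustrative assumptions, not taken from this patch.)

// Sketch only; the real helper lives in
// org.apache.spark.sql.execution.datasources.mergetree.StorageMeta.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.Metadata

object StorageMetaSketch {
  def withMoreStorageInfo(
      meta: Metadata,
      snapshotId: String,
      deltaPath: Path,
      database: String,
      tableName: String): Metadata = {
    val extra = Map(
      "storage_db" -> database,             // assumed key name
      "storage_table" -> tableName,         // assumed key name
      "storage_snapshotId" -> snapshotId,   // assumed key name
      "storage_path" -> deltaPath.toString) // assumed key name
    // Metadata is a case class, so only the configuration map changes.
    meta.copy(configuration = meta.configuration ++ extra)
  }
}
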
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 994329ccf17c..19b3d396bd6f 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType
@@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
class DeltaMergeTreeFileFormat(metadata: Metadata)
extends DeltaParquetFileFormat(metadata.columnMappingMode, metadata.schema) {
- protected var database = ""
- protected var tableName = ""
- protected var snapshotId = ""
- protected var orderByKeyOption: Option[Seq[String]] = None
- protected var lowCardKeyOption: Option[Seq[String]] = None
- protected var minmaxIndexKeyOption: Option[Seq[String]] = None
- protected var bfIndexKeyOption: Option[Seq[String]] = None
- protected var setIndexKeyOption: Option[Seq[String]] = None
- protected var primaryKeyOption: Option[Seq[String]] = None
- protected var partitionColumns: Seq[String] = Seq.empty[String]
- protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
- // scalastyle:off argcount
- def this(
- metadata: Metadata,
- database: String,
- tableName: String,
- snapshotId: String,
- orderByKeyOption: Option[Seq[String]],
- lowCardKeyOption: Option[Seq[String]],
- minmaxIndexKeyOption: Option[Seq[String]],
- bfIndexKeyOption: Option[Seq[String]],
- setIndexKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]],
- clickhouseTableConfigs: Map[String, String],
- partitionColumns: Seq[String]) {
- this(metadata)
- this.database = database
- this.tableName = tableName
- this.snapshotId = snapshotId
- this.orderByKeyOption = orderByKeyOption
- this.lowCardKeyOption = lowCardKeyOption
- this.minmaxIndexKeyOption = minmaxIndexKeyOption
- this.bfIndexKeyOption = bfIndexKeyOption
- this.setIndexKeyOption = setIndexKeyOption
- this.primaryKeyOption = primaryKeyOption
- this.clickhouseTableConfigs = clickhouseTableConfigs
- this.partitionColumns = partitionColumns
- }
- // scalastyle:on argcount
-
override def shortName(): String = "mergetree"
override def toString(): String = "MergeTree"
@@ -95,6 +56,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
.getInstance()
.nativeConf(options, "")
+ @transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+ val database = deltaMetaReader.storageDB
+ val tableName = deltaMetaReader.storageTable
+ val deltaPath = deltaMetaReader.storagePath
+
+ val extensionTableBC = sparkSession.sparkContext.broadcast(
+ ClickhouseMetaSerializer
+ .forWrite(deltaMetaReader, metadata.schema)
+ .toByteArray)
+
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -104,25 +76,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+ require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
- database,
- tableName,
- snapshotId,
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- partitionColumns,
metadata.schema,
- clickhouseTableConfigs,
context,
- nativeConf
+ nativeConf,
+ database,
+ tableName,
+ extensionTableBC.value
)
}
}
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 90370f0b1d99..ae8ec32bd0a4 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet
@@ -95,28 +96,21 @@ class ClickHouseTableV2(
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
- meta,
- dataBaseName,
- tableName,
- ClickhouseSnapshot.genSnapshotId(snapshot),
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- clickhouseTableConfigs,
- partitionColumns
- )
+ StorageMeta.withMoreStorageInfo(
+ meta,
+ ClickhouseSnapshot.genSnapshotId(snapshot),
+ deltaLog.dataPath,
+ dataBaseName,
+ tableName))
}
- override def deltaProperties(): ju.Map[String, String] = properties()
+ override def deltaProperties: Map[String, String] = properties().asScala.toMap
- override def deltaCatalog(): Option[CatalogTable] = catalogTable
+ override def deltaCatalog: Option[CatalogTable] = catalogTable
- override def deltaPath(): Path = path
+ override def deltaPath: Path = path
- override def deltaSnapshot(): Snapshot = snapshot
+ override def deltaSnapshot: Snapshot = snapshot
def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index d2c7cf5dfe32..c2d1ec47b9ca 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType
@@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata) {
- protected var database = ""
- protected var tableName = ""
- protected var snapshotId = ""
- protected var orderByKeyOption: Option[Seq[String]] = None
- protected var lowCardKeyOption: Option[Seq[String]] = None
- protected var minmaxIndexKeyOption: Option[Seq[String]] = None
- protected var bfIndexKeyOption: Option[Seq[String]] = None
- protected var setIndexKeyOption: Option[Seq[String]] = None
- protected var primaryKeyOption: Option[Seq[String]] = None
- protected var partitionColumns: Seq[String] = Seq.empty[String]
- protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
- // scalastyle:off argcount
- def this(
- metadata: Metadata,
- database: String,
- tableName: String,
- snapshotId: String,
- orderByKeyOption: Option[Seq[String]],
- lowCardKeyOption: Option[Seq[String]],
- minmaxIndexKeyOption: Option[Seq[String]],
- bfIndexKeyOption: Option[Seq[String]],
- setIndexKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]],
- clickhouseTableConfigs: Map[String, String],
- partitionColumns: Seq[String]) {
- this(metadata)
- this.database = database
- this.tableName = tableName
- this.snapshotId = snapshotId
- this.orderByKeyOption = orderByKeyOption
- this.lowCardKeyOption = lowCardKeyOption
- this.minmaxIndexKeyOption = minmaxIndexKeyOption
- this.bfIndexKeyOption = bfIndexKeyOption
- this.setIndexKeyOption = setIndexKeyOption
- this.primaryKeyOption = primaryKeyOption
- this.clickhouseTableConfigs = clickhouseTableConfigs
- this.partitionColumns = partitionColumns
- }
- // scalastyle:on argcount
-
override def shortName(): String = "mergetree"
override def toString(): String = "MergeTree"
@@ -98,6 +59,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
.getInstance()
.nativeConf(options, "")
+ @transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+ val database = deltaMetaReader.storageDB
+ val tableName = deltaMetaReader.storageTable
+ val deltaPath = deltaMetaReader.storagePath
+
+ val extensionTableBC = sparkSession.sparkContext.broadcast(
+ ClickhouseMetaSerializer
+ .forWrite(deltaMetaReader, metadata.schema)
+ .toByteArray)
+
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -107,25 +79,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+ require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
- database,
- tableName,
- snapshotId,
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- partitionColumns,
metadata.schema,
- clickhouseTableConfigs,
context,
- nativeConf
+ nativeConf,
+ database,
+ tableName,
+ extensionTableBC.value
)
}
}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
index dca14d7fb1fb..bac5231309b8 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
@@ -784,9 +784,6 @@ object DeltaLog extends DeltaLogging {
FileSourceOptions.IGNORE_CORRUPT_FILES -> "false",
FileSourceOptions.IGNORE_MISSING_FILES -> "false"
)
- // --- modified start
- // Don't need to add the bucketOption here, it handles the delta log meta json file
- // --- modified end
val fsRelation = HadoopFsRelation(
index, index.partitionSchema, schema, None, index.format, allOptions)(spark)
LogicalRelation(fsRelation)
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 8b4a13a30a69..f5f1668f60b5 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, De
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -98,28 +99,19 @@ class ClickHouseTableV2(
def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
protocol,
- meta,
- dataBaseName,
- tableName,
- ClickhouseSnapshot.genSnapshotId(initialSnapshot),
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- clickhouseTableConfigs,
- partitionColumns
- )
+ StorageMeta.withMoreStorageInfo(
+ meta,
+ ClickhouseSnapshot.genSnapshotId(initialSnapshot),
+ deltaLog.dataPath))
}
- override def deltaProperties(): ju.Map[String, String] = properties()
+ override def deltaProperties: Map[String, String] = properties().asScala.toMap
- override def deltaCatalog(): Option[CatalogTable] = catalogTable
+ override def deltaCatalog: Option[CatalogTable] = catalogTable
- override def deltaPath(): Path = path
+ override def deltaPath: Path = path
- override def deltaSnapshot(): Snapshot = initialSnapshot
+ override def deltaSnapshot: Snapshot = initialSnapshot
def cacheThis(): Unit = {
ClickHouseTableV2.deltaLog2Table.put(deltaLog, this)
@@ -133,7 +125,6 @@ class TempClickHouseTableV2(
override val spark: SparkSession,
override val catalogTable: Option[CatalogTable] = None)
extends ClickHouseTableV2(spark, null, catalogTable) {
- import collection.JavaConverters._
override def properties(): ju.Map[String, String] = catalogTable.get.properties.asJava
override lazy val partitionColumns: Seq[String] = catalogTable.get.partitionColumnNames
override def cacheThis(): Unit = {}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 36ab3fb6c138..1489ae4dbf49 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType
@@ -29,48 +31,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
extends DeltaParquetFileFormat(protocol, metadata) {
- protected var database = ""
- protected var tableName = ""
- protected var snapshotId = ""
- protected var orderByKeyOption: Option[Seq[String]] = None
- protected var lowCardKeyOption: Option[Seq[String]] = None
- protected var minmaxIndexKeyOption: Option[Seq[String]] = None
- protected var bfIndexKeyOption: Option[Seq[String]] = None
- protected var setIndexKeyOption: Option[Seq[String]] = None
- protected var primaryKeyOption: Option[Seq[String]] = None
- protected var partitionColumns: Seq[String] = Seq.empty[String]
- protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
- // scalastyle:off argcount
- def this(
- protocol: Protocol,
- metadata: Metadata,
- database: String,
- tableName: String,
- snapshotId: String,
- orderByKeyOption: Option[Seq[String]],
- lowCardKeyOption: Option[Seq[String]],
- minmaxIndexKeyOption: Option[Seq[String]],
- bfIndexKeyOption: Option[Seq[String]],
- setIndexKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]],
- clickhouseTableConfigs: Map[String, String],
- partitionColumns: Seq[String]) = {
- this(protocol, metadata)
- this.database = database
- this.tableName = tableName
- this.snapshotId = snapshotId
- this.orderByKeyOption = orderByKeyOption
- this.lowCardKeyOption = lowCardKeyOption
- this.minmaxIndexKeyOption = minmaxIndexKeyOption
- this.bfIndexKeyOption = bfIndexKeyOption
- this.setIndexKeyOption = setIndexKeyOption
- this.primaryKeyOption = primaryKeyOption
- this.clickhouseTableConfigs = clickhouseTableConfigs
- this.partitionColumns = partitionColumns
- }
- // scalastyle:on argcount
-
override def shortName(): String = "mergetree"
override def toString(): String = "MergeTree"
@@ -99,6 +59,17 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
.getInstance()
.nativeConf(options, "")
+ @transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+ val database = deltaMetaReader.storageDB
+ val tableName = deltaMetaReader.storageTable
+ val deltaPath = deltaMetaReader.storagePath
+
+ val extensionTableBC = sparkSession.sparkContext.broadcast(
+ ClickhouseMetaSerializer
+ .forWrite(deltaMetaReader, metadata.schema)
+ .toByteArray)
+
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -108,25 +79,18 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+ require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
- database,
- tableName,
- snapshotId,
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- partitionColumns,
metadata.schema,
- clickhouseTableConfigs,
context,
- nativeConf
+ nativeConf,
+ database,
+ tableName,
+ extensionTableBC.value
)
}
}
diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java
index c43775d85cbd..9d6ed6868ec1 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.clickhouse;
-import org.apache.gluten.expression.ConverterUtils;
-
import java.util.List;
import java.util.Map;
@@ -25,8 +23,6 @@ public class ExtensionTableBuilder {
private ExtensionTableBuilder() {}
public static ExtensionTableNode makeExtensionTable(
- Long minPartsNum,
- Long maxPartsNum,
String database,
String tableName,
String snapshotId,
@@ -38,31 +34,28 @@ public static ExtensionTableNode makeExtensionTable(
String bfIndexKey,
String setIndexKey,
String primaryKey,
-      List<String> partList,
-      List<Long> starts,
-      List<Long> lengths,
+ ClickhousePartSerializer partSerializer,
String tableSchemaJson,
Map<String, String> clickhouseTableConfigs,
List<String> preferredLocations) {
+
+ String result =
+ ClickhouseMetaSerializer.apply(
+ database,
+ tableName,
+ snapshotId,
+ relativeTablePath,
+ absoluteTablePath,
+ orderByKey,
+ lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
+ primaryKey,
+ partSerializer,
+ tableSchemaJson,
+ clickhouseTableConfigs);
return new ExtensionTableNode(
- minPartsNum,
- maxPartsNum,
- database,
- tableName,
- snapshotId,
- relativeTablePath,
- absoluteTablePath,
- ConverterUtils.normalizeColName(orderByKey),
- ConverterUtils.normalizeColName(lowCardKey),
- ConverterUtils.normalizeColName(minmaxIndexKey),
- ConverterUtils.normalizeColName(bfIndexKey),
- ConverterUtils.normalizeColName(setIndexKey),
- ConverterUtils.normalizeColName(primaryKey),
- partList,
- starts,
- lengths,
- tableSchemaJson,
- clickhouseTableConfigs,
- preferredLocations);
+ preferredLocations, result, partSerializer.pathList(absoluteTablePath));
}
}
diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java
index 69629c8a09af..bb04652be440 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java
@@ -19,158 +19,25 @@
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.substrait.rel.SplitInfo;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.StringValue;
import io.substrait.proto.ReadRel;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
public class ExtensionTableNode implements SplitInfo {
- private static final String MERGE_TREE = "MergeTree;";
- private Long minPartsNum;
- private Long maxPartsNum;
- private String database;
- private String tableName;
- private String snapshotId;
- private String relativePath;
- private String absolutePath;
- private String tableSchemaJson;
- private StringBuffer extensionTableStr = new StringBuffer(MERGE_TREE);
- private StringBuffer partPathList = new StringBuffer("");
private final List<String> preferredLocations = new ArrayList<>();
-
- private String orderByKey;
-
- private String primaryKey;
-
- private String lowCardKey;
- private String minmaxIndexKey;
- private String bfIndexKey;
- private String setIndexKey;
-
-  private List<String> partList;
-  private List<Long> starts;
-  private List<Long> lengths;
-
-  private Map<String, String> clickhouseTableConfigs;
+ private final String serializerResult;
+  private final scala.collection.Seq<String> pathList;
ExtensionTableNode(
- Long minPartsNum,
- Long maxPartsNum,
- String database,
- String tableName,
- String snapshotId,
- String relativePath,
- String absolutePath,
- String orderByKey,
- String lowCardKey,
- String minmaxIndexKey,
- String bfIndexKey,
- String setIndexKey,
- String primaryKey,
-      List<String> partList,
-      List<Long> starts,
-      List<Long> lengths,
-      String tableSchemaJson,
-      Map<String, String> clickhouseTableConfigs,
-      List<String> preferredLocations) {
- this.minPartsNum = minPartsNum;
- this.maxPartsNum = maxPartsNum;
- this.database = database;
- this.tableName = tableName;
- this.snapshotId = snapshotId;
- URI table_uri = URI.create(relativePath);
- if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx
- this.relativePath = table_uri.getPath().substring(1);
- } else {
- this.relativePath = table_uri.getPath();
- }
- this.absolutePath = absolutePath;
- this.tableSchemaJson = tableSchemaJson;
- this.orderByKey = orderByKey;
- this.lowCardKey = lowCardKey;
- this.minmaxIndexKey = minmaxIndexKey;
- this.bfIndexKey = bfIndexKey;
- this.setIndexKey = setIndexKey;
- this.primaryKey = primaryKey;
- this.partList = partList;
- this.starts = starts;
- this.lengths = lengths;
- this.clickhouseTableConfigs = clickhouseTableConfigs;
+      List<String> preferredLocations,
+      String serializerResult,
+      scala.collection.Seq<String> pathList) {
+ this.pathList = pathList;
this.preferredLocations.addAll(preferredLocations);
-
- // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n
- // {part_path1}\n{part_path2}\n...
- long end = 0;
- for (int i = 0; i < this.partList.size(); i++) {
- end = this.starts.get(i) + this.lengths.get(i);
- partPathList
- .append(this.partList.get(i))
- .append("\n")
- .append(this.starts.get(i))
- .append("\n")
- .append(end)
- .append("\n");
- }
-
- extensionTableStr
- .append(this.database)
- .append("\n")
- .append(this.tableName)
- .append("\n")
- .append(this.snapshotId)
- .append("\n")
- .append(this.tableSchemaJson)
- .append("\n")
- .append(this.orderByKey)
- .append("\n");
-
- if (!this.orderByKey.isEmpty() && !this.orderByKey.equals("tuple()")) {
- extensionTableStr.append(this.primaryKey).append("\n");
- }
- extensionTableStr.append(this.lowCardKey).append("\n");
- extensionTableStr.append(this.minmaxIndexKey).append("\n");
- extensionTableStr.append(this.bfIndexKey).append("\n");
- extensionTableStr.append(this.setIndexKey).append("\n");
- extensionTableStr.append(this.relativePath).append("\n");
- extensionTableStr.append(this.absolutePath).append("\n");
-
- if (this.clickhouseTableConfigs != null && !this.clickhouseTableConfigs.isEmpty()) {
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- String clickhouseTableConfigsJson =
- objectMapper
- .writeValueAsString(this.clickhouseTableConfigs)
- .replaceAll("\\\n", "")
- .replaceAll(" ", "");
- extensionTableStr.append(clickhouseTableConfigsJson).append("\n");
- } catch (Exception e) {
- extensionTableStr.append("").append("\n");
- }
- } else {
- extensionTableStr.append("").append("\n");
- }
- extensionTableStr.append(partPathList);
- /* old format
- if (!this.partList.isEmpty()) {
- } else {
- // Old: MergeTree;{database}\n{table}\n{relative_path}\n{min_part}\n{max_part}\n
- extensionTableStr
- .append(database)
- .append("\n")
- .append(tableName)
- .append("\n")
- .append(relativePath)
- .append("\n")
- .append(this.minPartsNum)
- .append("\n")
- .append(this.maxPartsNum)
- .append("\n");
- } */
+ this.serializerResult = serializerResult;
}
@Override
@@ -180,27 +47,22 @@ public List<String> preferredLocations() {
@Override
public ReadRel.ExtensionTable toProtobuf() {
- ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder();
- StringValue extensionTable =
- StringValue.newBuilder().setValue(extensionTableStr.toString()).build();
- extensionTableBuilder.setDetail(
- BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable));
- return extensionTableBuilder.build();
+ return toProtobuf(serializerResult);
}
- public String getRelativePath() {
- return relativePath;
+  public scala.collection.Seq<String> getPartList() {
+ return pathList;
}
- public String getAbsolutePath() {
- return absolutePath;
- }
-
- public List getPartList() {
- return partList;
+ public String getExtensionTableStr() {
+ return serializerResult;
}
- public String getExtensionTableStr() {
- return extensionTableStr.toString();
+ public static ReadRel.ExtensionTable toProtobuf(String result) {
+ ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder();
+ StringValue extensionTable = StringValue.newBuilder().setValue(result).build();
+ extensionTableBuilder.setDetail(
+ BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable));
+ return extensionTableBuilder.build();
}
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 9585d9d5f235..0a3dbc3f5a37 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.CHColumnarShuffleWriter
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
-import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode}
+import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -131,20 +131,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
- val partLists = new JArrayList[String]()
- val starts = new JArrayList[JLong]()
- val lengths = new JArrayList[JLong]()
- p.partList
- .foreach(
- parts => {
- partLists.add(parts.name)
- starts.add(parts.start)
- lengths.add(parts.length)
- })
ExtensionTableBuilder
.makeExtensionTable(
- -1L,
- -1L,
p.database,
p.table,
p.snapshotId,
@@ -156,9 +144,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
p.bfIndexKey,
p.setIndexKey,
p.primaryKey,
- partLists,
- starts,
- lengths,
+ ClickhousePartSerializer.fromMergeTreePartSplits(p.partList.toSeq),
p.tableSchemaJson,
p.clickhouseTableConfigs.asJava,
CHAffinity.getNativeMergeTreePartitionLocations(p).toList.asJava
@@ -222,27 +208,23 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val planByteArray = wsCtx.root.toProtobuf.toByteArray
splitInfos.zipWithIndex.map {
case (splits, index) =>
- val files = ArrayBuffer[String]()
- val splitInfosByteArray = splits.zipWithIndex.map {
+ val (splitInfosByteArray, files) = splits.zipWithIndex.map {
case (split, i) =>
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
- filesNode.getPaths.forEach(f => files += f)
- filesNode.toProtobuf.toByteArray
+ (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq)
case extensionTableNode: ExtensionTableNode =>
- extensionTableNode.getPartList.forEach(
- name => files += extensionTableNode.getAbsolutePath + "/" + name)
- extensionTableNode.toProtobuf.toByteArray
+ (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList)
}
- }
+ }.unzip
GlutenPartition(
index,
planByteArray,
splitInfosByteArray.toArray,
locations = splits.flatMap(_.preferredLocations().asScala).toArray,
- files.toArray
+ files.flatten.toArray
)
}
}
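
(A small usage sketch of the ClickhousePartSerializer introduced above; the part names are made up. It captures the name/start/length triples that were previously copied into three parallel JArrayLists, and apply() renders them as the "{name}\n{start}\n{end}\n" block appended to the MergeTree extension-table string.)

import org.apache.spark.sql.execution.datasources.clickhouse.ClickhousePartSerializer

// Hypothetical parts: two parts with lengths 100 and 80, both starting at 0.
val serializer = ClickhousePartSerializer(
  partList = Seq("all_1_1_0", "all_2_2_0"),
  starts = Seq(0L, 0L),
  lengths = Seq(100L, 80L))

// Renders "all_1_1_0\n0\n100\nall_2_2_0\n0\n80\n" (end = start + length).
val rendered: String = serializer.apply().toString

// pathList() prefixes each part name with the table's absolute path,
// e.g. Seq("/data/tbl/all_1_1_0", "/data/tbl/all_2_2_0").
val files: Seq[String] = serializer.pathList("/data/tbl")
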
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 2cf1f4fcc45b..ba9d859bc9cd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
-import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.MiscColumnarRules._
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
@@ -28,7 +28,7 @@ import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlP
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PhysicalPlanSelector
-import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
+import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
import org.apache.spark.util.SparkPlanRules
@@ -44,7 +44,7 @@ class CHRuleApi extends RuleApi {
}
private object CHRuleApi {
- def injectSpark(injector: SparkInjector): Unit = {
+ private def injectSpark(injector: SparkInjector): Unit = {
// Inject the regular Spark rules directly.
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
injector.injectQueryStagePrepRule(spark => CHAQEPropagateEmptyRelation(spark))
@@ -61,9 +61,10 @@ private object CHRuleApi {
injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
injector.injectOptimizerRule(_ => EqualToRewrite)
+ CHExtendRule.injectSpark(injector)
}
- def injectLegacy(injector: LegacyInjector): Unit = {
+ private def injectLegacy(injector: LegacyInjector): Unit = {
// Gluten columnar: Transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
@@ -107,7 +108,7 @@ private object CHRuleApi {
injector.injectFinal(_ => RemoveFallbackTagRule())
}
- def injectRas(injector: RasInjector): Unit = {
+ private def injectRas(injector: RasInjector): Unit = {
// CH backend doesn't work with RAS at the moment. Inject a rule that aborts any
// execution calls.
injector.inject(
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
index f28e9e8fe468..e3f643046671 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
@@ -125,11 +125,6 @@ object ClickhouseSnapshot {
// use timestamp + version as the snapshot id for ch backend
def genSnapshotId(snapshot: Snapshot): String = {
// When CTAS, there is no latest timestamp in the Snapshot
- val ts = if (snapshot.metadata.createdTime.isDefined) {
- snapshot.metadata.createdTime.get
- } else {
- System.currentTimeMillis()
- }
- ts.toString + "_" + snapshot.version.toString
+ s"${snapshot.metadata.createdTime.getOrElse(System.currentTimeMillis())}_${snapshot.version}"
}
}
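
(The refactored genSnapshotId keeps the same "{createdTime}_{version}" format; a standalone sketch with made-up values:)

// Equivalent one-liner, shown outside the Snapshot type for clarity:
def genSnapshotIdSketch(createdTime: Option[Long], version: Long): String =
  s"${createdTime.getOrElse(System.currentTimeMillis())}_$version"

genSnapshotIdSketch(Some(1718000000000L), 5L) // "1718000000000_5"
// For a CTAS snapshot (no createdTime) the current wall-clock time is used instead.
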
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 4d01c3798d51..062e96962297 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -16,140 +16,40 @@
*/
package org.apache.spark.sql.delta.catalog
-import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.expression.ConverterUtils.normalizeColName
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.Snapshot
-import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader}
import org.apache.hadoop.fs.Path
-import java.{util => ju}
+trait ClickHouseTableV2Base extends TablePropertiesReader {
-trait ClickHouseTableV2Base {
+ def deltaProperties: Map[String, String]
- val DEFAULT_DATABASE = "clickhouse_db"
+ def deltaCatalog: Option[CatalogTable]
- def deltaProperties(): ju.Map[String, String]
+ def deltaPath: Path
- def deltaCatalog(): Option[CatalogTable]
+ def deltaSnapshot: Snapshot
- def deltaPath(): Path
+ def configuration: Map[String, String] = deltaProperties
- def deltaSnapshot(): Snapshot
+ def metadata: Metadata = deltaSnapshot.metadata
- lazy val dataBaseName = deltaCatalog
- .map(_.identifier.database.getOrElse("default"))
- .getOrElse(DEFAULT_DATABASE)
+ lazy val dataBaseName: String = deltaCatalog
+ .map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE))
+ .getOrElse(StorageMeta.DEFAULT_PATH_BASED_DATABASE)
- lazy val tableName = deltaCatalog
+ lazy val tableName: String = deltaCatalog
.map(_.identifier.table)
.getOrElse(deltaPath.toUri.getPath)
- lazy val bucketOption: Option[BucketSpec] = {
- val tableProperties = deltaProperties
- if (tableProperties.containsKey("numBuckets")) {
- val numBuckets = tableProperties.get("numBuckets").trim.toInt
- val bucketColumnNames: Seq[String] =
- getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String])
- val sortColumnNames: Seq[String] =
- getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String])
- Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
- } else {
- None
- }
- }
-
- lazy val lowCardKeyOption: Option[Seq[String]] = {
- getCommaSeparatedColumns("lowCardKey")
- }
-
- lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
- getCommaSeparatedColumns("minmaxIndexKey")
- }
-
- lazy val bfIndexKeyOption: Option[Seq[String]] = {
- getCommaSeparatedColumns("bloomfilterIndexKey")
- }
-
- lazy val setIndexKeyOption: Option[Seq[String]] = {
- getCommaSeparatedColumns("setIndexKey")
- }
-
- private def getCommaSeparatedColumns(keyName: String) = {
- val tableProperties = deltaProperties
- if (tableProperties.containsKey(keyName)) {
- if (tableProperties.get(keyName).nonEmpty) {
- val keys = tableProperties
- .get(keyName)
- .split(",")
- .map(n => ConverterUtils.normalizeColName(n.trim))
- .toSeq
- keys.foreach(
- s => {
- if (s.contains(".")) {
- throw new IllegalStateException(
- s"$keyName $s can not contain '.' (not support nested column yet)")
- }
- })
- Some(keys)
- } else {
- None
- }
- } else {
- None
- }
- }
-
- lazy val orderByKeyOption: Option[Seq[String]] = {
- if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) {
- val orderByKeys = bucketOption.get.sortColumnNames.map(normalizeColName).toSeq
- val invalidKeys = orderByKeys.intersect(partitionColumns)
- if (invalidKeys.nonEmpty) {
- throw new IllegalStateException(
- s"partition cols $invalidKeys can not be in the order by keys.")
- }
- Some(orderByKeys)
- } else {
- val orderByKeys = getCommaSeparatedColumns("orderByKey")
- if (orderByKeys.isDefined) {
- val invalidKeys = orderByKeys.get.intersect(partitionColumns)
- if (invalidKeys.nonEmpty) {
- throw new IllegalStateException(
- s"partition cols $invalidKeys can not be in the order by keys.")
- }
- orderByKeys
- } else {
- None
- }
- }
- }
-
- lazy val primaryKeyOption: Option[Seq[String]] = {
- if (orderByKeyOption.isDefined) {
- val primaryKeys = getCommaSeparatedColumns("primaryKey")
- if (
- primaryKeys.isDefined && !orderByKeyOption.get
- .mkString(",")
- .startsWith(primaryKeys.get.mkString(","))
- ) {
- throw new IllegalStateException(
- s"Primary key $primaryKeys must be a prefix of the sorting key")
- }
- primaryKeys
- } else {
- None
- }
- }
-
- lazy val partitionColumns = deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq
-
lazy val clickhouseTableConfigs: Map[String, String] = {
- val tableProperties = deltaProperties()
- val configs = scala.collection.mutable.Map[String, String]()
- configs += ("storage_policy" -> tableProperties.getOrDefault("storage_policy", "default"))
- configs.toMap
+ Map("storage_policy" -> deltaProperties.getOrElse("storage_policy", "default"))
}
def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption)
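
(The bucket/order-by/index-key parsing deleted above is assumed to move into the new TablePropertiesReader trait, driven by the configuration and metadata members this trait now provides. A rough sketch of the expected contract; any member not referenced in this patch is an assumption.)

import org.apache.spark.sql.delta.actions.Metadata

trait TablePropertiesReaderSketch {
  // Supplied by ClickHouseTableV2Base above.
  def configuration: Map[String, String]
  def metadata: Metadata

  // Parsed from the table properties and metadata, as the removed lazy vals did.
  def partitionColumns: Seq[String]
  def orderByKeyOption: Option[Seq[String]]
  def primaryKeyOption: Option[Seq[String]]
  def lowCardKeyOption: Option[Seq[String]]
  def minmaxIndexKeyOption: Option[Seq[String]]
  def bfIndexKeyOption: Option[Seq[String]]
  def setIndexKeyOption: Option[Seq[String]]
}
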
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 5337e4d31388..341018fb590b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCacheBase._
-import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
-import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder}
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
@@ -169,13 +169,7 @@ case class GlutenCHCacheDataCommand(
val executorId = value._1
if (parts.nonEmpty) {
val onePart = parts(0)
- val partNameList = parts.map(_.name).toSeq
- // starts and lengths is useless for write
- val partRanges = Seq.range(0L, partNameList.length).map(_ => long2Long(0L)).asJava
-
val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
- -1,
- -1,
onePart.database,
onePart.table,
ClickhouseSnapshot.genSnapshotId(snapshot),
@@ -188,9 +182,7 @@ case class GlutenCHCacheDataCommand(
snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
snapshot.metadata.configuration.getOrElse("setIndexKey", ""),
snapshot.metadata.configuration.getOrElse("primaryKey", ""),
- partNameList.asJava,
- partRanges,
- partRanges,
+ ClickhousePartSerializer.fromPartNames(parts.map(_.name).toSeq),
ConverterUtils.convertNamedStructJson(snapshot.metadata.schema),
snapshot.metadata.configuration.asJava,
new JList[String]()
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
new file mode 100644
index 000000000000..5c863d76c947
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.clickhouse
+
+import org.apache.gluten.execution.MergeTreePartSplit
+import org.apache.gluten.expression.ConverterUtils
+
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta}
+import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+import org.apache.spark.sql.types.StructType
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.substrait.proto.ReadRel
+
+import java.net.URI
+import java.util.{Map => jMap}
+
+import scala.collection.JavaConverters._
+
+case class ClickhousePartSerializer(
+ partList: Seq[String],
+ starts: Seq[Long],
+ lengths: Seq[Long]
+) {
+ def apply(): StringBuilder = {
+ val partPathList = new StringBuilder
+ for (i <- partList.indices) {
+ val end = starts(i) + lengths(i)
+ partPathList
+ .append(partList(i))
+ .append("\n")
+ .append(starts(i))
+ .append("\n")
+ .append(end)
+ .append("\n")
+ }
+ partPathList
+ }
+
+ // TODO: remove pathList
+ def pathList(absolutePath: String): Seq[String] = {
+ partList.map(name => absolutePath + "/" + name)
+ }
+}
+
+object ClickhousePartSerializer {
+ def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): ClickhousePartSerializer = {
+ val partList = partLists.map(_.name)
+ val starts = partLists.map(_.start)
+ val lengths = partLists.map(_.length)
+ ClickhousePartSerializer(partList, starts, lengths)
+ }
+
+ def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): ClickhousePartSerializer = {
+ val partList = parts.map(_.name)
+ val starts = parts.map(_ => 0L)
+ val lengths = parts.map(_.marks)
+ ClickhousePartSerializer(partList, starts, lengths)
+ }
+
+ def fromPartNames(partNames: Seq[String]): ClickhousePartSerializer = {
+    // starts and lengths are useless for writing
+ val partRanges = Seq.range(0L, partNames.length)
+ ClickhousePartSerializer(partNames, partRanges, partRanges)
+ }
+}
+
+object ClickhouseMetaSerializer {
+
+ def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = {
+ val clickhouseTableConfigs = deltaMetaReader.writeConfiguration
+
+ val orderByKey = clickhouseTableConfigs("storage_orderByKey")
+ val lowCardKey = clickhouseTableConfigs("storage_lowCardKey")
+ val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey")
+ val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey")
+ val setIndexKey = clickhouseTableConfigs("storage_setIndexKey")
+ val primaryKey = clickhouseTableConfigs("storage_primaryKey")
+
+ val result = apply(
+ deltaMetaReader.storageDB,
+ deltaMetaReader.storageTable,
+ deltaMetaReader.storageSnapshotId,
+ deltaMetaReader.storagePath,
+ "", // absolutePath
+ orderByKey,
+ lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
+ primaryKey,
+ ClickhousePartSerializer.fromPartNames(Seq()),
+ ConverterUtils.convertNamedStructJson(dataSchema),
+ clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava
+ )
+ ExtensionTableNode.toProtobuf(result)
+
+ }
+ // scalastyle:off argcount
+ def apply1(
+ database: String,
+ tableName: String,
+ snapshotId: String,
+ relativePath: String,
+ absolutePath: String,
+ orderByKeyOption: Option[Seq[String]],
+ lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
+ primaryKeyOption: Option[Seq[String]],
+ partSerializer: ClickhousePartSerializer,
+ tableSchemaJson: String,
+ clickhouseTableConfigs: jMap[String, String]): ReadRel.ExtensionTable = {
+
+ val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
+ orderByKeyOption,
+ primaryKeyOption
+ )
+
+ val result = apply(
+ database,
+ tableName,
+ snapshotId,
+ relativePath,
+ absolutePath,
+ orderByKey0,
+ lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ minmaxIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ primaryKey0,
+ partSerializer,
+ tableSchemaJson,
+ clickhouseTableConfigs
+ )
+ ExtensionTableNode.toProtobuf(result)
+ }
+
+ def apply(
+ database: String,
+ tableName: String,
+ snapshotId: String,
+ relativePath: String,
+ absolutePath: String,
+ orderByKey0: String,
+ lowCardKey0: String,
+ minmaxIndexKey0: String,
+ bfIndexKey0: String,
+ setIndexKey0: String,
+ primaryKey0: String,
+ partSerializer: ClickhousePartSerializer,
+ tableSchemaJson: String,
+ clickhouseTableConfigs: jMap[String, String]): String = {
+ // scalastyle:on argcount
+
+ // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n
+ // {part_path1}\n{part_path2}\n...
+ val extensionTableStr = new StringBuilder(StorageMeta.SERIALIZER_HEADER)
+
+ val orderByKey = ConverterUtils.normalizeColName(orderByKey0)
+ val lowCardKey = ConverterUtils.normalizeColName(lowCardKey0)
+ val minmaxIndexKey = ConverterUtils.normalizeColName(minmaxIndexKey0)
+ val bfIndexKey = ConverterUtils.normalizeColName(bfIndexKey0)
+ val setIndexKey = ConverterUtils.normalizeColName(setIndexKey0)
+ val primaryKey = ConverterUtils.normalizeColName(primaryKey0)
+
+ extensionTableStr
+ .append(database)
+ .append("\n")
+ .append(tableName)
+ .append("\n")
+ .append(snapshotId)
+ .append("\n")
+ .append(tableSchemaJson)
+ .append("\n")
+ .append(orderByKey)
+ .append("\n")
+
+    if (orderByKey.nonEmpty && orderByKey != "tuple()") {
+ extensionTableStr.append(primaryKey).append("\n")
+ }
+
+ extensionTableStr.append(lowCardKey).append("\n")
+ extensionTableStr.append(minmaxIndexKey).append("\n")
+ extensionTableStr.append(bfIndexKey).append("\n")
+ extensionTableStr.append(setIndexKey).append("\n")
+ extensionTableStr.append(normalizeRelativePath(relativePath)).append("\n")
+ extensionTableStr.append(absolutePath).append("\n")
+ appendConfigs(extensionTableStr, clickhouseTableConfigs)
+ extensionTableStr.append(partSerializer())
+
+ extensionTableStr.toString()
+ }
+
+  private def normalizeRelativePath(relativePath: String): String = {
+    val tableUri = URI.create(relativePath)
+    if (tableUri.getPath.startsWith("/")) {
+      tableUri.getPath.substring(1)
+    } else tableUri.getPath
+ }
+
+ private def appendConfigs(
+ extensionTableStr: StringBuilder,
+ clickhouseTableConfigs: jMap[String, String]): Unit = {
+ if (clickhouseTableConfigs != null && !clickhouseTableConfigs.isEmpty) {
+ val objectMapper: ObjectMapper = new ObjectMapper
+ try {
+ val clickhouseTableConfigsJson: String = objectMapper
+ .writeValueAsString(clickhouseTableConfigs)
+ .replaceAll("\n", "")
+ .replaceAll(" ", "")
+ extensionTableStr.append(clickhouseTableConfigsJson).append("\n")
+ } catch {
+        case _: Exception =>
+ extensionTableStr.append("\n")
+ }
+ } else extensionTableStr.append("\n")
+ }
+}
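
(Putting the serializer together, the string built by ClickhouseMetaSerializer.apply looks like the following for a hypothetical table; all values are made up, and StorageMeta.SERIALIZER_HEADER is assumed to be the same "MergeTree;" prefix the old ExtensionTableNode used.)

// One field per line, in the order the code above appends them:
val example =
  "MergeTree;" +
    "default\n" +                            // database
    "lineitem_mergetree\n" +                 // table name
    "1718000000000_5\n" +                    // snapshot id
    "{... named-struct schema json ...}\n" + // tableSchemaJson
    "l_orderkey\n" +                         // order-by key
    "l_orderkey\n" +                         // primary key (only if order-by key != "tuple()")
    "\n" +                                   // low-cardinality key (empty)
    "\n" +                                   // minmax index key (empty)
    "\n" +                                   // bloom-filter index key (empty)
    "\n" +                                   // set index key (empty)
    "tmp/lineitem_mergetree\n" +             // relative path (leading '/' stripped)
    "\n" +                                   // absolute path (empty)
    "{\"storage_policy\":\"default\"}\n" +   // clickhouseTableConfigs as JSON
    "all_1_1_0\n0\n100\n"                    // part name / start / end triples
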
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala
similarity index 75%
rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala
index 6b2af0953f00..854c6f91c917 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.utils
+package org.apache.spark.sql.execution.datasources.clickhouse.utils
import org.apache.gluten.expression.ConverterUtils.normalizeColName
@@ -25,18 +25,13 @@ object MergeTreeDeltaUtil {
def genOrderByAndPrimaryKeyStr(
orderByKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]]): (String, String) = {
+
val orderByKey =
- if (orderByKeyOption.isDefined && orderByKeyOption.get.nonEmpty) {
- columnsToStr(orderByKeyOption)
- } else DEFAULT_ORDER_BY_KEY
-
- val primaryKey =
- if (
- !orderByKey.equals(DEFAULT_ORDER_BY_KEY) && primaryKeyOption.isDefined &&
- primaryKeyOption.get.nonEmpty
- ) {
- columnsToStr(primaryKeyOption)
- } else ""
+ orderByKeyOption.filter(_.nonEmpty).map(columnsToStr).getOrElse(DEFAULT_ORDER_BY_KEY)
+ val primaryKey = primaryKeyOption
+ .filter(p => orderByKey != DEFAULT_ORDER_BY_KEY && p.nonEmpty)
+ .map(columnsToStr)
+ .getOrElse("")
(orderByKey, primaryKey)
}
@@ -45,4 +40,8 @@ object MergeTreeDeltaUtil {
case Some(keys) => keys.map(normalizeColName).mkString(",")
case None => ""
}
+
+ def columnsToStr(keys: Seq[String]): String = {
+ keys.map(normalizeColName).mkString(",")
+ }
}
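
(Behaviour of the simplified genOrderByAndPrimaryKeyStr, by example; DEFAULT_ORDER_BY_KEY is the MergeTree "tuple()" default, as also checked by the serializer above.)

import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil

MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
  Some(Seq("l_orderkey", "l_shipdate")), Some(Seq("l_orderkey")))
//   => ("l_orderkey,l_shipdate", "l_orderkey")

MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(None, Some(Seq("l_orderkey")))
//   => ("tuple()", "")  // primary key is dropped when order-by falls back to the default
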
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
similarity index 96%
rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
index c2eb338261fc..dc31822fd73e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.utils
+package org.apache.spark.sql.execution.datasources.clickhouse.utils
import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConf}
import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartRange, MergeTreePartSplit}
@@ -35,7 +35,8 @@ import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, MergeTreePartFilterReturnedRange}
+import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, MergeTreePartFilterReturnedRange}
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.types.BooleanType
@@ -48,7 +49,6 @@ import com.google.protobuf.{Any, StringValue}
import io.substrait.proto.NamedStruct
import io.substrait.proto.Plan
-import java.lang.{Long => JLong}
import java.util
import java.util.{ArrayList => JArrayList}
@@ -78,14 +78,15 @@ object MergeTreePartsPartitionsUtil extends Logging {
val fileIndex = relation.location.asInstanceOf[TahoeFileIndex]
// when querying, use deltaLog.update(true) to get the staleness acceptable snapshot
- val snapshotId = ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(true))
+ val snapshotId =
+ ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(stalenessAcceptable = true))
val partitions = new ArrayBuffer[InputPartition]
val (database, tableName) = if (table.catalogTable.isDefined) {
(table.catalogTable.get.identifier.database.get, table.catalogTable.get.identifier.table)
} else {
// for file_format.`file_path`
- (table.DEFAULT_DATABASE, table.deltaPath.toUri.getPath)
+ (StorageMeta.DEFAULT_PATH_BASED_DATABASE, table.deltaPath.toUri.getPath)
}
val engine = "MergeTree"
val relativeTablePath = fileIndex.deltaLog.dataPath.toUri.getPath.substring(1)
@@ -457,9 +458,10 @@ object MergeTreePartsPartitionsUtil extends Logging {
val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)
if (ret == null) {
val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
- val keySample = keys.isEmpty() match {
- case true => ""
- case false => keys.iterator().next()
+ val keySample = if (keys.isEmpty) {
+ ""
+ } else {
+ keys.iterator().next()
}
throw new IllegalStateException(
"Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " +
@@ -574,20 +576,8 @@ object MergeTreePartsPartitionsUtil extends Logging {
case (l1, l2) => l1.sum / l2.sum
}
- val partLists = new JArrayList[String]()
- val starts = new JArrayList[JLong]()
- val lengths = new JArrayList[JLong]()
- selectPartsFiles.foreach(
- part => {
- partLists.add(part.name)
- starts.add(0)
- lengths.add(part.marks)
- })
-
val extensionTableNode = ExtensionTableBuilder
.makeExtensionTable(
- -1L,
- -1L,
database,
tableName,
snapshotId,
@@ -599,9 +589,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
table.bfIndexKey(),
table.setIndexKey(),
table.primaryKey(),
- partLists,
- starts,
- lengths,
+ ClickhousePartSerializer.fromAddMergeTreeParts(selectPartsFiles),
tableSchemaJson,
clickhouseTableConfigs.asJava,
new JArrayList[String]()
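
Note: the three parallel lists (part names, starts, lengths) that used to be threaded through makeExtensionTable are collapsed into a single ClickhousePartSerializer payload. The serializer itself is introduced elsewhere in this patch; the following is only a rough stand-in with a made-up case class, not the real API:

// Illustrative stand-in for the removed (partLists, starts, lengths) triple.
case class PartPayloadSketch(names: Seq[String], starts: Seq[Long], lengths: Seq[Long])

object PartPayloadSketch {
  // Read path: whole parts, so start = 0 and length = mark count (as the removed loop did).
  def fromParts(names: Seq[String], marks: Seq[Long]): PartPayloadSketch =
    PartPayloadSketch(names, Seq.fill(names.length)(0L), marks)

  // Write path: starts/lengths are unused, mirroring the old placeholder ranges.
  def fromPartNames(names: Seq[String]): PartPayloadSketch =
    PartPayloadSketch(names, names.indices.map(_.toLong), names.indices.map(_.toLong))
}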
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
new file mode 100644
index 000000000000..de322b65dd8e
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.mergetree
+
+import org.apache.spark.sql.delta.actions.Metadata
+
+class DeltaMetaReader(
+ override val metadata: Metadata,
+ override val configuration: Map[String, String])
+ extends TablePropertiesReader {
+
+ def storageDB: String = configuration(StorageMeta.STORAGE_DB)
+ def storageTable: String = configuration(StorageMeta.STORAGE_TABLE)
+ def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID)
+ def storagePath: String = configuration(StorageMeta.STORAGE_PATH)
+}
+
+object DeltaMetaReader {
+ def apply(metadata: Metadata): DeltaMetaReader = {
+ new DeltaMetaReader(metadata, metadata.configuration)
+ }
+}
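
Note: DeltaMetaReader is a thin view over the storage_* entries that StorageMeta writes into the Delta metadata configuration. A usage sketch, assuming the metadata has already been enriched (the getters use configuration(...) and will throw on a missing key):

import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader

// Assumes metadata.configuration already contains storage_db/storage_table/
// storage_snapshot_id/storage_path, e.g. after StorageMeta.withMoreStorageInfo.
def describeStorage(metadata: Metadata): String = {
  val reader = DeltaMetaReader(metadata)
  s"${reader.storageDB}.${reader.storageTable} @ ${reader.storagePath} " +
    s"(snapshot ${reader.storageSnapshotId})"
}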
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
new file mode 100644
index 000000000000..e08f91450ec2
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.mergetree
+
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+
+import org.apache.hadoop.fs.Path
+
+import scala.collection.mutable.ListBuffer
+
+/** Reserved table properties for MergeTree tables. */
+object StorageMeta {
+ val Provider: String = "clickhouse"
+ val DEFAULT_FILE_FORMAT: String = "write.format.default"
+ val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree"
+
+ // Storage properties
+ val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db"
+ val DEFAULT_CREATE_TABLE_DATABASE: String = "default"
+ val STORAGE_DB: String = "storage_db"
+ val STORAGE_TABLE: String = "storage_table"
+ val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id"
+ val STORAGE_PATH: String = "storage_path"
+ val SERIALIZER_HEADER: String = "MergeTree;"
+
+ def withMoreStorageInfo(
+ metadata: Metadata,
+ snapshotId: String,
+ deltaPath: Path,
+ database: String,
+ tableName: String): Metadata = {
+ val moreOptions = Seq(
+ STORAGE_DB -> database,
+ STORAGE_SNAPSHOT_ID -> snapshotId,
+ STORAGE_TABLE -> tableName,
+ STORAGE_PATH -> deltaPath.toString)
+ withMoreOptions(metadata, moreOptions)
+ }
+ def withMoreStorageInfo(metadata: Metadata, snapshotId: String, deltaPath: Path): Metadata = {
+ val moreOptions =
+ ListBuffer(STORAGE_SNAPSHOT_ID -> snapshotId, STORAGE_PATH -> deltaPath.toString)
+ // Path-based create table statement does not have storage_db and storage_table
+ if (!metadata.configuration.contains(STORAGE_DB)) {
+ moreOptions += STORAGE_DB -> DEFAULT_PATH_BASED_DATABASE
+ }
+ if (!metadata.configuration.contains(STORAGE_TABLE)) {
+ moreOptions += STORAGE_TABLE -> deltaPath.toUri.getPath
+ }
+ withMoreOptions(metadata, moreOptions.toSeq)
+ }
+
+ private def withMoreOptions(metadata: Metadata, newOptions: Seq[(String, String)]): Metadata = {
+ metadata.copy(configuration = metadata.configuration ++ newOptions)
+ }
+}
+
+trait TablePropertiesReader {
+
+ def configuration: Map[String, String]
+
+ /** delta */
+ def metadata: Metadata
+
+ private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = {
+ configuration.get(keyName).map {
+ v =>
+ val keys = v.split(",").map(n => normalizeColName(n.trim)).toSeq
+ keys.foreach {
+ s =>
+ if (s.contains(".")) {
+ throw new IllegalStateException(
+ s"$keyName $s can not contain '.' (not support nested column yet)")
+ }
+ }
+ keys
+ }
+ }
+
+ lazy val bucketOption: Option[BucketSpec] = {
+ val tableProperties = configuration
+ if (tableProperties.contains("numBuckets")) {
+ val numBuckets = tableProperties("numBuckets").trim.toInt
+ val bucketColumnNames: Seq[String] =
+ getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String])
+ val sortColumnNames: Seq[String] =
+ getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String])
+ Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
+ } else {
+ None
+ }
+ }
+
+ lazy val lowCardKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("lowCardKey")
+ }
+
+ lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("minmaxIndexKey")
+ }
+
+ lazy val bfIndexKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("bloomfilterIndexKey")
+ }
+
+ lazy val setIndexKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("setIndexKey")
+ }
+
+ lazy val partitionColumns: Seq[String] =
+ metadata.partitionColumns.map(normalizeColName)
+
+ lazy val orderByKeyOption: Option[Seq[String]] = {
+ val orderByKeys =
+ if (bucketOption.exists(_.sortColumnNames.nonEmpty)) {
+ bucketOption.map(_.sortColumnNames.map(normalizeColName))
+ } else {
+ getCommaSeparatedColumns("orderByKey")
+ }
+ orderByKeys
+ .map(_.intersect(partitionColumns))
+ .filter(_.nonEmpty)
+ .foreach {
+ invalidKeys =>
+ throw new IllegalStateException(
+ s"partition cols $invalidKeys can not be in the order by keys.")
+ }
+ orderByKeys
+ }
+
+ lazy val primaryKeyOption: Option[Seq[String]] = {
+ orderByKeyOption.map(_.mkString(",")).flatMap {
+ orderBy =>
+ val primaryKeys = getCommaSeparatedColumns("primaryKey")
+ primaryKeys
+ .map(_.mkString(","))
+ .filterNot(orderBy.startsWith)
+ .foreach(
+ primaryKey =>
+ throw new IllegalStateException(
+ s"Primary key $primaryKey must be a prefix of the sorting key $orderBy"))
+ primaryKeys
+ }
+ }
+
+ lazy val writeConfiguration: Map[String, String] = {
+ val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
+ orderByKeyOption,
+ primaryKeyOption
+ )
+ Map(
+ "storage_policy" -> configuration.getOrElse("storage_policy", "default"),
+ "storage_orderByKey" -> orderByKey0,
+ "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ "storage_minmaxIndexKey" -> minmaxIndexKeyOption
+ .map(MergeTreeDeltaUtil.columnsToStr)
+ .getOrElse(""),
+ "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ "storage_primaryKey" -> primaryKey0
+ )
+ }
+}
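
Note: StorageMeta.withMoreStorageInfo merges the storage_* keys into Metadata.configuration; the two-argument overload supplies the path-based defaults when storage_db/storage_table are absent. A sketch of the resulting configuration, assuming Metadata() builds an empty Delta metadata with all fields defaulted:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta

val enriched = StorageMeta.withMoreStorageInfo(
  Metadata(),                              // assumed: default-constructed Delta metadata
  snapshotId = "0_0",                      // illustrative snapshot id
  deltaPath = new Path("/tmp/ch/lineitem"))

// Expected entries for a path-based table:
//   storage_db          -> "clickhouse_db"        (default, key absent before)
//   storage_table       -> "/tmp/ch/lineitem"     (falls back to the path)
//   storage_snapshot_id -> "0_0"
//   storage_path        -> "/tmp/ch/lineitem"
enriched.configuration.filter { case (k, _) => k.startsWith("storage_") }.foreach(println)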
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
index 36b2a44d13b4..69c001e461d8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
@@ -83,26 +83,6 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
// TODO: parquet and mergetree
OrcUtils.inferSchema(sparkSession, files, options)
}
-
- // scalastyle:off argcount
- /** For CH MergeTree format */
- def createOutputWriter(
- path: String,
- database: String,
- tableName: String,
- snapshotId: String,
- orderByKeyOption: Option[Seq[String]],
- lowCardKeyOption: Option[Seq[String]],
- minmaxIndexKeyOption: Option[Seq[String]],
- bfIndexKeyOption: Option[Seq[String]],
- setIndexKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]],
- partitionColumns: Seq[String],
- tableSchema: StructType,
- clickhouseTableConfigs: Map[String, String],
- context: TaskAttemptContext,
- nativeConf: java.util.Map[String, String]): OutputWriter = null
- // scalastyle:on argcount
}
class CHRowSplitter extends GlutenRowSplitter {
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index cb7dfa8f3835..521b59d60e29 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.v1
import org.apache.gluten.expression.ConverterUtils
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
@@ -27,8 +26,7 @@ import org.apache.gluten.utils.ConfigUtil
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter}
-import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
-import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer}
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
import org.apache.spark.sql.types.StructType
@@ -37,7 +35,7 @@ import com.google.protobuf.{Any, StringValue}
import io.substrait.proto.NamedStruct
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import java.util.{ArrayList => JList, Map => JMap, UUID}
+import java.util.{Map => JMap, UUID}
import scala.collection.JavaConverters._
@@ -57,50 +55,22 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
context: TaskAttemptContext,
nativeConf: JMap[String, String]): OutputWriter = null
- // scalastyle:off argcount
- override def createOutputWriter(
+ override val formatName: String = "mergetree"
+
+ def createOutputWriter(
path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext,
+ nativeConf: JMap[String, String],
database: String,
tableName: String,
- snapshotId: String,
- orderByKeyOption: Option[Seq[String]],
- lowCardKeyOption: Option[Seq[String]],
- minmaxIndexKeyOption: Option[Seq[String]],
- bfIndexKeyOption: Option[Seq[String]],
- setIndexKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]],
- partitionColumns: Seq[String],
- tableSchema: StructType,
- clickhouseTableConfigs: Map[String, String],
- context: TaskAttemptContext,
- nativeConf: JMap[String, String]): OutputWriter = {
-
- val uuid = UUID.randomUUID.toString
-
- val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
- path,
- database,
- tableName,
- snapshotId,
- orderByKeyOption,
- lowCardKeyOption,
- minmaxIndexKeyOption,
- bfIndexKeyOption,
- setIndexKeyOption,
- primaryKeyOption,
- partitionColumns,
- Seq(),
- ConverterUtils.convertNamedStructJson(tableSchema),
- clickhouseTableConfigs,
- // use table schema instead of data schema
- SparkShimLoader.getSparkShims.attributesFromStruct(tableSchema)
- )
+ splitInfo: Array[Byte]): OutputWriter = {
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val instance =
datasourceJniWrapper.nativeInitMergeTreeWriterWrapper(
- planWithSplitInfo.plan,
- planWithSplitInfo.splitInfo,
- uuid,
+ null,
+ splitInfo,
+ UUID.randomUUID.toString,
context.getTaskAttemptID.getTaskID.getId.toString,
context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
@@ -109,10 +79,6 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path)
}
- // scalastyle:on argcount
-
- override val formatName: String = "mergetree"
-
}
object CHMergeTreeWriterInjects {
@@ -146,39 +112,23 @@ object CHMergeTreeWriterInjects {
}
}.asJava
- val (orderByKey, primaryKey) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
- orderByKeyOption,
- primaryKeyOption
- )
-
- val lowCardKey = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
- val minmaxIndexKey = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
- val bfIndexKey = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
- val setIndexKey = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
-
val substraitContext = new SubstraitContext
- val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
- -1,
- -1,
+
+ val extensionTable = ClickhouseMetaSerializer.apply1(
database,
tableName,
snapshotId,
path,
"",
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
- scala.collection.JavaConverters.seqAsJavaList(partList),
- scala.collection.JavaConverters.seqAsJavaList(
- Seq.range(0L, partList.length).map(long2Long)
- ), // starts and lengths is useless for write
- scala.collection.JavaConverters.seqAsJavaList(Seq.range(0L, partList.length).map(long2Long)),
+ orderByKeyOption,
+ lowCardKeyOption,
+ minmaxIndexKeyOption,
+ bfIndexKeyOption,
+ setIndexKeyOption,
+ primaryKeyOption,
+ ClickhousePartSerializer.fromPartNames(partList),
tableSchemaJson,
- clickhouseTableConfigs.asJava,
- new JList[String]()
+ clickhouseTableConfigs.asJava
)
val optimizationContent = "isMergeTree=1\n"
@@ -197,6 +147,6 @@ object CHMergeTreeWriterInjects {
val plan =
PlanBuilder.makePlan(substraitContext, Lists.newArrayList(relNode), nameList).toProtobuf
- PlanWithSplitInfo(plan.toByteArray, extensionTableNode.toProtobuf.toByteArray)
+ PlanWithSplitInfo(plan.toByteArray, extensionTable.toByteArray)
}
}
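
Note: with the MergeTree-specific 15-argument overload gone, callers hand CHMergeTreeWriterInjects a pre-serialized split info instead of rebuilding it from individual key options. A sketch of driving the new entry point; everything except the parameter names shown in the hunk above is illustrative:

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.execution.datasources.v1.CHMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

def openMergeTreeWriter(
    injects: CHMergeTreeWriterInjects,
    path: String,
    schema: StructType,
    context: TaskAttemptContext,
    nativeConf: java.util.Map[String, String],
    splitInfo: Array[Byte]): OutputWriter =   // e.g. the serialized extension table bytes
  injects.createOutputWriter(
    path, schema, context, nativeConf,
    database = "default",                     // illustrative values
    tableName = "lineitem_mergetree",
    splitInfo = splitInfo)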
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
index 06e2b98bc8b2..52593d7c1795 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
@@ -43,7 +43,7 @@ class MergeTreeOutputWriter(
if (nextBatch.numRows > 0) {
val col = nextBatch.column(0).asInstanceOf[CHColumnVector]
datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress)
- } // else just ignore this empty block
+ } // else ignore this empty block
}
override def close(): Unit = {
diff --git a/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala
new file mode 100644
index 000000000000..234954386adb
--- /dev/null
+++ b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.clickhouse
+
+import org.apache.gluten.extension.injector.SparkInjector
+
+object CHExtendRule {
+ def injectSpark(injector: SparkInjector): Unit = {}
+}
diff --git a/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala
new file mode 100644
index 000000000000..234954386adb
--- /dev/null
+++ b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.clickhouse
+
+import org.apache.gluten.extension.injector.SparkInjector
+
+object CHExtendRule {
+ def injectSpark(injector: SparkInjector): Unit = {}
+}
diff --git a/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala
new file mode 100644
index 000000000000..fb3a854ef98c
--- /dev/null
+++ b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.clickhouse
+
+import org.apache.gluten.extension.injector.SparkInjector
+
+import org.apache.spark.sql.catalyst.AddStorageInfo
+
+object CHExtendRule {
+ def injectSpark(injector: SparkInjector): Unit = {
+ injector.injectOptimizerRule(_ => AddStorageInfo)
+ }
+}
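
Note: only the Spark 3.5 source set injects AddStorageInfo; the 3.2/3.3 copies are deliberately empty so the backend can call the same entry point on every shim. A sketch of that shim-agnostic call site (presumably wired from the backend's rule injection elsewhere in this patch):

import org.apache.gluten.backendsapi.clickhouse.CHExtendRule
import org.apache.gluten.extension.injector.SparkInjector

def registerExtendRules(injector: SparkInjector): Unit =
  CHExtendRule.injectSpark(injector) // no-op on Spark 3.2/3.3, injects AddStorageInfo on 3.5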
diff --git a/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala
new file mode 100644
index 000000000000..760241f840f2
--- /dev/null
+++ b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, TableSpec}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
+
+/** Adds storage_db/storage_table information to CreateTable plans for MergeTree tables. */
+
+object AddStorageInfo extends Rule[LogicalPlan] {
+
+ private def createMergeTreeTable(tableSpec: TableSpec): Boolean = {
+ tableSpec.provider.contains(StorageMeta.Provider) ||
+ tableSpec.properties
+ .get(StorageMeta.DEFAULT_FILE_FORMAT)
+ .contains(StorageMeta.DEFAULT_FILE_FORMAT_DEFAULT)
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ plan.transformWithPruning(_.containsAnyPattern(COMMAND)) {
+ case create @ CreateTable(ResolvedIdentifier(_, ident), _, _, tableSpec: TableSpec, _)
+ if createMergeTreeTable(tableSpec) =>
+ val newTableSpec = tableSpec.copy(
+ properties = tableSpec.properties ++ Seq(
+ StorageMeta.STORAGE_DB -> ident
+ .namespace()
+ .lastOption
+ .getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE),
+ StorageMeta.STORAGE_TABLE -> ident.name())
+ )
+ create.copy(tableSpec = newTableSpec)
+ }
+}
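
Note: AddStorageInfo rewrites matching CreateTable plans so storage_db/storage_table are present in the table properties before the catalog sees them. A sketch of DDL the rule is expected to match once injected (table name and schema are made up; assumes a SparkSession `spark` with the ClickHouse backend on the Spark 3.5 shim):

// After analysis, tableSpec.properties should additionally carry
//   storage_db    -> "tpch"                 (last namespace element, else "default")
//   storage_table -> "lineitem_mergetree"
spark.sql(
  """
    |CREATE TABLE tpch.lineitem_mergetree (l_orderkey BIGINT, l_returnflag STRING)
    |USING clickhouse
  """.stripMargin)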
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index a1dd5d8687a0..62b9ee3bcb31 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.execution.mergetree
import org.apache.gluten.execution._
+import org.apache.gluten.utils.Arm
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
@@ -24,6 +25,7 @@ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.functions._
@@ -162,7 +164,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
val planNodeJson = wholeStageTransformer.substraitPlanJson
assert(
!planNodeJson
- .replaceAll("\\\n", "")
+ .replaceAll("\n", "")
.replaceAll(" ", "")
.contains("\"input\":{\"filter\":{"))
}
@@ -269,7 +271,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
val planNodeJson = wholeStageTransformer.substraitPlanJson
assert(
!planNodeJson
- .replaceAll("\\\n", "")
+ .replaceAll("\n", "")
.replaceAll(" ", "")
.contains("\"input\":{\"filter\":{"))
}
@@ -1006,7 +1008,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
val columnsFile = new File(partDir, "columns.txt")
- val columns = Source.fromFile(columnsFile).getLines().mkString
+ val columns = Arm.withResource(Source.fromFile(columnsFile))(_.getLines().mkString)
assert(columns.contains("`l_returnflag` LowCardinality(Nullable(String))"))
assert(columns.contains("`l_linestatus` LowCardinality(Nullable(String))"))
@@ -1366,34 +1368,39 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.option("clickhouse.lowCardKey", "l_returnflag,l_linestatus")
.save(dataPath1)
- val df = spark.read
- .format("clickhouse")
- .load(dataPath)
- val result = df.collect()
- assertResult(600572)(result.size)
+ {
+ val df = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ val result = df.collect()
+ assertResult(600572)(result.length)
- val plans = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
+ val plans = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val partitions = plans.head.getPartitions
+ assert(partitions.nonEmpty)
+ assert(partitions.head.isInstanceOf[GlutenMergeTreePartition])
+ val mergeTreePartition = partitions.head.asInstanceOf[GlutenMergeTreePartition]
+ assertResult(mergeTreePartition.database)(StorageMeta.DEFAULT_PATH_BASED_DATABASE)
+ assertResult(mergeTreePartition.table)(dataPath)
}
- val partitions = plans(0).getPartitions
- assert(partitions.nonEmpty)
- assert(partitions(0).isInstanceOf[GlutenMergeTreePartition])
- assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db"))
- assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath))
-
- val df1 = spark.read
- .format("clickhouse")
- .load(dataPath1)
- val result1 = df1.collect()
- assertResult(600572)(result.size)
+ {
+ val df1 = spark.read
+ .format("clickhouse")
+ .load(dataPath1)
+ val result1 = df1.collect()
+ assertResult(600572)(result1.length)
- val plans1 = collect(df1.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
+ val plans1 = collect(df1.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val partitions1 = plans1.head.getPartitions
+ assert(partitions1.nonEmpty)
+ assert(partitions1.head.isInstanceOf[GlutenMergeTreePartition])
+ val mergeTreePartition1 = partitions1.head.asInstanceOf[GlutenMergeTreePartition]
+ assertResult(mergeTreePartition1.database)(StorageMeta.DEFAULT_PATH_BASED_DATABASE)
+ assertResult(mergeTreePartition1.table)(dataPath1)
}
- val partitions1 = plans1(0).getPartitions
- assert(partitions1.nonEmpty)
- assert(partitions1(0).isInstanceOf[GlutenMergeTreePartition])
- assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db"))
- assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath1))
}
}
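
Note: the test now releases the Source through Arm.withResource instead of leaking the file handle. A local sketch of the assumed loan-pattern shape (the real helper lives in org.apache.gluten.utils.Arm):

import scala.io.Source

// Assumed shape: run the body, then close the resource even if the body throws.
def withResource[T <: java.io.Closeable, R](resource: T)(body: T => R): R =
  try body(resource)
  finally resource.close()

val columns = withResource(Source.fromFile("/tmp/part/columns.txt"))(_.getLines().mkString)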
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
index 97418c670f71..aead0bf47fa3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
@@ -22,7 +22,7 @@ import org.apache.gluten.execution.{GlutenClickHouseTPCHAbstractSuite, GlutenMer
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
import org.apache.hadoop.fs.Path