[GLUTEN-7028][CH][Part-4] Refactor DeltaMergeTreeFileFormat to read table configuration from deltalog's metadata #7170

Merged: 8 commits, Sep 30, 2024
2 changes: 2 additions & 0 deletions backends-clickhouse/pom.xml
@@ -338,6 +338,7 @@
<include>src/test/scala/**/*.scala</include>
<include>src/main/delta-${delta.binary.version}/**/*.scala</include>
<include>src/test/delta-${delta.binary.version}/**/*.scala</include>
+<include>src/main/${sparkshim.module.name}/**/*.scala</include>
</includes>
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
@@ -397,6 +398,7 @@
<configuration>
<sources>
<source>src/main/delta-${delta.binary.version}</source>
+<source>src/main/${sparkshim.module.name}</source>
</sources>
</configuration>
</execution>
ClickHouseTableV2.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet
@@ -95,28 +96,21 @@ class ClickHouseTableV2(

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
-meta,
-dataBaseName,
-tableName,
-ClickhouseSnapshot.genSnapshotId(snapshot),
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-clickhouseTableConfigs,
-partitionColumns
-)
+StorageMeta.withMoreStorageInfo(
+meta,
+ClickhouseSnapshot.genSnapshotId(snapshot),
+deltaLog.dataPath,
+dataBaseName,
+tableName))
}

-override def deltaProperties(): ju.Map[String, String] = properties()
+override def deltaProperties: Map[String, String] = properties().asScala.toMap

-override def deltaCatalog(): Option[CatalogTable] = catalogTable
+override def deltaCatalog: Option[CatalogTable] = catalogTable

-override def deltaPath(): Path = path
+override def deltaPath: Path = path

-override def deltaSnapshot(): Snapshot = snapshot
+override def deltaSnapshot: Snapshot = snapshot

def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
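`StorageMeta.withMoreStorageInfo` itself is not part of this diff. A minimal sketch of the likely shape, assuming it folds the storage coordinates into the `Metadata.configuration` map under dedicated keys; the key names and the reduced `Metadata` stand-in below are illustrative, not the PR's actual definitions:

```scala
import org.apache.hadoop.fs.Path

// Reduced stand-in for org.apache.spark.sql.delta.actions.Metadata; only the
// configuration map matters for this sketch.
case class Metadata(configuration: Map[String, String] = Map.empty)

object StorageMetaSketch {
  // Assumed key names; the real StorageMeta defines its own constants.
  val SnapshotIdKey = "storage_snapshot_id"
  val PathKey = "storage_path"
  val DbKey = "storage_db"
  val TableKey = "storage_table"

  // Return a copy of the metadata whose configuration carries the physical
  // storage coordinates, so the file format can recover them later without
  // a dozen extra constructor parameters.
  def withMoreStorageInfo(
      meta: Metadata,
      snapshotId: String,
      dataPath: Path,
      database: String,
      table: String): Metadata =
    meta.copy(configuration = meta.configuration ++ Map(
      SnapshotIdKey -> snapshotId,
      PathKey -> dataPath.toString,
      DbKey -> database,
      TableKey -> table))
}
```

Once the coordinates live inside `Metadata`, each caller only has to supply what its snapshot actually knows, which is why the shim variants below can pass different argument lists to the same helper.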
DeltaMergeTreeFileFormat.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

@@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
class DeltaMergeTreeFileFormat(metadata: Metadata)
extends DeltaParquetFileFormat(metadata.columnMappingMode, metadata.schema) {

-protected var database = ""
-protected var tableName = ""
-protected var snapshotId = ""
-protected var orderByKeyOption: Option[Seq[String]] = None
-protected var lowCardKeyOption: Option[Seq[String]] = None
-protected var minmaxIndexKeyOption: Option[Seq[String]] = None
-protected var bfIndexKeyOption: Option[Seq[String]] = None
-protected var setIndexKeyOption: Option[Seq[String]] = None
-protected var primaryKeyOption: Option[Seq[String]] = None
-protected var partitionColumns: Seq[String] = Seq.empty[String]
-protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
-// scalastyle:off argcount
-def this(
-    metadata: Metadata,
-    database: String,
-    tableName: String,
-    snapshotId: String,
-    orderByKeyOption: Option[Seq[String]],
-    lowCardKeyOption: Option[Seq[String]],
-    minmaxIndexKeyOption: Option[Seq[String]],
-    bfIndexKeyOption: Option[Seq[String]],
-    setIndexKeyOption: Option[Seq[String]],
-    primaryKeyOption: Option[Seq[String]],
-    clickhouseTableConfigs: Map[String, String],
-    partitionColumns: Seq[String]) {
-  this(metadata)
-  this.database = database
-  this.tableName = tableName
-  this.snapshotId = snapshotId
-  this.orderByKeyOption = orderByKeyOption
-  this.lowCardKeyOption = lowCardKeyOption
-  this.minmaxIndexKeyOption = minmaxIndexKeyOption
-  this.bfIndexKeyOption = bfIndexKeyOption
-  this.setIndexKeyOption = setIndexKeyOption
-  this.primaryKeyOption = primaryKeyOption
-  this.clickhouseTableConfigs = clickhouseTableConfigs
-  this.partitionColumns = partitionColumns
-}
-// scalastyle:on argcount

override def shortName(): String = "mergetree"

override def toString(): String = "MergeTree"
@@ -95,6 +56,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
.getInstance()
.nativeConf(options, "")

+@transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+val database = deltaMetaReader.storageDB
+val tableName = deltaMetaReader.storageTable
+val deltaPath = deltaMetaReader.storagePath
+
+val extensionTableBC = sparkSession.sparkContext.broadcast(
+  ClickhouseMetaSerializer
+    .forWrite(deltaMetaReader, metadata.schema)
+    .toByteArray)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -104,25 +76,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
-database,
-tableName,
-snapshotId,
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-partitionColumns,
metadata.schema,
-clickhouseTableConfigs,
context,
-nativeConf
+nativeConf,
+database,
+tableName,
+extensionTableBC.value
)
}
}
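On the read side, `DeltaMetaReader` (also outside this diff) evidently recovers those values from the same configuration map, which is what allows the 12-parameter auxiliary constructor and its `protected var` fields to be deleted wholesale. A hedged sketch of that counterpart, reusing the assumed key names and `Metadata` stand-in from the sketch above:

```scala
// Stand-in, same as the previous sketch.
case class Metadata(configuration: Map[String, String] = Map.empty)

// Hypothetical read-side counterpart to withMoreStorageInfo; the key names
// are the same assumptions, not the real constants.
case class DeltaMetaReader(metadata: Metadata) {
  private def get(key: String): String =
    metadata.configuration.getOrElse(
      key,
      throw new IllegalStateException(s"storage key not set: $key"))

  def storageDB: String = get("storage_db")
  def storageTable: String = get("storage_table")
  def storagePath: String = get("storage_path")
  def storageSnapshotId: String = get("storage_snapshot_id")
}
```

With this in place, `new DeltaMergeTreeFileFormat(metadata)` is fully determined by the deltalog metadata alone, which is the refactor the PR title describes.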
ClickHouseTableV2.scala (another Delta-version shim copy)
@@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet
@@ -95,28 +96,21 @@ class ClickHouseTableV2(

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
-meta,
-dataBaseName,
-tableName,
-ClickhouseSnapshot.genSnapshotId(snapshot),
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-clickhouseTableConfigs,
-partitionColumns
-)
+StorageMeta.withMoreStorageInfo(
+meta,
+ClickhouseSnapshot.genSnapshotId(snapshot),
+deltaLog.dataPath,
+dataBaseName,
+tableName))
}

-override def deltaProperties(): ju.Map[String, String] = properties()
+override def deltaProperties: Map[String, String] = properties().asScala.toMap

-override def deltaCatalog(): Option[CatalogTable] = catalogTable
+override def deltaCatalog: Option[CatalogTable] = catalogTable

-override def deltaPath(): Path = path
+override def deltaPath: Path = path

-override def deltaSnapshot(): Snapshot = snapshot
+override def deltaSnapshot: Snapshot = snapshot

def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
DeltaMergeTreeFileFormat.scala (another Delta-version shim copy)
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

@@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata) {

-protected var database = ""
-protected var tableName = ""
-protected var snapshotId = ""
-protected var orderByKeyOption: Option[Seq[String]] = None
-protected var lowCardKeyOption: Option[Seq[String]] = None
-protected var minmaxIndexKeyOption: Option[Seq[String]] = None
-protected var bfIndexKeyOption: Option[Seq[String]] = None
-protected var setIndexKeyOption: Option[Seq[String]] = None
-protected var primaryKeyOption: Option[Seq[String]] = None
-protected var partitionColumns: Seq[String] = Seq.empty[String]
-protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
-// scalastyle:off argcount
-def this(
-    metadata: Metadata,
-    database: String,
-    tableName: String,
-    snapshotId: String,
-    orderByKeyOption: Option[Seq[String]],
-    lowCardKeyOption: Option[Seq[String]],
-    minmaxIndexKeyOption: Option[Seq[String]],
-    bfIndexKeyOption: Option[Seq[String]],
-    setIndexKeyOption: Option[Seq[String]],
-    primaryKeyOption: Option[Seq[String]],
-    clickhouseTableConfigs: Map[String, String],
-    partitionColumns: Seq[String]) {
-  this(metadata)
-  this.database = database
-  this.tableName = tableName
-  this.snapshotId = snapshotId
-  this.orderByKeyOption = orderByKeyOption
-  this.lowCardKeyOption = lowCardKeyOption
-  this.minmaxIndexKeyOption = minmaxIndexKeyOption
-  this.bfIndexKeyOption = bfIndexKeyOption
-  this.setIndexKeyOption = setIndexKeyOption
-  this.primaryKeyOption = primaryKeyOption
-  this.clickhouseTableConfigs = clickhouseTableConfigs
-  this.partitionColumns = partitionColumns
-}
-// scalastyle:on argcount

override def shortName(): String = "mergetree"

override def toString(): String = "MergeTree"
@@ -98,6 +59,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
.getInstance()
.nativeConf(options, "")

+@transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+val database = deltaMetaReader.storageDB
+val tableName = deltaMetaReader.storageTable
+val deltaPath = deltaMetaReader.storagePath
+
+val extensionTableBC = sparkSession.sparkContext.broadcast(
+  ClickhouseMetaSerializer
+    .forWrite(deltaMetaReader, metadata.schema)
+    .toByteArray)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -107,25 +79,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
-database,
-tableName,
-snapshotId,
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-partitionColumns,
metadata.schema,
-clickhouseTableConfigs,
context,
-nativeConf
+nativeConf,
+database,
+tableName,
+extensionTableBC.value
)
}
}
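A note on the new `extensionTableBC` broadcast in `prepareWrite`: the table definition is serialized once on the driver and shipped to executors as bytes, so each executor JVM holds a single copy instead of every task closure re-deriving or re-shipping it. A self-contained sketch of that pattern, with a stand-in payload in place of the bytes produced by `ClickhouseMetaSerializer.forWrite(...).toByteArray`:

```scala
import org.apache.spark.sql.SparkSession

// Demonstrates the broadcast-once, read-per-task pattern used by the new
// prepareWrite. The payload is a stand-in, not the real serialized form.
object BroadcastWriteMetaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-sketch")
      .getOrCreate()

    // Driver side: build the serialized table definition exactly once.
    val extensionTableBytes: Array[Byte] =
      "db.table|snapshot-id|/data/path".getBytes("UTF-8")

    val extensionTableBC = spark.sparkContext.broadcast(extensionTableBytes)

    // Executor side: each task reads the broadcast value; one copy per
    // executor, no per-task serialization of the table definition.
    val sizes = spark.sparkContext
      .parallelize(1 to 4, numSlices = 4)
      .map(_ => extensionTableBC.value.length)
      .collect()

    println(sizes.mkString(","))
    spark.stop()
  }
}
```

The `require(path == deltaPath)` guard added in `newInstance` pairs with this design: the writer trusts the broadcast definition, so it presumably first asserts that the task's output path is the very table path the definition was built for.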
DeltaLog.scala
@@ -784,9 +784,6 @@ object DeltaLog extends DeltaLogging {
FileSourceOptions.IGNORE_CORRUPT_FILES -> "false",
FileSourceOptions.IGNORE_MISSING_FILES -> "false"
)
-// --- modified start
-// Don't need to add the bucketOption here, it handles the delta log meta json file
-// --- modified end
val fsRelation = HadoopFsRelation(
index, index.partitionSchema, schema, None, index.format, allOptions)(spark)
LogicalRelation(fsRelation)
ClickHouseTableV2.scala (third Delta-version shim copy)
@@ -30,7 +30,8 @@ import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, De
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -98,28 +99,19 @@ class ClickHouseTableV2(
def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
protocol,
-meta,
-dataBaseName,
-tableName,
-ClickhouseSnapshot.genSnapshotId(initialSnapshot),
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-clickhouseTableConfigs,
-partitionColumns
-)
+StorageMeta.withMoreStorageInfo(
+meta,
+ClickhouseSnapshot.genSnapshotId(initialSnapshot),
+deltaLog.dataPath))
}

-override def deltaProperties(): ju.Map[String, String] = properties()
+override def deltaProperties: Map[String, String] = properties().asScala.toMap

-override def deltaCatalog(): Option[CatalogTable] = catalogTable
+override def deltaCatalog: Option[CatalogTable] = catalogTable

-override def deltaPath(): Path = path
+override def deltaPath: Path = path

-override def deltaSnapshot(): Snapshot = initialSnapshot
+override def deltaSnapshot: Snapshot = initialSnapshot

def cacheThis(): Unit = {
ClickHouseTableV2.deltaLog2Table.put(deltaLog, this)
Expand All @@ -133,7 +125,6 @@ class TempClickHouseTableV2(
override val spark: SparkSession,
override val catalogTable: Option[CatalogTable] = None)
extends ClickHouseTableV2(spark, null, catalogTable) {
-import collection.JavaConverters._
override def properties(): ju.Map[String, String] = catalogTable.get.properties.asJava
override lazy val partitionColumns: Seq[String] = catalogTable.get.partitionColumnNames
override def cacheThis(): Unit = {}