
Commit d210384: StorageMeta

baibaichen committed Sep 30, 2024
1 parent 80119f7
Showing 6 changed files with 105 additions and 57 deletions.
File 1 of 6: ClickHouseTableV2.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.sources.DeltaDataSource
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -98,11 +99,12 @@ class ClickHouseTableV2(
   def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = {
     new DeltaMergeTreeFileFormat(
       protocol,
-      meta,
-      ClickhouseSnapshot.genSnapshotId(initialSnapshot),
-      deltaLog.dataPath.toString,
-      clickhouseTableConfigs
-    )
+      StorageMeta.withMoreStorageInfo(
+        meta,
+        ClickhouseSnapshot.genSnapshotId(initialSnapshot),
+        deltaLog.dataPath,
+        dataBaseName,
+        tableName))
   }

   override def deltaProperties: Map[String, String] = properties().asScala.toMap
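Instead of threading snapshot id, path, and table configs through the file format's constructor, the commit stamps them into the Delta metadata itself. A minimal self-contained sketch of what StorageMeta.withMoreStorageInfo does (the real implementation is in the last file of this commit; Meta is a stand-in for Delta's actions.Metadata):

    // Stand-in for Delta's Metadata action; only the configuration map matters here.
    case class Meta(configuration: Map[String, String] = Map.empty)

    def withMoreStorageInfo(meta: Meta, snapshotId: String, deltaPath: String,
                            database: String, tableName: String): Meta =
      meta.copy(configuration = meta.configuration ++ Map(
        "storage_db"          -> database,   // StorageMeta.STORAGE_DB
        "storage_snapshot_id" -> snapshotId, // StorageMeta.STORAGE_SNAPSHOT_ID
        "storage_table"       -> tableName,  // StorageMeta.STORAGE_TABLE
        "storage_path"        -> deltaPath)) // StorageMeta.STORAGE_PATH

    // withMoreStorageInfo(Meta(), "snap-42", "/warehouse/t", "default", "t").configuration
    //   == Map("storage_db" -> "default", "storage_snapshot_id" -> "snap-42",
    //          "storage_table" -> "t", "storage_path" -> "/warehouse/t")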
File 2 of 6: DeltaMergeTreeFileFormat.scala
@@ -21,18 +21,14 @@ import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
 import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
 import org.apache.spark.sql.types.StructType

 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

 @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
-class DeltaMergeTreeFileFormat(
-    protocol: Protocol,
-    metadata: Metadata,
-    val snapshotId: String,
-    val deltaPath: String,
-    @transient val clickhouseTableConfigs: Map[String, String])
+class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
   extends DeltaParquetFileFormat(protocol, metadata) {

   override def shortName(): String = "mergetree"
@@ -63,16 +59,15 @@ class DeltaMergeTreeFileFormat(
       .getInstance()
       .nativeConf(options, "")

-    val database = clickhouseTableConfigs("storage_db")
-    val tableName = clickhouseTableConfigs("storage_table")
+    @transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+    val database = deltaMetaReader.storageDB
+    val tableName = deltaMetaReader.storageTable
+    val deltaPath = deltaMetaReader.storagePath
+
     val extensionTableBC = sparkSession.sparkContext.broadcast(
       ClickhouseMetaSerializer
-        .forWrite(
-          snapshotId,
-          deltaPath,
-          metadata.schema,
-          clickhouseTableConfigs
-        )
+        .forWrite(deltaMetaReader, metadata.schema)
         .toByteArray)

     new OutputWriterFactory {
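The write path above now recovers database, table, snapshot id and path from metadata.configuration via DeltaMetaReader rather than from constructor parameters. A sketch of that round trip, reusing the Meta stand-in from the previous sketch (the real reader is the new DeltaMetaReader class later in this commit):

    // Reader side of the round trip, with the keys StorageMeta defines.
    class MetaReader(configuration: Map[String, String]) {
      def storageDB: String         = configuration("storage_db")
      def storageTable: String      = configuration("storage_table")
      def storageSnapshotId: String = configuration("storage_snapshot_id")
      def storagePath: String       = configuration("storage_path")
    }

    val stamped = withMoreStorageInfo(Meta(), "snap-42", "/warehouse/t", "default", "t")
    val reader  = new MetaReader(stamped.configuration)
    assert(reader.storageDB == "default" && reader.storagePath == "/warehouse/t")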
File 3 of 6: ClickHouseTableV2Base.scala
@@ -51,23 +51,10 @@ trait ClickHouseTableV2Base extends TablePropertiesReader {
     .getOrElse(deltaPath.toUri.getPath)

   lazy val clickhouseTableConfigs: Map[String, String] = {
-    val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
-      orderByKeyOption,
-      primaryKeyOption
-    )
-    Map(
-      "storage_policy" -> deltaProperties.getOrElse("storage_policy", "default"),
-      "storage_db" -> dataBaseName,
-      "storage_table" -> tableName,
-      "storage_orderByKey" -> orderByKey0,
-      "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_minmaxIndexKey" -> minmaxIndexKeyOption
-        .map(MergeTreeDeltaUtil.columnsToStr)
-        .getOrElse(""),
-      "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_primaryKey" -> primaryKey0
-    )
+    deltaProperties.get("storage_policy") match {
+      case Some(_) => deltaProperties
+      case None => deltaProperties ++ Seq("storage_policy" -> "default")
+    }
   }

   def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption)
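clickhouseTableConfigs thus shrinks to a single responsibility: guaranteeing a storage_policy entry. The match above is equivalent to this hypothetical helper; the storage_db/storage_table entries and the derived key strings it used to compute now come from StorageMeta.withMoreStorageInfo and TablePropertiesReader.writeConfiguration respectively (both later in this commit):

    def ensureStoragePolicy(props: Map[String, String]): Map[String, String] =
      if (props.contains("storage_policy")) props
      else props + ("storage_policy" -> "default")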
File 4 of 6: ClickhouseMetaSerializer.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.execution.MergeTreePartSplit
 import org.apache.gluten.expression.ConverterUtils

 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta}
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.StructType

@@ -80,16 +81,10 @@ object ClickhousePartSerializer {
 }

 object ClickhouseMetaSerializer {
-  private val MERGE_TREE = "MergeTree;"
-
-  def forWrite(
-      snapshotId: String,
-      path: String,
-      dataSchema: StructType,
-      clickhouseTableConfigs: Map[String, String]): ReadRel.ExtensionTable = {
+  def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = {
+    val clickhouseTableConfigs = deltaMetaReader.writeConfiguration

-    val database = clickhouseTableConfigs("storage_db")
-    val tableName = clickhouseTableConfigs("storage_table")
     val orderByKey = clickhouseTableConfigs("storage_orderByKey")
     val lowCardKey = clickhouseTableConfigs("storage_lowCardKey")
     val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey")
@@ -98,11 +93,11 @@
     val primaryKey = clickhouseTableConfigs("storage_primaryKey")

     val result = apply(
-      database,
-      tableName,
-      snapshotId,
-      path,
-      "",
+      deltaMetaReader.storageDB,
+      deltaMetaReader.storageTable,
+      deltaMetaReader.storageSnapshotId,
+      deltaMetaReader.storagePath,
+      "", // absolutePath
       orderByKey,
       lowCardKey,
       minmaxIndexKey,
@@ -176,7 +171,7 @@

     // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n
     // {part_path1}\n{part_path2}\n...
-    val extensionTableStr = new StringBuilder(MERGE_TREE)
+    val extensionTableStr = new StringBuilder(StorageMeta.SERIALIZER_HEADER)

     val orderByKey = ConverterUtils.normalizeColName(orderByKey0)
     val lowCardKey = ConverterUtils.normalizeColName(lowCardKey0)
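The comment above the last hunk documents the newline-delimited layout that SERIALIZER_HEADER starts. A toy rendering with made-up values, only to make that layout concrete (the real string is built by the apply overload with normalized column names):

    val header = new StringBuilder("MergeTree;") // StorageMeta.SERIALIZER_HEADER
      .append("default\n")            // {database}
      .append("lineitem\n")           // {table}
      .append("l_orderkey\n")         // {orderByKey}
      .append("l_orderkey\n")         // {primaryKey}
      .append("lineitem_mergetree\n") // {relative_path}
      .append("part-000\n")           // {part_path1}
      .append("part-001\n")           // {part_path2} ...
    println(header.result())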
File 5 of 6: DeltaMetaReader.scala (new file)
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.mergetree
+
+import org.apache.spark.sql.delta.actions.Metadata
+
+class DeltaMetaReader(
+    override val metadata: Metadata,
+    override val configuration: Map[String, String])
+  extends TablePropertiesReader {
+
+  def storageDB: String = configuration(StorageMeta.STORAGE_DB)
+  def storageTable: String = configuration(StorageMeta.STORAGE_TABLE)
+  def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID)
+  def storagePath: String = configuration(StorageMeta.STORAGE_PATH)
+}
+
+object DeltaMetaReader {
+  def apply(metadata: Metadata): DeltaMetaReader = {
+    new DeltaMetaReader(metadata, metadata.configuration)
+  }
+}
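A usage fragment (not standalone: metadata is Delta's actions.Metadata, taken from the current snapshot). Note the accessors use configuration(key), so they throw NoSuchElementException for metadata that was never passed through StorageMeta.withMoreStorageInfo:

    val reader = DeltaMetaReader(metadata) // companion apply: configuration = metadata.configuration
    val db     = reader.storageDB          // "storage_db"
    val snap   = reader.storageSnapshotId  // "storage_snapshot_id"
    val path   = reader.storagePath        // "storage_path"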
File 6 of 6: StorageMeta.scala
@@ -20,9 +20,12 @@ import org.apache.gluten.expression.ConverterUtils.normalizeColName

 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+
+import org.apache.hadoop.fs.Path

 /** Reserved table property for MergeTree table. */
-object TableProperties {
+object StorageMeta {
   val Provider: String = "clickhouse"
   val DEFAULT_FILE_FORMAT: String = "write.format.default"
   val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree"
@@ -31,8 +34,26 @@
   val DefaultStorageDB: String = "default"
   val STORAGE_DB: String = "storage_db"
   val STORAGE_TABLE: String = "storage_table"
+  val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id"
+  val STORAGE_PATH: String = "storage_path"
+
+  val SERIALIZER_HEADER: String = "MergeTree;"
+
+  def withMoreStorageInfo(
+      metadata: Metadata,
+      snapshotId: String,
+      deltaPath: Path,
+      database: String,
+      tableName: String): Metadata = {
+    val newOptions =
+      metadata.configuration ++ Seq(
+        STORAGE_DB -> database,
+        STORAGE_SNAPSHOT_ID -> snapshotId,
+        STORAGE_TABLE -> tableName,
+        STORAGE_PATH -> deltaPath.toString
+      )
+    metadata.copy(configuration = newOptions)
+  }
 }

 trait TablePropertiesReader {
@@ -42,12 +63,6 @@
   /** delta */
   def metadata: Metadata

-  def storageDB: String =
-    configuration.getOrElse(TableProperties.STORAGE_DB, TableProperties.DefaultStorageDB)
-
-  def storageTable: String =
-    configuration.getOrElse(TableProperties.STORAGE_TABLE, "")
-
   private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = {
     configuration.get(keyName).map {
       v =>
@@ -128,4 +143,22 @@
       primaryKeys
     }
   }
+
+  lazy val writeConfiguration: Map[String, String] = {
+    val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
+      orderByKeyOption,
+      primaryKeyOption
+    )
+    Map(
+      "storage_policy" -> configuration.getOrElse("storage_policy", "default"),
+      "storage_orderByKey" -> orderByKey0,
+      "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+      "storage_minmaxIndexKey" -> minmaxIndexKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+      "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+      "storage_primaryKey" -> primaryKey0
+    )
+  }
 }

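Net effect of the move: writeConfiguration produces only the seven derived storage_* keys, while storage_db, storage_table, storage_snapshot_id and storage_path are stamped by StorageMeta.withMoreStorageInfo at table-construction time. Illustrative output for a table ordered by l_orderkey with no primary key or index columns (exact strings come from MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr):

    Map(
      "storage_policy"         -> "default",
      "storage_orderByKey"     -> "l_orderkey",
      "storage_lowCardKey"     -> "",
      "storage_minmaxIndexKey" -> "",
      "storage_bfIndexKey"     -> "",
      "storage_setIndexKey"    -> "",
      "storage_primaryKey"     -> ""   // assumed empty when no PRIMARY KEY is declared
    )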