
Commit bb4bc5d: TablePropertiesReader ?
baibaichen committed Sep 29, 2024 (1 parent: 04e8f2c)
Showing 5 changed files with 144 additions and 108 deletions.
@@ -110,7 +110,7 @@ class ClickHouseTableV2(
     )
   }
 
-  override def deltaProperties(): ju.Map[String, String] = properties()
+  override def deltaProperties(): Map[String, String] = properties().asScala.toMap
 
   override def deltaCatalog(): Option[CatalogTable] = catalogTable
 
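This hunk, repeated below for the other ClickHouseTableV2 variants, changes the return type of deltaProperties from java.util.Map to an immutable Scala Map. A minimal sketch of what properties().asScala.toMap does, with a hypothetical properties() standing in for Spark's Table API:

import java.{util => ju}
import scala.collection.JavaConverters._ // scala.jdk.CollectionConverters._ on Scala 2.13

// Hypothetical stand-in for Table.properties(), for illustration only.
def properties(): ju.Map[String, String] = {
  val m = new ju.HashMap[String, String]()
  m.put("storage_policy", "default")
  m
}

// asScala wraps the Java map without copying; toMap then materializes an
// immutable Scala Map, so later mutations of the Java map are not observed.
val deltaProperties: Map[String, String] = properties().asScala.toMap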
@@ -110,7 +110,7 @@ class ClickHouseTableV2(
     )
   }
 
-  override def deltaProperties(): ju.Map[String, String] = properties()
+  override def deltaProperties(): Map[String, String] = properties().asScala.toMap
 
   override def deltaCatalog(): Option[CatalogTable] = catalogTable
 
@@ -105,7 +105,7 @@ class ClickHouseTableV2(
     )
   }
 
-  override def deltaProperties: ju.Map[String, String] = properties()
+  override def deltaProperties: Map[String, String] = properties().asScala.toMap
 
   override def deltaCatalog: Option[CatalogTable] = catalogTable
 
@@ -18,26 +18,30 @@ package org.apache.spark.sql.delta.catalog
 
 import org.apache.gluten.expression.ConverterUtils.normalizeColName
 
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.mergetree.TablePropertiesReader
 
 import org.apache.hadoop.fs.Path
 
-import java.{util => ju}
-
-trait ClickHouseTableV2Base {
+trait ClickHouseTableV2Base extends TablePropertiesReader {
 
   val DEFAULT_DATABASE = "clickhouse_db"
 
-  def deltaProperties: ju.Map[String, String]
+  def deltaProperties: Map[String, String]
 
   def deltaCatalog: Option[CatalogTable]
 
   def deltaPath: Path
 
   def deltaSnapshot: Snapshot
 
+  def configuration: Map[String, String] = deltaProperties
+
+  def metadata: Metadata = deltaSnapshot.metadata
+
   lazy val dataBaseName: String = deltaCatalog
     .map(_.identifier.database.getOrElse("default"))
     .getOrElse(DEFAULT_DATABASE)
@@ -46,112 +50,13 @@ trait ClickHouseTableV2Base {
     .map(_.identifier.table)
     .getOrElse(deltaPath.toUri.getPath)
 
-  lazy val bucketOption: Option[BucketSpec] = {
-    val tableProperties = deltaProperties
-    if (tableProperties.containsKey("numBuckets")) {
-      val numBuckets = tableProperties.get("numBuckets").trim.toInt
-      val bucketColumnNames: Seq[String] =
-        getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String])
-      val sortColumnNames: Seq[String] =
-        getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String])
-      Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
-    } else {
-      None
-    }
-  }
-
-  lazy val lowCardKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("lowCardKey")
-  }
-
-  lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("minmaxIndexKey")
-  }
-
-  lazy val bfIndexKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("bloomfilterIndexKey")
-  }
-
-  lazy val setIndexKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("setIndexKey")
-  }
-
-  private def getCommaSeparatedColumns(keyName: String) = {
-    val tableProperties = deltaProperties
-    if (tableProperties.containsKey(keyName)) {
-      if (tableProperties.get(keyName).nonEmpty) {
-        val keys = tableProperties
-          .get(keyName)
-          .split(",")
-          .map(n => normalizeColName(n.trim))
-          .toSeq
-        keys.foreach(
-          s => {
-            if (s.contains(".")) {
-              throw new IllegalStateException(
-                s"$keyName $s can not contain '.' (not support nested column yet)")
-            }
-          })
-        Some(keys)
-      } else {
-        None
-      }
-    } else {
-      None
-    }
-  }
-
-  lazy val orderByKeyOption: Option[Seq[String]] = {
-    if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) {
-      val orderByKeys = bucketOption.get.sortColumnNames.map(normalizeColName).toSeq
-      val invalidKeys = orderByKeys.intersect(partitionColumns)
-      if (invalidKeys.nonEmpty) {
-        throw new IllegalStateException(
-          s"partition cols $invalidKeys can not be in the order by keys.")
-      }
-      Some(orderByKeys)
-    } else {
-      val orderByKeys = getCommaSeparatedColumns("orderByKey")
-      if (orderByKeys.isDefined) {
-        val invalidKeys = orderByKeys.get.intersect(partitionColumns)
-        if (invalidKeys.nonEmpty) {
-          throw new IllegalStateException(
-            s"partition cols $invalidKeys can not be in the order by keys.")
-        }
-        orderByKeys
-      } else {
-        None
-      }
-    }
-  }
-
-  lazy val primaryKeyOption: Option[Seq[String]] = {
-    if (orderByKeyOption.isDefined) {
-      val primaryKeys = getCommaSeparatedColumns("primaryKey")
-      if (
-        primaryKeys.isDefined && !orderByKeyOption.get
-          .mkString(",")
-          .startsWith(primaryKeys.get.mkString(","))
-      ) {
-        throw new IllegalStateException(
-          s"Primary key $primaryKeys must be a prefix of the sorting key")
-      }
-      primaryKeys
-    } else {
-      None
-    }
-  }
-
-  lazy val partitionColumns: Seq[String] =
-    deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq
-
   lazy val clickhouseTableConfigs: Map[String, String] = {
     val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
       orderByKeyOption,
       primaryKeyOption
     )
     Map(
-      "storage_policy" -> deltaProperties.getOrDefault("storage_policy", "default"),
+      "storage_policy" -> deltaProperties.getOrElse("storage_policy", "default"),
       "storage_db" -> dataBaseName,
       "storage_table" -> tableName,
       "storage_orderByKey" -> orderByKey0,
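After this hunk ClickHouseTableV2Base keeps only naming and the ClickHouse config assembly; key parsing and validation move to the new TablePropertiesReader trait (the new file below), which consumes just the two members added here, configuration and metadata. A minimal sketch of the wiring, assuming delta's Metadata case-class defaults and using a hypothetical DemoReader in place of a real Delta table:

import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.mergetree.TablePropertiesReader

// Hypothetical reader for illustration only: feeds plain maps into the trait
// the way ClickHouseTableV2Base feeds deltaProperties and the snapshot metadata.
class DemoReader(props: Map[String, String], meta: Metadata) extends TablePropertiesReader {
  override def configuration: Map[String, String] = props
  override def metadata: Metadata = meta
}

val reader = new DemoReader(
  Map("numBuckets" -> "4", "bucketColumnNames" -> "id", "orderByKey" -> "id,ts"),
  Metadata(partitionColumns = Seq("dt")))

reader.bucketOption     // Some(BucketSpec(4, List(id), List(id, ts)))
reader.orderByKeyOption // Some(List(id, ts)), taken from the bucket sort columns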
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.mergetree
+
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.delta.actions.Metadata
+
+/** Reserved table property for MergeTree table. */
+object TableProperties {
+  val Provider: String = "clickhouse"
+  val DEFAULT_FILE_FORMAT: String = "write.format.default"
+  val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree"
+
+  // Storage properties
+  val DefaultStorageDB: String = "default"
+  val STORAGE_DB: String = "storage_db"
+  val STORAGE_TABLE: String = "storage_table"
+
+  val SERIALIZER_HEADER: String = "MergeTree;"
+}
+
+trait TablePropertiesReader {
+
+  def configuration: Map[String, String]
+
+  /** delta */
+  def metadata: Metadata
+
+  def storageDB: String =
+    configuration.getOrElse(TableProperties.STORAGE_DB, TableProperties.DefaultStorageDB)
+
+  def storageTable: String =
+    configuration.getOrElse(TableProperties.STORAGE_TABLE, "")
+
+  private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = {
+    configuration.get(keyName).map {
+      v =>
+        val keys = v.split(",").map(n => normalizeColName(n.trim)).toSeq
+        keys.foreach {
+          s =>
+            if (s.contains(".")) {
+              throw new IllegalStateException(
+                s"$keyName $s can not contain '.' (not support nested column yet)")
+            }
+        }
+        keys
+    }
+  }
+
+  lazy val bucketOption: Option[BucketSpec] = {
+    val tableProperties = configuration
+    if (tableProperties.contains("numBuckets")) {
+      val numBuckets = tableProperties("numBuckets").trim.toInt
+      val bucketColumnNames: Seq[String] =
+        getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String])
+      val sortColumnNames: Seq[String] =
+        getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String])
+      Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
+    } else {
+      None
+    }
+  }
+
+  lazy val lowCardKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("lowCardKey")
+  }
+
+  lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("minmaxIndexKey")
+  }
+
+  lazy val bfIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("bloomfilterIndexKey")
+  }
+
+  lazy val setIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("setIndexKey")
+  }
+
+  lazy val partitionColumns: Seq[String] =
+    metadata.partitionColumns.map(normalizeColName)
+
+  lazy val orderByKeyOption: Option[Seq[String]] = {
+    val orderByKeys =
+      if (bucketOption.exists(_.sortColumnNames.nonEmpty)) {
+        bucketOption.map(_.sortColumnNames.map(normalizeColName))
+      } else {
+        getCommaSeparatedColumns("orderByKey")
+      }
+    orderByKeys
+      .map(_.intersect(partitionColumns))
+      .filter(_.nonEmpty)
+      .foreach {
+        invalidKeys =>
+          throw new IllegalStateException(
+            s"partition cols $invalidKeys can not be in the order by keys.")
+      }
+    orderByKeys
+  }
+
+  lazy val primaryKeyOption: Option[Seq[String]] = {
+    orderByKeyOption.map(_.mkString(",")).flatMap {
+      orderBy =>
+        val primaryKeys = getCommaSeparatedColumns("primaryKey")
+        primaryKeys
+          .map(_.mkString(","))
+          .filterNot(orderBy.startsWith)
+          .foreach(
+            primaryKey =>
+              throw new IllegalStateException(
+                s"Primary key $primaryKey must be a prefix of the sorting key $orderBy"))
+        primaryKeys
+    }
+  }
+}
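The two IllegalStateException sites above encode the MergeTree key invariants: nested columns are rejected, order-by keys must not overlap the partition columns, and the comma-joined primary key must be a prefix of the sorting key. Reusing the hypothetical DemoReader from the earlier sketch (the lazy vals throw on first access):

// "id" is a prefix of "id,ts": accepted.
new DemoReader(Map("orderByKey" -> "id,ts", "primaryKey" -> "id"), Metadata()).primaryKeyOption
// => Some(List(id))

// "ts" is not a prefix of "id,ts": throws IllegalStateException.
new DemoReader(Map("orderByKey" -> "id,ts", "primaryKey" -> "ts"), Metadata()).primaryKeyOption

// Partition column among the order-by keys: throws IllegalStateException.
new DemoReader(Map("orderByKey" -> "dt,id"), Metadata(partitionColumns = Seq("dt"))).orderByKeyOption

// Nested column in any key list: throws IllegalStateException.
new DemoReader(Map("orderByKey" -> "payload.id"), Metadata()).orderByKeyOption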
