[GLUTEN-6067][CH][Part 1] Support CH backend with Spark3.5 #6068

Merged · 2 commits · Jun 14, 2024
33 changes: 33 additions & 0 deletions backends-clickhouse/pom.xml
@@ -215,6 +215,38 @@
<version>8.5.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${arrow.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
@@ -272,6 +304,7 @@
</includes>
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/files/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
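The two Arrow artifacts above are added with `provided` scope and exclude `netty-common` and `netty-buffer`, presumably so the backend reuses the Netty jars Spark already ships rather than pulling in a second copy. As a hedged illustration only (not part of this PR), the sketch below shows the kind of Arrow usage these dependencies make available at compile time; `RootAllocator` comes from `arrow-memory-core` and `IntVector` from `arrow-vector`.

```scala
// Illustrative smoke test of the classes the new provided-scope Arrow dependencies supply.
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.IntVector

object ArrowSmokeTest {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator(Long.MaxValue) // from arrow-memory-core
    val vector = new IntVector("ids", allocator)     // from arrow-vector
    vector.allocateNew(3)
    (0 until 3).foreach(i => vector.setSafe(i, i * 10))
    vector.setValueCount(3)
    println(vector) // prints the vector contents
    vector.close()
    allocator.close()
  }
}
```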
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.util.{Clock, SerializableConfiguration}

import org.apache.commons.lang3.exception.ExceptionUtils
@@ -139,20 +138,7 @@ class ClickhouseOptimisticTransaction(
MergeTreeFileFormatWriter.write(
sparkSession = spark,
plan = newQueryPlan,
fileFormat = new DeltaMergeTreeFileFormat(
metadata,
tableV2.dataBaseName,
tableV2.tableName,
ClickhouseSnapshot.genSnapshotId(tableV2.snapshot),
tableV2.orderByKeyOption,
tableV2.lowCardKeyOption,
tableV2.minmaxIndexKeyOption,
tableV2.bfIndexKeyOption,
tableV2.setIndexKeyOption,
tableV2.primaryKeyOption,
tableV2.clickhouseTableConfigs,
tableV2.partitionColumns
),
fileFormat = tableV2.getFileFormat(metadata),
// formats.
committer = committer,
outputSpec = outputSpec,
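In `ClickhouseOptimisticTransaction`, the write path now asks the table for its file format instead of constructing `DeltaMergeTreeFileFormat` inline. Based purely on the arguments the removed construction passed, a hedged sketch of what such a `getFileFormat` member on `ClickHouseTableV2` could look like is shown below; the real definition appears later in this diff and may differ.

```scala
// Sketch, not the PR's verbatim code: forwards the same table-derived arguments the
// removed inline construction used, so every writer builds the format the same way.
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat =
  new DeltaMergeTreeFileFormat(
    meta,
    dataBaseName,
    tableName,
    ClickhouseSnapshot.genSnapshotId(snapshot),
    orderByKeyOption,
    lowCardKeyOption,
    minmaxIndexKeyOption,
    bfIndexKeyOption,
    setIndexKeyOption,
    primaryKeyOption,
    clickhouseTableConfigs,
    partitionColumns
  )
```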
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.clickhouse
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.{DeltaLog, Snapshot}

object DeltaLogAdapter {
def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.unsafeVolatileSnapshot
object DeltaAdapter extends DeltaAdapterTrait {
override def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.snapshot
}
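Here `DeltaLogAdapter` becomes `DeltaAdapter`, moves into the `org.apache.spark.sql.delta` package, and implements a shared `DeltaAdapterTrait`, so each Spark/Delta version can map a `DeltaLog` to a `Snapshot` its own way (via `deltaLog.snapshot` here, where the old code used `unsafeVolatileSnapshot`). The trait itself is not shown in this diff; below is a minimal sketch of the shape it could have, assuming only what the override above requires.

```scala
package org.apache.spark.sql.delta

// Hypothetical sketch of the version-abstraction trait implemented above; the actual
// DeltaAdapterTrait lives elsewhere in the Gluten tree and may declare more members.
trait DeltaAdapterTrait {
  // Each Spark/Delta version pairing decides how to obtain the current table snapshot.
  def snapshot(deltaLog: DeltaLog): Snapshot
}
```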
@@ -17,11 +17,11 @@
package org.apache.spark.sql.delta.catalog
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTimeTravelSpec}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
@@ -54,8 +54,8 @@ class ClickHouseTableV2(
tableIdentifier,
timeTravelOpt,
options,
cdcOptions) {
protected def getMetadata: Metadata = if (snapshot == null) Metadata() else snapshot.metadata
cdcOptions)
with ClickHouseTableV2Base {

lazy val (rootPath, partitionFilters, timeTravelByPath) = {
if (catalogTable.isDefined) {
@@ -93,126 +93,6 @@ class ClickHouseTableV2(
new WriteIntoDeltaBuilder(deltaLog, info.options)
}

lazy val dataBaseName = catalogTable
.map(_.identifier.database.getOrElse("default"))
.getOrElse("clickhouse")

lazy val tableName = catalogTable
.map(_.identifier.table)
.getOrElse(path.toUri.getPath)

lazy val bucketOption: Option[BucketSpec] = {
val tableProperties = properties()
if (tableProperties.containsKey("numBuckets")) {
val numBuckets = tableProperties.get("numBuckets").trim.toInt
val bucketColumnNames: Seq[String] =
tableProperties.get("bucketColumnNames").split(",").map(_.trim).toSeq
val sortColumnNames: Seq[String] = if (tableProperties.containsKey("orderByKey")) {
tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
} else Seq.empty[String]
Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
} else {
None
}
}

lazy val lowCardKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("lowCardKey")
}

lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("minmaxIndexKey")
}

lazy val bfIndexKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("bloomfilterIndexKey")
}

lazy val setIndexKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("setIndexKey")
}

private def getCommaSeparatedColumns(keyName: String) = {
val tableProperties = properties()
if (tableProperties.containsKey(keyName)) {
if (tableProperties.get(keyName).nonEmpty) {
val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
keys.foreach(
s => {
if (s.contains(".")) {
throw new IllegalStateException(
s"$keyName $s can not contain '.' (not support nested column yet)")
}
})
Some(keys.map(s => s.toLowerCase()))
} else {
None
}
} else {
None
}
}

lazy val orderByKeyOption: Option[Seq[String]] = {
if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) {
val orderByKes = bucketOption.get.sortColumnNames
val invalidKeys = orderByKes.intersect(partitionColumns)
if (invalidKeys.nonEmpty) {
throw new IllegalStateException(
s"partition cols $invalidKeys can not be in the order by keys.")
}
Some(orderByKes)
} else {
val tableProperties = properties()
if (tableProperties.containsKey("orderByKey")) {
if (tableProperties.get("orderByKey").nonEmpty) {
val orderByKes = tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
val invalidKeys = orderByKes.intersect(partitionColumns)
if (invalidKeys.nonEmpty) {
throw new IllegalStateException(
s"partition cols $invalidKeys can not be in the order by keys.")
}
Some(orderByKes)
} else {
None
}
} else {
None
}
}
}

lazy val primaryKeyOption: Option[Seq[String]] = {
if (orderByKeyOption.isDefined) {
val tableProperties = properties()
if (tableProperties.containsKey("primaryKey")) {
if (tableProperties.get("primaryKey").nonEmpty) {
val primaryKeys = tableProperties.get("primaryKey").split(",").map(_.trim).toSeq
if (!orderByKeyOption.get.mkString(",").startsWith(primaryKeys.mkString(","))) {
throw new IllegalStateException(
s"Primary key $primaryKeys must be a prefix of the sorting key")
}
Some(primaryKeys)
} else {
None
}
} else {
None
}
} else {
None
}
}

lazy val partitionColumns = snapshot.metadata.partitionColumns

lazy val clickhouseTableConfigs: Map[String, String] = {
val tableProperties = properties()
val configs = scala.collection.mutable.Map[String, String]()
configs += ("storage_policy" -> tableProperties.getOrDefault("storage_policy", "default"))
configs.toMap
}

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
meta,
@@ -230,41 +110,19 @@ class ClickHouseTableV2(
)
}

def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
}
override def deltaProperties(): ju.Map[String, String] = properties()

cacheThis()
override def deltaCatalog(): Option[CatalogTable] = catalogTable

def primaryKey(): String = primaryKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}

def orderByKey(): String = orderByKeyOption match {
case Some(keys) => keys.mkString(",")
case None => "tuple()"
}

def lowCardKey(): String = lowCardKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}
override def deltaPath(): Path = path

def minmaxIndexKey(): String = minmaxIndexKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}
override def deltaSnapshot(): Snapshot = snapshot

def bfIndexKey(): String = bfIndexKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
}

def setIndexKey(): String = setIndexKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}
cacheThis()
}

@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
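The bulk of this file's change removes the MergeTree option derivation (`dataBaseName`, `tableName`, `bucketOption`, the key options, `clickhouseTableConfigs`, and the string accessors) from the Spark-version-specific `ClickHouseTableV2` and has the class mix in `ClickHouseTableV2Base`, feeding it through the four overrides above. The trait is not part of this excerpt; the sketch below is a hedged guess at the contract it could expose, inferred only from those overrides and the deleted members, and any member beyond the four hooks is an assumption.

```scala
import java.{util => ju}

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.Snapshot

// Hypothetical sketch of the shared base trait; the real ClickHouseTableV2Base in Gluten
// may differ in names and members.
trait ClickHouseTableV2Base {
  // Hooks supplied by the Spark-version-specific table implementation.
  def deltaProperties(): ju.Map[String, String]
  def deltaCatalog(): Option[CatalogTable]
  def deltaPath(): Path
  def deltaSnapshot(): Snapshot

  // Derived once here instead of per Spark version, e.g. the low-cardinality key list
  // parsed from table properties (mirrors the logic deleted from ClickHouseTableV2).
  def lowCardKeyOption: Option[Seq[String]] =
    Option(deltaProperties().get("lowCardKey"))
      .filter(_.nonEmpty)
      .map(_.split(",").map(_.trim.toLowerCase).toSeq)
}
```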