[GLUTEN-6067][CH] Support CH backend with Spark 3.5 (Task 1 and Task 2)
Support the CH backend with Spark 3.5:

1. Upgrade the Spark version to 3.5 (compilation passes);
2. Upgrade the Delta version to 3.2 (compilation passes);
3. CH backend UTs pass (for now, only the MergeTree + Delta UTs pass);
4. Parquet native write passes;
5. Gluten UTs pass;
6. Support running the Gluten CH CI with Spark 3.5.
zzcclp committed Jun 13, 2024
1 parent 00fee1d commit 76c1227
Showing 49 changed files with 8,915 additions and 281 deletions.
33 changes: 33 additions & 0 deletions backends-clickhouse/pom.xml
@@ -215,6 +215,38 @@
<version>8.5.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${arrow.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
@@ -272,6 +304,7 @@
</includes>
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/files/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.util.{Clock, SerializableConfiguration}

import org.apache.commons.lang3.exception.ExceptionUtils
@@ -139,20 +138,7 @@ class ClickhouseOptimisticTransaction(
MergeTreeFileFormatWriter.write(
sparkSession = spark,
plan = newQueryPlan,
fileFormat = new DeltaMergeTreeFileFormat(
metadata,
tableV2.dataBaseName,
tableV2.tableName,
ClickhouseSnapshot.genSnapshotId(tableV2.snapshot),
tableV2.orderByKeyOption,
tableV2.lowCardKeyOption,
tableV2.minmaxIndexKeyOption,
tableV2.bfIndexKeyOption,
tableV2.setIndexKeyOption,
tableV2.primaryKeyOption,
tableV2.clickhouseTableConfigs,
tableV2.partitionColumns
),
fileFormat = tableV2.getFileFormat(metadata),
// formats.
committer = committer,
outputSpec = outputSpec,
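The write call above now passes fileFormat = tableV2.getFileFormat(metadata) instead of constructing DeltaMergeTreeFileFormat inline, so file-format construction is owned by the table object. A minimal sketch of what getFileFormat on ClickHouseTableV2 presumably does, inferred from the argument list of the removed inline call and from the method signature visible further down in this diff (its body is truncated there), not copied from the actual implementation:

// Hypothetical sketch; the argument order mirrors the removed inline constructor call.
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat =
  new DeltaMergeTreeFileFormat(
    meta,
    dataBaseName,
    tableName,
    ClickhouseSnapshot.genSnapshotId(snapshot),
    orderByKeyOption,
    lowCardKeyOption,
    minmaxIndexKeyOption,
    bfIndexKeyOption,
    setIndexKeyOption,
    primaryKeyOption,
    clickhouseTableConfigs,
    partitionColumns
  )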
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.clickhouse
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.{DeltaLog, Snapshot}

object DeltaLogAdapter {
def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.unsafeVolatileSnapshot
object DeltaAdapter extends DeltaAdapterTrait {
override def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.snapshot
}
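DeltaAdapterTrait itself is not part of this diff. A minimal sketch of what it presumably declares, assuming it exists to abstract snapshot access over Delta API differences between versions (for example, unsafeVolatileSnapshot in the old adapter versus snapshot here):

package org.apache.spark.sql.delta

// Hypothetical sketch; the real trait lives elsewhere in the repository.
trait DeltaAdapterTrait {
  def snapshot(deltaLog: DeltaLog): Snapshot
}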
@@ -17,11 +17,11 @@
package org.apache.spark.sql.delta.catalog
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTimeTravelSpec}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
@@ -54,8 +54,8 @@ class ClickHouseTableV2(
tableIdentifier,
timeTravelOpt,
options,
cdcOptions) {
protected def getMetadata: Metadata = if (snapshot == null) Metadata() else snapshot.metadata
cdcOptions)
with ClickHouseTableV2Base {

lazy val (rootPath, partitionFilters, timeTravelByPath) = {
if (catalogTable.isDefined) {
@@ -93,126 +93,6 @@ class ClickHouseTableV2(
new WriteIntoDeltaBuilder(deltaLog, info.options)
}

lazy val dataBaseName = catalogTable
.map(_.identifier.database.getOrElse("default"))
.getOrElse("clickhouse")

lazy val tableName = catalogTable
.map(_.identifier.table)
.getOrElse(path.toUri.getPath)

lazy val bucketOption: Option[BucketSpec] = {
val tableProperties = properties()
if (tableProperties.containsKey("numBuckets")) {
val numBuckets = tableProperties.get("numBuckets").trim.toInt
val bucketColumnNames: Seq[String] =
tableProperties.get("bucketColumnNames").split(",").map(_.trim).toSeq
val sortColumnNames: Seq[String] = if (tableProperties.containsKey("orderByKey")) {
tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
} else Seq.empty[String]
Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
} else {
None
}
}

lazy val lowCardKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("lowCardKey")
}

lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("minmaxIndexKey")
}

lazy val bfIndexKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("bloomfilterIndexKey")
}

lazy val setIndexKeyOption: Option[Seq[String]] = {
getCommaSeparatedColumns("setIndexKey")
}

private def getCommaSeparatedColumns(keyName: String) = {
val tableProperties = properties()
if (tableProperties.containsKey(keyName)) {
if (tableProperties.get(keyName).nonEmpty) {
val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
keys.foreach(
s => {
if (s.contains(".")) {
throw new IllegalStateException(
s"$keyName $s can not contain '.' (not support nested column yet)")
}
})
Some(keys.map(s => s.toLowerCase()))
} else {
None
}
} else {
None
}
}

lazy val orderByKeyOption: Option[Seq[String]] = {
if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) {
val orderByKes = bucketOption.get.sortColumnNames
val invalidKeys = orderByKes.intersect(partitionColumns)
if (invalidKeys.nonEmpty) {
throw new IllegalStateException(
s"partition cols $invalidKeys can not be in the order by keys.")
}
Some(orderByKes)
} else {
val tableProperties = properties()
if (tableProperties.containsKey("orderByKey")) {
if (tableProperties.get("orderByKey").nonEmpty) {
val orderByKes = tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
val invalidKeys = orderByKes.intersect(partitionColumns)
if (invalidKeys.nonEmpty) {
throw new IllegalStateException(
s"partition cols $invalidKeys can not be in the order by keys.")
}
Some(orderByKes)
} else {
None
}
} else {
None
}
}
}

lazy val primaryKeyOption: Option[Seq[String]] = {
if (orderByKeyOption.isDefined) {
val tableProperties = properties()
if (tableProperties.containsKey("primaryKey")) {
if (tableProperties.get("primaryKey").nonEmpty) {
val primaryKeys = tableProperties.get("primaryKey").split(",").map(_.trim).toSeq
if (!orderByKeyOption.get.mkString(",").startsWith(primaryKeys.mkString(","))) {
throw new IllegalStateException(
s"Primary key $primaryKeys must be a prefix of the sorting key")
}
Some(primaryKeys)
} else {
None
}
} else {
None
}
} else {
None
}
}

lazy val partitionColumns = snapshot.metadata.partitionColumns

lazy val clickhouseTableConfigs: Map[String, String] = {
val tableProperties = properties()
val configs = scala.collection.mutable.Map[String, String]()
configs += ("storage_policy" -> tableProperties.getOrDefault("storage_policy", "default"))
configs.toMap
}

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
meta,
@@ -230,41 +110,19 @@ class ClickHouseTableV2(
)
}

def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
}
override def deltaProperties(): ju.Map[String, String] = properties()

cacheThis()
override def deltaCatalog(): Option[CatalogTable] = catalogTable

def primaryKey(): String = primaryKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}

def orderByKey(): String = orderByKeyOption match {
case Some(keys) => keys.mkString(",")
case None => "tuple()"
}

def lowCardKey(): String = lowCardKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}
override def deltaPath(): Path = path

def minmaxIndexKey(): String = minmaxIndexKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}
override def deltaSnapshot(): Snapshot = snapshot

def bfIndexKey(): String = bfIndexKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
}

def setIndexKey(): String = setIndexKeyOption match {
case Some(keys) => keys.mkString(",")
case None => ""
}
cacheThis()
}

@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
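ClickHouseTableV2Base is also not shown in this diff. Judging by the members removed from ClickHouseTableV2 and the overrides added above, a rough sketch of its shape could look like the following; the accessor signatures match the overrides in the diff, while the comment about moved members is an assumption:

package org.apache.spark.sql.delta.catalog

import java.{util => ju}

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.Snapshot

// Hypothetical sketch of the shared base trait.
trait ClickHouseTableV2Base {
  def deltaProperties(): ju.Map[String, String]
  def deltaCatalog(): Option[CatalogTable]
  def deltaPath(): Path
  def deltaSnapshot(): Snapshot

  // The option parsing removed from ClickHouseTableV2 (dataBaseName, tableName,
  // bucketOption, orderByKeyOption, primaryKeyOption, lowCardKeyOption,
  // minmaxIndexKeyOption, bfIndexKeyOption, setIndexKeyOption, partitionColumns,
  // clickhouseTableConfigs) would presumably be implemented here on top of
  // deltaProperties(), deltaCatalog() and deltaSnapshot().
}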
(Diffs for the remaining changed files are not shown.)
