[GLUTEN-5901][CH] Support CH backend parquet + delta
Support the CH backend reading and writing parquet with Delta:

1. Natively read parquet from the Delta catalog.
2. Fall back to vanilla Spark when writing parquet to the Delta catalog (the DeltaInvariantCheckerExec operator and DeltaTaskStatisticsTracker are not supported).
3. Use ClickHouseSparkCatalog as the uniform catalog (a configuration sketch follows below).

Close #5901.
zzcclp committed May 29, 2024
1 parent 79c681b commit 1a60d89
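
A minimal configuration sketch of the uniform-catalog setup described above; it is not taken from this commit. The catalog class name (org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog), the plugin class (org.apache.gluten.GlutenPlugin), and the table path are assumptions and should be adjusted to the actual deployment.

import org.apache.spark.sql.SparkSession

// Assumed class names; adjust to your Gluten/ClickHouse build.
val spark = SparkSession
  .builder()
  .appName("ch-backend-parquet-delta")
  .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
  .config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog")
  .getOrCreate()

// 1. Native read: parquet files tracked by the Delta log are scanned by the CH backend.
val df = spark.read.format("delta").load("/tmp/delta_table")

// 2. Fallback write: the write path still runs on vanilla Spark.
df.write.format("delta").mode("append").save("/tmp/delta_table")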
Showing 24 changed files with 6,901 additions and 230 deletions.
67 changes: 63 additions & 4 deletions backends-clickhouse/pom.xml
@@ -13,6 +13,13 @@
<packaging>jar</packaging>
<name>Gluten Backends ClickHouse</name>

<properties>
<clickhouse.delta.package.name>delta-core</clickhouse.delta.package.name>
<clickhouse.delta.version>2.3.0</clickhouse.delta.version>
<clickhouse.delta.binary.version>23</clickhouse.delta.binary.version>
</properties>


<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
@@ -94,8 +101,19 @@
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
<artifactId>${clickhouse.delta.package.name}_${scala.binary.version}</artifactId>
<version>${clickhouse.delta.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
@@ -267,9 +285,15 @@
<includes>
<include>src/main/scala/**/*.scala</include>
<include>src/test/scala/**/*.scala</include>
<include>src/main/delta-${delta.binary.version}/**/*.scala</include>
<include>src/test/delta-${delta.binary.version}/**/*.scala</include>
<include>src/main/delta-${clickhouse.delta.binary.version}/**/*.scala</include>
<include>src/test/delta-${clickhouse.delta.binary.version}/**/*.scala</include>
</includes>
<excludes>
<exclude>src/main/delta-${clickhouse.delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${clickhouse.delta.binary.version}/org/apache/spark/sql/delta/files/*.scala</exclude>
<exclude>src/main/delta-${clickhouse.delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${clickhouse.delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
</excludes>
</scala>
</configuration>
</plugin>
@@ -320,12 +344,47 @@
</goals>
<configuration>
<sources>
<source>src/main/delta-${delta.binary.version}</source>
<source>src/main/delta-${clickhouse.delta.binary.version}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>spark-3.2</id>
<properties>
<clickhouse.delta.package.name>delta-core</clickhouse.delta.package.name>
<clickhouse.delta.version>2.0.1</clickhouse.delta.version>
<clickhouse.delta.binary.version>20</clickhouse.delta.binary.version>
</properties>
</profile>
<profile>
<id>spark-3.3</id>
<properties>
<clickhouse.delta.package.name>delta-core</clickhouse.delta.package.name>
<clickhouse.delta.version>2.3.0</clickhouse.delta.version>
<clickhouse.delta.binary.version>23</clickhouse.delta.binary.version>
</properties>
</profile>
<profile>
<id>spark-3.4</id>
<properties>
<clickhouse.delta.package.name>delta-core</clickhouse.delta.package.name>
<clickhouse.delta.version>2.4.0</clickhouse.delta.version>
<clickhouse.delta.binary.version>24</clickhouse.delta.binary.version>
</properties>
</profile>
<profile>
<id>spark-3.5</id>
<properties>
<clickhouse.delta.package.name>delta-spark</clickhouse.delta.package.name>
<clickhouse.delta.version>3.2.0</clickhouse.delta.version>
<clickhouse.delta.binary.version>32</clickhouse.delta.binary.version>
</properties>
</profile>
</profiles>
</project>
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.tables

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2

import org.apache.hadoop.fs.Path

import scala.collection.JavaConverters._

class ClickhouseTable(
@transient private val _df: Dataset[Row],
@transient private val table: ClickHouseTableV2)
extends DeltaTable(_df, table) {

override def optimize(): DeltaOptimizeBuilder = {
DeltaOptimizeBuilder(
sparkSession,
table.tableIdentifier.getOrElse(s"clickhouse.`${deltaLog.dataPath.toString}`"),
table.options)
}
}

object ClickhouseTable {

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path. If the given path
* is invalid (i.e. either no table exists or an existing table is not a Delta table), it throws a
* `not a Delta table` error.
*
* Note: This uses the active SparkSession in the current thread to read the table data. Hence,
* this throws an error if the active SparkSession has not been set, that is, if
* `SparkSession.getActiveSession()` is empty.
*
* @since 0.3.0
*/
def forPath(path: String): DeltaTable = {
val sparkSession = SparkSession.getActiveSession.getOrElse {
throw DeltaErrors.activeSparkSessionNotFound()
}
forPath(sparkSession, path)
}

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path. If the given path
* is invalid (i.e. either no table exists or an existing table is not a Delta table), it throws a
* `not a Delta table` error.
*
* @since 0.3.0
*/
def forPath(sparkSession: SparkSession, path: String): DeltaTable = {
forPath(sparkSession, path, Map.empty[String, String])
}

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path. If the given path
* is invalid (i.e. either no table exists or an existing table is not a Delta table), it throws a
* `not a Delta table` error.
*
* @param hadoopConf
* Hadoop configuration starting with "fs." or "dfs." will be picked up by `DeltaTable` to
* access the file system when executing queries. Other configurations will not be allowed.
*
* {{{
* val hadoopConf = Map(
* "fs.s3a.access.key" -> "<access-key>",
* "fs.s3a.secret.key" -> "<secret-key>"
* )
* DeltaTable.forPath(spark, "/path/to/table", hadoopConf)
* }}}
* @since 2.2.0
*/
def forPath(
sparkSession: SparkSession,
path: String,
hadoopConf: scala.collection.Map[String, String]): DeltaTable = {
// We only pass hadoopConf so that we won't pass any unsafe options to Delta.
val badOptions = hadoopConf.filterKeys {
k => !DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}.toMap
if (badOptions.nonEmpty) {
throw DeltaErrors.unsupportedDeltaTableForPathHadoopConf(badOptions)
}
val fileSystemOptions: Map[String, String] = hadoopConf.toMap
val hdpPath = new Path(path)
if (DeltaTableUtils.isDeltaTable(sparkSession, hdpPath, fileSystemOptions)) {
new ClickhouseTable(
sparkSession.read.format("clickhouse").options(fileSystemOptions).load(path),
new ClickHouseTableV2(spark = sparkSession, path = hdpPath, options = fileSystemOptions)
)
} else {
throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(path)))
}
}

/**
* Java-friendly API to instantiate a [[DeltaTable]] object representing the data at the given
* path. If the given path is invalid (i.e. either no table exists or an existing table is not a
* Delta table), it throws a `not a Delta table` error.
*
* @param hadoopConf
* Hadoop configuration starting with "fs." or "dfs." will be picked up by `DeltaTable` to
* access the file system when executing queries. Other configurations will be ignored.
*
* {{{
* val hadoopConf = Map(
* "fs.s3a.access.key" -> "<access-key>",
* "fs.s3a.secret.key", "<secret-key>"
* )
* DeltaTable.forPath(spark, "/path/to/table", hadoopConf)
* }}}
* @since 2.2.0
*/
def forPath(
sparkSession: SparkSession,
path: String,
hadoopConf: java.util.Map[String, String]): DeltaTable = {
val fsOptions = hadoopConf.asScala.toMap
forPath(sparkSession, path, fsOptions)
}
}
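
A hypothetical usage sketch of the new ClickhouseTable entry point, not part of the commit; the table paths and credentials below are placeholders, and `spark` is assumed to be an active SparkSession.

// Load a Delta table backed by the ClickHouse backend from a placeholder path.
val table = ClickhouseTable.forPath(spark, "/tmp/delta_table")

// The overridden optimize() resolves the table through the "clickhouse" path identifier.
table.optimize().executeCompaction()

// Per-call Hadoop configuration is limited to the valid Delta prefixes (e.g. "fs.", "dfs.");
// any other key makes forPath throw.
val s3Table = ClickhouseTable.forPath(
  spark,
  "s3a://bucket/delta_table",
  Map("fs.s3a.access.key" -> "<access-key>", "fs.s3a.secret.key" -> "<secret-key>"))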