[GLUTEN-5454][CH] Support delete/update/optimize/vacuum API for the MergeTree + Delta (apache#5460)

Support the delete/update/optimize/vacuum API for MergeTree + Delta:
Similar to the DeltaTable API in Delta, this adds a ClickhouseTable that supports delete/update/optimize/vacuum for MergeTree + Delta tables through a programmatic API:
```
// create a ClickhouseTable; dataPath is the Delta table's root path
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
// optimize
clickhouseTable.optimize().executeCompaction()
// vacuum
clickhouseTable.vacuum(0.0)
// update
clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> "'X'"))
// delete
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
```
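Because ClickhouseTable extends DeltaTable (see the new class in the diff below), the inherited DeltaTable API is expected to remain usable on the same handle. A hedged sketch, reusing the `spark` and `dataPath` placeholders from the example above:
```
// Hedged sketch: ClickhouseTable extends DeltaTable, so inherited DeltaTable
// methods should also be available on the same handle; not part of this commit's
// new code, just the parent API.
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.history(10).show() // recent commits recorded in the Delta log
clickhouseTable.toDF.count()       // read the table back as a DataFrame
```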

Close apache#5454.
zzcclp authored Apr 19, 2024
1 parent 884ef64 commit 24947fa
Showing 13 changed files with 364 additions and 80 deletions.
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.tables

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2

import org.apache.hadoop.fs.Path

class ClickhouseTable(
@transient private val _df: Dataset[Row],
@transient private val table: ClickHouseTableV2)
extends DeltaTable(_df, table) {

override def optimize(): DeltaOptimizeBuilder = {
DeltaOptimizeBuilder(
sparkSession,
table.tableIdentifier.getOrElse(s"clickhouse.`${deltaLog.dataPath.toString}`"))
}
}

object ClickhouseTable {

/**
* Create a DeltaTable for the data at the given `path`.
*
* Note: This uses the active SparkSession in the current thread to read the table data. Hence,
* this throws an error if the active SparkSession has not been set, that is,
* `SparkSession.getActiveSession()` is empty.
*
* @since 0.3.0
*/
def forPath(path: String): DeltaTable = {
val sparkSession = SparkSession.getActiveSession.getOrElse {
throw DeltaErrors.activeSparkSessionNotFound()
}
forPath(sparkSession, path)
}

/**
* Create a DeltaTable for the data at the given `path` using the given SparkSession.
*
* @since 0.3.0
*/
def forPath(sparkSession: SparkSession, path: String): DeltaTable = {
val hdpPath = new Path(path)
if (DeltaTableUtils.isDeltaTable(sparkSession, hdpPath)) {
new ClickhouseTable(
sparkSession.read.format("clickhouse").load(path),
new ClickHouseTableV2(spark = sparkSession, path = hdpPath)
)
} else {
throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(path)))
}
}
}
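A hedged usage sketch for the class above; the object name and path are placeholders, and it assumes a MergeTree + Delta table has already been written with the clickhouse format at that location:
```
import io.delta.tables.ClickhouseTable

import org.apache.spark.sql.SparkSession

object ClickhouseTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("clickhouse-table-demo").getOrCreate()

    // Resolve the table from its Delta root path; forPath throws a
    // "not a Delta table" error if the location does not hold a Delta table.
    val table = ClickhouseTable.forPath(spark, "/data/mergetree/lineitem")

    // Inherited DeltaTable operations plus the overridden optimize().
    table.delete("mod(l_orderkey, 3) = 2")
    table.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> "'X'"))
    table.optimize().executeCompaction()
    table.vacuum(0.0)

    spark.stop()
  }
}
```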
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.skipping.MultiDimClustering
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.types._
@@ -126,7 +127,7 @@ case class OptimizeTableCommand(
override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil

override def run(sparkSession: SparkSession): Seq[Row] = {
CommandUtils.ensureClickHouseTableV2(tableId, sparkSession)
CHDataSourceUtils.ensureClickHouseTableV2(tableId, sparkSession)
val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE")

val partitionColumns = deltaLog.snapshot.metadata.partitionColumns
@@ -307,8 +308,8 @@ class OptimizeExecutor(
// Generally, a bin is a group of existing files, whose total size does not exceed the
// desired maxFileSize. They will be coalesced into a single output file.
// However, if isMultiDimClustering = true, all files in a partition will be read by the
// same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize
// will be produced. See below.
// same job, the data will be range-partitioned and
// numFiles = totalFileSize / maxFileSize will be produced. See below.
if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) {
bins += currentBin.toVector
currentBin.clear()
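The bin-packing rule described in the comment above can be summarized in a standalone, hedged sketch; packBins and its parameters are illustrative names, not the actual OptimizeExecutor internals:
```
import scala.collection.mutable.ArrayBuffer

// Hedged sketch of the bin packing described above: files accumulate into the
// current bin until adding another would exceed maxTargetFileSize, at which
// point the bin is sealed and a new one starts. With multi-dimensional
// clustering the size cap is ignored, so a whole partition lands in one bin.
def packBins(
    fileSizes: Seq[Long],
    maxTargetFileSize: Long,
    isMultiDimClustering: Boolean): Seq[Vector[Long]] = {
  val bins = ArrayBuffer.empty[Vector[Long]]
  val currentBin = ArrayBuffer.empty[Long]
  var currentBinSize = 0L
  fileSizes.foreach { size =>
    if (size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) {
      if (currentBin.nonEmpty) bins += currentBin.toVector
      currentBin.clear()
      currentBinSize = 0L
    }
    currentBin += size
    currentBinSize += size
  }
  if (currentBin.nonEmpty) bins += currentBin.toVector
  bins.toSeq
}
```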
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.tables

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2

import org.apache.hadoop.fs.Path

import scala.collection.JavaConverters._

class ClickhouseTable(
@transient private val _df: Dataset[Row],
@transient private val table: ClickHouseTableV2)
extends DeltaTable(_df, table) {

override def optimize(): DeltaOptimizeBuilder = {
DeltaOptimizeBuilder(
sparkSession,
table.tableIdentifier.getOrElse(s"clickhouse.`${deltaLog.dataPath.toString}`"),
table.options)
}
}

object ClickhouseTable {

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path. If the given path
* is invalid (i.e. either no table exists or an existing table is not a Delta table), it throws a
* `not a Delta table` error.
*
* Note: This uses the active SparkSession in the current thread to read the table data. Hence,
* this throws an error if the active SparkSession has not been set, that is,
* `SparkSession.getActiveSession()` is empty.
*
* @since 0.3.0
*/
def forPath(path: String): DeltaTable = {
val sparkSession = SparkSession.getActiveSession.getOrElse {
throw DeltaErrors.activeSparkSessionNotFound()
}
forPath(sparkSession, path)
}

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path. If the given path
* is invalid (i.e. either no table exists or an existing table is not a Delta table), it throws a
* `not a Delta table` error.
*
* @since 0.3.0
*/
def forPath(sparkSession: SparkSession, path: String): DeltaTable = {
forPath(sparkSession, path, Map.empty[String, String])
}

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path. If the given path
* is invalid (i.e. either no table exists or an existing table is not a Delta table), it throws a
* `not a Delta table` error.
*
* @param hadoopConf
* Hadoop configuration starting with "fs." or "dfs." will be picked up by `DeltaTable` to
* access the file system when executing queries. Other configurations will not be allowed.
*
* {{{
* val hadoopConf = Map(
* "fs.s3a.access.key" -> "<access-key>",
* "fs.s3a.secret.key" -> "<secret-key>"
* )
* DeltaTable.forPath(spark, "/path/to/table", hadoopConf)
* }}}
* @since 2.2.0
*/
def forPath(
sparkSession: SparkSession,
path: String,
hadoopConf: scala.collection.Map[String, String]): DeltaTable = {
// We only pass hadoopConf so that we won't pass any unsafe options to Delta.
val badOptions = hadoopConf.filterKeys {
k => !DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}.toMap
if (!badOptions.isEmpty) {
throw DeltaErrors.unsupportedDeltaTableForPathHadoopConf(badOptions)
}
val fileSystemOptions: Map[String, String] = hadoopConf.toMap
val hdpPath = new Path(path)
if (DeltaTableUtils.isDeltaTable(sparkSession, hdpPath, fileSystemOptions)) {
new ClickhouseTable(
sparkSession.read.format("clickhouse").options(fileSystemOptions).load(path),
new ClickHouseTableV2(spark = sparkSession, path = hdpPath, options = fileSystemOptions)
)
} else {
throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(path)))
}
}

/**
* Java friendly API to instantiate a [[DeltaTable]] object representing the data at the given
* path. If the given path is invalid (i.e. either no table exists or an existing table is not a
* Delta table), it throws a `not a Delta table` error.
*
* @param hadoopConf
* Hadoop configuration starting with "fs." or "dfs." will be picked up by `DeltaTable` to
* access the file system when executing queries. Other configurations will be ignored.
*
* {{{
* val hadoopConf = Map(
* "fs.s3a.access.key" -> "<access-key>",
* "fs.s3a.secret.key", "<secret-key>"
* )
* DeltaTable.forPath(spark, "/path/to/table", hadoopConf)
* }}}
* @since 2.2.0
*/
def forPath(
sparkSession: SparkSession,
path: String,
hadoopConf: java.util.Map[String, String]): DeltaTable = {
val fsOptions = hadoopConf.asScala.toMap
forPath(sparkSession, path, fsOptions)
}
}
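A hedged sketch of the hadoopConf overload documented above; the bucket, path, and credentials are placeholders, and per the check in forPath only options with the allowed Hadoop prefixes (such as "fs.") are accepted:
```
import io.delta.tables.ClickhouseTable

import org.apache.spark.sql.SparkSession

object ClickhouseTableS3Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("clickhouse-table-s3-demo").getOrCreate()

    // Passed through to the file system only; keys outside the allowed prefixes
    // cause unsupportedDeltaTableForPathHadoopConf to be thrown (see forPath above).
    val hadoopConf = Map(
      "fs.s3a.access.key" -> "<access-key>",
      "fs.s3a.secret.key" -> "<secret-key>")

    val table = ClickhouseTable.forPath(spark, "s3a://my-bucket/path/to/table", hadoopConf)
    table.optimize().executeCompaction()

    spark.stop()
  }
}
```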
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.skipping.MultiDimClustering
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.types._
@@ -126,7 +127,7 @@ case class OptimizeTableCommand(
override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil

override def run(sparkSession: SparkSession): Seq[Row] = {
CommandUtils.ensureClickHouseTableV2(tableId, sparkSession)
CHDataSourceUtils.ensureClickHouseTableV2(tableId, sparkSession)

val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE", options)

@@ -295,8 +296,8 @@ class OptimizeExecutor(
// Generally, a bin is a group of existing files, whose total size does not exceed the
// desired maxFileSize. They will be coalesced into a single output file.
// However, if isMultiDimClustering = true, all files in a partition will be read by the
// same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize
// will be produced. See below.
// same job, the data will be range-partitioned and
// numFiles = totalFileSize / maxFileSize will be produced. See below.
if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) {
bins += currentBin.toVector
currentBin.clear()

This file was deleted.

@@ -25,14 +25,15 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTableIdentifier, OptimisticTransaction}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.CHDatasourceJniWrapper
import org.apache.spark.sql.execution.datasources.v1.CHMergeTreeWriterInjects
import org.apache.spark.sql.execution.datasources.v1.clickhouse._
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.{AddFileTags, AddMergeTreeParts}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}
@@ -264,12 +265,21 @@ object OptimizeTableCommandOverwrites extends Logging {
operationName: String,
hadoopConf: Map[String, String] = Map.empty): DeltaLog = {
val tablePath =
if (tableIdentifier.nonEmpty && isDeltaTable(spark, tableIdentifier.get)) {
if (path.nonEmpty) {
new Path(path.get)
} else if (tableIdentifier.nonEmpty) {
val sessionCatalog = spark.sessionState.catalog
lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get)
new Path(metadata.location)

if (CHDataSourceUtils.isClickhousePath(spark, tableIdentifier.get)) {
new Path(tableIdentifier.get.table)
} else if (CHDataSourceUtils.isClickHouseTable(spark, tableIdentifier.get)) {
new Path(metadata.location)
} else {
throw DeltaErrors.notADeltaTableException(operationName)
}
} else {
throw new UnsupportedOperationException("OPTIMIZE is only supported for clickhouse tables")
throw DeltaErrors.missingTableIdentifierException(operationName)
}

val startTime = Some(System.currentTimeMillis)
@@ -72,7 +72,8 @@ class ClickHouseDataSource extends DeltaDataSource {
.getOrElse(Nil)

val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path, parameters)
val configs = if (DeltaLogAdapter.snapshot(deltaLog).version < 0) {
// need to use the latest snapshot
val configs = if (deltaLog.update().version < 0) {
// when creating table, save the clickhouse config to the delta metadata
val clickHouseTableV2 = ClickHouseTableV2.getTable(deltaLog)
clickHouseTableV2.properties().asScala.toMap ++ DeltaConfigs
@@ -227,7 +227,7 @@ class ClickHouseSparkCatalog
override def loadTable(ident: Identifier): Table = {
try {
super.loadTable(ident) match {
case v1: V1Table if CHDataSourceUtils.isDeltaTable(v1.catalogTable) =>
case v1: V1Table if CHDataSourceUtils.isClickHouseTable(v1.catalogTable) =>
new ClickHouseTableV2(
spark,
new Path(v1.catalogTable.location),