diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala index 93f94249ec20..be646dae801b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala @@ -268,6 +268,49 @@ class ClickHouseSparkCatalog ClickHouseTableV2(spark, new Path(ident.name())) } + /** override `dropTable`` method, calling `clearFileStatusCacheByPath` after dropping */ + override def dropTable(ident: Identifier): Boolean = { + try { + loadTable(ident) match { + case t: ClickHouseTableV2 => + val tablePath = t.rootPath + val deletedTable = super.dropTable(ident) + if (deletedTable) ClickHouseTableV2.clearFileStatusCacheByPath(tablePath) + deletedTable + case _ => super.dropTable(ident) + } + } catch { + case _: Exception => + false + } + } + + /** support to delete mergetree data from the external table */ + override def purgeTable(ident: Identifier): Boolean = { + try { + loadTable(ident) match { + case t: ClickHouseTableV2 => + val tableType = t.properties().getOrDefault("Type", "") + // file-based or external table + val isExternal = tableType.isEmpty || tableType.equalsIgnoreCase("external") + val tablePath = t.rootPath + // first delete the table metadata + val deletedTable = super.dropTable(ident) + if (deletedTable && isExternal) { + val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf()) + // delete all data if there is a external table + fs.delete(tablePath, true) + ClickHouseTableV2.clearFileStatusCacheByPath(tablePath) + } + true + case _ => super.purgeTable(ident) + } + } catch { + case _: Exception => + false + } + } + override def stageCreate( ident: Identifier, schema: StructType, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/table/ClickHouseTableV2.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/table/ClickHouseTableV2.scala index bc9e2a0772f1..10437187ce52 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/table/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/table/ClickHouseTableV2.scala @@ -97,7 +97,7 @@ case class ClickHouseTableV2( protected def metadata: Metadata = if (snapshot == null) Metadata() else snapshot.metadata - private lazy val (rootPath, partitionFilters, timeTravelByPath) = { + lazy val (rootPath, partitionFilters, timeTravelByPath) = { if (catalogTable.isDefined) { // Fast path for reducing path munging overhead (new Path(catalogTable.get.location), Nil, None) @@ -381,6 +381,8 @@ object ClickHouseTableV2 extends Logging { def clearAllFileStatusCache: Unit = fileStatusCache.invalidateAll() + def clearFileStatusCacheByPath(p: Path): Unit = fileStatusCache.invalidate(p) + protected val stalenessLimit: Long = SparkSession.active.sessionState.conf .getConf(DeltaSQLConf.DELTA_ASYNC_UPDATE_STALENESS_TIME_LIMIT) protected var lastUpdateTimestamp: Long = -1L diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 083fa82e2140..93e1d3db8de7 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -21,6 +21,8 @@ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex +import java.io.File + // Some sqls' line length exceeds 100 // scalastyle:off line.size.limit @@ -491,5 +493,104 @@ class GlutenClickHouseMergeTreeWriteSuite .size == 1) } } + + test("GLUTEN-4749: Support to purge mergetree data for CH backend") { + def createAndDropTable( + tableName: String, + tableLocation: String, + isExternal: Boolean = false, + purgeTable: Boolean = false): Unit = { + spark.sql(s""" + |DROP TABLE IF EXISTS $tableName; + |""".stripMargin) + + spark.sql(s""" + |CREATE ${if (isExternal) "EXTERNAL" else ""} TABLE IF NOT EXISTS $tableName + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (orderByKey='l_shipdate,l_orderkey', + | primaryKey='l_shipdate') + |${if (tableLocation.nonEmpty) "LOCATION '" + tableLocation + "'" else ""} + |""".stripMargin) + + spark.sql(s""" + | insert into table $tableName + | select * from lineitem + |""".stripMargin) + + spark.sql(s""" + |DROP TABLE IF EXISTS $tableName ${if (purgeTable) "PURGE" else ""}; + |""".stripMargin) + } + + def checkTableExists( + tableName: String, + tableLocation: String, + exceptedExists: Boolean): Unit = { + val tableList = spark + .sql(s""" + |show tables; + |""".stripMargin) + .collect() + assert(!tableList.exists(_.getString(1).equals(tableName))) + + val deletedPathStr = if (tableLocation.nonEmpty) { + tableLocation + } else { + warehouse + "/" + tableName + } + val deletedPath = new File(deletedPathStr) + assert(deletedPath.exists() == exceptedExists) + } + + // test non external table + var tableName = "lineitem_mergetree_drop" + var tableLocation = "" + createAndDropTable(tableName, tableLocation) + checkTableExists(tableName, tableLocation, false) + + // test external table + tableName = "lineitem_mergetree_external_drop" + createAndDropTable(tableName, tableLocation, true) + checkTableExists(tableName, tableLocation, false) + + // test table with the specified location + tableName = "lineitem_mergetree_location_drop" + tableLocation = basePath + "/" + tableName + createAndDropTable(tableName, tableLocation) + checkTableExists(tableName, tableLocation, true) + + tableName = "lineitem_mergetree_external_location_drop" + tableLocation = basePath + "/" + tableName + createAndDropTable(tableName, tableLocation, true) + checkTableExists(tableName, tableLocation, true) + + tableName = "lineitem_mergetree_location_purge" + tableLocation = basePath + "/" + tableName + createAndDropTable(tableName, tableLocation, purgeTable = true) + checkTableExists(tableName, tableLocation, false) + + tableName = "lineitem_mergetree_external_location_purge" + tableLocation = basePath + "/" + tableName + createAndDropTable(tableName, tableLocation, true, true) + checkTableExists(tableName, tableLocation, false) + } } // scalastyle:off line.size.limit