[GLUTEN-5944][CH] Fallback to run delta vacuum command
Fallback to run delta vacuum command:
When AQE is enabled, running the Delta vacuum command with the Gluten CH backend + Delta currently fails, so disable Gluten for the thread and fall back to vanilla Spark before running it.

Close #5944.
zzcclp committed May 31, 2024
1 parent 91f5407 commit c7bfb97
Showing 5 changed files with 35 additions and 45 deletions.
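The core of the change is a save/disable/restore pattern around the vacuum body in both copies of VacuumCommand.scala: the per-thread Gluten flag is read, forced to "false" so the physical plan falls back to vanilla Spark, and restored in the existing finally block. Below is a minimal sketch of that pattern, assuming QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY behaves as in the diff that follows; withGlutenDisabled and runVacuumBody are hypothetical names used only for illustration, not part of the commit.

import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.sql.SparkSession

// Sketch only: disable Gluten for the current thread while a body runs,
// then restore whatever value the thread-local property held before.
def withGlutenDisabled[T](spark: SparkSession)(runVacuumBody: => T): T = {
  val key = QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY
  val original = spark.sparkContext.getLocalProperty(key) // may be null if never set
  // Gluten cannot execute the vacuum plan, so force vanilla Spark for this thread.
  spark.sparkContext.setLocalProperty(key, "false")
  try {
    runVacuumBody
  } finally {
    // The commit restores the saved value, or "true" when nothing was set before.
    spark.sparkContext.setLocalProperty(key, if (original != null) original else "true")
  }
}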
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.delta.commands

import org.apache.gluten.utils.QueryPlanSelector

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
@@ -101,7 +103,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
retentionHours: Option[Double] = None,
clock: Clock = new SystemClock): DataFrame = {
recordDeltaOperation(deltaLog, "delta.gc") {

// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end
val path = deltaLog.dataPath
val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
val fs = path.getFileSystem(deltaHadoopConf)
@@ -358,6 +365,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()

// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
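Because the restore sits in the finally block, the thread-local flag is reset even when vacuum throws; when no prior value existed, the commit restores "true", which presumably keeps Gluten enabled for later queries on the same thread. An illustrative usage sketch follows: the session config, plugin class, and table name are assumptions for illustration, not taken from the commit, but with this change a caller can issue vacuum directly while Gluten and AQE stay enabled.

import org.apache.spark.sql.SparkSession

// Illustrative only: vacuum no longer needs a manual Gluten toggle around it.
object VacuumFallbackExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("vacuum-fallback-example")                        // hypothetical app name
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin") // Gluten stays on (assumed plugin class)
      .config("spark.sql.adaptive.enabled", "true")              // AQE stays on
      .getOrCreate()

    spark.sql("VACUUM my_delta_table RETAIN 168 HOURS") // falls back to vanilla Spark internally
    spark.sql("SELECT count(*) FROM my_delta_table")    // still eligible for Gluten offload
  }
}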
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -114,7 +115,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
retentionHours: Option[Double] = None,
clock: Clock = new SystemClock): DataFrame = {
recordDeltaOperation(deltaLog, "delta.gc") {

// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end
val path = deltaLog.dataPath
val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
val fs = path.getFileSystem(deltaHadoopConf)
@@ -349,6 +355,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()

// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.actions.AddFile
@@ -1311,15 +1309,13 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 5)
} else {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1343,15 +1339,13 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 23)
} else {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1377,9 +1371,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 27)
} else {
@@ -1397,9 +1389,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 6)
} else {
@@ -1414,9 +1404,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 5)
} else {
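With the fallback handled inside VacuumCommand, the test suites drop their manual spark.gluten.enabled toggling around each vacuum call (and the now-unused GlutenConfig import). A before/after sketch of the pattern removed above, assuming an active spark session and the suite's table name:

import org.apache.gluten.GlutenConfig

// Before: every vacuum was wrapped in an explicit Gluten toggle.
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

// After: the command disables Gluten for its own thread and restores it when done.
spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")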
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -124,7 +122,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -134,8 +131,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
}

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
}
@@ -167,7 +162,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 372)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -182,7 +176,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -206,7 +199,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -220,7 +212,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -245,7 +236,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -259,7 +249,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -283,7 +272,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -293,7 +281,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// this case will create a checkpoint
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -309,7 +296,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -318,7 +304,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -327,7 +312,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// now merge all parts (testing merging from merged parts)
spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -336,7 +320,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 93)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -362,7 +345,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 499 else 528
})
@@ -371,7 +353,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 315 else 327
})
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -394,9 +375,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)

spark.sql("optimize lineitem_mergetree_index")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum lineitem_mergetree_index")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val df = spark
.sql(s"""
@@ -440,10 +419,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 99)
} else {
@@ -465,10 +442,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 93)
} else {
@@ -483,10 +458,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 77)
} else {
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
@@ -250,9 +248,7 @@ class GlutenClickHouseTableAfterRestart

restartSpark()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum table_restart_vacuum")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

assert(spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0) == 4)
}
