[GLUTEN-5944][CH] Fallback to run delta vacuum command #5945

Merged: 1 commit, Jun 1, 2024
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.delta.commands

import org.apache.gluten.utils.QueryPlanSelector

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
@@ -141,6 +143,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)

// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
// Gluten does not support the vacuum command; disable it for this thread
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end

val validFiles = snapshot.stateDS
.mapPartitions {
actions =>
@@ -358,6 +367,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()

// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -157,6 +158,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()

// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
// Gluten does not support the vacuum command; disable it for this thread
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end

val validFiles = snapshot.stateDS
.mapPartitions { actions =>
val reservoirBase = new Path(basePath)
@@ -349,6 +358,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()

// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
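
Both VacuumCommand variants above apply the same fix: read the current thread-local Gluten flag, set it to "false" so the Spark jobs launched by the vacuum run on vanilla Spark, and restore the saved value (or fall back to "true" if it was unset) in the finally block. Below is a minimal standalone sketch of that save/disable/restore pattern, using only SparkContext's local-property API and the QueryPlanSelector key from this PR (withGlutenDisabled is a hypothetical helper for illustration, not code introduced by this change):

import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.sql.SparkSession

// Hypothetical helper: run `body` with Gluten disabled for jobs submitted
// from the current thread, then restore the previous thread-local setting.
def withGlutenDisabled[T](spark: SparkSession)(body: => T): T = {
  val key = QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY
  val original = spark.sparkContext.getLocalProperty(key)
  // Gluten does not support the vacuum command, so fall back to vanilla Spark.
  spark.sparkContext.setLocalProperty(key, "false")
  try {
    body
  } finally {
    // Restore the saved value; default back to "true" when the property was
    // not set before, mirroring the behavior of the blocks above.
    spark.sparkContext.setLocalProperty(key, if (original != null) original else "true")
  }
}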
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.actions.AddFile
@@ -1311,15 +1309,13 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 5)
} else {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1343,15 +1339,13 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 23)
} else {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1377,9 +1371,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 27)
} else {
@@ -1397,9 +1389,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 6)
} else {
@@ -1414,9 +1404,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 5)
} else {
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -124,7 +122,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -134,8 +131,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
}

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
}
@@ -167,7 +162,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 372)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -182,7 +176,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -206,7 +199,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -220,7 +212,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -245,7 +236,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -259,7 +249,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -283,7 +272,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -293,7 +281,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// this case will create a checkpoint
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -309,7 +296,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -318,7 +304,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -327,7 +312,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// now merge all parts (testing merging from merged parts)
spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -336,7 +320,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 93)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -362,7 +345,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 499 else 528
})
@@ -371,7 +353,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 315 else 327
})
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -394,9 +375,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)

spark.sql("optimize lineitem_mergetree_index")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum lineitem_mergetree_index")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val df = spark
.sql(s"""
@@ -440,10 +419,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 99)
} else {
@@ -465,10 +442,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 93)
} else {
@@ -483,10 +458,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 77)
} else {
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
@@ -250,9 +248,7 @@ class GlutenClickHouseTableAfterRestart

restartSpark()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum table_restart_vacuum")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

assert(spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0) == 4)
}
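
The test changes above are the other half of this PR: the manual GlutenConfig.GLUTEN_ENABLED set/reset statements that used to wrap every vacuum call, along with the now-unused GlutenConfig import, are removed, since VacuumCommand now disables Gluten for its own jobs and restores the flag afterwards. A vacuum can therefore be invoked directly in a test; a short sketch reusing names that already appear in the suites above:

// No surrounding spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") / "=true"
// calls are needed anymore; the fallback is handled inside VacuumCommand itself.
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")

// The table API path behaves the same way (clickhouseTable as obtained via
// ClickhouseTable.forPath in the suite above).
clickhouseTable.vacuum(0.0)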