Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-5944][CH] Fallback to run delta vacuum command #5945

Merged
merged 1 commit into from
Jun 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.delta.commands

import org.apache.gluten.utils.QueryPlanSelector

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
@@ -141,6 +143,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)

// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end

val validFiles = snapshot.stateDS
.mapPartitions {
actions =>
@@ -358,6 +367,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()

// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -157,6 +158,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()

// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end

val validFiles = snapshot.stateDS
.mapPartitions { actions =>
val reservoirBase = new Path(basePath)
@@ -349,6 +358,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()

// --- modified start
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.actions.AddFile
@@ -1311,15 +1309,13 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 5)
} else {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1343,15 +1339,13 @@ class GlutenClickHouseDeltaParquetWriteSuite
val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149)
spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 23)
} else {
assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -1377,9 +1371,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 27)
} else {
@@ -1397,9 +1389,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 6)
} else {
@@ -1414,9 +1404,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 5)
} else {
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -124,7 +122,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -134,8 +131,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
}

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
assert(ret2.apply(0).get(0) == 600572)
}
@@ -167,7 +162,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 372)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -182,7 +176,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -206,7 +199,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -220,7 +212,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -245,7 +236,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -259,7 +249,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
} else {
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -283,7 +272,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -293,7 +281,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// this case will create a checkpoint
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -309,7 +296,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -318,7 +304,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -327,7 +312,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// now merge all parts (testing merging from merged parts)
spark.sql("optimize lineitem_mergetree_optimize_p5")

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS")
if (sparkVersion.equals("3.2")) {
@@ -336,7 +320,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 93)
}
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
assert(ret.apply(0).get(0) == 600572)
@@ -362,7 +345,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 499 else 528
})
@@ -371,7 +353,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 315 else 327
})
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assert(ret2.apply(0).get(0) == 600572)
@@ -394,9 +375,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
|""".stripMargin)

spark.sql("optimize lineitem_mergetree_index")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum lineitem_mergetree_index")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

val df = spark
.sql(s"""
@@ -440,10 +419,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 99)
} else {
@@ -465,10 +442,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 93)
} else {
@@ -483,10 +458,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
clickhouseTable.vacuum(0.0)
clickhouseTable.vacuum(0.0)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
if (sparkVersion.equals("3.2")) {
assert(countFiles(new File(dataPath)) == 77)
} else {
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
@@ -250,9 +248,7 @@ class GlutenClickHouseTableAfterRestart

restartSpark()

spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
spark.sql("vacuum table_restart_vacuum")
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")

assert(spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0) == 4)
}