From 08390214bb73a4e177f92143af29a6e339b2d23a Mon Sep 17 00:00:00 2001
From: Zhichao Zhang
Date: Fri, 31 May 2024 20:59:47 +0800
Subject: [PATCH] [GLUTEN-5944][CH] Fallback to run delta vacuum command

Fall back to vanilla Spark to run the Delta vacuum command:

When AQE is enabled, the Gluten ClickHouse backend + Delta currently fails
while running the Delta vacuum command, so disable Gluten for the current
thread before the command runs and restore the original setting afterwards.

Close #5944.
---
 .../sql/delta/commands/VacuumCommand.scala    | 19 +++++++++++++
 .../sql/delta/commands/VacuumCommand.scala    | 19 +++++++++++++
 ...utenClickHouseDeltaParquetWriteSuite.scala | 12 ---------
 ...utenClickHouseMergeTreeOptimizeSuite.scala | 27 -------------------
 .../GlutenClickHouseTableAfterRestart.scala   |  4 ----
 5 files changed, 38 insertions(+), 43 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 3a390f64d559..c5527933b2fc 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.delta.commands
 
+import org.apache.gluten.utils.QueryPlanSelector
+
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
@@ -141,6 +143,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val relativizeIgnoreError =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
+    // --- modified start
+    val originalEnabledGluten =
+      spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+    // Gluten does not support the vacuum command yet
+    spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+    // --- modified end
+
     val validFiles = snapshot.stateDS
       .mapPartitions { actions =>
@@ -358,6 +367,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         spark.createDataset(Seq(basePath)).toDF("path")
       } finally {
         allFilesAndDirs.unpersist()
+
+        // --- modified start
+        if (originalEnabledGluten != null) {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
+        } else {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+        }
+        // --- modified end
       }
     }
   }
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 5be548caf01c..9f82feeee2fc 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.gluten.utils.QueryPlanSelector
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -157,6 +158,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val relativizeIgnoreError =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
     val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
+
+    // --- modified start
+    val originalEnabledGluten =
+      spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+    // Gluten does not support the vacuum command yet
+    spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+    // --- modified end
+
     val validFiles = snapshot.stateDS
       .mapPartitions { actions =>
         val reservoirBase = new Path(basePath)
@@ -349,6 +358,16 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         spark.createDataset(Seq(basePath)).toDF("path")
       } finally {
         allFilesAndDirs.unpersist()
+
+        // --- modified start
+        if (originalEnabledGluten != null) {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
+        } else {
+          spark.sparkContext.setLocalProperty(
+            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+        }
+        // --- modified end
       }
     }
   }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index a097fc6cd4ab..8fab604dee3c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.GlutenConfig
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.delta.actions.AddFile
@@ -1311,7 +1309,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 23)
     spark.sql("VACUUM lineitem_delta_parquet_optimize_p2 RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -1319,7 +1316,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
     } else {
       assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p2")) == 7)
     }
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p2").collect()
     assert(ret2.apply(0).get(0) == 600572)
@@ -1343,7 +1339,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val ret = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 149)
     spark.sql("VACUUM lineitem_delta_parquet_optimize_p4 RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -1351,7 +1346,6 @@ class GlutenClickHouseDeltaParquetWriteSuite
     } else {
       assert(countFiles(new File(s"$basePath/lineitem_delta_parquet_optimize_p4")) == 25)
     }
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret2 = spark.sql("select count(*) from lineitem_delta_parquet_optimize_p4").collect()
     assert(ret2.apply(0).get(0) == 600572)
@@ -1377,9 +1371,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val clickhouseTable = DeltaTable.forPath(spark, dataPath)
     clickhouseTable.optimize().executeCompaction()
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     clickhouseTable.vacuum(0.0)
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
     if (sparkVersion.equals("3.2")) {
       assert(countFiles(new File(dataPath)) == 27)
     } else {
@@ -1397,9 +1389,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val clickhouseTable = DeltaTable.forPath(spark, dataPath)
     clickhouseTable.optimize().executeCompaction()
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     clickhouseTable.vacuum(0.0)
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
     if (sparkVersion.equals("3.2")) {
       assert(countFiles(new File(dataPath)) == 6)
     } else {
@@ -1414,9 +1404,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     val clickhouseTable = DeltaTable.forPath(spark, dataPath)
     clickhouseTable.optimize().executeCompaction()
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     clickhouseTable.vacuum(0.0)
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
     if (sparkVersion.equals("3.2")) {
       assert(countFiles(new File(dataPath)) == 5)
     } else {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index c94a3bf50c63..650bbcc7b32c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.GlutenConfig
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -124,7 +122,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22728)
     spark.sql("VACUUM lineitem_mergetree_optimize_p RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -134,8 +131,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p")) == 22730)
     }
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
-
     val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p").collect()
     assert(ret2.apply(0).get(0) == 600572)
   }
@@ -167,7 +162,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 372)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
     if (sparkVersion.equals("3.2")) {
@@ -182,7 +176,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     } else {
       assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 226)
     }
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
     val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
     assert(ret2.apply(0).get(0) == 600572)
@@ -206,7 +199,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { @@ -220,7 +212,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite } else { assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 282) } - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect() assert(ret2.apply(0).get(0) == 600572) @@ -245,7 +236,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect() assert(ret.apply(0).get(0) == 600572) - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 516) spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { @@ -259,7 +249,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite } else { assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 282) } - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect() assert(ret2.apply(0).get(0) == 600572) @@ -283,7 +272,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize_p5") - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { @@ -293,7 +281,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite // this case will create a checkpoint assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 105) } - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect() assert(ret.apply(0).get(0) == 600572) @@ -309,7 +296,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite spark.sql("optimize lineitem_mergetree_optimize_p5") - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { @@ -318,7 +304,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite // For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir. assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 104) } - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect() assert(ret.apply(0).get(0) == 600572) @@ -327,7 +312,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite // now merge all parts (testing merging from merged parts) spark.sql("optimize lineitem_mergetree_optimize_p5") - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") if (sparkVersion.equals("3.2")) { @@ -336,7 +320,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite // For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir. 
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p5")) == 93) } - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect() assert(ret.apply(0).get(0) == 600572) @@ -362,7 +345,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect() assert(ret.apply(0).get(0) == 600572) - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == { if (sparkVersion.equals("3.2")) 499 else 528 }) @@ -371,7 +353,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == { if (sparkVersion.equals("3.2")) 315 else 327 }) - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect() assert(ret2.apply(0).get(0) == 600572) @@ -394,9 +375,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite |""".stripMargin) spark.sql("optimize lineitem_mergetree_index") - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") spark.sql("vacuum lineitem_mergetree_index") - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") val df = spark .sql(s""" @@ -440,10 +419,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") if (sparkVersion.equals("3.2")) { assert(countFiles(new File(dataPath)) == 99) } else { @@ -465,10 +442,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") if (sparkVersion.equals("3.2")) { assert(countFiles(new File(dataPath)) == 93) } else { @@ -483,10 +458,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") if (sparkVersion.equals("3.2")) { assert(countFiles(new File(dataPath)) == 77) } else { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala index 9e55df0fa836..baf79436cf8b 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala @@ -16,8 +16,6 @@ */ package org.apache.gluten.execution -import org.apache.gluten.GlutenConfig - import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession} @@ -250,9 +248,7 @@ class GlutenClickHouseTableAfterRestart restartSpark() - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false") spark.sql("vacuum 
table_restart_vacuum") - spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true") assert(spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0) == 4) }
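
Note: the set/restore logic around GLUTEN_ENABLE_FOR_THREAD_KEY is duplicated in
both VacuumCommand variants (delta-20 and delta-23). A scoped helper could keep
the two copies from drifting. Below is a minimal sketch of such a helper; the
names GlutenFallback and withGlutenDisabled are hypothetical and not part of
this patch, while QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY is the same
thread-local key the patch itself uses:

    import org.apache.gluten.utils.QueryPlanSelector
    import org.apache.spark.sql.SparkSession

    // Hypothetical helper, not part of this patch: runs `body` with Gluten
    // disabled for the calling thread, then restores the previous setting.
    object GlutenFallback {
      def withGlutenDisabled[T](spark: SparkSession)(body: => T): T = {
        val sc = spark.sparkContext
        // Remember the current thread-local value so it can be restored.
        val original = sc.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
        sc.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
        try {
          body
        } finally {
          // Mirror the patch: fall back to "true" when the property was never set.
          val restored = if (original != null) original else "true"
          sc.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, restored)
        }
      }
    }

    // Usage sketch: the vacuum body in VacuumCommand.gc could then be wrapped as
    //   GlutenFallback.withGlutenDisabled(spark) { /* identify and delete files */ }

Because the restore runs in a `finally` block, the thread-local flag is reset
even if the vacuum body throws, matching the guarantee the hand-rolled version
in this patch provides.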