From 996ff4c89b4f0e1060b0f32bf0647af4f0c70ebb Mon Sep 17 00:00:00 2001 From: Zhen Li <10524738+zhli1142015@users.noreply.github.com> Date: Wed, 21 Feb 2024 23:43:14 +0800 Subject: [PATCH] [Gluten-4585][VL] Support spark.sql.files.ignoreMissingFiles=true (#4725) --- cpp/core/config/GlutenConfig.h | 2 + cpp/velox/compute/WholeStageResultIterator.cc | 3 +- docs/velox-backend-limitations.md | 2 +- .../clickhouse/ClickHouseTestSettings.scala | 1 + .../utils/velox/VeloxTestSettings.scala | 2 +- .../sql/GlutenFileBasedDataSourceSuite.scala | 71 +++++++++++++++++- .../clickhouse/ClickHouseTestSettings.scala | 1 + .../utils/velox/VeloxTestSettings.scala | 2 +- .../sql/GlutenFileBasedDataSourceSuite.scala | 71 +++++++++++++++++- .../clickhouse/ClickHouseTestSettings.scala | 1 + .../utils/velox/VeloxTestSettings.scala | 2 +- .../sql/GlutenFileBasedDataSourceSuite.scala | 72 ++++++++++++++++++- .../scala/io/glutenproject/GlutenConfig.scala | 3 +- 13 files changed, 224 insertions(+), 9 deletions(-) diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index c96dc2844c70..72fc10fff173 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -34,6 +34,8 @@ const std::string kLegacySize = "spark.sql.legacy.sizeOfNull"; const std::string kSessionTimezone = "spark.sql.session.timeZone"; +const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles"; + const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default"; const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes"; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index a7f3f6b39efe..332ba210f7dc 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -525,7 +525,8 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig() configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = "6"; configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] = std::to_string(veloxCfg_->get(kMaxPartitions, 10000)); - + configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] = + std::to_string(veloxCfg_->get(kIgnoreMissingFiles, false)); return std::make_shared<velox::core::MemConfig>(configs); } diff --git a/docs/velox-backend-limitations.md b/docs/velox-backend-limitations.md index eb35f30037ad..fb3b0f16f677 100644 --- a/docs/velox-backend-limitations.md +++ b/docs/velox-backend-limitations.md @@ -42,7 +42,7 @@ In certain cases, Gluten result may be different from Vanilla spark. Velox only supports strings surrounded by double quotes, not single quotes, in JSON data. If single quotes are used, Gluten will produce an incorrect result. #### Parquet read conf -Gluten supports `spark.files.ignoreCorruptFiles` and `spark.files.ignoreMissingFiles` with default false, if true, the behavior is same as config false. +Gluten supports `spark.files.ignoreCorruptFiles` with its default value of false; if set to true, the behavior is the same as when the config is false. Gluten ignores `spark.sql.parquet.datetimeRebaseModeInRead`; it only returns what is written in the parquet file. It does not consider the difference between the legacy hybrid (Julian + Gregorian) calendar and the Proleptic Gregorian calendar. The result may be different from vanilla Spark.
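For context, a minimal usage sketch (not part of the patch): once the conf is forwarded to Velox's Hive connector as above, enabling the standard Spark setting is all a user needs. The app name, paths, and off-heap sizing below are illustrative assumptions; the plugin class name is inferred from the `io.glutenproject` package used throughout this patch.

import org.apache.spark.sql.SparkSession

object IgnoreMissingFilesDemo {
  def main(args: Array[String]): Unit = {
    // Assumes the Gluten jar is on the classpath and the Velox backend is built in.
    val spark = SparkSession
      .builder()
      .appName("ignore-missing-files-demo")
      .config("spark.plugins", "io.glutenproject.GlutenPlugin")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "2g")
      .config("spark.sql.files.ignoreMissingFiles", "true")
      .getOrCreate()

    // Files deleted between query planning and execution are now skipped
    // instead of failing the native scan with "No such file or directory".
    val df = spark.read.parquet("/tmp/demo/first", "/tmp/demo/second")
    df.show()
  }
}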
diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index c08803a30533..13769ef6cde7 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -319,6 +319,7 @@ class ClickHouseTestSettings extends BackendTestSettings { enableSuite[GlutenFileBasedDataSourceSuite] .exclude("SPARK-23072 Write and read back unicode column names - csv") .excludeByPrefix("Enabling/disabling ignoreMissingFiles using") + .excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using") .exclude("Spark native readers should respect spark.sql.caseSensitive - parquet") .exclude("Spark native readers should respect spark.sql.caseSensitive - orc") .exclude("SPARK-25237 compute correct input metrics in FileScanRDD") diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 2636ba32ea28..0087c6795a75 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -367,7 +367,7 @@ class VeloxTestSettings extends BackendTestSettings { .excludeByPrefix("SPARK-22790") // plan is different cause metric is different, rewrite .excludeByPrefix("SPARK-25237") - // ignoreMissingFiles mode, wait to fix + // ignoreMissingFiles mode: error msg from velox is different, rewrite .exclude("Enabling/disabling ignoreMissingFiles using parquet") .exclude("Enabling/disabling ignoreMissingFiles using orc") .exclude("Spark native readers should respect spark.sql.caseSensitive - orc") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala index aec1453e586b..10a0bb25e6a6 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala @@ -16,13 +16,16 @@ */ package org.apache.spark.sql -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.hadoop.fs.Path +import java.io.FileNotFoundException + import scala.collection.mutable class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait { @@ -173,4 +176,70 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with Glute } } + Seq("orc", "parquet").foreach { + format => + testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { + dir => + val basePath = dir.getCanonicalPath + + Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString) + Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString) + + val firstPath = new 
Path(basePath, "first") + val thirdPath = new Path(basePath, "third") + val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf()) + Seq("2").toDF("a").write.format(format).save(firstPath.toString) + Seq("3").toDF("a").write.format(format).save(thirdPath.toString) + val files = Seq(firstPath, thirdPath).flatMap { + p => fs.listStatus(p).filter(_.isFile).map(_.getPath) + } + + val df = spark.read + .options(options) + .format(format) + .load( + new Path(basePath, "first").toString, + new Path(basePath, "second").toString, + new Path(basePath, "third").toString, + new Path(basePath, "fourth").toString) + + // Make sure all data files are deleted and can't be opened. + files.foreach(f => fs.delete(f, false)) + assert(fs.delete(thirdPath, true)) + for (f <- files) { + intercept[FileNotFoundException](fs.open(f)) + } + + checkAnswer(df, Seq(Row("0"), Row("1"))) + } + } + + // Test set ignoreMissingFiles via SQL Conf + // Rewrite this test as error msg is different from velox + for { + (ignore, options, sqlConf) <- Seq( + // Set via SQL Conf: leave options empty + ("true", Map.empty[String, String], "true"), + ("false", Map.empty[String, String], "false") + ) + sources <- Seq("", format) + } { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> sources, + SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + if (ignore.toBoolean) { + testIgnoreMissingFiles(options) + } else { + val exception = intercept[SparkException] { + testIgnoreMissingFiles(options) + } + assert(exception.getMessage().contains("No such file or directory")) + } + } + } + } + } + } diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index a2988996fe70..161aa0df7896 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -329,6 +329,7 @@ class ClickHouseTestSettings extends BackendTestSettings { enableSuite[GlutenFileBasedDataSourceSuite] .exclude("SPARK-23072 Write and read back unicode column names - csv") .excludeByPrefix("Enabling/disabling ignoreMissingFiles using") + .excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using") .exclude("Spark native readers should respect spark.sql.caseSensitive - parquet") .exclude("Spark native readers should respect spark.sql.caseSensitive - orc") .exclude("SPARK-25237 compute correct input metrics in FileScanRDD") diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 2df6ccd186e4..b134ad28ce15 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -1055,7 +1055,7 @@ class VeloxTestSettings extends BackendTestSettings { .excludeByPrefix("SPARK-22790") // plan is different cause metric is different, rewrite .excludeByPrefix("SPARK-25237") - // ignoreMissingFiles mode, wait to fix + // ignoreMissingFiles mode: error msg from velox is different, rewrite .exclude("Enabling/disabling ignoreMissingFiles using parquet") .exclude("Enabling/disabling ignoreMissingFiles using orc") .exclude("Spark native readers should respect spark.sql.caseSensitive - orc") diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala index 3927b49e4f15..44b45b62954b 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala @@ -16,13 +16,16 @@ */ package org.apache.spark.sql -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.hadoop.fs.Path +import java.io.FileNotFoundException + import scala.collection.mutable class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait { @@ -174,4 +177,70 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with Glute } } + Seq("orc", "parquet").foreach { + format => + testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { + dir => + val basePath = dir.getCanonicalPath + + Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString) + Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString) + + val firstPath = new Path(basePath, "first") + val thirdPath = new Path(basePath, "third") + val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf()) + Seq("2").toDF("a").write.format(format).save(firstPath.toString) + Seq("3").toDF("a").write.format(format).save(thirdPath.toString) + val files = Seq(firstPath, thirdPath).flatMap { + p => fs.listStatus(p).filter(_.isFile).map(_.getPath) + } + + val df = spark.read + .options(options) + .format(format) + .load( + new Path(basePath, "first").toString, + new Path(basePath, "second").toString, + new Path(basePath, "third").toString, + new Path(basePath, "fourth").toString) + + // Make sure all data files are deleted and can't be opened. 
+ files.foreach(f => fs.delete(f, false)) + assert(fs.delete(thirdPath, true)) + for (f <- files) { + intercept[FileNotFoundException](fs.open(f)) + } + + checkAnswer(df, Seq(Row("0"), Row("1"))) + } + } + + // Test setting ignoreMissingFiles via the SQL conf. + // This test is rewritten because the error message reported by Velox is different. + for { + (ignore, options, sqlConf) <- Seq( + // Set via SQL Conf: leave options empty + ("true", Map.empty[String, String], "true"), + ("false", Map.empty[String, String], "false") + ) + sources <- Seq("", format) + } { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> sources, + SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + if (ignore.toBoolean) { + testIgnoreMissingFiles(options) + } else { + val exception = intercept[SparkException] { + testIgnoreMissingFiles(options) + } + assert(exception.getMessage().contains("No such file or directory")) + } + } + } + } + } + } diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index f0f57aa87dae..83de1418b859 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -327,6 +327,7 @@ class ClickHouseTestSettings extends BackendTestSettings { enableSuite[GlutenFileBasedDataSourceSuite] .exclude("SPARK-23072 Write and read back unicode column names - csv") .excludeByPrefix("Enabling/disabling ignoreMissingFiles using") + .excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using") .exclude("Spark native readers should respect spark.sql.caseSensitive - parquet") .exclude("Spark native readers should respect spark.sql.caseSensitive - orc") .exclude("SPARK-25237 compute correct input metrics in FileScanRDD") diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index abc909e45acb..d4e921a21198 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -1053,7 +1053,7 @@ class VeloxTestSettings extends BackendTestSettings { .excludeByPrefix("SPARK-22790") // plan is different cause metric is different, rewrite .excludeByPrefix("SPARK-25237") - // ignoreMissingFiles mode, wait to fix + // error msg from Velox is different & reader options are not supported, rewrite .exclude("Enabling/disabling ignoreMissingFiles using parquet") .exclude("Enabling/disabling ignoreMissingFiles using orc") .exclude("Spark native readers should respect spark.sql.caseSensitive - orc") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala index b4d693956c1f..0ea52cf9f52f 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenFileBasedDataSourceSuite.scala @@ -16,13 +16,16 @@ */ package org.apache.spark.sql -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import
org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.hadoop.fs.Path +import java.io.FileNotFoundException + import scala.collection.mutable class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait { @@ -174,4 +177,71 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with Glute } } + Seq("orc", "parquet").foreach { + format => + testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { + dir => + val basePath = dir.getCanonicalPath + + Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString) + Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString) + + val firstPath = new Path(basePath, "first") + val thirdPath = new Path(basePath, "third") + val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf()) + Seq("2").toDF("a").write.format(format).save(firstPath.toString) + Seq("3").toDF("a").write.format(format).save(thirdPath.toString) + val files = Seq(firstPath, thirdPath).flatMap { + p => fs.listStatus(p).filter(_.isFile).map(_.getPath) + } + + val df = spark.read + .options(options) + .format(format) + .load( + new Path(basePath, "first").toString, + new Path(basePath, "second").toString, + new Path(basePath, "third").toString, + new Path(basePath, "fourth").toString) + + // Make sure all data files are deleted and can't be opened. + files.foreach(f => fs.delete(f, false)) + assert(fs.delete(thirdPath, true)) + for (f <- files) { + intercept[FileNotFoundException](fs.open(f)) + } + + checkAnswer(df, Seq(Row("0"), Row("1"))) + } + } + + // Test setting ignoreMissingFiles via the SQL conf. + // This test is rewritten because the error message reported by Velox is different + // and data source reader options are not supported. + for { + (ignore, options, sqlConf) <- Seq( + // Set via SQL Conf: leave options empty + ("true", Map.empty[String, String], "true"), + ("false", Map.empty[String, String], "false") + ) + sources <- Seq("", format) + } { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> sources, + SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + if (ignore.toBoolean) { + testIgnoreMissingFiles(options) + } else { + val exception = intercept[SparkException] { + testIgnoreMissingFiles(options) + } + assert(exception.getMessage().contains("No such file or directory")) + } + } + } + } + } + } diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index 18d9bd65c669..856d82401421 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -529,7 +529,8 @@ object GlutenConfig { }) val keyWithDefault = ImmutableList.of( - (SQLConf.CASE_SENSITIVE.key, "false") + (SQLConf.CASE_SENSITIVE.key, "false"), + (SQLConf.IGNORE_MISSING_FILES.key, "false") ) keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
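To close the loop on how the setting travels, here is a self-contained sketch (hypothetical object and method names) of the `keyWithDefault` forwarding shown above: each listed Spark conf is copied into the map handed to the native backend, falling back to its default when unset. That map is where `WholeStageResultIterator::createConnectorConfig` reads `spark.sql.files.ignoreMissingFiles` before mapping it to Velox's `kIgnoreMissingFilesSession`.

import scala.collection.mutable

object NativeConfForwarding {
  // Mirrors the keyWithDefault list in GlutenConfig.scala above.
  private val keyWithDefault: Seq[(String, String)] = Seq(
    ("spark.sql.caseSensitive", "false"),
    ("spark.sql.files.ignoreMissingFiles", "false") // added by this patch
  )

  // `conf` stands in for the Spark session confs.
  def forward(conf: Map[String, String]): mutable.Map[String, String] = {
    val nativeConfMap = mutable.Map[String, String]()
    keyWithDefault.foreach {
      case (key, default) => nativeConfMap.put(key, conf.getOrElse(key, default))
    }
    nativeConfMap
  }
}

// Example: NativeConfForwarding.forward(Map("spark.sql.files.ignoreMissingFiles" -> "true"))
// yields a map carrying "true", so the Velox scan skips deleted files.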