[Gluten-4585][VL] Support spark.sql.files.ignoreMissingFiles=true (#4725)
zhli1142015 authored Feb 21, 2024
1 parent b885906 commit 996ff4c
Showing 13 changed files with 224 additions and 9 deletions.
2 changes: 2 additions & 0 deletions cpp/core/config/GlutenConfig.h
@@ -34,6 +34,8 @@ const std::string kLegacySize = "spark.sql.legacy.sizeOfNull";

const std::string kSessionTimezone = "spark.sql.session.timeZone";

+const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";

const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";

const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes";
3 changes: 2 additions & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
@@ -525,7 +525,8 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig()
configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = "6";
configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] =
std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));

+configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
+std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
return std::make_shared<velox::core::MemConfig>(configs);
}

2 changes: 1 addition & 1 deletion docs/velox-backend-limitations.md
@@ -42,7 +42,7 @@ In certain cases, Gluten result may be different from Vanilla spark.
Velox only supports double quotes surrounded strings, not single quotes, in JSON data. If single quotes are used, gluten will produce incorrect result.

#### Parquet read conf
-Gluten supports `spark.files.ignoreCorruptFiles` and `spark.files.ignoreMissingFiles` with default false, if true, the behavior is same as config false.
+Gluten supports `spark.files.ignoreCorruptFiles` with default false, if true, the behavior is same as config false.
Gluten ignores `spark.sql.parquet.datetimeRebaseModeInRead`, it only returns what write in parquet file. It does not consider the difference between legacy
hybrid (Julian Gregorian) calendar and Proleptic Gregorian calendar. The result may be different with vanilla spark.
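
For context, a minimal usage sketch of the behavior this commit enables, written against the public Spark API. The session setup and paths are illustrative assumptions, not part of this diff.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ignore-missing-files-demo").getOrCreate()

// With the conf enabled, the scan skips files that disappear between query
// planning and task execution instead of failing the query.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
val df = spark.read.parquet("/tmp/data/first", "/tmp/data/second")
df.show() // rows from the surviving files only

// With the default (false), the same read fails once a file is missing; on
// the Velox backend the error message contains "No such file or directory"
// rather than vanilla Spark's FileNotFoundException wording.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "false")
```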

@@ -319,6 +319,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenFileBasedDataSourceSuite]
.exclude("SPARK-23072 Write and read back unicode column names - csv")
.excludeByPrefix("Enabling/disabling ignoreMissingFiles using")
+.excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using")
.exclude("Spark native readers should respect spark.sql.caseSensitive - parquet")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
.exclude("SPARK-25237 compute correct input metrics in FileScanRDD")
@@ -367,7 +367,7 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("SPARK-22790")
// plan is different cause metric is different, rewrite
.excludeByPrefix("SPARK-25237")
-// ignoreMissingFiles mode, wait to fix
+// ignoreMissingFiles mode: error msg from velox is different, rewrite
.exclude("Enabling/disabling ignoreMissingFiles using parquet")
.exclude("Enabling/disabling ignoreMissingFiles using orc")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

+import org.apache.hadoop.fs.Path

+import java.io.FileNotFoundException

import scala.collection.mutable

class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
@@ -173,4 +176,70 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
}
}

Seq("orc", "parquet").foreach {
format =>
testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath

Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("2").toDF("a").write.format(format).save(firstPath.toString)
Seq("3").toDF("a").write.format(format).save(thirdPath.toString)
val files = Seq(firstPath, thirdPath).flatMap {
p => fs.listStatus(p).filter(_.isFile).map(_.getPath)
}

val df = spark.read
.options(options)
.format(format)
.load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
new Path(basePath, "fourth").toString)

// Make sure all data files are deleted and can't be opened.
files.foreach(f => fs.delete(f, false))
assert(fs.delete(thirdPath, true))
for (f <- files) {
intercept[FileNotFoundException](fs.open(f))
}

checkAnswer(df, Seq(Row("0"), Row("1")))
}
}

// Test set ignoreMissingFiles via SQL Conf
// Rewrite this test as error msg is different from velox
for {
(ignore, options, sqlConf) <- Seq(
// Set via SQL Conf: leave options empty
("true", Map.empty[String, String], "true"),
("false", Map.empty[String, String], "false")
)
sources <- Seq("", format)
} {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> sources,
SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
if (ignore.toBoolean) {
testIgnoreMissingFiles(options)
} else {
val exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
}
assert(exception.getMessage().contains("No such file or directory"))
}
}
}
}
}

}
@@ -329,6 +329,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenFileBasedDataSourceSuite]
.exclude("SPARK-23072 Write and read back unicode column names - csv")
.excludeByPrefix("Enabling/disabling ignoreMissingFiles using")
+.excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using")
.exclude("Spark native readers should respect spark.sql.caseSensitive - parquet")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
.exclude("SPARK-25237 compute correct input metrics in FileScanRDD")
@@ -1055,7 +1055,7 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("SPARK-22790")
// plan is different cause metric is different, rewrite
.excludeByPrefix("SPARK-25237")
-// ignoreMissingFiles mode, wait to fix
+// ignoreMissingFiles mode: error msg from velox is different, rewrite
.exclude("Enabling/disabling ignoreMissingFiles using parquet")
.exclude("Enabling/disabling ignoreMissingFiles using orc")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

+import org.apache.hadoop.fs.Path

+import java.io.FileNotFoundException

import scala.collection.mutable

class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
@@ -174,4 +177,70 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
}
}

Seq("orc", "parquet").foreach {
format =>
testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath

Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("2").toDF("a").write.format(format).save(firstPath.toString)
Seq("3").toDF("a").write.format(format).save(thirdPath.toString)
val files = Seq(firstPath, thirdPath).flatMap {
p => fs.listStatus(p).filter(_.isFile).map(_.getPath)
}

val df = spark.read
.options(options)
.format(format)
.load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
new Path(basePath, "fourth").toString)

// Make sure all data files are deleted and can't be opened.
files.foreach(f => fs.delete(f, false))
assert(fs.delete(thirdPath, true))
for (f <- files) {
intercept[FileNotFoundException](fs.open(f))
}

checkAnswer(df, Seq(Row("0"), Row("1")))
}
}

// Test set ignoreMissingFiles via SQL Conf
// Rewrite this test as error msg is different from velox
for {
(ignore, options, sqlConf) <- Seq(
// Set via SQL Conf: leave options empty
("true", Map.empty[String, String], "true"),
("false", Map.empty[String, String], "false")
)
sources <- Seq("", format)
} {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> sources,
SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
if (ignore.toBoolean) {
testIgnoreMissingFiles(options)
} else {
val exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
}
assert(exception.getMessage().contains("No such file or directory"))
}
}
}
}
}

}
@@ -327,6 +327,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenFileBasedDataSourceSuite]
.exclude("SPARK-23072 Write and read back unicode column names - csv")
.excludeByPrefix("Enabling/disabling ignoreMissingFiles using")
+.excludeGlutenTestsByPrefix("Enabling/disabling ignoreMissingFiles using")
.exclude("Spark native readers should respect spark.sql.caseSensitive - parquet")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
.exclude("SPARK-25237 compute correct input metrics in FileScanRDD")
@@ -1053,7 +1053,7 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("SPARK-22790")
// plan is different cause metric is different, rewrite
.excludeByPrefix("SPARK-25237")
-// ignoreMissingFiles mode, wait to fix
+// error msg from velox is different & reader options is not supported, rewrite
.exclude("Enabling/disabling ignoreMissingFiles using parquet")
.exclude("Enabling/disabling ignoreMissingFiles using orc")
.exclude("Spark native readers should respect spark.sql.caseSensitive - orc")
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

+import org.apache.hadoop.fs.Path

+import java.io.FileNotFoundException

import scala.collection.mutable

class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
@@ -174,4 +177,71 @@ class GlutenFileBasedDataSourceSuite extends FileBasedDataSourceSuite with GlutenSQLTestsTrait {
}
}

Seq("orc", "parquet").foreach {
format =>
testQuietly(GLUTEN_TEST + s"Enabling/disabling ignoreMissingFiles using $format") {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath

Seq("0").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
Seq("1").toDF("a").write.format(format).save(new Path(basePath, "fourth").toString)

val firstPath = new Path(basePath, "first")
val thirdPath = new Path(basePath, "third")
val fs = thirdPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("2").toDF("a").write.format(format).save(firstPath.toString)
Seq("3").toDF("a").write.format(format).save(thirdPath.toString)
val files = Seq(firstPath, thirdPath).flatMap {
p => fs.listStatus(p).filter(_.isFile).map(_.getPath)
}

val df = spark.read
.options(options)
.format(format)
.load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
new Path(basePath, "fourth").toString)

// Make sure all data files are deleted and can't be opened.
files.foreach(f => fs.delete(f, false))
assert(fs.delete(thirdPath, true))
for (f <- files) {
intercept[FileNotFoundException](fs.open(f))
}

checkAnswer(df, Seq(Row("0"), Row("1")))
}
}

// Test set ignoreMissingFiles via SQL Conf
// Rewrite this test as error msg is different from velox and data Source reader options
// is not supported.
for {
(ignore, options, sqlConf) <- Seq(
// Set via SQL Conf: leave options empty
("true", Map.empty[String, String], "true"),
("false", Map.empty[String, String], "false")
)
sources <- Seq("", format)
} {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> sources,
SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
if (ignore.toBoolean) {
testIgnoreMissingFiles(options)
} else {
val exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
}
assert(exception.getMessage().contains("No such file or directory"))
}
}
}
}
}

}
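
The rewritten suites above only exercise the SQL-conf path, and the updated exclusion comment notes that per-read reader options are not supported on the native path. Below is a sketch contrasting the two ways vanilla Spark accepts this setting; the paths are illustrative, and the unsupported-option claim comes from the test comments in this change, not from additional testing.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// 1) Session-level SQL conf: forwarded to the native scan through the
//    nativeConfMap (see the GlutenConfig change below), so it takes effect.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
val viaConf = spark.read.parquet("/tmp/data")

// 2) Per-read reader option: accepted by vanilla Spark's file sources, but
//    per the comment in the rewritten suite, not honored by the native
//    reader, which is why the Gluten test only covers the SQL-conf variants.
val viaOption = spark.read
  .option("ignoreMissingFiles", "true")
  .parquet("/tmp/data")
```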
@@ -529,7 +529,8 @@ object GlutenConfig {
})

val keyWithDefault = ImmutableList.of(
-(SQLConf.CASE_SENSITIVE.key, "false")
+(SQLConf.CASE_SENSITIVE.key, "false"),
+(SQLConf.IGNORE_MISSING_FILES.key, "false")
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
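
The `keyWithDefault` list implements a copy-with-fallback pattern: each listed Spark conf is forwarded into the native conf map, substituting the stated default when the session leaves it unset. A self-contained sketch of the same pattern with plain Scala collections (names here are illustrative):

```scala
// Key-with-default forwarding, reduced to plain Scala maps.
val sessionConf = Map("spark.sql.caseSensitive" -> "true") // confs the user set
val keyWithDefault = Seq(
  ("spark.sql.caseSensitive", "false"),
  ("spark.sql.files.ignoreMissingFiles", "false")
)
// Prefer the session's value for each key, else fall back to the default;
// the resulting string-to-string map is what gets handed to the native side.
val nativeConfMap: Map[String, String] =
  keyWithDefault.map { case (key, default) => key -> sessionConf.getOrElse(key, default) }.toMap
// Map(spark.sql.caseSensitive -> true, spark.sql.files.ignoreMissingFiles -> false)
```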

