chore: Fix some regressions with Spark 3.5.1 #674

Merged
merged 2 commits on Jul 16, 2024
127 changes: 1 addition & 126 deletions dev/diffs/3.5.1.diff
@@ -1532,107 +1532,6 @@ index 68bae34790a..ea906fd1adc 100644
}
assert(shuffles2.size == 4)
val smj2 = findTopLevelSortMergeJoin(adaptive2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 15055a276fa..6e60b94dc3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat

import org.apache.spark.TestUtils
import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, IgnoreComet, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.FileSourceScanExec
@@ -116,7 +116,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
testName: String, fileSchema: StructType)
(f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
Seq("json", "parquet").foreach { testFileFormat =>
- test(s"metadata struct ($testFileFormat): " + testName) {
+ test(s"metadata struct ($testFileFormat): " + testName,
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempDir { dir =>
import scala.collection.JavaConverters._

@@ -767,7 +769,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {

Seq(true, false).foreach { useVectorizedReader =>
val label = if (useVectorizedReader) "reading batches" else "reading rows"
- test(s"SPARK-39806: metadata for a partitioned table ($label)") {
+ test(s"SPARK-39806: metadata for a partitioned table ($label)",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
withTempPath { dir =>
// Store dynamically partitioned data.
@@ -789,7 +793,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}

Seq("parquet", "orc").foreach { format =>
- test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
+ test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
// The issue was that ParquetFileFormat would not count the _metadata columns towards
// the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
// resulting in Parquet reader returning columnar output, while scan expected row.
@@ -862,7 +868,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test("SPARK-41896: Filter on constant and generated metadata attributes at the same time") {
+ test("SPARK-41896: Filter on constant and generated metadata attributes at the same time",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { dir =>
val idColumnName = "id"
val partitionColumnName = "partition"
@@ -897,7 +905,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test("SPARK-41896: Filter by a function that takes the metadata struct as argument") {
+ test("SPARK-41896: Filter by a function that takes the metadata struct as argument",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { dir =>
val idColumnName = "id"
val numFiles = 4
@@ -984,7 +994,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {


Seq("parquet", "json", "csv", "text", "orc").foreach { format =>
- test(s"metadata file path is url encoded for format: $format") {
+ test(s"metadata file path is url encoded for format: $format",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { f =>
val dirWithSpace = s"$f/with space"
spark.range(10)
@@ -1002,7 +1014,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test(s"metadata file name is url encoded for format: $format") {
+ test(s"metadata file name is url encoded for format: $format",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
val suffix = if (format == "text") ".txt" else s".$format"
withTempPath { f =>
val dirWithSpace = s"$f/with space"
@@ -1056,7 +1070,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test("SPARK-43450: Filter on full _metadata column struct") {
+ test("SPARK-43450: Filter on full _metadata column struct",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { dir =>
val numRows = 10
spark.range(end = numRows)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
index 05872d41131..a2c328b9742 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -1756,30 +1655,6 @@ index 07e2849ce6f..3e73645b638 100644
val extraOptions = Map[String, String](
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
index c10e1799702..f18ca092dba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{AnalysisException, DataFrame, IgnoreComet, QueryTest}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.internal.SQLConf
@@ -219,7 +219,9 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS
}
}

- test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column") {
+ // https://github.com/apache/datafusion-comet/issues/617
+ test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column",
+ IgnoreComet("TODO: fix Comet for this test")) {
withReadDataFrame("parquet", partitionCol = "pb") { df =>
withTempPath { dir =>
// The `df` has 10 input files with 10 rows each. Therefore the `_metadata.row_index` values
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..98d1eb07493 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2589,7 +2464,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..293e9dc2986 100644
index dd55fcfe42c..e7fcd0a9e6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
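Most of the 126 lines deleted from dev/diffs/3.5.1.diff above drop hunks that had tagged Spark's metadata-struct tests with IgnoreComet (the TODO comments reference issue #617). For readers unfamiliar with the pattern, IgnoreComet is a ScalaTest tag that the diff injects into Spark's test sources so that individual tests can be skipped when the suites run with Comet enabled. Below is a minimal sketch of how such a tag can be declared and used; the class and suite names are illustrative, not Comet's actual definitions.

```scala
import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Illustrative only: a ScalaTest tag carrying a human-readable reason, similar
// in spirit to the IgnoreComet tag referenced in the hunks above.
case class IgnoreCometExample(reason: String) extends Tag("DisableComet")

class MetadataStructExampleSuite extends AnyFunSuite {
  // A tagged test can then be excluded at run time, for example with
  // ScalaTest's tag-exclusion flag: -l DisableComet.
  test("metadata struct example",
      IgnoreCometExample("TODO: fix Comet for this test")) {
    assert(1 + 1 == 2)
  }
}
```

The second file changed by the PR, the ShimCometScanExec shim, follows.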
@@ -19,14 +19,13 @@

package org.apache.spark.sql.comet.shims

import org.apache.comet.shims.ShimFileFormat

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -49,16 +48,14 @@ trait ShimCometScanExec {
filePartitions,
readSchema,
fileConstantMetadataColumns,
Map.empty,
fsRelation.fileFormat.fileConstantMetadataExtractors,
options)

protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null)
QueryExecutionErrors.invalidBucketFile(path)

protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
// TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634)
ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0
}
// see SPARK-39634
protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
Contributor:
We still have this in the 4.0 shim. Is this not needed any more?

Contributor:
nvm. I was looking at the wrong file. This is also changed in the 4.0 shim.

@andygrove (Member, Author), Jul 16, 2024:
I updated this to match the version in the 4.0 shims because https://issues.apache.org/jira/browse/SPARK-39634 is marked as fixed in 3.5
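For readers following along without the rendered +/- markers: the line this thread is attached to is the new, post-PR body of isNeededForSchema. Below is a minimal before/after sketch, paraphrased from the hunk above; the Before/After suffixes are added here for contrast, while the real shim keeps the original method names.

```scala
import org.apache.comet.shims.ShimFileFormat // Comet helper used only by the old code
import org.apache.spark.SparkException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.StructType

trait ShimCometScanExecSketch {
  // Before: request the row-index workaround whenever the schema contains
  // Parquet's row-index metadata column (PARQUET-2161, tracked in SPARK-39634).
  protected def isNeededForSchemaBefore(sparkSchema: StructType): Boolean =
    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0

  // After: SPARK-39634 is marked as fixed in Spark 3.5, so the 3.5 shim can
  // match the 4.0 shim and skip the workaround unconditionally.
  protected def isNeededForSchemaAfter(sparkSchema: StructType): Boolean = false

  // The same hunk also replaces a hand-built SparkException with Spark's
  // error helper for invalid bucket files.
  protected def invalidBucketFileBefore(path: String, sparkVersion: String): Throwable =
    new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null)

  protected def invalidBucketFileAfter(path: String, sparkVersion: String): Throwable =
    QueryExecutionErrors.invalidBucketFile(path)
}
```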


protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
PartitionedFileUtil.getPartitionedFile(f, p.values)