From e6dd56e631e29ab446fcd6a4c63846a000fdc2d9 Mon Sep 17 00:00:00 2001
From: Zhen Li <10524738+zhli1142015@users.noreply.github.com>
Date: Mon, 27 Nov 2023 17:05:07 +0800
Subject: [PATCH] [VL] Fix RoundRobinPartitioner by setting start partition id
 (#3842)

---
 .../sql/execution/utils/CHExecUtil.scala      | 23 ++++++++++++++-----
 cpp/core/jni/JniWrapper.cc                    |  2 ++
 cpp/core/shuffle/Options.h                    |  1 +
 cpp/core/shuffle/Partitioner.cc               |  5 ++--
 cpp/core/shuffle/Partitioner.h                |  3 ++-
 cpp/core/shuffle/RoundRobinPartitioner.cc     | 18 +++------------
 cpp/core/shuffle/RoundRobinPartitioner.h      |  3 ++-
 cpp/core/tests/RoundRobinPartitionerTest.cc   | 20 ++++++++--------
 cpp/velox/shuffle/VeloxShuffleWriter.cc       |  3 ++-
 ...lebornHashBasedColumnarShuffleWriter.scala |  1 +
 .../spark/shuffle/GlutenShuffleUtils.scala    | 15 ++++++++++++
 .../vectorized/ShuffleWriterJniWrapper.java   |  7 +++++-
 .../spark/shuffle/ColumnarShuffleWriter.scala |  3 ++-
 .../spark/sql/execution/utils/ExecUtil.scala  | 10 ++++----
 .../utils/velox/VeloxTestSettings.scala       |  1 -
 .../GlutenFileFormatWriterSuite.scala         | 15 +----------
 .../utils/velox/VeloxTestSettings.scala       |  1 -
 .../GlutenFileFormatWriterSuite.scala         | 15 +----------
 .../utils/velox/VeloxTestSettings.scala       |  1 -
 .../GlutenFileFormatWriterSuite.scala         | 15 +----------
 20 files changed, 74 insertions(+), 88 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 3144522c34e5..55be8846a71b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ShuffleDependency
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ColumnarShuffleDependency
+import org.apache.spark.shuffle.{ColumnarShuffleDependency, GlutenShuffleUtils}
 import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, BoundReference, UnsafeProjection, UnsafeRow}
@@ -198,11 +198,12 @@ object CHExecUtil extends Logging {
     }
 
     new NativePartitioning(
-      "hash",
+      GlutenShuffleUtils.HashPartitioningShortName,
       partitoining.numPartitions,
       Array.empty[Byte],
       hashFields.mkString(",").getBytes(),
-      outputFields.mkString(",").getBytes())
+      outputFields.mkString(",").getBytes()
+    )
   }
 
   private def buildPartitioningOptions(nativePartitioning: NativePartitioning): IteratorOptions = {
@@ -244,9 +245,19 @@ object CHExecUtil extends Logging {
     }
     val nativePartitioning: NativePartitioning = newPartitioning match {
       case SinglePartition =>
-        new NativePartitioning("single", 1, Array.empty[Byte], Array.empty[Byte], requiredFields)
+        new NativePartitioning(
+          GlutenShuffleUtils.SinglePartitioningShortName,
+          1,
+          Array.empty[Byte],
+          Array.empty[Byte],
+          requiredFields)
       case RoundRobinPartitioning(n) =>
-        new NativePartitioning("rr", n, Array.empty[Byte], Array.empty[Byte], requiredFields)
+        new NativePartitioning(
+          GlutenShuffleUtils.RoundRobinPartitioningShortName,
+          n,
+          Array.empty[Byte],
+          Array.empty[Byte],
+          requiredFields)
       case HashPartitioning(_, _) =>
         buildHashPartitioning(
           newPartitioning.asInstanceOf[HashPartitioning],
@@ -282,7 +293,7 @@ object CHExecUtil extends Logging {
           Seq[Int]()
         }
       new NativePartitioning(
-        "range",
+        GlutenShuffleUtils.RangePartitioningShortName,
         numPartitions,
         Array.empty[Byte],
         orderingAndRangeBounds.getBytes(),
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index eefe8b867a72..f6dec501706d 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -783,6 +783,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
     jdouble reallocThreshold,
     jlong firstBatchHandle,
     jlong taskAttemptId,
+    jint startPartitionId,
     jint pushBufferMaxSize,
     jobject partitionPusher,
     jstring partitionWriterTypeJstr) {
@@ -825,6 +826,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
   }
 
   shuffleWriterOptions.task_attempt_id = (int64_t)taskAttemptId;
+  shuffleWriterOptions.start_partition_id = startPartitionId;
   shuffleWriterOptions.compression_threshold = bufferCompressThreshold;
 
   auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, JNI_FALSE);
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 129b425dd3a6..8c7a395c33f2 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -58,6 +58,7 @@ struct ShuffleWriterOptions {
 
   int64_t thread_id = -1;
   int64_t task_attempt_id = -1;
+  int32_t start_partition_id = 0;
 
   arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();
diff --git a/cpp/core/shuffle/Partitioner.cc b/cpp/core/shuffle/Partitioner.cc
index 31afb6e190b0..80b4598a1f17 100644
--- a/cpp/core/shuffle/Partitioner.cc
+++ b/cpp/core/shuffle/Partitioner.cc
@@ -23,12 +23,13 @@
 
 namespace gluten {
 
-arrow::Result<std::shared_ptr<Partitioner>> Partitioner::make(Partitioning partitioning, int32_t numPartitions) {
+arrow::Result<std::shared_ptr<Partitioner>>
+Partitioner::make(Partitioning partitioning, int32_t numPartitions, int32_t startPartitionId) {
   switch (partitioning) {
     case Partitioning::kHash:
       return std::make_shared<HashPartitioner>(numPartitions);
     case Partitioning::kRoundRobin:
-      return std::make_shared<RoundRobinPartitioner>(numPartitions);
+      return std::make_shared<RoundRobinPartitioner>(numPartitions, startPartitionId);
     case Partitioning::kSingle:
       return std::make_shared<SinglePartitioner>();
     case Partitioning::kRange:
diff --git a/cpp/core/shuffle/Partitioner.h b/cpp/core/shuffle/Partitioner.h
index c60a15cf45ce..9bb33b4b1fa1 100644
--- a/cpp/core/shuffle/Partitioner.h
+++ b/cpp/core/shuffle/Partitioner.h
@@ -26,7 +26,8 @@ namespace gluten {
 
 class Partitioner {
  public:
-  static arrow::Result<std::shared_ptr<Partitioner>> make(Partitioning partitioning, int32_t numPartitions);
+  static arrow::Result<std::shared_ptr<Partitioner>>
+  make(Partitioning partitioning, int32_t numPartitions, int32_t startPartitionId);
 
   // Whether the first column is partition key.
   bool hasPid() const {
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.cc b/cpp/core/shuffle/RoundRobinPartitioner.cc
index 94de3f9247eb..7c7ff82db7ec 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.cc
+++ b/cpp/core/shuffle/RoundRobinPartitioner.cc
@@ -27,21 +27,9 @@ arrow::Status gluten::RoundRobinPartitioner::compute(
   std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
   row2Partition.resize(numRows);
 
-  int32_t pidSelection = pidSelection_;
-  for (int32_t i = 0; i < numRows;) {
-    int32_t low = i;
-    int32_t up = std::min((int64_t)(i + (numPartitions_ - pidSelection)), numRows);
-    for (; low != up;) {
-      row2Partition[low++] = pidSelection++;
-    }
-
-    pidSelection_ = pidSelection;
-    pidSelection = 0;
-    i = up;
-  }
-
-  if (pidSelection_ >= numPartitions_) {
-    pidSelection_ -= numPartitions_;
+  for (int32_t i = 0; i < numRows; ++i) {
+    row2Partition[i] = pidSelection_;
+    pidSelection_ = (pidSelection_ + 1) % numPartitions_;
   }
 
   for (auto& pid : row2Partition) {
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.h b/cpp/core/shuffle/RoundRobinPartitioner.h
index 8ea15e5afc7c..2366bfcfaeb1 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.h
+++ b/cpp/core/shuffle/RoundRobinPartitioner.h
@@ -23,7 +23,8 @@ namespace gluten {
 
 class RoundRobinPartitioner final : public Partitioner {
  public:
-  RoundRobinPartitioner(int32_t numPartitions) : Partitioner(numPartitions, false) {}
+  RoundRobinPartitioner(int32_t numPartitions, int32_t startPartitionId)
+      : Partitioner(numPartitions, false), pidSelection_(startPartitionId % numPartitions) {}
 
   arrow::Status compute(
       const int32_t* pidArr,
diff --git a/cpp/core/tests/RoundRobinPartitionerTest.cc b/cpp/core/tests/RoundRobinPartitionerTest.cc
index 5fb3e00feb19..9c12b2c7e1ca 100644
--- a/cpp/core/tests/RoundRobinPartitionerTest.cc
+++ b/cpp/core/tests/RoundRobinPartitionerTest.cc
@@ -21,8 +21,8 @@ namespace gluten {
 
 class RoundRobinPartitionerTest : public ::testing::Test {
  protected:
-  void prepareData(int numPart) {
-    partitioner_ = std::make_shared<RoundRobinPartitioner>(numPart);
+  void prepareData(int numPart, int seed) {
+    partitioner_ = std::make_shared<RoundRobinPartitioner>(numPart, seed);
     row2Partition_.clear();
     partition2RowCount_.clear();
     partition2RowCount_.resize(numPart);
@@ -62,18 +62,18 @@ class RoundRobinPartitionerTest : public ::testing::Test {
 };
 
 TEST_F(RoundRobinPartitionerTest, TestInit) {
-  int numPart = 0;
-  prepareData(numPart);
+  int numPart = 2;
+  prepareData(numPart, 3);
   ASSERT_NE(partitioner_, nullptr);
   int32_t pidSelection = getPidSelection();
-  ASSERT_EQ(pidSelection, 0);
+  ASSERT_EQ(pidSelection, 1);
 }
 
 TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
   // numRows equal numPart
   {
     int numPart = 10;
-    prepareData(numPart);
+    prepareData(numPart, 0);
     int numRows = 10;
     ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
     ASSERT_EQ(getPidSelection(), 0);
@@ -85,7 +85,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
   // numRows less than numPart
   {
     int numPart = 10;
-    prepareData(numPart);
+    prepareData(numPart, 0);
     int numRows = 8;
     ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
     ASSERT_EQ(getPidSelection(), 8);
@@ -99,7 +99,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
   // numRows greater than numPart
   {
     int numPart = 10;
-    prepareData(numPart);
+    prepareData(numPart, 0);
     int numRows = 12;
     ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
     ASSERT_EQ(getPidSelection(), 2);
@@ -113,7 +113,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
   // numRows greater than 2*numPart
   {
     int numPart = 10;
-    prepareData(numPart);
+    prepareData(numPart, 0);
     int numRows = 22;
     ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
     ASSERT_EQ(getPidSelection(), 2);
@@ -127,7 +127,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
 
 TEST_F(RoundRobinPartitionerTest, TestComoputeContinuous) {
   int numPart = 10;
-  prepareData(numPart);
+  prepareData(numPart, 0);
 
   {
     int numRows = 8;
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index 1e21d6f3f1fb..7955d24b1f40 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -425,7 +425,8 @@ arrow::Status VeloxShuffleWriter::init() {
   VELOX_CHECK_NOT_NULL(options_.memory_pool);
 
   ARROW_ASSIGN_OR_RAISE(partitionWriter_, partitionWriterCreator_->make(this));
-  ARROW_ASSIGN_OR_RAISE(partitioner_, Partitioner::make(options_.partitioning, numPartitions_));
+  ARROW_ASSIGN_OR_RAISE(
+      partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.start_partition_id));
 
   // pre-allocated buffer size for each partition, unit is row count
   // when partitioner is SinglePart, partial variables don`t need init
diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
index e6b5efbc97eb..f2c28f453514 100644
--- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
+++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
@@ -104,6 +104,7 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
           .getNativeInstanceHandle,
         handle,
         context.taskAttemptId(),
+        GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
         "celeborn",
         GlutenConfig.getConf.columnarShuffleReallocThreshold
       )
diff --git a/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
index 6a69335b70db..cf1b503a14ba 100644
--- a/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
@@ -18,13 +18,28 @@ package org.apache.spark.shuffle
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.backendsapi.BackendsApiManager
+import io.glutenproject.vectorized.NativePartitioning
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config._
+import org.apache.spark.util.random.XORShiftRandom
 
 import java.util.Locale
 
 object GlutenShuffleUtils {
+  val SinglePartitioningShortName = "single"
+  val RoundRobinPartitioningShortName = "rr"
+  val HashPartitioningShortName = "hash"
+  val RangePartitioningShortName = "range"
+
+  def getStartPartitionId(partition: NativePartitioning, partitionId: Int): Int = {
+    partition.getShortName match {
+      case RoundRobinPartitioningShortName =>
+        new XORShiftRandom(partitionId).nextInt(partition.getNumPartitions)
+      case _ => 0
+    }
+  }
+
   def checkCodecValues(codecConf: String, codec: String, validValues: Set[String]): Unit = {
     if (!validValues.contains(codec)) {
       throw new IllegalArgumentException(
diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java
index 4b440c766ce7..75c84e0c4275 100644
--- a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java
+++ b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java
@@ -64,7 +64,8 @@ public long make(
       boolean writeEOS,
       double reallocThreshold,
       long handle,
-      long taskAttemptId) {
+      long taskAttemptId,
+      int startPartitionId) {
     return nativeMake(
         part.getShortName(),
         part.getNumPartitions(),
@@ -81,6 +82,7 @@ public long make(
         reallocThreshold,
         handle,
         taskAttemptId,
+        startPartitionId,
         0,
         null,
         "local");
@@ -105,6 +107,7 @@ public long makeForRSS(
       long memoryManagerHandle,
       long handle,
       long taskAttemptId,
+      int startPartitionId,
      String partitionWriterType,
       double reallocThreshold) {
     return nativeMake(
@@ -123,6 +126,7 @@ public long makeForRSS(
         reallocThreshold,
         handle,
         taskAttemptId,
+        startPartitionId,
         pushBufferMaxSize,
         pusher,
         partitionWriterType);
@@ -144,6 +148,7 @@ public native long nativeMake(
       double reallocThreshold,
       long handle,
       long taskAttemptId,
+      int startPartitionId,
       int pushBufferMaxSize,
       Object pusher,
       String partitionWriterType);
diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 9e3cca7744ce..3d10694232b7 100644
--- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -155,7 +155,8 @@ class ColumnarShuffleWriter[K, V](
           writeEOS,
           reallocThreshold,
           handle,
-          taskContext.taskAttemptId()
+          taskContext.taskAttemptId(),
+          GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, taskContext.partitionId)
         )
       }
     val startTime = System.nanoTime()
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index eb29ba2709a0..5f3db8bf8a27 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -25,7 +25,7 @@ import io.glutenproject.vectorized.{ArrowWritableColumnVector, NativeColumnarToR
 import org.apache.spark.{Partitioner, RangePartitioner, ShuffleDependency}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ColumnarShuffleDependency
+import org.apache.spark.shuffle.{ColumnarShuffleDependency, GlutenShuffleUtils}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
@@ -146,14 +146,14 @@ object ExecUtil {
 
     val nativePartitioning: NativePartitioning = newPartitioning match {
       case SinglePartition =>
-        new NativePartitioning("single", 1)
+        new NativePartitioning(GlutenShuffleUtils.SinglePartitioningShortName, 1)
       case RoundRobinPartitioning(n) =>
-        new NativePartitioning("rr", n)
+        new NativePartitioning(GlutenShuffleUtils.RoundRobinPartitioningShortName, n)
       case HashPartitioning(exprs, n) =>
-        new NativePartitioning("hash", n)
+        new NativePartitioning(GlutenShuffleUtils.HashPartitioningShortName, n)
       // range partitioning fall back to row-based partition id computation
       case RangePartitioning(orders, n) =>
-        new NativePartitioning("range", n)
+        new NativePartitioning(GlutenShuffleUtils.RangePartitioningShortName, n)
     }
 
     val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index e34ab4cb2f08..c81929c88150 100644
--- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -928,7 +928,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataSourceStrategySuite]
   enableSuite[GlutenDataSourceSuite]
   enableSuite[GlutenFileFormatWriterSuite]
-    .excludeByPrefix("empty file should be skipped while write to file")
   enableSuite[GlutenFileIndexSuite]
   enableSuite[GlutenParquetCodecSuite]
   // Unsupported compression codec.
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
index 664caf560499..c0ba24f2be1f 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
@@ -22,17 +22,4 @@ import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 class GlutenFileFormatWriterSuite
   extends FileFormatWriterSuite
   with GlutenSQLTestsBaseTrait
-  with CodegenInterpretedPlanTest {
-
-  test("gluten empty file should be skipped while write to file") {
-    withTempPath {
-      path =>
-        spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
-        val partFiles = path
-          .listFiles()
-          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-        // result only one row, gluten result is more reasonable
-        assert(partFiles.length === 1)
-    }
-  }
-}
+  with CodegenInterpretedPlanTest {}
diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index fe3e0697ed58..7dc82924c56c 100644
--- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -782,7 +782,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataSourceStrategySuite]
   enableSuite[GlutenDataSourceSuite]
   enableSuite[GlutenFileFormatWriterSuite]
-    .excludeByPrefix("empty file should be skipped while write to file")
   enableSuite[GlutenFileIndexSuite]
   enableSuite[GlutenFileMetadataStructSuite]
   enableSuite[GlutenParquetV1AggregatePushDownSuite]
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
index 664caf560499..c0ba24f2be1f 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
@@ -22,17 +22,4 @@ import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 class GlutenFileFormatWriterSuite
   extends FileFormatWriterSuite
   with GlutenSQLTestsBaseTrait
-  with CodegenInterpretedPlanTest {
-
-  test("gluten empty file should be skipped while write to file") {
-    withTempPath {
-      path =>
-        spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
-        val partFiles = path
-          .listFiles()
-          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-        // result only one row, gluten result is more reasonable
-        assert(partFiles.length === 1)
-    }
-  }
-}
+  with CodegenInterpretedPlanTest {}
diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index f1ef6c1a0065..2df6a5538a62 100644
--- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -805,7 +805,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataSourceStrategySuite]
   enableSuite[GlutenDataSourceSuite]
   enableSuite[GlutenFileFormatWriterSuite]
-    .excludeByPrefix("empty file should be skipped while write to file")
   enableSuite[GlutenFileIndexSuite]
   enableSuite[GlutenFileMetadataStructSuite]
     .exclude("SPARK-41896: Filter on row_index and a stored column at the same time")
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
index 664caf560499..c0ba24f2be1f 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileFormatWriterSuite.scala
@@ -22,17 +22,4 @@ import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 class GlutenFileFormatWriterSuite
   extends FileFormatWriterSuite
   with GlutenSQLTestsBaseTrait
-  with CodegenInterpretedPlanTest {
-
-  test("gluten empty file should be skipped while write to file") {
-    withTempPath {
-      path =>
-        spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
-        val partFiles = path
-          .listFiles()
-          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-        // result only one row, gluten result is more reasonable
-        assert(partFiles.length === 1)
-    }
-  }
-}
+  with CodegenInterpretedPlanTest {}
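
Reviewer note (not part of the patch): the start partition id threaded through the JNI layer above appears to mirror what vanilla Spark does for RoundRobinPartitioning in ShuffleExchangeExec, where each map task seeds an XORShiftRandom with its map partition id and begins round-robin assignment at the resulting reducer, so output is not biased toward partition 0; that also seems to be why the Gluten-specific "empty file should be skipped" test override is dropped above. The sketch below is illustrative only: it assumes spark-core on the classpath, is placed in the org.apache.spark.shuffle package so the private[spark] XORShiftRandom is visible, and the object and method names are hypothetical.

package org.apache.spark.shuffle

import org.apache.spark.util.random.XORShiftRandom

// Hypothetical demo, not part of Gluten: reproduces the derivation used by
// GlutenShuffleUtils.getStartPartitionId for the "rr" (round-robin) case.
object RoundRobinStartSketch {
  def roundRobinStart(mapPartitionId: Int, numPartitions: Int): Int =
    new XORShiftRandom(mapPartitionId).nextInt(numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    (0 until 3).foreach { mapId =>
      val start = roundRobinStart(mapId, numPartitions)
      // The native RoundRobinPartitioner then assigns rows start, start + 1, ...
      // modulo numPartitions, carrying pidSelection_ across batches.
      val firstRowPids = (0 until 5).map(i => (start + i) % numPartitions)
      println(s"map task $mapId: start = $start, first row pids = ${firstRowPids.mkString(", ")}")
    }
  }
}

With this in place each map task starts its cycle at a different, deterministic reducer, matching row-based Spark's distribution for repartition(n).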