From 97ea527b6d3e35113eeb392c094e80511a259d6f Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Mon, 16 Dec 2024 15:34:43 +0800
Subject: [PATCH 1/3] fix: populate the log record with file name instead of file id

1. populate the log record with file name instead of file id

Signed-off-by: TheR1sing3un
---
 .../src/main/java/org/apache/hudi/io/HoodieAppendHandle.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 46dc9ad55bc0b..a98a5ab001a2c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -329,7 +329,7 @@ private MetadataValues populateMetadataFields(HoodieRecord hoodieRecord) {
     if (config.populateMetaFields()) {
       String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
-      metadataValues.setFileName(fileId);
+      metadataValues.setFileName(writer.getLogFile().getFileName());
       metadataValues.setPartitionPath(partitionPath);
       metadataValues.setRecordKey(hoodieRecord.getRecordKey());
       if (!this.isLogCompaction) {
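
For context, a minimal before/after sketch of what the `_hoodie_file_name` meta field holds for a log record once this change is applied. The file id and instant below are fabricated; the log file name simply follows Hudi's usual ".<fileId>_<baseInstant>.log.<version>_<writeToken>" pattern. This is an illustration, not code from the patch.

// Illustration only: all concrete names below are made up.
public class LogRecordFileNameField {
  public static void main(String[] args) {
    String fileId = "167cfcf6-4bcc-4847-9a05-2b0d9076e8b8-0";
    String logFileName = "." + fileId + "_20241216153443000.log.1_0-52-57";

    // Before the patch, the meta field stored the bare file id:
    System.out.println("_hoodie_file_name (old) = " + fileId);
    // After the patch, it stores the actual log file name, which is
    // presumably consistent with how base files populate this field:
    System.out.println("_hoodie_file_name (new) = " + logFileName);
  }
}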
From 7ec46505a99ddb414f87352976f81ca04517f0ea Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Mon, 16 Dec 2024 18:30:59 +0800
Subject: [PATCH 2/3] fix: get file id from `_hoodie_file_name` field in log record

1. get file id from `_hoodie_file_name` field in log record

Signed-off-by: TheR1sing3un
---
 .../org/apache/hudi/HoodieCreateRecordUtils.scala     |  4 ++--
 .../functional/TestDataSkippingWithMORColstats.java   | 10 +++-------
 .../sql/hudi/command/index/TestSecondaryIndex.scala   |  5 ++---
 3 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index 048f45bba9f2f..30b42a87344ed 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -226,7 +226,7 @@ object HoodieCreateRecordUtils {
         None
       }
       val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) {
-        val fileId = FSUtils.getFileId(fileName.get)
+        val fileId = FSUtils.getFileIdFromFileName(fileName.get)
         Some(new HoodieRecordLocation(instantTime.get, fileId))
       } else {
         None
@@ -265,7 +265,7 @@ object HoodieCreateRecordUtils {
       }
 
       val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) {
-        val fileId = FSUtils.getFileId(fileName.get)
+        val fileId = FSUtils.getFileIdFromFileName(fileName.get)
         Some(new HoodieRecordLocation(instantTime.get, fileId))
       } else {
         None
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
index e5fe0a3ad00e6..dbab5cc631f9c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
@@ -373,17 +373,13 @@ private Dataset<Row> removeRecord(Dataset<Row> batch, Dataset<Row> recordToRemov
    * no records match the condition
    */
   private List<Path> getFilesToCorrupt() {
-    Set<String> fileNames = new HashSet<>();
+    Set<String> fileIds = new HashSet<>();
     sparkSession.read().format("hudi").load(basePath.toString())
         .where(matchCond)
         .select("_hoodie_file_name").distinct()
         .collectAsList().forEach(row -> {
           String fileName = row.getString(0);
-          if (fileName.contains(".parquet")) {
-            fileNames.add(FSUtils.getFileId(fileName));
-          } else {
-            fileNames.add(fileName);
-          }
+          fileIds.add(FSUtils.getFileIdFromFileName(fileName));
         });
 
     try (Stream<Path> stream = Files.list(basePath)) {
@@ -392,7 +388,7 @@ private List<Path> getFilesToCorrupt() {
           .filter(file -> !Files.isDirectory(file))
           .filter(file -> file.toString().contains(".parquet"))
           .filter(file -> !file.toString().contains(".crc"))
-          .filter(file -> !fileNames.contains(FSUtils.getFileId(file.getFileName().toString())))
+          .filter(file -> !fileIds.contains(FSUtils.getFileId(file.getFileName().toString())))
           .collect(Collectors.toList());
       files.forEach(f -> {
         String fileID = FSUtils.getFileId(f.getFileName().toString());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 43b03806261fe..446ed1f086565 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.hudi.command.index
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata
@@ -31,13 +32,11 @@ import org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY
 import org.apache.hudi.metadata.SecondaryIndexKeyUtils
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
-
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.JavaConverters._
 
 class TestSecondaryIndex extends HoodieSparkSqlTestBase {
@@ -469,7 +468,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
     // Perform Deletes on Records and Validate Secondary Index
     val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${updateKeys.mkString("','")}')")
     // Get fileId for the delete record
-    val deleteFileId = deleteDf.select("_hoodie_file_name").collect().head.getString(0)
+    val deleteFileId = FSUtils.getFileIdFromFileName(deleteDf.select("_hoodie_file_name").collect().head.getString(0))
    deleteDf.write.format("hudi")
       .options(hudiOpts)
       .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
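
Because `_hoodie_file_name` can now carry either a base file name or a log file name, call sites move from FSUtils.getFileId (which assumes a base file name) to FSUtils.getFileIdFromFileName. The sketch below approximates the behavior those call sites rely on, assuming Hudi's standard naming conventions; it is not Hudi's actual implementation, and the helper name is only mirrored from the diff.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FileIdExtractionSketch {
  // base file:  <fileId>_<writeToken>_<instantTime>.parquet
  private static final Pattern BASE_FILE = Pattern.compile("^(.+?)_.*\\.parquet$");
  // log file:   .<fileId>_<baseInstant>.log.<version>_<writeToken>
  private static final Pattern LOG_FILE = Pattern.compile("^\\.(.+?)_.*\\.log\\..*$");

  static String getFileIdFromFileName(String fileName) {
    Matcher log = LOG_FILE.matcher(fileName);
    if (log.matches()) {
      return log.group(1); // file id embedded in a log file name
    }
    Matcher base = BASE_FILE.matcher(fileName);
    if (base.matches()) {
      return base.group(1); // file id prefix of a base file name
    }
    return fileName; // fall back to the raw value, as the old test code did
  }

  public static void main(String[] args) {
    String id = "167cfcf6-4bcc-4847-9a05-2b0d9076e8b8-0";
    System.out.println(getFileIdFromFileName(id + "_1-0-1_20241216153443000.parquet"));
    System.out.println(getFileIdFromFileName("." + id + "_20241216153443000.log.1_0-52-57"));
  }
}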
From 5b04843fc9a03ebd4031c62c0a6fc4b8c38da595 Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Mon, 16 Dec 2024 19:47:50 +0800
Subject: [PATCH 3/3] test: fix wrong verify logic in TestRecordLevelIndex

1. fix wrong verify logic in TestRecordLevelIndex

Signed-off-by: TheR1sing3un
---
 .../apache/hudi/functional/RecordLevelIndexTestBase.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 15b52a68b8ef7..4955ac5a8673d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, MetadataConversionUtils}
@@ -32,7 +33,6 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataU
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JavaConversions
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, not}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -40,7 +40,6 @@ import org.junit.jupiter.api._
 
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.stream.Collectors
-
 import scala.collection.JavaConverters._
 import scala.collection.{JavaConverters, mutable}
 import scala.util.Using
@@ -304,12 +303,13 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
       val recordKey: String = row.getAs("_hoodie_record_key")
       val partitionPath: String = row.getAs("_hoodie_partition_path")
       val fileName: String = row.getAs("_hoodie_file_name")
+      val fileId = FSUtils.getFileIdFromFileName(fileName)
       val recordLocations = recordIndexMap.get(recordKey)
       assertFalse(recordLocations.isEmpty)
       // assuming no duplicate keys for now
       val recordLocation = recordLocations.get(0)
       assertEquals(partitionPath, recordLocation.getPartitionPath)
-      assertTrue(fileName.startsWith(recordLocation.getFileId), fileName + " should start with " + recordLocation.getFileId)
+      assertTrue(fileId.equals(recordLocation.getFileId), fileName + " should have the same file id as " + recordLocation.getFileId)
     }
 
     val deletedRows = deletedDf.collect()
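
The old startsWith assertion only held while `_hoodie_file_name` stored the bare file id for log records. After patch 1 the field carries the full log file name, which begins with a dot, so fileName.startsWith(fileId) no longer holds for records living in log files. A small self-contained illustration (names made up):

public class StartsWithCheckExample {
  public static void main(String[] args) {
    String fileId = "167cfcf6-4bcc-4847-9a05-2b0d9076e8b8-0";
    String baseFileName = fileId + "_1-0-1_20241216153443000.parquet";
    String logFileName = "." + fileId + "_20241216153443000.log.1_0-52-57";

    System.out.println(baseFileName.startsWith(fileId)); // true
    System.out.println(logFileName.startsWith(fileId));  // false -> old assertion would fail
  }
}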