[MINOR] Populate the log record with file name instead of file id #12494

Open · wants to merge 3 commits into base: master
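
In short: for log records, the _hoodie_file_name meta field is now populated with the full log file name (writer.getLogFile().getFileName()) rather than the bare file id, and call sites recover the file id with FSUtils.getFileIdFromFileName. A minimal consumer-side sketch in Scala, based only on the calls visible in this diff; the wrapper object and method name are illustrative and not part of the change:

import org.apache.hudi.common.fs.FSUtils

object FileNameMetaColumnSketch {
  // Illustrative helper: _hoodie_file_name now carries a full file name (base file or log file),
  // so downstream code extracts the file id instead of treating the column value as the id itself.
  def fileIdFromMetaColumn(hoodieFileName: String): String =
    FSUtils.getFileIdFromFileName(hoodieFileName)
}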
@@ -329,7 +329,7 @@ private MetadataValues populateMetadataFields(HoodieRecord<T> hoodieRecord) {
if (config.populateMetaFields()) {
String seqId =
HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
metadataValues.setFileName(fileId);
metadataValues.setFileName(writer.getLogFile().getFileName());
metadataValues.setPartitionPath(partitionPath);
metadataValues.setRecordKey(hoodieRecord.getRecordKey());
if (!this.isLogCompaction) {
@@ -226,7 +226,7 @@ object HoodieCreateRecordUtils {
None
}
val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) {
val fileId = FSUtils.getFileId(fileName.get)
val fileId = FSUtils.getFileIdFromFileName(fileName.get)
Some(new HoodieRecordLocation(instantTime.get, fileId))
} else {
None
@@ -265,7 +265,7 @@
}

val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) {
val fileId = FSUtils.getFileId(fileName.get)
val fileId = FSUtils.getFileIdFromFileName(fileName.get)
Some(new HoodieRecordLocation(instantTime.get, fileId))
} else {
None
@@ -373,17 +373,13 @@ private Dataset<Row> removeRecord(Dataset<Row> batch, Dataset<Row> recordToRemov
* no records match the condition
*/
private List<Path> getFilesToCorrupt() {
Set<String> fileNames = new HashSet<>();
Set<String> fileIds = new HashSet<>();
sparkSession.read().format("hudi").load(basePath.toString())
.where(matchCond)
.select("_hoodie_file_name").distinct()
.collectAsList().forEach(row -> {
String fileName = row.getString(0);
if (fileName.contains(".parquet")) {
fileNames.add(FSUtils.getFileId(fileName));
} else {
fileNames.add(fileName);
}
fileIds.add(FSUtils.getFileIdFromFileName(fileName));
});

try (Stream<Path> stream = Files.list(basePath)) {
@@ -392,7 +388,7 @@ private List<Path> getFilesToCorrupt() {
.filter(file -> !Files.isDirectory(file))
.filter(file -> file.toString().contains(".parquet"))
.filter(file -> !file.toString().contains(".crc"))
.filter(file -> !fileNames.contains(FSUtils.getFileId(file.getFileName().toString())))
.filter(file -> !fileIds.contains(FSUtils.getFileId(file.getFileName().toString())))
.collect(Collectors.toList());
files.forEach(f -> {
String fileID = FSUtils.getFileId(f.getFileName().toString());
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, MetadataConversionUtils}
@@ -32,15 +33,13 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataU
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JavaConversions

import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, not}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api._

import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

import scala.collection.JavaConverters._
import scala.collection.{JavaConverters, mutable}
import scala.util.Using
@@ -304,12 +303,13 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
val recordKey: String = row.getAs("_hoodie_record_key")
val partitionPath: String = row.getAs("_hoodie_partition_path")
val fileName: String = row.getAs("_hoodie_file_name")
val fileId = FSUtils.getFileIdFromFileName(fileName)
val recordLocations = recordIndexMap.get(recordKey)
assertFalse(recordLocations.isEmpty)
// assuming no duplicate keys for now
val recordLocation = recordLocations.get(0)
assertEquals(partitionPath, recordLocation.getPartitionPath)
assertTrue(fileName.startsWith(recordLocation.getFileId), fileName + " should start with " + recordLocation.getFileId)
assertTrue(fileId.equals(recordLocation.getFileId), fileName + " should have the same file id as " + recordLocation.getFileId)
}

val deletedRows = deletedDf.collect()
@@ -21,6 +21,7 @@ package org.apache.spark.sql.hudi.command.index

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata
@@ -31,13 +32,11 @@ import org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY
import org.apache.hudi.metadata.SecondaryIndexKeyUtils
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}

import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

class TestSecondaryIndex extends HoodieSparkSqlTestBase {
@@ -469,7 +468,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
// Perform Deletes on Records and Validate Secondary Index
val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${updateKeys.mkString("','")}')")
// Get fileId for the delete record
val deleteFileId = deleteDf.select("_hoodie_file_name").collect().head.getString(0)
val deleteFileId = FSUtils.getFileIdFromFileName(deleteDf.select("_hoodie_file_name").collect().head.getString(0))
deleteDf.write.format("hudi")
.options(hudiOpts)
.option(OPERATION.key, DELETE_OPERATION_OPT_VAL)