Commit
[HUDI-8631] Support of hoodie.populate.meta.fields for Flink append mode (#12516)
geserdugarov authored Dec 21, 2024
1 parent 217956c commit ae820af
Showing 4 changed files with 67 additions and 13 deletions.
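For context, here is a hypothetical usage sketch of what the change enables (the table name, schema, and path are made up for illustration): a Flink append-mode write, i.e. a COPY_ON_WRITE table written with 'write.operation' = 'insert', can now skip the Hudi meta columns in the data files by setting hoodie.populate.meta.fields to false.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AppendModeWithoutMetaFields {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Append mode: COPY_ON_WRITE table written with the plain 'insert' operation.
    // 'hoodie.populate.meta.fields' = 'false' skips the _hoodie_* meta columns in the data files.
    tableEnv.executeSql(
        "CREATE TABLE t1 (\n"
            + "  uuid VARCHAR(20),\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi/t1',\n"
            + "  'table.type' = 'COPY_ON_WRITE',\n"
            + "  'write.operation' = 'insert',\n"
            + "  'hoodie.populate.meta.fields' = 'false'\n"
            + ")");

    tableEnv.executeSql(
            "INSERT INTO t1 VALUES ('id1', 'Danny', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')")
        .await();
  }
}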
@@ -71,6 +71,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
   private final Path path;
   private final String fileId;
   private final boolean preserveHoodieMetadata;
+  private final boolean skipMetadataWrite;
   private final HoodieStorage storage;
   protected final WriteStatus writeStatus;
   private final HoodieRecordLocation newRecordLocation;
@@ -79,7 +80,7 @@ public class HoodieRowDataCreateHandle implements Serializable {

   public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
                                    String instantTime, int taskPartitionId, long taskId, long taskEpochId,
-                                   RowType rowType, boolean preserveHoodieMetadata) {
+                                   RowType rowType, boolean preserveHoodieMetadata, boolean skipMetadataWrite) {
     this.partitionPath = partitionPath;
     this.table = table;
     this.writeConfig = writeConfig;
@@ -90,6 +91,7 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfi
     this.fileId = fileId;
     this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
     this.preserveHoodieMetadata = preserveHoodieMetadata;
+    this.skipMetadataWrite = skipMetadataWrite;
     this.currTimer = HoodieTimer.start();
     this.storage = table.getStorage();
     this.path = makeNewPath(partitionPath);
@@ -128,14 +130,21 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfi
    */
   public void write(String recordKey, String partitionPath, RowData record) throws IOException {
     try {
-      String seqId = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
-          : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
-      String commitInstant = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
-          : instantTime;
-      RowData rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
-          record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      String seqId;
+      String commitInstant;
+      RowData rowData;
+      if (!skipMetadataWrite) {
+        seqId = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
+            : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+        commitInstant = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
+            : instantTime;
+        rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
+            record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      } else {
+        rowData = record;
+      }
       try {
         fileWriter.writeRow(recordKey, rowData);
         HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
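The write() change above either enriches the incoming record with meta values (via HoodieRowDataCreation.create) or, when skipMetadataWrite is set, passes the RowData through unchanged. The following is a minimal conceptual sketch, not Hudi's actual HoodieRowData wrapper, of what that enrichment means for a Flink RowData: the five meta values are prepended in front of the original record.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;

public class MetaRowSketch {
  // Builds a row of the five Hudi meta values and joins it in front of the original record.
  static RowData withMetaColumns(String commitTime, String seqId, String recordKey,
                                 String partitionPath, String fileName, RowData record) {
    GenericRowData meta = new GenericRowData(5);
    meta.setField(0, StringData.fromString(commitTime));     // _hoodie_commit_time
    meta.setField(1, StringData.fromString(seqId));          // _hoodie_commit_seqno
    meta.setField(2, StringData.fromString(recordKey));      // _hoodie_record_key
    meta.setField(3, StringData.fromString(partitionPath));  // _hoodie_partition_path
    meta.setField(4, StringData.fromString(fileName));       // _hoodie_file_name
    return new JoinedRowData(meta, record);
  }
}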
@@ -82,7 +82,7 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, Strin
       close();
     }
     HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
     handles.put(fileId, rowCreateHandle);
   }
   return handles.get(fileId);
@@ -61,6 +61,9 @@ public class BulkInsertWriterHelper {
   protected final HoodieWriteConfig writeConfig;
   protected final RowType rowType;
   protected final boolean preserveHoodieMetadata;
+  protected final boolean isAppendMode;
+  // used for append mode only; if true, then only the initial row data without meta columns is written
+  protected final boolean populateMetaFields;
   protected final Boolean isInputSorted;
   private final List<WriteStatus> writeStatusList = new ArrayList<>();
   protected HoodieRowDataCreateHandle handle;
@@ -92,7 +95,11 @@ public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, Hoodi
     this.taskPartitionId = taskPartitionId;
     this.totalSubtaskNum = totalSubtaskNum;
     this.taskEpochId = taskEpochId;
-    this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
+    this.isAppendMode = OptionsResolver.isAppendMode(conf);
+    this.populateMetaFields = writeConfig.populateMetaFields();
+    this.rowType = preserveHoodieMetadata || (isAppendMode && !populateMetaFields)
+        ? rowType
+        : addMetadataFields(rowType, writeConfig.allowOperationMetadataField());
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
@@ -140,7 +147,7 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throw
     LOG.info("Creating new file for partition path " + partitionPath);
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
     HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
     handles.put(partitionPath, rowCreateHandle);

     writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -216,7 +223,7 @@ public List<WriteStatus> getWriteStatuses(int taskID) {
   private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) {
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
     HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
     return rowCreateHandle;
   }
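In the BulkInsertWriterHelper constructor change above, the writer schema is only patched with meta columns when they are actually going to be written. As a rough illustration (this is not the real addMetadataFields helper), patching means prepending the five Hudi meta columns to the physical row type; with append mode and hoodie.populate.meta.fields=false the original rowType is kept as is.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class MetaFieldsRowTypeSketch {
  // The standard Hudi meta columns, in their usual order.
  private static final String[] META_COLUMNS = {
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name"};

  // Prepend the meta columns (as strings) to the physical row type.
  static RowType withMetaFields(RowType rowType) {
    List<String> names = new ArrayList<>();
    List<LogicalType> types = new ArrayList<>();
    for (String metaColumn : META_COLUMNS) {
      names.add(metaColumn);
      types.add(new VarCharType(VarCharType.MAX_LENGTH));
    }
    names.addAll(rowType.getFieldNames());
    types.addAll(rowType.getChildren());
    return RowType.of(types.toArray(new LogicalType[0]), names.toArray(new String[0]));
  }
}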
@@ -21,6 +21,8 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -2320,6 +2322,32 @@ void testUpdateDelete(String indexType, HoodieTableType tableType) {
     assertRowsEquals(result4, expected4);
   }

+  @ParameterizedTest
+  @MethodSource("parametersForMetaColumnsSkip")
+  void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
+      throws TableNotExistException, InterruptedException {
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    streamTableEnv.executeSql("drop table t1");
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @Test
   void testReadWithParquetPredicatePushDown() {
     TableEnvironment tableEnv = batchTableEnv;
@@ -2419,6 +2447,16 @@ private static Stream<Arguments> indexAndTableTypeParams() {
     return Stream.of(data).map(Arguments::of);
   }

+  private static Stream<Arguments> parametersForMetaColumnsSkip() {
+    Object[][] data =
+        new Object[][] {
+            {HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT}
+            // add MOR upsert check after fixing of HUDI-8785
+            // {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish
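The new test verifies the round trip through Flink SQL. As an additional, hypothetical check (the file path below is a placeholder, not a real file name), one could open the Parquet footer of a written data file and confirm that no _hoodie_* meta columns are present when hoodie.populate.meta.fields is false.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class InspectDataFileSchema {
  public static void main(String[] args) throws Exception {
    // Placeholder path to one of the data files written by the append-mode job.
    Path dataFile = new Path("file:///tmp/hudi/t1/par1/<data-file>.parquet");
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(dataFile, new Configuration()))) {
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      // With meta fields skipped, none of the printed field names should start with "_hoodie_".
      schema.getFields().forEach(field -> System.out.println(field.getName()));
    }
  }
}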
