From 5f5100f26e86cc92e8a90256bf0dd33413b7b12f Mon Sep 17 00:00:00 2001
From: Geser Dugarov
Date: Wed, 18 Dec 2024 16:42:29 +0700
Subject: [PATCH 1/2] [HUDI-8631] Support of `hoodie.populate.meta.fields` for Flink append mode

---
 .../row/HoodieRowDataCreateHandle.java        | 27 ++++++++++++-------
 .../bucket/BucketBulkInsertWriterHelper.java  |  2 +-
 .../sink/bulk/BulkInsertWriterHelper.java     | 13 ++++++---
 3 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index b08e814d15c5..29bd3069922f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -71,6 +71,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
   private final Path path;
   private final String fileId;
   private final boolean preserveHoodieMetadata;
+  private final boolean skipMetadataWrite;
   private final HoodieStorage storage;
   protected final WriteStatus writeStatus;
   private final HoodieRecordLocation newRecordLocation;
@@ -79,7 +80,7 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfi
   public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig,
                                    String partitionPath, String fileId,
                                    String instantTime, int taskPartitionId, long taskId, long taskEpochId,
-                                   RowType rowType, boolean preserveHoodieMetadata) {
+                                   RowType rowType, boolean preserveHoodieMetadata, boolean skipMetadataWrite) {
     this.partitionPath = partitionPath;
     this.table = table;
     this.writeConfig = writeConfig;
@@ -90,6 +91,7 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfi
     this.fileId = fileId;
     this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
     this.preserveHoodieMetadata = preserveHoodieMetadata;
+    this.skipMetadataWrite = skipMetadataWrite;
     this.currTimer = HoodieTimer.start();
     this.storage = table.getStorage();
     this.path = makeNewPath(partitionPath);
@@ -128,14 +130,21 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfi
    */
   public void write(String recordKey, String partitionPath, RowData record) throws IOException {
     try {
-      String seqId = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
-          : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
-      String commitInstant = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
-          : instantTime;
-      RowData rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
-          record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      String seqId;
+      String commitInstant;
+      RowData rowData;
+      if (!skipMetadataWrite) {
+        seqId = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
+            : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+        commitInstant = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
+            : instantTime;
+        rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
+            record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      } else {
+        rowData = record;
+      }
       try {
         fileWriter.writeRow(recordKey, rowData);
         HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 1047a4f5c000..b84c44af832c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -82,7 +82,7 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, Strin
         close();
       }
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
       handles.put(fileId, rowCreateHandle);
     }
     return handles.get(fileId);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index dc0c27d64d2e..9b9b85ba6e3d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -61,6 +61,9 @@ public class BulkInsertWriterHelper {
   protected final HoodieWriteConfig writeConfig;
   protected final RowType rowType;
   protected final boolean preserveHoodieMetadata;
+  protected final boolean isAppendMode;
+  // used for Append mode only, if true then only initial row data without metacolumns is written
+  protected final boolean populateMetaFields;
   protected final Boolean isInputSorted;
   private final List<WriteStatus> writeStatusList = new ArrayList<>();
   protected HoodieRowDataCreateHandle handle;
@@ -92,7 +95,11 @@ public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, Hoodi
     this.taskPartitionId = taskPartitionId;
     this.totalSubtaskNum = totalSubtaskNum;
     this.taskEpochId = taskEpochId;
-    this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
+    this.isAppendMode = OptionsResolver.isAppendMode(conf);
+    this.populateMetaFields = writeConfig.populateMetaFields();
+    this.rowType = preserveHoodieMetadata || (isAppendMode && !populateMetaFields)
+        ? rowType
+        : addMetadataFields(rowType, writeConfig.allowOperationMetadataField());
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
@@ -140,7 +147,7 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throw
       LOG.info("Creating new file for partition path " + partitionPath);
       writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
       handles.put(partitionPath, rowCreateHandle);
 
       writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -216,7 +223,7 @@ public List<WriteStatus> getWriteStatuses(int taskID) {
   private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) {
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
     HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
     return rowCreateHandle;
   }

From f6914db4fbd50ed817aa9308ea36cb8407632207 Mon Sep 17 00:00:00 2001
From: Geser Dugarov
Date: Fri, 20 Dec 2024 14:07:30 +0700
Subject: [PATCH 2/2] [HUDI-8631] Added test `ITTestHoodieDataSource::testWriteWithoutMetaColumns`

---
 .../hudi/table/ITTestHoodieDataSource.java | 38 +++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2cc964f1a26b..3b5c689554ba 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,8 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -2320,6 +2322,32 @@ void testUpdateDelete(String indexType, HoodieTableType tableType) {
     assertRowsEquals(result4, expected4);
   }
 
+  @ParameterizedTest
+  @MethodSource("parametersForMetaColumnsSkip")
+  void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
+      throws TableNotExistException, InterruptedException {
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    streamTableEnv.executeSql("drop table t1");
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @Test
   void testReadWithParquetPredicatePushDown() {
     TableEnvironment tableEnv = batchTableEnv;
@@ -2419,6 +2447,16 @@ private static Stream<Arguments> indexAndTableTypeParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  private static Stream<Arguments> parametersForMetaColumnsSkip() {
+    Object[][] data =
+        new Object[][] {
+            {HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT}
+            // add MOR upsert check after fixing of HUDI-8785
+            // {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish
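
Taken together, the two commits mean that when a Flink job writes a COPY_ON_WRITE table with the 'insert' operation (append mode) and `hoodie.populate.meta.fields` is set to false, the incoming rows are written as-is, without the `_hoodie_*` meta columns. The sketch below is not part of the patch: the table name, schema, and path are made-up placeholders, and it assumes a standard Flink TableEnvironment with the hudi-flink bundle on the classpath. It only shows how the same option exercised by `testWriteWithoutMetaColumns` above could be set from SQL.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AppendWithoutMetaFieldsExample {

  public static void main(String[] args) throws Exception {
    // Streaming-mode TableEnvironment; COPY_ON_WRITE plus 'insert' selects the
    // Flink append write path that this patch changes.
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Hypothetical table name, schema, and path; only the last three options matter here.
    tableEnv.executeSql(
        "CREATE TABLE hudi_append_sink ("
            + "  uuid VARCHAR(20),"
            + "  name VARCHAR(10),"
            + "  ts TIMESTAMP(3),"
            + "  `partition` VARCHAR(10)"
            + ") PARTITIONED BY (`partition`) WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/hudi_append_sink',"
            + "  'table.type' = 'COPY_ON_WRITE',"
            + "  'write.operation' = 'insert',"
            + "  'hoodie.populate.meta.fields' = 'false'"
            + ")");

    // With the patch applied, the data files produced by this insert contain only
    // the four user columns and none of the _hoodie_* meta columns.
    tableEnv.executeSql(
        "INSERT INTO hudi_append_sink VALUES "
            + "('id1', 'Danny', TIMESTAMP '2024-12-18 00:00:01', 'par1')")
        .await();
  }
}

As in the test, the table can afterwards be dropped, re-declared without the option, and queried, and the select still returns the original rows.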