
[HUDI-8631] Support of hoodie.populate.meta.fields for Flink append mode #12516

Merged 2 commits on Dec 21, 2024
@@ -71,6 +71,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
private final Path path;
private final String fileId;
private final boolean preserveHoodieMetadata;
private final boolean skipMetadataWrite;
private final HoodieStorage storage;
protected final WriteStatus writeStatus;
private final HoodieRecordLocation newRecordLocation;
@@ -79,7 +80,7 @@ public class HoodieRowDataCreateHandle implements Serializable {

public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
String instantTime, int taskPartitionId, long taskId, long taskEpochId,
RowType rowType, boolean preserveHoodieMetadata) {
RowType rowType, boolean preserveHoodieMetadata, boolean skipMetadataWrite) {
this.partitionPath = partitionPath;
this.table = table;
this.writeConfig = writeConfig;
@@ -90,6 +91,7 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfi
this.fileId = fileId;
this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.skipMetadataWrite = skipMetadataWrite;
Contributor:
Doesn't the flag preserveHoodieMetadata already control this behavior? There is another PR raised by @usberkeley fixing all the scenarios, BTW: #12404

@geserdugarov (Contributor, Author) commented on Dec 19, 2024:

Yes, I saw #12404 and was confused by the ticket name, which mentions the Flink table config hoodie.populate.meta.fields, but I didn't find any changes in hudi-flink-client or hudi-flink-datasource that would change the current behavior. I described my point of view in #12404 (comment).
To support my comment, I've created this MR, which shows the lack of support for hoodie.populate.meta.fields in Flink.

@geserdugarov (Contributor, Author) commented on Dec 19, 2024:

preserveHoodieMetadata is actually used in the code as an indicator of whether to take the metadata from the row data or to generate it by calling the corresponding methods, so the naming is a little confusing. I believe the possible values of preserveHoodieMetadata can be described by this schema:
(diagram: preserveHoodieMetadata values per operator)
It looks like preserveHoodieMetadata can be true only for the clustering operator.
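
A minimal sketch (simplified names, not the actual Hudi classes) of these value combinations, extended with the skipMetadataWrite flag introduced in this PR:

// A minimal sketch (simplified names, not the actual Hudi classes) of the
// flag combinations discussed above, including the new skipMetadataWrite flag.
public final class MetaFieldsDecision {

  enum MetaFieldsSource { COPIED_FROM_ROW, GENERATED, NONE }

  // preserveHoodieMetadata: the input row already carries the meta columns
  //   (e.g. a clustering rewrite), so they are copied from the row itself.
  // skipMetadataWrite: append mode with hoodie.populate.meta.fields = false,
  //   so no meta columns are written at all.
  static MetaFieldsSource resolve(boolean preserveHoodieMetadata, boolean skipMetadataWrite) {
    if (skipMetadataWrite) {
      return MetaFieldsSource.NONE;
    }
    return preserveHoodieMetadata
        ? MetaFieldsSource.COPIED_FROM_ROW
        : MetaFieldsSource.GENERATED; // regular write: commit time / seqno generated on the fly
  }

  public static void main(String[] args) {
    System.out.println(resolve(true, false));   // COPIED_FROM_ROW (clustering)
    System.out.println(resolve(false, false));  // GENERATED (regular write)
    System.out.println(resolve(false, true));   // NONE (append mode, meta fields disabled)
  }
}

With this split, preserveHoodieMetadata keeps its "copy from the row" meaning for clustering, while the new flag covers the append-mode case where no meta columns are written at all.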

Contributor:

It looks like the flag preserveHoodieMetadata indicates whether the source row already includes the metadata fields. For a table service like clustering this should be true by default (because clustering is just a rewrite); for a regular write, the metadata fields should be generated on the fly.

Let's check in which cases the option hoodie.populate.meta.fields could be false.

@geserdugarov (Contributor, Author) commented on Dec 19, 2024:

The user sets a value for the hoodie.populate.meta.fields option, which is true by default. The description of this config mentions "append only/immutable data" as the use case:

public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
.key("hoodie.populate.meta.fields")
.defaultValue(true)
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
+ "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");

For this reason, in this MR I support hoodie.populate.meta.fields in Flink only for append mode.

For a quick check I use SQL queries like the following, which trigger append mode:

CREATE TABLE hudi_debug (
    id INT,
    part INT,
    desc STRING,
    PRIMARY KEY (id) NOT ENFORCED
) 
WITH (
    'connector' = 'hudi',
    'path' = '...',
    'table.type' = 'COPY_ON_WRITE',
    'write.operation' = 'insert',
    'hoodie.populate.meta.fields' = 'false'
);
INSERT INTO hudi_debug VALUES 
    (1,100,'aaa'),
    (2,200,'bbb');

Expected results: there are no exceptions during

SELECT * FROM hudi_debug;

and the corresponding parquet files in HDFS don't contain metadata columns.
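
As a side note, a possible way to check the second expectation programmatically via the Parquet footer API; the file path is hypothetical and this snippet is not part of the PR:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class MetaColumnsFileCheck {
  public static void main(String[] args) throws Exception {
    // Hypothetical path to one of the parquet files produced by the INSERT above.
    Path file = new Path("hdfs:///tmp/hudi_debug/some-file.parquet");
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      // With hoodie.populate.meta.fields = 'false', no "_hoodie_*" columns are expected.
      boolean hasMetaColumns = schema.getFields().stream()
          .anyMatch(field -> field.getName().startsWith("_hoodie_"));
      System.out.println("schema:\n" + schema);
      System.out.println("contains meta columns: " + hasMetaColumns);
    }
  }
}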

@geserdugarov (Contributor, Author) commented on Dec 19, 2024:

I've also found that we can write a MOR table in upsert mode without metadata; the call stack in this case includes HoodieAppendHandle. But we can't read the resulting MOR table with Flink later, due to an exception thrown during:

SELECT * FROM hudi_debug;

I've created a separate bug for reading MOR tables without meta columns, HUDI-8785, and will fix it in a separate MR.

Contributor:

Can we at least add an integration test in ITTestHoodieDataSource?

@geserdugarov (Contributor, Author) commented:

@danny0405
I've added ITTestHoodieDataSource::testWriteWithoutMetaColumns. But for proper checking, it would be great to write data with Flink and then read it with Spark, because in Spark

SELECT * FROM table;

returns all columns, including those with metadata.
It would be a really useful check of engine interoperability; I've created a corresponding task, HUDI-8788.
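
A rough sketch (not part of this PR; the table path is hypothetical and the hudi-spark bundle is assumed on the classpath) of what such a Flink-write / Spark-read-back check could look like with the Spark DataFrame API:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkReadBackCheck {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-read-back-check")
        .getOrCreate();

    // Hypothetical base path of the table written by the Flink job above.
    Dataset<Row> df = spark.read().format("hudi").load("/tmp/hudi_debug");

    // With hoodie.populate.meta.fields = 'false' only the declared columns
    // (id, part, desc) are expected; with the default 'true', Spark would
    // also return the _hoodie_* meta columns in SELECT * / df.show().
    df.printSchema();
    df.show();
  }
}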

this.currTimer = HoodieTimer.start();
this.storage = table.getStorage();
this.path = makeNewPath(partitionPath);
@@ -128,14 +130,21 @@ public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfi
*/
public void write(String recordKey, String partitionPath, RowData record) throws IOException {
try {
String seqId = preserveHoodieMetadata
? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
: HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
String commitInstant = preserveHoodieMetadata
? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
: instantTime;
RowData rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
String seqId;
String commitInstant;
RowData rowData;
if (!skipMetadataWrite) {
seqId = preserveHoodieMetadata
? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
: HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
commitInstant = preserveHoodieMetadata
? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
: instantTime;
rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
} else {
rowData = record;
}
try {
fileWriter.writeRow(recordKey, rowData);
HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
@@ -82,7 +82,7 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, Strin
close();
}
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
@@ -61,6 +61,9 @@ public class BulkInsertWriterHelper {
protected final HoodieWriteConfig writeConfig;
protected final RowType rowType;
protected final boolean preserveHoodieMetadata;
protected final boolean isAppendMode;
// checked in append mode only: when false, the original row data is written as-is, without meta columns
protected final boolean populateMetaFields;
protected final Boolean isInputSorted;
private final List<WriteStatus> writeStatusList = new ArrayList<>();
protected HoodieRowDataCreateHandle handle;
@@ -92,7 +95,11 @@ public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, Hoodi
this.taskPartitionId = taskPartitionId;
this.totalSubtaskNum = totalSubtaskNum;
this.taskEpochId = taskEpochId;
this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
this.isAppendMode = OptionsResolver.isAppendMode(conf);
this.populateMetaFields = writeConfig.populateMetaFields();
this.rowType = preserveHoodieMetadata || (isAppendMode && !populateMetaFields)
? rowType
: addMetadataFields(rowType, writeConfig.allowOperationMetadataField());
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
@@ -140,7 +147,7 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throw
LOG.info("Creating new file for partition path " + partitionPath);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
handles.put(partitionPath, rowCreateHandle);

writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -216,7 +223,7 @@ public List<WriteStatus> getWriteStatuses(int taskID) {
private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) {
writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
return rowCreateHandle;
}
@@ -21,6 +21,8 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
@@ -2320,6 +2322,32 @@ void testUpdateDelete(String indexType, HoodieTableType tableType) {
assertRowsEquals(result4, expected4);
}

@ParameterizedTest
@MethodSource("parametersForMetaColumnsSkip")
void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
throws TableNotExistException, InterruptedException {
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);

String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);

streamTableEnv.executeSql("drop table t1");
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}

@Test
void testReadWithParquetPredicatePushDown() {
TableEnvironment tableEnv = batchTableEnv;
@@ -2419,6 +2447,16 @@ private static Stream<Arguments> indexAndTableTypeParams() {
return Stream.of(data).map(Arguments::of);
}

private static Stream<Arguments> parametersForMetaColumnsSkip() {
Object[][] data =
new Object[][] {
{HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT}
// add MOR upsert check after fixing of HUDI-8785
// {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
};
return Stream.of(data).map(Arguments::of);
}

private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish