[HUDI-8631] Fix the bug where the Flink table config hoodie.populate.meta.fields is not effective and optimize write performance #12404

Open · wants to merge 5 commits into base: master
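For context before the diff, here is a minimal, hypothetical sketch of where this table config is set on the Flink side. The table name, columns, and path are invented, and the connector options follow the usual Hudi Flink connector conventions; only the hoodie.populate.meta.fields key comes from this PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PopulateMetaFieldsFlinkSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Hypothetical Hudi sink table that disables meta fields via the config fixed in this PR.
    tableEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  uuid STRING PRIMARY KEY NOT ENFORCED,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_sink',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'hoodie.populate.meta.fields' = 'false'\n"
            + ")");
  }
}
```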
@@ -280,7 +280,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri
HoodieWriteConfig config = client.getConfig();
HoodieEngineContext context = client.getEngineContext();
HoodieSparkTable table = HoodieSparkTable.create(config, context);
client.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), config);
SparkRDDWriteClient.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), config);
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
return 0;
@@ -33,7 +33,6 @@
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -77,7 +76,7 @@
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -1355,7 +1354,7 @@ public final HoodieTable initTable(WriteOperationType operationType, Option<Stri
return table;
}

public void validateAgainstTableProperties(HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
public static void validateAgainstTableProperties(HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
// mismatch of table versions.
CommonClientUtils.validateTableVersion(tableConfig, writeConfig);

@@ -1365,21 +1364,22 @@ public void validateAgainstTableProperties(HoodieTableConfig tableConfig, Hoodie
throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
}

// Meta fields can be disabled only when either {@code SimpleKeyGenerator}, {@code ComplexKeyGenerator},
// {@code NonpartitionedKeyGenerator} is used
// Meta fields can be disabled only when there is exactly one record key field
if (!tableConfig.populateMetaFields()) {
String keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new HoodieConfig(properties));
if (StringUtils.isNullOrEmpty(keyGenClass)) {
keyGenClass = "org.apache.hudi.keygen.SimpleKeyGenerator";
if (KeyGenUtils.getRecordKeyFields(writeConfig.getProps()).size() >= 2) {
throw new HoodieException("When meta fields are not populated, the number of record key fields must be exactly one");
}
if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator")
&& !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
&& !keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator")) {
throw new HoodieException("Only simple, non-partitioned or complex key generator are supported when meta-fields are disabled. Used: " + keyGenClass);
}

// Check if operation metadata fields are allowed
if (writeConfig.allowOperationMetadataField()) {
if (!writeConfig.populateMetaFields()) {
throw new HoodieException("Operation metadata fields are allowed, but populateMetaFields is not enabled. "
+ "Please ensure that populateMetaFields is set to true in the configuration.");
}
}

//Check to make sure it's not a COW table with consistent hashing bucket index
// Check to make sure it's not a COW table with consistent hashing bucket index
if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) {
HoodieIndex.IndexType indexType = writeConfig.getIndexType();
if (indexType != null && indexType.equals(HoodieIndex.IndexType.BUCKET)) {
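To illustrate the change of validateAgainstTableProperties to a static method, here is a minimal sketch of calling it directly. It reuses only builder calls that appear in the new TestBaseHoodieWriteClient further down; the table config and base path are assumed to come from an existing table, and the version and record key values are placeholders.

```java
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ValidateAgainstTablePropertiesSketch {

  // 'tableConfig' is assumed to come from table.getMetaClient().getTableConfig(),
  // as in the CLI call at the top of this diff.
  static void validate(HoodieTableConfig tableConfig, String basePath) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        // Placeholder version: must match the table's own version.
        .withWriteTableVersion(6)
        .withPopulateMetaFields(false)
        // With meta fields disabled, exactly one record key field is allowed.
        .withIndexConfig(HoodieIndexConfig.newBuilder().withRecordKeyField("uuid").build())
        .build();
    // The method is now static, so no write client instance is needed.
    BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, writeConfig);
  }
}
```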
@@ -288,9 +288,16 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
}

// Prepend meta-fields into the record
HoodieRecord populatedRecord;
MetadataValues metadataValues = populateMetadataFields(finalRecord);
HoodieRecord populatedRecord =
finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
if (!metadataValues.isEmpty()) {
populatedRecord = finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
// Deflate the record payload after recording success. This helps users access the payload as part of marking the record successful
hoodieRecord.deflate();
} else {
// When there are no meta fields to prepend, reuse the record as-is and avoid the decoding overhead
populatedRecord = hoodieRecord;
}

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
@@ -312,10 +319,6 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
}

writeStatus.markSuccess(hoodieRecord, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
hoodieRecord.deflate();
return finalRecordOpt;
} catch (Exception e) {
LOG.error("Error writing record " + hoodieRecord, e);
@@ -336,9 +339,11 @@ private MetadataValues populateMetadataFields(HoodieRecord<T> hoodieRecord) {
metadataValues.setCommitTime(instantTime);
metadataValues.setCommitSeqno(seqId);
}
}
if (config.allowOperationMetadataField()) {
metadataValues.setOperation(hoodieRecord.getOperation().getName());

// ALLOW_OPERATION_METADATA_FIELD can only be enabled when POPULATE_META_FIELDS is enabled
if (config.allowOperationMetadataField()) {
metadataValues.setOperation(hoodieRecord.getOperation().getName());
}
}

return metadataValues;
@@ -476,7 +481,7 @@ public void doAppend() {
protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header, boolean appendDeleteBlocks) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, config.populateMetaFields() ? writeSchemaWithMetaFields.toString() : writeSchema.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (!recordList.isEmpty()) {
String keyField = config.populateMetaFields()
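Before the next file, a hedged, test-style sketch of what the appendDataAndDeleteBlocks change above implies: when hoodie.populate.meta.fields is disabled, the schema written into the log block header is the plain write schema, so the _hoodie_* columns should be absent. The helper name and the idea of parsing the header value back are illustrative, not part of this PR; the getField-based assertions mirror the test changes further down.

```java
import static org.junit.jupiter.api.Assertions.assertNull;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;

public class LogBlockHeaderSchemaSketch {

  // Hypothetical helper: 'headerSchemaJson' stands for the string stored under
  // HeaderMetadataType.SCHEMA by appendDataAndDeleteBlocks.
  static void assertNoMetaColumns(String headerSchemaJson) {
    Schema writtenSchema = new Schema.Parser().parse(headerSchemaJson);
    // With populateMetaFields disabled the header carries writeSchema,
    // so the meta columns are expected to be missing.
    assertNull(writtenSchema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD));
    assertNull(writtenSchema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
  }
}
```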
@@ -119,7 +119,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, true);
this.recordMap = recordMap;
this.useWriterSchema = true;
this.useWriterSchema = config.populateMetaFields();
}

@Override
@@ -102,6 +102,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class);

protected Map<String, HoodieRecord<T>> keyToNewRecords;
// Incoming (new) records do not carry meta fields by default, except during compaction
protected boolean isNewRecordWithMetadata = false;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter fileWriter;
protected boolean preserveMetadata = false;
@@ -142,6 +144,7 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.isNewRecordWithMetadata = config.populateMetaFields();
this.preserveMetadata = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
@@ -345,12 +348,8 @@ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> comb
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
public void write(HoodieRecord<T> oldRecord) {
// Use schema with metadata files no matter whether 'hoodie.populate.meta.fields' is enabled
// to avoid unnecessary rewrite. Even with metadata table(whereas the option 'hoodie.populate.meta.fields' is configured as false),
// the record is deserialized with schema including metadata fields,
// see HoodieMergeHelper#runMerge for more details.
Schema oldSchema = writeSchemaWithMetaFields;
Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
Schema newSchema = isNewRecordWithMetadata ? writeSchemaWithMetaFields : writeSchema;
boolean copyOldRecord = true;
String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
TypedProperties props = config.getPayloadConfig().getProps();
@@ -170,8 +170,10 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
// its safe to modify config here, since we are running in task side.
((HoodieTable) compactionHandler).getConfig().setDefault(config);
} else {
readerSchema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
Schema schema = new Schema.Parser().parse(config.getSchema());
readerSchema = config.populateMetaFields()
? HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField())
: schema;
}
LOG.info("Compaction operation started for base file: " + operation.getDataFileName() + " and delta files: " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Properties;

public class TestBaseHoodieWriteClient {
private HoodieStorage storage;
private StoragePath metaPath;

@TempDir
private Path tempDir;

@BeforeEach
public void setUp() throws IOException {
StorageConfiguration<Configuration> storageConf = HoodieTestUtils.getDefaultStorageConf();
storageConf.set("fs.defaultFS", "file:///");
storageConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
this.storage = HoodieStorageUtils.getStorage(tempDir.toString(), storageConf);
this.metaPath = new StoragePath(tempDir + "/.hoodie");
}

@Test
public void testValidateAgainstTableProperties() throws IOException {
// Init table config
Properties properties = new Properties();
properties.put(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.toString());
properties.put(HoodieTableConfig.VERSION.key(), 6);
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), false);
HoodieTestUtils.init(this.storage.getConf().newInstance(), this.tempDir.toString(), HoodieTableType.MERGE_ON_READ, properties);

HoodieTableConfig tableConfig = new HoodieTableConfig(this.storage, this.metaPath, RecordMergeMode.EVENT_TIME_ORDERING,
HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME, HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID);

// Test version conflicts
HoodieWriteConfig versionConflictConfig = HoodieWriteConfig.newBuilder()
.withPath(tempDir.toString())
.withWriteTableVersion(8)
.build();
Assertions.assertThrowsExactly(HoodieNotSupportedException.class,
() -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, versionConflictConfig),
"Table version (6) and Writer version (8) do not match.");

// Test hoodie.populate.meta.fields conflicts
HoodieWriteConfig metaFieldsConflictConfig = HoodieWriteConfig.newBuilder()
.withPath(tempDir.toString())
.withWriteTableVersion(6)
.withPopulateMetaFields(true)
.build();
Assertions.assertThrowsExactly(HoodieException.class,
() -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, metaFieldsConflictConfig),
"hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back");

// Test record key fields conflicts
HoodieWriteConfig recordKeyConflictConfig = HoodieWriteConfig.newBuilder()
.withPath(tempDir.toString())
.withWriteTableVersion(6)
.withPopulateMetaFields(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withRecordKeyField("a,b").build())
.build();
Assertions.assertThrowsExactly(HoodieException.class,
() -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, recordKeyConflictConfig),
"When meta fields are not populated, the number of record key fields must be exactly one");

// Test hoodie.allow.operation.metadata.field conflicts
HoodieWriteConfig operationMetaFieldConflictConfig = HoodieWriteConfig.newBuilder()
.withPath(tempDir.toString())
.withWriteTableVersion(6)
.withPopulateMetaFields(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withRecordKeyField("a").build())
.withAllowOperationMetadataField(true)
.build();
Assertions.assertThrowsExactly(HoodieException.class,
() -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, operationMetaFieldConflictConfig),
"Operation metadata fields are allowed, but populateMetaFields is not enabled. "
+ "Please ensure that populateMetaFields is set to true in the configuration.");

// Test hoodie.index.bucket.engine conflicts
HoodieWriteConfig bucketIndexEngineTypeConflictConfig = HoodieWriteConfig.newBuilder()
.withPath(tempDir.toString())
.withWriteTableVersion(6)
.withPopulateMetaFields(false)
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withRecordKeyField("a")
.withIndexType(HoodieIndex.IndexType.BUCKET)
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
.build())
.withAllowOperationMetadataField(true)
.build();
Assertions.assertThrowsExactly(HoodieException.class,
() -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, bucketIndexEngineTypeConflictConfig),
"Consistent hashing bucket index does not work with COW table. Use simple bucket index or an MOR table.");
}
}
@@ -916,8 +916,8 @@ private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> log
assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
} else {
// Metadata table records should not have meta fields!
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
assertNull(record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.getSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
}

final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
@@ -1394,8 +1394,8 @@ private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> log
assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
} else {
// Metadata table records should not have meta fields!
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
assertNull(record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.getSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
}

final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
@@ -470,8 +470,8 @@ private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> log
HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord.getData();
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
assertNull(record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.getSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
assertFalse(key.isEmpty());
});
@@ -412,10 +412,9 @@ public void testDeduplicationOnInsert(boolean populateMetaFields) throws Excepti
/**
* Test De-duplication behavior for HoodieWriteClient insert API.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testDeduplicationKeepOperationFieldOnInsert(boolean populateMetaFields) throws Exception {
testDeduplication((client, records, commitTime) -> (List<WriteStatus>) rdd2List.apply((JavaRDD<WriteStatus>)client.insert(list2Rdd.apply(records), commitTime)), populateMetaFields, true);
@Test
public void testDeduplicationKeepOperationFieldOnInsert() throws Exception {
testDeduplication((client, records, commitTime) -> (List<WriteStatus>) rdd2List.apply((JavaRDD<WriteStatus>)client.insert(list2Rdd.apply(records), commitTime)), true, true);
}

/**