diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index d752f9f27b90..6f2df3caffa1 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -280,7 +280,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri HoodieWriteConfig config = client.getConfig(); HoodieEngineContext context = client.getEngineContext(); HoodieSparkTable table = HoodieSparkTable.create(config, context); - client.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), config); + SparkRDDWriteClient.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), config); WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); return 0; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index c00c389e12ec..689c1b8af951 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -33,7 +33,6 @@ import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.config.HoodieCommonConfig; -import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -77,7 +76,7 @@ import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; import org.apache.hudi.internal.schema.utils.SerDeHelper; -import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; @@ -1355,7 +1354,7 @@ public final HoodieTable initTable(WriteOperationType operationType, Option= 2) { + throw new HoodieException("When meta fields are not populated, the number of record key fields must be exactly one"); } - if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator") - && !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator") - && !keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator")) { - throw new HoodieException("Only simple, non-partitioned or complex key generator are supported when meta-fields are disabled. Used: " + keyGenClass); + } + + // Check if operation metadata fields are allowed + if (writeConfig.allowOperationMetadataField()) { + if (!writeConfig.populateMetaFields()) { + throw new HoodieException("Operation metadata fields are allowed, but populateMetaFields is not enabled. " + + "Please ensure that populateMetaFields is set to true in the configuration."); } } - //Check to make sure it's not a COW table with consistent hashing bucket index + // Check to make sure it's not a COW table with consistent hashing bucket index if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) { HoodieIndex.IndexType indexType = writeConfig.getIndexType(); if (indexType != null && indexType.equals(HoodieIndex.IndexType.BUCKET)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 46dc9ad55bc0..6cf90c9dd34c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -288,9 +288,16 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { } // Prepend meta-fields into the record + HoodieRecord populatedRecord; MetadataValues metadataValues = populateMetadataFields(finalRecord); - HoodieRecord populatedRecord = - finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties); + if (!metadataValues.isEmpty()) { + populatedRecord = finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties); + // Deflate record payload after recording success. This will help users access payload as a part of marking record successful + hoodieRecord.deflate(); + } else { + // Avoid decoding when there are no meta fields to prevent performance overhead + populatedRecord = hoodieRecord; + } // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of @@ -312,10 +319,6 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { } writeStatus.markSuccess(hoodieRecord, recordMetadata); - // deflate record payload after recording success. This will help users access payload as a - // part of marking - // record successful. - hoodieRecord.deflate(); return finalRecordOpt; } catch (Exception e) { LOG.error("Error writing record " + hoodieRecord, e); @@ -336,9 +339,11 @@ private MetadataValues populateMetadataFields(HoodieRecord hoodieRecord) { metadataValues.setCommitTime(instantTime); metadataValues.setCommitSeqno(seqId); } - } - if (config.allowOperationMetadataField()) { - metadataValues.setOperation(hoodieRecord.getOperation().getName()); + + // ALLOW_OPERATION_METADATA_FIELD can only be enabled when POPULATE_META_FIELDS is enabled + if (config.allowOperationMetadataField()) { + metadataValues.setOperation(hoodieRecord.getOperation().getName()); + } } return metadataValues; @@ -476,7 +481,7 @@ public void doAppend() { protected void appendDataAndDeleteBlocks(Map header, boolean appendDeleteBlocks) { try { header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString()); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, config.populateMetaFields() ? writeSchemaWithMetaFields.toString() : writeSchema.toString()); List blocks = new ArrayList<>(2); if (!recordList.isEmpty()) { String keyField = config.populateMetaFields() diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 12406927ae61..8a9f34d094ca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -119,7 +119,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa TaskContextSupplier taskContextSupplier) { this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, true); this.recordMap = recordMap; - this.useWriterSchema = true; + this.useWriterSchema = config.populateMetaFields(); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 707c86dd73ac..bfc99b88ca98 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -102,6 +102,8 @@ public class HoodieMergeHandle extends HoodieWriteHandle private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class); protected Map> keyToNewRecords; + // Input records (new records) do not include metadata by default, except during compaction + protected boolean isNewRecordWithMetadata = false; protected Set writtenRecordKeys; protected HoodieFileWriter fileWriter; protected boolean preserveMetadata = false; @@ -142,6 +144,7 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); this.keyToNewRecords = keyToNewRecords; + this.isNewRecordWithMetadata = config.populateMetaFields(); this.preserveMetadata = true; init(fileId, this.partitionPath, dataFileToBeMerged); validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields()); @@ -345,12 +348,8 @@ private boolean writeRecord(HoodieRecord newRecord, Option comb * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file. */ public void write(HoodieRecord oldRecord) { - // Use schema with metadata files no matter whether 'hoodie.populate.meta.fields' is enabled - // to avoid unnecessary rewrite. Even with metadata table(whereas the option 'hoodie.populate.meta.fields' is configured as false), - // the record is deserialized with schema including metadata fields, - // see HoodieMergeHelper#runMerge for more details. Schema oldSchema = writeSchemaWithMetaFields; - Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema; + Schema newSchema = isNewRecordWithMetadata ? writeSchemaWithMetaFields : writeSchema; boolean copyOldRecord = true; String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt); TypedProperties props = config.getPayloadConfig().getProps(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index e92c2a5d7c2d..9022bf21acfc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -170,8 +170,10 @@ public List compact(HoodieCompactionHandler compactionHandler, // its safe to modify config here, since we are running in task side. ((HoodieTable) compactionHandler).getConfig().setDefault(config); } else { - readerSchema = HoodieAvroUtils.addMetadataFields( - new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); + Schema schema = new Schema.Parser().parse(config.getSchema()); + readerSchema = config.populateMetaFields() + ? HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField()) + : schema; } LOG.info("Compaction operation started for base file: " + operation.getDataFileName() + " and delta files: " + operation.getDeltaFileNames() + " for commit " + instantTime); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java new file mode 100644 index 000000000000..d8bbc8d9cde3 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.config.RecordMergeMode; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Properties; + +public class TestBaseHoodieWriteClient { + private HoodieStorage storage; + private StoragePath metaPath; + + @TempDir + private Path tempDir; + + @BeforeEach + public void setUp() throws IOException { + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + storageConf.set("fs.defaultFS", "file:///"); + storageConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + this.storage = HoodieStorageUtils.getStorage(tempDir.toString(), storageConf); + this.metaPath = new StoragePath(tempDir + "/.hoodie"); + } + + @Test + public void testValidateAgainstTableProperties() throws IOException { + // Init table config + Properties properties = new Properties(); + properties.put(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.toString()); + properties.put(HoodieTableConfig.VERSION.key(), 6); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), false); + HoodieTestUtils.init(this.storage.getConf().newInstance(), this.tempDir.toString(), HoodieTableType.MERGE_ON_READ, properties); + + HoodieTableConfig tableConfig = new HoodieTableConfig(this.storage, this.metaPath, RecordMergeMode.EVENT_TIME_ORDERING, + HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME, HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID); + + // Test version conflicts + HoodieWriteConfig versionConflictConfig = HoodieWriteConfig.newBuilder() + .withPath(tempDir.toString()) + .withWriteTableVersion(8) + .build(); + Assertions.assertThrowsExactly(HoodieNotSupportedException.class, + () -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, versionConflictConfig), + "Table version (6) and Writer version (8) do not match."); + + // Test hoodie.populate.meta.fields conflicts + HoodieWriteConfig metaFieldsConflictConfig = HoodieWriteConfig.newBuilder() + .withPath(tempDir.toString()) + .withWriteTableVersion(6) + .withPopulateMetaFields(true) + .build(); + Assertions.assertThrowsExactly(HoodieException.class, + () -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, metaFieldsConflictConfig), + "hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"); + + // Test record key fields conflicts + HoodieWriteConfig recordKeyConflictConfig = HoodieWriteConfig.newBuilder() + .withPath(tempDir.toString()) + .withWriteTableVersion(6) + .withPopulateMetaFields(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withRecordKeyField("a,b").build()) + .build(); + Assertions.assertThrowsExactly(HoodieException.class, + () -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, recordKeyConflictConfig), + "When meta fields are not populated, the number of record key fields must be exactly one"); + + // Test hoodie.allow.operation.metadata.field conflicts + HoodieWriteConfig operationMetaFieldConflictConfig = HoodieWriteConfig.newBuilder() + .withPath(tempDir.toString()) + .withWriteTableVersion(6) + .withPopulateMetaFields(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withRecordKeyField("a").build()) + .withAllowOperationMetadataField(true) + .build(); + Assertions.assertThrowsExactly(HoodieException.class, + () -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, operationMetaFieldConflictConfig), + "Operation metadata fields are allowed, but populateMetaFields is not enabled. " + + "Please ensure that populateMetaFields is set to true in the configuration."); + + // Test hoodie.index.bucket.engine conflicts + HoodieWriteConfig bucketIndexEngineTypeConflictConfig = HoodieWriteConfig.newBuilder() + .withPath(tempDir.toString()) + .withWriteTableVersion(6) + .withPopulateMetaFields(false) + .withIndexConfig(HoodieIndexConfig.newBuilder() + .withRecordKeyField("a") + .withIndexType(HoodieIndex.IndexType.BUCKET) + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING) + .build()) + .withAllowOperationMetadataField(true) + .build(); + Assertions.assertThrowsExactly(HoodieException.class, + () -> BaseHoodieWriteClient.validateAgainstTableProperties(tableConfig, bucketIndexEngineTypeConflictConfig), + "Consistent hashing bucket index does not work with COW table. Use simple bucket index or an MOR table."); + } +} diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 59b94670eec5..5b331a756006 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -916,8 +916,8 @@ private void verifyMetadataRawRecords(HoodieTable table, List log assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); } else { // Metadata table records should not have meta fields! - assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); - assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + assertNull(record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.getSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); } final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 344df1981b71..3a93df5fb1e8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1394,8 +1394,8 @@ private void verifyMetadataRawRecords(HoodieTable table, List log assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); } else { // Metadata table records should not have meta fields! - assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); - assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + assertNull(record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.getSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); } final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 798dc0bdc8c3..19e86d565900 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -470,8 +470,8 @@ private void verifyMetadataRawRecords(HoodieTable table, List log HoodieRecordType.AVRO)) { recordItr.forEachRemaining(indexRecord -> { final GenericRecord record = (GenericRecord) indexRecord.getData(); - assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); - assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + assertNull(record.getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.getSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); assertFalse(key.isEmpty()); }); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 965c9505c48a..7bee12ab2cf6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -412,10 +412,9 @@ public void testDeduplicationOnInsert(boolean populateMetaFields) throws Excepti /** * Test De-duplication behavior for HoodieWriteClient insert API. */ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testDeduplicationKeepOperationFieldOnInsert(boolean populateMetaFields) throws Exception { - testDeduplication((client, records, commitTime) -> (List) rdd2List.apply((JavaRDD)client.insert(list2Rdd.apply(records), commitTime)), populateMetaFields, true); + @Test + public void testDeduplicationKeepOperationFieldOnInsert() throws Exception { + testDeduplication((client, records, commitTime) -> (List) rdd2List.apply((JavaRDD)client.insert(list2Rdd.apply(records), commitTime)), true, true); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index b225bacce0c8..0bc54e3322bc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -262,7 +262,7 @@ public class HoodieTableConfig extends HoodieConfig { .key("hoodie.populate.meta.fields") .defaultValue(true) .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated " - + "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing"); + + "and incremental queries will not be functional. In the disabled state, the number of record key fields must be equal to one"); public static final ConfigProperty KEY_GENERATOR_CLASS_NAME = ConfigProperty .key("hoodie.table.keygenerator.class") diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 522b00411278..2ad4e75ef94b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -651,7 +651,9 @@ public Pair getLogRecordScanner(List isFullScanAllowedForPartition(partitionName)); // Load the schema - Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + Schema schema = metadataTableConfig.populateMetaFields() + ? HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()) + : HoodieMetadataRecord.getClassSchema(); HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build(); HoodieMetadataLogRecordReader logRecordScanner = HoodieMetadataLogRecordReader.newBuilder() .withStorage(metadataMetaClient.getStorage()) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 7f7d5f3855f4..1c6a7958e8b4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -264,6 +264,7 @@ public static HoodieTableMetaClient initTableIfNotExists( .setPreCombineField(OptionsResolver.getPreCombineField(conf)) .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue()) .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)) + .setPopulateMetaFields(conf.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())) .setKeyGeneratorClassProp( conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName())) .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index a5d3b3ece4d9..16b3ff8ec6f3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; @@ -67,7 +68,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.File; import java.util.ArrayList; @@ -115,14 +116,15 @@ public class ITTestDataStreamWrite extends TestLogger { File tempFile; @ParameterizedTest - @ValueSource(strings = {"BUCKET", "FLINK_STATE"}) - public void testWriteCopyOnWrite(String indexType) throws Exception { + @CsvSource({"BUCKET, true", "BUCKET, false", "FLINK_STATE, true", "FLINK_STATE, false"}) + public void testWriteCopyOnWrite(String indexType, boolean populateMetaFields) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString()); conf.setString(FlinkOptions.INDEX_TYPE, indexType); conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1); conf.setBoolean(FlinkOptions.PRE_COMBINE, true); + conf.setBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), populateMetaFields); - testWriteToHoodie(conf, "cow_write", 2, EXPECTED); + testWriteToHoodie(conf, "cow_write", 2, populateMetaFields ? EXPECTED : removeMetaFields(EXPECTED)); } @Test @@ -160,15 +162,17 @@ public void testWriteCopyOnWriteWithChainedTransformer() throws Exception { } @ParameterizedTest - @ValueSource(strings = {"BUCKET", "FLINK_STATE"}) - public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception { + @CsvSource({"BUCKET, true", "BUCKET, false", "FLINK_STATE, true", "FLINK_STATE, false"}) + public void testWriteMergeOnReadWithCompaction(String indexType, boolean populateMetaFields) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString()); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, FlinkOptions.RECORD_KEY_FIELD.defaultValue()); conf.setString(FlinkOptions.INDEX_TYPE, indexType); conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4); conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + conf.setBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), populateMetaFields); - testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED); + testWriteToHoodie(conf, "mor_write_with_compact", 1, populateMetaFields ? EXPECTED : removeMetaFields(EXPECTED)); } @Test @@ -566,4 +570,27 @@ public void testColumnDroppingIsNotAllowed() throws Exception { } throw new AssertionError(String.format("Excepted exception %s is not found", MissingSchemaFieldException.class)); } + + private static Map> removeMetaFields(Map> inputMap) { + Map> resultMap = new HashMap<>(); + + for (Map.Entry> entry : inputMap.entrySet()) { + List updatedList = new ArrayList<>(); + for (String value : entry.getValue()) { + // Split the string by comma + String[] parts = value.split(",", 3); + // Replace the first two elements with "null" + parts[0] = "null"; + parts[1] = "null"; + // Join the array back into a string + String updatedValue = String.join(",", parts); + // Add the updated string to the new list + updatedList.add(updatedValue); + } + // Add the updated list to the resultMap + resultMap.put(entry.getKey(), updatedList); + } + + return resultMap; + } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 2bee6bf7435b..32c392912b47 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; @@ -62,6 +63,7 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.CorruptedLogFileException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StoragePath; @@ -111,6 +113,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION; +import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion; import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs; import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs; @@ -2805,6 +2809,78 @@ public void testGetRecordPositions(boolean addRecordPositionsHeader) throws IOEx } } + @ParameterizedTest + @MethodSource("testArguments") + public void testLogReaderWithNoMetaFields(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean enableOptimizedLogBlocksScan + ) throws IOException, URISyntaxException, InterruptedException { + final String recordKeyField = "name"; + final String instantTime = "100"; + + // Clear the meta directory to ensure a clean test environment + storage.deleteDirectory(new StoragePath(basePath, METAFOLDER_NAME)); + + // Initialize the Hoodie table and do not populate meta fields + Properties properties = new Properties(); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), false); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), recordKeyField); + HoodieTestUtils.init(storage.getConf().newInstance(), basePath, HoodieTableType.MERGE_ON_READ, properties); + + // Generate a list of test records + Schema schema = getSimpleSchema(); + SchemaTestUtil testUtil = new SchemaTestUtil(); + List genRecords = testUtil.generateHoodieTestRecords(0, 400, schema).stream().map(r -> { + try { + return (GenericRecord) ((HoodieAvroPayload) r.getData()).getInsertValue(schema).get(); + } catch (IOException e) { + throw new HoodieException(e); + } + }).collect(Collectors.toList()); + + // Deduplicate test records by Record Key to easily verify consistency with scanner results, + // as it is difficult to replicate the deduplication strategy of the Scanner + List duplicatedRecords = genRecords.stream().collect(Collectors.toMap( + r -> r.get(recordKeyField).toString(), // Use the recordKeyField as the key + r -> r, + (r1, r2) -> r1 + )).values().stream().collect(Collectors.toList()); + + List logFiles = writeLogFiles(partitionPath, schema, duplicatedRecords, 4, storage); + FileCreateUtils.createDeltaCommit(basePath, instantTime, storage); + + // Scan all log blocks + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withStorage(storage) + .withBasePath(basePath) + .withLogFilePaths(logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) + .withReaderSchema(schema) + .withLatestInstantTime(instantTime) + .withMaxMemorySizeInBytes(10240L) + .withReverseReader(false) + .withBufferSize(BUFFER_SIZE) + .withSpillableMapBasePath(spillableBasePath) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) + .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan) + .build(); + + List scannedRecords = new ArrayList<>(); + for (HoodieRecord record : scanner) { + scannedRecords.add((IndexedRecord) ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()); + } + + for (IndexedRecord record : scannedRecords) { + for (String metaField : HOODIE_META_COLUMNS_WITH_OPERATION) { + assertNull(record.getSchema().getField(metaField), "Scanned record has meta field"); + } + } + + assertEquals(sort(duplicatedRecords, recordKeyField), sort(scannedRecords, recordKeyField), + "Scanner records count should be the same as test records"); + scanner.close(); + } + private static Stream testArguments() { // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: enableOptimizedLogBlocksScan return Stream.of( @@ -2841,8 +2917,12 @@ private static List getRecords(HoodieDataBlock dataBlock) { } private static List sort(List records) { + return sort(records, HoodieRecord.RECORD_KEY_METADATA_FIELD); + } + + private static List sort(List records, String sortField) { List copy = new ArrayList<>(records); - copy.sort(Comparator.comparing(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())); + copy.sort(Comparator.comparing(r -> ((GenericRecord) r).get(sortField).toString())); return copy; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java index d9db9cd51d19..2e08365b0716 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -69,7 +69,7 @@ public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writ this.metaClient = HoodieTableMetaClient.builder() .setConf(storageConf.newInstance()).setBasePath(writeConfig.getBasePath()).build(); - this.writeClient.validateAgainstTableProperties(this.metaClient.getTableConfig(), writeConfig); + SparkRDDWriteClient.validateAgainstTableProperties(this.metaClient.getTableConfig(), writeConfig); this.writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java index f82c8149e364..b7de0e34ad52 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java @@ -324,7 +324,7 @@ public void run() { public void dryRun() { try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) { HoodieSparkTable table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext()); - client.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), client.getConfig()); + SparkRDDWriteClient.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), client.getConfig()); List parts = Arrays.asList(cfg.partitions.split(",")); Map> partitionToReplaceFileIds = jsc.parallelize(parts, parts.size()).distinct() .mapToPair(partitionPath -> new Tuple2<>(partitionPath, table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList())))