[HUDI-8680][DNM] Enable partition stats by default 3 #12526

Closed
Changes from all commits
27 commits
4fedaa3
Enabling cols stats by default on writer
nsivabalan Dec 9, 2024
c363b22
Adding more unsupported types
nsivabalan Dec 9, 2024
dc41b97
Fixing few tests
nsivabalan Dec 10, 2024
2889fe0
Fixing few more tests
nsivabalan Dec 10, 2024
c50fbb8
Fixing minor scala issue
nsivabalan Dec 11, 2024
58474cc
Fixing colon issue with TestRecordLevelIndexWithSQL
nsivabalan Dec 11, 2024
897b46f
Fixing scala runtime issue
nsivabalan Dec 11, 2024
2042a48
Fixing failing tests in TestHoodieTimelineArchiver, TestCopyOnWriteRo…
nsivabalan Dec 11, 2024
228bd35
bumping up timeout for UT spark-datasource Java Tests & DDL to 150 mins
nsivabalan Dec 11, 2024
d434099
disabling failing test
nsivabalan Dec 12, 2024
35cf2c0
Disabling multi writer flaky test
nsivabalan Dec 12, 2024
16a847e
Re-enabling few disabled tests
nsivabalan Dec 12, 2024
9af1b20
Re-enabling RLI test
nsivabalan Dec 12, 2024
9316100
Fixing rli with sql test
nsivabalan Dec 13, 2024
0c2abc7
Triaging multi-writer test
nsivabalan Dec 13, 2024
57ebdc2
Enabling col stats by default on writer for spark
nsivabalan Dec 16, 2024
71025b6
Enabling partition stats by default
nsivabalan Dec 16, 2024
5711c46
Fixing DateWrapper col stats generation
nsivabalan Dec 18, 2024
9ae8233
Minor fixes
nsivabalan Dec 18, 2024
417695a
Fixing test failures
nsivabalan Dec 18, 2024
b1e288d
Fixing more test failures
nsivabalan Dec 18, 2024
9843393
Fixing more test failures
nsivabalan Dec 19, 2024
a3b5e63
Fixing tests in flink IT tests
nsivabalan Dec 19, 2024
82be3a7
minor code refactoring
nsivabalan Dec 19, 2024
d88b76c
Fixing time travel query with mdt
nsivabalan Dec 19, 2024
fc2ceb0
reverting disabling of SI test
nsivabalan Dec 19, 2024
0374837
Fixing SI test
nsivabalan Dec 19, 2024
2 changes: 1 addition & 1 deletion azure-pipelines-20230430.yml
@@ -214,7 +214,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_3
displayName: UT spark-datasource Java Tests & DDL
timeoutInMinutes: '90'
timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -513,8 +513,9 @@ private String generateUniqueInstantTime(String initializationTime) {
}

private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) throws IOException {
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
List<Pair<String, FileSlice>> partitionFileSlicePairs = getPartitionFileSlicePairs();
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionFileSlicePairs, dataWriteConfig.getMetadataConfig(), dataMetaClient,
partitionFileSlicePairs.isEmpty() ? Option.empty() : Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}
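
The change above avoids parsing the writer schema when there are no file slices to index. A minimal standalone sketch of the same guard, using illustrative names rather than Hudi's API:

import java.util.Collections;
import java.util.List;
import java.util.Optional;

class SchemaGuardSketch {
  // Resolve the (possibly expensive) schema only when there is at least one file slice to index.
  static Optional<String> schemaIfNeeded(List<String> partitionFileSlicePairs, String writeSchema) {
    return partitionFileSlicePairs.isEmpty() ? Optional.empty() : Optional.of(writeSchema);
  }

  public static void main(String[] args) {
    System.out.println(schemaIfNeeded(Collections.emptyList(), "{...avro schema...}")); // Optional.empty
  }
}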
@@ -32,6 +32,7 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -80,6 +81,7 @@ public class HoodieMetadataWriteUtils {
* @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
* @param failedWritesCleaningPolicy Cleaning policy on failed writes
*/
@VisibleForTesting
public static HoodieWriteConfig createMetadataWriteConfig(
HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
@@ -29,6 +29,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -473,6 +474,7 @@ public void testMetadataTableServices() throws Exception {
.enable(true)
.enableMetrics(false)
.withMaxNumDeltaCommitsBeforeCompaction(3) // after 3 delta commits for regular writer operations, compaction should kick in.
.withMetadataIndexColumnStats(false)
.build()).build();
initWriteConfigAndMetatableWriter(writeConfig, true);

@@ -572,6 +574,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception {
.enable(true)
.enableMetrics(false)
.withMaxNumDeltaCommitsBeforeCompaction(4)
.withMetadataIndexColumnStats(false)
.build()).build();
initWriteConfigAndMetatableWriter(writeConfig, true);
doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
@@ -1681,6 +1684,7 @@ public void testMetadataMultiWriter() throws Exception {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withEngineType(EngineType.JAVA)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -23,6 +23,7 @@
import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -105,6 +106,7 @@ public void init(HoodieTableType tableType, Option<HoodieWriteConfig> writeConfi
? writeConfig.get() : getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true,
enableMetadataTable, enableMetrics, true,
validateMetadataPayloadStateConsistency)
.withEngineType(EngineType.JAVA)
.build();
initWriteConfigAndMetatableWriter(this.writeConfig, enableMetadataTable);
}
@@ -306,6 +308,7 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
.enable(useFileListingMetadata)
.enableMetrics(enableMetrics)
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
.withMetadataIndexColumnStats(false)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(enableMetrics).withReporterType(MetricsReporterType.INMEMORY.name()).build())
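
Most test changes in this PR follow the same pattern: because column stats is now on by default, tests that drive the metadata table through test-table fixtures (which never produce real data files to index) explicitly opt out. A hedged sketch of that opt-out, reusing only builder calls that appear in this diff (basePath is supplied by the test harness):

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

class ColStatsOptOutSketch {
  static HoodieWriteConfig buildTestConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(false) // test-table infra does not populate col stats
            .build())
        .build();
  }
}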
@@ -408,7 +408,8 @@ public void testRollbackCommit() throws Exception {
.withRollbackUsingMarkers(false)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();

try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
@@ -633,7 +634,8 @@ public void testAutoRollbackInflightCommit() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build();
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();

try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
@@ -668,7 +670,8 @@ public void testAutoRollbackInflightCommit() throws Exception {
// Set Failed Writes rollback to EAGER
config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();
final String commitTime5 = "20160506030631";
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.startCommitWithTime(commitTime5);
@@ -824,7 +827,8 @@ public void testFallbackToListingBasedRollbackForCompletedInstant() throws Excep
.withRollbackUsingMarkers(true) // rollback using markers to test fallback to listing based rollback for completed instant
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();

// create test table with all commits completed
try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf(), config, context)) {
@@ -249,7 +249,9 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build())
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable)
.withMetadataIndexColumnStats(false).build())
// test uses test table infra. So, col stats is not available/populated.
.withWriteConcurrencyMode(writeConcurrencyMode)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build())
@@ -343,7 +345,7 @@ public void testArchivalWithAutoAdjustmentBasedOnCleanConfigs(String cleaningPol
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(5).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(5).withMetadataIndexColumnStats(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanerPolicy(HoodieCleaningPolicy.valueOf(cleaningPolicy))
@@ -1652,6 +1654,7 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception {
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMaxNumDeltaCommitsBeforeCompaction(8)
.withMetadataIndexColumnStats(false) // test uses test table infra. So, col stats is not available/populated.
.build())
.forTable("test-trip-table").build();
initWriteConfigAndMetatableWriter(writeConfig, true);
@@ -25,6 +25,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
@@ -360,8 +361,11 @@ public void testRollbackBackup() throws Exception {
.withBaseFilesInPartition(p1, "id21").getLeft()
.withBaseFilesInPartition(p2, "id22").getLeft();

// we are using test table infra. So, col stats are not populated.
HoodieTable table =
this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true).build());
this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
.build());
HoodieInstant needRollBackInstant = HoodieTestUtils.getCompleteInstant(
metaClient.getStorage(), metaClient.getTimelinePath(),
"002", HoodieTimeline.COMMIT_ACTION);
@@ -342,7 +342,7 @@ public void testKeepLatestFileVersions() throws Exception {
public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(true)
.withCleanerParallelism(1)
@@ -1487,4 +1487,14 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
}
}

public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, String wrapperClassName) {
if (avroValueWrapper == null) {
return null;
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0));
} else {
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
}
}
}
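
The new overload unwraps a DateWrapper record by reading its single field, which holds the date as days since the Unix epoch, and rebuilding a java.time.LocalDate. Callers select this overload by passing DateWrapper.class.getSimpleName(), as constructColumnStatsMetadataPayload does further down in this diff. A self-contained sketch of just that conversion, without the Avro wrapper types:

import java.time.LocalDate;

class DateWrapperUnwrapSketch {
  public static void main(String[] args) {
    int epochDays = 20075;                         // what a DateWrapper record stores in field 0
    LocalDate unwrapped = LocalDate.ofEpochDay(epochDays);
    System.out.println(unwrapped);                 // prints 2024-12-18
  }
}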
@@ -757,6 +757,11 @@ public Builder withPartitionStatsIndexParallelism(int parallelism) {

public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
// fix me: disable when schema on read is enabled.
// if col stats is disabled, default value for partition stats is false.
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, metadataConfig.isColumnStatsIndexEnabled()
? getDefaultPartitionStatsEnable(engineType) : false);
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;
}
@@ -772,6 +777,30 @@ private boolean getDefaultMetadataEnable(EngineType engineType) {
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}

private boolean getDefaultColStatsEnable(EngineType engineType) {
switch (engineType) {
case SPARK:
return true;
case FLINK:
case JAVA:
return false;
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}

private boolean getDefaultPartitionStatsEnable(EngineType engineType) {
switch (engineType) {
case SPARK:
return true;
case FLINK:
case JAVA:
return false;
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
}
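
Taken together, build() now resolves the two new defaults per engine, and partition stats only defaults to true when column stats ends up enabled. A standalone sketch of that resolution rule (the enum mirrors the engines handled in the diff; this is an illustration, not the Hudi implementation):

class MetadataIndexDefaultsSketch {
  enum EngineType { SPARK, FLINK, JAVA }

  static boolean defaultColStats(EngineType engine) {
    return engine == EngineType.SPARK; // FLINK and JAVA stay off by default
  }

  static boolean defaultPartitionStats(EngineType engine, boolean colStatsEnabled) {
    return colStatsEnabled && engine == EngineType.SPARK; // no partition stats without col stats
  }

  public static void main(String[] args) {
    System.out.println(defaultColStats(EngineType.SPARK));              // true
    System.out.println(defaultPartitionStats(EngineType.SPARK, true));  // true
    System.out.println(defaultPartitionStats(EngineType.JAVA, false));  // false
  }
}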

/**

Large diffs are not rendered by default.

@@ -18,6 +18,7 @@

package org.apache.hudi.metadata;

import org.apache.hudi.avro.model.DateWrapper;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
@@ -295,22 +296,40 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) == null,
String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, MetadataPartitionType.COLUMN_STATS.getRecordType()));
} else {
payload.columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
// AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
// This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484.
// We should avoid using GenericRecord and convert GenericRecord into a serializable type.
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
.setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
.setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
.setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
.setTotalUncompressedSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
.setIsDeleted((Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
.setIsTightBound((Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
.build();
boolean isMinValueDateWrapper = getIsDateWrapper(record, COLUMN_STATS_FIELD_MIN_VALUE);
boolean isMaxValueDateWrapper = getIsDateWrapper(record, COLUMN_STATS_FIELD_MAX_VALUE);
payload.columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
// AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
// This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484.
// We should avoid using GenericRecord and convert GenericRecord into a serializable type.
.setMinValue(wrapValueIntoAvro(
isMinValueDateWrapper ? unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), DateWrapper.class.getSimpleName())
: unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
.setMaxValue(wrapValueIntoAvro(isMaxValueDateWrapper ? unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), DateWrapper.class.getSimpleName())
: unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
.setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
.setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
.setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
.setTotalUncompressedSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
.setIsDeleted((Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
.setIsTightBound((Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
.build();
}
}

private static boolean getIsDateWrapper(GenericRecord record, String subFieldName) {
Object minValue = ((GenericRecord) record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
if (minValue != null) {
return ((GenericRecord) minValue).getSchema().getName().equals(DateWrapper.class.getSimpleName());
}
return false;
}

/**
@@ -71,7 +71,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType
break;
case COLUMN_STATS:
metadataConfigBuilder.enable(true).withMetadataIndexColumnStats(true);
expectedEnabledPartitions = 3;
expectedEnabledPartitions = 2;
break;
case BLOOM_FILTERS:
metadataConfigBuilder.enable(true).withMetadataIndexBloomFilter(true);
@@ -83,7 +83,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType
break;
case PARTITION_STATS:
metadataConfigBuilder.enable(true).withMetadataIndexPartitionStats(true).withColumnStatsIndexForColumns("partitionCol");
expectedEnabledPartitions = 3;
expectedEnabledPartitions = 2;
break;
default:
throw new IllegalArgumentException("Unknown partition type: " + partitionType);
@@ -93,10 +93,10 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType

// Verify partition type is enabled due to config
if (partitionType == MetadataPartitionType.EXPRESSION_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX) {
assertEquals(2, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case.");
assertEquals(2 + 2, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case.");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES));
} else {
assertEquals(expectedEnabledPartitions, enabledPartitions.size());
assertEquals(expectedEnabledPartitions + 2, enabledPartitions.size());
assertTrue(enabledPartitions.contains(partitionType) || MetadataPartitionType.ALL_PARTITIONS.equals(partitionType));
}
}
@@ -116,7 +116,7 @@ public void testPartitionAvailableByMetaClientOnly() {
List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify RECORD_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default
assertEquals(3, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX and FILES should be available");
assertEquals(5, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS, PARTITION_STATS should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.RECORD_INDEX), "RECORD_INDEX should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default");
@@ -156,7 +156,7 @@ public void testExpressionIndexPartitionEnabled() {
List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify EXPRESSION_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default
assertEquals(3, enabledPartitions.size(), "EXPRESSION_INDEX, FILES and SECONDARY_INDEX should be available");
assertEquals(5, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS, PARTITION_STATS and SECONDARY_INDEX should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.EXPRESSION_INDEX), "EXPRESSION_INDEX should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default");
@@ -19,6 +19,7 @@
package org.apache.hudi.sink;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -614,6 +615,8 @@ public void testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeCon
public void testReuseEmbeddedServer() throws IOException {
conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
conf.setString("hoodie.metadata.enable","true");
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");

HoodieFlinkWriteClient writeClient = null;
HoodieFlinkWriteClient writeClient2 = null;

@@ -19,6 +19,7 @@
package org.apache.hudi.sink;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -96,6 +97,7 @@ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws E
// disable schedule compaction in writers
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");

// start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the tx commit
List<RowData> dataset1 = Collections.singletonList(
@@ -290,6 +292,7 @@ public void testBulkInsertInSequenceWithNonBlockingConcurrencyControl() throws E
// disable schedule compaction in writers
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");

Configuration conf1 = conf.clone();
conf1.setString(FlinkOptions.OPERATION, "BULK_INSERT");
@@ -20,6 +20,7 @@

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -363,7 +364,9 @@ private TableOptions defaultTableOptions(String tablePath) {
FlinkOptions.COMPACTION_TASKS.key(), 1,
FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false,
HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED.key(), false,
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true);
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true,
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "false",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
}

private void checkAnswerEvolved(String... expectedResult) throws Exception {
@@ -460,7 +460,7 @@ public void testGetColumnsToIndex() {
expected.add("booleanField");
expected.add("decimalField");
expected.add("localTimestampMillisField");
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema))));
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), true));

//test with avro schema with max cols set
metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -470,9 +470,9 @@ public void testGetColumnsToIndex() {
expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
expected.add("booleanField");
expected.add("intField");
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema))));
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), false));
//test with avro schema with meta cols
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema)))));
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema))), false));

//test with avro schema with type filter
metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -483,7 +483,6 @@ public void testGetColumnsToIndex() {
expected.add("timestamp");
expected.add("_row_key");
expected.add("partition_path");
expected.add("trip_type");
expected.add("rider");
expected.add("driver");
expected.add("begin_lat");
@@ -493,14 +492,13 @@ public void testGetColumnsToIndex() {
expected.add("distance_in_meters");
expected.add("seconds_since_epoch");
expected.add("weight");
expected.add("nation");
expected.add("current_date");
expected.add("current_ts");
expected.add("height");
expected.add("_hoodie_is_deleted");
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA))));
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA)), false));
//test with avro schema with meta cols
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA)))));
assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA))), false));

//test with meta cols disabled
tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
@@ -535,7 +533,7 @@ public void testGetColumnsToIndex() {
expected.add("booleanField");
expected.add("decimalField");
expected.add("localTimestampMillisField");
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema))));
assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), true));
}

private void addNColumns(List<String> list, int n) {
@@ -573,20 +571,20 @@ public void testValidateDataTypeForPartitionStats() {
assertTrue(validateDataTypeForPartitionStats("floatField", schema));
assertTrue(validateDataTypeForPartitionStats("doubleField", schema));
assertTrue(validateDataTypeForPartitionStats("longField", schema));
assertTrue(validateDataTypeForPartitionStats("bytesField", schema));
assertTrue(validateDataTypeForPartitionStats("unionIntField", schema));

// Test for complex fields
// Test for unsupported fields
assertFalse(validateDataTypeForPartitionStats("arrayField", schema));
assertFalse(validateDataTypeForPartitionStats("mapField", schema));
assertFalse(validateDataTypeForPartitionStats("structField", schema));
assertFalse(validateDataTypeForPartitionStats("bytesField", schema));

// Test for logical types
Schema dateFieldSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
schema = SchemaBuilder.record("TestRecord")
.fields()
.name("dateField").type(dateFieldSchema).noDefault()
.endRecord();
assertFalse(validateDataTypeForPartitionStats("dateField", schema));
assertTrue(validateDataTypeForPartitionStats("dateField", schema));
}
}
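
The flipped assertion above reflects the DateWrapper handling added elsewhere in this PR: a date column, stored as an int with Avro's date logical type, is now accepted for partition stats. A self-contained reconstruction of the schema the test builds (the support decision itself lives in validateDataTypeForPartitionStats):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

class DateFieldSchemaSketch {
  public static void main(String[] args) {
    Schema dateFieldSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
    Schema recordSchema = SchemaBuilder.record("TestRecord").fields()
        .name("dateField").type(dateFieldSchema).noDefault()
        .endRecord();
    System.out.println(recordSchema.toString(true)); // int field annotated with "logicalType": "date"
  }
}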
@@ -81,13 +81,17 @@ void testRecreateMDTForInsertOverwriteTableOperation() {
.setBasePath(mdtBasePath).build();
HoodieActiveTimeline timeline = mdtMetaClient.getActiveTimeline();
List<HoodieInstant> instants = timeline.getInstants();
assertEquals(3, instants.size());
assertEquals(5, instants.size());
// For MDT bootstrap instant.
assertEquals("00000000000000000", instants.get(0).requestedTime());
// For RLI bootstrap instant.
// For col stats bootstrap instant.
assertEquals("00000000000000001", instants.get(1).requestedTime());
// For partition stats bootstrap instant.
assertEquals("00000000000000002", instants.get(2).requestedTime());
// For RLI bootstrap instant.
assertEquals("00000000000000003", instants.get(3).requestedTime());
// For the insert instant.
assertEquals(timestamp0, instants.get(2).requestedTime());
assertEquals(timestamp0, instants.get(4).requestedTime());

// Insert second batch.
String timestamp1 = "20241015000000001";
@@ -101,13 +105,17 @@ void testRecreateMDTForInsertOverwriteTableOperation() {
mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
timeline = mdtMetaClient.getActiveTimeline();
instants = timeline.getInstants();
assertEquals(3, timeline.getInstants().size());
assertEquals(5, timeline.getInstants().size());
// For MDT bootstrap instant.
assertEquals("00000000000000000", instants.get(0).requestedTime());
// For RLI bootstrap instant.
// For col stats bootstrap instant.
assertEquals("00000000000000001", instants.get(1).requestedTime());
// For partition stats bootstrap instant.
assertEquals("00000000000000002", instants.get(2).requestedTime());
// For RLI bootstrap instant.
assertEquals("00000000000000003", instants.get(3).requestedTime());
// For the insert_overwrite_table instant.
assertEquals(timestamp1, instants.get(2).requestedTime());
assertEquals(timestamp1, instants.get(4).requestedTime());
}
}
}
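
With column stats and partition stats now bootstrapped by default, the metadata table timeline the updated assertions expect after the first data commit looks like this (restating the comments above; the timestamp0 value is a placeholder for the test's first insert time):

class MdtBootstrapTimelineSketch {
  public static void main(String[] args) {
    String timestamp0 = "20241015000000000"; // placeholder for the test's first insert time
    String[] expectedInstants = {
        "00000000000000000", // MDT bootstrap instant
        "00000000000000001", // col stats bootstrap instant
        "00000000000000002", // partition stats bootstrap instant
        "00000000000000003", // RLI bootstrap instant
        timestamp0           // the insert commit, synced to the MDT
    };
    System.out.println(String.join(", ", expectedInstants));
  }
}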
@@ -299,9 +299,47 @@ private void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String t
client4.close();
}

@ParameterizedTest
@MethodSource("providerClassResolutionStrategyAndTableType")
public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass,
@Test
public void testHoodieClientBasicMultiWriterCOW_InProcessLP_SimpleCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.COPY_ON_WRITE, InProcessLockProvider.class, new SimpleConcurrentFileWritesConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterCOW_FSBasedLP_SimpleCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.COPY_ON_WRITE, FileSystemBasedLockProvider.class, new SimpleConcurrentFileWritesConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterCOW_FSBasedLP_PreferWriterCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.COPY_ON_WRITE, FileSystemBasedLockProvider.class, new PreferWriterConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterCOW_InProcessLP_PreferWriterCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.COPY_ON_WRITE, InProcessLockProvider.class, new PreferWriterConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterMOR_InProcessLP_SimpleCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.MERGE_ON_READ, InProcessLockProvider.class, new SimpleConcurrentFileWritesConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterMOR_FSBasedLP_SimpleCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.MERGE_ON_READ, FileSystemBasedLockProvider.class, new SimpleConcurrentFileWritesConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterMOR_FSBasedLP_PreferWriterCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.MERGE_ON_READ, FileSystemBasedLockProvider.class, new PreferWriterConflictResolutionStrategy());
}

@Test
public void testHoodieClientBasicMultiWriterMOR_InProcessLP_PreferWriterCRS() throws Exception {
testHoodieClientBasicMultiWriter(HoodieTableType.MERGE_ON_READ, InProcessLockProvider.class, new PreferWriterConflictResolutionStrategy());
}

private void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass,
ConflictResolutionStrategy resolutionStrategy) throws Exception {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
@@ -897,6 +897,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception {
.enable(true)
.enableMetrics(false)
.withMaxNumDeltaCommitsBeforeCompaction(4)
.withMetadataIndexColumnStats(false)
.build()).build();
initWriteConfigAndMetatableWriter(writeConfig, true);
doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
@@ -2224,7 +2225,7 @@ public void testMetadataMultiWriter() throws Exception {

// Ensure all commits were synced to the Metadata Table
HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath);
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 5);
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 6);
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000002")));
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000003")));
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004")));
@@ -2277,7 +2278,7 @@ public void testMultiWriterForDoubleLocking() throws Exception {
LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());

// 6 commits and 2 cleaner commits.
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8);
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 9);
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1);
// Validation
validateMetadata(writeClient);
@@ -267,7 +267,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass)
.withBootstrapModeForRegexMatch(modeForRegexMatch).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).withMetadataIndexColumnStats(false).build())
.build();

SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
@@ -415,7 +415,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
reloadInputFormats();
List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()),
FSUtils.getAllPartitionPaths(context, storage, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS).stream()
FSUtils.getAllPartitionPaths(context, storage, basePath, false).stream()
.map(f -> basePath + "/" + f).collect(Collectors.toList()),
basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
assertEquals(totalRecords, records.size());
@@ -23,6 +23,7 @@
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieBootstrapConfig;
@@ -94,6 +95,7 @@ protected Map<String, String> basicOptions() {
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name());
options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true");
options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "false");
if (nPartitions == 0) {
options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName());
} else {
@@ -116,6 +118,7 @@ protected Map<String, String> setBootstrapOptions() {
Map<String, String> options = basicOptions();
options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL());
options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath);
options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "false");
if (!dashPartitions) {
options.put(HoodieBootstrapConfig.PARTITION_PATH_TRANSLATOR_CLASS_NAME.key(), DecodedBootstrapPartitionPathTranslator.class.getName());
}
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.functional;

import org.apache.hudi.avro.model.DateWrapper;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.HoodieSeekingFileReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Date;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestHarness {

private static final Random RANDOM = new Random();

@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieCreateHandle");
initPath();
initHoodieStorage();
initTestDataGenerator();
initMetaClient();
initTimelineService();
}

@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}

@Test
public void testRowCreateHandle() throws Exception {

// create a data table which will auto create mdt table as well
HoodieWriteConfig cfg = getConfig();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
writeData(client, InProcessTimeGenerator.createNewInstantTime(), 100, false);
}

String fileName = "file.parquet";
String targetColName = "c1";

Date minDate = new Date(1000 * 60 * 60 * 10);
Date maxDate = new Date(1000 * 60 * 60 * 60);

HoodieColumnRangeMetadata<Comparable> expectedColStats =
HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName, minDate, maxDate, 5, 1000, 123456, 123456);

// let's ensure this record gets serialized as DateWrapper
assertEquals(DateWrapper.class.getCanonicalName(), ((HoodieMetadataPayload) HoodieMetadataPayload.createColumnStatsRecords("p1", Collections.singletonList(expectedColStats), false)
.collect(Collectors.toList()).get(0).getData()).getColumnStatMetadata().get().getMinValue().getClass().getCanonicalName());

// create mdt records
HoodieRecord<HoodieMetadataPayload> columnStatsRecord =
HoodieMetadataPayload.createColumnStatsRecords("p1", Collections.singletonList(expectedColStats), false)
.collect(Collectors.toList()).get(0);

HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(cfg, HoodieFailedWritesCleaningPolicy.EAGER);
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf().newInstance()).build();

HoodieTable table = HoodieSparkTable.create(mdtWriteConfig, context, mdtMetaClient);
String newCommitTime = InProcessTimeGenerator.createNewInstantTime();
HoodieCreateHandle handle = new HoodieCreateHandle(mdtWriteConfig, newCommitTime, table, COLUMN_STATS.getPartitionPath(), "col-stats-00001-0", new PhoneyTaskContextSupplier());

// write the record to hfile.
handle.write(columnStatsRecord, new Schema.Parser().parse(mdtWriteConfig.getSchema()), new TypedProperties());

WriteStatus writeStatus = (WriteStatus) handle.close().get(0);
String filePath = writeStatus.getStat().getPath();

// read the hfile using base file reader.
StoragePath baseFilePath = new StoragePath(mdtMetaClient.getBasePath() + "/" + filePath);
HoodieSeekingFileReader baseFileReader = (HoodieSeekingFileReader<?>) HoodieIOFactory.getIOFactory(mdtMetaClient.getStorage())
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, baseFilePath);

ClosableIterator itr = baseFileReader.getRecordIterator();
List<HoodieRecord<HoodieMetadataPayload>> allRecords = new ArrayList<>();
while (itr.hasNext()) {
GenericRecord genericRecord = (GenericRecord) ((HoodieRecord) itr.next()).getData();
HoodieRecord<HoodieMetadataPayload> mdtRec = SpillableMapUtils.convertToHoodieRecordPayload(genericRecord,
mdtWriteConfig.getPayloadClass(), mdtWriteConfig.getPreCombineField(),
Pair.of(mdtMetaClient.getTableConfig().getRecordKeyFieldProp(), mdtMetaClient.getTableConfig().getPartitionFieldProp()),
false, Option.of(COLUMN_STATS.getPartitionPath()), Option.empty());
allRecords.add(mdtRec);
}

// validate the min and max values.
HoodieMetadataColumnStats actualColStatsMetadata = allRecords.get(0).getData().getColumnStatMetadata().get();
HoodieMetadataColumnStats expectedColStatsMetadata = ((HoodieMetadataPayload) HoodieMetadataPayload.createColumnStatsRecords("p1", Collections.singletonList(expectedColStats), false)
.collect(Collectors.toList()).get(0).getData()).getColumnStatMetadata().get();
assertEquals(expectedColStatsMetadata.getMinValue().getClass().getCanonicalName(), actualColStatsMetadata.getMinValue().getClass().getCanonicalName());
assertEquals(expectedColStatsMetadata.getMinValue(), actualColStatsMetadata.getMinValue());
assertEquals(expectedColStatsMetadata.getMaxValue(), actualColStatsMetadata.getMaxValue());
}

private List<WriteStatus> writeData(SparkRDDWriteClient client, String instant, int numRecords, boolean doCommit) {
metaClient = HoodieTableMetaClient.reload(metaClient);
JavaRDD records = jsc.parallelize(dataGen.generateInserts(instant, numRecords), 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
client.startCommitWithTime(instant);
List<WriteStatus> writeStatuses = client.upsert(records, instant).collect();
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
if (doCommit) {
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
Assertions.assertTrue(committed);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
return writeStatuses;
}

class PhoneyTaskContextSupplier extends TaskContextSupplier {

@Override
public Supplier<Integer> getPartitionIdSupplier() {
return () -> 1;
}

@Override
public Supplier<Integer> getStageIdSupplier() {
return () -> 1;
}

@Override
public Supplier<Long> getAttemptIdSupplier() {
return () -> 1L;
}

@Override
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}
}
}
@@ -28,6 +28,7 @@
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -246,6 +247,8 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass)
.withBootstrapModeForRegexMatch(modeForRegexMatch).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3)
.withMetadataIndexColumnStats(false).build())
.build();

SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
@@ -426,6 +426,8 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep
.withAvroSchemaValidate(false)
.withAllowAutoEvolutionColumnDrop(true)
.withAutoCommit(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).build())
// in this test we mock few entries in timeline. hence col stats initialization does not work.
.build();

setUp(cfg.getProps());
@@ -59,7 +59,8 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
RECORDKEY_FIELD.key -> "_row_key",
PARTITIONPATH_FIELD.key -> "partition",
PRECOMBINE_FIELD.key -> "timestamp",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15"
) ++ metadataOpts

val secondaryIndexOpts = Map(
@@ -19,7 +19,7 @@ package org.apache.hudi.functional

import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
import org.apache.hudi.common.util.Option
@@ -28,9 +28,8 @@ import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectColumnAt}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{functions, HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
@@ -39,7 +38,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import java.util.function.Consumer

import scala.collection.JavaConverters._

class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
@@ -55,7 +53,8 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false"
)

val verificationCol: String = "driver"
@@ -143,7 +143,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: HoodieTableType, partitionCol : String): Unit = {
val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true"
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "false"
)

val commonOpts = Map(
@@ -256,7 +257,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: HoodieTableType, partitionCol : String): Unit = {
val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true"
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "false"
)

val commonOpts = Map(
@@ -430,7 +430,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
}

@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO","SPARK"))
def testPrunedFiltered(recordType: HoodieRecordType) {

val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
@@ -37,19 +37,17 @@ import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
import org.apache.hudi.util.JavaScalaConverters.convertJavaListToScalaSeq

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, explode}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.api.{Disabled, Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import java.util
import java.util.Collections
import java.util.stream.Collectors

import scala.collection.JavaConverters._

@Tag("functional")
@@ -239,8 +237,9 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
val metadataOpts: Map[String, String] = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true",
DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "5"
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "6"
)
val combinedOpts: Map[String, String] = partitionedCommonOpts ++ metadataOpts

@@ -259,7 +258,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
.setBasePath(s"$basePath/.hoodie/metadata")
.build
val timelineT0 = metaClient.getActiveTimeline
assertEquals(3, timelineT0.countInstants())
assertEquals(4, timelineT0.countInstants())
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT0.lastInstant().get().getAction)
val t0 = timelineT0.lastInstant().get().requestedTime

@@ -285,7 +284,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn

//Validate T1
val timelineT1 = metaClient.reloadActiveTimeline()
assertEquals(4, timelineT1.countInstants())
assertEquals(5, timelineT1.countInstants())
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT1.lastInstant().get().getAction)
val t1 = timelineT1.lastInstant().get().requestedTime

@@ -312,7 +311,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn

//Validate T2
val timelineT2 = metaClient.reloadActiveTimeline()
assertEquals(5, timelineT2.countInstants())
assertEquals(6, timelineT2.countInstants())
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT2.lastInstant().get().getAction)
val t2 = timelineT2.lastInstant().get().requestedTime

@@ -343,7 +342,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn

//Validate T3
val timelineT3 = metaClient.reloadActiveTimeline()
assertEquals(7, timelineT3.countInstants())
assertEquals(8, timelineT3.countInstants())
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT3.getInstants.get(5).getAction)
assertEquals(HoodieTimeline.COMMIT_ACTION, timelineT3.lastInstant().get().getAction)
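Reviewer note (illustrative, not part of this diff): each hard-coded instant count above moves up by one because initializing the partition stats index by default adds one extra delta commit to the metadata table timeline. A minimal Scala sketch of the assertion pattern, assuming a metaClient already pointed at the metadata table as built earlier in this test; expectedMdtInstants is a hypothetical helper, not an existing utility:

// Hypothetical helper: expected count = count before this change, plus one
// initialization delta commit for the newly defaulted partition stats index.
def expectedMdtInstants(countBeforePartitionStats: Int): Int = countBeforePartitionStats + 1

val timeline = metaClient.reloadActiveTimeline()
assertEquals(expectedMdtInstants(3), timeline.countInstants())
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timeline.lastInstant().get().getAction)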

Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
@Test
def testPartitionStatsWithoutColumnStats(): Unit = {
// remove column stats enable key from commonOpts
val hudiOpts = commonOpts - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key
val hudiOpts = commonOpts + (HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false")
// should throw an exception as column stats is required for partition stats
assertThrows[HoodieException] {
doWriteAndValidateDataAndPartitionStats(
Original file line number Diff line number Diff line change
@@ -299,8 +299,8 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
assertEquals(0, getFileGroupCountForRecordIndex(writeConfig))
metaClient = HoodieTableMetaClient.reload(metaClient)
assertEquals(0, metaClient.getTableConfig.getMetadataPartitionsInflight.size())
// only files partition should be present
assertEquals(1, metaClient.getTableConfig.getMetadataPartitions.size())
    // only the files, col stats, and partition stats partitions should be present.
assertEquals(3, metaClient.getTableConfig.getMetadataPartitions.size())

doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
Original file line number Diff line number Diff line change
@@ -45,11 +45,14 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
@ParameterizedTest
@ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testRLIWithSQL(tableType: String): Unit = {
var hudiOpts = commonOpts
hudiOpts = hudiOpts + (
val hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
"hoodie.metadata.index.column.stats.enable" -> "false",
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")

    // some negative test cases in this class assume that only RLI is enabled,
    // so col stats is disabled above for now.

val df = doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,
@@ -276,7 +279,8 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
HoodieWriteConfig.TBL_NAME.key -> tableName,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "record_key_col,name",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_key_col",
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
"hoodie.metadata.index.column.stats.enable" -> "false"
) ++ metadataOpts

spark.sql(
@@ -292,7 +296,8 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col,name',
| hoodie.enable.data.skipping = 'true'
| hoodie.enable.data.skipping = 'true',
| hoodie.metadata.index.column.stats.enable = 'false'
| )
| partitioned by(partition_key_col)
| location '$dummyTablePath'
@@ -338,7 +343,8 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase {
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col1,record_key_col2,record_key_col3',
| hoodie.enable.data.skipping = 'true'
| hoodie.enable.data.skipping = 'true',
| hoodie.metadata.index.column.stats.enable = 'false'
| )
| partitioned by(partition_key_col)
| location '$dummyTablePath'
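Reviewer note (illustrative, not part of this diff): since the negative cases here depend on column stats being explicitly turned off, an explicit assertion on the enabled metadata partitions would make that precondition visible. A minimal Scala sketch, assuming a reloaded metaClient is available as in the other index tests; the expected partition-name set is an assumption:

import scala.collection.JavaConverters._

// Fail loudly if a future default enables more metadata partitions than the
// negative test cases in this class expect.
val enabledPartitions = metaClient.getTableConfig.getMetadataPartitions.asScala.toSet
assert(enabledPartitions == Set("files", "record_index"),
  s"unexpected metadata partitions enabled: $enabledPartitions")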
Original file line number Diff line number Diff line change
@@ -1293,9 +1293,13 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
def testSecondaryIndexWithPrimitiveDataTypes(): Unit = {
var hudiOpts = commonOpts
hudiOpts = hudiOpts ++ Map(
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false")
tableName += "test_secondary_index_with_primitive_data_types"

spark.sql("set hoodie.metadata.index.partition.stats.enable=false")
    // HUDI-8620 tracks fixing BYTES and FIXED types for col stats and partition stats.

// Create table with different data types
spark.sql(
s"""
@@ -1379,6 +1383,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
verifyQueryPredicate(hudiOpts, col)
}
}
    spark.sessionState.conf.unsetConf("hoodie.metadata.index.partition.stats.enable")
}

@Test
Original file line number Diff line number Diff line change
@@ -98,6 +98,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
checkAnswer(s"show indexes from default.$tableName")(
Seq("column_stats", "column_stats", ""),
Seq("partition_stats", "partition_stats", ""),
Seq("record_index", "record_index", "")
)

@@ -108,6 +110,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
// Secondary index is created by default for non record key column when index type is not specified
spark.sql(s"create index idx_name on $tableName (name)")
checkAnswer(s"show indexes from default.$tableName")(
Seq("column_stats", "column_stats", ""),
Seq("partition_stats", "partition_stats", ""),
Seq("secondary_index_idx_name", "secondary_index", "name"),
Seq("record_index", "record_index", "")
)
@@ -120,6 +124,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {

// Both indexes should be shown
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("partition_stats", "partition_stats", ""),
Seq("secondary_index_idx_name", "secondary_index", "name"),
Seq("secondary_index_idx_price", "secondary_index", "price"),
Seq("record_index", "record_index", "")
@@ -128,6 +134,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
checkAnswer(s"drop index idx_name on $tableName")()
// show index shows only one index after dropping
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("partition_stats", "partition_stats", ""),
Seq("secondary_index_idx_price", "secondary_index", "price"),
Seq("record_index", "record_index", "")
)
@@ -139,6 +147,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
// drop index should work now
checkAnswer(s"drop index idx_name on $tableName")()
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("partition_stats", "partition_stats", ""),
Seq("secondary_index_idx_price", "secondary_index", "price"),
Seq("record_index", "record_index", "")
)
@@ -156,11 +166,15 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
metaClient.getTableConfig.setMetadataPartitionState(metaClient, indexDefinition.getIndexName, false)
checkAnswer(s"drop index idx_price on $tableName")()
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("partition_stats", "partition_stats", ""),
Seq("record_index", "record_index", "")
)

// Drop the record index and show index should show no index
checkAnswer(s"drop index record_index on $tableName")()
checkAnswer(s"drop index partition_stats on $tableName")()
checkAnswer(s"drop index column_stats on $tableName")()
checkAnswer(s"show indexes from $tableName")()

checkException(s"drop index idx_price on $tableName")("Index does not exist: idx_price")
Original file line number Diff line number Diff line change
@@ -77,6 +77,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql("set hoodie.metadata.index.column.stats.enable=false")
// NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
@@ -139,6 +140,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("spark.sql.storeAssignmentPolicy")
}
      spark.sessionState.conf.unsetConf("hoodie.metadata.index.column.stats.enable")
})
}

@@ -240,6 +242,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql("set hoodie.compact.schedule.inline=false")

spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql("set hoodie.metadata.index.column.stats.enable=false")
spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
// NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
@@ -334,6 +337,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
spark.sessionState.conf.unsetConf("spark.sql.storeAssignmentPolicy")
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
        spark.sessionState.conf.unsetConf("hoodie.metadata.index.column.stats.enable")
}
}
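Reviewer note (illustrative only): the paired set/unset calls above are easy to leave unbalanced. A small Scala sketch of a scoping helper that always restores the previous value; withSessionConf is a hypothetical name, not an existing utility in this codebase:

import org.apache.spark.sql.SparkSession

// Hypothetical helper: temporarily set a Spark SQL session conf and always restore it,
// even if the body throws.
def withSessionConf(spark: SparkSession)(key: String, value: String)(body: => Unit): Unit = {
  val previous = spark.conf.getOption(key)
  spark.conf.set(key, value)
  try body
  finally previous match {
    case Some(old) => spark.conf.set(key, old)
    case None      => spark.sessionState.conf.unsetConf(key)
  }
}

// Usage: disable column stats only for the duration of the block.
withSessionConf(spark)("hoodie.metadata.index.column.stats.enable", "false") {
  // run the DDL / DML under test here
}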

Original file line number Diff line number Diff line change
@@ -612,11 +612,11 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
)
assert(result6DF.count() == 0)

// no partition stats by default
// partition stats enabled by default
val result7DF = spark.sql(
s"select type, key, ColumnStatsMetadata from hudi_metadata('$identifier') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}"
)
assert(result7DF.count() == 0)
assert(result7DF.count() == 12)
}
}
spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
}

spark.sql("set hoodie.bootstrap.parallelism = 20")
spark.sql("set hoodie.metadata.index.column.stats.enable = false")
checkAnswer(
s"""call run_bootstrap(
|table => '$tableName',
@@ -93,6 +94,8 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
spark.sql("set hoodie.datasource.write.row.writer.enable = false")
spark.sql(s"""call run_clustering(table => '$tableName')""".stripMargin)
assertResult(0)(spark.sql(s"select * from $tableName").except(beforeClusterDf).count())

      spark.sessionState.conf.unsetConf("hoodie.metadata.index.column.stats.enable")
}
}

@@ -119,6 +122,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
}

spark.sql("set hoodie.bootstrap.parallelism = 20")
spark.sql("set hoodie.metadata.index.column.stats.enable = false")
checkAnswer(
s"""call run_bootstrap(
|table => '$tableName',
@@ -158,7 +162,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
assertResult("true") {
metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
};

      spark.sessionState.conf.unsetConf("hoodie.metadata.index.column.stats.enable")
}
}

@@ -181,6 +185,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
df.write.parquet(sourcePath)

spark.sql("set hoodie.bootstrap.parallelism = 20")
spark.sql("set hoodie.metadata.index.column.stats.enable = false")
// run bootstrap
checkAnswer(
s"""call run_bootstrap(
@@ -214,6 +219,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
spark.sql("set hoodie.datasource.write.row.writer.enable = false")
spark.sql(s"""call run_clustering(table => '$tableName')""".stripMargin)
assertResult(0)(spark.sql(s"select * from $tableName").except(beforeClusterDf).count())
      spark.sessionState.conf.unsetConf("hoodie.metadata.index.column.stats.enable")
}
}

@@ -241,6 +247,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {

spark.sql("set hoodie.bootstrap.parallelism = 20")
spark.sql("set hoodie.datasource.write.precombine.field=timestamp")
spark.sql("set hoodie.metadata.index.column.stats.enable = false")

checkAnswer(
s"""call run_bootstrap(
@@ -254,6 +261,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
|bootstrap_overwrite => true)""".stripMargin) {
Seq(0)
}
      spark.sessionState.conf.unsetConf("hoodie.metadata.index.column.stats.enable")
}
}
}
Original file line number Diff line number Diff line change
@@ -56,14 +56,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {

// collect active commits for table
val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
assertResult(4) {
assertResult(5) {
commits.length
}

// collect archived commits for table
val endTs = commits(0).get(0).toString
val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
assertResult(3) {
assertResult(2) {
archivedCommits.length
}
}
@@ -106,14 +106,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {

// collect active commits for table
val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
assertResult(4) {
assertResult(5) {
commits.length
}

// collect archived commits for table
val endTs = commits(0).get(0).toString
val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
assertResult(3) {
assertResult(2) {
archivedCommits.length
}
}
Original file line number Diff line number Diff line change
@@ -133,8 +133,8 @@ public void testIsIndexBuiltForAllRequestedTypes() {
@Test
public void testIndexerWithNotAllIndexesEnabled() {
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
    // enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).withMetadataIndexColumnStats(false);
upsertToTable(metadataConfigBuilder.build(), tableName);

// validate table config
@@ -148,7 +148,7 @@ public void testIndexerWithNotAllIndexesEnabled() {
@Test
public void testIndexerWithFilesPartition() {
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
// enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
upsertToTable(metadataConfigBuilder.build(), tableName);

@@ -165,8 +165,10 @@ public void testIndexerWithFilesPartition() {
@Test
public void testIndexerForRecordIndex() {
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false);
    // enable only the files partition with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = HoodieMetadataConfig.newBuilder()
.enable(true)
.withAsyncIndex(false).withMetadataIndexColumnStats(false);
upsertToTable(metadataConfigBuilder.build(), tableName);

// validate table config
@@ -183,9 +185,9 @@ public void testIndexerWithWriterFinishingFirst() throws IOException {
// is inflight, while the regular writer is updating metadata table.
// The delta commit from the indexer should not be rolled back.
String tableName = "indexer_with_writer_finishing_first";
// Enable files and bloom_filters on the regular write client
// Enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder =
getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).withMetadataIndexColumnStats(false);
HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
upsertToTable(metadataConfig, tableName);

@@ -256,9 +258,9 @@ public void testIndexerWithWriterFinishingLast() throws IOException {
// finishes the original delta commit. In this case, the async indexer should not
// trigger the rollback on other inflight writes in the metadata table.
String tableName = "indexer_with_writer_finishing_first";
// Enable files and bloom_filters on the regular write client
// Enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder =
getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).withMetadataIndexColumnStats(false);
HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
upsertToTable(metadataConfig, tableName);
upsertToTable(metadataConfig, tableName);
@@ -324,7 +326,7 @@ private static Stream<Arguments> colStatsFileGroupCountParams() {
public void testColStatsFileGroupCount(int colStatsFileGroupCount) {
TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount;
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
// enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
upsertToTable(metadataConfigBuilder.build(), tableName);

@@ -352,7 +354,7 @@ public void testColStatsFileGroupCount(int colStatsFileGroupCount) {
@Test
public void testIndexerForExceptionWithNonFilesPartition() {
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
// enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
upsertToTable(metadataConfigBuilder.build(), tableName);
// validate table config
@@ -447,8 +449,8 @@ private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTyp
public void testIndexerDropPartitionDeletesInstantFromTimeline() {
String tableName = "indexer_test";
HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
// enable files on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
    // enable files and bloom_filters only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true).withMetadataIndexColumnStats(false);
HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
// do one upsert with synchronous metadata update
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig)) {
@@ -501,8 +503,8 @@ public void testIndexerDropPartitionDeletesInstantFromTimeline() {
public void testTwoIndexersOneCreateOneDropPartition() {
String tableName = "indexer_test";
HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
// enable files on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false);
// enable files only with the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexColumnStats(false);
HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
// do one upsert with synchronous metadata update
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig)) {
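Reviewer note (outside the diff): the builder combination repeated in this file, files and bloom_filters enabled on the write client with column stats explicitly disabled so the async indexer can build it, could be factored into a single helper. A minimal Scala sketch against the existing HoodieMetadataConfig builder; the helper name is an assumption:

import org.apache.hudi.common.config.HoodieMetadataConfig

// Hypothetical helper: metadata config enabling only the files and bloom_filters
// indexes on the write client, leaving column stats for the async indexer.
def filesAndBloomFiltersOnlyConfig(): HoodieMetadataConfig =
  HoodieMetadataConfig.newBuilder()
    .enable(true)
    .withAsyncIndex(false)
    .withMetadataIndexBloomFilter(true)
    .withMetadataIndexColumnStats(false)
    .build()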
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -155,6 +156,8 @@ protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] transformer

protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] transformerClasses, boolean nullForDeletedCols,
TypedProperties extraProps) throws IOException {
    extraProps.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");

extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
extraProps.setProperty("hoodie.datasource.write.row.writer.enable", rowWriterEnable.toString());
extraProps.setProperty(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS().key(), Boolean.toString(nullForDeletedCols));
Original file line number Diff line number Diff line change
@@ -506,7 +506,8 @@ public void testPartitionPruningInHoodieIncrSource()
.withScheduleInlineCompaction(true)
.withMaxNumDeltaCommitsBeforeCompaction(1)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).build())
        // if col stats is enabled, col stats based pruning kicks in and changes some expected values in this test.
.build();
List<WriteResult> inserts = new ArrayList<>();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {