[HUDI-8232] Adding Index defn for Col stats to track the list of columns indexed #12529

Open · wants to merge 3 commits into base: master

Changes from 2 commits
@@ -284,4 +284,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieC
}
}
}

protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
}
}
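The empty protected method above is a hook: engine-specific write clients can override it to push the latest indexed-column list into the column stats index definition. A minimal sketch of what such an override might look like — the subclass and the injected index client are hypothetical, and only the hook signature and BaseHoodieIndexClient.createOrUpdateColStatsIndex (added later in this PR) come from the diff:

import org.apache.hudi.common.table.HoodieTableMetaClient;

import java.util.List;

// Hypothetical engine-specific write client; shows the wiring only.
// (Import of BaseHoodieIndexClient omitted; its package is not shown in this diff.)
public class ExampleWriteClient {

  // Assumed: a concrete BaseHoodieIndexClient implementation for the engine.
  private final BaseHoodieIndexClient indexClient;

  public ExampleWriteClient(BaseHoodieIndexClient indexClient) {
    this.indexClient = indexClient;
  }

  // Overrides the no-op hook: delegate to the index client so the
  // col stats index definition tracks the columns actually indexed.
  protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
    indexClient.createOrUpdateColStatsIndex(metaClient, columnsToIndex);
  }
}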
@@ -50,6 +50,7 @@
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -344,6 +345,11 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
LOG.info("Committing Compaction {}", compactionCommitTime);
LOG.debug("Compaction {} finished with result: {}", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata, (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} finally {
this.txnManager.endTransaction(Option.of(compactionInstant));
releaseResources(compactionCommitTime);
@@ -407,6 +413,11 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
LOG.debug("Log Compaction {} finished with result {}", logCompactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata, (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} finally {
this.txnManager.endTransaction(Option.of(logCompactionInstant));
releaseResources(logCompactionCommitTime);
@@ -545,6 +556,11 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,

ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(false, clusteringInstant,
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata), table.getActiveTimeline());
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata, (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
} finally {
@@ -56,6 +56,7 @@
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String commitActionType, String instant
writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
}

// Save internal schema
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;

import java.util.List;

public class HoodieIndexClientUtils {

public static void updateColsToIndex(HoodieTable dataTable, HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
Functions.Function2<HoodieTableMetaClient, List<String>, Void> updateColSatsFunc) {
if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
try {
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
.setStorage(dataTable.getStorage())
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
.build();
HoodieInstant latestInstant = mdtMetaClient.getActiveTimeline().filterCompletedInstants().getInstantsOrderedByCompletionTime().reduce((a, b) -> b).get();

final HoodieCommitMetadata mdtCommitMetadata = mdtMetaClient.getTimelineLayout().getCommitMetadataSerDe().deserialize(
latestInstant,
mdtMetaClient.getActiveTimeline().getInstantDetails(latestInstant).get(),
HoodieCommitMetadata.class);
if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
Member: This check is costly. If not for this check, we wouldn't need mdtCommitMetadata, and hence wouldn't need to create mdtMetaClient and deserialize the instant. If we strictly need this check, please consider caching the meta client or reusing an existing instance if possible.

Contributor Author: This is more of a defensive check: what if nothing gets written to the col stats partition even though the write config and table property say it should? We just want to make sure we have successfully written data to the col stats partition before updating the index definition.

Also, our metadata writer is short-lived: we create a new one, apply a commit metadata, and close it out. So the data table write client does not have a metadata meta client for us to reuse here, unfortunately.
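A minimal sketch of the reviewer's caching suggestion, assuming a simple memoized holder — the holder class and double-checked locking are hypothetical, while the builder calls and reloadActiveTimeline() mirror code already in this PR:

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;

public class MdtMetaClientCache {

  // Hypothetical memoized instance so each commit does not rebuild the meta client.
  private static volatile HoodieTableMetaClient cachedMdtMetaClient;

  static HoodieTableMetaClient getOrCreate(HoodieTable<?, ?, ?, ?> dataTable) {
    if (cachedMdtMetaClient == null) {
      synchronized (MdtMetaClientCache.class) {
        if (cachedMdtMetaClient == null) {
          // Same builder calls as in updateColsToIndex.
          cachedMdtMetaClient = HoodieTableMetaClient.builder()
              .setStorage(dataTable.getStorage())
              .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
              .build();
          return cachedMdtMetaClient;
        }
      }
    }
    // Refresh the timeline so the latest completed instant is visible on reuse.
    cachedMdtMetaClient.reloadActiveTimeline();
    return cachedMdtMetaClient;
  }
}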

// update data table's table config for list of columns indexed.
List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig());
// if col stats is getting updated, lets also update list of columns indexed if changed.
updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
Member: How are nested fields getting tracked? Is it like a.b? Asking because if a nested field has the same name as some other non-nested field, the equals check could return an unexpected result.

Contributor Author: Yes, good point. We might have to split by comma and then compare the lists.

Contributor Author: Actually, it's already taken care of:

if (!indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields().equals(indexDefinition.getSourceFields())) {

This is in HoodieTableMetaClient.buildColSatsIndexDefinition, so it's already a list-of-strings comparison. We should be good.
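The distinction the thread lands on matters: comma-joined string comparison can conflate a nested field like a.b with separately tracked fields, while List<String> equality keeps field boundaries intact. A small self-contained illustration in plain Java (not Hudi code):

import java.util.Arrays;
import java.util.List;

public class SourceFieldComparison {
  public static void main(String[] args) {
    // Two different column sets whose comma-joined forms collide.
    List<String> twoFields = Arrays.asList("a.b", "c");
    List<String> oneField = Arrays.asList("a.b,c"); // hypothetical mis-parse into a single entry

    // Joined-string equality is ambiguous: prints true.
    System.out.println(String.join(",", twoFields).equals(String.join(",", oneField)));

    // List equality distinguishes them: prints false.
    System.out.println(twoFields.equals(oneField));
  }
}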

}
} catch (Exception e) {
throw new HoodieException("Updating data table config to latest set of columns indexed with col stats failed ", e);
}
}
}
}
@@ -76,6 +76,7 @@
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.Lazy;

import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -98,6 +99,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -403,6 +405,7 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition);
String partitionName;
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
List<String> columnsToIndex = new ArrayList<>();
try {
switch (partitionType) {
case FILES:
@@ -414,7 +417,9 @@
partitionName = BLOOM_FILTERS.getPartitionPath();
break;
case COLUMN_STATS:
fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap);
Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionToFilesMap);
columnsToIndex = colStatsColumnsAndRecord.getKey();
fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
partitionName = COLUMN_STATS.getPartitionPath();
break;
case RECORD_INDEX:
@@ -477,7 +482,16 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
bulkCommit(instantTimeForPartition, partitionName, records, fileGroupCount);
metadataMetaClient.reloadActiveTimeline();

if (partitionType == COLUMN_STATS) {
// initialize Col Stats
// if col stats, lets also update list of columns indexed if changed.
updateColumnsToIndexWithColStats(columnsToIndex);
Member: If we're anyway going to check and update post-commit, why do it here?

Contributor Author: Just that, if initialization succeeded but the job crashed before applying the commit of interest, we don't want to miss the index definition update.

Contributor Author (Dec 20, 2024): Essentially, for regular commits, both the MDT write and the data table write have to complete before we update the index definition. But initialization is a special commit: once the MDT commit completes, readers are eligible to read the data, so it's better to keep the index definition in sync.

/*if (!dataMetaClient.getTableConfig().getTableColStatsIndexedColumns().equals(columnsToIndex)) {
LOG.info(String.format("List of columns to index is changing. Old value %s. New value %s", dataMetaClient.getTableConfig().getTableColStatsIndexedColumns(),
columnsToIndex));
dataMetaClient.getTableConfig().setColStatsIndexedColumns(dataMetaClient, columnsToIndex);
}*/
}
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionName, true);
// initialize the metadata reader again so the MDT partition can be read after initialization
initMetadataReader();
@@ -486,6 +500,9 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
}
}

protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
}

/**
* Returns a unique timestamp to use for initializing a MDT partition.
* <p>
@@ -519,15 +536,28 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(Li
return Pair.of(fileGroupCount, records);
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// Find the columns to index
final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
dataWriteConfig.getMetadataConfig(), Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)));
if (columnsToIndex.isEmpty()) {
// at least meta fields will be chosen for indexing. So, we should not reach this state.
Member: What happens when populateMetaFields is disabled? Should we still error out?

Contributor Author: Nope, why would we? We should just skip indexing the meta fields if populateMetaFields is false.

Contributor Author:

  private static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Either<List<String>, Lazy<Option<Schema>>> tableSchema,
                                                Option<HoodieRecordType> recordType) {
    Stream<String> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchema, recordType);
    if (!tableConfig.populateMetaFields()) {
      return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
    }

    return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX), columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
  }

So it's already accounted for.
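For concreteness, a simplified runnable mirror of those two branches — the meta column names are the standard Hudi meta fields, but the method body is an illustration, not the actual HoodieTableMetadataUtil code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnsToIndexBranches {

  // Standard Hudi meta columns, standing in for META_COLS_TO_ALWAYS_INDEX.
  static final List<String> META_COLS = Arrays.asList("_hoodie_commit_time", "_hoodie_record_key");

  static List<String> getColumnsToIndex(boolean populateMetaFields, List<String> dataColumns) {
    if (!populateMetaFields) {
      // Meta fields are skipped entirely, as in the snippet above.
      return new ArrayList<>(dataColumns);
    }
    List<String> all = new ArrayList<>(META_COLS);
    all.addAll(dataColumns);
    return all;
  }

  public static void main(String[] args) {
    List<String> dataCols = Arrays.asList("price", "ts");
    System.out.println(getColumnsToIndex(false, dataCols)); // [price, ts]
    System.out.println(getColumnsToIndex(true, dataCols));  // [_hoodie_commit_time, _hoodie_record_key, price, ts]
  }
}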

LOG.warn("No columns to index for column stats index.");
throw new HoodieException("No columns are found eligible to be indexed. Can you try setting right value for " + COLUMN_STATS_INDEX_FOR_COLUMNS.key()
+ ". Current value set : " + dataWriteConfig.getMetadataConfig().getColumnsEnabledForColumnStatsIndex());
}

LOG.info("Indexing {} columns for column stats index", columnsToIndex.size());

// during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(),
dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
columnsToIndex);

final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
return Pair.of(columnsToIndex, Pair.of(fileGroupCount, records));
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
@@ -21,6 +21,7 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieIndexClientUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
@@ -33,12 +34,14 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -230,12 +233,21 @@ protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat> write
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
}

protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
}

/**
* Finalize Write operation.
* @param instantTime Instant Time
@@ -28,8 +28,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;

public abstract class BaseHoodieIndexClient {

private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieIndexClient.class);
@@ -47,12 +50,19 @@ public BaseHoodieIndexClient() {
public void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
LOG.info("Registering index {} of using {}", indexDefinition.getIndexName(), indexDefinition.getIndexType());
// build HoodieIndexMetadata and then add to index definition file
metaClient.buildIndexDefinition(indexDefinition);
// update table config if necessary
String indexMetaPath = metaClient.getIndexDefinitionPath();
if (!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH) || !metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH, FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(indexMetaPath)));
HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
boolean indexDefnUpdated = true;
if (indexDefinition.getIndexName().equals(PARTITION_NAME_COLUMN_STATS)) {
indexDefnUpdated = metaClient.buildColSatsIndexDefinition(indexDefinition);
} else {
metaClient.buildIndexDefinition(indexDefinition);
}
if (indexDefnUpdated) {
// update table config if necessary
String indexMetaPath = metaClient.getIndexDefinitionPath();
if (!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key()) || !metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH, FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(indexMetaPath)));
HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
}
}
}

@@ -61,6 +71,8 @@ public void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition ind
*/
public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception;

public abstract void createOrUpdateColStatsIndex(HoodieTableMetaClient metaClient, List<String> columnsToIndex);
Member: How about renaming to createOrUpdateColumnStatsIndexDefinition? The method is not actually creating/updating the stats, right?

Contributor Author: 👍


/**
* Drop an index. By default, ignore drop if index does not exist.
*
@@ -27,10 +27,12 @@
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -89,6 +91,12 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
writeTableMetadata(table, compactionCommitTime, metadata);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} finally {
this.txnManager.endTransaction(Option.of(compactionInstant));
}
@@ -140,6 +148,12 @@ protected void completeClustering(
clusteringInstant,
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata),
table.getActiveTimeline());
// update table config for cols to Index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} catch (IOException e) {
throw new HoodieClusteringException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);