[HUDI-8232] Adding Index defn for Col stats to track the list of columns indexed #12529

Open · wants to merge 3 commits into base: master
@@ -284,4 +284,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieC
}
}
}

protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
}
}
@@ -56,6 +56,7 @@
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String commitActionType, String instant
writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
// update columns to index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
}

// Save internal schema
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;

import java.util.List;

public class HoodieIndexClientUtils {

public static void updateColsToIndex(HoodieTable dataTable, HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
Functions.Function2<HoodieTableMetaClient, List<String>, Void> updateColStatsFunc) {
if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
dataTable.getMetaClient().reloadTableConfig();
if (dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
try {
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
.setStorage(dataTable.getStorage())
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
.build();
HoodieInstant latestInstant = mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get();
final HoodieCommitMetadata mdtCommitMetadata = mdtMetaClient.getTimelineLayout().getCommitMetadataSerDe().deserialize(
latestInstant,
mdtMetaClient.getActiveTimeline().getInstantDetails(latestInstant).get(),
HoodieCommitMetadata.class);
if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
// update the data table's table config with the list of columns indexed.
List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig());
// if col stats is getting updated, let's also update the list of columns indexed if it changed.
updateColStatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
}
} catch (Exception e) {
throw new HoodieException("Updating data table config to latest set of columns indexed with col stats failed ", e);
}
}
}
}
}
@@ -76,6 +76,7 @@
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.Lazy;

import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -98,6 +99,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -403,6 +405,7 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition);
String partitionName;
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
List<String> columnsToIndex = new ArrayList<>();
try {
switch (partitionType) {
case FILES:
@@ -414,7 +417,9 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
partitionName = BLOOM_FILTERS.getPartitionPath();
break;
case COLUMN_STATS:
fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap);
Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = initializeColumnStatsPartition(partitionToFilesMap);
columnsToIndex = colStatsColumnsAndRecord.getKey();
fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
partitionName = COLUMN_STATS.getPartitionPath();
break;
case RECORD_INDEX:
@@ -477,7 +482,16 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
bulkCommit(instantTimeForPartition, partitionName, records, fileGroupCount);
metadataMetaClient.reloadActiveTimeline();

if (partitionType == COLUMN_STATS) {
// column stats partition was just initialized; also update the list of columns indexed if it changed.
updateColumnsToIndexWithColStats(columnsToIndex);
Member: If we're anyway going to check and update post-commit, why do it here as well?

Contributor Author: Just that, if initialization succeeded but the job crashed before applying the commit of interest, we don't want to miss the index definition update.

Contributor Author (@nsivabalan, Dec 20, 2024): Essentially, for regular commits, both the MDT write and the data table write have to complete before we update the index definition. But initialization is a special commit: once the MDT commit is completed, readers are eligible to read the data, and hence it's better to keep the index definition in sync.
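
For illustration only, a minimal sketch of how an engine-specific metadata writer could fill in this hook, mirroring the commented-out block below; the getTableColStatsIndexedColumns and setColStatsIndexedColumns helpers on HoodieTableConfig are assumed from that block rather than confirmed by this diff:

  @Override
  protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
    // Rewrite the data table's config only when the set of indexed columns actually changed.
    List<String> currentCols = dataMetaClient.getTableConfig().getTableColStatsIndexedColumns();
    if (!currentCols.equals(columnsToIndex)) {
      LOG.info("Columns indexed with col stats changing from {} to {}", currentCols, columnsToIndex);
      dataMetaClient.getTableConfig().setColStatsIndexedColumns(dataMetaClient, columnsToIndex);
    }
  }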

/*if (!dataMetaClient.getTableConfig().getTableColStatsIndexedColumns().equals(columnsToIndex)) {
LOG.info(String.format("List of columns to index is changing. Old value %s. New value %s", dataMetaClient.getTableConfig().getTableColStatsIndexedColumns(),
columnsToIndex));
dataMetaClient.getTableConfig().setColStatsIndexedColumns(dataMetaClient, columnsToIndex);
}*/
}
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionName, true);
// initialize the metadata reader again so the MDT partition can be read after initialization
initMetadataReader();
@@ -486,6 +500,9 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
}
}

protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
}

/**
* Returns a unique timestamp to use for initializing a MDT partition.
* <p>
@@ -519,15 +536,28 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(Li
return Pair.of(fileGroupCount, records);
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// Find the columns to index
final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
dataWriteConfig.getMetadataConfig(), Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)));
if (columnsToIndex.isEmpty()) {
// at least the meta fields will be chosen for indexing, so we should not reach this state.
Member: What happens when populateMetaFields is disabled? Should we still error out?

Contributor Author: Nope. Why? We should just skip indexing the meta fields if populateMetaFields is false.

Contributor Author:
  private static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Either<List<String>, Lazy<Option<Schema>>> tableSchema,
                                                Option<HoodieRecordType> recordType) {
    Stream<String> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchema, recordType);
    if (!tableConfig.populateMetaFields()) {
      return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
    }

    return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX), columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
  }

So, it's already accounted for.

LOG.warn("No columns to index for column stats index.");
throw new HoodieException("No columns are found eligible to be indexed. Can you try setting right value for " + COLUMN_STATS_INDEX_FOR_COLUMNS.key()
+ ". Current value set : " + dataWriteConfig.getMetadataConfig().getColumnsEnabledForColumnStatsIndex());
}

LOG.info("Indexing {} columns for column stats index", columnsToIndex.size());

// during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(),
dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
columnsToIndex);

final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
return Pair.of(columnsToIndex, Pair.of(fileGroupCount, records));
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
@@ -21,6 +21,7 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieIndexClientUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
@@ -33,12 +34,14 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -230,12 +233,21 @@ protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat> write
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
// update columns to index as applicable
HoodieIndexClientUtils.updateColsToIndex(table, config, metadata,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (val1, val2) -> {
updateColumnsToIndexWithColStats(val1, val2);
return null;
});
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
}

protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
}

/**
* Finalize Write operation.
* @param instantTime Instant Time
@@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public abstract class BaseHoodieIndexClient {
@@ -47,12 +48,14 @@ public BaseHoodieIndexClient() {
public void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
LOG.info("Registering index {} of using {}", indexDefinition.getIndexName(), indexDefinition.getIndexType());
// build HoodieIndexMetadata and then add to index definition file
metaClient.buildIndexDefinition(indexDefinition);
// update table config if necessary
String indexMetaPath = metaClient.getIndexDefinitionPath();
if (!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH) || !metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH, FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(indexMetaPath)));
HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
boolean indexDefnUpdated = metaClient.buildIndexDefinition(indexDefinition);
if (indexDefnUpdated) {
// update table config if necessary
String indexMetaPath = metaClient.getIndexDefinitionPath();
if (!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key()) || !metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH, FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(indexMetaPath)));
HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
}
}
}

@@ -61,6 +64,8 @@ public void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition ind
*/
public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception;

public abstract void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaClient, List<String> columnsToIndex);
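
As a usage sketch (the indexClient field and its wiring are assumptions, not part of this change), an engine-specific write client could implement its updateColumnsToIndexWithColStats hook by delegating to this method, so the index definition file tracks the latest set of columns indexed by col stats:

  @Override
  protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
    // Delegate to the index client so the column_stats index definition stays in sync
    // with the columns actually indexed.
    indexClient.createOrUpdateColumnStatsIndexDefinition(metaClient, columnsToIndex);
  }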

/**
* Drop an index. By default, ignore drop if index does not exist.
*