feat: DH-18399: Add ParquetColumnResolver
devinrsmith committed Jan 13, 2025
1 parent b8f301c commit d379760
Showing 8 changed files with 823 additions and 168 deletions.
io/deephaven/parquet/table/ParquetInstructions.java
@@ -12,8 +12,10 @@
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.hash.KeyedObjectKey.Basic;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.parquet.table.location.ParquetColumnResolver;
import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -167,6 +169,8 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par

public abstract Optional<Collection<List<String>>> getIndexColumns();

public abstract Optional<ParquetColumnResolver.Factory> getColumnResolver();

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* set as the provided {@link TableDefinition}.
@@ -316,6 +320,11 @@ public Optional<Collection<List<String>>> getIndexColumns() {
return Optional.empty();
}

@Override
public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
return Optional.empty();
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, null);
@@ -333,15 +342,15 @@ public ParquetInstructions withTableDefinitionAndLayout(
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
useLayout, useDefinition, null, null);
useLayout, useDefinition, null, null, null);
}

@Override
ParquetInstructions withIndexColumns(final Collection<List<String>> indexColumns) {
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
null, null, indexColumns, null);
null, null, indexColumns, null, null);
}

@Override
@@ -458,6 +467,7 @@ private static final class ReadOnly extends ParquetInstructions {
private final TableDefinition tableDefinition;
private final Collection<List<String>> indexColumns;
private final OnWriteCompleted onWriteCompleted;
private final ParquetColumnResolver.Factory columnResolver;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
@@ -474,7 +484,8 @@ private ReadOnly(
final ParquetFileLayout fileLayout,
final TableDefinition tableDefinition,
final Collection<List<String>> indexColumns,
final OnWriteCompleted onWriteCompleted) {
final OnWriteCompleted onWriteCompleted,
final ParquetColumnResolver.Factory columnResolver) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
@@ -493,6 +504,12 @@ private ReadOnly(
.map(List::copyOf)
.collect(Collectors.toUnmodifiableList());
this.onWriteCompleted = onWriteCompleted;
this.columnResolver = columnResolver;
            if (columnResolver != null && tableDefinition == null) {
                throw new IllegalArgumentException("When setting columnResolver, tableDefinition must be provided");
            }
}

private <T> T getOrDefault(final String columnName, final T defaultValue,
@@ -617,6 +634,11 @@ public Optional<Collection<List<String>>> getIndexColumns() {
return Optional.ofNullable(indexColumns);
}

@Override
public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
return Optional.ofNullable(columnResolver);
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, fileLayout);
@@ -635,7 +657,7 @@ public ParquetInstructions withTableDefinitionAndLayout(
getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
generateMetadataFiles(), baseNameForPartitionedParquetData(), useLayout, useDefinition,
indexColumns, onWriteCompleted);
indexColumns, onWriteCompleted, columnResolver);
}

@Override
@@ -644,7 +666,7 @@ ParquetInstructions withIndexColumns(final Collection<List<String>> useIndexColu
getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
generateMetadataFiles(), baseNameForPartitionedParquetData(), fileLayout,
tableDefinition, useIndexColumns, onWriteCompleted);
tableDefinition, useIndexColumns, onWriteCompleted, columnResolver);
}

@Override
@@ -709,6 +731,7 @@ public static class Builder {
private TableDefinition tableDefinition;
private Collection<List<String>> indexColumns;
private OnWriteCompleted onWriteCompleted;
private ParquetColumnResolver.Factory columnResolverFactory;

/**
* For each additional field added, make sure to update the copy constructor builder
@@ -737,6 +760,7 @@ public Builder(final ParquetInstructions parquetInstructions) {
tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null);
indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
onWriteCompleted = readOnlyParquetInstructions.onWriteCompleted().orElse(null);
columnResolverFactory = readOnlyParquetInstructions.getColumnResolver().orElse(null);
}

public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) {
@@ -974,6 +998,19 @@ public Builder setOnWriteCompleted(final OnWriteCompleted onWriteCompleted) {
return this;
}

        /**
         * Sets the column resolver factory, which allows higher-level managers (such as Iceberg) to implement
         * advanced column resolution logic based on each Parquet file's {@link FileMetaData}. When a factory is set,
         * a table definition must also be provided via {@link #setTableDefinition(TableDefinition)}; as a
         * consequence, the factory is <i>not</i> used for schema inference.
         *
         * @param columnResolverFactory the column resolver factory
         */
public Builder setColumnResolverFactory(ParquetColumnResolver.Factory columnResolverFactory) {
this.columnResolverFactory = columnResolverFactory;
return this;
}

public ParquetInstructions build() {
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructionsOut = columnNameToInstructions;
columnNameToInstructions = null;
@@ -983,7 +1020,7 @@ public ParquetInstructions build() {
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData, fileLayout,
tableDefinition, indexColumns, onWriteCompleted);
tableDefinition, indexColumns, onWriteCompleted, columnResolverFactory);
}
}

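Example (not part of the commit): a minimal sketch of wiring a resolver factory into a read via the new builder option. The table definition, field ids, and file path are hypothetical; note that a table definition is required whenever a resolver factory is set.

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.parquet.table.location.ParquetColumnResolverFieldIdFactory;

import java.util.Map;

public class ColumnResolverExample {
    public static void main(String[] args) {
        // Hypothetical definition; a resolver factory requires an explicit table definition.
        final TableDefinition definition = TableDefinition.of(
                ColumnDefinition.ofLong("Id"),
                ColumnDefinition.ofDouble("Price"));
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setTableDefinition(definition)
                // Deephaven column name -> Parquet field id (hypothetical ids)
                .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(
                        Map.of("Id", 1, "Price", 2)))
                .build();
        final Table table = ParquetTools.readTable("/data/example.parquet", instructions);
        System.out.println(table.size());
    }
}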
io/deephaven/parquet/table/location/ParquetColumnResolver.java (new file)
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table.location;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.engine.table.impl.locations.TableKey;
import io.deephaven.parquet.table.ParquetInstructions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.immutables.value.Value;

import java.util.Map;

/**
 * A mapping between Deephaven column names and Parquet {@link ColumnDescriptor column descriptors}. Every
 * descriptor in the mapping must belong to the {@link MessageType schema} the resolver was built with; this is
 * validated when the resolver is constructed.
 */
@Value.Immutable
@BuildableStyle
public abstract class ParquetColumnResolver {

    /**
     * A factory for creating {@link ParquetColumnResolver column resolvers} on a per-location basis. See
     * {@link ParquetInstructions.Builder#setColumnResolverFactory(Factory)}.
     */
public interface Factory {

        /**
         * Creates a column resolver for the given table location.
         *
         * @param tableKey the table key
         * @param tableLocationKey the Parquet table location key
         * @return the Parquet column resolver
         */
ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
}

public static Builder builder() {
return ImmutableParquetColumnResolver.builder();
}

// Intentionally not exposed, but necessary to expose to Builder for safety checks.
abstract MessageType schema();

    /**
     * The mapping from Deephaven column names to Parquet column descriptors.
     *
     * @return the mapping
     */
public abstract Map<String, ColumnDescriptor> mapping();

@Value.Check
final void checkColumns() {
for (ColumnDescriptor columnDescriptor : mapping().values()) {
if (!ParquetUtil.contains(schema(), columnDescriptor)) {
throw new IllegalArgumentException("schema does not contain column descriptor " + columnDescriptor);
}
}
}

    /**
     * A builder for {@link ParquetColumnResolver}.
     */
    public interface Builder {

Builder schema(MessageType schema);

Builder putMapping(String key, ColumnDescriptor value);

Builder putAllMapping(Map<String, ? extends ColumnDescriptor> entries);

ParquetColumnResolver build();
}
}
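Example (not part of the commit): a sketch of building a resolver by hand against a hypothetical one-column schema. The @Value.Check above rejects any descriptor that is not part of the supplied schema.

import io.deephaven.parquet.table.location.ParquetColumnResolver;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class ResolverBuilderExample {
    public static void main(String[] args) {
        // Hypothetical schema with a single INT64 leaf carrying field id 1.
        final MessageType schema = Types.buildMessage()
                .addField(Types.optional(PrimitiveTypeName.INT64).id(1).named("id_raw"))
                .named("root");
        // Map the Deephaven column "Id" to the "id_raw" leaf; build() validates each
        // descriptor against the schema via checkColumns().
        final ParquetColumnResolver resolver = ParquetColumnResolver.builder()
                .schema(schema)
                .putMapping("Id", schema.getColumnDescription(new String[] {"id_raw"}))
                .build();
        System.out.println(resolver.mapping());
    }
}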
io/deephaven/parquet/table/location/ParquetColumnResolverFieldIdFactory.java (new file)
@@ -0,0 +1,91 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table.location;

import io.deephaven.engine.table.impl.locations.TableKey;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * An example {@link ParquetColumnResolver.Factory} implementation based on Parquet field ids. It may be useful for
 * testing and debugging purposes, but it is not meant for production use cases.
 */
public final class ParquetColumnResolverFieldIdFactory implements ParquetColumnResolver.Factory {

    /**
     * Creates a factory that resolves Deephaven column names to Parquet columns by field id. Multiple Deephaven
     * column names may map to the same field id.
     *
     * @param columnNameToFieldId a map from Deephaven column names to Parquet field ids
     * @return the column resolver factory
     */
public static ParquetColumnResolverFieldIdFactory of(Map<String, Integer> columnNameToFieldId) {
return new ParquetColumnResolverFieldIdFactory(columnNameToFieldId
.entrySet()
.stream()
.collect(Collectors.groupingBy(
Map.Entry::getValue,
Collectors.mapping(Map.Entry::getKey, Collectors.toSet()))));
}

private final Map<Integer, Set<String>> fieldIdsToDhColumnNames;

private ParquetColumnResolverFieldIdFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
this.fieldIdsToDhColumnNames = Objects.requireNonNull(fieldIdsToDhColumnNames);
}

@Override
public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
final MessageType schema = tableLocationKey.getFileReader().getSchema();
// TODO: note the potential for confusion on where to derive schema from.
// final MessageType schema = tableLocationKey.getMetadata().getFileMetaData().getSchema();
final FieldIdMappingVisitor visitor = new FieldIdMappingVisitor();
ParquetUtil.walk(schema, visitor);
return ParquetColumnResolver.builder()
.schema(schema)
.putAllMapping(visitor.nameToColumnDescriptor)
.build();
}

private class FieldIdMappingVisitor implements ParquetUtil.Visitor {
private final Map<String, ColumnDescriptor> nameToColumnDescriptor = new HashMap<>();

@Override
public void accept(Collection<Type> path, PrimitiveType primitiveType) {
            // There are different resolution strategies that could all be reasonable. We could consider using only
            // the field id closest to the leaf. This version takes the most general approach and considers field ids
            // wherever they appear in the path; a column resolves only if the resulting mapping is unambiguous.
for (Type type : path) {
if (type.getId() == null) {
continue;
}
final int fieldId = type.getId().intValue();
final Set<String> set = fieldIdsToDhColumnNames.get(fieldId);
if (set == null) {
continue;
}
final ColumnDescriptor columnDescriptor = ParquetUtil.makeColumnDescriptor(path, primitiveType);
for (String columnName : set) {
final ColumnDescriptor existing = nameToColumnDescriptor.putIfAbsent(columnName, columnDescriptor);
if (existing != null) {
throw new IllegalStateException(String.format(
"Parquet columns can't be unambigously mapped. %d -> %s has multiple paths %s, %s",
fieldId, columnName, Arrays.toString(existing.getPath()),
Arrays.toString(columnDescriptor.getPath())));
}
}
}
}
}
}
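Example (not part of the commit): of() inverts the provided map, so several Deephaven columns may share one field id; names and ids here are hypothetical. If a file's schema carried field id 2 on two different leaves, init would throw the IllegalStateException above for those columns rather than guess.

import io.deephaven.parquet.table.location.ParquetColumnResolverFieldIdFactory;

import java.util.Map;

public class FieldIdFactoryExample {
    public static void main(String[] args) {
        // "Price" and "PriceView" both resolve to whichever Parquet leaf carries field id 2.
        final ParquetColumnResolverFieldIdFactory factory = ParquetColumnResolverFieldIdFactory.of(Map.of(
                "Id", 1,
                "Price", 2,
                "PriceView", 2));
        System.out.println(factory);
    }
}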
io/deephaven/parquet/table/location/ParquetTableLocation.java
@@ -60,6 +60,8 @@ public class ParquetTableLocation extends AbstractTableLocation {
private final ParquetFileReader parquetFileReader;
private final int[] rowGroupIndices;

private final ParquetColumnResolver resolver;

private final RowGroup[] rowGroups;
private final RegionedPageStore.Parameters regionParameters;
private final Map<String, String[]> parquetColumnNameToPath;
@@ -88,7 +90,12 @@ public ParquetTableLocation(@NotNull final TableKey tableKey,
parquetMetadata = tableLocationKey.getMetadata();
rowGroupIndices = tableLocationKey.getRowGroupIndices();
}

{
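// Resolve the Deephaven-to-Parquet column mapping once per location; a null
// resolver means the legacy name-based resolution in makeColumnLocation applies.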
final ParquetColumnResolver.Factory factory = readInstructions.getColumnResolver().orElse(null);
resolver = factory == null
? null
: Objects.requireNonNull(factory.init(tableKey, tableLocationKey));
}
final int rowGroupCount = rowGroupIndices.length;
rowGroups = IntStream.of(rowGroupIndices)
.mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
@@ -181,12 +188,22 @@ public List<SortColumn> getSortedColumns() {
@Override
@NotNull
protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
final List<String> nameList;
final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName);
final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
if (resolver == null) {
final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
nameList = columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
} else {
final ColumnDescriptor columnDescriptor = resolver.mapping().get(columnName);
if (columnDescriptor == null) {
nameList = List.of(); // empty, will not resolve
} else {
nameList = Arrays.asList(columnDescriptor.getPath());
}
}
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
.map(rgr -> rgr.getColumnChunk(columnName, nameList))
.toArray(ColumnChunkReader[]::new);
final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
return new ParquetColumnLocation<>(this, columnName, parquetColumnName,
exists ? columnChunkReaders : null);
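Example (not part of the commit): a purely illustrative helper summarizing the path-selection logic above; the resolver, when present, bypasses the name-based lookup entirely.

import io.deephaven.parquet.table.location.ParquetColumnResolver;
import org.apache.parquet.column.ColumnDescriptor;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PathSelectionSketch {
    // Illustrative only: mirrors makeColumnLocation's path selection.
    static List<String> resolvePath(final ParquetColumnResolver resolver, final String columnName,
            final String parquetColumnName, final String[] legacyPath) {
        if (resolver == null) {
            // Legacy behavior: the instruction-mapped parquet column name, or its known path.
            return legacyPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(legacyPath);
        }
        final ColumnDescriptor descriptor = resolver.mapping().get(columnName);
        // Unmapped columns yield an empty path, so no column chunks resolve and the
        // column is treated as absent (read as null-filled).
        return descriptor == null ? List.of() : Arrays.asList(descriptor.getPath());
    }
}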