Skip to content

Commit

Permalink
add tests, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Jan 15, 2025
1 parent ae60022 commit 554ada6
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par

public abstract Optional<Collection<List<String>>> getIndexColumns();

public abstract Optional<ParquetColumnResolver.Factory> getColumnResolver();
public abstract Optional<ParquetColumnResolver.Factory> getColumnResolverFactory();

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
Expand Down Expand Up @@ -321,7 +321,7 @@ public Optional<Collection<List<String>>> getIndexColumns() {
}

@Override
public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
public Optional<ParquetColumnResolver.Factory> getColumnResolverFactory() {
return Optional.empty();
}

Expand Down Expand Up @@ -635,7 +635,7 @@ public Optional<Collection<List<String>>> getIndexColumns() {
}

@Override
public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
public Optional<ParquetColumnResolver.Factory> getColumnResolverFactory() {
return Optional.ofNullable(columnResolver);
}

Expand Down Expand Up @@ -760,7 +760,7 @@ public Builder(final ParquetInstructions parquetInstructions) {
tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null);
indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
onWriteCompleted = readOnlyParquetInstructions.onWriteCompleted().orElse(null);
columnResolverFactory = readOnlyParquetInstructions.getColumnResolver().orElse(null);
columnResolverFactory = readOnlyParquetInstructions.getColumnResolverFactory().orElse(null);
}

public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,23 @@
//
package io.deephaven.parquet.table.location;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.engine.table.impl.locations.TableKey;
import io.deephaven.parquet.table.ParquetInstructions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.immutables.value.Value;

import java.util.Map;
import java.util.Optional;

/**
* A mapping between Deephaven column names and Parquet {@link ColumnDescriptor column descriptors}.
*
* TODO: describe better
*/
@Value.Immutable
@BuildableStyle
public abstract class ParquetColumnResolver {
public interface ParquetColumnResolver {

/**
* {@link ParquetInstructions.Builder#setColumnResolverFactory(Factory)}
*/
public interface Factory {
interface Factory {

/**
* TODO: description
Expand All @@ -33,42 +28,8 @@ public interface Factory {
* @param tableLocationKey the Parquet TLK
* @return the Parquet column resolver
*/
ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
}

public static Builder builder() {
return ImmutableParquetColumnResolver.builder();
}

// Intentionally not exposed, but necessary to expose to Builder for safety checks.
abstract MessageType schema();

/**
* TODO: javadoc
*
* @return
*/
public abstract Map<String, ColumnDescriptor> mapping();

@Value.Check
final void checkColumns() {
for (ColumnDescriptor columnDescriptor : mapping().values()) {
if (!ParquetUtil.contains(schema(), columnDescriptor)) {
throw new IllegalArgumentException("schema does not contain column descriptor " + columnDescriptor);
}
}
}

public interface Builder {

// TODO: javadoc

Builder schema(MessageType schema);

Builder putMapping(String key, ColumnDescriptor value);

Builder putAllMapping(Map<String, ? extends ColumnDescriptor> entries);

ParquetColumnResolver build();
}
Optional<ColumnDescriptor> of(String columnName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table.location;

import io.deephaven.annotations.BuildableStyle;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.immutables.value.Value;

import java.util.Map;
import java.util.Optional;

/**
* A {@link ParquetColumnResolver} implementation based on a map from Deephaven column names to Parquet
* {@link ColumnDescriptor column descriptors}.
*/
@Value.Immutable
@BuildableStyle
public abstract class ParquetColumnResolverMap implements ParquetColumnResolver {

    /**
     * Creates a new {@link Builder}.
     *
     * @return the builder
     */
    public static Builder builder() {
        return ImmutableParquetColumnResolverMap.builder();
    }

    /**
     * The Parquet schema. Every {@link ColumnDescriptor} in {@link #mapping()} must be contained in this schema; this
     * is validated when the object is built.
     */
    public abstract MessageType schema();

    /**
     * The map from Deephaven column name to {@link ColumnDescriptor}. The {@link #schema()} must contain each of the
     * column descriptors.
     */
    public abstract Map<String, ColumnDescriptor> mapping();

    /**
     * Looks up the {@link ColumnDescriptor} for {@code columnName} in {@link #mapping()}.
     *
     * @param columnName the Deephaven column name
     * @return the column descriptor, or empty if {@code columnName} has no mapping
     */
    @Override
    public final Optional<ColumnDescriptor> of(String columnName) {
        return Optional.ofNullable(mapping().get(columnName));
    }

    /**
     * The builder for {@link ParquetColumnResolverMap}.
     */
    public interface Builder {
        Builder schema(MessageType schema);

        Builder putMapping(String key, ColumnDescriptor value);

        Builder putAllMapping(Map<String, ? extends ColumnDescriptor> entries);

        ParquetColumnResolverMap build();
    }

    // Immutables validation hook, invoked automatically at build time: rejects any mapping entry whose column
    // descriptor is not present in schema(), so an instance can never hold a descriptor the schema cannot resolve.
    @Value.Check
    final void checkMapping() {
        for (Map.Entry<String, ColumnDescriptor> e : mapping().entrySet()) {
            final ColumnDescriptor columnDescriptor = e.getValue();
            if (!ParquetUtil.contains(schema(), columnDescriptor)) {
                throw new IllegalArgumentException(
                        String.format("schema does not contain Deephaven columnName=%s columnDescriptor=%s", e.getKey(),
                                columnDescriptor));
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.engine.table.impl.locations.TableKey;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
Expand All @@ -22,16 +21,16 @@
* The following is an example {@link ParquetColumnResolver.Factory} that may be useful for testing and debugging
* purposes, but is not meant to be used for production use cases.
*/
public final class ParquetColumnResolverFieldIdFactory implements ParquetColumnResolver.Factory {
public final class ParquetFieldIdColumnResolverFactory implements ParquetColumnResolver.Factory {

/**
* TODO: javadoc
*
* @param columnNameToFieldId a map from Deephaven column names to field ids
* @return the column resolver provider
*/
public static ParquetColumnResolverFieldIdFactory of(Map<String, Integer> columnNameToFieldId) {
return new ParquetColumnResolverFieldIdFactory(columnNameToFieldId
public static ParquetFieldIdColumnResolverFactory of(Map<String, Integer> columnNameToFieldId) {
return new ParquetFieldIdColumnResolverFactory(columnNameToFieldId
.entrySet()
.stream()
.collect(Collectors.groupingBy(
Expand All @@ -41,18 +40,22 @@ public static ParquetColumnResolverFieldIdFactory of(Map<String, Integer> column

private final Map<Integer, Set<String>> fieldIdsToDhColumnNames;

private ParquetColumnResolverFieldIdFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
private ParquetFieldIdColumnResolverFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
this.fieldIdsToDhColumnNames = Objects.requireNonNull(fieldIdsToDhColumnNames);
}

@Override
public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
final MessageType schema = tableLocationKey.getFileReader().getSchema();
// TODO: note the potential for confusion on where to derive schema from.
// final MessageType schema = tableLocationKey.getMetadata().getFileMetaData().getSchema();
return of(schema);
}

public ParquetColumnResolverMap of(MessageType schema) {
final FieldIdMappingVisitor visitor = new FieldIdMappingVisitor();
ParquetUtil.walk(schema, visitor);
return ParquetColumnResolver.builder()
return ParquetColumnResolverMap.builder()
.schema(schema)
.putAllMapping(visitor.nameToColumnDescriptor)
.build();
Expand All @@ -67,10 +70,11 @@ public void accept(Collection<Type> path, PrimitiveType primitiveType) {
// field id closest to the leaf. This version, however, takes the most general approach and considers field
// ids wherever they appear; ultimately, only being resolvable if the field id mapping is unambiguous.
for (Type type : path) {
if (type.getId() == null) {
final Type.ID id = type.getId();
if (id == null) {
continue;
}
final int fieldId = type.getId().intValue();
final int fieldId = id.intValue();
final Set<String> set = fieldIdsToDhColumnNames.get(fieldId);
if (set == null) {
continue;
Expand All @@ -79,9 +83,9 @@ public void accept(Collection<Type> path, PrimitiveType primitiveType) {
for (String columnName : set) {
final ColumnDescriptor existing = nameToColumnDescriptor.putIfAbsent(columnName, columnDescriptor);
if (existing != null) {
throw new IllegalStateException(String.format(
"Parquet columns can't be unambigously mapped. %d -> %s has multiple paths %s, %s",
fieldId, columnName, Arrays.toString(existing.getPath()),
throw new IllegalArgumentException(String.format(
"Parquet columns can't be unambigously mapped. %s -> %d has multiple paths %s, %s",
columnName, fieldId, Arrays.toString(existing.getPath()),
Arrays.toString(columnDescriptor.getPath())));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ public ParquetTableLocation(@NotNull final TableKey tableKey,
parquetMetadata = tableLocationKey.getMetadata();
rowGroupIndices = tableLocationKey.getRowGroupIndices();
}
{
final ParquetColumnResolver.Factory factory = readInstructions.getColumnResolver().orElse(null);
resolver = factory == null
? null
: Objects.requireNonNull(factory.init(tableKey, tableLocationKey));
}
resolver = readInstructions.getColumnResolverFactory()
.map(factory -> factory.of(tableKey, tableLocationKey))
.orElse(null);
final int rowGroupCount = rowGroupIndices.length;
rowGroups = IntStream.of(rowGroupIndices)
.mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
Expand Down Expand Up @@ -194,12 +191,11 @@ protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
nameList = columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
} else {
final ColumnDescriptor columnDescriptor = resolver.mapping().get(columnName);
if (columnDescriptor == null) {
nameList = List.of(); // empty, will not resolve
} else {
nameList = Arrays.asList(columnDescriptor.getPath());
}
// empty list will result in exists=false
nameList = resolver.of(columnName)
.map(ColumnDescriptor::getPath)
.map(Arrays::asList)
.orElse(List.of());
}
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ interface Visitor {
void accept(Collection<Type> path, PrimitiveType primitiveType);
}


static class ColumnDescriptorVisitor implements Visitor {

private final Consumer<ColumnDescriptor> consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void empty() {
assertThat(ParquetInstructions.EMPTY.getFileLayout()).isEmpty();
assertThat(ParquetInstructions.EMPTY.getTableDefinition()).isEmpty();
assertThat(ParquetInstructions.EMPTY.getIndexColumns()).isEmpty();
assertThat(ParquetInstructions.EMPTY.getColumnResolver()).isEmpty();
assertThat(ParquetInstructions.EMPTY.getColumnResolverFactory()).isEmpty();
assertThat(ParquetInstructions.EMPTY.baseNameForPartitionedParquetData()).isEqualTo("{uuid}");
}

Expand Down Expand Up @@ -152,7 +152,7 @@ public void columnResolver() {
.setTableDefinition(TableDefinition.of(ColumnDefinition.ofInt("Foo")))
.setColumnResolverFactory(ColumnResolverTestImpl.INSTANCE)
.build();
assertThat(instructions.getColumnResolver()).hasValue(ColumnResolverTestImpl.INSTANCE);
assertThat(instructions.getColumnResolverFactory()).hasValue(ColumnResolverTestImpl.INSTANCE);
}

@Test
Expand All @@ -171,7 +171,7 @@ private enum ColumnResolverTestImpl implements ParquetColumnResolver.Factory {
INSTANCE;

@Override
public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
throw new UnsupportedOperationException();
}
}
Expand Down
Loading

0 comments on commit 554ada6

Please sign in to comment.