From 554ada6796231351973ce3ac5cd02e8d39802cd7 Mon Sep 17 00:00:00 2001
From: Devin Smith
Date: Tue, 14 Jan 2025 18:23:49 -0800
Subject: [PATCH] add tests, refactor

---
 .../parquet/table/ParquetInstructions.java    |   8 +-
 .../table/location/ParquetColumnResolver.java |  49 +----
 .../location/ParquetColumnResolverMap.java    |  63 +++++++
 ...ParquetFieldIdColumnResolverFactory.java}  |  28 +--
 .../table/location/ParquetTableLocation.java  |  20 +-
 .../parquet/table/location/ParquetUtil.java   |   1 -
 .../table/ParquetInstructionsTest.java        |   6 +-
 .../parquet/table/TestParquetTools.java       |  24 +--
 ...rquetFieldIdColumnResolverFactoryTest.java | 177 ++++++++++++++++++
 .../table/location/ParquetUtilTest.java       |  52 +++--
 10 files changed, 311 insertions(+), 117 deletions(-)
 create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java
 rename extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/{ParquetColumnResolverFieldIdFactory.java => ParquetFieldIdColumnResolverFactory.java} (80%)
 create mode 100644 extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
index f3c12403344..6888bed988c 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
@@ -169,7 +169,7 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
 
     public abstract Optional<List<List<String>>> getIndexColumns();
 
-    public abstract Optional<ParquetColumnResolver.Factory> getColumnResolver();
+    public abstract Optional<ParquetColumnResolver.Factory> getColumnResolverFactory();
 
     /**
      * Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
@@ -321,7 +321,7 @@ public Optional<List<List<String>>> getIndexColumns() {
         }
 
         @Override
-        public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
+        public Optional<ParquetColumnResolver.Factory> getColumnResolverFactory() {
             return Optional.empty();
         }
 
@@ -635,7 +635,7 @@ public Optional<List<List<String>>> getIndexColumns() {
         }
 
         @Override
-        public Optional<ParquetColumnResolver.Factory> getColumnResolver() {
+        public Optional<ParquetColumnResolver.Factory> getColumnResolverFactory() {
             return Optional.ofNullable(columnResolver);
         }
 
@@ -760,7 +760,7 @@ public Builder(final ParquetInstructions parquetInstructions) {
             tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null);
             indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
             onWriteCompleted = readOnlyParquetInstructions.onWriteCompleted().orElse(null);
-            columnResolverFactory = readOnlyParquetInstructions.getColumnResolver().orElse(null);
+            columnResolverFactory = readOnlyParquetInstructions.getColumnResolverFactory().orElse(null);
         }
 
         public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) {
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java
index 066dc0ffaa0..c4f0828861b 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java
@@ -3,28 +3,23 @@
 //
 package io.deephaven.parquet.table.location;
 
-import io.deephaven.annotations.BuildableStyle;
 import io.deephaven.engine.table.impl.locations.TableKey;
 import io.deephaven.parquet.table.ParquetInstructions;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.schema.MessageType;
-import org.immutables.value.Value;
 
-import java.util.Map;
+import java.util.Optional;
 
 /**
  * A mapping between Deephaven column names and Parquet {@link ColumnDescriptor column descriptors}.
  *
  * TODO: describe better
  */
-@Value.Immutable
-@BuildableStyle
-public abstract class ParquetColumnResolver {
+public interface ParquetColumnResolver {
 
     /**
      * {@link ParquetInstructions.Builder#setColumnResolverFactory(Factory)}
      */
-    public interface Factory {
+    interface Factory {
 
         /**
          * TODO: description
@@ -33,42 +28,8 @@ public interface Factory {
          * @param tableKey the table key
         * @param tableLocationKey the Parquet TLK
          * @return the Parquet column resolver
          */
-        ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
+        ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey);
     }
 
-    public static Builder builder() {
-        return ImmutableParquetColumnResolver.builder();
-    }
-
-    // Intentionally not exposed, but necessary to expose to Builder for safety checks.
-    abstract MessageType schema();
-
-    /**
-     * TODO: javadoc
-     *
-     * @return
-     */
-    public abstract Map<String, ColumnDescriptor> mapping();
-
-    @Value.Check
-    final void checkColumns() {
-        for (ColumnDescriptor columnDescriptor : mapping().values()) {
-            if (!ParquetUtil.contains(schema(), columnDescriptor)) {
-                throw new IllegalArgumentException("schema does not contain column descriptor " + columnDescriptor);
-            }
-        }
-    }
-
-    public interface Builder {
-
-        // TODO: javadoc
-
-        Builder schema(MessageType schema);
-
-        Builder putMapping(String key, ColumnDescriptor value);
-
-        Builder putAllMapping(Map<String, ColumnDescriptor> entries);
-
-        ParquetColumnResolver build();
-    }
+    Optional<ColumnDescriptor> of(String columnName);
 }
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java
new file mode 100644
index 00000000000..6c55001dd58
--- /dev/null
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java
@@ -0,0 +1,63 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.parquet.table.location;
+
+import io.deephaven.annotations.BuildableStyle;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.MessageType;
+import org.immutables.value.Value;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A {@link ParquetColumnResolver} implementation based on a map from Deephaven column names to Parquet
+ * {@link ColumnDescriptor column descriptors}.
+ */
+@Value.Immutable
+@BuildableStyle
+public abstract class ParquetColumnResolverMap implements ParquetColumnResolver {
+
+    public static Builder builder() {
+        return ImmutableParquetColumnResolverMap.builder();
+    }
+
+    /**
+     * The Parquet schema.
+     */
+    public abstract MessageType schema();
+
+    /**
+     * The map from Deephaven column name to {@link ColumnDescriptor}. The {@link #schema()} must contain the column
+     * descriptors.
+     */
+    public abstract Map<String, ColumnDescriptor> mapping();
+
+    @Override
+    public final Optional<ColumnDescriptor> of(String columnName) {
+        return Optional.ofNullable(mapping().get(columnName));
+    }
+
+    public interface Builder {
+        Builder schema(MessageType schema);
+
+        Builder putMapping(String key, ColumnDescriptor value);
+
+        Builder putAllMapping(Map<String, ColumnDescriptor> entries);
+
+        ParquetColumnResolverMap build();
+    }
+
+    @Value.Check
+    final void checkMapping() {
+        for (Map.Entry<String, ColumnDescriptor> e : mapping().entrySet()) {
+            final ColumnDescriptor columnDescriptor = e.getValue();
+            if (!ParquetUtil.contains(schema(), columnDescriptor)) {
+                throw new IllegalArgumentException(
+                        String.format("schema does not contain Deephaven columnName=%s columnDescriptor=%s", e.getKey(),
+                                columnDescriptor));
+            }
+        }
+    }
+}
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverFieldIdFactory.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java
similarity index 80%
rename from extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverFieldIdFactory.java
rename to extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java
index c08d97651d7..9a75a6f3cd3 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverFieldIdFactory.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java
@@ -5,7 +5,6 @@
 
 import io.deephaven.engine.table.impl.locations.TableKey;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -22,7 +21,7 @@
  * The following is an example {@link ParquetColumnResolver.Factory} that may be useful for testing and debugging
 * purposes, but is not meant to be used for production use cases.
 */
-public final class ParquetColumnResolverFieldIdFactory implements ParquetColumnResolver.Factory {
+public final class ParquetFieldIdColumnResolverFactory implements ParquetColumnResolver.Factory {
 
     /**
      * TODO: javadoc
@@ -30,8 +29,8 @@ public final class ParquetColumnResolverFieldIdFactory implements ParquetColumnR
      * @param columnNameToFieldId a map from Deephaven column names to field ids
      * @return the column resolver provider
      */
-    public static ParquetColumnResolverFieldIdFactory of(Map<String, Integer> columnNameToFieldId) {
-        return new ParquetColumnResolverFieldIdFactory(columnNameToFieldId
+    public static ParquetFieldIdColumnResolverFactory of(Map<String, Integer> columnNameToFieldId) {
+        return new ParquetFieldIdColumnResolverFactory(columnNameToFieldId
                 .entrySet()
                 .stream()
                 .collect(Collectors.groupingBy(
@@ -41,18 +40,22 @@ public static ParquetColumnResolverFieldIdFactory of(Map column
 
     private final Map<Integer, Set<String>> fieldIdsToDhColumnNames;
 
-    private ParquetColumnResolverFieldIdFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
+    private ParquetFieldIdColumnResolverFactory(Map<Integer, Set<String>> fieldIdsToDhColumnNames) {
         this.fieldIdsToDhColumnNames = Objects.requireNonNull(fieldIdsToDhColumnNames);
     }
 
     @Override
-    public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
+    public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) {
         final MessageType schema = tableLocationKey.getFileReader().getSchema();
         // TODO: note the potential for confusion on where to derive schema from.
         // final MessageType schema = tableLocationKey.getMetadata().getFileMetaData().getSchema();
+        return of(schema);
+    }
+
+    public ParquetColumnResolverMap of(MessageType schema) {
         final FieldIdMappingVisitor visitor = new FieldIdMappingVisitor();
         ParquetUtil.walk(schema, visitor);
-        return ParquetColumnResolver.builder()
+        return ParquetColumnResolverMap.builder()
                 .schema(schema)
                 .putAllMapping(visitor.nameToColumnDescriptor)
                 .build();
     }
@@ -67,10 +70,11 @@ public void accept(Collection<Type> path, PrimitiveType primitiveType) {
             // field id closest to the leaf. This version, however, takes the most general approach and considers field
             // ids wherever they appear; ultimately, only being resolvable if the field id mapping is unambiguous.
             for (Type type : path) {
-                if (type.getId() == null) {
+                final Type.ID id = type.getId();
+                if (id == null) {
                     continue;
                 }
-                final int fieldId = type.getId().intValue();
+                final int fieldId = id.intValue();
                 final Set<String> set = fieldIdsToDhColumnNames.get(fieldId);
                 if (set == null) {
                     continue;
@@ -79,9 +83,9 @@ public void accept(Collection<Type> path, PrimitiveType primitiveType) {
             for (String columnName : set) {
                 final ColumnDescriptor existing = nameToColumnDescriptor.putIfAbsent(columnName, columnDescriptor);
                 if (existing != null) {
-                    throw new IllegalStateException(String.format(
-                            "Parquet columns can't be unambigously mapped. %d -> %s has multiple paths %s, %s",
-                            fieldId, columnName, Arrays.toString(existing.getPath()),
+                    throw new IllegalArgumentException(String.format(
+                            "Parquet columns can't be unambiguously mapped. %s -> %d has multiple paths %s, %s",
+                            columnName, fieldId, Arrays.toString(existing.getPath()),
                             Arrays.toString(columnDescriptor.getPath())));
                 }
             }
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java
index 4c178cae257..afa8172e3f3 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java
@@ -90,12 +90,9 @@ public ParquetTableLocation(@NotNull final TableKey tableKey,
             parquetMetadata = tableLocationKey.getMetadata();
             rowGroupIndices = tableLocationKey.getRowGroupIndices();
         }
-        {
-            final ParquetColumnResolver.Factory factory = readInstructions.getColumnResolver().orElse(null);
-            resolver = factory == null
-                    ? null
-                    : Objects.requireNonNull(factory.init(tableKey, tableLocationKey));
-        }
+        resolver = readInstructions.getColumnResolverFactory()
+                .map(factory -> factory.of(tableKey, tableLocationKey))
+                .orElse(null);
         final int rowGroupCount = rowGroupIndices.length;
         rowGroups = IntStream.of(rowGroupIndices)
                 .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
@@ -194,12 +191,11 @@ protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
             final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
             nameList = columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
         } else {
-            final ColumnDescriptor columnDescriptor = resolver.mapping().get(columnName);
-            if (columnDescriptor == null) {
-                nameList = List.of(); // empty, will not resolve
-            } else {
-                nameList = Arrays.asList(columnDescriptor.getPath());
-            }
+            // empty list will result in exists=false
+            nameList = resolver.of(columnName)
+                    .map(ColumnDescriptor::getPath)
+                    .map(Arrays::asList)
+                    .orElse(List.of());
         }
         final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders())
                 .map(rgr -> rgr.getColumnChunk(columnName, nameList))
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetUtil.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetUtil.java
index 5e2322cc143..9a9db68d1c0 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetUtil.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetUtil.java
@@ -26,7 +26,6 @@ interface Visitor {
         void accept(Collection<Type> path, PrimitiveType primitiveType);
     }
 
-
     static class ColumnDescriptorVisitor implements Visitor {
 
         private final Consumer<ColumnDescriptor> consumer;
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
index 726488152a8..9188563c368 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
@@ -32,7 +32,7 @@ public void empty() {
         assertThat(ParquetInstructions.EMPTY.getFileLayout()).isEmpty();
         assertThat(ParquetInstructions.EMPTY.getTableDefinition()).isEmpty();
         assertThat(ParquetInstructions.EMPTY.getIndexColumns()).isEmpty();
-        assertThat(ParquetInstructions.EMPTY.getColumnResolver()).isEmpty();
+        
assertThat(ParquetInstructions.EMPTY.getColumnResolverFactory()).isEmpty(); assertThat(ParquetInstructions.EMPTY.baseNameForPartitionedParquetData()).isEqualTo("{uuid}"); } @@ -152,7 +152,7 @@ public void columnResolver() { .setTableDefinition(TableDefinition.of(ColumnDefinition.ofInt("Foo"))) .setColumnResolverFactory(ColumnResolverTestImpl.INSTANCE) .build(); - assertThat(instructions.getColumnResolver()).hasValue(ColumnResolverTestImpl.INSTANCE); + assertThat(instructions.getColumnResolverFactory()).hasValue(ColumnResolverTestImpl.INSTANCE); } @Test @@ -171,7 +171,7 @@ private enum ColumnResolverTestImpl implements ParquetColumnResolver.Factory { INSTANCE; @Override - public ParquetColumnResolver init(TableKey tableKey, ParquetTableLocationKey tableLocationKey) { + public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) { throw new UnsupportedOperationException(); } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index d1a24085f86..270bde30a0b 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -21,7 +21,7 @@ import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; import io.deephaven.parquet.table.location.ParquetColumnResolver; -import io.deephaven.parquet.table.location.ParquetColumnResolverFieldIdFactory; +import io.deephaven.parquet.table.location.ParquetFieldIdColumnResolverFactory; import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.qst.type.Type; import io.deephaven.stringset.HashStringSet; @@ -656,7 +656,7 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept { final ParquetInstructions readInstructions = ParquetInstructions.builder() .setTableDefinition(td) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of( + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( BAZ, BAZ_ID, ZAP, ZAP_ID))) .build(); @@ -717,7 +717,7 @@ public void testParquetFieldIds() { final TableDefinition td = TableDefinition.of(bazCol, zapCol); final ParquetInstructions instructions = ParquetInstructions.builder() .setTableDefinition(td) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of(BAZ, BAZ_ID, ZAP, ZAP_ID))) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(BAZ, BAZ_ID, ZAP, ZAP_ID))) .build(); // But, the user can still provide a TableDefinition @@ -745,7 +745,7 @@ public void testParquetFieldIds() { ColumnDefinition.ofString("column_53f0de5ae06f476eb82aa3f9294fcd05")); final ParquetInstructions partialInstructions = ParquetInstructions.builder() .setTableDefinition(partialTD) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of(BAZ, BAZ_ID))) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(BAZ, BAZ_ID))) .build(); final Table table = ParquetTools.readTable(file, partialInstructions); assertEquals(partialTD, table.getDefinition()); @@ -755,7 +755,7 @@ public void testParquetFieldIds() { { final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() .setTableDefinition(td) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of( + 
.setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( BAZ, BAZ_ID, ZAP, ZAP_ID, "Fake", 99))) @@ -772,7 +772,7 @@ public void testParquetFieldIds() { TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofShort("Fake")); final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() .setTableDefinition(tdWithFake) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of( + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( BAZ, BAZ_ID, ZAP, ZAP_ID, "Fake", 99))) @@ -791,7 +791,7 @@ public void testParquetFieldIds() { TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofLong(BAZ_DUPE)); final ParquetInstructions dupeInstructions = ParquetInstructions.builder() .setTableDefinition(dupeTd) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of( + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( BAZ, BAZ_ID, ZAP, ZAP_ID, BAZ_DUPE, BAZ_ID))) @@ -811,7 +811,7 @@ public void testParquetFieldIds() { final TableDefinition bazTd = TableDefinition.of(bazCol); final ParquetInstructions inconsistent = ParquetInstructions.builder() .setTableDefinition(bazTd) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of(BAZ, BAZ_ID))) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(BAZ, BAZ_ID))) .addColumnNameMapping("53f0de5a-e06f-476e-b82a-a3f9294fcd05", BAZ) .build(); final Table table = ParquetTools.readTable(file, inconsistent); @@ -862,7 +862,7 @@ public void testPartitionedParquetFieldIds() { final TableDefinition expectedTd = TableDefinition.of(partitionColumn, bazCol, zapCol); final ParquetInstructions instructions = ParquetInstructions.builder() .setTableDefinition(expectedTd) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of( + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( BAZ, BAZ_ID, ZAP, ZAP_ID))) .build(); @@ -902,7 +902,7 @@ public void testParquetFieldIdsWithListType() { final TableDefinition td = TableDefinition.of(ColumnDefinition.of(FOO, Type.intType().arrayType())); final ParquetInstructions instructions = ParquetInstructions.builder() .setTableDefinition(td) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of(FOO, 999))) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(FOO, 999))) .build(); final Table expected = TableTools.newTable(td, new ColumnHolder<>(FOO, int[].class, int.class, false, new int[] {1, 2, 3}, @@ -955,7 +955,7 @@ public void testRenamingColumnResolver() { .build()); } - final ParquetColumnResolver.Factory resolver = ParquetColumnResolverFieldIdFactory.of(Map.of( + final ParquetColumnResolver.Factory resolver = ParquetFieldIdColumnResolverFactory.of(Map.of( NAME, NAME_ID, FIRST_NAME, FIRST_NAME_ID, LAST_NAME, LAST_NAME_ID)); @@ -1084,7 +1084,7 @@ public void parquetWithNonUniqueFieldIds() { { final Table table = ParquetTools.readTable(f.getPath(), ParquetInstructions.builder() .setTableDefinition(td) - .setColumnResolverFactory(ParquetColumnResolverFieldIdFactory.of(Map.of( + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( FOO, fieldId, BAR, fieldId))) .build()); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java new file mode 100644 index 00000000000..0fc64fbabc3 --- /dev/null +++ 
b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java @@ -0,0 +1,177 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table.location; + +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +import java.util.Map; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class ParquetFieldIdColumnResolverFactoryTest { + + // foo (42) + private static final PrimitiveType FOO_42 = Types.required(INT32) + .id(42) + .named("foo"); + + // bar (43), list, element + private static final GroupType BAR_43 = Types.requiredList() + .id(43) + .requiredElement(INT32) + .named("bar"); + + // baz, list, element (44) + private static final GroupType BAZ_44 = Types.requiredList() + .requiredElement(INT32) + .id(44) + .named("baz"); + + private static final ParquetFieldIdColumnResolverFactory FACTORY = ParquetFieldIdColumnResolverFactory.of(Map.of( + "DeepFoo", 42, + "DeepBar", 43, + "DeepBaz", 44)); + + private static String[] p(String... path) { + return path; + } + + @Test + public void messageFields() { + final MessageType schema = Types.buildMessage() + .addFields(FOO_42, BAR_43, BAZ_44) + .named("root"); + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .schema(schema) + .putMapping("DeepFoo", schema.getColumnDescription(p("foo"))) + .putMapping("DeepBar", schema.getColumnDescription(p("bar", "list", "element"))) + .putMapping("DeepBaz", schema.getColumnDescription(p("baz", "list", "element"))) + .build(); + assertThat(FACTORY.of(schema)).isEqualTo(expected); + } + + @Test + public void messageGroupFields() { + final MessageType schema = Types.buildMessage() + .addFields(Types.repeatedGroup() + .addFields(FOO_42, BAR_43, BAZ_44) + .named("my_group")) + .named("root"); + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .schema(schema) + .putMapping("DeepFoo", schema.getColumnDescription(p("my_group", "foo"))) + .putMapping("DeepBar", schema.getColumnDescription(p("my_group", "bar", "list", "element"))) + .putMapping("DeepBaz", schema.getColumnDescription(p("my_group", "baz", "list", "element"))) + .build(); + assertThat(FACTORY.of(schema)).isEqualTo(expected); + } + + @Test + public void messageListElements() { + final MessageType schema = Types.buildMessage() + .addFields( + Types.requiredList().element(FOO_42).named("my_list1"), + Types.requiredList().element(BAR_43).named("my_list2"), + Types.requiredList().element(BAZ_44).named("my_list3")) + .named("root"); + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .schema(schema) + .putMapping("DeepFoo", schema.getColumnDescription(p("my_list1", "list", "foo"))) + .putMapping("DeepBar", schema.getColumnDescription(p("my_list2", "list", "bar", "list", "element"))) + .putMapping("DeepBaz", schema.getColumnDescription(p("my_list3", "list", "baz", "list", "element"))) + .build(); + assertThat(FACTORY.of(schema)).isEqualTo(expected); + } + + @Test + public void singleFieldMultipleIdsUnambiguous() { + final ParquetFieldIdColumnResolverFactory factory = ParquetFieldIdColumnResolverFactory.of(Map.of( + "Col1", 1, + "Col2", 2)); 
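+        // Two field ids intentionally land on the same primitive column below, so both Deephaven
+        // columns resolve to the same descriptor without ambiguity.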
+ + // BothCols (1), list, element (2) + final MessageType schema = Types.buildMessage().addFields(Types.requiredList() + .id(1) + .requiredElement(INT32) + .id(2) + .named("BothCols")) + .named("root"); + + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .schema(schema) + .putMapping("Col1", schema.getColumnDescription(p("BothCols", "list", "element"))) + .putMapping("Col2", schema.getColumnDescription(p("BothCols", "list", "element"))) + .build(); + + assertThat(factory.of(schema)).isEqualTo(expected); + } + + @Test + public void ambiguousFields() { + // X (1) + // Y (1) + // Z (2) + final MessageType schema = Types.buildMessage() + .addFields( + Types.required(INT32).id(1).named("X"), + Types.required(INT32).id(1).named("Y"), + Types.required(INT32).id(2).named("Z")) + .named("root"); + try { + ParquetFieldIdColumnResolverFactory.of(Map.of("Col1", 1)).of(schema); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Col1 -> 1 has multiple paths [X], [Y]"); + } + // Does not fail if ambiguous id is not referenced + { + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .schema(schema) + .putMapping("ColZ", schema.getColumnDescription(p("Z"))) + .build(); + final ParquetColumnResolverMap actual = + ParquetFieldIdColumnResolverFactory.of(Map.of("ColZ", 2)).of(schema); + assertThat(actual).isEqualTo(expected); + } + } + + @Test + public void ambiguousFieldsNested() { + final ParquetFieldIdColumnResolverFactory factory = ParquetFieldIdColumnResolverFactory.of(Map.of("Col1", 1)); + // X (1), list, element (2) + // Y (2), list, element (1) + // Z (3) + final MessageType schema = Types.buildMessage() + .addFields( + Types.requiredList().id(1).requiredElement(INT32).id(2).named("X"), + Types.requiredList().id(2).requiredElement(INT32).id(1).named("Y"), + Types.required(INT32).id(3).named("Z")) + .named("root"); + // Note: a different implementation _could_ take a different course of action here and proceed without error; + // for example, an implementation could choose to consider the innermost (or outermost) field id for matching + // purposes. 
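+        // Concretely: an innermost-id rule would resolve Col1 (field id 1) to [Y, list, element], while an
+        // outermost-id rule would resolve it to [X, list, element]. This implementation fails fast instead: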
+ try { + factory.of(schema); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Col1 -> 1 has multiple paths [X, list, element], [Y, list, element]"); + } + // Does not fail if ambiguous id is not referenced + { + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .schema(schema) + .putMapping("ColZ", schema.getColumnDescription(p("Z"))) + .build(); + final ParquetColumnResolverMap actual = + ParquetFieldIdColumnResolverFactory.of(Map.of("ColZ", 3)).of(schema); + assertThat(actual).isEqualTo(expected); + } + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetUtilTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetUtilTest.java index 9288a0b80f5..2d95ad8b226 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetUtilTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetUtilTest.java @@ -5,10 +5,8 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Types; import org.junit.Test; @@ -16,6 +14,12 @@ import java.util.List; import java.util.function.BiPredicate; +import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.optional; +import static org.apache.parquet.schema.Types.repeated; +import static org.apache.parquet.schema.Types.required; import static org.assertj.core.api.Assertions.assertThat; public class ParquetUtilTest { @@ -23,9 +27,9 @@ public class ParquetUtilTest { private static final MessageType SCHEMA; static { - final PrimitiveType required = Types.required(PrimitiveTypeName.INT32).named("Required"); - final PrimitiveType repeated = Types.repeated(PrimitiveTypeName.INT32).named("Repeated"); - final PrimitiveType optional = Types.optional(PrimitiveTypeName.INT32).named("Optional"); + final PrimitiveType required = required(INT32).named("Required"); + final PrimitiveType repeated = repeated(INT32).named("Repeated"); + final PrimitiveType optional = optional(INT32).named("Optional"); final GroupType requiredGroup = Types.requiredGroup() .addFields(required, repeated, optional) .named("RequiredGroup"); @@ -82,31 +86,21 @@ public void contains() { for (ColumnDescriptor column : ParquetUtil.getColumns(SCHEMA)) { assertThat(ParquetUtil.contains(SCHEMA, column)).isTrue(); } - assertThat(ParquetUtil.contains(SCHEMA, new ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT32).named("Required"), 0, 0))).isTrue(); + assertThat(ParquetUtil.contains(SCHEMA, + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required"), 0, 0))).isTrue(); for (ColumnDescriptor column : new ColumnDescriptor[] { - new ColumnDescriptor(new String[] {"Required"}, - Types.optional(PrimitiveTypeName.INT32).named("Required"), 0, 0), - new ColumnDescriptor(new String[] {"Required"}, - Types.repeated(PrimitiveTypeName.INT32).named("Required"), 0, 0), - new 
ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT32).id(42).named("Required"), 0, 0), - new ColumnDescriptor(new String[] {"Required2"}, - Types.required(PrimitiveTypeName.INT32).named("Required"), 0, 0), - new ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT32).named("Required2"), 0, 0), - new ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT32).named("Required"), 1, 0), - new ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT32).named("Required"), 0, 1), - new ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT64).named("Required"), 0, 0), - new ColumnDescriptor(new String[] {"Required"}, - Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(16)).named("Required"), - 0, 0), - new ColumnDescriptor(new String[] {"Required"}, - Types.optional(PrimitiveTypeName.INT32).named("Required"), 0, 0), - new ColumnDescriptor(new String[] {}, Types.repeated(PrimitiveTypeName.INT32).named("Required"), 0, 0) + new ColumnDescriptor(new String[] {"Required"}, optional(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, repeated(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).id(42).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required2"}, required(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required2"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required"), 1, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required"), 0, 1), + new ColumnDescriptor(new String[] {"Required"}, required(INT64).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).as(intType(16)).named("Required"), 0, + 0), + new ColumnDescriptor(new String[] {"Required"}, optional(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {}, repeated(INT32).named("Required"), 0, 0) }) { assertThat(ParquetUtil.contains(SCHEMA, column)).isFalse(); }
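
For reference, a minimal usage sketch of the renamed API, mirroring the tests above. The class
name, file path, column names, and field ids here are illustrative only, not part of the change:

    import io.deephaven.engine.table.ColumnDefinition;
    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.table.TableDefinition;
    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;
    import io.deephaven.parquet.table.location.ParquetFieldIdColumnResolverFactory;

    import java.util.Map;

    public final class FieldIdReadSketch {

        // Resolves Deephaven columns to Parquet columns by field id rather than by name.
        public static Table read() {
            final TableDefinition td = TableDefinition.of(
                    ColumnDefinition.ofString("Baz"),
                    ColumnDefinition.ofInt("Zap"));
            final ParquetInstructions instructions = ParquetInstructions.builder()
                    .setTableDefinition(td)
                    // Deephaven column name -> Parquet field id.
                    .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(
                            "Baz", 1,
                            "Zap", 2)))
                    .build();
            // Each column location is resolved via the factory-produced ParquetColumnResolver.
            return ParquetTools.readTable("/data/example.parquet", instructions);
        }
    }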