diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index bc205fd2ef0..f2870f10631 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -113,6 +113,11 @@ private void initialize() { initializeLocationSizes(); } + @TestUseOnly + public final TableLocationProvider tableLocationProvider() { + return locationProvider; + } + /** * This is only for unit tests, at this time. */ diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java index ea8bc55dee3..d8991c7d964 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java @@ -77,7 +77,7 @@ interface Listener extends BasicTableDataListener { void unsubscribe(@NotNull Listener listener); /** - * Initialize or run state information about the list of existing locations. + * Initialize or refresh state information about the list of existing locations. */ void refresh(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/KnownLocationKeyFinder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/KnownLocationKeyFinder.java index 17df1e71c3c..5710f235e33 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/KnownLocationKeyFinder.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/KnownLocationKeyFinder.java @@ -9,6 +9,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; /** @@ -34,10 +35,18 @@ public static KnownLocationKeyFinder(mutableKeys); + final String comparatorString = comparator == null + ? null + : Comparator.naturalOrder().equals(comparator) + ? "Comparator.naturalOrder()" + : comparator.toString(); + final String toString = + String.format("%s[%s, %s]", KnownLocationKeyFinder.class.getSimpleName(), finder, comparatorString); + return new KnownLocationKeyFinder<>(mutableKeys, toString); } private final List knownKeys; + private final String toString; @SafeVarargs public KnownLocationKeyFinder(@NotNull final TLK... knownKeys) { @@ -45,7 +54,12 @@ public KnownLocationKeyFinder(@NotNull final TLK... knownKeys) { } public KnownLocationKeyFinder(List knownKeys) { + this(knownKeys, null); + } + + public KnownLocationKeyFinder(List knownKeys, String toString) { this.knownKeys = List.copyOf(knownKeys); + this.toString = toString; } /** @@ -55,8 +69,21 @@ public List getKnownKeys() { return knownKeys; } + public Optional getFirstKey() { + return knownKeys.isEmpty() ? Optional.empty() : Optional.of(knownKeys.get(0)); + } + + public Optional getLastKey() { + return knownKeys.isEmpty() ? Optional.empty() : Optional.of(knownKeys.get(knownKeys.size() - 1)); + } + @Override public void findKeys(@NotNull Consumer locationKeyObserver) { knownKeys.forEach(locationKeyObserver); } + + @Override + public String toString() { + return toString == null ? 
super.toString() : toString; + } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index a7cdbc89e83..c477d65bf69 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -13,7 +13,7 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService; -import io.deephaven.engine.util.TableTools; +import io.deephaven.engine.updategraph.UpdateSourceRegistrar; import io.deephaven.vector.*; import io.deephaven.stringset.StringSet; import io.deephaven.engine.util.file.TrackedFileHandleFactory; @@ -61,6 +61,8 @@ @SuppressWarnings("WeakerAccess") public class ParquetTools { + private static final int MAX_PARTITIONING_LEVELS_INFERENCE = 32; + private ParquetTools() {} private static final Logger log = LoggerFactory.getLogger(ParquetTools.class); @@ -68,6 +70,18 @@ private ParquetTools() {} /** * Reads in a table from a single parquet, metadata file, or directory with recognized layout. * + *

+ * This method attempts to "do the right thing." It examines the source to determine if it's a single parquet file, + * a metadata file, or a directory. If it's a directory, it additionally tries to guess the layout to use. Unless a + * metadata file is supplied or discovered in the directory, the highest (by {@link ParquetTableLocationKey location + * key} order) location found will be used to infer schema. + * + *

+ * Delegates to one of {@link #readSingleFileTable(File, ParquetInstructions)}, + * {@link #readPartitionedTableWithMetadata(File, ParquetInstructions)}, + * {@link #readFlatPartitionedTable(File, ParquetInstructions)}, or + * {@link #readKeyValuePartitionedTable(File, ParquetInstructions)}. + * * @param sourceFilePath The file or directory to examine * @return table * @see ParquetSingleFileLayout @@ -82,6 +96,18 @@ public static Table readTable(@NotNull final String sourceFilePath) { /** * Reads in a table from a single parquet, metadata file, or directory with recognized layout. * + *

+ * This method attempts to "do the right thing." It examines the source to determine if it's a single parquet file, + * a metadata file, or a directory. If it's a directory, it additionally tries to guess the layout to use. Unless a + * metadata file is supplied or discovered in the directory, the highest (by {@link ParquetTableLocationKey location + * key} order) location found will be used to infer schema. + * + *

+ * Delegates to one of {@link #readSingleFileTable(File, ParquetInstructions)}, + * {@link #readPartitionedTableWithMetadata(File, ParquetInstructions)}, + * {@link #readFlatPartitionedTable(File, ParquetInstructions)}, or + * {@link #readKeyValuePartitionedTable(File, ParquetInstructions)}. + * * @param sourceFilePath The file or directory to examine * @param readInstructions Instructions for customizations while reading * @return table @@ -99,6 +125,18 @@ public static Table readTable( /** * Reads in a table from a single parquet, metadata file, or directory with recognized layout. * + *

+ * This method attempts to "do the right thing." It examines the source to determine if it's a single parquet file, + * a metadata file, or a directory. If it's a directory, it additionally tries to guess the layout to use. Unless a + * metadata file is supplied or discovered in the directory, the highest (by {@link ParquetTableLocationKey location + * key} order) location found will be used to infer schema. + * + *

+ * Delegates to one of {@link #readSingleFileTable(File, ParquetInstructions)}, + * {@link #readPartitionedTableWithMetadata(File, ParquetInstructions)}, + * {@link #readFlatPartitionedTable(File, ParquetInstructions)}, or + * {@link #readKeyValuePartitionedTable(File, ParquetInstructions)}. + * * @param sourceFile The file or directory to examine * @return table * @see ParquetSingleFileLayout @@ -113,6 +151,18 @@ public static Table readTable(@NotNull final File sourceFile) { /** * Reads in a table from a single parquet, metadata file, or directory with recognized layout. * + *

+ * This method attempts to "do the right thing." It examines the source to determine if it's a single parquet file, + * a metadata file, or a directory. If it's a directory, it additionally tries to guess the layout to use. Unless a + * metadata file is supplied or discovered in the directory, the highest (by {@link ParquetTableLocationKey location + * key} order) location found will be used to infer schema. + * + *

+ * Delegates to one of {@link #readSingleFileTable(File, ParquetInstructions)}, + * {@link #readPartitionedTableWithMetadata(File, ParquetInstructions)}, + * {@link #readFlatPartitionedTable(File, ParquetInstructions)}, or + * {@link #readKeyValuePartitionedTable(File, ParquetInstructions)}. + * * @param sourceFile The file or directory to examine * @param readInstructions Instructions for customizations while reading * @return table @@ -551,6 +601,12 @@ public static void deleteTable(File path) { * metadata file is supplied or discovered in the directory, the highest (by {@link ParquetTableLocationKey location * key} order) location found will be used to infer schema. * + *

+ * Delegates to one of {@link #readSingleFileTable(File, ParquetInstructions)}, + * {@link #readPartitionedTableWithMetadata(File, ParquetInstructions)}, + * {@link #readFlatPartitionedTable(File, ParquetInstructions)}, or + * {@link #readKeyValuePartitionedTable(File, ParquetInstructions)}. + * * @param source The source file or directory * @param instructions Instructions for reading * @return A {@link Table} @@ -566,16 +622,7 @@ private static Table readTableInternal( final BasicFileAttributes sourceAttr = readAttributes(sourcePath); if (sourceAttr.isRegularFile()) { if (sourceFileName.endsWith(PARQUET_FILE_EXTENSION)) { - if (instructions.isRefreshing()) { - throw new IllegalArgumentException("Unable to have a refreshing single parquet file"); - } - final ParquetTableLocationKey tableLocationKey = new ParquetTableLocationKey(source, 0, null); - final Pair>, ParquetInstructions> schemaInfo = convertSchema( - tableLocationKey.getFileReader().getSchema(), - tableLocationKey.getMetadata().getFileMetaData().getKeyValueMetaData(), - instructions); - return readSingleFileTable(tableLocationKey, schemaInfo.getSecond(), - TableDefinition.of(schemaInfo.getFirst())); + return readSingleFileTable(source, instructions); } if (sourceFileName.equals(ParquetMetadataFileLayout.METADATA_FILE_NAME)) { return readPartitionedTableWithMetadata(source.getParentFile(), instructions); @@ -594,10 +641,7 @@ private static Table readTableInternal( final Path firstEntryPath; // Ignore dot files while looking for the first entry try (final DirectoryStream sourceStream = - Files.newDirectoryStream(sourcePath, (path) -> { - final String filename = path.getFileName().toString(); - return !filename.isEmpty() && filename.charAt(0) != '.'; - })) { + Files.newDirectoryStream(sourcePath, ParquetTools::ignoreDotFiles)) { final Iterator entryIterator = sourceStream.iterator(); if (!entryIterator.hasNext()) { throw new TableDataException("Source directory " + source + " is empty"); @@ -609,16 +653,21 @@ private static Table readTableInternal( final String firstEntryFileName = firstEntryPath.getFileName().toString(); final BasicFileAttributes firstEntryAttr = readAttributes(firstEntryPath); if (firstEntryAttr.isDirectory() && firstEntryFileName.contains("=")) { - return readPartitionedTableInferSchema(new ParquetKeyValuePartitionedLayout(source, 32), instructions); + return readKeyValuePartitionedTable(source, instructions); } if (firstEntryAttr.isRegularFile() && firstEntryFileName.endsWith(PARQUET_FILE_EXTENSION)) { - return readPartitionedTableInferSchema(new ParquetFlatPartitionedLayout(source), instructions); + return readFlatPartitionedTable(source, instructions); } throw new TableDataException("No recognized Parquet table layout found in " + source); } throw new TableDataException("Source " + source + " is neither a directory nor a regular file"); } + private static boolean ignoreDotFiles(Path path) { + final String filename = path.getFileName().toString(); + return !filename.isEmpty() && filename.charAt(0) != '.'; + } + private static BasicFileAttributes readAttributes(@NotNull final Path path) { try { return Files.readAttributes(path, BasicFileAttributes.class); @@ -630,6 +679,10 @@ private static BasicFileAttributes readAttributes(@NotNull final Path path) { /** * Reads in a table from a single parquet file using the provided table definition. * + *

+ * Callers may prefer the simpler methods {@link #readSingleFileTable(File, ParquetInstructions)} or + * {@link #readSingleFileTable(File, ParquetInstructions, TableDefinition)}. + * * @param tableLocationKey The {@link ParquetTableLocationKey location keys} to include * @param readInstructions Instructions for customizations while reading * @param tableDefinition The table's {@link TableDefinition definition} @@ -639,6 +692,9 @@ public static Table readSingleFileTable( @NotNull final ParquetTableLocationKey tableLocationKey, @NotNull final ParquetInstructions readInstructions, @NotNull final TableDefinition tableDefinition) { + if (readInstructions.isRefreshing()) { + throw new IllegalArgumentException("Unable to have a refreshing single parquet file"); + } final TableLocationProvider locationProvider = new PollingTableLocationProvider<>( StandaloneTableKey.getInstance(), new KnownLocationKeyFinder<>(tableLocationKey), @@ -649,6 +705,27 @@ public static Table readSingleFileTable( RegionedTableComponentFactoryImpl.INSTANCE, locationProvider, null); } + /** + * Reads in a table from files discovered with {@code locationKeyFinder} using a definition built from the highest + * (by {@link ParquetTableLocationKey location key} order) location found, which must have non-null partition values + * for all partition keys. + * + * @param locationKeyFinder The source of {@link ParquetTableLocationKey location keys} to include + * @param readInstructions Instructions for customizations while reading + * @return The table + */ + public static Table readPartitionedTable( + @NotNull final TableLocationKeyFinder locationKeyFinder, + @NotNull final ParquetInstructions readInstructions) { + final KnownLocationKeyFinder inferenceKeys = toKnownKeys(locationKeyFinder); + final Pair inference = infer(inferenceKeys, readInstructions); + return readPartitionedTable( + // In the case of a static output table, we can re-use the already fetched inference keys + readInstructions.isRefreshing() ? locationKeyFinder : inferenceKeys, + inference.getSecond(), + inference.getFirst()); + } + /** * Reads in a table from files discovered with {@code locationKeyFinder} using the provided table definition. * @@ -661,19 +738,31 @@ public static Table readPartitionedTable( @NotNull final TableLocationKeyFinder locationKeyFinder, @NotNull final ParquetInstructions readInstructions, @NotNull final TableDefinition tableDefinition) { - final TableLocationProvider locationProvider = new PollingTableLocationProvider<>( - StandaloneTableKey.getInstance(), - locationKeyFinder, - new ParquetTableLocationFactory(readInstructions), - readInstructions.isRefreshing() ? TableDataRefreshService.getSharedRefreshService() : null); + final String description; + final TableLocationKeyFinder keyFinder; + final TableDataRefreshService refreshService; + final UpdateSourceRegistrar updateSourceRegistrar; + if (readInstructions.isRefreshing()) { + keyFinder = locationKeyFinder; + description = "Read refreshing parquet files with " + keyFinder; + refreshService = TableDataRefreshService.getSharedRefreshService(); + updateSourceRegistrar = ExecutionContext.getContext().getUpdateGraph(); + } else { + keyFinder = toKnownKeys(locationKeyFinder); + description = "Read multiple parquet files with " + keyFinder; + refreshService = null; + updateSourceRegistrar = null; + } return new PartitionAwareSourceTable( tableDefinition, - readInstructions.isRefreshing() - ? 
"Read refreshing parquet files with " + locationKeyFinder - : "Read multiple parquet files with " + locationKeyFinder, + description, RegionedTableComponentFactoryImpl.INSTANCE, - locationProvider, - readInstructions.isRefreshing() ? ExecutionContext.getContext().getUpdateGraph() : null); + new PollingTableLocationProvider<>( + StandaloneTableKey.getInstance(), + keyFinder, + new ParquetTableLocationFactory(readInstructions), + refreshService), + updateSourceRegistrar); } /** @@ -684,22 +773,23 @@ public static Table readPartitionedTable( * @param locationKeyFinder The source of {@link ParquetTableLocationKey location keys} to include * @param readInstructions Instructions for customizations while reading * @return The table + * @deprecated use {@link #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions)} */ + @Deprecated public static Table readPartitionedTableInferSchema( @NotNull final TableLocationKeyFinder locationKeyFinder, @NotNull final ParquetInstructions readInstructions) { - final KnownLocationKeyFinder sortedKeys = - KnownLocationKeyFinder.copyFrom(locationKeyFinder, Comparator.naturalOrder()); - if (sortedKeys.getKnownKeys().isEmpty()) { - if (readInstructions.isRefreshing()) { - throw new IllegalArgumentException( - "Unable to infer schema for a refreshing partitioned parquet table when there are no initial parquet files"); - } - return TableTools.emptyTable(0); + return readPartitionedTable(locationKeyFinder, readInstructions); + } + + private static Pair infer( + KnownLocationKeyFinder inferenceKeys, ParquetInstructions readInstructions) { + // TODO(deephaven-core#877): Support schema merge when discovering multiple parquet files + final ParquetTableLocationKey lastKey = inferenceKeys.getLastKey().orElse(null); + if (lastKey == null) { + throw new IllegalArgumentException( + "Unable to infer schema for a partitioned parquet table when there are no initial parquet files"); } - // TODO (https://github.com/deephaven/deephaven-core/issues/877): Support schema merge when discovering multiple - // parquet files - final ParquetTableLocationKey lastKey = sortedKeys.getKnownKeys().get(sortedKeys.getKnownKeys().size() - 1); final Pair>, ParquetInstructions> schemaInfo = convertSchema( lastKey.getFileReader().getSchema(), lastKey.getMetadata().getFileMetaData().getKeyValueMetaData(), @@ -712,19 +802,23 @@ public static Table readPartitionedTableInferSchema( throw new IllegalArgumentException(String.format( "Last location key %s has null partition value at partition key %s", lastKey, partitionKey)); } - // Primitives should be unboxed, except booleans Class dataType = partitionValue.getClass(); if (dataType != Boolean.class) { dataType = getUnboxedTypeIfBoxed(partitionValue.getClass()); } - allColumns.add(ColumnDefinition.fromGenericType(partitionKey, dataType, null, ColumnDefinition.ColumnType.Partitioning)); } allColumns.addAll(schemaInfo.getFirst()); - return readPartitionedTable(readInstructions.isRefreshing() ? locationKeyFinder : sortedKeys, - schemaInfo.getSecond(), TableDefinition.of(allColumns)); + return new Pair<>(TableDefinition.of(allColumns), schemaInfo.getSecond()); + } + + private static KnownLocationKeyFinder toKnownKeys( + TableLocationKeyFinder keyFinder) { + return keyFinder instanceof KnownLocationKeyFinder + ? 
(KnownLocationKeyFinder<ParquetTableLocationKey>) keyFinder + : KnownLocationKeyFinder.copyFrom(keyFinder, Comparator.naturalOrder()); } /** @@ -741,6 +835,127 @@ public static Table readPartitionedTableWithMetadata( return readPartitionedTable(layout, layout.getInstructions(), layout.getTableDefinition()); } + /** + * Creates a partitioned table via the key-value partitioned parquet files from the root {@code directory}, + * inferring the table definition from those files. + + *

+ * Callers wishing to be more explicit and skip the inference step may prefer to call + * {@link #readKeyValuePartitionedTable(File, ParquetInstructions, TableDefinition)}. + * + * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param readInstructions the instructions for customizations while reading + * @return the table + * @see ParquetKeyValuePartitionedLayout#ParquetKeyValuePartitionedLayout(File, int) + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions) + */ + public static Table readKeyValuePartitionedTable( + @NotNull final File directory, + @NotNull final ParquetInstructions readInstructions) { + return readPartitionedTable(new ParquetKeyValuePartitionedLayout(directory, MAX_PARTITIONING_LEVELS_INFERENCE), + readInstructions); + } + + /** + * Creates a partitioned table via the key-value partitioned parquet files from the root {@code directory} using the + * provided {@code tableDefinition}. + * + * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param readInstructions the instructions for customizations while reading + * @param tableDefinition the table definition + * @return the table + * @see ParquetKeyValuePartitionedLayout#ParquetKeyValuePartitionedLayout(File, TableDefinition) + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions, TableDefinition) + */ + public static Table readKeyValuePartitionedTable( + @NotNull final File directory, + @NotNull final ParquetInstructions readInstructions, + @NotNull final TableDefinition tableDefinition) { + if (tableDefinition.getColumnStream().noneMatch(ColumnDefinition::isPartitioning)) { + throw new IllegalArgumentException("No partitioning columns"); + } + return readPartitionedTable(new ParquetKeyValuePartitionedLayout(directory, tableDefinition), readInstructions, + tableDefinition); + } + + /** + * Creates a partitioned table via the flat parquet files from the root {@code directory}, inferring the table + * definition from those files. + * + *

+ * Callers wishing to be more explicit and skip the inference step may prefer to call + * {@link #readFlatPartitionedTable(File, ParquetInstructions, TableDefinition)}. + * + * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param readInstructions the instructions for customizations while reading + * @return the table + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions) + * @see ParquetFlatPartitionedLayout#ParquetFlatPartitionedLayout(File) + */ + public static Table readFlatPartitionedTable( + @NotNull final File directory, + @NotNull final ParquetInstructions readInstructions) { + return readPartitionedTable(new ParquetFlatPartitionedLayout(directory), readInstructions); + } + + /** + * Creates a partitioned table via the flat parquet files from the root {@code directory} using the provided + * {@code tableDefinition}. + * + * @param directory the source of {@link ParquetTableLocationKey location keys} to include + * @param readInstructions the instructions for customizations while reading + * @param tableDefinition the table definition + * @return the table + * @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions, TableDefinition) + * @see ParquetFlatPartitionedLayout#ParquetFlatPartitionedLayout(File) + */ + public static Table readFlatPartitionedTable( + @NotNull final File directory, + @NotNull final ParquetInstructions readInstructions, + @NotNull final TableDefinition tableDefinition) { + return readPartitionedTable(new ParquetFlatPartitionedLayout(directory), readInstructions, tableDefinition); + } + + /** + * Creates a single table via the parquet {@code file} using the table definition derived from that {@code file}. + * + *

+ * Callers wishing to be more explicit (for example, to skip some columns) may prefer to call + * {@link #readSingleFileTable(File, ParquetInstructions, TableDefinition)}. + * + * @param file the parquet file + * @param readInstructions the instructions for customizations while reading + * @return the table + * @see ParquetTableLocationKey#ParquetTableLocationKey(File, int, Map) + * @see #readSingleFileTable(ParquetTableLocationKey, ParquetInstructions, TableDefinition) + */ + public static Table readSingleFileTable( + @NotNull final File file, + @NotNull final ParquetInstructions readInstructions) { + final ParquetSingleFileLayout keyFinder = new ParquetSingleFileLayout(file); + final KnownLocationKeyFinder inferenceKeys = toKnownKeys(keyFinder); + final Pair inference = infer(inferenceKeys, readInstructions); + return readSingleFileTable(inferenceKeys.getFirstKey().orElseThrow(), inference.getSecond(), + inference.getFirst()); + } + + /** + * Creates a single table via the parquet {@code file} using the provided {@code tableDefinition}. + * + * @param file the parquet file + * @param readInstructions the instructions for customizations while reading + * @param tableDefinition the table definition + * @return the table + * @see ParquetTableLocationKey#ParquetTableLocationKey(File, int, Map) + * @see #readSingleFileTable(ParquetTableLocationKey, ParquetInstructions, TableDefinition) + */ + public static Table readSingleFileTable( + @NotNull final File file, + @NotNull final ParquetInstructions readInstructions, + @NotNull final TableDefinition tableDefinition) { + return readSingleFileTable(new ParquetTableLocationKey(file, 0, null), readInstructions, tableDefinition); + } + private static final SimpleTypeMap> VECTOR_TYPE_MAP = SimpleTypeMap.create( null, CharVector.class, ByteVector.class, ShortVector.class, IntVector.class, LongVector.class, FloatVector.class, DoubleVector.class, ObjectVector.class); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 5924b9bbfa6..4727ba36205 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -6,7 +6,6 @@ import io.deephaven.UncheckedDeephavenException; import io.deephaven.api.Selectable; import io.deephaven.base.FileUtils; -import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.primitive.function.ByteConsumer; import io.deephaven.engine.primitive.function.CharConsumer; @@ -15,13 +14,15 @@ import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.impl.SourceTable; +import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.select.FunctionalColumn; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.util.ColumnHolder; import io.deephaven.engine.table.impl.select.FormulaEvaluationException; import io.deephaven.engine.table.iterators.*; -import io.deephaven.engine.testutil.TstUtils; +import io.deephaven.engine.testutil.ControlledUpdateGraph; import 
io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.BigDecimalUtils; import io.deephaven.engine.util.file.TrackedFileHandleFactory; @@ -36,6 +37,7 @@ import io.deephaven.engine.util.TableTools; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.test.types.OutOfBandTest; +import io.deephaven.util.QueryConstants; import io.deephaven.util.codec.SimpleByteArrayCodec; import junit.framework.TestCase; import org.apache.parquet.column.Encoding; @@ -80,6 +82,23 @@ import javax.annotation.Nullable; import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.engine.util.TableTools.booleanCol; +import static io.deephaven.engine.util.TableTools.byteCol; +import static io.deephaven.engine.util.TableTools.charCol; +import static io.deephaven.engine.util.TableTools.doubleCol; +import static io.deephaven.engine.util.TableTools.floatCol; +import static io.deephaven.engine.util.TableTools.instantCol; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.longCol; +import static io.deephaven.engine.util.TableTools.merge; +import static io.deephaven.engine.util.TableTools.newTable; +import static io.deephaven.engine.util.TableTools.shortCol; +import static io.deephaven.engine.util.TableTools.stringCol; +import static io.deephaven.parquet.table.ParquetTools.readFlatPartitionedTable; +import static io.deephaven.parquet.table.ParquetTools.readKeyValuePartitionedTable; +import static io.deephaven.parquet.table.ParquetTools.readSingleFileTable; +import static io.deephaven.parquet.table.ParquetTools.readTable; +import static io.deephaven.parquet.table.ParquetTools.writeTable; import static io.deephaven.util.QueryConstants.*; import static org.junit.Assert.*; @@ -89,6 +108,9 @@ public final class ParquetTableReadWriteTest { private static final String ROOT_FILENAME = ParquetTableReadWriteTest.class.getName() + "_root"; private static final int LARGE_TABLE_SIZE = 2_000_000; + private static final ParquetInstructions EMPTY = ParquetInstructions.EMPTY; + private static final ParquetInstructions REFRESHING = ParquetInstructions.builder().setIsRefreshing(true).build(); + private static File rootFile; @Rule @@ -234,34 +256,30 @@ private static Table getGroupedTable(int size, boolean includeSerializable) { private void flatTable(String tableName, int size, boolean includeSerializable) { final Table tableToSave = getTableFlat(size, includeSerializable, true); final File dest = new File(rootFile, "ParquetTest_" + tableName + "_test.parquet"); - ParquetTools.writeTable(tableToSave, dest); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(maybeFixBigDecimal(tableToSave), fromDisk); + writeTable(tableToSave, dest); + checkSingleTable(maybeFixBigDecimal(tableToSave), dest); } private void groupedTable(String tableName, int size, boolean includeSerializable) { final Table tableToSave = getGroupedTable(size, includeSerializable); final File dest = new File(rootFile, "ParquetTest_" + tableName + "_test.parquet"); - ParquetTools.writeTable(tableToSave, dest, tableToSave.getDefinition()); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(tableToSave, fromDisk); + writeTable(tableToSave, dest, tableToSave.getDefinition()); + checkSingleTable(tableToSave, dest); } private void groupedOneColumnTable(String tableName, int size) { final Table tableToSave = getGroupedOneColumnTable(size); TableTools.show(tableToSave, 50); final 
File dest = new File(rootFile, "ParquetTest_" + tableName + "_test.parquet"); - ParquetTools.writeTable(tableToSave, dest, tableToSave.getDefinition()); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(tableToSave, fromDisk); + writeTable(tableToSave, dest, tableToSave.getDefinition()); + checkSingleTable(tableToSave, dest); } private void testEmptyArrayStore(String tableName, int size) { final Table tableToSave = getEmptyArray(size); final File dest = new File(rootFile, "ParquetTest_" + tableName + "_test.parquet"); - ParquetTools.writeTable(tableToSave, dest, tableToSave.getDefinition()); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(tableToSave, fromDisk); + writeTable(tableToSave, dest, tableToSave.getDefinition()); + checkSingleTable(tableToSave, dest); } @Test @@ -269,9 +287,8 @@ public void emptyTrivialTable() { final Table t = TableTools.emptyTable(0).select("A = i"); assertEquals(int.class, t.getDefinition().getColumn("A").getDataType()); final File dest = new File(rootFile, "ParquetTest_emptyTrivialTable.parquet"); - ParquetTools.writeTable(t, dest); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(t, fromDisk); + writeTable(t, dest); + final Table fromDisk = checkSingleTable(t, dest); assertEquals(t.getDefinition(), fromDisk.getDefinition()); } @@ -301,9 +318,8 @@ public void groupingByLongKey() { ((QueryTable) TableTools.emptyTable(10).select("someInt = i", "someLong = ii % 3") .groupBy("someLong").ungroup("someInt")).withDefinitionUnsafe(definition); final File dest = new File(rootFile, "ParquetTest_groupByLong_test.parquet"); - ParquetTools.writeTable(testTable, dest); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(fromDisk, testTable); + writeTable(testTable, dest); + final Table fromDisk = checkSingleTable(testTable, dest); TestCase.assertNotNull(fromDisk.getColumnSource("someLong").getGroupToRange()); } @@ -317,9 +333,8 @@ public void groupingByStringKey() { .where("i % 2 == 0").groupBy("someString").ungroup("someInt")) .withDefinitionUnsafe(definition); final File dest = new File(rootFile, "ParquetTest_groupByString_test.parquet"); - ParquetTools.writeTable(testTable, dest); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(fromDisk, testTable); + writeTable(testTable, dest); + final Table fromDisk = checkSingleTable(testTable, dest); TestCase.assertNotNull(fromDisk.getColumnSource("someString").getGroupToRange()); } @@ -333,19 +348,17 @@ public void groupingByBigInt() { .select("someInt = i", "someBigInt = BigInteger.valueOf(i % 3)").where("i % 2 == 0") .groupBy("someBigInt").ungroup("someInt")).withDefinitionUnsafe(definition); final File dest = new File(rootFile, "ParquetTest_groupByBigInt_test.parquet"); - ParquetTools.writeTable(testTable, dest); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(fromDisk, testTable); + writeTable(testTable, dest); + final Table fromDisk = checkSingleTable(testTable, dest); TestCase.assertNotNull(fromDisk.getColumnSource("someBigInt").getGroupToRange()); } private void compressionCodecTestHelper(final ParquetInstructions codec) { File dest = new File(rootFile + File.separator + "Table1.parquet"); final Table table1 = getTableFlat(10000, false, true); - ParquetTools.writeTable(table1, dest, codec); + writeTable(table1, dest, codec); assertTrue(dest.length() > 0L); - final Table table2 = ParquetTools.readTable(dest); - 
TstUtils.assertTableEquals(maybeFixBigDecimal(table1), table2); + checkSingleTable(maybeFixBigDecimal(table1), dest); } @Test @@ -368,24 +381,23 @@ public void test_lz4_compressed() { // Write and read a LZ4 compressed file File dest = new File(rootFile + File.separator + "Table.parquet"); final Table table = getTableFlat(100, false, false); - ParquetTools.writeTable(table, dest, ParquetTools.LZ4); - Table fromDisk = ParquetTools.readTable(dest).select(); - TstUtils.assertTableEquals(fromDisk, table); + writeTable(table, dest, ParquetTools.LZ4); + + final Table fromDisk = checkSingleTable(table, dest).select(); try { // The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with // LZ4_RAW. We should be able to read it anyway with no exceptions. String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile(); - fromDisk = ParquetTools.readTable(path).select(); + readSingleFileTable(new File(path), EMPTY).select(); } catch (RuntimeException e) { TestCase.fail("Failed to read parquet file sample_lz4_compressed.parquet"); } File randomDest = new File(rootFile, "random.parquet"); - ParquetTools.writeTable(fromDisk, randomDest, ParquetTools.LZ4_RAW); + writeTable(fromDisk, randomDest, ParquetTools.LZ4_RAW); // Read the LZ4 compressed file again, to make sure we use a new adapter - fromDisk = ParquetTools.readTable(dest).select(); - TstUtils.assertTableEquals(fromDisk, table); + checkSingleTable(table, randomDest); } @Test @@ -422,11 +434,10 @@ public void testBigDecimalPrecisionScale() { final BigDecimal myBigDecimal = new BigDecimal(".0005"); assertEquals(1, myBigDecimal.precision()); assertEquals(4, myBigDecimal.scale()); - final Table table = TableTools - .newTable(new ColumnHolder<>("MyBigDecimal", BigDecimal.class, null, false, myBigDecimal)); + final Table table = newTable(new ColumnHolder<>("MyBigDecimal", BigDecimal.class, null, false, myBigDecimal)); final File dest = new File(rootFile, "ParquetTest_testBigDecimalPrecisionScale.parquet"); - ParquetTools.writeTable(table, dest); - final Table fromDisk = ParquetTools.readTable(dest); + writeTable(table, dest); + final Table fromDisk = readSingleFileTable(dest, EMPTY); try (final CloseableIterator it = fromDisk.objectColumnIterator("MyBigDecimal")) { assertTrue(it.hasNext()); final BigDecimal item = it.next(); @@ -436,14 +447,13 @@ public void testBigDecimalPrecisionScale() { } private static void writeReadTableTest(final Table table, final File dest) { - writeReadTableTest(table, dest, ParquetInstructions.EMPTY); + writeReadTableTest(table, dest, EMPTY); } private static void writeReadTableTest(final Table table, final File dest, final ParquetInstructions writeInstructions) { - ParquetTools.writeTable(table, dest, writeInstructions); - final Table fromDisk = ParquetTools.readTable(dest); - TstUtils.assertTableEquals(table, fromDisk); + writeTable(table, dest, writeInstructions); + checkSingleTable(table, dest); } @Test @@ -623,9 +633,9 @@ private interface TestParquetTableWriter { void writeTable(final Table table, final File destFile); } - TestParquetTableWriter singleWriter = (table, destFile) -> ParquetTools.writeTable(table, destFile); - TestParquetTableWriter multiWriter = (table, destFile) -> ParquetTools.writeTables(new Table[] {table}, - table.getDefinition(), new File[] {destFile}); + private static final TestParquetTableWriter SINGLE_WRITER = ParquetTools::writeTable; + private static final TestParquetTableWriter MULTI_WRITER = (table, destFile) -> 
ParquetTools + .writeTables(new Table[] {table}, table.getDefinition(), new File[] {destFile}); /** * Verify that the parent directory contains the expected parquet files and index files in the right directory @@ -668,8 +678,8 @@ private static void verifyFilesInDir(final File parentDir, final String[] expect */ @Test public void basicWriteTests() { - basicWriteTestsImpl(singleWriter); - basicWriteTestsImpl(multiWriter); + basicWriteTestsImpl(SINGLE_WRITER); + basicWriteTestsImpl(MULTI_WRITER); } private static void basicWriteTestsImpl(TestParquetTableWriter writer) { @@ -684,8 +694,8 @@ private static void basicWriteTestsImpl(TestParquetTableWriter writer) { final File destFile = new File(parentDir, filename); writer.writeTable(tableToSave, destFile); verifyFilesInDir(parentDir, new String[] {filename}, null); - Table fromDisk = ParquetTools.readTable(destFile); - TstUtils.assertTableEquals(fromDisk, tableToSave); + + checkSingleTable(tableToSave, destFile); // This write should fail final Table badTable = TableTools.emptyTable(5) @@ -699,15 +709,13 @@ private static void basicWriteTestsImpl(TestParquetTableWriter writer) { // Make sure that original file is preserved and no temporary files verifyFilesInDir(parentDir, new String[] {filename}, null); - fromDisk = ParquetTools.readTable(destFile); - TstUtils.assertTableEquals(fromDisk, tableToSave); + checkSingleTable(tableToSave, destFile); // Write a new table successfully at the same path final Table newTableToSave = TableTools.emptyTable(5).update("A=(int)i"); writer.writeTable(newTableToSave, destFile); verifyFilesInDir(parentDir, new String[] {filename}, null); - fromDisk = ParquetTools.readTable(destFile); - TstUtils.assertTableEquals(fromDisk, newTableToSave); + checkSingleTable(newTableToSave, destFile); FileUtils.deleteRecursively(parentDir); } @@ -737,8 +745,8 @@ public void writeMultiTableBasicTest() { ParquetTools.writeTables(tablesToSave, firstTable.getDefinition(), destFiles); verifyFilesInDir(parentDir, new String[] {firstFilename, secondFilename}, null); - TstUtils.assertTableEquals(ParquetTools.readTable(firstDestFile), firstTable); - TstUtils.assertTableEquals(ParquetTools.readTable(secondDestFile), secondTable); + checkSingleTable(firstTable, firstDestFile); + checkSingleTable(secondTable, secondDestFile); } /** @@ -780,8 +788,8 @@ public void writeMultiTableExceptionTest() { */ @Test public void groupingColumnsBasicWriteTests() { - groupingColumnsBasicWriteTestsImpl(singleWriter); - groupingColumnsBasicWriteTestsImpl(multiWriter); + groupingColumnsBasicWriteTestsImpl(SINGLE_WRITER); + groupingColumnsBasicWriteTestsImpl(MULTI_WRITER); } public void groupingColumnsBasicWriteTestsImpl(TestParquetTableWriter writer) { @@ -795,7 +803,7 @@ public void groupingColumnsBasicWriteTestsImpl(TestParquetTableWriter writer) { data[i] = i / 4; } final TableDefinition tableDefinition = TableDefinition.of(ColumnDefinition.ofInt("vvv").withGrouping()); - final Table tableToSave = TableTools.newTable(tableDefinition, TableTools.col("vvv", data)); + final Table tableToSave = newTable(tableDefinition, TableTools.col("vvv", data)); final String destFilename = "groupingColumnsWriteTests.parquet"; final File destFile = new File(parentDir, destFilename); @@ -803,8 +811,7 @@ public void groupingColumnsBasicWriteTestsImpl(TestParquetTableWriter writer) { String vvvIndexFilePath = ".dh_metadata/indexes/vvv/index_vvv_groupingColumnsWriteTests.parquet"; verifyFilesInDir(parentDir, new String[] {destFilename}, Map.of("vvv", new String[] 
{vvvIndexFilePath})); - Table fromDisk = ParquetTools.readTable(destFile); - TstUtils.assertTableEquals(fromDisk, tableToSave); + checkSingleTable(tableToSave, destFile); // Verify that the key-value metadata in the file has the correct name ParquetTableLocationKey tableLocationKey = new ParquetTableLocationKey(destFile, 0, null); @@ -813,7 +820,7 @@ public void groupingColumnsBasicWriteTestsImpl(TestParquetTableWriter writer) { // Write another table but this write should fail final TableDefinition badTableDefinition = TableDefinition.of(ColumnDefinition.ofInt("www").withGrouping()); - final Table badTable = TableTools.newTable(badTableDefinition, TableTools.col("www", data)) + final Table badTable = newTable(badTableDefinition, TableTools.col("www", data)) .updateView("InputString = ii % 2 == 0 ? Long.toString(ii) : null", "A=InputString.charAt(0)"); try { writer.writeTable(badTable, destFile); @@ -824,8 +831,7 @@ public void groupingColumnsBasicWriteTestsImpl(TestParquetTableWriter writer) { // Make sure that original file is preserved and no temporary files verifyFilesInDir(parentDir, new String[] {destFilename}, Map.of("vvv", new String[] {vvvIndexFilePath})); - fromDisk = ParquetTools.readTable(destFile); - TstUtils.assertTableEquals(fromDisk, tableToSave); + checkSingleTable(tableToSave, destFile); FileUtils.deleteRecursively(parentDir); } @@ -838,7 +844,7 @@ public void legacyGroupingFileReadTest() { // Read the legacy file and verify that grouping column is read correctly final Table fromDisk; try { - fromDisk = ParquetTools.readTable(destFile); + fromDisk = readSingleFileTable(destFile, EMPTY); } catch (RuntimeException e) { if (e.getCause() instanceof InvalidParquetFileException) { final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + @@ -864,8 +870,8 @@ public void legacyGroupingFileReadTest() { } final TableDefinition tableDefinition = TableDefinition.of(ColumnDefinition.ofInt(groupingColName).withGrouping()); - final Table table = TableTools.newTable(tableDefinition, TableTools.col(groupingColName, data)); - TstUtils.assertTableEquals(fromDisk, table); + final Table table = newTable(tableDefinition, TableTools.col(groupingColName, data)); + assertTableEquals(fromDisk, table); } @Test @@ -880,17 +886,17 @@ public void parquetDirectoryWithDotFilesTest() throws IOException { data[i] = i / 4; } final TableDefinition tableDefinition = TableDefinition.of(ColumnDefinition.ofInt("vvv").withGrouping()); - final Table tableToSave = TableTools.newTable(tableDefinition, TableTools.col("vvv", data)); + final Table tableToSave = newTable(tableDefinition, TableTools.col("vvv", data)); final String destFilename = "data.parquet"; final File destFile = new File(parentDir, destFilename); - ParquetTools.writeTable(tableToSave, destFile); + writeTable(tableToSave, destFile); String vvvIndexFilePath = ".dh_metadata/indexes/vvv/index_vvv_data.parquet"; verifyFilesInDir(parentDir, new String[] {destFilename}, Map.of("vvv", new String[] {vvvIndexFilePath})); // Call readTable on parent directory - Table fromDisk = ParquetTools.readTable(parentDir); - TstUtils.assertTableEquals(fromDisk, tableToSave); + Table fromDisk = readFlatPartitionedTable(parentDir, EMPTY); + assertTableEquals(fromDisk, tableToSave); // Add an empty dot file and dot directory (with valid parquet files) in the parent directory final File dotFile = new File(parentDir, ".dotFile"); @@ -898,16 +904,16 @@ public void parquetDirectoryWithDotFilesTest() throws IOException { final File 
dotDir = new File(parentDir, ".dotDir"); assertTrue(dotDir.mkdir()); final Table someTable = TableTools.emptyTable(5).update("A=(int)i"); - ParquetTools.writeTable(someTable, new File(dotDir, "data.parquet")); - fromDisk = ParquetTools.readTable(parentDir); - TstUtils.assertTableEquals(fromDisk, tableToSave); + writeTable(someTable, new File(dotDir, "data.parquet")); + fromDisk = readFlatPartitionedTable(parentDir, EMPTY); + assertTableEquals(fromDisk, tableToSave); // Add a dot parquet in parent directory final Table anotherTable = TableTools.emptyTable(5).update("A=(int)i"); final File pqDotFile = new File(parentDir, ".dotFile.parquet"); - ParquetTools.writeTable(anotherTable, pqDotFile); - fromDisk = ParquetTools.readTable(parentDir); - TstUtils.assertTableEquals(fromDisk, tableToSave); + writeTable(anotherTable, pqDotFile); + fromDisk = readFlatPartitionedTable(parentDir, EMPTY); + assertTableEquals(fromDisk, tableToSave); } @Test @@ -923,10 +929,10 @@ public void partitionedParquetWithDotFilesTest() throws IOException { final File secondPartition = new File(parentDir, "X=B"); final File secondDataFile = new File(secondPartition, "data.parquet"); - ParquetTools.writeTable(someTable, firstDataFile); - ParquetTools.writeTable(someTable, secondDataFile); + writeTable(someTable, firstDataFile); + writeTable(someTable, secondDataFile); - Table partitionedTable = ParquetTools.readTable(parentDir).select(); + Table partitionedTable = readKeyValuePartitionedTable(parentDir, EMPTY).select(); final Set columnsSet = partitionedTable.getDefinition().getColumnNameSet(); assertTrue(columnsSet.size() == 2 && columnsSet.contains("A") && columnsSet.contains("X")); @@ -935,16 +941,16 @@ public void partitionedParquetWithDotFilesTest() throws IOException { assertTrue(dotFile.createNewFile()); final File dotDir = new File(firstPartition, ".dotDir"); assertTrue(dotDir.mkdir()); - ParquetTools.writeTable(someTable, new File(dotDir, "data.parquet")); - Table fromDisk = ParquetTools.readTable(parentDir); - TstUtils.assertTableEquals(fromDisk, partitionedTable); + writeTable(someTable, new File(dotDir, "data.parquet")); + Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + assertTableEquals(fromDisk, partitionedTable); // Add a dot parquet file in one of the partitions directory final Table anotherTable = TableTools.emptyTable(5).update("B=(int)i"); final File pqDotFile = new File(secondPartition, ".dotFile.parquet"); - ParquetTools.writeTable(anotherTable, pqDotFile); - fromDisk = ParquetTools.readTable(parentDir); - TstUtils.assertTableEquals(fromDisk, partitionedTable); + writeTable(anotherTable, pqDotFile); + fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + assertTableEquals(fromDisk, partitionedTable); } /** @@ -961,11 +967,11 @@ public void writeMultiTableGroupingColumnTest() { data[i] = i / 4; } final TableDefinition tableDefinition = TableDefinition.of(ColumnDefinition.ofInt("vvv").withGrouping()); - final Table firstTable = TableTools.newTable(tableDefinition, TableTools.col("vvv", data)); + final Table firstTable = newTable(tableDefinition, TableTools.col("vvv", data)); final String firstFilename = "firstTable.parquet"; final File firstDestFile = new File(parentDir, firstFilename); - final Table secondTable = TableTools.newTable(tableDefinition, TableTools.col("vvv", data)); + final Table secondTable = newTable(tableDefinition, TableTools.col("vvv", data)); final String secondFilename = "secondTable.parquet"; final File secondDestFile = new File(parentDir, secondFilename); 
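A minimal usage sketch of the new ParquetTools read methods introduced in this change. It is not part of the diff; the directory paths, column names, and the Example class are hypothetical, and only methods added or referenced in the changes above are called:

    import java.io.File;
    import io.deephaven.engine.table.ColumnDefinition;
    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.table.TableDefinition;
    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;

    public class Example {
        public static void main(String[] args) {
            // Single parquet file; the definition is inferred from the file itself.
            // Refreshing instructions are rejected here, per the new check in readSingleFileTable.
            final Table single = ParquetTools.readSingleFileTable(
                    new File("/data/example/foo.parquet"), ParquetInstructions.EMPTY);

            // Key-value partitioned directory (e.g. Date=2023-01-01/part.parquet); the definition is
            // inferred from the highest location key, and partition keys become Partitioning columns.
            final Table byDate = ParquetTools.readKeyValuePartitionedTable(
                    new File("/data/example/by-date"), ParquetInstructions.EMPTY);

            // Flat directory of parquet files, skipping inference by supplying an explicit definition.
            final TableDefinition definition = TableDefinition.of(
                    ColumnDefinition.ofInt("Foo"),
                    ColumnDefinition.ofString("Bar"));
            final Table flat = ParquetTools.readFlatPartitionedTable(
                    new File("/data/example/flat"), ParquetInstructions.EMPTY, definition);

            // A refreshing read re-discovers locations via the shared refresh service and registers
            // with the update graph of the current ExecutionContext, as wired up in readPartitionedTable above.
            final ParquetInstructions refreshing =
                    ParquetInstructions.builder().setIsRefreshing(true).build();
            final Table live = ParquetTools.readFlatPartitionedTable(
                    new File("/data/example/flat"), refreshing, definition);
        }
    }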
@@ -988,14 +994,14 @@ public void writeMultiTableGroupingColumnTest() { assertTrue(metadataString.contains(secondIndexFilePath)); // Read back the files and verify contents match - TstUtils.assertTableEquals(ParquetTools.readTable(firstDestFile), firstTable); - TstUtils.assertTableEquals(ParquetTools.readTable(secondDestFile), secondTable); + checkSingleTable(firstTable, firstDestFile); + checkSingleTable(secondTable, secondDestFile); } @Test public void groupingColumnsOverwritingTests() { - groupingColumnsOverwritingTestsImpl(singleWriter); - groupingColumnsOverwritingTestsImpl(multiWriter); + groupingColumnsOverwritingTestsImpl(SINGLE_WRITER); + groupingColumnsOverwritingTestsImpl(MULTI_WRITER); } public void groupingColumnsOverwritingTestsImpl(TestParquetTableWriter writer) { @@ -1009,7 +1015,7 @@ public void groupingColumnsOverwritingTestsImpl(TestParquetTableWriter writer) { data[i] = i / 4; } final TableDefinition tableDefinition = TableDefinition.of(ColumnDefinition.ofInt("vvv").withGrouping()); - final Table tableToSave = TableTools.newTable(tableDefinition, TableTools.col("vvv", data)); + final Table tableToSave = newTable(tableDefinition, TableTools.col("vvv", data)); final String destFilename = "groupingColumnsWriteTests.parquet"; final File destFile = new File(parentDir, destFilename); @@ -1018,7 +1024,7 @@ public void groupingColumnsOverwritingTestsImpl(TestParquetTableWriter writer) { // Write a new table successfully at the same position with different grouping columns final TableDefinition anotherTableDefinition = TableDefinition.of(ColumnDefinition.ofInt("xxx").withGrouping()); - Table anotherTableToSave = TableTools.newTable(anotherTableDefinition, TableTools.col("xxx", data)); + Table anotherTableToSave = newTable(anotherTableDefinition, TableTools.col("xxx", data)); writer.writeTable(anotherTableToSave, destFile); final String xxxIndexFilePath = ".dh_metadata/indexes/xxx/index_xxx_groupingColumnsWriteTests.parquet"; @@ -1028,8 +1034,7 @@ public void groupingColumnsOverwritingTestsImpl(TestParquetTableWriter writer) { Map.of("vvv", new String[] {vvvIndexFilePath}, "xxx", new String[] {xxxIndexFilePath})); - Table fromDisk = ParquetTools.readTable(destFile); - TstUtils.assertTableEquals(fromDisk, anotherTableToSave); + checkSingleTable(anotherTableToSave, destFile); ParquetTableLocationKey tableLocationKey = new ParquetTableLocationKey(destFile, 0, null); String metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); @@ -1056,8 +1061,8 @@ public void groupingColumnsOverwritingTestsImpl(TestParquetTableWriter writer) { @Test public void readChangedUnderlyingFileTests() { - readChangedUnderlyingFileTestsImpl(singleWriter); - readChangedUnderlyingFileTestsImpl(multiWriter); + readChangedUnderlyingFileTestsImpl(SINGLE_WRITER); + readChangedUnderlyingFileTestsImpl(MULTI_WRITER); } public void readChangedUnderlyingFileTestsImpl(TestParquetTableWriter writer) { @@ -1066,31 +1071,31 @@ public void readChangedUnderlyingFileTestsImpl(TestParquetTableWriter writer) { final String filename = "readChangedUnderlyingFileTests.parquet"; final File destFile = new File(rootFile, filename); writer.writeTable(tableToSave, destFile); - Table fromDisk = ParquetTools.readTable(destFile); + Table fromDisk = readSingleFileTable(destFile, EMPTY); // At this point, fromDisk is not fully materialized in the memory and would be read from the file on demand // Change the underlying file final Table stringTable = TableTools.emptyTable(5).update("InputString = Long.toString(ii)"); 
writer.writeTable(stringTable, destFile); - Table stringFromDisk = ParquetTools.readTable(destFile).select(); - TstUtils.assertTableEquals(stringTable, stringFromDisk); + Table stringFromDisk = readSingleFileTable(destFile, EMPTY).select(); + assertTableEquals(stringTable, stringFromDisk); // Close all the file handles so that next time when fromDisk is accessed, we need to reopen the file handle TrackedFileHandleFactory.getInstance().closeAll(); - // Read back fromDisk and compare it with original table. Since the underlying file has changed, - // assertTableEquals will try to read the file and would crash + // Read back fromDisk. Since the underlying file has changed, we expect this to fail. try { - TstUtils.assertTableEquals(tableToSave, fromDisk); - TestCase.fail(); - } catch (Exception ignored) { + fromDisk.coalesce(); + TestCase.fail("Expected TableDataException"); + } catch (TableDataException ignored) { + // expected } } @Test public void readModifyWriteTests() { - readModifyWriteTestsImpl(singleWriter); - readModifyWriteTestsImpl(multiWriter); + readModifyWriteTestsImpl(SINGLE_WRITER); + readModifyWriteTestsImpl(MULTI_WRITER); } public void readModifyWriteTestsImpl(TestParquetTableWriter writer) { @@ -1099,7 +1104,7 @@ public void readModifyWriteTestsImpl(TestParquetTableWriter writer) { final String filename = "readModifyWriteTests.parquet"; final File destFile = new File(rootFile, filename); writer.writeTable(tableToSave, destFile); - Table fromDisk = ParquetTools.readTable(destFile); + Table fromDisk = readSingleFileTable(destFile, EMPTY); // At this point, fromDisk is not fully materialized in the memory and would be read from the file on demand // Create a view table on fromDisk which should fail on writing, and try to write at the same location @@ -1119,7 +1124,7 @@ public void readModifyWriteTestsImpl(TestParquetTableWriter writer) { // Read back fromDisk and compare it with original table. 
If the underlying file has not been corrupted or // swapped out, then we would not be able to read from the file - TstUtils.assertTableEquals(tableToSave, fromDisk); + assertTableEquals(tableToSave, fromDisk); } @Test @@ -1134,9 +1139,8 @@ public void dictionaryEncodingTest() { .build(); final Table stringTable = TableTools.emptyTable(numRows).select(Selectable.from(columns)); final File dest = new File(rootFile + File.separator + "dictEncoding.parquet"); - ParquetTools.writeTable(stringTable, dest, writeInstructions); - Table fromDisk = ParquetTools.readTable(dest); - assertTableEquals(stringTable, fromDisk); + writeTable(stringTable, dest, writeInstructions); + checkSingleTable(stringTable, dest); // Verify that string columns are properly dictionary encoded final ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null).getMetadata(); @@ -1190,9 +1194,8 @@ private static ColumnChunkMetaData overflowingStringsTestHelper(final Collection .build(); Table stringTable = TableTools.emptyTable(numRows).select(Selectable.from(columns)); final File dest = new File(rootFile + File.separator + "overflowingStringsTest.parquet"); - ParquetTools.writeTable(stringTable, dest, writeInstructions); - Table fromDisk = ParquetTools.readTable(dest).select(); - assertTableEquals(stringTable, fromDisk); + writeTable(stringTable, dest, writeInstructions); + checkSingleTable(stringTable, dest); ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null).getMetadata(); ColumnChunkMetaData columnMetadata = metadata.getBlocks().get(0).getColumns().get(0); @@ -1211,13 +1214,12 @@ public void overflowingCodecsTest() { ColumnDefinition.fromGenericType("VariableWidthByteArrayColumn", byte[].class, byte.class); final TableDefinition tableDefinition = TableDefinition.of(columnDefinition); final byte[] byteArray = new byte[pageSize / 2]; - final Table table = TableTools.newTable(tableDefinition, + final Table table = newTable(tableDefinition, TableTools.col("VariableWidthByteArrayColumn", byteArray, byteArray, byteArray)); final File dest = new File(rootFile + File.separator + "overflowingCodecsTest.parquet"); - ParquetTools.writeTable(table, dest, writeInstructions); - Table fromDisk = ParquetTools.readTable(dest).select(); - assertTableEquals(table, fromDisk); + writeTable(table, dest, writeInstructions); + checkSingleTable(table, dest); final ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null).getMetadata(); final String metadataStr = metadata.getFileMetaData().getKeyValueMetaData().get("deephaven"); @@ -1238,34 +1240,31 @@ public void readWriteStatisticsTest() { ColumnDefinition.fromGenericType("VariableWidthByteArrayColumn", byte[].class, byte.class); final TableDefinition tableDefinition = TableDefinition.of(columnDefinition); final byte[] byteArray = new byte[] {1, 2, 3, 4, NULL_BYTE, 6, 7, 8, 9, NULL_BYTE, 11, 12, 13}; - final Table simpleTable = TableTools.newTable(tableDefinition, + final Table simpleTable = newTable(tableDefinition, TableTools.col("VariableWidthByteArrayColumn", null, byteArray, byteArray, byteArray, byteArray, byteArray)); final File simpleTableDest = new File(rootFile, "ParquetTest_simple_statistics_test.parquet"); - ParquetTools.writeTable(simpleTable, simpleTableDest); + writeTable(simpleTable, simpleTableDest); - final Table simpleFromDisk = ParquetTools.readTable(simpleTableDest); - TstUtils.assertTableEquals(simpleTable, simpleFromDisk); + checkSingleTable(simpleTable, simpleTableDest); assertTableStatistics(simpleTable, simpleTableDest); // Test 
flat columns. final Table flatTableToSave = getTableFlat(10_000, true, true); final File flatTableDest = new File(rootFile, "ParquetTest_flat_statistics_test.parquet"); - ParquetTools.writeTable(flatTableToSave, flatTableDest); + writeTable(flatTableToSave, flatTableDest); - final Table flatFromDisk = ParquetTools.readTable(flatTableDest); - TstUtils.assertTableEquals(maybeFixBigDecimal(flatTableToSave), flatFromDisk); + checkSingleTable(maybeFixBigDecimal(flatTableToSave), flatTableDest); assertTableStatistics(flatTableToSave, flatTableDest); // Test nested columns. final Table groupedTableToSave = getGroupedTable(10_000, true); final File groupedTableDest = new File(rootFile, "ParquetTest_grouped_statistics_test.parquet"); - ParquetTools.writeTable(groupedTableToSave, groupedTableDest, groupedTableToSave.getDefinition()); + writeTable(groupedTableToSave, groupedTableDest, groupedTableToSave.getDefinition()); - final Table groupedFromDisk = ParquetTools.readTable(groupedTableDest); - TstUtils.assertTableEquals(groupedTableToSave, groupedFromDisk); + checkSingleTable(groupedTableToSave, groupedTableDest); assertTableStatistics(groupedTableToSave, groupedTableDest); } @@ -1349,7 +1348,7 @@ public void verifyPyArrowStatistics() { final File pyarrowDest = new File(path); final Table pyarrowFromDisk; try { - pyarrowFromDisk = ParquetTools.readTable(pyarrowDest); + pyarrowFromDisk = readSingleFileTable(pyarrowDest, EMPTY); } catch (RuntimeException e) { if (e.getCause() instanceof InvalidParquetFileException) { final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + @@ -1364,13 +1363,9 @@ public void verifyPyArrowStatistics() { // Write the table to disk using our code. final File dhDest = new File(rootFile, "ParquetTest_statistics_test.parquet"); - ParquetTools.writeTable(pyarrowFromDisk, dhDest); + writeTable(pyarrowFromDisk, dhDest); - // Read the table back in using our code. - final Table dhFromDisk = ParquetTools.readTable(dhDest); - - // Verify the two tables loaded from disk are equal. - TstUtils.assertTableEquals(pyarrowFromDisk, dhFromDisk); + final Table dhFromDisk = checkSingleTable(pyarrowFromDisk, dhDest); // Run the verification code against DHC writer stats. 
assertTableStatistics(pyarrowFromDisk, dhDest); @@ -1378,17 +1373,380 @@ public void verifyPyArrowStatistics() { } @Test - public void inferParquetOrderLastKey() { + public void singleTable() { + final File fooSource = new File(rootFile, "singleTable/foo.parquet"); + final File fooBarSource = new File(rootFile, "singleTable/fooBar.parquet"); + final File barSource = new File(rootFile, "singleTable/bar.parquet"); + + final Table foo; + final Table fooBar; + final Table bar; + final Table fooBarNullFoo; + final Table fooBarNullBar; + + final TableDefinition fooDefinition; + final TableDefinition fooBarDefinition; + final TableDefinition barDefinition; + { + fooSource.mkdirs(); + fooBarSource.mkdirs(); + barSource.mkdirs(); + + final ColumnHolder fooCol = intCol("Foo", 1, 2, 3); + final ColumnHolder barCol = stringCol("Bar", "Zip", "Zap", "Zoom"); + + final ColumnHolder nullFooCol = + intCol("Foo", QueryConstants.NULL_INT, QueryConstants.NULL_INT, QueryConstants.NULL_INT); + final ColumnHolder nullBarCol = stringCol("Bar", null, null, null); + + final ColumnDefinition fooColDef = ColumnDefinition.ofInt("Foo"); + final ColumnDefinition barColDef = ColumnDefinition.ofString("Bar"); + + fooDefinition = TableDefinition.of(fooColDef); + fooBarDefinition = TableDefinition.of(fooColDef, barColDef); + barDefinition = TableDefinition.of(barColDef); + + foo = newTable(fooDefinition, fooCol); + fooBar = newTable(fooBarDefinition, fooCol, barCol); + bar = newTable(barDefinition, barCol); + + fooBarNullFoo = newTable(fooBarDefinition, nullFooCol, barCol); + fooBarNullBar = newTable(fooBarDefinition, fooCol, nullBarCol); + + writeTable(foo, fooSource); + writeTable(fooBar, fooBarSource); + writeTable(bar, barSource); + } + + // Infer + { + checkSingleTable(foo, fooSource); + checkSingleTable(fooBar, fooBarSource); + checkSingleTable(bar, barSource); + } + + // readTable inference to readSingleTable + { + assertTableEquals(foo, readTable(fooSource)); + assertTableEquals(fooBar, readTable(fooBarSource)); + assertTableEquals(bar, readTable(barSource)); + } + + // Explicit + { + assertTableEquals(foo, readSingleFileTable(fooSource, EMPTY, fooDefinition)); + assertTableEquals(fooBar, readSingleFileTable(fooBarSource, EMPTY, fooBarDefinition)); + assertTableEquals(bar, readSingleFileTable(barSource, EMPTY, barDefinition)); + } + + // Explicit subset + { + // fooBar as foo + assertTableEquals(foo, readSingleFileTable(fooBarSource, EMPTY, fooDefinition)); + // fooBar as bar + assertTableEquals(bar, readSingleFileTable(fooBarSource, EMPTY, barDefinition)); + } + + // Explicit superset + { + // foo as fooBar + assertTableEquals(fooBarNullBar, readSingleFileTable(fooSource, EMPTY, fooBarDefinition)); + // bar as fooBar + assertTableEquals(fooBarNullFoo, readSingleFileTable(barSource, EMPTY, fooBarDefinition)); + } + + // No refreshing single table support + { + try { + readSingleFileTable(fooSource, REFRESHING); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("Unable to have a refreshing single parquet file", e.getMessage()); + } + + try { + readSingleFileTable(fooSource, REFRESHING, fooDefinition); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("Unable to have a refreshing single parquet file", e.getMessage()); + } + } + } + + @Test + public void flatPartitionedTable() { // Create an empty parent directory - final File parentDir = new File(rootFile, "inferParquetOrder"); - parentDir.mkdir(); - final 
TableDefinition td1 = TableDefinition.of(ColumnDefinition.ofInt("Foo")); - final TableDefinition td2 = - TableDefinition.of(ColumnDefinition.ofInt("Foo"), ColumnDefinition.ofString("Bar")); - ParquetTools.writeTable(TableTools.newTable(td1), new File(parentDir, "01.parquet")); - ParquetTools.writeTable(TableTools.newTable(td2), new File(parentDir, "02.parquet")); - final Table table = ParquetTools.readTable(parentDir); - assertEquals(td2, table.getDefinition()); + final File source = new File(rootFile, "flatPartitionedTable/source"); + final File emptySource = new File(rootFile, "flatPartitionedTable/emptySource"); + + final Table formerData; + final Table latterData; + final TableDefinition formerDefinition; + final TableDefinition latterDefinition; + final Runnable writeIntoEmptySource; + { + final File p1File = new File(source, "01.parquet"); + final File p2File = new File(source, "02.parquet"); + + final File p1FileEmpty = new File(emptySource, "01.parquet"); + final File p2FileEmpty = new File(emptySource, "02.parquet"); + + p1File.mkdirs(); + p2File.mkdirs(); + emptySource.mkdirs(); + + final ColumnHolder foo1 = intCol("Foo", 1, 2, 3); + final ColumnHolder foo2 = intCol("Foo", 4, 5); + + final ColumnHolder bar1 = stringCol("Bar", null, null, null); + final ColumnHolder bar2 = stringCol("Bar", "Zip", "Zap"); + + final Table p1 = newTable(foo1); + final Table p2 = newTable(foo2, bar2); + writeTable(p1, p1File); + writeTable(p2, p2File); + writeIntoEmptySource = () -> { + p1FileEmpty.mkdirs(); + p2FileEmpty.mkdirs(); + writeTable(p1, p1FileEmpty); + writeTable(p2, p2FileEmpty); + }; + + final ColumnDefinition foo = ColumnDefinition.ofInt("Foo"); + final ColumnDefinition bar = ColumnDefinition.ofString("Bar"); + + formerDefinition = TableDefinition.of(foo); + latterDefinition = TableDefinition.of(foo, bar); + + formerData = merge( + newTable(formerDefinition, foo1), + newTable(formerDefinition, foo2)); + latterData = merge( + newTable(latterDefinition, foo1, bar1), + newTable(latterDefinition, foo2, bar2)); + } + + // Infer from last key + { + final Table table = readFlatPartitionedTable(source, EMPTY); + assertTableEquals(latterData, table); + } + // Infer from last key, refreshing + { + final Table table = readFlatPartitionedTable(source, REFRESHING); + assertTableEquals(latterData, table); + } + // readTable inference to readFlatPartitionedTable + { + assertTableEquals(latterData, readTable(source)); + } + + // Explicit latter definition + { + final Table table = readFlatPartitionedTable(source, EMPTY, latterDefinition); + assertTableEquals(latterData, table); + } + // Explicit latter definition, refreshing + { + final Table table = readFlatPartitionedTable(source, REFRESHING, latterDefinition); + assertTableEquals(latterData, table); + } + + // Explicit former definition + { + final Table table = readFlatPartitionedTable(source, EMPTY, formerDefinition); + assertTableEquals(formerData, table); + } + // Explicit former definition, refreshing + { + final Table table = readFlatPartitionedTable(source, REFRESHING, formerDefinition); + assertTableEquals(formerData, table); + } + + // Explicit definition, empty directory + { + final Table table = readFlatPartitionedTable(emptySource, EMPTY, latterDefinition); + assertTableEquals(TableTools.newTable(latterDefinition), table); + } + // Explicit definition, empty directory, refreshing with new data added + { + final Table table = readFlatPartitionedTable(emptySource, REFRESHING, latterDefinition); + 
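
The flat-partitioned cases exercised above (definition inferred from the last location key, explicit narrower or wider definitions, empty directories) have a direct counterpart in the Python read() wrapper added later in this change. A minimal sketch, assuming a running Deephaven worker; the directory path is illustrative:

    from deephaven.parquet import read, ParquetFileLayout
    from deephaven import dtypes

    # Infer the definition from the parquet files found in the flat directory
    # (the tests above exercise inference from the last location key).
    inferred = read("/data/flatPartitionedTable/source",
                    file_layout=ParquetFileLayout.FLAT_PARTITIONED)

    # Or pin an explicit definition: files that lack a requested column read it
    # as null, and a narrower definition simply ignores extra columns on disk.
    foo_only = read("/data/flatPartitionedTable/source",
                    file_layout=ParquetFileLayout.FLAT_PARTITIONED,
                    table_definition={"Foo": dtypes.int32})
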
assertTableEquals(TableTools.newTable(latterDefinition), table); + + writeIntoEmptySource.run(); + ExecutionContext.getContext().getUpdateGraph().cast().runWithinUnitTestCycle(() -> { + // This is not generally a good way to do this sort of testing. Ideally, we'd be a bit smarter and use + // a test-driven TableDataRefreshService.getSharedRefreshService. + ((SourceTable) table).tableLocationProvider().refresh(); + ((SourceTable) table).refresh(); + assertTableEquals(latterData, table); + }); + } + } + + @Test + public void keyValuePartitionedTable() { + final File source = new File(rootFile, "keyValuePartitionedTable/source"); + final File emptySource = new File(rootFile, "keyValuePartitionedTable/emptySource"); + + final Table formerData; + final Table latterData; + final TableDefinition formerDefinition; + final TableDefinition latterDefinition; + final Runnable writeIntoEmptySource; + { + final File p1File = new File(source, "Partition=1/z.parquet"); + final File p2File = new File(source, "Partition=2/a.parquet"); + + final File p1FileEmpty = new File(emptySource, "Partition=1/z.parquet"); + final File p2FileEmpty = new File(emptySource, "Partition=2/a.parquet"); + + p1File.mkdirs(); + p2File.mkdirs(); + emptySource.mkdirs(); + + final ColumnHolder part1 = intCol("Partition", 1, 1, 1); + final ColumnHolder part2 = intCol("Partition", 2, 2); + + final ColumnHolder foo1 = intCol("Foo", 1, 2, 3); + final ColumnHolder foo2 = intCol("Foo", 4, 5); + + final ColumnHolder bar1 = stringCol("Bar", null, null, null); + final ColumnHolder bar2 = stringCol("Bar", "Zip", "Zap"); + + final Table p1 = newTable(foo1); + final Table p2 = newTable(foo2, bar2); + writeTable(p1, p1File); + writeTable(p2, p2File); + writeIntoEmptySource = () -> { + p1FileEmpty.mkdirs(); + p2FileEmpty.mkdirs(); + writeTable(p1, p1FileEmpty); + writeTable(p2, p2FileEmpty); + }; + + // Need to be explicit w/ definition so partitioning column applied to expected tables + final ColumnDefinition partition = ColumnDefinition.ofInt("Partition").withPartitioning(); + final ColumnDefinition foo = ColumnDefinition.ofInt("Foo"); + final ColumnDefinition bar = ColumnDefinition.ofString("Bar"); + + // Note: merge does not preserve partition column designation, so we need to explicitly create them + formerDefinition = TableDefinition.of(partition, foo); + latterDefinition = TableDefinition.of(partition, foo, bar); + + formerData = merge( + newTable(formerDefinition, part1, foo1), + newTable(formerDefinition, part2, foo2)); + latterData = merge( + newTable(latterDefinition, part1, foo1, bar1), + newTable(latterDefinition, part2, foo2, bar2)); + } + + // Infer from last key + { + final Table table = readKeyValuePartitionedTable(source, EMPTY); + assertTableEquals(latterData, table); + } + // Infer from last key, refreshing + { + final Table table = readKeyValuePartitionedTable(source, REFRESHING); + assertTableEquals(latterData, table); + } + // readTable inference readKeyValuePartitionedTable + { + assertTableEquals(latterData, readTable(source)); + } + + // Explicit latter definition + { + final Table table = readKeyValuePartitionedTable(source, EMPTY, latterDefinition); + assertTableEquals(latterData, table); + } + // Explicit latter definition, refreshing + { + final Table table = readKeyValuePartitionedTable(source, REFRESHING, latterDefinition); + assertTableEquals(latterData, table); + } + + // Explicit former definition + { + final Table table = readKeyValuePartitionedTable(source, EMPTY, formerDefinition); + 
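
The same key-value layout can be consumed from Python with an explicit partitioning column, mirroring the withPartitioning() definitions built above. A sketch under the same assumptions (illustrative path, running worker):

    from deephaven.parquet import read, ParquetFileLayout
    from deephaven.column import Column, ColumnType
    from deephaven import dtypes

    kv = read(
        "/data/keyValuePartitionedTable/source",    # illustrative path
        file_layout=ParquetFileLayout.KV_PARTITIONED,
        table_definition=[
            # ColumnType.PARTITIONING plays the role of withPartitioning() in Java.
            Column("Partition", dtypes.int32, column_type=ColumnType.PARTITIONING),
            Column("Foo", dtypes.int32),
            Column("Bar", dtypes.string),
        ],
    )
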
assertTableEquals(formerData, table); + } + // Explicit former definition, refreshing + { + final Table table = readKeyValuePartitionedTable(source, REFRESHING, formerDefinition); + assertTableEquals(formerData, table); + } + + // Explicit definition, empty directory + { + final Table table = readKeyValuePartitionedTable(emptySource, EMPTY, latterDefinition); + assertTableEquals(TableTools.newTable(latterDefinition), table); + } + // Explicit definition, empty directory, refreshing with new data added + { + final Table table = readKeyValuePartitionedTable(emptySource, REFRESHING, latterDefinition); + assertTableEquals(TableTools.newTable(latterDefinition), table); + + writeIntoEmptySource.run(); + ExecutionContext.getContext().getUpdateGraph().cast().runWithinUnitTestCycle(() -> { + // This is not generally a good way to do this sort of testing. Ideally, we'd be a bit smarter and use + // a test-driven TableDataRefreshService.getSharedRefreshService. + ((SourceTable) table).tableLocationProvider().refresh(); + ((SourceTable) table).refresh(); + assertTableEquals(latterData, table); + }); + } + } + + @Test + public void readSingleColumn() { + final File file = new File(rootFile, "readSingleColumn.parquet"); + final Table primitives = newTable( + booleanCol("Bool", null, true), + charCol("Char", NULL_CHAR, (char) 42), + byteCol("Byte", NULL_BYTE, (byte) 42), + shortCol("Short", NULL_SHORT, (short) 42), + intCol("Int", NULL_INT, 42), + longCol("Long", NULL_LONG, 42L), + floatCol("Float", NULL_FLOAT, 42.0f), + doubleCol("Double", NULL_DOUBLE, 42.0), + stringCol("String", null, "42"), + instantCol("Instant", null, Instant.ofEpochMilli(42))); + { + writeTable(primitives, file); + } + assertTableEquals( + primitives.view("Bool"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofBoolean("Bool")))); + assertTableEquals( + primitives.view("Char"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofChar("Char")))); + assertTableEquals( + primitives.view("Byte"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofByte("Byte")))); + assertTableEquals( + primitives.view("Short"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofShort("Short")))); + assertTableEquals( + primitives.view("Int"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofInt("Int")))); + assertTableEquals( + primitives.view("Long"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofLong("Long")))); + assertTableEquals( + primitives.view("Float"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofFloat("Float")))); + assertTableEquals( + primitives.view("Double"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofDouble("Double")))); + assertTableEquals( + primitives.view("String"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofString("String")))); + assertTableEquals( + primitives.view("Instant"), + readSingleFileTable(file, EMPTY, TableDefinition.of(ColumnDefinition.ofTime("Instant")))); } private void assertTableStatistics(Table inputTable, File dest) { @@ -2717,4 +3075,18 @@ private void assertBigIntegerColumnStatistics(SerialObjectColumnIterator Optional[jpy.JType]: + if table_definition is None: + return None + elif isinstance(table_definition, Dict): + return _JTableDefinition.of( + [ + Column(name=name, data_type=dtype).j_column_definition + for name, dtype in table_definition.items() + ] + ) + elif 
isinstance(table_definition, List): + return _JTableDefinition.of( + [col.j_column_definition for col in table_definition] + ) + else: + raise DHError(f"Unexpected table_definition type: {type(table_definition)}") + +class ParquetFileLayout(Enum): + """ The parquet file layout. """ + + SINGLE_FILE = 1 + """ A single parquet file. """ + + FLAT_PARTITIONED = 2 + """ A single directory of parquet files. """ + + KV_PARTITIONED = 3 + """ A key-value directory partitioning of parquet files. """ + + METADATA_PARTITIONED = 4 + """ A directory containing a _metadata parquet file and an optional _common_metadata parquet file. """ + def read( path: str, - col_instructions: List[ColumnInstruction] = None, + col_instructions: Optional[List[ColumnInstruction]] = None, is_legacy_parquet: bool = False, is_refreshing: bool = False, + file_layout: Optional[ParquetFileLayout] = None, + table_definition: Union[Dict[str, DType], List[Column], None] = None, ) -> Table: """ Reads in a table from a single parquet, metadata file, or directory with recognized layout. Args: path (str): the file or directory to examine - col_instructions (List[ColumnInstruction]): instructions for customizations while reading + col_instructions (Optional[List[ColumnInstruction]]): instructions for customizations while reading, None by + default. is_legacy_parquet (bool): if the parquet data is legacy is_refreshing (bool): if the parquet data represents a refreshing source - + file_layout (Optional[ParquetFileLayout]): the parquet file layout, by default None. When None, the layout is + inferred. + table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None, + the definition is inferred from the parquet file(s). Setting a definition guarantees the returned table will + have that definition. This is useful for bootstrapping purposes when the initial partitioned directory is + empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. When set, + file_layout must also be set. 
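
The bootstrapping pattern described for table_definition above looks like the following sketch; the path, partition column, and dtypes are illustrative:

    from deephaven.parquet import read, ParquetFileLayout
    from deephaven.column import Column, ColumnType
    from deephaven import dtypes

    # An empty key-value directory can be opened refreshing with the schema
    # pinned up front, so the table comes up with the expected definition
    # before any parquet files have been written.
    live = read(
        "/data/incoming",                            # illustrative, may start empty
        is_refreshing=True,
        file_layout=ParquetFileLayout.KV_PARTITIONED,
        table_definition=[
            Column("Date", dtypes.string, column_type=ColumnType.PARTITIONING),
            Column("Price", dtypes.double),
        ],
    )
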
Returns: a table @@ -113,12 +160,36 @@ def read( is_legacy_parquet=is_legacy_parquet, is_refreshing=is_refreshing, for_read=True, + force_build=True, ) - - if read_instructions: - return Table(j_table=_JParquetTools.readTable(path, read_instructions)) + j_table_definition = _j_table_definition(table_definition) + if j_table_definition is not None: + if not file_layout: + raise DHError("Must provide file_layout when table_definition is set") + if file_layout == ParquetFileLayout.SINGLE_FILE: + j_table = _JParquetTools.readSingleFileTable(_JFile(path), read_instructions, j_table_definition) + elif file_layout == ParquetFileLayout.FLAT_PARTITIONED: + j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions, j_table_definition) + elif file_layout == ParquetFileLayout.KV_PARTITIONED: + j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions, j_table_definition) + elif file_layout == ParquetFileLayout.METADATA_PARTITIONED: + raise DHError(f"file_layout={ParquetFileLayout.METADATA_PARTITIONED} with table_definition not currently supported") + else: + raise DHError(f"Invalid parquet file_layout '{file_layout}'") else: - return Table(j_table=_JParquetTools.readTable(path)) + if not file_layout: + j_table = _JParquetTools.readTable(path, read_instructions) + elif file_layout == ParquetFileLayout.SINGLE_FILE: + j_table = _JParquetTools.readSingleFileTable(_JFile(path), read_instructions) + elif file_layout == ParquetFileLayout.FLAT_PARTITIONED: + j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions) + elif file_layout == ParquetFileLayout.KV_PARTITIONED: + j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions) + elif file_layout == ParquetFileLayout.METADATA_PARTITIONED: + j_table = _JParquetTools.readPartitionedTableWithMetadata(_JFile(path), read_instructions) + else: + raise DHError(f"Invalid parquet file_layout '{file_layout}'") + return Table(j_table=j_table) except Exception as e: raise DHError(e, "failed to read parquet data.") from e diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index 36c70515000..56cce45957a 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -12,25 +12,23 @@ from deephaven import DHError, empty_table, dtypes, new_table from deephaven import arrow as dharrow -from deephaven.column import InputColumn +from deephaven.column import InputColumn, Column, ColumnType from deephaven.pandas import to_pandas, to_table -from deephaven.parquet import write, batch_write, read, delete, ColumnInstruction +from deephaven.parquet import write, batch_write, read, delete, ColumnInstruction, ParquetFileLayout from tests.testbase import BaseTestCase class ParquetTestCase(BaseTestCase): """ Test cases for the deephaven.ParquetTools module (performed locally) """ - @classmethod - def setUpClass(cls): - super().setUpClass() + def setUp(self): + super().setUp() # define a junk table workspace directory - cls.temp_dir = tempfile.TemporaryDirectory() + self.temp_dir = tempfile.TemporaryDirectory() - @classmethod - def tearDownClass(cls): - cls.temp_dir.cleanup() - super().tearDownClass() + def tearDown(self): + self.temp_dir.cleanup() + super().tearDown() def test_crd(self): """ Test suite for reading, writing, and deleting a table to disk """ @@ -51,7 +49,7 @@ def test_crd(self): with self.subTest(msg="write_table(Table, str)"): write(table, file_location) self.assertTrue(os.path.exists(file_location)) - table2 = 
read(file_location) + table2 = read(file_location, file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(table, table2) shutil.rmtree(base_dir) @@ -59,7 +57,7 @@ def test_crd(self): batch_write([table, table], [file_location, file_location2], definition) self.assertTrue(os.path.exists(file_location)) self.assertTrue(os.path.exists(file_location2)) - table2 = read(file_location) + table2 = read(file_location, file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(table, table2) # Delete @@ -114,7 +112,7 @@ def test_crd_with_instructions(self): # Reading with self.subTest(msg="read_table(str)"): - table2 = read(path=file_location, col_instructions=[col_inst, col_inst1]) + table2 = read(path=file_location, col_instructions=[col_inst, col_inst1], file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(table, table2) # Delete @@ -141,7 +139,7 @@ def test_big_decimal(self): shutil.rmtree(file_location) write(table, file_location) - table2 = read(file_location) + table2 = read(file_location, file_layout=ParquetFileLayout.SINGLE_FILE) self.assertEqual(table.size, table2.size) self.assert_table_equals(table, table2) @@ -158,7 +156,7 @@ def test_int96_timestamps(self): dataframe = to_pandas(dh_table) table = pyarrow.Table.from_pandas(dataframe) pyarrow.parquet.write_table(table, 'data_from_pa.parquet', use_deprecated_int96_timestamps=True) - from_disk_int96 = read('data_from_pa.parquet') + from_disk_int96 = read('data_from_pa.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(dh_table, from_disk_int96) # Read the parquet file as a pandas dataframe, and ensure all values are written as null @@ -168,7 +166,7 @@ def test_int96_timestamps(self): # Write the timestamps as int64 using deephaven writing code and compare with int96 table write(dh_table, "data_from_dh.parquet") - from_disk_int64 = read('data_from_dh.parquet') + from_disk_int64 = read('data_from_dh.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(from_disk_int64, from_disk_int96) def get_table_data(self): @@ -261,7 +259,7 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c write(dh_table, "data_from_dh.parquet", compression_codec_name=compression_codec_name) # Read the parquet file using deephaven.parquet and compare - result_table = read('data_from_dh.parquet') + result_table = read('data_from_dh.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(dh_table, result_table) # LZO is not fully supported in pyarrow, so we can't do the rest of the tests @@ -296,14 +294,14 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c compression=None if compression_codec_name == 'UNCOMPRESSED' else "LZ4" if compression_codec_name == 'LZ4_RAW' or compression_codec_name == 'LZ4RAW' else compression_codec_name) - result_table = read('data_from_pandas.parquet') + result_table = read('data_from_pandas.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(dh_table, result_table) # dh->dataframe (via pyarrow)->parquet->dh # TODO(deephaven-core#3149) disable for now, since to_pandas results in "None" strings instead of None values # dataframe = to_pandas(dh_table) # dataframe.to_parquet('data_from_pandas.parquet', compression=None if compression_codec_name is 'UNCOMPRESSED' else compression_codec_name) - # result_table = read('data_from_pandas.parquet') + # result_table = read('data_from_pandas.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) # 
self.assert_table_equals(dh_table, result_table) def test_writing_lists_via_pyarrow(self): @@ -312,7 +310,7 @@ def test_writing_lists_via_pyarrow(self): pa_table = pyarrow.table({'numList': [[2, 2, 4]], 'stringList': [["Flamingo", "Parrot", "Dog"]]}) pyarrow.parquet.write_table(pa_table, 'data_from_pa.parquet') - from_disk = read('data_from_pa.parquet').select() + from_disk = read('data_from_pa.parquet', file_layout=ParquetFileLayout.SINGLE_FILE).select() pa_table_from_disk = dharrow.to_arrow(from_disk) self.assertTrue(pa_table.equals(pa_table_from_disk)) @@ -324,7 +322,7 @@ def test_dictionary_encoding(self): ]) # Force "longStringColumn" to use non-dictionary encoding write(dh_table, "data_from_dh.parquet", max_dictionary_size=100) - from_disk = read('data_from_dh.parquet') + from_disk = read('data_from_dh.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(dh_table, from_disk) metadata = pyarrow.parquet.read_metadata("data_from_dh.parquet") @@ -344,7 +342,7 @@ def test_dates_and_time(self): ]) write(dh_table, "data_from_dh.parquet", compression_codec_name="SNAPPY") - from_disk = read('data_from_dh.parquet') + from_disk = read('data_from_dh.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) self.assert_table_equals(dh_table, from_disk) # TODO dtype_backend=None is a workaround until https://github.com/deephaven/deephaven-core/issues/4823 is fixed @@ -366,7 +364,7 @@ def test_dates_and_time(self): # Rewrite the dataframe back to parquet using pyarrow and read it back using deephaven.parquet to compare df_from_pandas.to_parquet('data_from_pandas.parquet', compression='SNAPPY') - from_disk_pandas = read('data_from_pandas.parquet') + from_disk_pandas = read('data_from_pandas.parquet', file_layout=ParquetFileLayout.SINGLE_FILE) # Compare only the non-null columns because null columns are written as different logical types by pandas and # deephaven @@ -384,7 +382,7 @@ def test_time_with_different_units(self): def time_test_helper(pa_table, new_schema, dest): # Write the provided pyarrow table type-casted to the new schema pyarrow.parquet.write_table(pa_table.cast(new_schema), dest) - from_disk = read(dest) + from_disk = read(dest, file_layout=ParquetFileLayout.SINGLE_FILE) # TODO dtype_backend=None is a workaround until https://github.com/deephaven/deephaven-core/issues/4823 is fixed df_from_disk = to_pandas(from_disk, dtype_backend=None) @@ -420,7 +418,7 @@ def timestamp_test_helper(pa_table, new_schema, dest): if "isAdjustedToUTC=false" not in str(metadata.row_group(0).column(0)): self.fail("isAdjustedToUTC is not set to false") # Read the parquet file back using deephaven and write it back - dh_table_from_disk = read(dest) + dh_table_from_disk = read(dest, file_layout=ParquetFileLayout.SINGLE_FILE) dh_dest = "dh_" + dest write(dh_table_from_disk, dh_dest) # Read the new parquet file using pyarrow and compare against original table @@ -436,6 +434,124 @@ def timestamp_test_helper(pa_table, new_schema, dest): schema_msec = table.schema.set(0, pyarrow.field('f', pyarrow.timestamp('ms'))) timestamp_test_helper(table, schema_msec, 'timestamp_test_msec.parquet') + def test_read_single_file(self): + table = empty_table(3).update( + formulas=["x=i", "y=(double)(i/10.0)", "z=(double)(i*i)"] + ) + single_parquet = os.path.join(self.temp_dir.name, "single.parquet") + write(table, single_parquet) + + with self.subTest(msg="read infer single file infer definition"): + actual = read(single_parquet) + self.assert_table_equals(actual, table) + + with self.subTest(msg="read single 
file infer definition"): + actual = read(single_parquet, file_layout=ParquetFileLayout.SINGLE_FILE) + self.assert_table_equals(actual, table) + + with self.subTest(msg="read single file"): + actual = read( + single_parquet, + table_definition={ + "x": dtypes.int32, + "y": dtypes.double, + "z": dtypes.double, + }, + file_layout=ParquetFileLayout.SINGLE_FILE, + ) + self.assert_table_equals(actual, table) + + def test_read_flat_partitioned(self): + table = empty_table(6).update( + formulas=["x=i", "y=(double)(i/10.0)", "z=(double)(i*i)"] + ) + flat_dir = self.temp_dir.name + f1_parquet = os.path.join(flat_dir, "f1.parquet") + f2_parquet = os.path.join(flat_dir, "f2.parquet") + + write(table.head(3), f1_parquet) + write(table.tail(3), f2_parquet) + + with self.subTest(msg="read infer flat infer definition"): + actual = read(flat_dir) + self.assert_table_equals(actual, table) + + with self.subTest(msg="read flat infer definition"): + actual = read(flat_dir, file_layout=ParquetFileLayout.FLAT_PARTITIONED) + self.assert_table_equals(actual, table) + + with self.subTest(msg="read flat"): + actual = read( + flat_dir, + table_definition={ + "x": dtypes.int32, + "y": dtypes.double, + "z": dtypes.double, + }, + file_layout=ParquetFileLayout.FLAT_PARTITIONED, + ) + self.assert_table_equals(actual, table) + + def test_read_kv_partitioned(self): + table = empty_table(6).update( + formulas=[ + "Partition=(int)(i/3)", + "x=i", + "y=(double)(i/10.0)", + "z=(double)(i*i)", + ] + ) + kv_dir = self.temp_dir.name + p0_dir = os.path.join(kv_dir, "Partition=0") + p1_dir = os.path.join(kv_dir, "Partition=1") + os.mkdir(p0_dir) + os.mkdir(p1_dir) + f1_parquet = os.path.join(p0_dir, "f1.parquet") + f2_parquet = os.path.join(p1_dir, "f2.parquet") + + write(table.head(3).drop_columns(["Partition"]), f1_parquet) + write(table.tail(3).drop_columns(["Partition"]), f2_parquet) + + with self.subTest(msg="read infer kv infer definition"): + actual = read(kv_dir) + self.assert_table_equals(actual, table) + + with self.subTest(msg="read kv infer definition"): + actual = read(kv_dir, file_layout=ParquetFileLayout.KV_PARTITIONED) + self.assert_table_equals(actual, table) + + with self.subTest(msg="read kv"): + actual = read( + kv_dir, + table_definition=[ + Column( + "Partition", dtypes.int32, column_type=ColumnType.PARTITIONING + ), + Column("x", dtypes.int32), + Column("y", dtypes.double), + Column("z", dtypes.double), + ], + file_layout=ParquetFileLayout.KV_PARTITIONED, + ) + self.assert_table_equals(actual, table) + + def test_read_with_table_definition_no_type(self): + # no need to write actual file, shouldn't be reading it + fake_parquet = os.path.join(self.temp_dir.name, "fake.parquet") + with self.subTest(msg="read definition no type"): + with self.assertRaises(DHError) as cm: + read( + fake_parquet, + table_definition={ + "x": dtypes.int32, + "y": dtypes.double, + "z": dtypes.double, + }, + ) + self.assertIn( + "Must provide file_layout when table_definition is set", str(cm.exception) + ) + if __name__ == '__main__': unittest.main()
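
An explicit table_definition can also project a subset of the columns stored in a single file, as the readSingleColumn test above does from Java. A Python sketch with an illustrative path:

    from deephaven.parquet import read, ParquetFileLayout
    from deephaven import dtypes

    # The resulting table has only the requested column.
    ints_only = read(
        "/data/readSingleColumn.parquet",            # illustrative path
        file_layout=ParquetFileLayout.SINGLE_FILE,
        table_definition={"Int": dtypes.int32},
    )

    # Note: passing table_definition without file_layout raises DHError
    # ("Must provide file_layout when table_definition is set").
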