diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java
index 60ca30629a7..e5727920612 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java
@@ -13,11 +13,6 @@ import java.util.function.Supplier;
 
 public interface ColumnChunkReader {
-    /**
-     * @return -1 if the current column doesn't guarantee fixed page size, otherwise the fixed page size
-     */
-    int getPageFixedSize();
-
     /**
      * @return The number of rows in this ColumnChunk, or -1 if it's unknown.
      */
@@ -42,22 +37,28 @@ public interface ColumnChunkReader {
     interface ColumnPageReaderIterator extends Iterator<ColumnPageReader>, AutoCloseable {
         @Override
-        void close() throws IOException;
+        void close();
+    }
 
+    /**
+     * @return An iterator over individual parquet pages
+     */
+    ColumnPageReaderIterator getPageIterator() throws IOException;
+
+    interface ColumnPageDirectAccessor extends AutoCloseable {
         /**
-         * Directly access a page reader for a given page number. This is an optional method that may not be
-         * implemented. Note that the user should either use {@link Iterator} methods or this method, but not both.
+         * Directly access a page reader for a given page number.
          */
-        @Nullable
-        default ColumnPageReader getPageReader(final int pageNum) {
-            return null;
-        };
+        ColumnPageReader getPageReader(final int pageNum);
+
+        @Override
+        void close();
     }
 
     /**
-     * @return An iterator over individual parquet pages
+     * @return An accessor for individual parquet pages
      */
-    ColumnPageReaderIterator getPageIterator() throws IOException;
+    ColumnPageDirectAccessor getPageAccessor();
 
     /**
      * @return Whether this column chunk uses a dictionary-based encoding on every page
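With the optional default method gone, the interface now offers two clearly separated access paths: a sequential iterator and an offset-index-backed random accessor. A consumer-side sketch of the two paths (the surrounding class and the `processPage` callback are hypothetical; only the interface calls come from the change above):

```java
import java.io.IOException;
import io.deephaven.parquet.base.ColumnChunkReader;
import io.deephaven.parquet.base.ColumnPageReader;

final class PageAccessSketch {
    // Sequential path: decode pages in order. The iterator is AutoCloseable,
    // and after this change close() no longer throws IOException, so
    // try-with-resources needs no extra handling.
    static void scanAllPages(final ColumnChunkReader reader) throws IOException {
        try (final ColumnChunkReader.ColumnPageReaderIterator it = reader.getPageIterator()) {
            while (it.hasNext()) {
                processPage(it.next());
            }
        }
    }

    // Random-access path: only supported when an offset index is present;
    // ColumnChunkReaderImpl throws UnsupportedOperationException otherwise.
    // close() is a no-op in the implementation shown below, so the returned
    // reader remains usable.
    static ColumnPageReader fetchPage(final ColumnChunkReader reader, final int pageNum) {
        try (final ColumnChunkReader.ColumnPageDirectAccessor accessor = reader.getPageAccessor()) {
            return accessor.getPageReader(pageNum);
        }
    }

    private static void processPage(final ColumnPageReader page) {
        // Hypothetical placeholder for real decoding work.
    }
}
```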
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
index 9de387e9600..69136cf08e0 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
@@ -28,6 +28,7 @@
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.function.Supplier;
 
 import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
@@ -77,11 +78,6 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
         this.version = version;
     }
 
-    @Override
-    public int getPageFixedSize() {
-        return -1;
-    }
-
     @Override
     public long numRows() {
         return numRows;
@@ -112,6 +108,14 @@ public ColumnPageReaderIterator getPageIterator() {
         }
     }
 
+    @Override
+    public final ColumnPageDirectAccessor getPageAccessor() {
+        if (offsetIndex == null) {
+            throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
+        }
+        return new ColumnPageDirectAccessorImpl(path, channelsProvider);
+    }
+
     private Path getFilePath() {
         if (filePath != null) {
             return filePath;
@@ -306,23 +310,39 @@ public boolean hasNext() {
         @Override
         public ColumnPageReader next() {
             if (!hasNext()) {
-                throw new RuntimeException("No next element");
+                throw new NoSuchElementException("No next element");
             }
-            int rowCount =
-                    (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
-                            - offsetIndex.getFirstRowIndex(pos) + 1);
+            // The following logic assumes that the offset index stores the number of values for a page instead of
+            // the number of rows (these can differ for array and vector columns). This behavior comes from a bug on
+            // the parquet writing side that was fixed in deephaven-core/pull/4844, and is only kept to support
+            // reading parquet files written before that fix.
+            final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
+                    - offsetIndex.getFirstRowIndex(pos) + 1);
             ColumnPageReaderImpl columnPageReader =
                     new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
                             nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pos), null,
-                            rowCount);
+                            numValues);
             pos++;
             return columnPageReader;
         }
 
+        @Override
+        public void close() {}
+    }
+
+    class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {
+        private final SeekableChannelsProvider channelsProvider;
+        private final ColumnDescriptor path;
+
+        ColumnPageDirectAccessorImpl(final ColumnDescriptor path, final SeekableChannelsProvider channelsProvider) {
+            this.path = path;
+            this.channelsProvider = channelsProvider;
+        }
+
         @Override
         public ColumnPageReader getPageReader(final int pageNum) {
             if (pageNum > offsetIndex.getPageCount()) {
-                throw new RuntimeException(
+                throw new NoSuchElementException(
                         "pageNum=" + pageNum + " > offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
             }
             final int numValues = -1; // Will be populated properly when we read the page header
@@ -331,6 +351,6 @@ public ColumnPageReader getPageReader(final int pageNum) {
         }
 
         @Override
-        public void close() throws IOException {}
+        public void close() {}
     }
 }
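The `numValues` computation in `next()` above is plain offset-index arithmetic: a page's value count is the distance from its first index to the next page's first index, or to the chunk's total count for the last page. A standalone sketch of that arithmetic with a hypothetical helper name; `getLastRowIndex` in the change above encapsulates the same branch:

```java
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

final class OffsetIndexMath {
    // Hypothetical helper mirroring the computation in next() above. For files
    // written before deephaven-core/pull/4844, the "row" indices stored for
    // array/vector columns are really value indices, which is exactly what the
    // page reader needs here.
    static int numValuesOnPage(final OffsetIndex offsetIndex, final int pageNum, final long totalValues) {
        final long first = offsetIndex.getFirstRowIndex(pageNum);
        // Equivalent to offsetIndex.getLastRowIndex(pageNum, totalValues):
        final long last = pageNum == offsetIndex.getPageCount() - 1
                ? totalValues - 1
                : offsetIndex.getFirstRowIndex(pageNum + 1) - 1;
        return Math.toIntExact(last - first + 1);
    }
}
```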
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java
index 7a7039a294f..15c0a56e7f3 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java
@@ -22,7 +22,6 @@
 import org.jetbrains.annotations.VisibleForTesting;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -36,7 +35,6 @@ public abstract class ColumnChunkPageStore<ATTR extends Any>
 
     private final ToPage<ATTR, ?> toPage;
     private final long numRows;
-    final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator;
 
     public static class CreatorResult<ATTR extends Any> {
@@ -75,7 +73,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun
     private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)");
 
     /**
-     * Check if the version is greater than 0.31.0
+     * Check if the version is greater than or equal to 0.31.0
      */
     @VisibleForTesting
     public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) {
@@ -123,7 +121,6 @@ public static <ATTR extends Any> CreatorResult<ATTR> create(
 
         this.toPage = toPage;
         this.numRows = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask");
-        this.columnPageReaderIterator = columnChunkReader.getPageIterator();
     }
 
     ChunkPage<ATTR> toPage(final long offset, @NotNull final ColumnPageReader columnPageReader)
@@ -170,11 +167,5 @@ public boolean usesDictionaryOnEveryPage() {
     }
 
     @Override
-    public void close() {
-        try {
-            columnPageReaderIterator.close();
-        } catch (IOException except) {
-            throw new UncheckedIOException(except);
-        }
-    }
+    public void close() {}
 }
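The corrected javadoc matches the behavior: `satisfiesMinimumVersionRequirements` gates the offset-index-based page store on a writer version of at least 0.31.0. A minimal sketch of such a gate using the same `VERSION_PATTERN` regex (the method body here is an assumption for illustration, not a copy of the Deephaven implementation):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VersionGate {
    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)");

    // Returns true if `version` parses as major.minor.patch and is >= 0.31.0.
    // Unparseable strings are rejected, which fails safe: the reader would then
    // fall back to the iterator-based page store.
    static boolean isAtLeast0_31_0(final String version) {
        final Matcher matcher = VERSION_PATTERN.matcher(version);
        if (!matcher.matches()) {
            return false;
        }
        final int major = Integer.parseInt(matcher.group(1));
        final int minor = Integer.parseInt(matcher.group(2));
        return major > 0 || (major == 0 && minor >= 31);
    }
}
```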
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java
deleted file mode 100644
index 77703a4e6c2..00000000000
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
- */
-package io.deephaven.parquet.table.pagestore;
-
-import io.deephaven.base.verify.Assert;
-import io.deephaven.base.verify.Require;
-import io.deephaven.chunk.attributes.Any;
-import io.deephaven.engine.page.ChunkPage;
-import io.deephaven.parquet.table.pagestore.topage.ToPage;
-import io.deephaven.parquet.base.ColumnChunkReader;
-import io.deephaven.parquet.base.ColumnPageReader;
-import org.jetbrains.annotations.NotNull;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.lang.ref.WeakReference;
-import java.util.Arrays;
-
-/**
- * Unused class, use {@link OffsetIndexBasedColumnChunkPageStore} instead because to check if the page sizes are fixed
- * without actually reading the page headers, we need offset index.
- */
-class FixedPageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPageStore<ATTR> {
-
-    private final int pageFixedSize;
-    private volatile int numPages = 0;
-    private final ColumnPageReader[] columnPageReaders;
-    private final WeakReference<PageCache.IntrusivePage<ATTR>>[] pages;
-
-    FixedPageSizeColumnChunkPageStore(@NotNull final PageCache<ATTR> pageCache,
-            @NotNull final ColumnChunkReader columnChunkReader,
-            final long mask,
-            @NotNull final ToPage<ATTR, ?> toPage) throws IOException {
-        super(pageCache, columnChunkReader, mask, toPage);
-
-        this.pageFixedSize = columnChunkReader.getPageFixedSize();
-
-        Require.gtZero(pageFixedSize, "pageFixedSize");
-
-        final int numPages = Math.toIntExact((numRows() - 1) / pageFixedSize + 1);
-        this.columnPageReaders = new ColumnPageReader[numPages];
-
-        // noinspection unchecked
-        this.pages = (WeakReference<PageCache.IntrusivePage<ATTR>>[]) new WeakReference[numPages];
-        Arrays.fill(pages, PageCache.getNullPage());
-    }
-
-    private void fillToPage(final int pageNum) {
-
-        while (numPages <= pageNum) {
-            synchronized (this) {
-                if (numPages <= pageNum) {
-                    Assert.assertion(columnPageReaderIterator.hasNext(),
-                            "columnPageReaderIterator.hasNext()",
-                            "Parquet fixed page size and page iterator don't match, not enough pages.");
-                    columnPageReaders[numPages++] = columnPageReaderIterator.next();
-                }
-            }
-        }
-    }
-
-    private ChunkPage<ATTR> getPage(final int pageNum) {
-        PageCache.IntrusivePage<ATTR> page = pages[pageNum].get();
-
-        if (page == null) {
-            synchronized (columnPageReaders[pageNum]) {
-                page = pages[pageNum].get();
-
-                if (page == null) {
-                    try {
-                        page = new PageCache.IntrusivePage<>(
-                                toPage((long) pageNum * pageFixedSize, columnPageReaders[pageNum]));
-                    } catch (IOException except) {
-                        throw new UncheckedIOException(except);
-                    }
-
-                    pages[pageNum] = new WeakReference<>(page);
-                }
-            }
-        }
-
-        pageCache.touch(page);
-        return page.getPage();
-    }
-
-    @Override
-    public @NotNull ChunkPage<ATTR> getPageContaining(FillContext fillContext,
-            final long elementIndex) {
-        final long row = elementIndex & mask();
-        Require.inRange(row, "row", numRows(), "numRows");
-
-        // This is safe because of our check in the constructor, and we know the row is in range.
-        final int pageNum = (int) (row / pageFixedSize);
-
-        fillToPage(pageNum);
-        return getPage(pageNum);
-    }
-}
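As the deleted javadoc notes, whether pages have a fixed size can only be known up front from the offset index; that is exactly what the constructor in the next file computes. The detection, distilled into a hypothetical standalone helper over a parquet-mr style `OffsetIndex`:

```java
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

final class FixedPageSizeDetector {
    static final long PAGE_SIZE_NOT_FIXED = -1;

    // Returns the common size of the first (pageCount - 1) pages, or
    // PAGE_SIZE_NOT_FIXED. The last page is allowed to be ragged: row-to-page
    // mapping by division still works if the quotient is clamped to the last page.
    static long detectFixedPageSize(final OffsetIndex offsetIndex, final long numRows) {
        final int numPages = offsetIndex.getPageCount();
        if (numPages == 1) {
            return numRows;
        }
        final long firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0);
        for (int i = 2; i < numPages; ++i) {
            if (offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) {
                return PAGE_SIZE_NOT_FIXED;
            }
        }
        return firstPageSize;
    }
}
```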
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
index a71b628f37f..8a24fd72bbf 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
@@ -17,22 +17,33 @@
 import java.io.UncheckedIOException;
 import java.lang.ref.WeakReference;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 final class OffsetIndexBasedColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPageStore<ATTR> {
+    private static final long PAGE_SIZE_NOT_FIXED = -1;
+
     private final OffsetIndex offsetIndex;
     private final int numPages;
     /**
-     * Set if first ({@link #numPages}-1) pages have equal number of rows
-     */
-    private boolean isPageSizeFixed;
-    /**
-     * Fixed number of rows per page, only valid if {@link #isPageSizeFixed} is true. Used to map from row index to page
-     * number.
+     * Fixed number of rows per page. Set to a positive value if the first ({@link #numPages} - 1) pages have an equal
+     * number of rows, else set to {@value #PAGE_SIZE_NOT_FIXED}. The size of the last page doesn't matter: we can
+     * assume all pages have this size and compute {@code pageNum = rowIndex / fixedPageSize}; if the result is
+     * {@link #numPages} or more, the row must come from the last page.
      */
     private final long fixedPageSize;
-    private final Object[] objectsForSynchronizingPageAccess;
-    private final ColumnPageReader[] columnPageReaders;
-    private final WeakReference<PageCache.IntrusivePage<ATTR>>[] pages;
+
+    private final class PageState {
+        WeakReference<PageCache.IntrusivePage<ATTR>> pageRef;
+
+        PageState() {
+            // Initialized when used for the first time
+            pageRef = null;
+        }
+    }
+
+    private final AtomicReferenceArray<PageState> pageStates;
+    private final ColumnChunkReader.ColumnPageDirectAccessor columnPageDirectAccessor;
 
     OffsetIndexBasedColumnChunkPageStore(@NotNull final PageCache<ATTR> pageCache,
             @NotNull final ColumnChunkReader columnChunkReader,
@@ -42,31 +53,22 @@ final class OffsetIndexBasedColumnChunkPageStore<ATTR extends Any> extends Colum
         offsetIndex = columnChunkReader.getOffsetIndex();
         Assert.assertion(offsetIndex != null, "offsetIndex != null");
         numPages = offsetIndex.getPageCount();
+        Assert.assertion(numPages > 0, "numPages > 0");
+        pageStates = new AtomicReferenceArray<>(numPages);
+        columnPageDirectAccessor = columnChunkReader.getPageAccessor();
 
-        // noinspection unchecked
-        pages = (WeakReference<PageCache.IntrusivePage<ATTR>>[]) new WeakReference[numPages];
-        columnPageReaders = new ColumnPageReader[numPages];
-
-        isPageSizeFixed = true;
-        final long firstPageSize;
-        if (numPages > 1) {
-            firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0);
-        } else {
-            firstPageSize = numRows();
+        if (numPages == 1) {
+            fixedPageSize = numRows();
+            return;
         }
-        objectsForSynchronizingPageAccess = new Object[numPages];
-        for (int i = 0; i < numPages; ++i) {
-            objectsForSynchronizingPageAccess[i] = new Object();
-            if (isPageSizeFixed && i > 0
-                    && offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) {
+        boolean isPageSizeFixed = true;
+        final long firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0);
+        for (int i = 2; i < numPages && isPageSizeFixed; ++i) {
+            if (offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) {
                 isPageSizeFixed = false;
             }
         }
-        if (isPageSizeFixed) {
-            fixedPageSize = firstPageSize;
-        } else {
-            fixedPageSize = -1;
-        }
+        fixedPageSize = isPageSizeFixed ? firstPageSize : PAGE_SIZE_NOT_FIXED;
     }
 
     /**
@@ -95,32 +97,24 @@ private ChunkPage<ATTR> getPage(final int pageNum) {
         if (pageNum < 0 || pageNum >= numPages) {
             throw new IllegalArgumentException("pageNum " + pageNum + " is out of range [0, " + numPages + ")");
         }
-        final PageCache.IntrusivePage<ATTR> page;
-        WeakReference<PageCache.IntrusivePage<ATTR>> pageRef = pages[pageNum];
-        if (pageRef == null || pageRef.get() == null) {
-            synchronized (objectsForSynchronizingPageAccess[pageNum]) {
+        PageCache.IntrusivePage<ATTR> page;
+        PageState pageState = pageStates.get(pageNum);
+        if (pageState == null) {
+            pageState = pageStates.updateAndGet(pageNum, p -> p == null ? new PageState() : p);
+        }
+        if (pageState.pageRef == null || (page = pageState.pageRef.get()) == null) {
+            synchronized (pageState) {
                 // Make sure no one materialized this page as we waited for the lock
-                pageRef = pages[pageNum];
-                if (pageRef == null || pageRef.get() == null) {
-                    if (columnPageReaders[pageNum] == null) {
-                        columnPageReaders[pageNum] = columnPageReaderIterator.getPageReader(pageNum);
-                    }
+                if (pageState.pageRef == null || (page = pageState.pageRef.get()) == null) {
+                    final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum);
                     try {
-                        page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum),
-                                columnPageReaders[pageNum]));
+                        page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum), reader));
                     } catch (final IOException except) {
                         throw new UncheckedIOException(except);
                     }
-                    pages[pageNum] = new WeakReference<>(page);
-                } else {
-                    page = pageRef.get();
+                    pageState.pageRef = new WeakReference<>(page);
                 }
             }
-        } else {
-            page = pageRef.get();
-        }
-        if (page == null) {
-            throw new IllegalStateException("Page should not be null");
         }
         pageCache.touch(page);
         return page.getPage();
@@ -133,7 +127,12 @@ public ChunkPage<ATTR> getPageContaining(@NotNull final FillContext fillContext,
         Require.inRange(row, "row", numRows(), "numRows");
 
         int pageNum;
-        if (isPageSizeFixed) {
+        if (fixedPageSize == PAGE_SIZE_NOT_FIXED) {
+            pageNum = findPageNumUsingOffsetIndex(offsetIndex, row);
+            if (pageNum < 0) {
+                pageNum = -2 - pageNum;
+            }
+        } else {
             pageNum = (int) (row / fixedPageSize);
             if (pageNum >= numPages) {
                 // This can happen if the last page is of a different size from the rest of the pages.
@@ -142,12 +141,6 @@ public ChunkPage<ATTR> getPageContaining(@NotNull final FillContext fillContext,
                         "row >= offsetIndex.getFirstRowIndex(numPages - 1)");
                 pageNum = (numPages - 1);
             }
-        } else {
-            pageNum = findPageNumUsingOffsetIndex(offsetIndex, row);
-            if (pageNum < 0) {
-                pageNum = -2 - pageNum;
-            }
         }
         return getPage(pageNum);
     }
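Both branches of the new `getPageContaining` reduce to small index arithmetic. A sketch of the two lookups in isolation (hypothetical helper class; `firstRowIndices` stands in for the offset index's first-row column, and the `-2 - result` decode assumes a search that returns `-(insertionPoint) - 1` on a miss, as `Arrays.binarySearch` does):

```java
import java.util.Arrays;

final class PageLookup {
    // Fixed-size case: divide, then clamp, since only the last page may differ in size.
    static int pageNumFixed(final long row, final long fixedPageSize, final int numPages) {
        final int pageNum = (int) (row / fixedPageSize);
        return Math.min(pageNum, numPages - 1);
    }

    // Variable-size case: binary-search the first-row indices. A hit is the page
    // itself; a miss returns -(insertionPoint) - 1, so the containing page is
    // insertionPoint - 1, i.e. -2 - result.
    static int pageNumVariable(final long row, final long[] firstRowIndices) {
        final int result = Arrays.binarySearch(firstRowIndices, row);
        return result >= 0 ? result : -2 - result;
    }
}
```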
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java
index f89d541726d..e9568936d4c 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java
@@ -17,18 +17,17 @@
 import java.lang.ref.WeakReference;
 import java.util.Arrays;
 
-class VariablePageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPageStore<ATTR> {
+final class VariablePageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPageStore<ATTR> {
 
     // We will set numPages after changing all of these arrays in place and/or setting additional
-    // elements to the
-    // end of the array. Thus, for i < numPages, array[i] will always have the same value, and be
-    // valid to use, as
-    // long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used
+    // elements to the end of the array. Thus, for i < numPages, array[i] will always have the same value, and be
+    // valid to use, as long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used
     // throughout.
     private volatile int numPages = 0;
     private volatile long[] pageRowOffsets;
     private volatile ColumnPageReader[] columnPageReaders;
+    private final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator;
     private volatile WeakReference<PageCache.IntrusivePage<ATTR>>[] pages;
 
     VariablePageSizeColumnChunkPageStore(@NotNull final PageCache<ATTR> pageCache,
@@ -41,6 +40,7 @@ class VariablePageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunk
         pageRowOffsets = new long[INIT_ARRAY_SIZE + 1];
         pageRowOffsets[0] = 0;
         columnPageReaders = new ColumnPageReader[INIT_ARRAY_SIZE];
+        columnPageReaderIterator = columnChunkReader.getPageIterator();
 
         // noinspection unchecked
         pages = (WeakReference<PageCache.IntrusivePage<ATTR>>[]) new WeakReference[INIT_ARRAY_SIZE];
@@ -160,4 +160,9 @@ public ChunkPage<ATTR> getPageContaining(@NotNull final FillContext fillContext,
 
         return getPage(pageNum);
     }
+
+    @Override
+    public void close() {
+        columnPageReaderIterator.close();
+    }
 }
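The rewrapped comment above describes a publication idiom that is easy to get wrong: fill array elements (or swap in a grown copy) first, and bump a volatile count last, so a reader that loads the count first can safely read every index below it. A minimal sketch of the idiom (hypothetical class, not the Deephaven implementation):

```java
import java.util.Arrays;

final class GrowOnlyPages<T> {
    private volatile int numPages = 0;         // published count; written last
    private volatile Object[] pages = new Object[4];

    synchronized void append(final T page) {
        if (numPages == pages.length) {
            // Replace the array wholesale; readers holding the old array still
            // see a consistent prefix of it.
            pages = Arrays.copyOf(pages, pages.length * 2);
        }
        pages[numPages] = page;
        numPages = numPages + 1;               // volatile write publishes the element above
    }

    @SuppressWarnings("unchecked")
    T get(final int i) {
        final int n = numPages;                // volatile read first...
        final Object[] localPages = pages;
        if (i >= n) {
            throw new IndexOutOfBoundsException(i + " >= " + n);
        }
        return (T) localPages[i];              // ...then indices < n are safe to read
    }
}
```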
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index 4790f6384f5..97174b5863b 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -628,6 +628,20 @@ private Table maybeFixBigDecimal(Table toFix) {
                 .dropColumns("bdColE");
     }
 
+    private static Table readParquetFileFromGitLFS(final File dest) {
+        try {
+            return readSingleFileTable(dest, EMPTY);
+        } catch (final RuntimeException e) {
+            if (e.getCause() instanceof InvalidParquetFileException) {
+                final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " +
+                        "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " +
+                        "the repo to pull the files from LFS. Check cause of exception for more details.";
+                throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause());
+            }
+            throw e;
+        }
+    }
+
     /**
      * Test if the current code can read the parquet data written by the old code. There is logic in
      * {@link ColumnChunkPageStore#create} that decides page store based on the version of the parquet file. The old
@@ -655,25 +669,15 @@
     @Test
     public void testReadOldParquetData() {
         String path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetData.parquet").getFile();
-        try {
-            ParquetTools.readTable(new File(path)).select();
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof InvalidParquetFileException) {
-                final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " +
-                        "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " +
-                        "the repo to pull the files from LFS. Check cause of exception for more details.";
-                throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause());
-            }
-            throw e;
-        }
+        readParquetFileFromGitLFS(new File(path)).select();
         final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path), 0, null).getMetadata();
         assertTrue(metadata.getFileMetaData().getKeyValueMetaData().get("deephaven").contains("\"version\":\"0.4.0\""));
 
         path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetVectorData.parquet").getFile();
-        ParquetTools.readTable(new File(path)).select();
+        readParquetFileFromGitLFS(new File(path)).select();
 
         path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetArrayData.parquet").getFile();
-        ParquetTools.readTable(new File(path)).select();
+        readParquetFileFromGitLFS(new File(path)).select();
     }
 
     @Test
@@ -696,13 +700,12 @@ public void testWritingDifferentPageSizes() {
         // Make a table with arrays of decreasing sizes such that different pages will have different number of rows
         Table arrayTable = TableTools.emptyTable(100).update(
                 "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse();
-        final File dest = new File(rootFile + File.separator + "testWritingDifferentPageSizes.parquet");
+        final File dest = new File(rootFile, "testWritingDifferentPageSizes.parquet");
         final ParquetInstructions writeInstructions = new ParquetInstructions.Builder()
                 .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE)
                 .build();
         ParquetTools.writeTable(arrayTable, dest, writeInstructions);
-        Table fromDisk = ParquetTools.readTable(dest).select();
-        TstUtils.assertTableEquals(arrayTable, fromDisk);
+        checkSingleTable(arrayTable, dest);
 
         // Make a table such that only the last page has different number of rows, all else have equal number
         final long NUM_ROWS = 1000;
         arrayTable = TableTools.emptyTable(NUM_ROWS).update(
                 "intArrays = (i <= 900) ? java.util.stream.IntStream.range(i, i+50).toArray() : " +
                         "java.util.stream.IntStream.range(i, i+2).toArray()");
         ParquetTools.writeTable(arrayTable, dest, writeInstructions);
-        fromDisk = ParquetTools.readTable(dest);
+        final Table fromDisk = readSingleFileTable(dest, EMPTY);
         // Access something on the last page to make sure we can read it
         final int[] data = (int[]) fromDisk.getColumnSource("intArrays").get(998);
-        assertTrue(data.length == 2 && data[0] == 998 && data[1] == 999);
-        TstUtils.assertTableEquals(arrayTable, fromDisk.select());
+        assertNotNull(data);
+        assertEquals(2, data.length);
+        assertEquals(998, data[0]);
+        assertEquals(999, data[1]);
+        assertTableEquals(arrayTable, fromDisk);
     }
 
     // Following is used for testing both writing APIs for parquet tables
@@ -931,18 +937,7 @@ public void legacyGroupingFileReadTest() {
         final File destFile = new File(path);
 
         // Read the legacy file and verify that grouping column is read correctly
-        final Table fromDisk;
-        try {
-            fromDisk = readSingleFileTable(destFile, EMPTY);
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof InvalidParquetFileException) {
-                final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " +
-                        "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " +
-                        "the repo to pull the files from LFS. Check cause of exception for more details.";
-                throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause());
-            }
-            throw e;
-        }
+        final Table fromDisk = readParquetFileFromGitLFS(destFile);
         final String groupingColName = "gcol";
         assertTrue(fromDisk.getDefinition().getColumn(groupingColName).isGrouping());
@@ -1435,18 +1430,8 @@ public void readWriteDateTimeTest() {
     public void verifyPyArrowStatistics() {
         final String path = ParquetTableReadWriteTest.class.getResource("/e0/pyarrow_stats.parquet").getFile();
         final File pyarrowDest = new File(path);
-        final Table pyarrowFromDisk;
-        try {
-            pyarrowFromDisk = readSingleFileTable(pyarrowDest, EMPTY);
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof InvalidParquetFileException) {
-                final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " +
-                        "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " +
-                        "the repo to pull the files from LFS. Check cause of exception for more details.";
-                throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause());
-            }
-            throw e;
-        }
+        final Table pyarrowFromDisk = readParquetFileFromGitLFS(pyarrowDest);
+
         // Verify that our verification code works for a pyarrow generated table.
         assertTableStatistics(pyarrowFromDisk, pyarrowDest);
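For background on the `readParquetFileFromGitLFS` helper introduced above: a file that has not been pulled from Git LFS is a small text pointer rather than real parquet data, so it fails the parquet magic-bytes check and surfaces as `InvalidParquetFileException`. A sketch of detecting that state directly, should one want to check before reading (illustrative only, not part of this change; the 1024-byte limit comes from the LFS pointer spec):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class LfsPointerCheck {
    // Git LFS pointer files are small text files beginning with this header;
    // a real parquet file instead starts (and ends) with the "PAR1" magic bytes.
    private static final String LFS_HEADER = "version https://git-lfs.github.com/spec/v1";

    static boolean looksLikeLfsPointer(final File file) throws IOException {
        if (file.length() > 1024) {
            return false; // pointer files are tiny; parquet files are not
        }
        final byte[] head = Files.readAllBytes(file.toPath());
        return new String(head, StandardCharsets.UTF_8).startsWith(LFS_HEADER);
    }
}
```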