diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index 7c647ae7e5e..b9290e96407 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -35,15 +35,10 @@ public interface ColumnChunkReader { @Nullable OffsetIndex getOffsetIndex(); - interface ColumnPageReaderIterator extends Iterator, AutoCloseable { - @Override - void close(); - } - /** * @return An iterator over individual parquet pages */ - ColumnPageReaderIterator getPageIterator() throws IOException; + Iterator getPageIterator() throws IOException; interface ColumnPageDirectAccessor { /** diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 8ea8b7940fd..ffddffadab1 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -15,6 +15,7 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.format.*; import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -27,6 +28,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.function.Supplier; @@ -98,7 +100,7 @@ public final OffsetIndex getOffsetIndex() { } @Override - public ColumnPageReaderIterator getPageIterator() { + public Iterator getPageIterator() { final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); if (offsetIndex == null) { return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values()); @@ -214,7 +216,7 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException { return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage); } - private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator { + private final class ColumnPageReaderIteratorImpl implements Iterator { private long currentOffset; private long remainingValues; @@ -231,7 +233,7 @@ public boolean hasNext() { @Override public ColumnPageReader next() { if (!hasNext()) { - throw new RuntimeException("No next element"); + throw new NoSuchElementException("No next element"); } // NB: The channels provider typically caches channels; this avoids maintaining a handle per column chunk try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(getFilePath())) { @@ -264,24 +266,21 @@ public ColumnPageReader next() { throw new UncheckedDeephavenException( "Unknown parquet data page header type " + pageHeader.type); } - final Supplier pageDictionarySupplier = - (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) - ? dictionarySupplier - : () -> NULL_DICTIONARY; - return new ColumnPageReaderImpl( - channelsProvider, decompressor, pageDictionarySupplier, + if ((encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) + && dictionarySupplier.get() == NULL_DICTIONARY) { + throw new ParquetDecodingException("Error in decoding page because dictionary data not found for " + + " column " + path + " with encoding " + encoding); + } + return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position(), pageHeader, - -1); + ColumnPageReaderImpl.NULL_NUM_VALUES); } catch (IOException e) { - throw new RuntimeException("Error reading page header", e); + throw new UncheckedDeephavenException("Error reading page header", e); } } - - @Override - public void close() {} } - private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { + private final class ColumnPageReaderIteratorIndexImpl implements Iterator { private int pos; ColumnPageReaderIteratorIndexImpl() { @@ -311,9 +310,6 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset( pos++; return columnPageReader; } - - @Override - public void close() {} } private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { @@ -326,9 +322,10 @@ public ColumnPageReader getPageReader(final int pageNum) { throw new IndexOutOfBoundsException( "pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); } - final int numValues = -1; // Will be populated properly when we read the page header + // Page header and number of values will be populated later when we read the page header from the file return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, - path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, numValues); + path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, + ColumnPageReaderImpl.NULL_NUM_VALUES); } } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 8393a8d6146..1e367b42758 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -45,6 +45,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private static final int MAX_HEADER = 8192; private static final int START_HEADER = 128; public static final int NULL_OFFSET = -1; + static final int NULL_NUM_VALUES = -1; private final SeekableChannelsProvider channelsProvider; private final CompressorAdapter compressorAdapter; @@ -64,8 +65,21 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private int rowCount = -1; /** + * Returns a {@link ColumnPageReader} object for reading the column page data from the file. + * + * @param channelsProvider The provider for {@link SeekableByteChannel} for reading the file. + * @param compressorAdapter The adapter for decompressing the data. + * @param dictionarySupplier The supplier for dictionary data, set as {@link ColumnChunkReader#NULL_DICTIONARY} if + * page isn't dictionary encoded + * @param materializerFactory The factory for creating {@link PageMaterializer}. + * @param path The path of the column. + * @param filePath The path of the file. + * @param fieldTypes The types of the fields in the column. * @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data - * in the page. + * following the header in the page. + * @param pageHeader The page header if it is already read from the file. Else, {@code null}. + * @param numValues The number of values in the page if it is already read from the file. Else, + * {@value #NULL_NUM_VALUES} */ ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, CompressorAdapter compressorAdapter, @@ -92,7 +106,6 @@ public class ColumnPageReaderImpl implements ColumnPageReader { @Override public Object materialize(Object nullValue) throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readDataPage(nullValue, readChannel); } @@ -100,7 +113,6 @@ public Object materialize(Object nullValue) throws IOException { public int readRowCount() throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readRowCountFromDataPage(readChannel); } @@ -110,53 +122,66 @@ public int readRowCount() throws IOException { @Override public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder) throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readKeyFromDataPage(keyDest, nullPlaceholder, readChannel); } } /** - * If {@link #pageHeader} is {@code null}, read it from the file and increment the {@link #offset} and file position - * by the length of page header. This method assumes that file position is set to {@link #offset} before calling. - * This method also read the number of values in the page from the header. + * If {@link #pageHeader} is {@code null}, read it from the file, and increment the {@link #offset} by the length of + * page header. Channel position would be set to the end of page header or beginning of data before returning. */ - private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOException { - if (pageHeader == null) { - if (file.position() != offset) { - throw new IllegalStateException("File position = " + file.position() + " not equal to expected offset =" - + offset); - } - int maxHeader = START_HEADER; - boolean success; - do { - ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); - file.read(headerBuffer); - headerBuffer.flip(); - ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); - try { - pageHeader = Util.readPageHeader(bufferedIS); - offset += bufferedIS.position(); - success = true; - } catch (IOException e) { - success = false; - if (maxHeader > MAX_HEADER) { - throw e; + private void ensurePageHeader(final SeekableByteChannel file) throws IOException { + // Set this channel's position to appropriate offset for reading. If pageHeader is null, this offset would be + // the offset of page header, else it would be the offset of data. + file.position(offset); + synchronized (this) { + if (pageHeader == null) { + int maxHeader = START_HEADER; + boolean success; + do { + final ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); + file.read(headerBuffer); + headerBuffer.flip(); + final ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); + try { + pageHeader = Util.readPageHeader(bufferedIS); + offset += bufferedIS.position(); + success = true; + } catch (IOException e) { + success = false; + if (maxHeader > MAX_HEADER) { + throw e; + } + maxHeader <<= 1; + file.position(offset); + } + } while (!success); + file.position(offset); + if (numValues >= 0) { + final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader); + if (numValues != numValuesFromHeader) { + throw new IllegalStateException( + "numValues = " + numValues + " different from number of values " + + "read from the page header = " + numValuesFromHeader + " for column " + path); } - maxHeader *= 2; - file.position(offset); - } - } while (!success); - file.position(offset); - if (numValues >= 0) { - // Make sure the number of values are same as those in the header - if (numValues != readNumValuesFromPageHeader(pageHeader)) { - throw new IllegalStateException("numValues = " + numValues + " different from number of values " + - "read from the page header"); } } + if (numValues == NULL_NUM_VALUES) { + numValues = readNumValuesFromPageHeader(pageHeader); + } + } + } + + private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { + switch (header.type) { + case DATA_PAGE: + return header.getData_page_header().getNum_values(); + case DATA_PAGE_V2: + return header.getData_page_header_v2().getNum_values(); + default: + throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); } - ensureNumValues(); } private int readRowCountFromDataPage(ReadableByteChannel file) throws IOException { @@ -573,9 +598,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val if (dataEncoding.usesDictionary()) { final Dictionary dictionary = dictionarySupplier.get(); if (dictionary == ColumnChunkReader.NULL_DICTIONARY) { - throw new ParquetDecodingException( - "Could not read page in col " + path - + " as the dictionary was missing for encoding " + dataEncoding); + throw new ParquetDecodingException("Could not read page in col " + path + " as the dictionary was " + + "missing for encoding " + dataEncoding); } dataReader = new DictionaryValuesReader(dictionary); } else { @@ -596,33 +620,13 @@ public int numValues() throws IOException { return numValues; } try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); - // Above will automatically populate numValues - Assert.geq(numValues, "numValues", 0); + // Above will block till it populates numValues + Assert.geqZero(numValues, "numValues"); return numValues; } } - private void ensureNumValues() throws IOException { - if (numValues >= 0) { - return; - } - Assert.neqNull(pageHeader, "pageHeader"); - numValues = readNumValuesFromPageHeader(pageHeader); - } - - private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { - switch (header.type) { - case DATA_PAGE: - return header.getData_page_header().getNum_values(); - case DATA_PAGE_V2: - return header.getData_page_header_v2().getNum_values(); - default: - throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); - } - } - @NotNull @Override public Dictionary getDictionary() { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java index 23f6bb7bffe..5e228456fdd 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java @@ -68,7 +68,7 @@ public String version() { final String version = TableInfo.class.getPackage().getImplementationVersion(); if (version == null) { // For unit tests - return "0.dev.0"; + return "unknown"; } return version; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java index f36c93cbc64..dcd7677ccf5 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java @@ -65,7 +65,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun // part of deephaven-core#4844 final Class columnType = columnDefinition.getDataType(); if (columnType.isArray() || Vector.class.isAssignableFrom(columnType)) { - return satisfiesMinimumVersionRequirements(version); + return hasCorrectVectorOffsetIndexes(version); } return true; } @@ -76,7 +76,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun * Check if the version is greater than or equal to 0.31.0, or it doesn't follow the versioning schema X.Y.Z */ @VisibleForTesting - public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) { + public static boolean hasCorrectVectorOffsetIndexes(@NotNull final String version) { final Matcher matcher = VERSION_PATTERN.matcher(version); if (!matcher.matches()) { // Could be unit tests or some other versioning scheme diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java index a9c7b37f57f..62f0f22cfa0 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -53,9 +53,9 @@ private static final class PageState { @NotNull final ToPage toPage) throws IOException { super(pageCache, columnChunkReader, mask, toPage); offsetIndex = columnChunkReader.getOffsetIndex(); - Assert.assertion(offsetIndex != null, "offsetIndex != null"); + Assert.neqNull(offsetIndex, "offsetIndex"); numPages = offsetIndex.getPageCount(); - Assert.assertion(numPages > 0, "numPages > 0"); + Assert.gtZero(numPages, "numPages"); pageStates = new AtomicReferenceArray<>(numPages); columnPageDirectAccessor = columnChunkReader.getPageAccessor(); @@ -85,13 +85,13 @@ private static int findPageNumUsingOffsetIndex(final OffsetIndex offsetIndex, fi while (low <= high) { final int mid = (low + high) >>> 1; final long midVal = offsetIndex.getFirstRowIndex(mid); - - if (midVal < row) + if (midVal < row) { low = mid + 1; - else if (midVal > row) + } else if (midVal > row) { high = mid - 1; - else + } else { return mid; // 'row' is the first row of page + } } return (low - 1); // 'row' is somewhere in the middle of page } @@ -141,8 +141,8 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext, if (pageNum >= numPages) { // This can happen if the last page is larger than rest of the pages, which are all the same size. // We have already checked that row is less than numRows. - Assert.assertion(row >= offsetIndex.getFirstRowIndex(numPages - 1), - "row >= offsetIndex.getFirstRowIndex(numPages - 1)"); + Assert.geq(row, "row", offsetIndex.getFirstRowIndex(numPages - 1), + "offsetIndex.getFirstRowIndex(numPages - 1)"); pageNum = (numPages - 1); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java index e9568936d4c..9975ebdbb5d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java @@ -16,6 +16,7 @@ import java.io.UncheckedIOException; import java.lang.ref.WeakReference; import java.util.Arrays; +import java.util.Iterator; final class VariablePageSizeColumnChunkPageStore extends ColumnChunkPageStore { @@ -27,7 +28,7 @@ final class VariablePageSizeColumnChunkPageStore extends Colum private volatile int numPages = 0; private volatile long[] pageRowOffsets; private volatile ColumnPageReader[] columnPageReaders; - private final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator; + private final Iterator columnPageReaderIterator; private volatile WeakReference>[] pages; VariablePageSizeColumnChunkPageStore(@NotNull final PageCache pageCache, @@ -160,9 +161,4 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext, return getPage(pageNum); } - - @Override - public void close() { - columnPageReaderIterator.close(); - } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 671a4ed3a15..1ea62f13652 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -682,19 +682,15 @@ public void testReadOldParquetData() { @Test public void testVersionChecks() { - assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.0.0")); - assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.4.0")); - try { - ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.3"); - TestCase.fail("Exception expected for invalid version string"); - } catch (IllegalArgumentException expected) { - } - assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.0")); - assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.1")); - assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.32.0")); - assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("1.3.0")); - assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.dev.0")); - assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.0-SNAPSHOT")); + assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.0.0")); + assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.4.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.3")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.1")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.32.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("1.3.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("unknown")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.0-SNAPSHOT")); }