From 0be0031b9dac0ad07f2a97a8a841de697ca6ab00 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 12 Jun 2024 16:30:05 -0500 Subject: [PATCH 1/4] Added optional limits on maximum bytes read from file streams --- .../util/channel/CachedChannelProvider.java | 5 +++ .../channel/SeekableChannelsProvider.java | 9 ++++ .../parquet/base/ColumnChunkReaderImpl.java | 43 +++++++++++-------- .../parquet/base/ColumnPageReaderImpl.java | 10 ++--- .../parquet/base/OffsetIndexReaderImpl.java | 3 +- .../parquet/base/ParquetFileReader.java | 12 ++++-- .../DeephavenCompressorAdapterFactory.java | 4 +- .../table/ParquetTableReadWriteTest.java | 12 ++---- .../TrackedSeekableChannelsProvider.java | 11 +++++ 9 files changed, 73 insertions(+), 36 deletions(-) diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java index bb4f5eecab4..de04bf23609 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java @@ -106,6 +106,11 @@ public InputStream getInputStream(SeekableByteChannel channel) throws IOExceptio return wrappedProvider.getInputStream(channel); } + @Override + public InputStream getInputStream(SeekableByteChannel channel, int sizeLimit) throws IOException { + return wrappedProvider.getInputStream(channel, sizeLimit); + } + @Override public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append) throws IOException { final String pathKey = path.toAbsolutePath().toString(); diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java index 91831b60400..e3712f71c19 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java @@ -3,6 +3,7 @@ // package io.deephaven.util.channel; +import com.google.common.io.ByteStreams; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; @@ -81,6 +82,14 @@ SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContex */ InputStream getInputStream(SeekableByteChannel channel) throws IOException; + /** + * Creates an {@link InputStream} from the current position of {@code channel} from which at most {@code sizeLimit} + * bytes can be read. More details in {@link #getInputStream(SeekableByteChannel)}. + */ + default InputStream getInputStream(SeekableByteChannel channel, int sizeLimit) throws IOException { + return ByteStreams.limit(getInputStream(channel), sizeLimit); + } + default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException { return getWriteChannel(Paths.get(path), append); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 886f1db6345..4b83111542d 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -4,6 +4,8 @@ package io.deephaven.parquet.base; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.util.channel.ChannelPositionInputStream; +import io.deephaven.util.channel.Channels; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.parquet.compress.CompressorAdapter; @@ -23,6 +25,7 @@ import org.apache.parquet.schema.Type; import org.jetbrains.annotations.NotNull; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; @@ -194,9 +197,9 @@ private Dictionary getDictionary(final SeekableChannelContext channelContext) { // Use the context object provided by the caller, or create (and close) a new one try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); - final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI()); - final InputStream in = channelsProvider.getInputStream(ch.position(dictionaryPageOffset))) { - return readDictionary(in, holder.get()); + final SeekableByteChannel ch = + channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) { + return readDictionary(ch, holder.get()); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -218,9 +221,10 @@ public SeekableChannelsProvider getChannelsProvider() { } @NotNull - private Dictionary readDictionary(InputStream in, SeekableChannelContext channelContext) throws IOException { + private Dictionary readDictionary(SeekableByteChannel ch, SeekableChannelContext channelContext) + throws IOException { // explicitly not closing this, caller is responsible - final PageHeader pageHeader = Util.readPageHeader(in); + final PageHeader pageHeader = readPageHeader(ch); if (pageHeader.getType() != PageType.DICTIONARY_PAGE) { // In case our fallback in getDictionary was too optimistic... return NULL_DICTIONARY; @@ -228,18 +232,21 @@ private Dictionary readDictionary(InputStream in, SeekableChannelContext channel final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header(); final int compressedPageSize = pageHeader.getCompressed_page_size(); final BytesInput payload; - if (compressedPageSize == 0) { - // Sometimes the size is explicitly empty, just use an empty payload - payload = BytesInput.empty(); - } else { - payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(), - channelContext); + try (final InputStream in = (compressedPageSize == 0) ? null + : ChannelPositionInputStream.of(ch, channelsProvider.getInputStream(ch, compressedPageSize))) { + if (compressedPageSize == 0) { + // Sometimes the size is explicitly empty, just use an empty payload + payload = BytesInput.empty(); + } else { + payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(), + channelContext); + } + final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name()); + final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding); + // We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage or + // payload and thus doesn't hold a reference to the input stream. + return encoding.initDictionary(path, dictionaryPage); } - final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name()); - final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding); - // We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage or - // payload and thus doesn't hold a reference to the input stream. - return encoding.initDictionary(path, dictionaryPage); } private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator { @@ -315,7 +322,9 @@ private org.apache.parquet.format.Encoding getEncoding(final PageHeader pageHead } private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException { - try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) { + // We don't know the exact size of page header, so we read it through a buffered stream in chunks of 128 bytes + try (final InputStream in = ChannelPositionInputStream.of(ch, + new BufferedInputStream(Channels.newInputStreamNoClose(ch), 128))) { return Util.readPageHeader(in); } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 8d209bfd95b..dd2cb0abc20 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -206,7 +206,7 @@ private int readRowCountFromDataPage( @NotNull final SeekableChannelContext channelContext) throws IOException { switch (pageHeader.type) { case DATA_PAGE: - try (final InputStream in = channelsProvider.getInputStream(ch)) { + try (final InputStream in = channelsProvider.getInputStream(ch, pageHeader.getCompressed_page_size())) { return readRowCountFromPageV1(readV1Unsafe(in, channelContext)); } case DATA_PAGE_V2: @@ -225,12 +225,12 @@ private IntBuffer readKeysFromDataPage( @NotNull final SeekableChannelContext channelContext) throws IOException { switch (pageHeader.type) { case DATA_PAGE: - try (final InputStream in = channelsProvider.getInputStream(ch)) { + try (final InputStream in = channelsProvider.getInputStream(ch, pageHeader.getCompressed_page_size())) { return readKeysFromPageV1(readV1Unsafe(in, channelContext), keyDest, nullPlaceholder, channelContext); } case DATA_PAGE_V2: - try (final InputStream in = channelsProvider.getInputStream(ch)) { + try (final InputStream in = channelsProvider.getInputStream(ch, pageHeader.getCompressed_page_size())) { return readKeysFromPageV2(readV2Unsafe(in, channelContext), keyDest, nullPlaceholder, channelContext); } @@ -246,11 +246,11 @@ private Object readDataPage( @NotNull final SeekableChannelContext channelContext) throws IOException { switch (pageHeader.type) { case DATA_PAGE: - try (final InputStream in = channelsProvider.getInputStream(ch)) { + try (final InputStream in = channelsProvider.getInputStream(ch, pageHeader.getCompressed_page_size())) { return readPageV1(readV1Unsafe(in, channelContext), nullValue, channelContext); } case DATA_PAGE_V2: - try (final InputStream in = channelsProvider.getInputStream(ch)) { + try (final InputStream in = channelsProvider.getInputStream(ch, pageHeader.getCompressed_page_size())) { return readPageV2(readV2Unsafe(in, channelContext), nullValue, channelContext); } default: diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/OffsetIndexReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/OffsetIndexReaderImpl.java index 8758883bd39..c8bc1168741 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/OffsetIndexReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/OffsetIndexReaderImpl.java @@ -55,7 +55,8 @@ private OffsetIndex readOffsetIndex(@NotNull final SeekableChannelContext channe SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel readChannel = channelsProvider.getReadChannel(holder.get(), columnChunkURI); final InputStream in = - channelsProvider.getInputStream(readChannel.position(columnChunk.getOffset_index_offset()))) { + channelsProvider.getInputStream(readChannel.position(columnChunk.getOffset_index_offset()), + columnChunk.getOffset_index_length())) { return (offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(in))); } catch (final IOException e) { throw new UncheckedIOException(e); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 993bc821e1d..b7227e2f6a5 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -96,15 +96,20 @@ private ParquetFileReader( try ( final SeekableChannelContext context = channelsProvider.makeSingleUseContext(); final SeekableByteChannel ch = channelsProvider.getReadChannel(context, parquetFileURI)) { - positionToFileMetadata(parquetFileURI, ch); - try (final InputStream in = channelsProvider.getInputStream(ch)) { + final int footerLength = positionToFileMetadata(parquetFileURI, ch); + try (final InputStream in = channelsProvider.getInputStream(ch, footerLength)) { fileMetaData = Util.readFileMetaData(in); } } type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders); } - private static void positionToFileMetadata(URI parquetFileURI, SeekableByteChannel readChannel) throws IOException { + /** + * Read the footer length and position the channel to the start of the footer. + * + * @return The length of the footer + */ + private static int positionToFileMetadata(URI parquetFileURI, SeekableByteChannel readChannel) throws IOException { final long fileLen = readChannel.size(); if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + // footerIndex + MAGIC @@ -128,6 +133,7 @@ private static void positionToFileMetadata(URI parquetFileURI, SeekableByteChann "corrupted file: the footer index is not within the file: " + footerIndex); } readChannel.position(footerIndex); + return footerLength; } private static int makeLittleEndianInt(byte b0, byte b1, byte b2, byte b3) { diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index 66d8ebbeef3..ec6991af549 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -154,8 +154,8 @@ public BytesInput decompress(final InputStream inputStream, final int compressed try { // Note that we don't close the decompressed stream because doing so may return the decompressor to the // pool - final InputStream buffered = ByteStreams.limit(IOUtils.buffer(inputStream), compressedSize); - final CompressionInputStream decompressed = compressionCodec.createInputStream(buffered, decompressor); + final CompressionInputStream decompressed = + compressionCodec.createInputStream(inputStream, decompressor); return BytesInput.copy(BytesInput.from(decompressed, uncompressedSize)); } finally { if (decompressor != null) { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index eca5a174830..fe15bd18d2e 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -521,14 +521,10 @@ public void test_lz4_compressed() { final Table fromDisk = checkSingleTable(table, dest).select(); - try { - // The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with - // LZ4_RAW. We should be able to read it anyway with no exceptions. - String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile(); - readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select(); - } catch (RuntimeException e) { - TestCase.fail("Failed to read parquet file sample_lz4_compressed.parquet"); - } + // The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with + // LZ4_RAW. We should be able to read it anyway with no exceptions. + String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile(); + readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select(); final File randomDest = new File(rootFile, "random.parquet"); writeTable(fromDisk, randomDest.getPath(), ParquetTools.LZ4_RAW); diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java index 4aec474721d..978e5d23952 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java @@ -3,6 +3,7 @@ // package io.deephaven.extensions.trackedfile; +import com.google.common.io.ByteStreams; import io.deephaven.base.FileUtils; import io.deephaven.base.verify.Assert; import io.deephaven.engine.util.file.FileHandle; @@ -34,6 +35,8 @@ */ final class TrackedSeekableChannelsProvider implements SeekableChannelsProvider { + private static int MAX_READ_BUFFER_SIZE = 1 << 16; // 64 KiB + private final TrackedFileHandleFactory fileHandleFactory; TrackedSeekableChannelsProvider(@NotNull final TrackedFileHandleFactory fileHandleFactory) { @@ -64,6 +67,14 @@ public InputStream getInputStream(SeekableByteChannel channel) { return new BufferedInputStream(Channels.newInputStreamNoClose(channel)); } + @Override + public InputStream getInputStream(SeekableByteChannel channel, int sizeLimit) { + // The following stream will read from the channel in chunks of bufferSize bytes, up to sizeLimit bytes. + final int bufferSize = Math.min(sizeLimit, MAX_READ_BUFFER_SIZE); + return new BufferedInputStream(ByteStreams.limit(Channels.newInputStreamNoClose(channel), sizeLimit), + bufferSize); + } + @Override public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final boolean append) throws IOException { From 61a18729f484d6a48d616a7f9cfd41f0f3472408 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 14 Jun 2024 14:08:23 -0500 Subject: [PATCH 2/4] Resolving review comments --- .../util/channel/CachedChannelProvider.java | 11 ++--- .../util/channel/LocalFSChannelProvider.java | 7 ++- .../channel/SeekableChannelsProvider.java | 44 ++++++++----------- .../channel/CachedChannelProviderTest.java | 2 +- .../parquet/base/ColumnChunkReaderImpl.java | 10 ++--- .../DeephavenCompressorAdapterFactory.java | 4 +- .../table/ParquetTableReadWriteTest.java | 2 +- .../s3/S3SeekableChannelProvider.java | 2 +- .../TrackedSeekableChannelsProvider.java | 18 +++----- 9 files changed, 39 insertions(+), 61 deletions(-) diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java index de04bf23609..bb5bafda49b 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java @@ -102,13 +102,8 @@ public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext } @Override - public InputStream getInputStream(SeekableByteChannel channel) throws IOException { - return wrappedProvider.getInputStream(channel); - } - - @Override - public InputStream getInputStream(SeekableByteChannel channel, int sizeLimit) throws IOException { - return wrappedProvider.getInputStream(channel, sizeLimit); + public InputStream getInputStream(final SeekableByteChannel channel, final int sizeHint) throws IOException { + return wrappedProvider.getInputStream(channel, sizeHint); } @Override @@ -120,7 +115,7 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole return result == null ? new CachedChannel(wrappedProvider.getWriteChannel(path, append), channelType, pathKey) : result.position(append ? result.size() : 0); // The seek isn't really necessary for append; will be at - // end no matter what. + // end no matter what. } @Override diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java index 5d902447180..7b7b23b24ba 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java @@ -19,6 +19,8 @@ import java.util.stream.Stream; public class LocalFSChannelProvider implements SeekableChannelsProvider { + private static final int MAX_READ_BUFFER_SIZE = 1 << 16; // 64 KiB + @Override public SeekableChannelContext makeContext() { // No additional context required for local FS @@ -40,9 +42,10 @@ public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext } @Override - public InputStream getInputStream(SeekableByteChannel channel) { + public InputStream getInputStream(final SeekableByteChannel channel, final int sizeHint) { // FileChannel is not buffered, need to buffer - return new BufferedInputStream(Channels.newInputStreamNoClose(channel)); + final int bufferSize = Math.min(sizeHint, MAX_READ_BUFFER_SIZE); + return new BufferedInputStream(Channels.newInputStreamNoClose(channel), bufferSize); } @Override diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java index e3712f71c19..88f4eda9414 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java @@ -3,7 +3,6 @@ // package io.deephaven.util.channel; -import com.google.common.io.ByteStreams; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; @@ -20,13 +19,13 @@ public interface SeekableChannelsProvider extends SafeCloseable { /** - * Wraps {@link SeekableChannelsProvider#getInputStream(SeekableByteChannel)} to ensure the channel's position is - * incremented the exact amount that has been consumed from the resulting input stream. To remain valid, the caller - * must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way that would adversely - * affect the position (such as re-wrapping the resulting input stream with buffering). + * Wraps {@link SeekableChannelsProvider#getInputStream(SeekableByteChannel, int)} to ensure the channel's position + * is incremented the exact amount that has been consumed from the resulting input stream. To remain valid, the + * caller must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way that would + * adversely affect the position (such as re-wrapping the resulting input stream with buffering). * *

- * Equivalent to {@code ChannelPositionInputStream.of(ch, provider.getInputStream(ch))}. + * Equivalent to {@code ChannelPositionInputStream.of(ch, provider.getInputStream(ch, sizeHint))}. * * @param provider the provider * @param ch the seekable channel @@ -34,9 +33,9 @@ public interface SeekableChannelsProvider extends SafeCloseable { * @throws IOException if an IO exception occurs * @see ChannelPositionInputStream#of(SeekableByteChannel, InputStream) */ - static InputStream channelPositionInputStream(SeekableChannelsProvider provider, SeekableByteChannel ch) - throws IOException { - return ChannelPositionInputStream.of(ch, provider.getInputStream(ch)); + static InputStream channelPositionInputStream(SeekableChannelsProvider provider, SeekableByteChannel ch, + int sizeHint) throws IOException { + return ChannelPositionInputStream.of(ch, provider.getInputStream(ch, sizeHint)); } /** @@ -67,28 +66,21 @@ SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContex throws IOException; /** - * Creates an {@link InputStream} from the current position of {@code channel}; closing the resulting input stream - * does not close the {@code channel}. The {@link InputStream} will be buffered; either explicitly in the - * case where the implementation uses an unbuffered {@link #getReadChannel(SeekableChannelContext, URI)}, or - * implicitly when the implementation uses a buffered {@link #getReadChannel(SeekableChannelContext, URI)}. - * {@code channel} must have been created by {@code this} provider. The caller can't assume the position of - * {@code channel} after consuming the {@link InputStream}. For use-cases that require the channel's position to be - * incremented the exact amount the {@link InputStream} has been consumed, use - * {@link #channelPositionInputStream(SeekableChannelsProvider, SeekableByteChannel)}. + * Creates an {@link InputStream} from the current position of {@code channel} from which the caller expects to read + * {@code sizeHint} number of bytes. Closing the resulting input stream does not close the {@code channel}. + * The {@link InputStream} will be buffered; either explicitly in the case where the implementation uses an + * unbuffered {@link #getReadChannel(SeekableChannelContext, URI)}, or implicitly when the implementation uses a + * buffered {@link #getReadChannel(SeekableChannelContext, URI)}. {@code channel} must have been created by + * {@code this} provider. The caller can't assume the position of {@code channel} after consuming the + * {@link InputStream}. For use-cases that require the channel's position to be incremented the exact amount the + * {@link InputStream} has been consumed, use + * {@link #channelPositionInputStream(SeekableChannelsProvider, SeekableByteChannel, int)}. * * @param channel the channel * @return the input stream * @throws IOException if an IO exception occurs */ - InputStream getInputStream(SeekableByteChannel channel) throws IOException; - - /** - * Creates an {@link InputStream} from the current position of {@code channel} from which at most {@code sizeLimit} - * bytes can be read. More details in {@link #getInputStream(SeekableByteChannel)}. - */ - default InputStream getInputStream(SeekableByteChannel channel, int sizeLimit) throws IOException { - return ByteStreams.limit(getInputStream(channel), sizeLimit); - } + InputStream getInputStream(SeekableByteChannel channel, int sizeHint) throws IOException; default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException { return getWriteChannel(Paths.get(path), append); diff --git a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java index 1bd3339df60..bd9d9428dba 100644 --- a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java +++ b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java @@ -214,7 +214,7 @@ public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channe } @Override - public InputStream getInputStream(SeekableByteChannel channel) { + public InputStream getInputStream(SeekableByteChannel channel, int sizeHint) { // TestMockChannel is always empty, so no need to buffer return Channels.newInputStreamNoClose(channel); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 4b83111542d..2529c50fa10 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -4,8 +4,6 @@ package io.deephaven.parquet.base; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.util.channel.ChannelPositionInputStream; -import io.deephaven.util.channel.Channels; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.parquet.compress.CompressorAdapter; @@ -25,7 +23,6 @@ import org.apache.parquet.schema.Type; import org.jetbrains.annotations.NotNull; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; @@ -233,7 +230,7 @@ private Dictionary readDictionary(SeekableByteChannel ch, SeekableChannelContext final int compressedPageSize = pageHeader.getCompressed_page_size(); final BytesInput payload; try (final InputStream in = (compressedPageSize == 0) ? null - : ChannelPositionInputStream.of(ch, channelsProvider.getInputStream(ch, compressedPageSize))) { + : SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, compressedPageSize)) { if (compressedPageSize == 0) { // Sometimes the size is explicitly empty, just use an empty payload payload = BytesInput.empty(); @@ -322,9 +319,8 @@ private org.apache.parquet.format.Encoding getEncoding(final PageHeader pageHead } private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException { - // We don't know the exact size of page header, so we read it through a buffered stream in chunks of 128 bytes - try (final InputStream in = ChannelPositionInputStream.of(ch, - new BufferedInputStream(Channels.newInputStreamNoClose(ch), 128))) { + // We expect page headers to be smaller than 128 bytes + try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, 128)) { return Util.readPageHeader(in); } } diff --git a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java index ec6991af549..66d8ebbeef3 100644 --- a/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java +++ b/extensions/parquet/compression/src/main/java/io/deephaven/parquet/compress/DeephavenCompressorAdapterFactory.java @@ -154,8 +154,8 @@ public BytesInput decompress(final InputStream inputStream, final int compressed try { // Note that we don't close the decompressed stream because doing so may return the decompressor to the // pool - final CompressionInputStream decompressed = - compressionCodec.createInputStream(inputStream, decompressor); + final InputStream buffered = ByteStreams.limit(IOUtils.buffer(inputStream), compressedSize); + final CompressionInputStream decompressed = compressionCodec.createInputStream(buffered, decompressor); return BytesInput.copy(BytesInput.from(decompressed, uncompressedSize)); } finally { if (decompressor != null) { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index fe15bd18d2e..d6f8ee373ab 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -1652,7 +1652,7 @@ public void testVersionChecks() { /** * Reference data is generated using the following code: - * + * *

      *      num_rows = 100000
      *      dh_table = empty_table(num_rows).update(formulas=[
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
index b87401e42bb..d819fc4d3c6 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
@@ -89,7 +89,7 @@ public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext
     }
 
     @Override
-    public InputStream getInputStream(final SeekableByteChannel channel) {
+    public InputStream getInputStream(final SeekableByteChannel channel, final int sizeHint) {
         // S3SeekableByteChannel is internally buffered, no need to re-buffer
         return Channels.newInputStreamNoClose(channel);
     }
diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java
index 978e5d23952..56235d6c336 100644
--- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java
+++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java
@@ -3,7 +3,6 @@
 //
 package io.deephaven.extensions.trackedfile;
 
-import com.google.common.io.ByteStreams;
 import io.deephaven.base.FileUtils;
 import io.deephaven.base.verify.Assert;
 import io.deephaven.engine.util.file.FileHandle;
@@ -35,7 +34,7 @@
  */
 final class TrackedSeekableChannelsProvider implements SeekableChannelsProvider {
 
-    private static int MAX_READ_BUFFER_SIZE = 1 << 16; // 64 KiB
+    private static final int MAX_READ_BUFFER_SIZE = 1 << 16; // 64 KiB
 
     private final TrackedFileHandleFactory fileHandleFactory;
 
@@ -62,17 +61,10 @@ public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext
     }
 
     @Override
-    public InputStream getInputStream(SeekableByteChannel channel) {
-        // TrackedSeekableByteChannel is not buffered, need to buffer
-        return new BufferedInputStream(Channels.newInputStreamNoClose(channel));
-    }
-
-    @Override
-    public InputStream getInputStream(SeekableByteChannel channel, int sizeLimit) {
-        // The following stream will read from the channel in chunks of bufferSize bytes, up to sizeLimit bytes.
-        final int bufferSize = Math.min(sizeLimit, MAX_READ_BUFFER_SIZE);
-        return new BufferedInputStream(ByteStreams.limit(Channels.newInputStreamNoClose(channel), sizeLimit),
-                bufferSize);
+    public InputStream getInputStream(SeekableByteChannel channel, int sizeHint) {
+        // The following stream will read from the channel in chunks of bufferSize bytes
+        final int bufferSize = Math.min(sizeHint, MAX_READ_BUFFER_SIZE);
+        return new BufferedInputStream(Channels.newInputStreamNoClose(channel), bufferSize);
     }
 
     @Override

From 056d706b1c19ea1dfa59a03cd900367e417796ff Mon Sep 17 00:00:00 2001
From: Shivam Malhotra 
Date: Fri, 14 Jun 2024 14:26:20 -0500
Subject: [PATCH 3/4] Minor changes to code layout for readability

---
 .../parquet/base/ColumnChunkReaderImpl.java   | 67 ++++++++++---------
 1 file changed, 34 insertions(+), 33 deletions(-)

diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
index 2529c50fa10..d716f97e16b 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
@@ -191,15 +191,7 @@ private Dictionary getDictionary(final SeekableChannelContext channelContext) {
         } else {
             return NULL_DICTIONARY;
         }
-        // Use the context object provided by the caller, or create (and close) a new one
-        try (
-                final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
-                final SeekableByteChannel ch =
-                        channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) {
-            return readDictionary(ch, holder.get());
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        return readDictionary(dictionaryPageOffset, channelContext);
     }
 
     @Override
@@ -218,31 +210,37 @@ public SeekableChannelsProvider getChannelsProvider() {
     }
 
     @NotNull
-    private Dictionary readDictionary(SeekableByteChannel ch, SeekableChannelContext channelContext)
-            throws IOException {
-        // explicitly not closing this, caller is responsible
-        final PageHeader pageHeader = readPageHeader(ch);
-        if (pageHeader.getType() != PageType.DICTIONARY_PAGE) {
-            // In case our fallback in getDictionary was too optimistic...
-            return NULL_DICTIONARY;
-        }
-        final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
-        final int compressedPageSize = pageHeader.getCompressed_page_size();
-        final BytesInput payload;
-        try (final InputStream in = (compressedPageSize == 0) ? null
-                : SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, compressedPageSize)) {
-            if (compressedPageSize == 0) {
-                // Sometimes the size is explicitly empty, just use an empty payload
-                payload = BytesInput.empty();
-            } else {
-                payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
-                        channelContext);
+    private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelContext channelContext) {
+        // Use the context object provided by the caller, or create (and close) a new one
+        try (
+                final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
+                final SeekableByteChannel ch =
+                        channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) {
+            final PageHeader pageHeader = readPageHeader(ch);
+            if (pageHeader.getType() != PageType.DICTIONARY_PAGE) {
+                // In case our fallback in getDictionary was too optimistic...
+                return NULL_DICTIONARY;
+            }
+            final DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
+            final int compressedPageSize = pageHeader.getCompressed_page_size();
+            final BytesInput payload;
+            try (final InputStream in = (compressedPageSize == 0) ? null
+                    : channelsProvider.getInputStream(ch, compressedPageSize)) {
+                if (compressedPageSize == 0) {
+                    // Sometimes the size is explicitly empty, just use an empty payload
+                    payload = BytesInput.empty();
+                } else {
+                    payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
+                            holder.get());
+                }
+                final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
+                final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
+                // We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage
+                // or payload and thus doesn't hold a reference to the input stream.
+                return encoding.initDictionary(path, dictionaryPage);
             }
-            final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
-            final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
-            // We are safe to not copy the payload because the Dictionary doesn't hold a reference to dictionaryPage or
-            // payload and thus doesn't hold a reference to the input stream.
-            return encoding.initDictionary(path, dictionaryPage);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
     }
 
@@ -318,6 +316,9 @@ private org.apache.parquet.format.Encoding getEncoding(final PageHeader pageHead
         }
     }
 
+    /**
+     * Read the page header from the given channel and increment the channel position by the number of bytes read.
+     */
     private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException {
         // We expect page headers to be smaller than 128 bytes
         try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch, 128)) {

From 908615467a2fa7a3957cacc079547b923ac201b3 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra 
Date: Fri, 14 Jun 2024 18:35:00 -0500
Subject: [PATCH 4/4] Javadoc updates

---
 .../io/deephaven/util/channel/SeekableChannelsProvider.java     | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java
index 88f4eda9414..aca50b64cbf 100644
--- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java
+++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java
@@ -29,6 +29,7 @@ public interface SeekableChannelsProvider extends SafeCloseable {
      *
      * @param provider the provider
      * @param ch the seekable channel
+     * @param sizeHint the number of bytes the caller expects to read from the input stream
      * @return the position-safe input stream
      * @throws IOException if an IO exception occurs
      * @see ChannelPositionInputStream#of(SeekableByteChannel, InputStream)
@@ -77,6 +78,7 @@ SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContex
      * {@link #channelPositionInputStream(SeekableChannelsProvider, SeekableByteChannel, int)}.
      *
      * @param channel the channel
+     * @param sizeHint the number of bytes the caller expects to read from the input stream
      * @return the input stream
      * @throws IOException if an IO exception occurs
      */