From 3b7521e0ee745118aed8a5fa26e208a25209419a Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 16 Jan 2024 01:03:56 +0530 Subject: [PATCH] Resolving more comments --- .../parquet/base/ColumnChunkReaderImpl.java | 10 +++- .../pagestore/topage/ChunkDictionary.java | 2 +- .../extensions/s3/S3SeekableByteChannel.java | 52 ++++++++++++------- .../s3/S3SeekableChannelProvider.java | 2 +- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 1ee125b69c6..ed6f54665e6 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -174,7 +174,15 @@ private Dictionary getDictionary(final SeekableChannelContext channelContext) { } else { return NULL_DICTIONARY; } - return getDictionaryHelper(channelContext, dictionaryPageOffset); + if (channelContext == SeekableChannelContext.NULL) { + // Create a new context object and use that for reading the dictionary + try (final SeekableChannelContext newChannelContext = channelsProvider.makeContext()) { + return getDictionaryHelper(newChannelContext, dictionaryPageOffset); + } + } else { + // Use the context object provided by the caller + return getDictionaryHelper(channelContext, dictionaryPageOffset); + } } private Dictionary getDictionaryHelper(final SeekableChannelContext channelContext, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ChunkDictionary.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ChunkDictionary.java index 66113e5fa83..fd188f76456 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ChunkDictionary.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ChunkDictionary.java @@ -48,7 +48,7 @@ public interface Lookup { @NotNull final Lookup lookup, @NotNull final Function dictionarySupplier) { this.valuesSupplier = new LazyCachingSupplier<>(() -> { - // Dictionary is already materialized at this point, therefore, we can safely use NULL context + // We use NULL channel context here and rely on materialization logic to provide the correct context final Dictionary dictionary = dictionarySupplier.apply(SeekableChannelContext.NULL); final T[] values = ObjectChunk.makeArray(dictionary.getMaxId() + 1); for (int ki = 0; ki < values.length; ++ki) { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java index 86a69fbc426..ae6b425c0a6 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java @@ -132,7 +132,7 @@ void setSize(final long size) { private long size; private int numFragmentsInObject; - private S3ChannelContext s3ChannelContext; + private SeekableChannelContext channelContext; private long position; @@ -143,27 +143,25 @@ void setSize(final long size) { this.key = s3Uri.key().orElse(null); this.s3AsyncClient = s3AsyncClient; this.s3Instructions = s3Instructions; - Assert.instanceOf(channelContext, "channelContext", S3ChannelContext.class); - this.s3ChannelContext = (S3ChannelContext) channelContext; + this.channelContext = channelContext; this.size = UNINITIALIZED_SIZE; this.position = 0; } + /** + * @param channelContext The {@link SeekableChannelContext} object used to cache read-ahead buffers for efficiently + * reading from S3. An appropriate channel context should be set before the read and should be cleared after + * the read is complete. A {@code null} parameter value is equivalent to clearing the context. This parameter + * will be {@link SeekableChannelContext#NULL} if no caching and read ahead is desired. + */ @Override public void setContext(@Nullable final SeekableChannelContext channelContext) { - // null context equivalent to clearing the context - if (channelContext != null && !(channelContext instanceof S3ChannelContext)) { - throw new IllegalArgumentException( - "Context must be null or an instance of S3ChannelContext, provided context of class " + - channelContext.getClass().getName()); - } - this.s3ChannelContext = (S3ChannelContext) channelContext; + this.channelContext = channelContext; } @Override public int read(@NotNull final ByteBuffer destination) throws IOException { - Assert.neqNull(s3ChannelContext, "s3ChannelContext"); - Assert.neq(s3ChannelContext, "s3ChannelContext", SeekableChannelContext.NULL, "SeekableChannelContext.NULL"); + Assert.neqNull(channelContext, "channelContext"); if (!destination.hasRemaining()) { return 0; } @@ -171,7 +169,8 @@ public int read(@NotNull final ByteBuffer destination) throws IOException { checkClosed(localPosition); // Fetch the file size if this is the first read - populateSize(); + final S3ChannelContext s3ChannelContext = getS3ChannelContextFrom(channelContext); + populateSize(s3ChannelContext); if (localPosition >= size) { // We are finished reading return -1; @@ -179,8 +178,8 @@ public int read(@NotNull final ByteBuffer destination) throws IOException { // Send async read requests for current fragment as well as read ahead fragments, if not already in cache final int currFragmentIndex = fragmentIndexForByteNumber(localPosition); - final int numReadAheadFragments = - Math.min(s3Instructions.readAheadCount(), numFragmentsInObject - currFragmentIndex - 1); + final int numReadAheadFragments = channelContext == SeekableChannelContext.NULL ? 0 + : Math.min(s3Instructions.readAheadCount(), numFragmentsInObject - currFragmentIndex - 1); for (int idx = currFragmentIndex; idx <= currFragmentIndex + numReadAheadFragments; idx++) { final CompletableFuture future = s3ChannelContext.getCachedFuture(idx); if (future == null) { @@ -213,6 +212,19 @@ public int read(@NotNull final ByteBuffer destination) throws IOException { return sizeToCopy; } + private static S3ChannelContext getS3ChannelContextFrom(@NotNull final SeekableChannelContext channelContext) { + final S3ChannelContext s3ChannelContext; + if (channelContext == SeekableChannelContext.NULL) { + // Create a new temporary context just for this read with a cache size of 1 just to support the current + // chunk with no read ahead + s3ChannelContext = new S3ChannelContext(1); + } else { + Assert.instanceOf(channelContext, "channelContext", S3ChannelContext.class); + s3ChannelContext = (S3ChannelContext) channelContext; + } + return s3ChannelContext; + } + private int fragmentIndexForByteNumber(final long byteNumber) { return Math.toIntExact(byteNumber / s3Instructions.fragmentSize()); } @@ -273,15 +285,19 @@ public SeekableByteChannel position(final long newPosition) throws ClosedChannel @Override public long size() throws IOException { checkClosed(position); - populateSize(); + populateSize(getS3ChannelContextFrom(channelContext)); return size; } - private void populateSize() throws IOException { + private void populateSize(final S3ChannelContext s3ChannelContext) throws IOException { if (size != UNINITIALIZED_SIZE) { + // Store the size in the context if it is uninitialized + if (s3ChannelContext.getSize() == UNINITIALIZED_SIZE) { + s3ChannelContext.setSize(size); + } return; } - if (s3ChannelContext.getSize() < 0) { + if (s3ChannelContext.getSize() == UNINITIALIZED_SIZE) { // Fetch the size of the file on the first read using a blocking HEAD request, and store it in the context // for future use final HeadObjectResponse headObjectResponse; diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index e5ab39f449a..63fca0c621f 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -48,7 +48,7 @@ public SeekableChannelContext makeContext() { @Override public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelContext) { - // A null context implies no caching + // A null context implies no caching or read ahead return channelContext == SeekableChannelContext.NULL || channelContext instanceof S3SeekableByteChannel.S3ChannelContext; }