Skip to content

Commit

Permalink
Resolving more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jan 15, 2024
1 parent 27e1638 commit 3b7521e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,15 @@ private Dictionary getDictionary(final SeekableChannelContext channelContext) {
} else {
return NULL_DICTIONARY;
}
return getDictionaryHelper(channelContext, dictionaryPageOffset);
if (channelContext == SeekableChannelContext.NULL) {
// Create a new context object and use that for reading the dictionary
try (final SeekableChannelContext newChannelContext = channelsProvider.makeContext()) {
return getDictionaryHelper(newChannelContext, dictionaryPageOffset);
}
} else {
// Use the context object provided by the caller
return getDictionaryHelper(channelContext, dictionaryPageOffset);
}
}

private Dictionary getDictionaryHelper(final SeekableChannelContext channelContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface Lookup<T> {
@NotNull final Lookup<T> lookup,
@NotNull final Function<SeekableChannelContext, Dictionary> dictionarySupplier) {
this.valuesSupplier = new LazyCachingSupplier<>(() -> {
// Dictionary is already materialized at this point, therefore, we can safely use NULL context
// We use NULL channel context here and rely on materialization logic to provide the correct context
final Dictionary dictionary = dictionarySupplier.apply(SeekableChannelContext.NULL);
final T[] values = ObjectChunk.makeArray(dictionary.getMaxId() + 1);
for (int ki = 0; ki < values.length; ++ki) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ void setSize(final long size) {
private long size;
private int numFragmentsInObject;

private S3ChannelContext s3ChannelContext;
private SeekableChannelContext channelContext;

private long position;

Expand All @@ -143,44 +143,43 @@ void setSize(final long size) {
this.key = s3Uri.key().orElse(null);
this.s3AsyncClient = s3AsyncClient;
this.s3Instructions = s3Instructions;
Assert.instanceOf(channelContext, "channelContext", S3ChannelContext.class);
this.s3ChannelContext = (S3ChannelContext) channelContext;
this.channelContext = channelContext;
this.size = UNINITIALIZED_SIZE;
this.position = 0;
}

/**
* @param channelContext The {@link SeekableChannelContext} object used to cache read-ahead buffers for efficiently
* reading from S3. An appropriate channel context should be set before the read and should be cleared after
* the read is complete. A {@code null} parameter value is equivalent to clearing the context. This parameter
* will be {@link SeekableChannelContext#NULL} if no caching and read ahead is desired.
*/
@Override
public void setContext(@Nullable final SeekableChannelContext channelContext) {
// null context equivalent to clearing the context
if (channelContext != null && !(channelContext instanceof S3ChannelContext)) {
throw new IllegalArgumentException(
"Context must be null or an instance of S3ChannelContext, provided context of class " +
channelContext.getClass().getName());
}
this.s3ChannelContext = (S3ChannelContext) channelContext;
this.channelContext = channelContext;
}

@Override
public int read(@NotNull final ByteBuffer destination) throws IOException {
Assert.neqNull(s3ChannelContext, "s3ChannelContext");
Assert.neq(s3ChannelContext, "s3ChannelContext", SeekableChannelContext.NULL, "SeekableChannelContext.NULL");
Assert.neqNull(channelContext, "channelContext");
if (!destination.hasRemaining()) {
return 0;
}
final long localPosition = position;
checkClosed(localPosition);

// Fetch the file size if this is the first read
populateSize();
final S3ChannelContext s3ChannelContext = getS3ChannelContextFrom(channelContext);
populateSize(s3ChannelContext);
if (localPosition >= size) {
// We are finished reading
return -1;
}

// Send async read requests for current fragment as well as read ahead fragments, if not already in cache
final int currFragmentIndex = fragmentIndexForByteNumber(localPosition);
final int numReadAheadFragments =
Math.min(s3Instructions.readAheadCount(), numFragmentsInObject - currFragmentIndex - 1);
final int numReadAheadFragments = channelContext == SeekableChannelContext.NULL ? 0
: Math.min(s3Instructions.readAheadCount(), numFragmentsInObject - currFragmentIndex - 1);
for (int idx = currFragmentIndex; idx <= currFragmentIndex + numReadAheadFragments; idx++) {
final CompletableFuture<ByteBuffer> future = s3ChannelContext.getCachedFuture(idx);
if (future == null) {
Expand Down Expand Up @@ -213,6 +212,19 @@ public int read(@NotNull final ByteBuffer destination) throws IOException {
return sizeToCopy;
}

private static S3ChannelContext getS3ChannelContextFrom(@NotNull final SeekableChannelContext channelContext) {
final S3ChannelContext s3ChannelContext;
if (channelContext == SeekableChannelContext.NULL) {
// Create a new temporary context just for this read with a cache size of 1 just to support the current
// chunk with no read ahead
s3ChannelContext = new S3ChannelContext(1);
} else {
Assert.instanceOf(channelContext, "channelContext", S3ChannelContext.class);
s3ChannelContext = (S3ChannelContext) channelContext;
}
return s3ChannelContext;
}

private int fragmentIndexForByteNumber(final long byteNumber) {
return Math.toIntExact(byteNumber / s3Instructions.fragmentSize());
}
Expand Down Expand Up @@ -273,15 +285,19 @@ public SeekableByteChannel position(final long newPosition) throws ClosedChannel
@Override
public long size() throws IOException {
checkClosed(position);
populateSize();
populateSize(getS3ChannelContextFrom(channelContext));
return size;
}

private void populateSize() throws IOException {
private void populateSize(final S3ChannelContext s3ChannelContext) throws IOException {
if (size != UNINITIALIZED_SIZE) {
// Store the size in the context if it is uninitialized
if (s3ChannelContext.getSize() == UNINITIALIZED_SIZE) {
s3ChannelContext.setSize(size);
}
return;
}
if (s3ChannelContext.getSize() < 0) {
if (s3ChannelContext.getSize() == UNINITIALIZED_SIZE) {
// Fetch the size of the file on the first read using a blocking HEAD request, and store it in the context
// for future use
final HeadObjectResponse headObjectResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public SeekableChannelContext makeContext() {

@Override
public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelContext) {
// A null context implies no caching
// A null context implies no caching or read ahead
return channelContext == SeekableChannelContext.NULL
|| channelContext instanceof S3SeekableByteChannel.S3ChannelContext;
}
Expand Down

0 comments on commit 3b7521e

Please sign in to comment.