Skip to content

Commit

Permalink
Improved comments and Javadocs
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Dec 29, 2023
1 parent af10a1a commit edbd29a
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ private void initialize() {
@TestUseOnly
public final TableLocationProvider tableLocationProvider() {
return locationProvider;
// Ignore
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class RegionContextHolder implements ChunkSource.FillContext {
private Context innerContext;

public RegionContextHolder(final int chunkCapacity, @Nullable final SharedContext sharedContext) {

this.chunkCapacity = chunkCapacity;
this.sharedContext = sharedContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvide
}
// Root path should be this file if a single file, else the parent directory for a metadata file
rootPath = parquetFileURI.getRawPath().endsWith(".parquet") ? filePath : filePath.getParent();
// TODO Close this context after Ryan's patch
final SeekableChannelsProvider.ChannelContext context = channelsProvider.makeContext();
final byte[] footer;
try (final SeekableChannelsProvider.ChannelContext context = channelsProvider.makeContext();
final SeekableByteChannel readChannel = channelsProvider.getReadChannel(context, filePath)) {
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(context, filePath)) {
final long fileLen = readChannel.size();
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer +
// footerIndex + MAGIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import io.deephaven.annotations.BuildableStyle;
import org.immutables.value.Value;

/**
* This class provides instructions intended for reading and writing parquet files to AWS S3 instances.
*/
@Value.Immutable
@BuildableStyle
public abstract class S3ParquetInstructions {
Expand All @@ -17,23 +20,39 @@ public static Builder builder() {
return ImmutableS3ParquetInstructions.builder();
}

/**
* The AWS region name to use when reading or writing to S3.
*/
public abstract String awsRegionName();

/**
* The maximum number of concurrent requests to make to S3.
*/
@Value.Default
public int maxConcurrentRequests() {
return DEFAULT_MAX_CONCURRENT_REQUESTS;
}

/**
* The number of fragments to send asynchronous read requests for while reading the current fragment.
*/
@Value.Default
public int readAheadCount() {
return DEFAULT_READ_AHEAD_COUNT;
}

/**
* The maximum size of each fragment to read from S3. The fetched fragment can be smaller than this in case fewer
* bytes remaining in the file.
*/
@Value.Default
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

/**
* The maximum number of fragments to cache in memory.
*/
@Value.Default
public int maxCacheSize() {
return DEFAULT_MAX_CACHE_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

/**
* An {@link AsyncResponseTransformer} that transforms a response into a {@link ByteBuffer}.
* This class is inspired from {@link software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer}
* but avoids a number of extra copies done by the former.
*
* @param <ResponseT> POJO response type.
*/
public final class ByteBufferAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ByteBuffer> {

private volatile CompletableFuture<ByteBuffer> cf;
private ResponseT response;
private final ByteBuffer byteBuffer;

ByteBufferAsyncResponseTransformer(final int bufferSize) {
Expand All @@ -33,14 +34,7 @@ public CompletableFuture<ByteBuffer> prepare() {

@Override
public void onResponse(ResponseT response) {
this.response = response;
}

/**
* @return the unmarshalled response object from the service.
*/
public ResponseT response() {
return response;
// No need to store the response object as we are only interested in the byte buffer
}

@Override
Expand All @@ -55,9 +49,7 @@ public void exceptionOccurred(Throwable throwable) {

final static class ByteBuferSubscriber implements Subscriber<ByteBuffer> {
private final CompletableFuture<ByteBuffer> resultFuture;

private Subscription subscription;

private final ByteBuffer byteBuffer;

ByteBuferSubscriber(CompletableFuture<ByteBuffer> resultFuture, ByteBuffer byteBuffer) {
Expand Down
Loading

0 comments on commit edbd29a

Please sign in to comment.