diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java
index 679a6e63f59..f2870f10631 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java
@@ -116,7 +116,6 @@ private void initialize() {
     @TestUseOnly
     public final TableLocationProvider tableLocationProvider() {
         return locationProvider;
-        // Ignore
     }
 
     /**
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionContextHolder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionContextHolder.java
index 61843b127d9..50502bd1676 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionContextHolder.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionContextHolder.java
@@ -14,6 +14,7 @@ public class RegionContextHolder implements ChunkSource.FillContext {
     private Context innerContext;
 
     public RegionContextHolder(final int chunkCapacity, @Nullable final SharedContext sharedContext) {
+        this.chunkCapacity = chunkCapacity;
         this.sharedContext = sharedContext;
     }
 
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
index dcf4523a5af..df9c509c1ce 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
@@ -67,9 +67,10 @@ public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvide
         }
         // Root path should be this file if a single file, else the parent directory for a metadata file
         rootPath = parquetFileURI.getRawPath().endsWith(".parquet") ? filePath : filePath.getParent();
+        // TODO Close this context after Ryan's patch
+        final SeekableChannelsProvider.ChannelContext context = channelsProvider.makeContext();
         final byte[] footer;
-        try (final SeekableChannelsProvider.ChannelContext context = channelsProvider.makeContext();
-                final SeekableByteChannel readChannel = channelsProvider.getReadChannel(context, filePath)) {
+        try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(context, filePath)) {
             final long fileLen = readChannel.size();
             if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer +
                                                                               // footerIndex + MAGIC
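For context on the length check in the hunk above: a Parquet file opens with the 4-byte magic `PAR1` and ends with the footer bytes, a 4-byte little-endian footer length, and the same magic again, which is why the minimum valid length is `MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length`. A minimal sketch of locating the footer through a read channel, assuming the same constant values as `ParquetFileReader` (this is not the PR's exact code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;

final class ParquetFooterSketch {
    private static final byte[] MAGIC = {'P', 'A', 'R', '1'}; // assumed to match ParquetFileReader.MAGIC
    private static final int FOOTER_LENGTH_SIZE = 4;

    // Returns the number of footer bytes that immediately precede the length field.
    static int readFooterLength(final SeekableByteChannel readChannel) throws IOException {
        final long fileLen = readChannel.size();
        // Trailer layout: [footer bytes][4-byte little-endian length][PAR1]
        final long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
        readChannel.position(footerLengthIndex);
        final ByteBuffer lengthBuf = ByteBuffer.allocate(FOOTER_LENGTH_SIZE).order(ByteOrder.LITTLE_ENDIAN);
        while (lengthBuf.hasRemaining() && readChannel.read(lengthBuf) >= 0) {
            // loop until all four length bytes have been read
        }
        lengthBuf.flip();
        return lengthBuf.getInt();
    }
}
```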
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java
index 3f35a13c141..0e4de2f725c 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java
@@ -3,6 +3,9 @@
 import io.deephaven.annotations.BuildableStyle;
 import org.immutables.value.Value;
 
+/**
+ * This class provides instructions for reading and writing parquet files to AWS S3 buckets.
+ */
 @Value.Immutable
 @BuildableStyle
 public abstract class S3ParquetInstructions {
@@ -17,23 +20,39 @@ public static Builder builder() {
         return ImmutableS3ParquetInstructions.builder();
     }
 
+    /**
+     * The AWS region name to use when reading or writing to S3.
+     */
     public abstract String awsRegionName();
 
+    /**
+     * The maximum number of concurrent requests to make to S3.
+     */
     @Value.Default
     public int maxConcurrentRequests() {
         return DEFAULT_MAX_CONCURRENT_REQUESTS;
     }
 
+    /**
+     * The number of fragments to send asynchronous read requests for while reading the current fragment.
+     */
     @Value.Default
     public int readAheadCount() {
         return DEFAULT_READ_AHEAD_COUNT;
     }
 
+    /**
+     * The maximum size of each fragment to read from S3. The fetched fragment can be smaller than this if fewer
+     * bytes remain in the file.
+     */
     @Value.Default
     public int fragmentSize() {
         return DEFAULT_FRAGMENT_SIZE;
     }
 
+    /**
+     * The maximum number of fragments to cache in memory.
+     */
     @Value.Default
     public int maxCacheSize() {
         return DEFAULT_MAX_CACHE_SIZE;
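A quick sketch of building these instructions (hypothetical values; the setter names assume the standard Immutables convention of mirroring the attribute names above):

```java
import io.deephaven.parquet.table.S3ParquetInstructions;

// Hypothetical configuration: only awsRegionName is required, the rest fall back to the defaults.
final S3ParquetInstructions s3Instructions = S3ParquetInstructions.builder()
        .awsRegionName("us-east-1")
        .maxConcurrentRequests(50)
        .readAheadCount(1)
        .fragmentSize(5 * 1024 * 1024)
        .maxCacheSize(32)
        .build();
```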
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/ByteBufferAsyncResponseTransformer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/ByteBufferAsyncResponseTransformer.java
index 8f56bbc9ab6..af46463ce86 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/ByteBufferAsyncResponseTransformer.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/ByteBufferAsyncResponseTransformer.java
@@ -11,13 +11,14 @@
 
 /**
  * An {@link AsyncResponseTransformer} that transforms a response into a {@link ByteBuffer}.
+ * This class is inspired by {@link software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer}
+ * but avoids a number of extra copies made by the former.
  *
  * @param <ResponseT> POJO response type.
  */
 public final class ByteBufferAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ByteBuffer> {
 
     private volatile CompletableFuture<ByteBuffer> cf;
-    private ResponseT response;
     private final ByteBuffer byteBuffer;
 
     ByteBufferAsyncResponseTransformer(final int bufferSize) {
@@ -33,14 +34,7 @@ public CompletableFuture<ByteBuffer> prepare() {
 
     @Override
     public void onResponse(ResponseT response) {
-        this.response = response;
-    }
-
-    /**
-     * @return the unmarshalled response object from the service.
-     */
-    public ResponseT response() {
-        return response;
+        // No need to store the response object as we are only interested in the byte buffer
     }
 
     @Override
@@ -55,9 +49,7 @@ public void exceptionOccurred(Throwable throwable) {
     final static class ByteBuferSubscriber implements Subscriber<ByteBuffer> {
 
         private final CompletableFuture<ByteBuffer> resultFuture;
-
         private Subscription subscription;
-
         private final ByteBuffer byteBuffer;
 
         ByteBuferSubscriber(CompletableFuture<ByteBuffer> resultFuture, ByteBuffer byteBuffer) {
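To make the no-extra-copy claim concrete, here is a simplified, hypothetical version of the subscriber pattern used above: each published chunk is written straight into the single pre-allocated buffer, and the future completes with a read-only view of it (the real `ByteBuferSubscriber` differs in details):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical, simplified subscriber: one copy into a pre-sized buffer, no intermediate byte[]s.
final class DirectBufferSubscriber implements Subscriber<ByteBuffer> {
    private final CompletableFuture<ByteBuffer> resultFuture;
    private final ByteBuffer byteBuffer;
    private Subscription subscription;

    DirectBufferSubscriber(final CompletableFuture<ByteBuffer> resultFuture, final ByteBuffer byteBuffer) {
        this.resultFuture = resultFuture;
        this.byteBuffer = byteBuffer;
    }

    @Override
    public void onSubscribe(final Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE); // the buffer is pre-sized for the whole byte range
    }

    @Override
    public void onNext(final ByteBuffer chunk) {
        byteBuffer.put(chunk); // the single copy
    }

    @Override
    public void onError(final Throwable t) {
        resultFuture.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        resultFuture.complete(byteBuffer.asReadOnlyBuffer().flip());
    }
}
```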
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java
index 6a050ccc9ab..00f19a8245a 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java
@@ -1,38 +1,56 @@
 package io.deephaven.parquet.table.util;
 
+import io.deephaven.UncheckedDeephavenException;
 import io.deephaven.base.verify.Assert;
 import io.deephaven.parquet.base.util.SeekableChannelsProvider;
+import org.jetbrains.annotations.NotNull;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.StandardOpenOption;
 import java.security.InvalidParameterException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+/**
+ * {@link SeekableByteChannel} class used to fetch objects from AWS S3 buckets using an async client with the ability
+ * to read ahead and cache fragments of the object.
+ */
 public final class S3SeekableByteChannel implements SeekableByteChannel, SeekableChannelsProvider.ContextHolder {
 
+    private static final long CLOSED_SENTINEL = -1;
+
+    /**
+     * Maximum time to wait while fetching fragments from S3.
+     */
+    static final Long CONNECTION_TIMEOUT_MINUTES = 1L;
+    static final TimeUnit CONNECTION_TIMEOUT_UNIT = TimeUnit.MINUTES;
+
     /**
      * Context object used to store read-ahead buffers for efficiently reading from S3.
      */
     static final class ChannelContext implements SeekableChannelsProvider.ChannelContext {
 
         /**
-         * Used to store context information for fetching a single fragment from S3
+         * Used to store information related to a single fragment
         */
        static class FragmentContext {
+            /**
+             * The index of the fragment in the object
+             */
             private final int fragmentIndex;
+
+            /**
+             * The future that will be completed with the fragment's bytes
+             */
             private final CompletableFuture<ByteBuffer> future;
 
             private FragmentContext(final int fragmentIndex, final CompletableFuture<ByteBuffer> future) {
@@ -64,75 +82,78 @@ void setFragmentContext(final int fragmentIndex, final CompletableFuture
-    /**
-     * Bytes are read starting at this channel's current position, and
-     * then the position is updated with the number of bytes actually read.
-     * Otherwise, this method behaves exactly as specified in the {@link
-     * ReadableByteChannel} interface.
-     *
-     * @param dst the destination buffer
-     * @return the number of bytes read or -1 if no more bytes can be read.
-     */
     @Override
-    public int read(final ByteBuffer dst) throws IOException {
-        validateOpen();
-
-        Objects.requireNonNull(dst);
-
-        final long channelPosition = this.position();
-
-        // if the position of the delegator is at the end (>= size) return -1. we're finished reading.
-        if (channelPosition >= size) {
+    public int read(@NotNull final ByteBuffer destination) throws ClosedChannelException {
+        Assert.neqNull(context, "context");
+        if (!destination.hasRemaining()) {
+            return 0;
+        }
+        final long localPosition = position;
+        checkClosed(localPosition);
+        if (localPosition >= size) {
+            // We are finished reading
             return -1;
         }
 
-        // Figure out the index of the fragment the bytes would start in
-        final int currFragmentIndex = fragmentIndexForByteNumber(channelPosition);
-        final int fragmentOffset = (int) (channelPosition - ((long) currFragmentIndex * fragmentSize));
-        Assert.neqNull(context, "context");
+        final int currFragmentIndex = fragmentIndexForByteNumber(localPosition);
+        final int fragmentOffset = (int) (localPosition - ((long) currFragmentIndex * fragmentSize));
 
-        // Blocking fetch the current fragment if it's not already in the cache
+        // Send an async read request for the current fragment, if it's not already in the cache
         final ChannelContext.FragmentContext fragmentContext = context.getFragmentContext(currFragmentIndex);
         final CompletableFuture<ByteBuffer> fetchCurrFragment;
         if (fragmentContext != null && fragmentContext.fragmentIndex == currFragmentIndex) {
@@ -141,25 +162,8 @@ public int read(final ByteBuffer dst) throws IOException {
             fetchCurrFragment = computeFragmentFuture(currFragmentIndex);
             context.setFragmentContext(currFragmentIndex, fetchCurrFragment);
         }
-        final ByteBuffer currentFragment;
-        try {
-            currentFragment = fetchCurrFragment.get(timeout, timeUnit).asReadOnlyBuffer();
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        } catch (final ExecutionException e) {
-            throw new IOException(e);
-        } catch (final TimeoutException e) {
-            throw new RuntimeException(e);
-        }
-        // Put the bytes from fragment from the offset up to the min of fragment remaining or dst remaining
-        currentFragment.position(fragmentOffset);
-        final int limit = Math.min(currentFragment.remaining(), dst.remaining());
-        currentFragment.limit(currentFragment.position() + limit);
-        dst.put(currentFragment);
-
-        // Send requests for read-ahead buffers
+        // Send async requests for read-ahead buffers and store them in the cache
         final int numFragmentsToLoad = Math.min(readAheadCount, numFragmentsInObject - currFragmentIndex - 1);
         for (int i = 0; i < numFragmentsToLoad; i++) {
             final int readAheadFragmentIndex = i + currFragmentIndex + 1;
@@ -169,16 +173,26 @@ public int read(final ByteBuffer dst) throws IOException {
             }
         }
 
-        position(channelPosition + limit);
+        // Wait until the current fragment is fetched
+        final ByteBuffer currentFragment;
+        try {
+            currentFragment = fetchCurrFragment.get(timeout, timeUnit).asReadOnlyBuffer();
+        } catch (final InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
+            throw new UncheckedDeephavenException("Failed to fetch fragment " + currFragmentIndex
+                    + " at byte offset " + fragmentOffset + " for file " + key + " in S3 bucket " + bucket, e);
+        }
+
+        // Copy the bytes from the fragment, starting at the offset, up to the smaller of the fragment's and the
+        // destination's remaining bytes. Therefore, the number of bytes read by this method can be less than the
+        // number of bytes remaining in the destination buffer.
+        currentFragment.position(fragmentOffset);
+        final int limit = Math.min(currentFragment.remaining(), destination.remaining());
+        currentFragment.limit(currentFragment.position() + limit);
+        destination.put(currentFragment);
+        position = localPosition + limit;
         return limit;
     }
 
-    /**
-     * Compute which fragment a byte should be in
-     *
-     * @param byteNumber the number of the byte in the object accessed by this channel
-     * @return the index of the fragment in which {@code byteNumber} will be found.
-     */
     private int fragmentIndexForByteNumber(final long byteNumber) {
         return Math.toIntExact(Math.floorDiv(byteNumber, (long) fragmentSize));
     }
@@ -187,13 +201,8 @@ private CompletableFuture computeFragmentFuture(final int fragmentIn
         final long readFrom = (long) fragmentIndex * fragmentSize;
         final long readTo = Math.min(readFrom + fragmentSize, size) - 1;
         final String range = "bytes=" + readFrom + "-" + readTo;
-
-        return s3Client.getObject(
-                builder -> builder
-                        .bucket(bucket)
-                        .key(key)
-                        .range(range),
-                new ByteBufferAsyncResponseTransformer<>(fragmentSize));
+        return s3Client.getObject(builder -> builder.bucket(bucket).key(key).range(range),
+                new ByteBufferAsyncResponseTransformer<>(fragmentSize));
     }
 
     @Override
@@ -203,32 +212,25 @@ public int write(final ByteBuffer src) throws IOException {
 
     @Override
     public long position() throws ClosedChannelException {
-        validateOpen();
-
-        synchronized (this) {
-            return position;
-        }
+        final long localPosition = position;
+        checkClosed(localPosition);
+        return localPosition;
     }
 
     @Override
     public SeekableByteChannel position(final long newPosition) throws ClosedChannelException {
-        if (newPosition < 0)
-            throw new IllegalArgumentException("newPosition cannot be < 0");
-
-        if (!isOpen()) {
-            throw new ClosedChannelException();
-        }
-
-        synchronized (this) {
-            position = newPosition;
-            return this;
+        if (newPosition < 0) {
+            throw new IllegalArgumentException("newPosition cannot be < 0, provided newPosition=" + newPosition);
         }
+        checkClosed(position);
+        position = newPosition;
+        return this;
     }
 
     @Override
     public long size() throws ClosedChannelException {
-        validateOpen();
-        return this.size;
+        checkClosed(position);
+        return size;
     }
 
     @Override
@@ -238,20 +240,16 @@ public SeekableByteChannel truncate(final long size) {
 
     @Override
     public boolean isOpen() {
-        synchronized (this) {
-            return !this.closed;
-        }
+        return position != CLOSED_SENTINEL;
    }
 
     @Override
     public void close() throws IOException {
-        synchronized (this) {
-            closed = true;
-        }
+        position = CLOSED_SENTINEL;
     }
 
-    private void validateOpen() throws ClosedChannelException {
-        if (this.closed) {
+    private static void checkClosed(final long position) throws ClosedChannelException {
+        if (position == CLOSED_SENTINEL) {
             throw new ClosedChannelException();
         }
     }
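The fragment arithmetic in `read()` and `computeFragmentFuture()` is easiest to see with concrete numbers. A self-contained sketch with assumed sizes (it mirrors the computations above, including the `Range` header):

```java
public class FragmentMathDemo {
    public static void main(String[] args) {
        final int fragmentSize = 5 * 1024 * 1024; // assumed 5 MiB fragments
        final long size = 12_000_000L;            // assumed total object size
        final long position = 10_500_000L;        // current channel position

        // Same computations as S3SeekableByteChannel.read()
        final int fragmentIndex = Math.toIntExact(Math.floorDiv(position, (long) fragmentSize));
        final int fragmentOffset = (int) (position - ((long) fragmentIndex * fragmentSize));

        // Same range as computeFragmentFuture(); the last fragment is clamped to the object size
        final long readFrom = (long) fragmentIndex * fragmentSize;
        final long readTo = Math.min(readFrom + fragmentSize, size) - 1;

        System.out.println("fragment=" + fragmentIndex + " offset=" + fragmentOffset
                + " range=bytes=" + readFrom + "-" + readTo);
        // Prints: fragment=2 offset=14240 range=bytes=10485760-11999999
    }
}
```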
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java
index b90c4dc756d..24e8e1f8da1 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java
@@ -1,5 +1,7 @@
 package io.deephaven.parquet.table.util;
 
+import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.base.verify.Assert;
 import io.deephaven.parquet.base.util.SeekableChannelsProvider;
 import io.deephaven.parquet.table.ParquetInstructions;
 import io.deephaven.parquet.table.S3ParquetInstructions;
@@ -11,7 +13,6 @@
 import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Path;
@@ -19,67 +20,71 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static io.deephaven.parquet.table.util.S3SeekableByteChannel.CONNECTION_TIMEOUT_MINUTES;
+import static io.deephaven.parquet.table.util.S3SeekableByteChannel.CONNECTION_TIMEOUT_UNIT;
+
+/**
+ * {@link SeekableChannelsProvider} implementation that is used to fetch objects from AWS S3 buckets.
+ */
 public final class S3SeekableChannelProvider implements SeekableChannelsProvider {
 
     private final S3AsyncClient s3AsyncClient;
+
+    /**
+     * Parquet file {@link URI} stored as a {@link Path} to save on parsing time when constructing new read channels.
+     * Note that this conversion is lossy: the {@link URI} contains "s3://" but the {@link Path} will have "s3:/".
+     */
+    private final Path parquetFilePath;
     private final String bucket, key;
     private final long size;
-    private final int maxConcurrentRequests;
     private final int fragmentSize;
     private final int maxCacheSize;
     private final int readAheadCount;
 
-    private final Map<Long, ChannelContext> contextMap = new HashMap<>();
+    private final Map<Long, ChannelContext> contextMap = new HashMap<>(); // TODO Remove this
 
-    public S3SeekableChannelProvider(final URI parquetFileURI, final ParquetInstructions readInstructions)
-            throws IOException {
+    public S3SeekableChannelProvider(final URI parquetFileURI, final ParquetInstructions readInstructions) {
         if (!(readInstructions.getSpecialInstructions() instanceof S3ParquetInstructions)) {
             throw new IllegalArgumentException("Must provide S3ParquetInstructions to read files from S3");
         }
         final S3ParquetInstructions s3Instructions = (S3ParquetInstructions) readInstructions.getSpecialInstructions();
         final String awsRegionName = s3Instructions.awsRegionName();
-        maxConcurrentRequests = s3Instructions.maxConcurrentRequests();
+        final int maxConcurrentRequests = s3Instructions.maxConcurrentRequests();
         final SdkAsyncHttpClient asyncHttpClient = AwsCrtAsyncHttpClient.builder()
                 .maxConcurrency(maxConcurrentRequests)
-                .connectionTimeout(Duration.ofSeconds(5))
+                .connectionTimeout(Duration.ofMinutes(CONNECTION_TIMEOUT_MINUTES))
                 .build();
-        s3AsyncClient = S3AsyncClient.builder()
+        this.s3AsyncClient = S3AsyncClient.builder()
                 .region(Region.of(awsRegionName))
                 .httpClient(asyncHttpClient)
                 .build();
-        fragmentSize = s3Instructions.fragmentSize();
-        maxCacheSize = s3Instructions.maxCacheSize();
-        readAheadCount = s3Instructions.readAheadCount();
+        this.fragmentSize = s3Instructions.fragmentSize();
+        this.maxCacheSize = s3Instructions.maxCacheSize();
+        this.readAheadCount = s3Instructions.readAheadCount();
+        this.parquetFilePath = Path.of(parquetFileURI.toString());
         this.bucket = parquetFileURI.getHost();
         this.key = parquetFileURI.getPath().substring(1);
-        // Send HEAD request to S3 to get the size of the file
-        {
-            final long timeOut = 1L;
-            final TimeUnit unit = TimeUnit.MINUTES;
-
-            final HeadObjectResponse headObjectResponse;
-            try {
-                headObjectResponse = s3AsyncClient.headObject(
-                        builder -> builder.bucket(bucket).key(key)).get(timeOut, unit);
-            } catch (final InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-            } catch (final ExecutionException | TimeoutException e) {
-                throw new IOException(e);
-            }
-            this.size = headObjectResponse.contentLength();
+        // Send a blocking HEAD request to S3 to get the size of the file
+        final HeadObjectResponse headObjectResponse;
+        try {
+            headObjectResponse = s3AsyncClient.headObject(
+                    builder -> builder.bucket(bucket).key(key))
+                    .get(CONNECTION_TIMEOUT_MINUTES, CONNECTION_TIMEOUT_UNIT);
+        } catch (final InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
+            throw new UncheckedDeephavenException("Failed to fetch HEAD for file " + parquetFileURI, e);
         }
+        this.size = headObjectResponse.contentLength();
     }
 
     @Override
     public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelsProvider.ChannelContext context,
             @NotNull final Path path) {
-        // Ignore the context provided here, will be set properly before reading
+        Assert.equals(parquetFilePath, "parquetFilePath", path, "path");
         return new S3SeekableByteChannel(context, bucket, key, s3AsyncClient, size, fragmentSize, readAheadCount);
     }
 
@@ -90,6 +95,7 @@ public ChannelContext makeContext() {
             return contextMap.get(tid);
         } else {
             final ChannelContext context;
+            // TODO Remove this part
             synchronized (contextMap) {
                 if (contextMap.containsKey(tid)) {
                     return contextMap.get(tid);
@@ -102,9 +108,8 @@ public ChannelContext makeContext() {
     }
 
     @Override
-    public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append)
-            throws UnsupportedEncodingException {
-        throw new UnsupportedEncodingException("Don't support writing to S3 yet");
+    public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append) {
+        throw new UnsupportedOperationException("Writing to S3 is not supported yet");
     }
 
     public void close() throws IOException {
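Hypothetical end-to-end usage of the pieces above; this assumes `ParquetInstructions.Builder` exposes a `setSpecialInstructions` setter to pair with the `getSpecialInstructions()` call in the constructor, which is not shown in this diff:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.parquet.table.S3ParquetInstructions;

// Hypothetical: read a Parquet file from S3 through the new provider.
final S3ParquetInstructions s3Instructions = S3ParquetInstructions.builder()
        .awsRegionName("us-east-1")
        .build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
        .setSpecialInstructions(s3Instructions) // assumed setter, pairs with getSpecialInstructions()
        .build();
final Table table = ParquetTools.readTable("s3://my-bucket/path/to/file.parquet", readInstructions);
```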