From f70322700be04bb89946f447c7946fad1304ff1e Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Thu, 28 Dec 2023 17:46:49 +0530
Subject: [PATCH] Added S3-specific parquet instructions

Also added support for Python
---
 .../deephaven/parquet/table/ParquetTools.java | 11 +-
 .../parquet/table/S3ParquetInstructions.java | 18 +-
 .../table/util/S3SeekableByteChannel.java | 173 +++---------------
 .../table/util/S3SeekableChannelProvider.java | 60 +++---
 .../table/ParquetTableReadWriteTest.java | 52 +++++-
 py/server/deephaven/parquet.py | 127 +++++++++----
 6 files changed, 204 insertions(+), 237 deletions(-)

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index a7242cdfb34..02465bc9b2d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -997,6 +997,13 @@ public static Table readSingleFileTable( return readSingleFileTable(convertToURI(file), readInstructions, tableDefinition); } + public static Table readSingleFileTable( + @NotNull final String filePath, + @NotNull final ParquetInstructions readInstructions, + @NotNull final TableDefinition tableDefinition) { + return readSingleFileTable(convertToURI(filePath), readInstructions, tableDefinition); + } + private static Table readSingleFileTable( @NotNull final URI parquetFileURI, @NotNull final ParquetInstructions readInstructions, @@ -1122,9 +1129,7 @@ public static ParquetFileReader getParquetFileReaderChecked( if (parquetFileURI.getScheme() != null && parquetFileURI.getScheme().equals(S3_PARQUET_FILE_URI_SCHEME)) { return new ParquetFileReader(parquetFileURI, new CachedChannelProvider( - new S3SeekableChannelProvider(readInstructions.getAwsRegionName(), - parquetFileURI.toString()), - 1 << 7)); + new S3SeekableChannelProvider(parquetFileURI, readInstructions), 1 << 7)); } return new ParquetFileReader( parquetFileURI,
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java index d9c46f1cb77..3f35a13c141 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/S3ParquetInstructions.java @@ -7,14 +7,14 @@ @BuildableStyle public abstract class S3ParquetInstructions { - private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 20; + private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 50; private final static int DEFAULT_READ_AHEAD_COUNT = 1; - private final static int DEFAULT_MAX_FRAGMENT_SIZE = 512 << 20; // 5 MB - private final static int MIN_MAX_FRAGMENT_SIZE = 8 << 10; // 8 KB + private final static int DEFAULT_FRAGMENT_SIZE = 5 << 20; // 5 MB + private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB private final static int DEFAULT_MAX_CACHE_SIZE = 50; public static Builder builder() { - return S3ParquetInstructions.builder(); + return ImmutableS3ParquetInstructions.builder(); } public abstract String awsRegionName(); @@ -30,8 +30,8 @@ public int readAheadCount() { } @Value.Default - public int maxFragmentSize() { - return DEFAULT_MAX_FRAGMENT_SIZE; + public int fragmentSize() { + return DEFAULT_FRAGMENT_SIZE; } @Value.Default @@ -55,8 +55,8 @@ final void boundsCheckReadAheadCount() { @Value.Check 
final void boundsCheckMaxFragmentSize() { - if (maxFragmentSize() < MIN_MAX_FRAGMENT_SIZE) { - throw new IllegalArgumentException("maxFragmentSize(=" + maxFragmentSize() + ") must be >= 8*1024 or 8 KB"); + if (fragmentSize() < MIN_FRAGMENT_SIZE) { + throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= 8*1024 or 8 KB"); } } @@ -74,7 +74,7 @@ public interface Builder { Builder readAheadCount(int readAheadCount); - Builder maxFragmentSize(int maxFragmentSize); + Builder fragmentSize(int fragmentSize); Builder maxCacheSize(int maxCacheSize); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java index 350a609be0c..6a050ccc9ab 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java @@ -1,10 +1,7 @@ package io.deephaven.parquet.table.util; import io.deephaven.base.verify.Assert; -import io.deephaven.configuration.Configuration; import io.deephaven.parquet.base.util.SeekableChannelsProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; import javax.annotation.Nullable; @@ -72,50 +69,37 @@ FragmentContext getFragmentContext(final int fragmentIndex) { private long position; private final S3AsyncClient s3Client; private volatile boolean closed; - private final String s3uri, bucket, key; - private final int maxFragmentSize; -// private final int maxNumberFragments; + private final String bucket, key; + private final int fragmentSize; + private final int readAheadCount; private final int numFragmentsInObject; private final long size; private final Long timeout; private final TimeUnit timeUnit; private ChannelContext context; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - static final int READ_AHEAD_COUNT = - Configuration.getInstance().getIntegerWithDefault("s3.read-ahead-count", 1); - static final int MAX_CACHE_SIZE = - Configuration.getInstance().getIntegerWithDefault("s3.spi.read.max-cache-size", 50); - private static final int MAX_FRAGMENT_SIZE = - Configuration.getInstance().getIntegerWithDefault("s3.spi.read.max-fragment-size", 512 * 1024); // 512 KB - - S3SeekableByteChannel(SeekableChannelsProvider.ChannelContext context, String s3uri, String bucket, String key, S3AsyncClient s3Client, long startAt, long size) { - Objects.requireNonNull(s3Client); - if (MAX_FRAGMENT_SIZE < 1) - throw new IllegalArgumentException("maxFragmentSize must be >= 1"); - if (size < 1) - throw new IllegalArgumentException("size must be >= 1"); - - this.position = startAt; + S3SeekableByteChannel(SeekableChannelsProvider.ChannelContext context, String bucket, String key, S3AsyncClient s3Client, long size, int fragmentSize, int readAheadCount) { + this.position = 0; this.bucket = bucket; this.key = key; this.closed = false; this.s3Client = s3Client; - this.s3uri = s3uri; - this.maxFragmentSize = MAX_FRAGMENT_SIZE; + this.fragmentSize = fragmentSize; + this.readAheadCount = readAheadCount; this.timeout = 5L; this.timeUnit = TimeUnit.MINUTES; this.size = size; - this.numFragmentsInObject = (int) Math.ceil((double) size / maxFragmentSize); + this.numFragmentsInObject = (int) Math.ceil((double) size / fragmentSize); this.context = (ChannelContext) context; } @Override public void 
setContext(@Nullable SeekableChannelsProvider.ChannelContext context) { // null context is allowed for clearing the context - Assert.assertion(context == null || context instanceof ChannelContext, "context == null || context instanceof ChannelContext"); + if (context != null && !(context instanceof ChannelContext)) { + throw new IllegalArgumentException("context must be null or an instance of ChannelContext"); + } this.context = (ChannelContext) context; } @@ -145,7 +129,7 @@ public int read(final ByteBuffer dst) throws IOException { // Figure out the index of the fragment the bytes would start in final int currFragmentIndex = fragmentIndexForByteNumber(channelPosition); - final int fragmentOffset = (int) (channelPosition - ((long) currFragmentIndex * maxFragmentSize)); + final int fragmentOffset = (int) (channelPosition - ((long) currFragmentIndex * fragmentSize)); Assert.neqNull(context, "context"); // Blocking fetch the current fragment if it's not already in the cache @@ -164,9 +148,6 @@ public int read(final ByteBuffer dst) throws IOException { Thread.currentThread().interrupt(); throw new RuntimeException(e); } catch (final ExecutionException e) { - // the async execution completed exceptionally. - // not currently obvious when this will happen or if we can recover - logger.error("an exception occurred while reading bytes from {} that was not recovered by the S3 Client RetryCondition(s)", s3uri); throw new IOException(e); } catch (final TimeoutException e) { throw new RuntimeException(e); @@ -179,7 +160,7 @@ public int read(final ByteBuffer dst) throws IOException { dst.put(currentFragment); // Send requests for read-ahead buffers - final int numFragmentsToLoad = Math.min(READ_AHEAD_COUNT, numFragmentsInObject - currFragmentIndex - 1); + final int numFragmentsToLoad = Math.min(readAheadCount, numFragmentsInObject - currFragmentIndex - 1); for (int i = 0; i < numFragmentsToLoad; i++) { final int readAheadFragmentIndex = i + currFragmentIndex + 1; final ChannelContext.FragmentContext readAheadFragmentContext = context.getFragmentContext(readAheadFragmentIndex); @@ -193,57 +174,35 @@ public int read(final ByteBuffer dst) throws IOException { } /** - * Compute which buffer a byte should be in + * Compute which fragment a byte should be in * * @param byteNumber the number of the byte in the object accessed by this channel * @return the index of the fragment in which {@code byteNumber} will be found. */ private int fragmentIndexForByteNumber(final long byteNumber) { - return Math.toIntExact(Math.floorDiv(byteNumber, (long) maxFragmentSize)); + return Math.toIntExact(Math.floorDiv(byteNumber, (long) fragmentSize)); } private CompletableFuture computeFragmentFuture(final int fragmentIndex) { - final long readFrom = (long) fragmentIndex * maxFragmentSize; - final long readTo = Math.min(readFrom + maxFragmentSize, size) - 1; + final long readFrom = (long) fragmentIndex * fragmentSize; + final long readTo = Math.min(readFrom + fragmentSize, size) - 1; final String range = "bytes=" + readFrom + "-" + readTo; - logger.debug("byte range for {} is '{}'", key, range); return s3Client.getObject( builder -> builder .bucket(bucket) .key(key) .range(range), - new ByteBufferAsyncResponseTransformer<>(maxFragmentSize)); + new ByteBufferAsyncResponseTransformer<>(fragmentSize)); } - /** - * Writes a sequence of bytes to this channel from the given buffer. - * - *

Bytes are written starting at this channel's current position, unless - * the channel is connected to an entity such as a file that is opened with - * the {@link StandardOpenOption#APPEND APPEND} option, in - * which case the position is first advanced to the end. The entity to which - * the channel is connected will grow to accommodate the - * written bytes, and the position updates with the number of bytes - * actually written. Otherwise, this method behaves exactly as specified by - * the {@link WritableByteChannel} interface. - * - * @param src the src of the bytes to write to this channel - */ @Override - public int write(ByteBuffer src) throws IOException { + public int write(final ByteBuffer src) throws IOException { throw new UnsupportedOperationException("Don't support writing to S3 yet"); } - /** - * Returns this channel's position. - * - * @return This channel's position, - * a non-negative integer counting the number of bytes - * from the beginning of the entity to the current position - */ @Override - public long position() throws IOException { + public long position() throws ClosedChannelException { validateOpen(); synchronized (this) { @@ -251,31 +210,8 @@ public long position() throws IOException { } } - /** - * Sets this channel's position. - * - *

Setting the position to a value that is greater than the current size - * is legal but does not change the size of the entity. A later attempt to - * read bytes at such a position will immediately return an end-of-file - * indication. A later attempt to write bytes at such a position will cause - * the entity to grow to accommodate the new bytes; the values of any bytes - * between the previous end-of-file and the newly-written bytes are - * unspecified. - * - *

Setting the channel's position is not recommended when connected to - * an entity, typically a file, that is opened with the {@link - * StandardOpenOption#APPEND APPEND} option. When opened for - * append, the position is first advanced to the end before writing. - * - * @param newPosition The new position, a non-negative integer counting - * the number of bytes from the beginning of the entity - * @return This channel - * @throws ClosedChannelException If this channel is closed - * @throws IllegalArgumentException If the new position is negative - * @throws IOException If some other I/O error occurs - */ @Override - public SeekableByteChannel position(long newPosition) throws IOException { + public SeekableByteChannel position(final long newPosition) throws ClosedChannelException { if (newPosition < 0) throw new IllegalArgumentException("newPosition cannot be < 0"); @@ -289,45 +225,17 @@ public SeekableByteChannel position(long newPosition) throws IOException { } } - /** - * Returns the current size of entity to which this channel is connected. - * - * @return The current size, measured in bytes - * @throws IOException If some other I/O error occurs - */ @Override - public long size() throws IOException { + public long size() throws ClosedChannelException { validateOpen(); return this.size; } - /** - * Truncates the entity, to which this channel is connected, to the given - * size. - * - *

If the given size is less than the current size then the entity is - * truncated, discarding any bytes beyond the new end. If the given size is - * greater than or equal to the current size then the entity is not modified. - * In either case, if the current position is greater than the given size - * then it is set to that size. - * - *

An implementation of this interface may prohibit truncation when - * connected to an entity, typically a file, opened with the {@link - * StandardOpenOption#APPEND APPEND} option. - * - * @param size The new size, a non-negative byte count - * @return This channel - */ @Override - public SeekableByteChannel truncate(long size) { + public SeekableByteChannel truncate(final long size) { throw new UnsupportedOperationException("Currently not supported"); } - /** - * Tells whether this channel is open. - * - * @return {@code true} if, and only if, this channels delegate is open - */ @Override public boolean isOpen() { synchronized (this) { @@ -335,41 +243,6 @@ public boolean isOpen() { } } -// /** -// * The number of fragments currently in the cache. -// * -// * @return the size of the cache after any async evictions or reloads have happened. -// */ -// int numberOfCachedFragments() { -// readAheadBuffersCache.cleanUp(); -// return (int) readAheadBuffersCache.estimatedSize(); -// } - -// /** -// * Obtain a snapshot of the statistics of the internal cache, provides information about hits, misses, requests, evictions etc. -// * that are useful for tuning. -// * -// * @return the statistics of the internal cache. -// */ -// CacheStats cacheStatistics() { -// return readAheadBuffersCache.stats(); -// } - - /** - * Closes this channel. - * - *

After a channel is closed, any further attempt to invoke I/O - * operations upon it will cause a {@link ClosedChannelException} to be - * thrown. - * - *

If this channel is already closed then invoking this method has no - * effect. - * - *

This method may be invoked at any time. If some other thread has - * already invoked it, however, then another invocation will block until - * the first invocation is complete, after which it will return without - * effect.

- */ @Override public void close() throws IOException { synchronized (this) { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java index f21bcc5a618..b90c4dc756d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableChannelProvider.java @@ -1,8 +1,8 @@ package io.deephaven.parquet.table.util; -import io.deephaven.UncheckedDeephavenException; -import io.deephaven.configuration.Configuration; import io.deephaven.parquet.base.util.SeekableChannelsProvider; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.S3ParquetInstructions; import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; @@ -13,7 +13,6 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URISyntaxException; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.time.Duration; @@ -26,33 +25,25 @@ public final class S3SeekableChannelProvider implements SeekableChannelsProvider { private final S3AsyncClient s3AsyncClient; - private final URI uri; - private final String s3uri, bucket, key; + private final String bucket, key; private final long size; + private final int maxConcurrentRequests; + private final int fragmentSize; + private final int maxCacheSize; + private final int readAheadCount; private final Map contextMap = new HashMap<>(); - private static final int MAX_AWS_CONCURRENT_REQUESTS = - Configuration.getInstance().getIntegerWithDefault("s3.spi.read.max-concurrency", 20); - public S3SeekableChannelProvider(final String awsRegionName, final String uriStr) throws IOException { - if (awsRegionName == null || awsRegionName.isEmpty()) { - throw new IllegalArgumentException("awsRegionName cannot be null or empty"); + public S3SeekableChannelProvider(final URI parquetFileURI, final ParquetInstructions readInstructions) + throws IOException { + if (!(readInstructions.getSpecialInstructions() instanceof S3ParquetInstructions)) { + throw new IllegalArgumentException("Must provide S3ParquetInstructions to read files from S3"); } - if (uriStr == null || uriStr.isEmpty()) { - throw new IllegalArgumentException("uri cannot be null or empty"); - } - if (MAX_AWS_CONCURRENT_REQUESTS < 1) { - throw new IllegalArgumentException("maxConcurrency must be >= 1"); - } - - try { - uri = new URI(uriStr); - } catch (final URISyntaxException e) { - throw new UncheckedDeephavenException("Failed to parse URI " + uriStr, e); - } - + final S3ParquetInstructions s3Instructions = (S3ParquetInstructions) readInstructions.getSpecialInstructions(); + final String awsRegionName = s3Instructions.awsRegionName(); + maxConcurrentRequests = s3Instructions.maxConcurrentRequests(); final SdkAsyncHttpClient asyncHttpClient = AwsCrtAsyncHttpClient.builder() - .maxConcurrency(MAX_AWS_CONCURRENT_REQUESTS) + .maxConcurrency(maxConcurrentRequests) .connectionTimeout(Duration.ofSeconds(5)) .build(); s3AsyncClient = S3AsyncClient.builder() @@ -60,9 +51,12 @@ public S3SeekableChannelProvider(final String awsRegionName, final String uriStr .httpClient(asyncHttpClient) .build(); - this.s3uri = uriStr; - this.bucket = uri.getHost(); - this.key = 
uri.getPath().substring(1); + fragmentSize = s3Instructions.fragmentSize(); + maxCacheSize = s3Instructions.maxCacheSize(); + readAheadCount = s3Instructions.readAheadCount(); + + this.bucket = parquetFileURI.getHost(); + this.key = parquetFileURI.getPath().substring(1); // Send HEAD request to S3 to get the size of the file { final long timeOut = 1L; @@ -70,9 +64,8 @@ public S3SeekableChannelProvider(final String awsRegionName, final String uriStr final HeadObjectResponse headObjectResponse; try { - headObjectResponse = s3AsyncClient.headObject(builder -> builder - .bucket(bucket) - .key(key)).get(timeOut, unit); + headObjectResponse = s3AsyncClient.headObject( + builder -> builder.bucket(bucket).key(key)).get(timeOut, unit); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -85,9 +78,9 @@ public S3SeekableChannelProvider(final String awsRegionName, final String uriStr @Override public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelsProvider.ChannelContext context, - @NotNull final Path path) throws IOException { + @NotNull final Path path) { // Ignore the context provided here, will be set properly before reading - return new S3SeekableByteChannel(context, s3uri, bucket, key, s3AsyncClient, 0, size); + return new S3SeekableByteChannel(context, bucket, key, s3AsyncClient, size, fragmentSize, readAheadCount); } @Override @@ -101,8 +94,7 @@ public ChannelContext makeContext() { if (contextMap.containsKey(tid)) { return contextMap.get(tid); } - context = new S3SeekableByteChannel.ChannelContext(S3SeekableByteChannel.READ_AHEAD_COUNT, - S3SeekableByteChannel.MAX_CACHE_SIZE); + context = new S3SeekableByteChannel.ChannelContext(readAheadCount, maxCacheSize); contextMap.put(tid, context); } return context; diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 575759e7785..9b082960318 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -571,8 +571,15 @@ public void testArrayColumns() { @Test public void readLongParquetFileFromS3Test() { + final S3ParquetInstructions s3ParquetInstructions = S3ParquetInstructions.builder() + .awsRegionName("us-east-2") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(50) + .build(); final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setAwsRegionName("us-east-2") + .setSpecialInstructions(s3ParquetInstructions) .build(); final TableDefinition tableDefinition = TableDefinition.of( @@ -604,8 +611,15 @@ public void readLongParquetFileFromS3Test() { @Test public void readRefParquetFileFromS3Test() { + final S3ParquetInstructions s3ParquetInstructions = S3ParquetInstructions.builder() + .awsRegionName("us-east-2") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(50) + .build(); final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setAwsRegionName("us-east-2") + .setSpecialInstructions(s3ParquetInstructions) .build(); final TableDefinition tableDefinition = TableDefinition.of( ColumnDefinition.ofString("hash"), @@ -673,8 +687,15 @@ public void readRefParquetFileLocally() { @Test public void profileReadingFromS3() { - 
ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setAwsRegionName("us-east-1") + final S3ParquetInstructions s3ParquetInstructions = S3ParquetInstructions.builder() + .awsRegionName("us-east-1") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(50) + .build(); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3ParquetInstructions) .build(); long totalTime = 0; @@ -688,9 +709,15 @@ public void profileReadingFromS3() { } System.out.println("Average execution time AWS is " + totalTime / (NUM_RUNS * 1000_000_000.0) + " sec"); - - readInstructions = new ParquetInstructions.Builder() - .setAwsRegionName("us-east-2") + final S3ParquetInstructions s3ParquetInstructions2 = S3ParquetInstructions.builder() + .awsRegionName("us-east-2") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(50) + .build(); + final ParquetInstructions readInstructions2 = new ParquetInstructions.Builder() + .setSpecialInstructions(s3ParquetInstructions2) .build(); final TableDefinition tableDefinition = TableDefinition.of( ColumnDefinition.ofString("hash"), @@ -713,7 +740,7 @@ public void profileReadingFromS3() { final long start = System.nanoTime(); ParquetTools.readTable( "s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet", - readInstructions, tableDefinition).head(5).select(); + readInstructions2, tableDefinition).head(5).select(); final long end = System.nanoTime(); totalTime += end - start; System.out.println((i + 1) + ". Execution time AWS is " + (end - start) / 1000_000_000.0 + " sec"); @@ -734,8 +761,15 @@ public void profileReadingFromS3() { @Test public void readParquetFileFromS3Test() { + final S3ParquetInstructions s3ParquetInstructions = S3ParquetInstructions.builder() + .awsRegionName("us-east-1") + .readAheadCount(1) + .fragmentSize(5 * 1024 * 1024) + .maxConcurrentRequests(50) + .maxCacheSize(50) + .build(); final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setAwsRegionName("us-east-1") + .setSpecialInstructions(s3ParquetInstructions) .build(); final Table fromAws1 = ParquetTools.readTable("s3://dh-s3-parquet-test1/multiColFile.parquet", readInstructions).select(); diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py index f4e727ec326..ceb2ef38cad 100644 --- a/py/server/deephaven/parquet.py +++ b/py/server/deephaven/parquet.py @@ -19,8 +19,46 @@ _JFile = jpy.get_type("java.io.File") _JCompressionCodecName = jpy.get_type("org.apache.parquet.hadoop.metadata.CompressionCodecName") _JParquetInstructions = jpy.get_type("io.deephaven.parquet.table.ParquetInstructions") +_JS3ParquetInstructions = jpy.get_type("io.deephaven.parquet.table.S3ParquetInstructions") _JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") +def _build_s3_parquet_instructions( + aws_region_name: str, # TODO This is a required parameter, so is this okay? 
+ max_concurrent_requests: int = None, + read_ahead_count: int = None, + fragment_size: int = None, + max_cache_size: int = None, +): + if not any( + [ + aws_region_name, + max_concurrent_requests, + read_ahead_count, + fragment_size, + max_cache_size, + ] + ): + return None + + builder = _JS3ParquetInstructions.builder() + + if aws_region_name: + builder.awsRegionName(aws_region_name) + + if max_concurrent_requests is not None: + builder.maxConcurrentRequests(max_concurrent_requests) + + if read_ahead_count is not None: + builder.readAheadCount(read_ahead_count) + + if fragment_size is not None: + builder.fragmentSize(fragment_size) + + if max_cache_size is not None: + builder.maxCacheSize(max_cache_size) + + return builder.build() + @dataclass class ColumnInstruction: @@ -33,29 +71,37 @@ class ColumnInstruction: def _build_parquet_instructions( - col_instructions: List[ColumnInstruction] = None, - compression_codec_name: str = None, - max_dictionary_keys: int = None, - max_dictionary_size: int = None, - is_legacy_parquet: bool = False, - target_page_size: int = None, - is_refreshing: bool = False, - for_read: bool = True, - force_build: bool = False, - aws_region_name: str = None, + col_instructions: List[ColumnInstruction] = None, + compression_codec_name: str = None, + max_dictionary_keys: int = None, + max_dictionary_size: int = None, + is_legacy_parquet: bool = False, + target_page_size: int = None, + is_refreshing: bool = False, + for_read: bool = True, + force_build: bool = False, + aws_region_name: str = None, + max_concurrent_requests: int = None, + read_ahead_count: int = None, + fragment_size: int = None, + max_cache_size: int = None, ): if not any( - [ - force_build, - col_instructions, - compression_codec_name, - max_dictionary_keys is not None, - max_dictionary_size is not None, - is_legacy_parquet, - target_page_size is not None, - is_refreshing, - aws_region_name, - ] + [ + force_build, + col_instructions, + compression_codec_name, + max_dictionary_keys is not None, + max_dictionary_size is not None, + is_legacy_parquet, + target_page_size is not None, + is_refreshing, + aws_region_name is not None, + max_concurrent_requests is not None, + read_ahead_count is not None, + fragment_size is not None, + max_cache_size is not None, + ] ): return None @@ -91,8 +137,15 @@ def _build_parquet_instructions( if is_refreshing: builder.setIsRefreshing(is_refreshing) - if aws_region_name: - builder.setAwsRegionName(aws_region_name) + if aws_region_name is not None: + s3_parquet_instructions = _build_s3_parquet_instructions( + aws_region_name=aws_region_name, + max_concurrent_requests=max_concurrent_requests, + read_ahead_count=read_ahead_count, + fragment_size=fragment_size, + max_cache_size=max_cache_size, + ) + builder.setSpecialInstructions(s3_parquet_instructions) return builder.build() @@ -113,6 +166,7 @@ def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], else: raise DHError(f"Unexpected table_definition type: {type(table_definition)}") + class ParquetFileLayout(Enum): """ The parquet file layout. 
""" @@ -130,13 +184,17 @@ class ParquetFileLayout(Enum): def read( - path: str, - col_instructions: Optional[List[ColumnInstruction]] = None, - is_legacy_parquet: bool = False, - is_refreshing: bool = False, - file_layout: Optional[ParquetFileLayout] = None, - table_definition: Union[Dict[str, DType], List[Column], None] = None, - aws_region_name: str = None, + path: str, + col_instructions: Optional[List[ColumnInstruction]] = None, + is_legacy_parquet: bool = False, + is_refreshing: bool = False, + file_layout: Optional[ParquetFileLayout] = None, + table_definition: Union[Dict[str, DType], List[Column], None] = None, + aws_region_name: str = None, + max_concurrent_requests: int = None, + read_ahead_count: int = None, + fragment_size: int = None, + max_cache_size: int = None, ) -> Table: """ Reads in a table from a single parquet, metadata file, or directory with recognized layout. @@ -154,6 +212,7 @@ def read( empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. When set, file_layout must also be set. aws_region_name (str): the AWS region name for reading parquet files stored in AWS S3, by default None + TODO Add docstrings for the more parameters Returns: a table @@ -169,13 +228,17 @@ def read( for_read=True, force_build=True, aws_region_name=aws_region_name, + max_concurrent_requests=max_concurrent_requests, + read_ahead_count=read_ahead_count, + fragment_size=fragment_size, + max_cache_size=max_cache_size, ) j_table_definition = _j_table_definition(table_definition) if j_table_definition is not None: if not file_layout: raise DHError("Must provide file_layout when table_definition is set") if file_layout == ParquetFileLayout.SINGLE_FILE: - j_table = _JParquetTools.readSingleFileTable(_JFile(path), read_instructions, j_table_definition) + j_table = _JParquetTools.readSingleFileTable(path, read_instructions, j_table_definition) elif file_layout == ParquetFileLayout.FLAT_PARTITIONED: j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions, j_table_definition) elif file_layout == ParquetFileLayout.KV_PARTITIONED: @@ -188,7 +251,7 @@ def read( if not file_layout: j_table = _JParquetTools.readTable(path, read_instructions) elif file_layout == ParquetFileLayout.SINGLE_FILE: - j_table = _JParquetTools.readSingleFileTable(_JFile(path), read_instructions) + j_table = _JParquetTools.readSingleFileTable(path, read_instructions) elif file_layout == ParquetFileLayout.FLAT_PARTITIONED: j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions) elif file_layout == ParquetFileLayout.KV_PARTITIONED: