Skip to content

Commit

Permalink
Added s3 specific parquet instructions
Browse files Browse the repository at this point in the history
Also added support for Python
  • Loading branch information
malhotrashivam committed Dec 28, 2023
1 parent d8f5d49 commit f703227
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,13 @@ public static Table readSingleFileTable(
return readSingleFileTable(convertToURI(file), readInstructions, tableDefinition);
}

/**
 * Reads a single parquet file, identified by a path string, into a table.
 *
 * <p>The path is converted to a {@link URI} and the read is delegated to the URI-based overload.
 * NOTE(review): presumably the string may also be a URI (e.g. an "s3:" URI, given the S3 scheme
 * handling elsewhere in this file) — confirm against {@code convertToURI}.
 *
 * @param filePath the path of the parquet file to read
 * @param readInstructions instructions for customizations while reading
 * @param tableDefinition the table definition to use for the resulting table
 * @return the table read from the file
 */
public static Table readSingleFileTable(
        @NotNull final String filePath,
        @NotNull final ParquetInstructions readInstructions,
        @NotNull final TableDefinition tableDefinition) {
    final URI parquetFileURI = convertToURI(filePath);
    return readSingleFileTable(parquetFileURI, readInstructions, tableDefinition);
}

private static Table readSingleFileTable(
@NotNull final URI parquetFileURI,
@NotNull final ParquetInstructions readInstructions,
Expand Down Expand Up @@ -1122,9 +1129,7 @@ public static ParquetFileReader getParquetFileReaderChecked(
if (parquetFileURI.getScheme() != null && parquetFileURI.getScheme().equals(S3_PARQUET_FILE_URI_SCHEME)) {
return new ParquetFileReader(parquetFileURI,
new CachedChannelProvider(
new S3SeekableChannelProvider(readInstructions.getAwsRegionName(),
parquetFileURI.toString()),
1 << 7));
new S3SeekableChannelProvider(parquetFileURI, readInstructions), 1 << 7));
}
return new ParquetFileReader(
parquetFileURI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
@BuildableStyle
public abstract class S3ParquetInstructions {

private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 20;
private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 50;
private final static int DEFAULT_READ_AHEAD_COUNT = 1;
private final static int DEFAULT_MAX_FRAGMENT_SIZE = 512 << 20; // 512 MB
private final static int MIN_MAX_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int DEFAULT_FRAGMENT_SIZE = 512 << 20; // 512 MB
private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int DEFAULT_MAX_CACHE_SIZE = 50;

public static Builder builder() {
return S3ParquetInstructions.builder();
return ImmutableS3ParquetInstructions.builder();
}

public abstract String awsRegionName();
Expand All @@ -30,8 +30,8 @@ public int readAheadCount() {
}

@Value.Default
public int maxFragmentSize() {
return DEFAULT_MAX_FRAGMENT_SIZE;
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

@Value.Default
Expand All @@ -55,8 +55,8 @@ final void boundsCheckReadAheadCount() {

@Value.Check
final void boundsCheckMaxFragmentSize() {
if (maxFragmentSize() < MIN_MAX_FRAGMENT_SIZE) {
throw new IllegalArgumentException("maxFragmentSize(=" + maxFragmentSize() + ") must be >= 8*1024 or 8 KB");
if (fragmentSize() < MIN_FRAGMENT_SIZE) {
throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= 8*1024 or 8 KB");
}
}

Expand All @@ -74,7 +74,7 @@ public interface Builder {

Builder readAheadCount(int readAheadCount);

Builder maxFragmentSize(int maxFragmentSize);
Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package io.deephaven.parquet.table.util;

import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.parquet.base.util.SeekableChannelsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -72,50 +69,37 @@ FragmentContext getFragmentContext(final int fragmentIndex) {
private long position;
private final S3AsyncClient s3Client;
private volatile boolean closed;
private final String s3uri, bucket, key;
private final int maxFragmentSize;
// private final int maxNumberFragments;
private final String bucket, key;
private final int fragmentSize;
private final int readAheadCount;
private final int numFragmentsInObject;
private final long size;
private final Long timeout;
private final TimeUnit timeUnit;
private ChannelContext context;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

static final int READ_AHEAD_COUNT =
Configuration.getInstance().getIntegerWithDefault("s3.read-ahead-count", 1);
static final int MAX_CACHE_SIZE =
Configuration.getInstance().getIntegerWithDefault("s3.spi.read.max-cache-size", 50);
private static final int MAX_FRAGMENT_SIZE =
Configuration.getInstance().getIntegerWithDefault("s3.spi.read.max-fragment-size", 512 * 1024); // 512 KB

S3SeekableByteChannel(SeekableChannelsProvider.ChannelContext context, String s3uri, String bucket, String key, S3AsyncClient s3Client, long startAt, long size) {
Objects.requireNonNull(s3Client);
if (MAX_FRAGMENT_SIZE < 1)
throw new IllegalArgumentException("maxFragmentSize must be >= 1");
if (size < 1)
throw new IllegalArgumentException("size must be >= 1");

this.position = startAt;
S3SeekableByteChannel(SeekableChannelsProvider.ChannelContext context, String bucket, String key, S3AsyncClient s3Client, long size, int fragmentSize, int readAheadCount) {
this.position = 0;
this.bucket = bucket;
this.key = key;
this.closed = false;
this.s3Client = s3Client;

this.s3uri = s3uri;
this.maxFragmentSize = MAX_FRAGMENT_SIZE;
this.fragmentSize = fragmentSize;
this.readAheadCount = readAheadCount;
this.timeout = 5L;
this.timeUnit = TimeUnit.MINUTES;
this.size = size;
this.numFragmentsInObject = (int) Math.ceil((double) size / maxFragmentSize);
this.numFragmentsInObject = (int) Math.ceil((double) size / fragmentSize);
this.context = (ChannelContext) context;
}

@Override
public void setContext(@Nullable SeekableChannelsProvider.ChannelContext context) {
// null context is allowed for clearing the context
Assert.assertion(context == null || context instanceof ChannelContext, "context == null || context instanceof ChannelContext");
if (context != null && !(context instanceof ChannelContext)) {
throw new IllegalArgumentException("context must be null or an instance of ChannelContext");
}
this.context = (ChannelContext) context;
}

Expand Down Expand Up @@ -145,7 +129,7 @@ public int read(final ByteBuffer dst) throws IOException {

// Figure out the index of the fragment the bytes would start in
final int currFragmentIndex = fragmentIndexForByteNumber(channelPosition);
final int fragmentOffset = (int) (channelPosition - ((long) currFragmentIndex * maxFragmentSize));
final int fragmentOffset = (int) (channelPosition - ((long) currFragmentIndex * fragmentSize));
Assert.neqNull(context, "context");

// Blocking fetch the current fragment if it's not already in the cache
Expand All @@ -164,9 +148,6 @@ public int read(final ByteBuffer dst) throws IOException {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (final ExecutionException e) {
// the async execution completed exceptionally.
// not currently obvious when this will happen or if we can recover
logger.error("an exception occurred while reading bytes from {} that was not recovered by the S3 Client RetryCondition(s)", s3uri);
throw new IOException(e);
} catch (final TimeoutException e) {
throw new RuntimeException(e);
Expand All @@ -179,7 +160,7 @@ public int read(final ByteBuffer dst) throws IOException {
dst.put(currentFragment);

// Send requests for read-ahead buffers
final int numFragmentsToLoad = Math.min(READ_AHEAD_COUNT, numFragmentsInObject - currFragmentIndex - 1);
final int numFragmentsToLoad = Math.min(readAheadCount, numFragmentsInObject - currFragmentIndex - 1);
for (int i = 0; i < numFragmentsToLoad; i++) {
final int readAheadFragmentIndex = i + currFragmentIndex + 1;
final ChannelContext.FragmentContext readAheadFragmentContext = context.getFragmentContext(readAheadFragmentIndex);
Expand All @@ -193,89 +174,44 @@ public int read(final ByteBuffer dst) throws IOException {
}

/**
* Compute which buffer a byte should be in
* Compute which fragment a byte should be in
*
* @param byteNumber the number of the byte in the object accessed by this channel
* @return the index of the fragment in which {@code byteNumber} will be found.
*/
private int fragmentIndexForByteNumber(final long byteNumber) {
return Math.toIntExact(Math.floorDiv(byteNumber, (long) maxFragmentSize));
return Math.toIntExact(Math.floorDiv(byteNumber, (long) fragmentSize));
}

private CompletableFuture<ByteBuffer> computeFragmentFuture(final int fragmentIndex) {
final long readFrom = (long) fragmentIndex * maxFragmentSize;
final long readTo = Math.min(readFrom + maxFragmentSize, size) - 1;
final long readFrom = (long) fragmentIndex * fragmentSize;
final long readTo = Math.min(readFrom + fragmentSize, size) - 1;
final String range = "bytes=" + readFrom + "-" + readTo;
logger.debug("byte range for {} is '{}'", key, range);

return s3Client.getObject(
builder -> builder
.bucket(bucket)
.key(key)
.range(range),
new ByteBufferAsyncResponseTransformer<>(maxFragmentSize));
new ByteBufferAsyncResponseTransformer<>(fragmentSize));
}

/**
* Writes a sequence of bytes to this channel from the given buffer.
*
* <p> Bytes are written starting at this channel's current position, unless
* the channel is connected to an entity such as a file that is opened with
* the {@link StandardOpenOption#APPEND APPEND} option, in
* which case the position is first advanced to the end. The entity to which
* the channel is connected will grow to accommodate the
* written bytes, and the position updates with the number of bytes
* actually written. Otherwise, this method behaves exactly as specified by
* the {@link WritableByteChannel} interface.
*
* @param src the src of the bytes to write to this channel
*/
@Override
public int write(ByteBuffer src) throws IOException {
public int write(final ByteBuffer src) throws IOException {
throw new UnsupportedOperationException("Don't support writing to S3 yet");
}

/**
* Returns this channel's position.
*
* @return This channel's position,
* a non-negative integer counting the number of bytes
* from the beginning of the entity to the current position
*/
@Override
public long position() throws IOException {
public long position() throws ClosedChannelException {
validateOpen();

synchronized (this) {
return position;
}
}

/**
* Sets this channel's position.
*
* <p> Setting the position to a value that is greater than the current size
* is legal but does not change the size of the entity. A later attempt to
* read bytes at such a position will immediately return an end-of-file
* indication. A later attempt to write bytes at such a position will cause
* the entity to grow to accommodate the new bytes; the values of any bytes
* between the previous end-of-file and the newly-written bytes are
* unspecified.
*
* <p> Setting the channel's position is not recommended when connected to
* an entity, typically a file, that is opened with the {@link
* StandardOpenOption#APPEND APPEND} option. When opened for
* append, the position is first advanced to the end before writing.
*
* @param newPosition The new position, a non-negative integer counting
* the number of bytes from the beginning of the entity
* @return This channel
* @throws ClosedChannelException If this channel is closed
* @throws IllegalArgumentException If the new position is negative
* @throws IOException If some other I/O error occurs
*/
@Override
public SeekableByteChannel position(long newPosition) throws IOException {
public SeekableByteChannel position(final long newPosition) throws ClosedChannelException {
if (newPosition < 0)
throw new IllegalArgumentException("newPosition cannot be < 0");

Expand All @@ -289,87 +225,24 @@ public SeekableByteChannel position(long newPosition) throws IOException {
}
}

/**
* Returns the current size of entity to which this channel is connected.
*
* @return The current size, measured in bytes
* @throws IOException If some other I/O error occurs
*/
@Override
public long size() throws IOException {
public long size() throws ClosedChannelException {
validateOpen();
return this.size;
}

/**
* Truncates the entity, to which this channel is connected, to the given
* size.
*
* <p> If the given size is less than the current size then the entity is
* truncated, discarding any bytes beyond the new end. If the given size is
* greater than or equal to the current size then the entity is not modified.
* In either case, if the current position is greater than the given size
* then it is set to that size.
*
* <p> An implementation of this interface may prohibit truncation when
* connected to an entity, typically a file, opened with the {@link
* StandardOpenOption#APPEND APPEND} option.
*
* @param size The new size, a non-negative byte count
* @return This channel
*/
@Override
public SeekableByteChannel truncate(long size) {
public SeekableByteChannel truncate(final long size) {
throw new UnsupportedOperationException("Currently not supported");
}

/**
* Tells whether this channel is open.
*
* @return {@code true} if, and only if, this channels delegate is open
*/
/**
 * Tells whether this channel is open.
 *
 * <p>Reads the {@code closed} flag under the channel monitor so callers observe a
 * consistent view with respect to a concurrent {@code close()}.
 *
 * @return {@code true} if, and only if, this channel has not been closed
 */
@Override
public boolean isOpen() {
    final boolean isClosed;
    synchronized (this) {
        isClosed = this.closed;
    }
    return !isClosed;
}

// /**
// * The number of fragments currently in the cache.
// *
// * @return the size of the cache after any async evictions or reloads have happened.
// */
// int numberOfCachedFragments() {
// readAheadBuffersCache.cleanUp();
// return (int) readAheadBuffersCache.estimatedSize();
// }

// /**
// * Obtain a snapshot of the statistics of the internal cache, provides information about hits, misses, requests, evictions etc.
// * that are useful for tuning.
// *
// * @return the statistics of the internal cache.
// */
// CacheStats cacheStatistics() {
// return readAheadBuffersCache.stats();
// }

/**
* Closes this channel.
*
* <p> After a channel is closed, any further attempt to invoke I/O
* operations upon it will cause a {@link ClosedChannelException} to be
* thrown.
*
* <p> If this channel is already closed then invoking this method has no
* effect.
*
* <p> This method may be invoked at any time. If some other thread has
* already invoked it, however, then another invocation will block until
* the first invocation is complete, after which it will return without
* effect. </p>
*/
@Override
public void close() throws IOException {
synchronized (this) {
Expand Down
Loading

0 comments on commit f703227

Please sign in to comment.