Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to read single parquet file hosted in AWS S3 #4972

Merged
merged 41 commits into from
Jan 29, 2024
Merged
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9723094
Initial commit
malhotrashivam Dec 6, 2023
2fe31fb
Moved to AWS SDK V2
malhotrashivam Dec 12, 2023
c69f875
Added support for S3 nio AWS channel
malhotrashivam Dec 14, 2023
4b98551
Moved s3 nio files to local repo
malhotrashivam Dec 18, 2023
6d72723
Made a number of improvements to s3 nio channel files
malhotrashivam Dec 18, 2023
cca5051
Moved to a context based cache
malhotrashivam Dec 26, 2023
b5e2c96
Added s3 specific parquet instructions
malhotrashivam Dec 27, 2023
2cb1c3f
Moved to URIs from files for reading single parquet file
malhotrashivam Dec 28, 2023
af10a1a
Added s3 specific parquet instructions
malhotrashivam Dec 29, 2023
edbd29a
Improved comments and Javadocs
malhotrashivam Dec 28, 2023
404af99
Resolving Devin's comments
malhotrashivam Jan 8, 2024
9aa7549
Code review with Devin part 2
malhotrashivam Jan 9, 2024
ae87404
Merge branch 'main' into sm-s3-pq
malhotrashivam Jan 9, 2024
b85043e
Integrated with Ryan's patch
malhotrashivam Jan 9, 2024
0e5ff03
Resolving more comments
malhotrashivam Jan 10, 2024
98b6f31
Moved to a service loader style pattern for channel providers
malhotrashivam Jan 10, 2024
d7fc6a1
Minor fixes in gradle files
malhotrashivam Jan 10, 2024
aefa5ef
Resolved more comments
malhotrashivam Jan 11, 2024
82a4ef3
Resolving Ryan's comments
malhotrashivam Jan 12, 2024
11acf6e
Fixed one test failure
malhotrashivam Jan 12, 2024
ae5a371
Resolving Devin's comments
malhotrashivam Jan 12, 2024
2dc360e
Cosmetic cleanups
malhotrashivam Jan 12, 2024
2840ec5
Resolving Ryan's comments part 2
malhotrashivam Jan 13, 2024
27e1638
Moved ChannelContext outside of SeekableChannelsProvider
malhotrashivam Jan 13, 2024
3b7521e
Resolving more comments
malhotrashivam Jan 15, 2024
36cf3cb
Resolved python comments and updated corresponding descriptions in ja…
malhotrashivam Jan 19, 2024
ecb35a8
Resolved more python comments
malhotrashivam Jan 19, 2024
36db89c
Resolving Ryan's comments
malhotrashivam Jan 19, 2024
b191ae5
Resolving more comments
malhotrashivam Jan 22, 2024
c30fcfa
Moved files from io.deephaven.parquet.base.util to io.deephaven.util.…
malhotrashivam Jan 23, 2024
6bc9f54
Resolving more comments
malhotrashivam Jan 23, 2024
10172e3
Added buffer pool support and TODOs for future projects
malhotrashivam Jan 23, 2024
7264be1
Added a new module Util/channel
malhotrashivam Jan 23, 2024
e28b993
Minor fix
malhotrashivam Jan 23, 2024
3b3dbbd
Resolved more comments
malhotrashivam Jan 25, 2024
cf2b130
Added support to pass AWS credentials
malhotrashivam Jan 26, 2024
dfb2e4c
Merge branch 'main' into sm-s3-pq
malhotrashivam Jan 26, 2024
661424a
Improved comments
malhotrashivam Jan 26, 2024
5695f2a
Renamed static credentials to basic credentials
malhotrashivam Jan 26, 2024
9847b59
Renamed AwsCredentialsImpl to AwsSdkV2Credentials
malhotrashivam Jan 26, 2024
0750e56
Minor refactoring around python comments
malhotrashivam Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added s3 specific parquet instructions
Also added support for Python
malhotrashivam committed Dec 29, 2023
commit af10a1a2740162b19b8a022f69bd9208b3270476
Original file line number Diff line number Diff line change
@@ -138,11 +138,6 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean useDictionary(String columnName);

/**
* @return The AWS region name to use for S3 operations; defaults to null
*/
public abstract String getAwsRegionName();

public abstract Object getSpecialInstructions();

public abstract String getCompressionCodecName();
@@ -209,11 +204,6 @@ public boolean useDictionary(final String columnName) {
return false;
}

@Override
public @Nullable String getAwsRegionName() {
return null;
}

@Override
public @Nullable String getSpecialInstructions() {
return null;
@@ -315,7 +305,6 @@ private static final class ReadOnly extends ParquetInstructions {
private final boolean isLegacyParquet;
private final int targetPageSize;
private final boolean isRefreshing;
private final String awsRegionName;
private final Object specialInstructions;

private ReadOnly(
@@ -327,7 +316,6 @@ private ReadOnly(
final boolean isLegacyParquet,
final int targetPageSize,
final boolean isRefreshing,
final String awsRegionName,
final Object specialInstructions) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
@@ -337,7 +325,6 @@ private ReadOnly(
this.isLegacyParquet = isLegacyParquet;
this.targetPageSize = targetPageSize;
this.isRefreshing = isRefreshing;
this.awsRegionName = awsRegionName;
this.specialInstructions = specialInstructions;
}

@@ -427,11 +414,6 @@ public boolean isRefreshing() {
return isRefreshing;
}

@Override
public String getAwsRegionName() {
return awsRegionName;
}

@Override
public @Nullable Object getSpecialInstructions() {
return specialInstructions;
@@ -488,7 +470,6 @@ public static class Builder {
private boolean isLegacyParquet;
private int targetPageSize = defaultTargetPageSize;
private boolean isRefreshing = DEFAULT_IS_REFRESHING;
private String awsRegionName;
private Object specialInstructions;

public Builder() {}
@@ -661,11 +642,6 @@ public Builder setIsRefreshing(final boolean isRefreshing) {
return this;
}

public Builder setAwsRegionName(final String awsRegionName) {
this.awsRegionName = awsRegionName;
return this;
}

public Builder setSpecialInstructions(final Object specialInstructions) {
this.specialInstructions = specialInstructions;
return this;
@@ -679,7 +655,7 @@ public ParquetInstructions build() {
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
awsRegionName, specialInstructions);
specialInstructions);
}
}

Original file line number Diff line number Diff line change
@@ -1142,9 +1142,7 @@ public static ParquetFileReader getParquetFileReaderChecked(
if (parquetFileURI.getScheme() != null && parquetFileURI.getScheme().equals(S3_PARQUET_FILE_URI_SCHEME)) {
return new ParquetFileReader(parquetFileURI,
new CachedChannelProvider(
new S3SeekableChannelProvider(readInstructions.getAwsRegionName(),
parquetFileURI.toString()),
1 << 7));
new S3SeekableChannelProvider(parquetFileURI, readInstructions), 1 << 7));
}
return new ParquetFileReader(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
parquetFileURI,
Original file line number Diff line number Diff line change
@@ -7,14 +7,14 @@
@BuildableStyle
public abstract class S3ParquetInstructions {

private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 20;
private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 50;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
private final static int DEFAULT_READ_AHEAD_COUNT = 1;
private final static int DEFAULT_MAX_FRAGMENT_SIZE = 512 << 20; // 5 MB
private final static int MIN_MAX_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int DEFAULT_FRAGMENT_SIZE = 512 << 20; // 5 MB
private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int DEFAULT_MAX_CACHE_SIZE = 50;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

public static Builder builder() {
return S3ParquetInstructions.builder();
return ImmutableS3ParquetInstructions.builder();
}

public abstract String awsRegionName();
@@ -30,8 +30,8 @@ public int readAheadCount() {
}

@Value.Default
public int maxFragmentSize() {
return DEFAULT_MAX_FRAGMENT_SIZE;
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

@Value.Default
@@ -55,8 +55,8 @@ final void boundsCheckReadAheadCount() {

@Value.Check
final void boundsCheckMaxFragmentSize() {
if (maxFragmentSize() < MIN_MAX_FRAGMENT_SIZE) {
throw new IllegalArgumentException("maxFragmentSize(=" + maxFragmentSize() + ") must be >= 8*1024 or 8 KB");
if (fragmentSize() < MIN_FRAGMENT_SIZE) {
throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= 8*1024 or 8 KB");
}
}

@@ -74,7 +74,7 @@ public interface Builder {

Builder readAheadCount(int readAheadCount);

Builder maxFragmentSize(int maxFragmentSize);
Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Loading