Skip to content

Commit

Permalink
Added s3 specific parquet instructions
Browse files Browse the repository at this point in the history
Also added support for Python
  • Loading branch information
malhotrashivam committed Dec 29, 2023
1 parent 2cb1c3f commit af10a1a
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,6 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean useDictionary(String columnName);

/**
* @return The AWS region name to use for S3 operations; defaults to null
*/
public abstract String getAwsRegionName();

public abstract Object getSpecialInstructions();

public abstract String getCompressionCodecName();
Expand Down Expand Up @@ -209,11 +204,6 @@ public boolean useDictionary(final String columnName) {
return false;
}

@Override
public @Nullable String getAwsRegionName() {
return null;
}

@Override
public @Nullable String getSpecialInstructions() {
return null;
Expand Down Expand Up @@ -315,7 +305,6 @@ private static final class ReadOnly extends ParquetInstructions {
private final boolean isLegacyParquet;
private final int targetPageSize;
private final boolean isRefreshing;
private final String awsRegionName;
private final Object specialInstructions;

private ReadOnly(
Expand All @@ -327,7 +316,6 @@ private ReadOnly(
final boolean isLegacyParquet,
final int targetPageSize,
final boolean isRefreshing,
final String awsRegionName,
final Object specialInstructions) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
Expand All @@ -337,7 +325,6 @@ private ReadOnly(
this.isLegacyParquet = isLegacyParquet;
this.targetPageSize = targetPageSize;
this.isRefreshing = isRefreshing;
this.awsRegionName = awsRegionName;
this.specialInstructions = specialInstructions;
}

Expand Down Expand Up @@ -427,11 +414,6 @@ public boolean isRefreshing() {
return isRefreshing;
}

@Override
public String getAwsRegionName() {
return awsRegionName;
}

@Override
public @Nullable Object getSpecialInstructions() {
return specialInstructions;
Expand Down Expand Up @@ -488,7 +470,6 @@ public static class Builder {
private boolean isLegacyParquet;
private int targetPageSize = defaultTargetPageSize;
private boolean isRefreshing = DEFAULT_IS_REFRESHING;
private String awsRegionName;
private Object specialInstructions;

public Builder() {}
Expand Down Expand Up @@ -661,11 +642,6 @@ public Builder setIsRefreshing(final boolean isRefreshing) {
return this;
}

public Builder setAwsRegionName(final String awsRegionName) {
this.awsRegionName = awsRegionName;
return this;
}

public Builder setSpecialInstructions(final Object specialInstructions) {
this.specialInstructions = specialInstructions;
return this;
Expand All @@ -679,7 +655,7 @@ public ParquetInstructions build() {
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
awsRegionName, specialInstructions);
specialInstructions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1142,9 +1142,7 @@ public static ParquetFileReader getParquetFileReaderChecked(
if (parquetFileURI.getScheme() != null && parquetFileURI.getScheme().equals(S3_PARQUET_FILE_URI_SCHEME)) {
return new ParquetFileReader(parquetFileURI,
new CachedChannelProvider(
new S3SeekableChannelProvider(readInstructions.getAwsRegionName(),
parquetFileURI.toString()),
1 << 7));
new S3SeekableChannelProvider(parquetFileURI, readInstructions), 1 << 7));
}
return new ParquetFileReader(
parquetFileURI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
@BuildableStyle
public abstract class S3ParquetInstructions {

private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 20;
private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 50;
private final static int DEFAULT_READ_AHEAD_COUNT = 1;
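private final static int DEFAULT_MAX_FRAGMENT_SIZE = 512 << 20; // 512 MB as written (512 << 20); NOTE(review): comment previously said "5 MB" -- confirm whether 5 << 20 was intended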
private final static int MIN_MAX_FRAGMENT_SIZE = 8 << 10; // 8 KB
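private final static int DEFAULT_FRAGMENT_SIZE = 512 << 20; // 512 MB as written (512 << 20); NOTE(review): comment previously said "5 MB" -- confirm whether 5 << 20 was intended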
private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int DEFAULT_MAX_CACHE_SIZE = 50;

public static Builder builder() {
return S3ParquetInstructions.builder();
return ImmutableS3ParquetInstructions.builder();
}

public abstract String awsRegionName();
Expand All @@ -30,8 +30,8 @@ public int readAheadCount() {
}

@Value.Default
public int maxFragmentSize() {
return DEFAULT_MAX_FRAGMENT_SIZE;
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

@Value.Default
Expand All @@ -55,8 +55,8 @@ final void boundsCheckReadAheadCount() {

@Value.Check
final void boundsCheckMaxFragmentSize() {
if (maxFragmentSize() < MIN_MAX_FRAGMENT_SIZE) {
throw new IllegalArgumentException("maxFragmentSize(=" + maxFragmentSize() + ") must be >= 8*1024 or 8 KB");
if (fragmentSize() < MIN_FRAGMENT_SIZE) {
throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= 8*1024 or 8 KB");
}
}

Expand All @@ -74,7 +74,7 @@ public interface Builder {

Builder readAheadCount(int readAheadCount);

Builder maxFragmentSize(int maxFragmentSize);
Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Expand Down
Loading

0 comments on commit af10a1a

Please sign in to comment.