From f4788e76124cd276fc2ddaa49d1cabf37ca88d91 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Mon, 3 Jun 2024 12:51:10 -0600 Subject: [PATCH] API, AWS: Add RetryableInputStream and use that in S3InputStream Co-authored-by: yzhaoqin@amazon.com --- .../iceberg/io/RetryableInputStream.java | 112 +++++++++ .../apache/iceberg/aws/s3/S3InputStream.java | 12 +- .../aws/s3/TestFuzzyS3InputStream.java | 227 ++++++++++++++++++ .../iceberg/aws/s3/TestS3InputStream.java | 30 ++- build.gradle | 2 + gradle/libs.versions.toml | 4 +- 6 files changed, 378 insertions(+), 9 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/io/RetryableInputStream.java create mode 100644 aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java diff --git a/api/src/main/java/org/apache/iceberg/io/RetryableInputStream.java b/api/src/main/java/org/apache/iceberg/io/RetryableInputStream.java new file mode 100644 index 000000000000..2e3902958921 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/RetryableInputStream.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketTimeoutException; +import java.time.Duration; +import java.util.List; +import java.util.function.Supplier; +import javax.net.ssl.SSLException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * RetryableInputStream wraps over an underlying InputStream and retries failures encountered when + * reading through the stream. On retries, the underlying streams will be reinitialized. + */ +public class RetryableInputStream extends InputStream { + + private final InputStream underlyingStream; + private final RetryPolicy retryPolicy; + + private RetryableInputStream(InputStream underlyingStream, RetryPolicy retryPolicy) { + this.underlyingStream = underlyingStream; + this.retryPolicy = retryPolicy; + } + + @Override + public int read() throws IOException { + return Failsafe.with(retryPolicy).get(() -> underlyingStream.read()); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return Failsafe.with(retryPolicy).get(() -> underlyingStream.read(b, off, len)); + } + + @Override + public void close() throws IOException { + underlyingStream.close(); + } + + public static RetryableInputStream.Builder builderFor(Supplier newStreamSupplier) { + return new Builder(newStreamSupplier); + } + + public static class Builder { + + private InputStream underlyingStream; + private final Supplier newStreamProvider; + private List> retryableExceptions = + ImmutableList.of(SSLException.class, SocketTimeoutException.class); + private int numRetries = 3; + + private long durationMs = 500; + + private Builder(Supplier newStreamProvider) { + this.underlyingStream = newStreamProvider.get(); + this.newStreamProvider = newStreamProvider; + } + + public Builder retryOn(Class... exceptions) { + this.retryableExceptions = Lists.newArrayList(exceptions); + return this; + } + + public Builder withRetries(int numRetries) { + this.numRetries = numRetries; + return this; + } + + public Builder withRetryDelay(long durationMs) { + this.durationMs = durationMs; + return this; + } + + public RetryableInputStream build() { + RetryPolicyBuilder retryPolicyBuilder = RetryPolicy.builder(); + retryableExceptions.forEach(retryPolicyBuilder::handle); + retryPolicyBuilder.onRetry( + (event) -> { + this.underlyingStream = newStreamProvider.get(); + }); + return new RetryableInputStream( + underlyingStream, + retryPolicyBuilder + .withMaxRetries(numRetries) + .withDelay(Duration.ofMillis(durationMs)) + .build()); + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index f1d6c30a27a5..c395eee32540 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -25,6 +25,7 @@ import org.apache.iceberg.io.FileIOMetricsContext; import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.RetryableInputStream; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.Counter; import org.apache.iceberg.metrics.MetricsContext; @@ -139,7 +140,11 @@ private InputStream readRange(String range) { S3RequestUtil.configureEncryption(s3FileIOProperties, requestBuilder); - return s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()); + stream = + RetryableInputStream.builderFor( + () -> s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream())) + .build(); + return stream; } @Override @@ -189,7 +194,10 @@ private void openStream() throws IOException { closeStream(); try { - stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()); + stream = + RetryableInputStream.builderFor( + () -> s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream())) + .build(); } catch (NoSuchKeyException e) { throw new NotFoundException(e, "Location does not exist: %s", location); } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java new file mode 100644 index 000000000000..509cd48419e7 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestFuzzyS3InputStream.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import dev.failsafe.FailsafeException; +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketTimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import javax.net.ssl.SSLException; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; + +public class TestFuzzyS3InputStream extends TestS3InputStream { + + private static final int DATA_SIZE = 3 * 1024 * 1024; + + @ParameterizedTest + @MethodSource("retryableExceptions") + public void testReadWithFuzzyStreamRetrySucceed(IOException exception) throws Exception { + testRead( + fuzzyStreamClient(new AtomicInteger(3), exception), new S3FileIOProperties(), DATA_SIZE); + } + + @ParameterizedTest + @MethodSource("retryableExceptions") + public void testReadWithFuzzyStreamExhaustedRetries(IOException exception) { + assertThatThrownBy( + () -> + testRead( + fuzzyStreamClient(new AtomicInteger(5), exception), + new S3FileIOProperties(), + DATA_SIZE)) + .isInstanceOf(FailsafeException.class) + .hasCause(exception); + } + + @ParameterizedTest + @MethodSource("nonRetryableExceptions") + public void testReadWithFuzzyStreamNonRetryableException(IOException exception) { + assertThatThrownBy( + () -> + testRead( + fuzzyStreamClient(new AtomicInteger(3), exception), + new S3FileIOProperties(), + DATA_SIZE)) + .isInstanceOf(FailsafeException.class) + .hasCause(exception); + } + + private static Stream retryableExceptions() { + return Stream.of( + Arguments.of( + new SocketTimeoutException("socket timeout exception"), + new SSLException("some ssl exception"))); + } + + private static Stream nonRetryableExceptions() { + return Stream.of(Arguments.of(new IOException("some generic non-retryable IO exception"))); + } + + private S3Client fuzzyStreamClient(AtomicInteger counter, IOException failure) { + S3Client fuzzyClient = spy(new S3ClientWrapper(s3Client())); + doAnswer( + invocation -> + new FuzzyResponseInputStream(invocation.callRealMethod(), counter, failure)) + .when(fuzzyClient) + .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)); + return fuzzyClient; + } + + /** Wrapper for S3 client, used to mock the final class DefaultS3Client */ + public static class S3ClientWrapper implements S3Client { + + private final S3Client delegate; + + public S3ClientWrapper(S3Client delegate) { + this.delegate = delegate; + } + + @Override + public String serviceName() { + return delegate.serviceName(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public ReturnT getObject( + GetObjectRequest getObjectRequest, + ResponseTransformer responseTransformer) + throws AwsServiceException, SdkClientException { + return delegate.getObject(getObjectRequest, responseTransformer); + } + + @Override + public HeadObjectResponse headObject(HeadObjectRequest headObjectRequest) + throws AwsServiceException, SdkClientException { + return delegate.headObject(headObjectRequest); + } + + @Override + public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBody requestBody) + throws AwsServiceException, SdkClientException { + return delegate.putObject(putObjectRequest, requestBody); + } + + @Override + public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest) + throws AwsServiceException, SdkClientException { + return delegate.createBucket(createBucketRequest); + } + } + + static class FuzzyResponseInputStream extends InputStream { + + private final ResponseInputStream delegate; + private final AtomicInteger counter; + private final int round; + private final IOException exception; + + public FuzzyResponseInputStream( + Object invocationResponse, AtomicInteger counter, IOException exception) { + this.delegate = (ResponseInputStream) invocationResponse; + this.counter = counter; + this.round = counter.get(); + this.exception = exception; + } + + private void checkCounter() throws IOException { + // for every round of n invocations, only the last call succeeds + if (counter.decrementAndGet() == 0) { + counter.set(round); + } else { + throw exception; + } + } + + @Override + public int read() throws IOException { + checkCounter(); + return delegate.read(); + } + + @Override + public int read(byte[] b) throws IOException { + checkCounter(); + return delegate.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkCounter(); + return delegate.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return delegate.skip(n); + } + + @Override + public int available() throws IOException { + return delegate.available(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public synchronized void mark(int readlimit) { + delegate.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + delegate.reset(); + } + + @Override + public boolean markSupported() { + return delegate.markSupported(); + } + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index feaac4eadad5..33a0c0174143 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -25,6 +25,7 @@ import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,16 +52,18 @@ public void before() { @Test public void testRead() throws Exception { + testRead(s3, new S3FileIOProperties(), 10 * 1024 * 1024); + } + + protected void testRead(S3Client s3Client, S3FileIOProperties awsProperties, int dataSize) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/read.dat"); - int dataSize = 1024 * 1024 * 10; byte[] data = randomData(dataSize); writeS3Data(uri, data); - try (SeekableInputStream in = new S3InputStream(s3, uri)) { + try (SeekableInputStream in = new S3InputStream(s3Client, uri)) { int readSize = 1024; - byte[] actual = new byte[readSize]; - readAndCheck(in, in.getPos(), readSize, data, false); readAndCheck(in, in.getPos(), readSize, data, true); @@ -109,6 +112,11 @@ private void readAndCheck( @Test public void testRangeRead() throws Exception { + testRangeRead(s3, new S3FileIOProperties()); + } + + protected void testRangeRead(S3Client s3Client, S3FileIOProperties awsProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat"); int dataSize = 1024 * 1024 * 10; byte[] expected = randomData(dataSize); @@ -120,7 +128,8 @@ public void testRangeRead() throws Exception { writeS3Data(uri, expected); - try (RangeReadable in = new S3InputStream(s3, uri)) { + try (RangeReadable in = + new S3InputStream(s3Client, uri, awsProperties, MetricsContext.nullMetrics())) { // first 1k position = 0; offset = 0; @@ -161,12 +170,17 @@ public void testClose() throws Exception { @Test public void testSeek() throws Exception { + testSeek(s3, new S3FileIOProperties()); + } + + protected void testSeek(S3Client s3Client, S3FileIOProperties awsProperties) throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/seek.dat"); byte[] expected = randomData(1024 * 1024); writeS3Data(uri, expected); - try (SeekableInputStream in = new S3InputStream(s3, uri)) { + try (SeekableInputStream in = + new S3InputStream(s3Client, uri, awsProperties, MetricsContext.nullMetrics())) { in.seek(expected.length / 2); byte[] actual = new byte[expected.length / 2]; IOUtil.readFully(in, actual, 0, expected.length / 2); @@ -198,4 +212,8 @@ private void createBucket(String bucketName) { // don't do anything } } + + protected S3Client s3Client() { + return s3; + } } diff --git a/build.gradle b/build.gradle index 2b95fe291790..7f827290680c 100644 --- a/build.gradle +++ b/build.gradle @@ -300,6 +300,7 @@ project(':iceberg-api') { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation libs.failsafe compileOnly libs.errorprone.annotations compileOnly libs.findbugs.jsr305 testImplementation libs.avro.avro @@ -468,6 +469,7 @@ project(':iceberg-aws') { annotationProcessor libs.immutables.value compileOnly libs.immutables.value implementation libs.caffeine + implementation libs.failsafe implementation platform(libs.jackson.bom) implementation "com.fasterxml.jackson.core:jackson-databind" implementation "com.fasterxml.jackson.core:jackson-core" diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0ad9469ed8da..25837e12202f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -37,6 +37,7 @@ delta-standalone = "3.1.0" delta-spark = "3.2.0" esotericsoftware-kryo = "4.0.3" errorprone-annotations = "2.27.0" +failsafe = "3.3.2" findbugs-jsr305 = "3.0.2" flink117 = { strictly = "1.17.2"} flink118 = { strictly = "1.18.1"} @@ -95,7 +96,7 @@ antlr-antlr4 = { module = "org.antlr:antlr4", version.ref = "antlr" } antlr-runtime = { module = "org.antlr:antlr4-runtime", version.ref = "antlr" } arrow-memory-netty = { module = "org.apache.arrow:arrow-memory-netty", version.ref = "arrow" } arrow-vector = { module = "org.apache.arrow:arrow-vector", version.ref = "arrow" } -avro-avro = { module = "org.apache.avro:avro", version.ref = "avro" } +avro-avro = { module = "org.apache.avro:avro", version.ref = "avro"} awssdk-bom = { module = "software.amazon.awssdk:bom", version.ref = "awssdk-bom" } awssdk-s3accessgrants = { module = "software.amazon.s3.accessgrants:aws-s3-accessgrants-java-plugin", version.ref = "awssdk-s3accessgrants" } azuresdk-bom = { module = "com.azure:azure-sdk-bom", version.ref = "azuresdk-bom" } @@ -104,6 +105,7 @@ calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calc calcite-druid = { module = "org.apache.calcite:calcite-druid", version.ref = "calcite" } delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" } errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" } +failsafe = { module = "dev.failsafe:failsafe", version.ref = "failsafe"} findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" } flink117-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink117" } flink117-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink117" }