Skip to content

Commit

Permalink
API, AWS: Add RetryableInputStream and use that in S3InputStream
Browse files Browse the repository at this point in the history
Co-authored-by: [email protected]
  • Loading branch information
amogh-jahagirdar committed Jun 3, 2024
1 parent bd046f8 commit f4788e7
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 9 deletions.
112 changes: 112 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/RetryableInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;
import javax.net.ssl.SSLException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* RetryableInputStream wraps over an underlying InputStream and retries failures encountered when
* reading through the stream. On retries, the underlying streams will be reinitialized.
*/
public class RetryableInputStream extends InputStream {

private final InputStream underlyingStream;
private final RetryPolicy<Object> retryPolicy;

private RetryableInputStream(InputStream underlyingStream, RetryPolicy<Object> retryPolicy) {
this.underlyingStream = underlyingStream;
this.retryPolicy = retryPolicy;
}

@Override
public int read() throws IOException {
return Failsafe.with(retryPolicy).get(() -> underlyingStream.read());
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return Failsafe.with(retryPolicy).get(() -> underlyingStream.read(b, off, len));
}

@Override
public void close() throws IOException {
underlyingStream.close();
}

public static RetryableInputStream.Builder builderFor(Supplier<InputStream> newStreamSupplier) {
return new Builder(newStreamSupplier);
}

public static class Builder {

private InputStream underlyingStream;
private final Supplier<InputStream> newStreamProvider;
private List<Class<? extends Exception>> retryableExceptions =
ImmutableList.of(SSLException.class, SocketTimeoutException.class);
private int numRetries = 3;

private long durationMs = 500;

private Builder(Supplier<InputStream> newStreamProvider) {
this.underlyingStream = newStreamProvider.get();
this.newStreamProvider = newStreamProvider;
}

public Builder retryOn(Class<? extends Exception>... exceptions) {
this.retryableExceptions = Lists.newArrayList(exceptions);
return this;
}

public Builder withRetries(int numRetries) {
this.numRetries = numRetries;
return this;
}

public Builder withRetryDelay(long durationMs) {
this.durationMs = durationMs;
return this;
}

public RetryableInputStream build() {
RetryPolicyBuilder<Object> retryPolicyBuilder = RetryPolicy.builder();
retryableExceptions.forEach(retryPolicyBuilder::handle);
retryPolicyBuilder.onRetry(
(event) -> {
this.underlyingStream = newStreamProvider.get();
});
return new RetryableInputStream(
underlyingStream,
retryPolicyBuilder
.withMaxRetries(numRetries)
.withDelay(Duration.ofMillis(durationMs))
.build());
}
}
}
12 changes: 10 additions & 2 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.RetryableInputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
Expand Down Expand Up @@ -139,7 +140,11 @@ private InputStream readRange(String range) {

S3RequestUtil.configureEncryption(s3FileIOProperties, requestBuilder);

return s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
stream =
RetryableInputStream.builderFor(
() -> s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()))
.build();
return stream;
}

@Override
Expand Down Expand Up @@ -189,7 +194,10 @@ private void openStream() throws IOException {
closeStream();

try {
stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
stream =
RetryableInputStream.builderFor(
() -> s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()))
.build();
} catch (NoSuchKeyException e) {
throw new NotFoundException(e, "Location does not exist: %s", location);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import dev.failsafe.FailsafeException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.net.ssl.SSLException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class TestFuzzyS3InputStream extends TestS3InputStream {

private static final int DATA_SIZE = 3 * 1024 * 1024;

@ParameterizedTest
@MethodSource("retryableExceptions")
public void testReadWithFuzzyStreamRetrySucceed(IOException exception) throws Exception {
testRead(
fuzzyStreamClient(new AtomicInteger(3), exception), new S3FileIOProperties(), DATA_SIZE);
}

@ParameterizedTest
@MethodSource("retryableExceptions")
public void testReadWithFuzzyStreamExhaustedRetries(IOException exception) {
assertThatThrownBy(
() ->
testRead(
fuzzyStreamClient(new AtomicInteger(5), exception),
new S3FileIOProperties(),
DATA_SIZE))
.isInstanceOf(FailsafeException.class)
.hasCause(exception);
}

@ParameterizedTest
@MethodSource("nonRetryableExceptions")
public void testReadWithFuzzyStreamNonRetryableException(IOException exception) {
assertThatThrownBy(
() ->
testRead(
fuzzyStreamClient(new AtomicInteger(3), exception),
new S3FileIOProperties(),
DATA_SIZE))
.isInstanceOf(FailsafeException.class)
.hasCause(exception);
}

private static Stream<Arguments> retryableExceptions() {
return Stream.of(
Arguments.of(
new SocketTimeoutException("socket timeout exception"),
new SSLException("some ssl exception")));
}

private static Stream<Arguments> nonRetryableExceptions() {
return Stream.of(Arguments.of(new IOException("some generic non-retryable IO exception")));
}

private S3Client fuzzyStreamClient(AtomicInteger counter, IOException failure) {
S3Client fuzzyClient = spy(new S3ClientWrapper(s3Client()));
doAnswer(
invocation ->
new FuzzyResponseInputStream(invocation.callRealMethod(), counter, failure))
.when(fuzzyClient)
.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
return fuzzyClient;
}

/** Wrapper for S3 client, used to mock the final class DefaultS3Client */
public static class S3ClientWrapper implements S3Client {

private final S3Client delegate;

public S3ClientWrapper(S3Client delegate) {
this.delegate = delegate;
}

@Override
public String serviceName() {
return delegate.serviceName();
}

@Override
public void close() {
delegate.close();
}

@Override
public <ReturnT> ReturnT getObject(
GetObjectRequest getObjectRequest,
ResponseTransformer<GetObjectResponse, ReturnT> responseTransformer)
throws AwsServiceException, SdkClientException {
return delegate.getObject(getObjectRequest, responseTransformer);
}

@Override
public HeadObjectResponse headObject(HeadObjectRequest headObjectRequest)
throws AwsServiceException, SdkClientException {
return delegate.headObject(headObjectRequest);
}

@Override
public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBody requestBody)
throws AwsServiceException, SdkClientException {
return delegate.putObject(putObjectRequest, requestBody);
}

@Override
public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest)
throws AwsServiceException, SdkClientException {
return delegate.createBucket(createBucketRequest);
}
}

static class FuzzyResponseInputStream extends InputStream {

private final ResponseInputStream<GetObjectResponse> delegate;
private final AtomicInteger counter;
private final int round;
private final IOException exception;

public FuzzyResponseInputStream(
Object invocationResponse, AtomicInteger counter, IOException exception) {
this.delegate = (ResponseInputStream<GetObjectResponse>) invocationResponse;
this.counter = counter;
this.round = counter.get();
this.exception = exception;
}

private void checkCounter() throws IOException {
// for every round of n invocations, only the last call succeeds
if (counter.decrementAndGet() == 0) {
counter.set(round);
} else {
throw exception;
}
}

@Override
public int read() throws IOException {
checkCounter();
return delegate.read();
}

@Override
public int read(byte[] b) throws IOException {
checkCounter();
return delegate.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
checkCounter();
return delegate.read(b, off, len);
}

@Override
public long skip(long n) throws IOException {
return delegate.skip(n);
}

@Override
public int available() throws IOException {
return delegate.available();
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public synchronized void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
delegate.reset();
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}
}
}
Loading

0 comments on commit f4788e7

Please sign in to comment.