Skip to content

Commit

Permalink
feat(stream): exponential backoff retry
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Dec 20, 2024
1 parent c88f06e commit e16289f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -60,7 +61,6 @@ public abstract class AbstractObjectStorage implements ObjectStorage {
static final Logger LOGGER = LoggerFactory.getLogger(AbstractObjectStorage.class);
private static final int MAX_INFLIGHT_FAST_RETRY_COUNT = 5;

private static final int DEFAULT_RETRY_DELAY = 100;
private static final AtomicInteger INDEX = new AtomicInteger(-1);
private static final int DEFAULT_CONCURRENCY_PER_CORE = 25;
private static final int MIN_CONCURRENCY = 50;
Expand Down Expand Up @@ -305,7 +305,7 @@ private void write0(WriteOptions options, String path, ByteBuf data, Completable
}
} else {
LOGGER.warn("PutObject for object {} fail, retry later", path, cause);
scheduler.schedule(() -> write0(retryOptions, path, data, cf), retryDelay(S3Operation.PUT_OBJECT), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> write0(retryOptions, path, data, cf), retryDelay(S3Operation.PUT_OBJECT, retryOptions.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
}
return null;
});
Expand Down Expand Up @@ -336,7 +336,7 @@ private void createMultipartUpload0(WriteOptions options, String path, Completab
cf.completeExceptionally(cause);
} else {
LOGGER.warn("CreateMultipartUpload for object {} fail, retry later", path, cause);
scheduler.schedule(() -> createMultipartUpload0(options, path, cf), retryDelay(S3Operation.CREATE_MULTI_PART_UPLOAD), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> createMultipartUpload0(options, path, cf), retryDelay(S3Operation.CREATE_MULTI_PART_UPLOAD, options.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
}
return null;
});
Expand Down Expand Up @@ -383,7 +383,7 @@ private void uploadPart0(WriteOptions options, String path, String uploadId, int
cf.completeExceptionally(cause);
} else {
LOGGER.warn("UploadPart for object {}-{} fail, retry later", path, partNumber, cause);
scheduler.schedule(() -> uploadPart0(options, path, uploadId, partNumber, data, cf), retryDelay(S3Operation.UPLOAD_PART), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> uploadPart0(options, path, uploadId, partNumber, data, cf), retryDelay(S3Operation.UPLOAD_PART, options.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
}
return null;
});
Expand Down Expand Up @@ -419,7 +419,7 @@ private void uploadPartCopy0(WriteOptions options, String sourcePath, String pat
long nextApiCallAttemptTimeout = Math.min(options.apiCallAttemptTimeout() * 2, TimeUnit.MINUTES.toMillis(10));
LOGGER.warn("UploadPartCopy for object {}-{} [{}, {}] fail, retry later with apiCallAttemptTimeout={}", path, partNumber, start, end, nextApiCallAttemptTimeout, cause);
options.apiCallAttemptTimeout(nextApiCallAttemptTimeout);
scheduler.schedule(() -> uploadPartCopy0(options, sourcePath, path, start, end, uploadId, partNumber, cf), retryDelay(S3Operation.UPLOAD_PART_COPY), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> uploadPartCopy0(options, sourcePath, path, start, end, uploadId, partNumber, cf), retryDelay(S3Operation.UPLOAD_PART_COPY, options.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
}
return null;
});
Expand Down Expand Up @@ -458,14 +458,14 @@ private void completeMultipartUpload0(WriteOptions options, String path, String
.whenComplete((nil, t) -> {
if (t != null) {
LOGGER.warn("CompleteMultipartUpload for object {} fail, retry later", path, cause);
scheduler.schedule(() -> completeMultipartUpload0(options, path, uploadId, parts, cf), retryDelay(S3Operation.COMPLETE_MULTI_PART_UPLOAD), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> completeMultipartUpload0(options, path, uploadId, parts, cf), retryDelay(S3Operation.COMPLETE_MULTI_PART_UPLOAD, options.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
} else {
cf.complete(null);
}
});
} else {
LOGGER.warn("CompleteMultipartUpload for object {} fail, retry later", path, cause);
scheduler.schedule(() -> completeMultipartUpload0(options, path, uploadId, parts, cf), retryDelay(S3Operation.COMPLETE_MULTI_PART_UPLOAD), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> completeMultipartUpload0(options, path, uploadId, parts, cf), retryDelay(S3Operation.COMPLETE_MULTI_PART_UPLOAD, options.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
}
return null;
});
Expand Down Expand Up @@ -542,12 +542,12 @@ abstract CompletableFuture<Void> doCompleteMultipartUpload(WriteOptions options,

abstract CompletableFuture<List<ObjectInfo>> doList(String prefix);

protected int retryDelay(S3Operation operation) {
protected int retryDelay(S3Operation operation, int retryCount) {
switch (operation) {
case UPLOAD_PART_COPY:
return 1000;
default:
return DEFAULT_RETRY_DELAY;
return ThreadLocalRandom.current().nextInt(1000) + Math.min(1000 * (1 << Math.max(retryCount, 16)), (int) (TimeUnit.MINUTES.toMillis(1)));
}
}

Expand Down Expand Up @@ -650,7 +650,7 @@ private void mergedRangeRead0(ReadOptions options, String path, long start, long
cf.completeExceptionally(cause);
} else {
LOGGER.warn("GetObject for object {} [{}, {}) fail, retry later", path, start, end, cause);
scheduler.schedule(() -> mergedRangeRead0(options, path, start, end, cf), retryDelay(S3Operation.GET_OBJECT), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> mergedRangeRead0(options, path, start, end, cf), retryDelay(S3Operation.GET_OBJECT, options.retryCountGetAndAdd()), TimeUnit.MILLISECONDS);
}
S3OperationStats.getInstance().getObjectStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return null;
Expand Down Expand Up @@ -811,9 +811,9 @@ boolean tryMerge(AbstractObjectStorage.ReadTask readTask) {

private boolean canMerge(AbstractObjectStorage.ReadTask readTask) {
return objectPath != null &&
objectPath.equals(readTask.objectPath) &&
dataSparsityRate <= this.maxMergeReadSparsityRate &&
readTask.end != RANGE_READ_TO_END;
objectPath.equals(readTask.objectPath) &&
dataSparsityRate <= this.maxMergeReadSparsityRate &&
readTask.end != RANGE_READ_TO_END;
}

void handleReadCompleted(ByteBuf rst, Throwable ex) {
Expand Down Expand Up @@ -881,9 +881,9 @@ public boolean equals(Object obj) {
return false;
var that = (AbstractObjectStorage.ReadTask) obj;
return Objects.equals(this.objectPath, that.objectPath) &&
this.start == that.start &&
this.end == that.end &&
Objects.equals(this.cf, that.cf);
this.start == that.start &&
this.end == that.end &&
Objects.equals(this.cf, that.cf);
}

@Override
Expand All @@ -894,10 +894,10 @@ public int hashCode() {
@Override
public String toString() {
return "ReadTask[" +
"s3ObjectMetadata=" + objectPath + ", " +
"start=" + start + ", " +
"end=" + end + ", " +
"cf=" + cf + ']';
"s3ObjectMetadata=" + objectPath + ", " +
"start=" + start + ", " +
"end=" + end + ", " +
"cf=" + cf + ']';
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class WriteOptions {
private short bucketId;
private boolean enableFastRetry;
private boolean retry;
private int retryCount;

public WriteOptions throttleStrategy(ThrottleStrategy throttleStrategy) {
this.throttleStrategy = throttleStrategy;
Expand Down Expand Up @@ -180,6 +181,16 @@ public boolean retry() {
return retry;
}

/**
 * Returns the retry count as it was before this call, then increments the stored count by one.
 *
 * <p>NOTE(review): this is a plain read-modify-write on an {@code int} field, not an atomic
 * update; confirm callers never invoke it concurrently on the same options instance.
 *
 * @return the retry count prior to the increment
 */
public int retryCountGetAndAdd() {
    return this.retryCount++;
}

/**
 * Returns the current value of the retry counter without modifying it.
 *
 * @return the stored retry count
 */
public int retryCount() {
    return this.retryCount;
}

public WriteOptions copy() {
WriteOptions copy = new WriteOptions();
copy.throttleStrategy = throttleStrategy;
Expand All @@ -188,6 +199,7 @@ public WriteOptions copy() {
copy.bucketId = bucketId;
copy.enableFastRetry = enableFastRetry;
copy.retry = retry;
copy.retryCount = retryCount;
return copy;
}
}
Expand All @@ -197,6 +209,7 @@ class ReadOptions {

private ThrottleStrategy throttleStrategy = ThrottleStrategy.BYPASS;
private short bucket = UNSET_BUCKET;
private int retryCount;

public ReadOptions throttleStrategy(ThrottleStrategy throttleStrategy) {
this.throttleStrategy = throttleStrategy;
Expand All @@ -215,6 +228,16 @@ public ThrottleStrategy throttleStrategy() {
public short bucket() {
return bucket;
}

/**
 * Returns the retry count as it was before this call, then increments the stored count by one.
 *
 * <p>NOTE(review): this is a plain read-modify-write on an {@code int} field, not an atomic
 * update; confirm callers never invoke it concurrently on the same options instance.
 *
 * @return the retry count prior to the increment
 */
public int retryCountGetAndAdd() {
    return this.retryCount++;
}

/**
 * Returns the current value of the retry counter without modifying it.
 *
 * @return the stored retry count
 */
public int retryCount() {
    return this.retryCount;
}
}

class WriteResult {
Expand Down

0 comments on commit e16289f

Please sign in to comment.