Skip to content

Commit

Permalink
fix(issue2139): prevent read object info from closed ObjectReader (#2141
Browse files Browse the repository at this point in the history
)

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Nov 8, 2024
1 parent cbaf3a2 commit 90516c7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
import com.automq.stream.utils.biniarysearch.ComparableItem;

import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -44,6 +45,7 @@ public class CompositeObjectReader implements ObjectReader {
private CompletableFuture<BasicObjectInfo> basicObjectInfoCf;
private CompletableFuture<Integer> sizeCf;
private final AtomicInteger refCount = new AtomicInteger(1);
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

public CompositeObjectReader(S3ObjectMetadata objectMetadata, RangeReader rangeReader) {
this.objectMetadata = objectMetadata;
Expand All @@ -62,6 +64,9 @@ public String objectKey() {

@Override
public synchronized CompletableFuture<BasicObjectInfo> basicObjectInfo() {
if (isShutdown.get()) {
return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown"));
}
if (basicObjectInfoCf == null) {
this.basicObjectInfoCf = new CompletableFuture<>();
this.basicObjectInfoCf.exceptionally(ex -> {
Expand Down Expand Up @@ -104,6 +109,9 @@ public synchronized CompletableFuture<Integer> size() {
}

public synchronized void close0() {
if (!isShutdown.compareAndSet(false, true)) {
return;
}
if (basicObjectInfoCf != null) {
basicObjectInfoCf.thenAccept(BasicObjectInfo::close);
}
Expand Down
18 changes: 11 additions & 7 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.objects.ObjectAttributes;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.ObjectStorage.ReadOptions;
import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -33,9 +29,10 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE;
import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK;
Expand Down Expand Up @@ -100,6 +97,7 @@ class DefaultObjectReader implements ObjectReader {
private CompletableFuture<BasicObjectInfo> basicObjectInfoCf;
private CompletableFuture<Integer> sizeCf;
private final AtomicInteger refCount = new AtomicInteger(1);
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

public DefaultObjectReader(S3ObjectMetadata metadata, ObjectStorage objectStorage) {
this.metadata = metadata;
Expand All @@ -116,6 +114,9 @@ public String objectKey() {
}

public synchronized CompletableFuture<BasicObjectInfo> basicObjectInfo() {
if (isShutdown.get()) {
return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown"));
}
if (basicObjectInfoCf == null) {
this.basicObjectInfoCf = new CompletableFuture<>();
asyncGetBasicObjectInfo();
Expand Down Expand Up @@ -199,6 +200,9 @@ public synchronized CompletableFuture<Integer> size() {
}

public synchronized void close0() {
if (!isShutdown.compareAndSet(false, true)) {
return;
}
if (basicObjectInfoCf != null) {
basicObjectInfoCf.thenAccept(BasicObjectInfo::close);
}
Expand Down

0 comments on commit 90516c7

Please sign in to comment.