Skip to content

Commit

Permalink
fix(issue2139): prevent read object info from closed ObjectReader
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Nov 8, 2024
1 parent c098ee3 commit ebd5e51
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
import com.automq.stream.s3.objects.ObjectAttributes;
import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
import com.automq.stream.utils.biniarysearch.ComparableItem;
import io.netty.buffer.ByteBuf;

import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;

import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE;
import static com.automq.stream.s3.CompositeObject.FOOTER_MAGIC;
Expand All @@ -41,6 +45,7 @@ public class CompositeObjectReader implements ObjectReader {
private CompletableFuture<BasicObjectInfo> basicObjectInfoCf;
private CompletableFuture<Integer> sizeCf;
private final AtomicInteger refCount = new AtomicInteger(1);
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

public CompositeObjectReader(S3ObjectMetadata objectMetadata, RangeReader rangeReader) {
this.objectMetadata = objectMetadata;
Expand All @@ -59,6 +64,9 @@ public String objectKey() {

@Override
public synchronized CompletableFuture<BasicObjectInfo> basicObjectInfo() {
if (isShutdown.get()) {
return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown"));
}
if (basicObjectInfoCf == null) {
this.basicObjectInfoCf = new CompletableFuture<>();
this.basicObjectInfoCf.exceptionally(ex -> {
Expand Down Expand Up @@ -101,6 +109,9 @@ public synchronized CompletableFuture<Integer> size() {
}

public synchronized void close0() {
if (!isShutdown.compareAndSet(false, true)) {
return;
}
if (basicObjectInfoCf != null) {
basicObjectInfoCf.thenAccept(BasicObjectInfo::close);
}
Expand Down
8 changes: 8 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,6 +90,7 @@ class DefaultObjectReader implements ObjectReader {
private CompletableFuture<BasicObjectInfo> basicObjectInfoCf;
private CompletableFuture<Integer> sizeCf;
private final AtomicInteger refCount = new AtomicInteger(1);
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

public DefaultObjectReader(S3ObjectMetadata metadata, ObjectStorage objectStorage) {
this.metadata = metadata;
Expand All @@ -105,6 +107,9 @@ public String objectKey() {
}

public synchronized CompletableFuture<BasicObjectInfo> basicObjectInfo() {
if (isShutdown.get()) {
return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown"));
}
if (basicObjectInfoCf == null) {
this.basicObjectInfoCf = new CompletableFuture<>();
asyncGetBasicObjectInfo();
Expand Down Expand Up @@ -186,6 +191,9 @@ public synchronized CompletableFuture<Integer> size() {
}

public synchronized void close0() {
if (!isShutdown.compareAndSet(false, true)) {
return;
}
if (basicObjectInfoCf != null) {
basicObjectInfoCf.thenAccept(BasicObjectInfo::close);
}
Expand Down

0 comments on commit ebd5e51

Please sign in to comment.