From 90516c76148dde8ba0566d6235c7d288e31ef7a5 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 8 Nov 2024 11:47:56 +0800 Subject: [PATCH] fix(issue2139): prevent read object info from closed ObjectReader (#2141) Signed-off-by: Shichao Nie --- .../stream/s3/CompositeObjectReader.java | 8 ++++++++ .../com/automq/stream/s3/ObjectReader.java | 18 +++++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java index a04e7c6ef3..68c7f62d3d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java @@ -17,6 +17,7 @@ import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection; import com.automq.stream.utils.biniarysearch.ComparableItem; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,7 @@ public class CompositeObjectReader implements ObjectReader { private CompletableFuture basicObjectInfoCf; private CompletableFuture sizeCf; private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public CompositeObjectReader(S3ObjectMetadata objectMetadata, RangeReader rangeReader) { this.objectMetadata = objectMetadata; @@ -62,6 +64,9 @@ public String objectKey() { @Override public synchronized CompletableFuture basicObjectInfo() { + if (isShutdown.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown")); + } if (basicObjectInfoCf == null) { this.basicObjectInfoCf = new CompletableFuture<>(); this.basicObjectInfoCf.exceptionally(ex -> { @@ -104,6 +109,9 @@ public synchronized CompletableFuture size() { } public synchronized void close0() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } if (basicObjectInfoCf != null) { basicObjectInfoCf.thenAccept(BasicObjectInfo::close); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index d1ccdc83fe..11738772fc 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -18,13 +18,9 @@ import com.automq.stream.s3.network.ThrottleStrategy; import com.automq.stream.s3.objects.ObjectAttributes; import com.automq.stream.s3.operator.ObjectStorage; -import com.automq.stream.s3.operator.ObjectStorage.ReadOptions; import com.automq.stream.utils.CloseableIterator; import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import io.netty.buffer.ByteBuf; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -33,9 +29,10 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE; import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK; @@ -100,6 +97,7 @@ class DefaultObjectReader implements ObjectReader { private CompletableFuture basicObjectInfoCf; private CompletableFuture sizeCf; private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public DefaultObjectReader(S3ObjectMetadata metadata, ObjectStorage objectStorage) { this.metadata = metadata; @@ -116,6 +114,9 @@ public String objectKey() { } public synchronized CompletableFuture basicObjectInfo() { + if (isShutdown.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown")); + } if (basicObjectInfoCf == null) { this.basicObjectInfoCf = new CompletableFuture<>(); asyncGetBasicObjectInfo(); @@ -199,6 +200,9 @@ public synchronized CompletableFuture size() { } public synchronized void close0() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } if (basicObjectInfoCf != null) { basicObjectInfoCf.thenAccept(BasicObjectInfo::close); }