From 32a8f67c5732e8247803100fcdd054134d30b830 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 8 Nov 2024 14:25:05 +0800 Subject: [PATCH] fix(issue2139): add computeIfAbsent atomic operation to AsyncLRUCache close #2139 Signed-off-by: Shichao Nie --- .../stream/s3/CompositeObjectReader.java | 2 +- .../com/automq/stream/s3/ObjectReader.java | 9 +++++--- .../automq/stream/s3/cache/AsyncLRUCache.java | 21 +++++++++++++++---- .../DefaultObjectReaderFactory.java | 14 +++++++------ .../tools/automq/perf/ConsumerService.java | 2 +- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java index 68c7f62d3d..c77a6a7c9d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java @@ -17,7 +17,6 @@ import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection; import com.automq.stream.utils.biniarysearch.ComparableItem; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +25,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index 11738772fc..231a015cf2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -20,7 +20,10 @@ import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.utils.CloseableIterator; import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes; -import io.netty.buffer.ByteBuf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -31,8 +34,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE; import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java index fdf9cdbc2e..0201d0f0d9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java @@ -13,15 +13,14 @@ import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AsyncLRUCache { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncLRUCache.class); @@ -88,6 +87,20 @@ public synchronized V get(K key) { return val; } + public synchronized V computeIfAbsent(K key, Function valueMapper) { + V value = cache.get(key); + if (value == null) { + value = valueMapper.apply(key); + if (value != null) { + put(key, value); + } + } + return value; + } + + public synchronized void inLockRun(Runnable runnable) { + runnable.run(); + } public synchronized boolean remove(K key) { V value = cache.get(key); diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java index 4ffaf7643a..20b56e81f9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java @@ -15,6 +15,7 @@ import com.automq.stream.s3.cache.ObjectReaderLRUCache; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.operator.ObjectStorage; +import java.util.concurrent.atomic.AtomicReference; public class DefaultObjectReaderFactory implements ObjectReaderFactory { private static final int MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB; @@ -29,12 +30,13 @@ public DefaultObjectReaderFactory(ObjectStorage objectStorage) { @Override public synchronized ObjectReader get(S3ObjectMetadata metadata) { - ObjectReader objectReader = objectReaders.get(metadata.objectId()); - if (objectReader == null) { - objectReader = ObjectReader.reader(metadata, objectStorage); - objectReaders.put(metadata.objectId(), objectReader); - } - return objectReader.retain(); + AtomicReference objectReaderRef = new AtomicReference<>(); + objectReaders.inLockRun(() -> { + ObjectReader objectReader = objectReaders.computeIfAbsent(metadata.objectId(), k -> ObjectReader.reader(metadata, objectStorage)); + objectReader.retain(); + objectReaderRef.set(objectReader); + }); + return objectReaderRef.get(); } @Override diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java index fb7cfc87ca..81627ee4f8 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java @@ -11,7 +11,6 @@ package org.apache.kafka.tools.automq.perf; -import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; @@ -44,6 +43,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;