Skip to content

Commit

Permalink
fix(issue2139): add computeIfAbsent atomic operation to AsyncLRUCache (
Browse files Browse the repository at this point in the history
…#2144)

close #2139

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Nov 8, 2024
1 parent 90516c7 commit a74cf69
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
import com.automq.stream.utils.biniarysearch.ComparableItem;

import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,6 +25,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.buffer.ByteBuf;
Expand Down
9 changes: 6 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes;
import io.netty.buffer.ByteBuf;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -31,8 +34,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;

import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE;
import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncLRUCache<K, V extends AsyncMeasurable> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncLRUCache.class);
Expand Down Expand Up @@ -88,6 +87,20 @@ public synchronized V get(K key) {
return val;
}

public synchronized V computeIfAbsent(K key, Function<? super K, ? extends V> valueMapper) {
V value = cache.get(key);
if (value == null) {
value = valueMapper.apply(key);
if (value != null) {
put(key, value);
}
}
return value;
}

public synchronized void inLockRun(Runnable runnable) {
runnable.run();
}

public synchronized boolean remove(K key) {
V value = cache.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.automq.stream.s3.cache.ObjectReaderLRUCache;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.operator.ObjectStorage;
import java.util.concurrent.atomic.AtomicReference;

public class DefaultObjectReaderFactory implements ObjectReaderFactory {
private static final int MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
Expand All @@ -29,12 +30,13 @@ public DefaultObjectReaderFactory(ObjectStorage objectStorage) {

@Override
public synchronized ObjectReader get(S3ObjectMetadata metadata) {
ObjectReader objectReader = objectReaders.get(metadata.objectId());
if (objectReader == null) {
objectReader = ObjectReader.reader(metadata, objectStorage);
objectReaders.put(metadata.objectId(), objectReader);
}
return objectReader.retain();
AtomicReference<ObjectReader> objectReaderRef = new AtomicReference<>();
objectReaders.inLockRun(() -> {
ObjectReader objectReader = objectReaders.computeIfAbsent(metadata.objectId(), k -> ObjectReader.reader(metadata, objectStorage));
objectReader.retain();
objectReaderRef.set(objectReader);
});
return objectReaderRef.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.apache.kafka.tools.automq.perf;

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
Expand Down Expand Up @@ -44,6 +43,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down

0 comments on commit a74cf69

Please sign in to comment.