Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert(7dcdb78): remove policies of memory allocator #958

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 10 additions & 32 deletions s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@

public class ByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));
public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED"));
public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));

private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class);
private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
private static long lastMetricLogTime = System.currentTimeMillis();
private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();
Expand All @@ -47,15 +50,6 @@ public class ByteBufAlloc {
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
* The policy used to allocate memory.
*/
private static ByteBufAllocPolicy policy = ByteBufAllocPolicy.UNPOOLED_HEAP;
/**
* The allocator used to allocate memory. It should be updated when {@link #policy} is updated.
*/
private static AbstractByteBufAllocator allocator = getAllocatorByPolicy(policy);

static {
registerAllocType(DEFAULT, "default");
registerAllocType(ENCODE_RECORD, "write_record");
Expand All @@ -71,17 +65,8 @@ public class ByteBufAlloc {

}

/**
* Set the policy used to allocate memory.
*/
public static void setPolicy(ByteBufAllocPolicy policy) {
LOGGER.info("Set alloc policy to {}", policy);
ByteBufAlloc.policy = policy;
ByteBufAlloc.allocator = getAllocatorByPolicy(policy);
}

public static CompositeByteBuf compositeByteBuffer() {
return allocator.compositeDirectBuffer(Integer.MAX_VALUE);
return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
}

public static ByteBuf byteBuffer(int initCapacity) {
Expand All @@ -105,9 +90,9 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric();
LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric);
}
return new WrappedByteBuf(policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity), () -> usage.add(-initCapacity));
return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity);
return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
}
} catch (OutOfMemoryError e) {
if (MEMORY_USAGE_DETECT) {
Expand All @@ -129,13 +114,6 @@ public static void registerAllocType(int type, String name) {
ALLOC_TYPE.put(type, name);
}

private static AbstractByteBufAllocator getAllocatorByPolicy(ByteBufAllocPolicy policy) {
if (policy.isPooled()) {
return PooledByteBufAllocator.DEFAULT;
}
return UnpooledByteBufAllocator.DEFAULT;
}

public static class ByteBufAllocMetric {
private final long usedMemory;
private final long allocatedMemory;
Expand All @@ -145,8 +123,8 @@ public ByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) allocator).metric();
this.usedMemory = policy.isDirect() ? metric.usedDirectMemory() : metric.usedHeapMemory();
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric();
this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory();
this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

Expand All @@ -169,9 +147,9 @@ public String toString() {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
}
sb.append(", pooled=");
sb.append(policy.isPooled());
sb.append(!ALLOCATOR_USAGE_UNPOOLED);
sb.append(", direct=");
sb.append(policy.isDirect());
sb.append(!BUFFER_USAGE_HEAPED);
sb.append("}");
return sb.toString();
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;

import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
Expand All @@ -63,7 +62,6 @@ public class CompactionTestBase {
protected S3Operator s3Operator;

public void setUp() throws Exception {
ByteBufAlloc.setPolicy(POOLED_DIRECT);
streamManager = Mockito.mock(MemoryMetadataManager.class);
when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class CompactionUploaderTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
s3Operator = new MemoryS3Operator();
objectManager = new MemoryMetadataManager();
config = mock(Config.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.automq.stream.s3.objects.ObjectStreamRange;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -32,11 +31,6 @@
@Tag("S3Unit")
public class CompactionUtilTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
}

@Test
public void testMergeStreamDataBlocks() {
List<StreamDataBlock> streamDataBlocks = List.of(
Expand Down
Loading