Skip to content

Commit

Permalink
perf: use a new GrowableMultiBufferSupplier to avoid memory waste
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Aug 14, 2024
1 parent dd6b5c1 commit 7d1180d
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,49 @@ public void close() {
}
}

// AutoMQ for Kafka inject start
/**
* Different from {@link GrowableBufferSupplier}, this buffer supplier caches multiple buffers.
* So it is suitable for scenarios where multiple buffers are needed. For example:
* <pre>
* {@code
* BufferSupplier supplier = new GrowableMultiBufferSupplier();
*
* ByteBuffer buffer1 = supplier.get(1024);
* ByteBuffer buffer2 = supplier.get(2048);
*
* supplier.release(buffer1);
* supplier.release(buffer2);
*
* supplier.close();
* }
* </pre>
*/
public static class GrowableMultiBufferSupplier extends BufferSupplier {
private final Deque<ByteBuffer> buffers = new ArrayDeque<>(1);

@Override
public ByteBuffer get(int minCapacity) {
if (!buffers.isEmpty()) {
ByteBuffer buffer = buffers.pollFirst();
if (buffer.capacity() >= minCapacity) {
return buffer;
}
}
return ByteBuffer.allocate(minCapacity);
}

@Override
public void release(ByteBuffer buffer) {
buffer.clear();
buffers.addLast(buffer);
}

@Override
public void close() {
buffers.clear();
}
}
// AutoMQ for Kafka inject end

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.common.utils;

import java.nio.ByteBuffer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;

class GrowableMultiBufferSupplierTest {

    private BufferSupplier.GrowableMultiBufferSupplier supplier;

    @BeforeEach
    void setUp() {
        supplier = new BufferSupplier.GrowableMultiBufferSupplier();
    }

    @Test
    void testGetWhenNoBuffersAvailable() {
        // With nothing pooled, the supplier allocates exactly the requested capacity.
        assertEquals(10, supplier.get(10).capacity());
    }

    @Test
    void testGetWithSufficientCapacity() {
        // A released buffer that is large enough must be handed back as-is,
        // rewound to position zero.
        ByteBuffer original = supplier.get(10);
        supplier.release(original);

        ByteBuffer reused = supplier.get(5);
        assertSame(original, reused);
        assertEquals(0, reused.position());
        assertEquals(10, reused.capacity());
    }

    @Test
    void testGetWithInsufficientCapacity() {
        // A pooled buffer that is too small is discarded in favor of a fresh allocation.
        ByteBuffer small = supplier.get(10);
        supplier.release(small);

        ByteBuffer larger = supplier.get(15);
        assertNotSame(small, larger);
        assertEquals(15, larger.capacity());
    }

    @Test
    void testGetAndReleaseMultipleBuffers() {
        // Buffers are recycled in FIFO order: the first one released is the
        // first one returned.
        ByteBuffer first = supplier.get(10);
        ByteBuffer second = supplier.get(15);
        supplier.release(first);
        supplier.release(second);

        ByteBuffer reusedFirst = supplier.get(5);
        ByteBuffer reusedSecond = supplier.get(10);
        assertSame(first, reusedFirst);
        assertSame(second, reusedSecond);
        assertEquals(0, reusedFirst.position());
        assertEquals(0, reusedSecond.position());
    }

    @Test
    void testRelease() {
        // Releasing clears the buffer, so a subsequent get() sees position zero
        // even if the buffer had been written to.
        ByteBuffer written = ByteBuffer.allocate(10);
        written.put((byte) 1);
        supplier.release(written);

        ByteBuffer reused = supplier.get(5);
        assertSame(written, reused);
        assertEquals(0, reused.position());
    }
}
15 changes: 12 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,10 @@ private Optional<SnapshotReader<T>> latestSnapshot() {
return log.latestSnapshot().map(reader ->
RecordsSnapshotReader.of(reader,
serde,
BufferSupplier.create(),
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.GrowableMultiBufferSupplier(),
// AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
true /* Validate batch CRC*/
)
Expand Down Expand Up @@ -386,7 +389,10 @@ public void initialize(
Optional.of(VoterSet.fromInetSocketAddresses(listenerName, voterAddresses)),
log,
serde,
BufferSupplier.create(),
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.GrowableMultiBufferSupplier(),
// AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
logContext
);
Expand Down Expand Up @@ -2700,7 +2706,10 @@ private void fireHandleCommit(long baseOffset, Records records) {
baseOffset,
records,
serde,
BufferSupplier.create(),
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.GrowableMultiBufferSupplier(),
// AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
this,
true /* Validate batch CRC*/
Expand Down

0 comments on commit 7d1180d

Please sign in to comment.