Skip to content

Commit

Permalink
Use Netty default allocator whenever it is pooled otherwise fallback …
Browse files Browse the repository at this point in the history
…to adaptive allocator.

Motivation:

Vert.x should use Netty's default allocator whenever possible in order to minimize the resources for pooled allocation (thread-local direct buffers, arenas).

Changes:

VertxByteBufAllocator.POOLED_ALLOCATOR reuses ByteBufAllocator.DEFAULT when it is pooled otherwise uses AdaptiveByteBufAllocator.DEFAULT.

TCP server/client should use VertxByteBufAllocator.POOLED_ALLOCATOR instead of PooledByteBufAllocator.DEFAULT.
  • Loading branch information
vietj committed Oct 29, 2024
1 parent d49eb4c commit c446d7c
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
*/
package io.vertx.core.impl.buffer;

import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.*;
import io.netty.util.internal.PlatformDependent;
import io.vertx.core.buffer.impl.VertxHeapByteBuf;
import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf;
Expand All @@ -24,7 +20,16 @@ public abstract class VertxByteBufAllocator extends AbstractByteBufAllocator {
/**
* Vert.x pooled allocator.
*/
public static final ByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true);
public static final ByteBufAllocator POOLED_ALLOCATOR;

static {
ByteBufAllocator pooledAllocator = ByteBufAllocator.DEFAULT;
if (!pooledAllocator.isDirectBufferPooled()) {
// When io.netty.allocator.type == unpooled
pooledAllocator = AdaptiveByteBufAllocator.DEFAULT;
}
POOLED_ALLOCATOR = pooledAllocator;
}

/**
* Vert.x shared un-pooled allocator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.CloseSequence;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetClientInternal;
Expand Down Expand Up @@ -282,7 +282,7 @@ private void connectInternal2(ConnectOptions connectOptions,
Objects.requireNonNull(connectHandler, "No null connectHandler accepted");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop);
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.option(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR);

SocketAddress remoteAddress = connectOptions.getRemoteAddress();
if (remoteAddress == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package io.vertx.core.net.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
Expand All @@ -31,6 +30,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.tls.SslContextManager;
Expand Down Expand Up @@ -508,7 +508,7 @@ private void bind(
if (options.isSsl()) {
bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
} else {
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR);
}

bootstrap.childHandler(channelBalancer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@

package io.vertx.core.net.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -49,7 +46,11 @@ private VertxHandler(Function<ChannelHandlerContext, C> connectionFactory) {
* @return a buffer safe
*/
public static ByteBuf safeBuffer(ByteBuf byteBuf) {
if (byteBuf != Unpooled.EMPTY_BUFFER && (byteBuf.alloc() instanceof PooledByteBufAllocator || byteBuf instanceof CompositeByteBuf)) {
Class<?> allocClass;
if (byteBuf != Unpooled.EMPTY_BUFFER &&
((allocClass = byteBuf.alloc().getClass()) == AdaptiveByteBufAllocator.class
|| allocClass == PooledByteBufAllocator.class
|| byteBuf instanceof CompositeByteBuf)) {
try {
if (byteBuf.isReadable()) {
ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

package io.vertx.tests.buffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.*;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.buffer.impl.VertxHeapByteBuf;
import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.buffer.BufferInternal;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class VertxBufferTest {

Expand Down Expand Up @@ -62,4 +63,30 @@ public void testDuplicate() {
assertEquals(0, byteBuf.readerIndex());
}

@Test
public void testSafeBuffer() {
assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
assertCopyAndRelease(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 10).writeByte('A'));
assertWrap(Unpooled.buffer().writeByte('A'));
assertWrap(VertxByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertWrap(VertxByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
assertWrap(UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertWrap(UnpooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
}

private static void assertCopyAndRelease(ByteBuf bbuf) {
BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf);
assertNotSame(bbuf, buffer.byteBuf());
assertEquals(0, bbuf.refCnt());
}

private static void assertWrap(ByteBuf bbuf) {
BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf);
assertSame(bbuf, buffer.byteBuf());
assertEquals(1, bbuf.refCnt());
bbuf.release();
}
}
48 changes: 45 additions & 3 deletions vertx-core/src/test/java/io/vertx/tests/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

package io.vertx.tests.net;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand All @@ -28,9 +27,12 @@
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.PlatformDependent;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
Expand Down Expand Up @@ -3636,7 +3638,7 @@ private void testNetClientInternal_(HttpServerOptions options, boolean expectSSL
case 1:
assertTrue(obj instanceof LastHttpContent);
ByteBuf content = ((LastHttpContent) obj).content();
assertEquals(!expectSSL, content.isDirect());
assertTrue(content.isDirect());
assertEquals(1, content.refCnt());
String val = content.toString(StandardCharsets.UTF_8);
assertTrue(content.release());
Expand Down Expand Up @@ -4602,4 +4604,44 @@ public void testConnectToServerShutdown() throws Exception {
assertWaitUntil(closed::get);
fut.await();
}

@Test
public void testByteBufOriginateFromDefaultByteBufAllocator() {
server.connectHandler(so -> {
NetSocketInternal soi = (NetSocketInternal) so;
soi.messageHandler(msg -> {
try {
ByteBuf bbuf = (ByteBuf) msg;
assertSame(ByteBufAllocator.DEFAULT, bbuf.alloc());
} finally {
ReferenceCountUtil.release(msg);
}
testComplete();
});
});
server
.listen(1234, "localhost")
.await();
NetSocket so = client.connect(testAddress).await();
so.write(Buffer.buffer("ping"));
await();
}

@Test
public void testByteBufCopyAndRelease() {
server.connectHandler(so -> {
so.handler(buff -> {
ByteBuf byteBuf = ((BufferImpl)buff).byteBuf();
assertFalse(byteBuf.isDirect());
assertFalse(byteBuf.alloc().isDirectBufferPooled());
testComplete();
});
});
server
.listen(1234, "localhost")
.await();
NetSocket so = client.connect(testAddress).await();
so.write(Buffer.buffer("ping"));
await();
}
}

0 comments on commit c446d7c

Please sign in to comment.