From f11870980e831778068fcd6da28cdfd935383e52 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 29 Oct 2024 14:20:58 +0100 Subject: [PATCH] Use Netty default allocator whenever it is pooled otherwise fallback to adaptive allocator. Motivation: Vert.x should use Netty's default allocator whenever possible in order to minimize the resources for pooled allocation (thread-local direct buffers, arenas). Changes: VertxByteBufAllocator.POOLED_ALLOCATOR reuses ByteBufAllocator.DEFAULT when it is pooled otherwise uses AdaptiveByteBufAllocator.DEFAULT. TCP server/client should use VertxByteBufAllocator.POOLED_ALLOCATOR instead of PooledByteBufAllocator.DEFAULT. --- .../impl/buffer/VertxByteBufAllocator.java | 17 ++++--- .../io/vertx/core/net/impl/NetClientImpl.java | 4 +- .../io/vertx/core/net/impl/NetServerImpl.java | 4 +- .../io/vertx/core/net/impl/VertxHandler.java | 11 ++--- .../vertx/tests/buffer/VertxBufferTest.java | 33 ++++++++++++-- .../test/java/io/vertx/tests/net/NetTest.java | 44 +++++++++++++++++-- 6 files changed, 92 insertions(+), 21 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java b/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java index 6f5cf505f7e..c20dd633594 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java @@ -10,11 +10,7 @@ */ package io.vertx.core.impl.buffer; -import io.netty.buffer.AbstractByteBufAllocator; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.*; import io.netty.util.internal.PlatformDependent; import io.vertx.core.buffer.impl.VertxHeapByteBuf; import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf; @@ -24,7 +20,16 @@ public abstract class VertxByteBufAllocator extends AbstractByteBufAllocator { /** * Vert.x pooled allocator. */ - public static final ByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true); + public static final ByteBufAllocator POOLED_ALLOCATOR; + + static { + ByteBufAllocator pooledAllocator = ByteBufAllocator.DEFAULT; + if (!pooledAllocator.isDirectBufferPooled()) { + // When io.netty.allocator.type == unpooled + pooledAllocator = AdaptiveByteBufAllocator.DEFAULT; + } + POOLED_ALLOCATOR = pooledAllocator; + } /** * Vert.x shared un-pooled allocator. diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java index 35589e3d1b1..0233a518c57 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java @@ -22,11 +22,11 @@ import io.netty.util.concurrent.GenericFutureListener; import io.vertx.core.Future; import io.vertx.core.Promise; -import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.CloseSequence; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.PromiseInternal; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.internal.net.NetClientInternal; @@ -282,7 +282,7 @@ private void connectInternal2(ConnectOptions connectOptions, Objects.requireNonNull(connectHandler, "No null connectHandler accepted"); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoop); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + bootstrap.option(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR); SocketAddress remoteAddress = connectOptions.getRemoteAddress(); if (remoteAddress == null) { diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index e196a7e357a..02533a269f6 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -11,7 +11,6 @@ package io.vertx.core.net.impl; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; @@ -31,6 +30,7 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.internal.tls.SslContextManager; @@ -508,7 +508,7 @@ private void bind( if (options.isSsl()) { bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); } else { - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childOption(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR); } bootstrap.childHandler(channelBalancer); diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java index 758f8dbb187..8b3aff25ece 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java @@ -11,10 +11,7 @@ package io.vertx.core.net.impl; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; +import io.netty.buffer.*; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -49,7 +46,11 @@ private VertxHandler(Function connectionFactory) { * @return a buffer safe */ public static ByteBuf safeBuffer(ByteBuf byteBuf) { - if (byteBuf != Unpooled.EMPTY_BUFFER && (byteBuf.alloc() instanceof PooledByteBufAllocator || byteBuf instanceof CompositeByteBuf)) { + Class allocClass; + if (byteBuf != Unpooled.EMPTY_BUFFER && + ((allocClass = byteBuf.alloc().getClass()) == AdaptiveByteBufAllocator.class + || allocClass == PooledByteBufAllocator.class + || byteBuf instanceof CompositeByteBuf)) { try { if (byteBuf.isReadable()) { ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes()); diff --git a/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java b/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java index a7ee209137a..55580516b64 100644 --- a/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java @@ -11,14 +11,15 @@ package io.vertx.tests.buffer; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.*; import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.buffer.impl.VertxHeapByteBuf; import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; +import io.vertx.core.internal.buffer.BufferInternal; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class VertxBufferTest { @@ -62,4 +63,30 @@ public void testDuplicate() { assertEquals(0, byteBuf.readerIndex()); } + @Test + public void testSafeBuffer() { + assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + assertCopyAndRelease(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 10).writeByte('A')); + assertWrap(Unpooled.buffer().writeByte('A')); + assertWrap(VertxByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertWrap(VertxByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + assertWrap(UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertWrap(UnpooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + } + + private static void assertCopyAndRelease(ByteBuf bbuf) { + BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf); + assertNotSame(bbuf, buffer.byteBuf()); + assertEquals(0, bbuf.refCnt()); + } + + private static void assertWrap(ByteBuf bbuf) { + BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf); + assertSame(bbuf, buffer.byteBuf()); + assertEquals(1, bbuf.refCnt()); + bbuf.release(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java index fd077d58cd6..9881875d4f9 100755 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java @@ -11,8 +11,7 @@ package io.vertx.tests.net; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.buffer.*; import io.netty.channel.*; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -28,9 +27,12 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.internal.PlatformDependent; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; @@ -3636,7 +3638,7 @@ private void testNetClientInternal_(HttpServerOptions options, boolean expectSSL case 1: assertTrue(obj instanceof LastHttpContent); ByteBuf content = ((LastHttpContent) obj).content(); - assertEquals(!expectSSL, content.isDirect()); + assertTrue(content.isDirect()); assertEquals(1, content.refCnt()); String val = content.toString(StandardCharsets.UTF_8); assertTrue(content.release()); @@ -4602,4 +4604,40 @@ public void testConnectToServerShutdown() throws Exception { assertWaitUntil(closed::get); fut.await(); } + + @Test + public void testByteBufOriginateFromDefaultByteBufAllocator() throws Exception { + server.connectHandler(so -> { + NetSocketInternal soi = (NetSocketInternal) so; + soi.messageHandler(msg -> { + try { + ByteBuf bbuf = (ByteBuf) msg; + assertSame(ByteBufAllocator.DEFAULT, bbuf.alloc()); + } finally { + ReferenceCountUtil.release(msg); + } + testComplete(); + }); + }); + startServer(); + NetSocket so = client.connect(testAddress).await(); + so.write(Buffer.buffer("ping")); + await(); + } + + @Test + public void testByteBufCopyAndRelease() throws Exception { + server.connectHandler(so -> { + so.handler(buff -> { + ByteBuf byteBuf = ((BufferImpl)buff).byteBuf(); + assertFalse(byteBuf.isDirect()); + assertFalse(byteBuf.alloc().isDirectBufferPooled()); + testComplete(); + }); + }); + startServer(); + NetSocket so = client.connect(testAddress).await(); + so.write(Buffer.buffer("ping")); + await(); + } }