diff --git a/vertx-core/pom.xml b/vertx-core/pom.xml index 9515dd43386..314a7178b77 100644 --- a/vertx-core/pom.xml +++ b/vertx-core/pom.xml @@ -599,6 +599,51 @@ + + adaptive-allocator + + integration-test + verify + + + + io/vertx/it/buffer/TcpAllocationTest.java + + + adaptive + + + + + pooled-allocator + + integration-test + verify + + + + io/vertx/it/buffer/TcpAllocationTest.java + + + pooled + + + + + unpooled-allocator + + integration-test + verify + + + + io/vertx/it/buffer/TcpAllocationTest.java + + + unpooled + + + diff --git a/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java b/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java index 6f5cf505f7e..3fbaa589834 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/buffer/VertxByteBufAllocator.java @@ -10,11 +10,7 @@ */ package io.vertx.core.impl.buffer; -import io.netty.buffer.AbstractByteBufAllocator; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.*; import io.netty.util.internal.PlatformDependent; import io.vertx.core.buffer.impl.VertxHeapByteBuf; import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf; @@ -24,7 +20,15 @@ public abstract class VertxByteBufAllocator extends AbstractByteBufAllocator { /** * Vert.x pooled allocator. */ - public static final ByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true); + public static final ByteBufAllocator POOLED_ALLOCATOR; + + static { + ByteBufAllocator pooledAllocator = ByteBufAllocator.DEFAULT; + if (pooledAllocator instanceof UnpooledByteBufAllocator) { + pooledAllocator = new AdaptiveByteBufAllocator(); + } + POOLED_ALLOCATOR = pooledAllocator; + } /** * Vert.x shared un-pooled allocator. diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java index 35589e3d1b1..0233a518c57 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java @@ -22,11 +22,11 @@ import io.netty.util.concurrent.GenericFutureListener; import io.vertx.core.Future; import io.vertx.core.Promise; -import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.CloseSequence; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.PromiseInternal; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.internal.net.NetClientInternal; @@ -282,7 +282,7 @@ private void connectInternal2(ConnectOptions connectOptions, Objects.requireNonNull(connectHandler, "No null connectHandler accepted"); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoop); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + bootstrap.option(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR); SocketAddress remoteAddress = connectOptions.getRemoteAddress(); if (remoteAddress == null) { diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index e196a7e357a..02533a269f6 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -11,7 +11,6 @@ package io.vertx.core.net.impl; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; @@ -31,6 +30,7 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.VertxInternal; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.internal.tls.SslContextManager; @@ -508,7 +508,7 @@ private void bind( if (options.isSsl()) { bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); } else { - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childOption(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR); } bootstrap.childHandler(channelBalancer); diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java index 758f8dbb187..8b3aff25ece 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java @@ -11,10 +11,7 @@ package io.vertx.core.net.impl; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; +import io.netty.buffer.*; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -49,7 +46,11 @@ private VertxHandler(Function connectionFactory) { * @return a buffer safe */ public static ByteBuf safeBuffer(ByteBuf byteBuf) { - if (byteBuf != Unpooled.EMPTY_BUFFER && (byteBuf.alloc() instanceof PooledByteBufAllocator || byteBuf instanceof CompositeByteBuf)) { + Class allocClass; + if (byteBuf != Unpooled.EMPTY_BUFFER && + ((allocClass = byteBuf.alloc().getClass()) == AdaptiveByteBufAllocator.class + || allocClass == PooledByteBufAllocator.class + || byteBuf instanceof CompositeByteBuf)) { try { if (byteBuf.isReadable()) { ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes()); diff --git a/vertx-core/src/test/java/io/vertx/it/buffer/TcpAllocationTest.java b/vertx-core/src/test/java/io/vertx/it/buffer/TcpAllocationTest.java new file mode 100644 index 00000000000..22a9d058f2a --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/it/buffer/TcpAllocationTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.it.buffer; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ReferenceCountUtil; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; +import io.vertx.core.internal.net.NetSocketInternal; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetSocket; +import io.vertx.test.core.VertxTestBase; +import org.junit.Test; + +public class TcpAllocationTest extends VertxTestBase { + + @Test + public void testByteBufOriginateFromDefaultByteBufAllocator() { + NetServer server = vertx.createNetServer(); + server.connectHandler(so -> { + NetSocketInternal soi = (NetSocketInternal) so; + soi.messageHandler(msg -> { + try { + ByteBuf bbuf = (ByteBuf) msg; + assertSame(VertxByteBufAllocator.POOLED_ALLOCATOR, bbuf.alloc()); + } finally { + ReferenceCountUtil.release(msg); + } + testComplete(); + }); + }); + server.listen(1234, "localhost").await(); + NetClient client = vertx.createNetClient(); + NetSocket so = client.connect(1234, "localhost").await(); + so.write(Buffer.buffer("ping")); + await(); + } + + @Test + public void testByteBufCopyAndRelease() { + NetServer server = vertx.createNetServer(); + server.connectHandler(so -> { + so.handler(buff -> { + ByteBuf byteBuf = ((BufferImpl)buff).byteBuf(); + assertFalse(byteBuf.isDirect()); + assertFalse(byteBuf.alloc().isDirectBufferPooled()); + testComplete(); + }); + }); + server.listen(1234, "localhost").await(); + NetClient client = vertx.createNetClient(); + NetSocket so = client.connect(1234, "localhost").await(); + so.write(Buffer.buffer("ping")); + await(); + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java b/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java index a7ee209137a..55580516b64 100644 --- a/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/buffer/VertxBufferTest.java @@ -11,14 +11,15 @@ package io.vertx.tests.buffer; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.*; import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.buffer.impl.VertxHeapByteBuf; import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf; +import io.vertx.core.impl.buffer.VertxByteBufAllocator; +import io.vertx.core.internal.buffer.BufferInternal; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class VertxBufferTest { @@ -62,4 +63,30 @@ public void testDuplicate() { assertEquals(0, byteBuf.readerIndex()); } + @Test + public void testSafeBuffer() { + assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + assertCopyAndRelease(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 10).writeByte('A')); + assertWrap(Unpooled.buffer().writeByte('A')); + assertWrap(VertxByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertWrap(VertxByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + assertWrap(UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A')); + assertWrap(UnpooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A')); + } + + private static void assertCopyAndRelease(ByteBuf bbuf) { + BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf); + assertNotSame(bbuf, buffer.byteBuf()); + assertEquals(0, bbuf.refCnt()); + } + + private static void assertWrap(ByteBuf bbuf) { + BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf); + assertSame(bbuf, buffer.byteBuf()); + assertEquals(1, bbuf.refCnt()); + bbuf.release(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java index fd077d58cd6..008890c2142 100755 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java @@ -11,8 +11,7 @@ package io.vertx.tests.net; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.buffer.*; import io.netty.channel.*; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -45,7 +44,6 @@ import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.spi.tls.SslContextFactory; import io.vertx.test.core.CheckingSender; -import io.vertx.test.core.Repeat; import io.vertx.test.core.TestUtils; import io.vertx.test.core.VertxTestBase; import io.vertx.test.netty.TestLoggerFactory; @@ -3636,7 +3634,7 @@ private void testNetClientInternal_(HttpServerOptions options, boolean expectSSL case 1: assertTrue(obj instanceof LastHttpContent); ByteBuf content = ((LastHttpContent) obj).content(); - assertEquals(!expectSSL, content.isDirect()); + assertTrue(content.isDirect()); assertEquals(1, content.refCnt()); String val = content.toString(StandardCharsets.UTF_8); assertTrue(content.release());