diff --git a/artemis-core-client-osgi/pom.xml b/artemis-core-client-osgi/pom.xml index a81b8458bdc..2fba916e49c 100644 --- a/artemis-core-client-osgi/pom.xml +++ b/artemis-core-client-osgi/pom.xml @@ -70,6 +70,7 @@ org.glassfish.json*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, + io.netty.incubator.*;resolution:=optional, io.netty.buffer;io.netty.*;version="[4.1,5)", * diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml index 37d4c483e46..339c25f4e3d 100644 --- a/artemis-core-client/pom.xml +++ b/artemis-core-client/pom.xml @@ -89,6 +89,15 @@ io.netty netty-transport-classes-kqueue + + io.netty.incubator + netty-incubator-transport-native-io_uring + ${netty-transport-native-io_uring-classifier} + + + io.netty.incubator + netty-incubator-transport-classes-io_uring + io.netty netty-codec-http diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 027543cdf78..a9789adcaeb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -353,4 +353,11 @@ public interface ActiveMQClientLogger { @LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO) void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type); + + @LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN) + void unableToCheckIoUringAvailability(Throwable e); + + @LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN) + void unableToCheckIoUringAvailabilitynoClass(); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java index 2bbcd1883e7..0756dfc1172 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java @@ -19,6 +19,7 @@ import io.netty.channel.epoll.Epoll; import io.netty.channel.kqueue.KQueue; +import io.netty.incubator.channel.uring.IOUring; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.utils.Env; import org.slf4j.Logger; @@ -45,6 +46,18 @@ public static final boolean isEpollAvailable() { } } + public static final boolean isIoUringAvailable() { + try { + return Env.isLinuxOs() && IOUring.isAvailable(); + } catch (NoClassDefFoundError noClassDefFoundError) { + ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass(); + return false; + } catch (Throwable e) { + ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e); + return false; + } + } + public static final boolean isKQueueAvailable() { try { return Env.isMacOs() && KQueue.isAvailable(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index e4017fd9607..0a5ded01f52 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -98,6 +98,8 @@ import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslHandler; +import io.netty.incubator.channel.uring.IOUringEventLoopGroup; +import io.netty.incubator.channel.uring.IOUringSocketChannel; import io.netty.resolver.NoopAddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.ResourceLeakDetector; @@ -137,6 +139,7 @@ public class NettyConnector extends AbstractConnector { public static String NIO_CONNECTOR_TYPE = "NIO"; public static String EPOLL_CONNECTOR_TYPE = "EPOLL"; public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE"; + public static String IOURING_CONNECTOR_TYPE = "IO_URING"; private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector { private boolean useKQueue; + private boolean useIoUring; + private int remotingThreads; private boolean useGlobalWorkerPool; @@ -404,6 +409,7 @@ public NettyConnector(final Map configuration, useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); + useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); @@ -528,6 +534,8 @@ public synchronized void start() { return; } + boolean defaultRemotingThreads = remotingThreads == -1; + if (remotingThreads == -1) { // Default to number of cores * 3 remotingThreads = Runtime.getRuntime().availableProcessors() * 3; @@ -535,14 +543,30 @@ public synchronized void start() { String connectorType; - if (useEpoll && CheckDependencies.isEpollAvailable()) { + if (useIoUring && CheckDependencies.isIoUringAvailable()) { + //IO_URING should default to 1 remotingThread unless specified in config + remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; + + if (useGlobalWorkerPool) { + group = SharedEventLoopGroup.getInstance((threadFactory -> new IOUringEventLoopGroup(remotingThreads, threadFactory))); + } else { + group = new IOUringEventLoopGroup(remotingThreads); + } + + connectorType = IOURING_CONNECTOR_TYPE; + channelClazz = IOUringSocketChannel.class; + + logger.debug("Connector {} using native io_uring", this); + } else if (useEpoll && CheckDependencies.isEpollAvailable()) { if (useGlobalWorkerPool) { group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory))); } else { group = new EpollEventLoopGroup(remotingThreads); } + connectorType = EPOLL_CONNECTOR_TYPE; channelClazz = EpollSocketChannel.class; + logger.debug("Connector {} using native epoll", this); } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { if (useGlobalWorkerPool) { @@ -550,19 +574,21 @@ public synchronized void start() { } else { group = new KQueueEventLoopGroup(remotingThreads); } + connectorType = KQUEUE_CONNECTOR_TYPE; channelClazz = KQueueSocketChannel.class; + logger.debug("Connector {} using native kqueue", this); } else { if (useGlobalWorkerPool) { - channelClazz = NioSocketChannel.class; group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory))); } else { - channelClazz = NioSocketChannel.class; group = new NioEventLoopGroup(remotingThreads); } + connectorType = NIO_CONNECTOR_TYPE; channelClazz = NioSocketChannel.class; + logger.debug("Connector {} using nio", this); } // if we are a servlet wrap the socketChannelFactory diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 7859fec4a6b..507c6118e48 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -66,6 +66,8 @@ public class TransportConstants { public static final String USE_EPOLL_PROP_NAME = "useEpoll"; + public static final String USE_IOURING_PROP_NAME = "useIoUring"; + public static final String USE_KQUEUE_PROP_NAME = "useKQueue"; @Deprecated @@ -213,6 +215,8 @@ public class TransportConstants { public static final boolean DEFAULT_USE_KQUEUE = true; + public static final boolean DEFAULT_USE_IOURING = false; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -409,6 +413,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); //noinspection deprecation allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); @@ -484,6 +489,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); diff --git a/artemis-jms-client-osgi/pom.xml b/artemis-jms-client-osgi/pom.xml index 1bb454df4ed..441415fcc94 100644 --- a/artemis-jms-client-osgi/pom.xml +++ b/artemis-jms-client-osgi/pom.xml @@ -78,6 +78,7 @@ org.glassfish.json*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, + io.netty.incubator.*;resolution:=optional, io.netty.buffer;io.netty.*;version="[4.1,5)", * diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml index f30c3ae98b7..0f856a25b00 100644 --- a/artemis-pom/pom.xml +++ b/artemis-pom/pom.xml @@ -440,6 +440,19 @@ ${netty-transport-native-kqueue-classifier} + + io.netty.incubator + netty-incubator-transport-classes-io_uring + ${netty.incubator.io_uring.version} + + + + io.netty.incubator + netty-incubator-transport-native-io_uring + ${netty.incubator.io_uring.version} + ${netty-transport-native-io_uring-classifier} + + org.apache.qpid proton-j diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml index 21177268bf8..39a6bb34b6f 100644 --- a/artemis-server-osgi/pom.xml +++ b/artemis-server-osgi/pom.xml @@ -128,6 +128,7 @@ org.glassfish.json*;resolution:=optional, org.postgresql*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, + io.netty.incubator.*;resolution:=optional, io.netty.buffer;io.netty.*;version="[4.1,5)", java.net.http*;resolution:=optional, com.sun.net.httpserver*;resolution:=optional, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 57e86a874e0..e9a4665aa53 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -64,6 +64,8 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; +import io.netty.incubator.channel.uring.IOUringEventLoopGroup; +import io.netty.incubator.channel.uring.IOUringServerSocketChannel; import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -112,6 +114,7 @@ public class NettyAcceptor extends AbstractAcceptor { public static final String NIO_ACCEPTOR_TYPE = "NIO"; public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL"; public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE"; + public static final String IOURING_ACCEPTOR_TYPE = "EXPERIMENTAL_IO_URING"; static { // Disable default Netty leak detection if the Netty leak detection level system properties are not in use @@ -148,6 +151,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useKQueue; + private final boolean useIoUring; + private final ProtocolHandler protocolHandler; private final String host; @@ -276,6 +281,7 @@ public NettyAcceptor(final String name, useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); + useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -425,12 +431,23 @@ public synchronized void start() throws Exception { eventLoopGroup = new DefaultEventLoopGroup(); } else { + boolean defaultRemotingThreads = remotingThreads == -1; + if (remotingThreads == -1) { // Default to number of cores * 3 remotingThreads = Runtime.getRuntime().availableProcessors() * 3; } - if (useEpoll && CheckDependencies.isEpollAvailable()) { + if (useIoUring && CheckDependencies.isIoUringAvailable()) { + //IO_URING should default to 1 remotingThread unless specified in config + remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; + + channelClazz = IOUringServerSocketChannel.class; + eventLoopGroup = new IOUringEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()))); + acceptorType = IOURING_ACCEPTOR_TYPE; + + logger.debug("Acceptor using native io_uring"); + } else if (useEpoll && CheckDependencies.isEpollAvailable()) { channelClazz = EpollServerSocketChannel.class; eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()))); acceptorType = EPOLL_ACCEPTOR_TYPE; @@ -446,6 +463,7 @@ public synchronized void start() throws Exception { channelClazz = NioServerSocketChannel.class; eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()))); acceptorType = NIO_ACCEPTOR_TYPE; + logger.debug("Acceptor using nio"); } } diff --git a/docs/user-manual/configuring-transports.adoc b/docs/user-manual/configuring-transports.adoc index 3547d4c1269..036efa810de 100644 --- a/docs/user-manual/configuring-transports.adoc +++ b/docs/user-manual/configuring-transports.adoc @@ -243,14 +243,14 @@ These Native transports add features specific to a particular platform, generate Both Clients and Server can benefit from this. -Current Supported Platforms. +Currently supported platforms: * Linux running 64bit JVM * MacOS running 64bit JVM -Apache ActiveMQ Artemis will by default enable the corresponding native transport if a supported platform is detected. +Apache ActiveMQ Artemis will enable the corresponding native transport by default if a supported platform is detected. -If running on an unsupported platform or any issues loading native libs, Apache ActiveMQ Artemis will fallback onto Java NIO. +If running on an unsupported platform, or if any issues occur while loading the native libs, Apache ActiveMQ Artemis will fallback onto Java NIO. ==== Linux Native Transport @@ -263,6 +263,23 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is Setting this to `false` will force the use of Java NIO instead of epoll. Default is `true` + +Additionally, Apache ActiveMQ Artemis offers `experimental` support for using IO_URING, @see https://en.wikipedia.org/wiki/Io_uring. + +The following properties are specific to this native transport: + +useIoUring:: +enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected. +Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO. +Default is `false` + +[WARNING] +==== +[#io_uring-warning] +IO_URING support is `experimental` at this point. Using it _could_ introduce unwanted side effects or unpredicted behavior. +It's currently not recommended for production or any otherwise critical use. +==== + ==== MacOS Native Transport On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue. diff --git a/pom.xml b/pom.xml index 3c462954013..331ecdd9d12 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,7 @@ 5.14.1 4.0.5 4.1.114.Final + 0.0.25.Final 2.2.2 5.7.0 3.9.2 @@ -261,6 +262,7 @@ linux-x86_64 osx-x86_64 + linux-x86_64 false