diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyUtils.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyUtils.java index 7676e40..d7d9696 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyUtils.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyUtils.java @@ -19,11 +19,11 @@ */ package org.dcache.oncrpc4j.grizzly; +import org.dcache.oncrpc4j.rpc.IoStrategy; import org.dcache.oncrpc4j.rpc.MemoryAllocator; import org.dcache.oncrpc4j.rpc.RpcMessageParserTCP; import org.dcache.oncrpc4j.rpc.RpcMessageParserUDP; import org.dcache.oncrpc4j.rpc.net.IpProtocolType; -import org.dcache.oncrpc4j.rpc.IoStrategy; import org.glassfish.grizzly.IOStrategy; import org.glassfish.grizzly.Transport; import org.glassfish.grizzly.filterchain.Filter; @@ -33,10 +33,12 @@ import org.glassfish.grizzly.memory.PooledMemoryManager; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.glassfish.grizzly.nio.transport.UDPNIOTransport; +import org.glassfish.grizzly.strategies.LeaderFollowerNIOStrategy; +import org.glassfish.grizzly.strategies.SameThreadIOStrategy; import org.glassfish.grizzly.threadpool.ThreadPoolConfig; -import static org.dcache.oncrpc4j.rpc.IoStrategy.*; import static com.google.common.base.Preconditions.checkArgument; +import static org.dcache.oncrpc4j.rpc.IoStrategy.WORKER_THREAD; /** * Class with utility methods for Grizzly @@ -143,6 +145,24 @@ public static MemoryManager getMemoryManager(MemoryAllocator allocator) { default: throw new RuntimeException("Unexpected memory allocator."); } + } + /** + * Convert an oncrpc4j IoStrategy enum value into a grizzly NIOStrategy instance. Note that the only two that matter + * here are single-thread and leader-follower. Worker threads should not be used in the grizzly layer as they would + * just inject more context switching overhead to no benefit. + * + * @param ioStrategy the oncrpc4j IoStategy to map to an NIOStrategy instance. + * @return the matching NIOStrategy instance for the specified IoStrategy value. + */ + public static IOStrategy getNIOStrategy(IoStrategy ioStrategy) { + switch (ioStrategy) { + case LEADER_FOLLOWER: + return LeaderFollowerNIOStrategy.getInstance(); + case WORKER_THREAD: + case SAME_THREAD: + default: + return SameThreadIOStrategy.getInstance(); + } } } diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcSvc.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcSvc.java index c98129b..992bb4b 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcSvc.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcSvc.java @@ -19,14 +19,15 @@ */ package org.dcache.oncrpc4j.rpc; +import org.dcache.oncrpc4j.grizzly.GrizzlyRpcTransport; import org.dcache.oncrpc4j.grizzly.StartTlsFilter; -import org.dcache.oncrpc4j.rpc.net.IpProtocolType; -import org.dcache.oncrpc4j.rpc.net.InetSocketAddresses; -import org.dcache.oncrpc4j.rpc.gss.GssProtocolFilter; -import org.dcache.oncrpc4j.rpc.gss.GssSessionManager; import org.dcache.oncrpc4j.portmap.GenericPortmapClient; import org.dcache.oncrpc4j.portmap.OncPortmapClient; import org.dcache.oncrpc4j.portmap.OncRpcPortmap; +import org.dcache.oncrpc4j.rpc.gss.GssProtocolFilter; +import org.dcache.oncrpc4j.rpc.gss.GssSessionManager; +import org.dcache.oncrpc4j.rpc.net.InetSocketAddresses; +import org.dcache.oncrpc4j.rpc.net.IpProtocolType; import org.glassfish.grizzly.CloseType; import org.glassfish.grizzly.Connection; import org.glassfish.grizzly.ConnectionProbe; @@ -46,15 +47,17 @@ import org.glassfish.grizzly.nio.transport.UDPNIOTransportBuilder; import org.glassfish.grizzly.ssl.SSLEngineConfigurator; import org.glassfish.grizzly.ssl.SSLFilter; -import org.glassfish.grizzly.strategies.SameThreadIOStrategy; import org.glassfish.grizzly.threadpool.ThreadPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; import java.io.IOException; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashSet; @@ -68,16 +71,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; -import javax.net.ssl.SSLContext; +import java.util.stream.Collectors; import static com.google.common.base.Throwables.getRootCause; import static com.google.common.base.Throwables.propagateIfPossible; -import java.net.SocketAddress; -import java.util.stream.Collectors; -import javax.net.ssl.SSLParameters; -import org.dcache.oncrpc4j.grizzly.GrizzlyRpcTransport; -import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.getSelectorPoolCfg; import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.getMemoryManager; +import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.getNIOStrategy; +import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.getSelectorPoolCfg; import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.rpcMessageReceiverFor; import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.transportFor; @@ -158,7 +158,7 @@ public class OncRpcSvc { final TCPNIOTransport tcpTransport = TCPNIOTransportBuilder .newInstance() .setReuseAddress(true) - .setIOStrategy(SameThreadIOStrategy.getInstance()) + .setIOStrategy(getNIOStrategy(ioStrategy)) .setSelectorThreadPoolConfig(selectorPoolConfig) .setSelectorRunnersCount(selectorPoolConfig.getMaxPoolSize()) .setMemoryManager(mm) @@ -170,7 +170,7 @@ public class OncRpcSvc { final UDPNIOTransport udpTransport = UDPNIOTransportBuilder .newInstance() .setReuseAddress(true) - .setIOStrategy(SameThreadIOStrategy.getInstance()) + .setIOStrategy(getNIOStrategy(ioStrategy)) .setSelectorThreadPoolConfig(selectorPoolConfig) .setSelectorRunnersCount(selectorPoolConfig.getMaxPoolSize()) .setMemoryManager(mm)