From 299c0582146d69c83bb40b6feca27c331211cd37 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Fri, 23 Feb 2024 12:07:29 +0100 Subject: [PATCH] rpc: introduce OncRpcClientBuilder error proof helpert to build a customized OncRpcClient Fixes: #99 Acked-by: Lea Morschel Target: master, 3.2, 3.1 (cherry picked from commit 388014e53761652b43bcdf958802cb5f65282686) Signed-off-by: Tigran Mkrtchyan --- .../org/dcache/oncrpc4j/rpc/OncRpcClient.java | 109 +++++++++++++++++- .../dcache/oncrpc4j/rpc/ClientServerTest.java | 23 +--- .../acplt/oncrpc/apps/jrpcgen/jrpcgen.java | 26 +++++ 3 files changed, 137 insertions(+), 21 deletions(-) diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcClient.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcClient.java index 3a4297bb..e32da6e8 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcClient.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009 - 2018 Deutsches Elektronen-Synchroton, + * Copyright (c) 2009 - 2024 Deutsches Elektronen-Synchroton, * Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY * * This library is free software; you can redistribute it and/or modify @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class OncRpcClient implements AutoCloseable { @@ -52,14 +53,19 @@ public OncRpcClient(InetSocketAddress socketAddress, int protocol) { } public OncRpcClient(InetSocketAddress socketAddress, int protocol, int localPort, IoStrategy ioStrategy, String serviceName) { - _socketAddress = socketAddress; - _rpcsvc = new OncRpcSvcBuilder() + this(socketAddress, new OncRpcSvcBuilder() .withClientMode() .withPort(localPort) .withIpProtocolType(protocol) .withIoStrategy(ioStrategy) .withServiceName(serviceName) - .build(); + .build()); + } + + + private OncRpcClient(InetSocketAddress socketAddress, OncRpcSvc clientSvc) { + _socketAddress = socketAddress; + _rpcsvc = clientSvc; } public RpcTransport connect() throws IOException { @@ -82,4 +88,99 @@ public RpcTransport connect(long timeout, TimeUnit timeUnit) throws IOException public void close() throws IOException { _rpcsvc.stop(); } + + public static OncRpcClientBuilder newBuilder() { + return new OncRpcClientBuilder(); + } + + public static class OncRpcClientBuilder { + + private final OncRpcSvcBuilder svcBuilder = new OncRpcSvcBuilder() + .withClientMode() + .withWorkerThreadIoStrategy() + .withSelectorThreadPoolSize(1) + .withWorkerThreadPoolSize(1) + .withoutAutoPublish(); + + private OncRpcClientBuilder() { + // no direct instantiation + } + + public OncRpcClientBuilder withProtocol(int protocol) { + svcBuilder.withIpProtocolType(protocol); + return this; + } + + public OncRpcClientBuilder withLocalPort(int localPort) { + svcBuilder.withPort(localPort); + return this; + } + + public OncRpcClientBuilder withIoStrategy(IoStrategy ioStrategy) { + svcBuilder.withIoStrategy(ioStrategy); + return this; + } + + public OncRpcClientBuilder withServiceName(String serviceName) { + svcBuilder.withServiceName(serviceName); + return this; + } + + public OncRpcClientBuilder withWorkerThreadPoolSize(int size) { + svcBuilder.withWorkerThreadPoolSize(size); + return this; + } + + public OncRpcClientBuilder withSelectorThreadPoolSize(int size) { + svcBuilder.withSelectorThreadPoolSize(size); + return this; + } + + public OncRpcClientBuilder withWorkerThreadIoStrategy() { + svcBuilder.withWorkerThreadIoStrategy(); + return this; + } + + public OncRpcClientBuilder withRpcService(OncRpcProgram program, RpcDispatchable dispatchable) { + svcBuilder.withRpcService(program, dispatchable); + return this; + } + + public OncRpcClientBuilder withWorkerThreadExecutionService(ExecutorService executorService) { + svcBuilder.withWorkerThreadExecutionService(executorService); + return this; + } + + public OncRpcClientBuilder withTCP() { + svcBuilder.withTCP(); + return this; + } + + public OncRpcClientBuilder withUDP() { + svcBuilder.withUDP(); + return this; + } + + /** + * Build a new {@link OncRpcClient} instance. + * + * @param endpoint the socket address of the remote RPC server + * @return a new {@link OncRpcClient} instance + */ + public OncRpcClient build(InetSocketAddress endpoint) { + return new OncRpcClient(endpoint, svcBuilder.build()); + } + + /** + * Build a new {@link OncRpcClient} instance. + * + * @param endpoint the address of the remote RPC server + * @param port the port of the remote RPC server + * @return a new {@link OncRpcClient} instance + */ + public OncRpcClient build(InetAddress endpoint, int port) { + return build(new InetSocketAddress(endpoint, port)); + } + } + } diff --git a/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ClientServerTest.java b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ClientServerTest.java index cca6f60d..4d656271 100644 --- a/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ClientServerTest.java +++ b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ClientServerTest.java @@ -1,11 +1,5 @@ package org.dcache.oncrpc4j.rpc; -import org.dcache.oncrpc4j.rpc.OncRpcSvc; -import org.dcache.oncrpc4j.rpc.RpcCall; -import org.dcache.oncrpc4j.rpc.OncRpcProgram; -import org.dcache.oncrpc4j.rpc.OncRpcSvcBuilder; -import org.dcache.oncrpc4j.rpc.RpcDispatchable; -import org.dcache.oncrpc4j.rpc.RpcAuthTypeNone; import org.dcache.oncrpc4j.rpc.net.IpProtocolType; import org.dcache.oncrpc4j.xdr.XdrVoid; import org.dcache.oncrpc4j.xdr.XdrString; @@ -37,7 +31,7 @@ public class ClientServerTest { private static final int LOST = 4; private OncRpcSvc svc; - private OncRpcSvc clnt; + private OncRpcClient clnt; private RpcCall clntCall; @Before @@ -90,18 +84,13 @@ public void setUp() throws IOException { .build(); svc.start(); - clnt = new OncRpcSvcBuilder() - .withoutAutoPublish() + clnt = OncRpcClient.newBuilder() .withTCP() - .withClientMode() - .withWorkerThreadIoStrategy() - .withSelectorThreadPoolSize(1) - .withWorkerThreadPoolSize(1) .withRpcService(new OncRpcProgram(PROGNUM, PROGVER), upper) .withServiceName("clnt") - .build(); - clnt.start(); - RpcTransport t = clnt.connect(svc.getInetSocketAddress(IpProtocolType.TCP)); + .build(svc.getInetSocketAddress(IpProtocolType.TCP)); + + RpcTransport t = clnt.connect(); clntCall = new RpcCall(PROGNUM, PROGVER, new RpcAuthTypeNone(), t); } @@ -111,7 +100,7 @@ public void tearDown() throws IOException { svc.stop(); } if (clnt != null) { - clnt.stop(); + clnt.close(); } } diff --git a/oncrpc4j-rpcgen/src/main/java/org/acplt/oncrpc/apps/jrpcgen/jrpcgen.java b/oncrpc4j-rpcgen/src/main/java/org/acplt/oncrpc/apps/jrpcgen/jrpcgen.java index 3e7c1ab7..78f8adbf 100644 --- a/oncrpc4j-rpcgen/src/main/java/org/acplt/oncrpc/apps/jrpcgen/jrpcgen.java +++ b/oncrpc4j-rpcgen/src/main/java/org/acplt/oncrpc/apps/jrpcgen/jrpcgen.java @@ -1935,6 +1935,8 @@ public static void dumpClient(JrpcgenProgramInfo programInfo) { out.println("import java.util.concurrent.TimeoutException;"); } out.println(); + out.println("import org.dcache.oncrpc4j.rpc.OncRpcClient.OncRpcClientBuilder;"); + out.println(); out.println("/**"); out.println(" * The class " + clientClass + " implements the client stub proxy"); @@ -2035,6 +2037,30 @@ public static void dumpClient(JrpcgenProgramInfo programInfo) { out.println(" }"); out.println(); + out.println(" /**"); + out.println(" * Constructs a " + clientClass + " client stub proxy object"); + out.println(" * from which the " + programInfo.programId + " remote program can be accessed."); + out.println(" * @param host Internet address of host where to contact the remote program."); + out.println(" * @param port Port number at host where the remote program can be reached."); + out.println(" * @param auth {@link RpcAuth} to be used for RPC client authentication."); + out.println(" * @param program Remote program number."); + out.println(" * @param version Remote program version number."); + out.println(" * @param clientBuilder {@link org.dcache.oncrpc4j.rpc.OncRpcClient.OncRpcClientBuilder} to build the client"); + out.println(" * @throws OncRpcException if an ONC/RPC error occurs."); + out.println(" * @throws IOException if an I/O error occurs."); + out.println(" */"); + out.println(" public " + clientClass + "(InetAddress host, int port, RpcAuth auth, int program, int version, OncRpcClientBuilder clientBuilder)"); + out.println(" throws OncRpcException, IOException {"); + out.println(" rpcClient = clientBuilder.build(host, port);"); + out.println(" try {"); + out.println(" client = new RpcCall(program, version, auth, rpcClient.connect());"); + out.println(" } catch (IOException e) {"); + out.println(" rpcClient.close();"); + out.println(" throw e;"); + out.println(" } "); + out.println(" }"); + out.println(); + out.println(" /**"); out.println(" * Shutdown client connection."); out.println(" *");