Skip to content

Commit

Permalink
Merge pull request #104 from kofemann/issue-99-3.3
Browse files Browse the repository at this point in the history
rpc: introduce OncRpcClientBuilder
  • Loading branch information
kofemann authored Feb 23, 2024
2 parents 9d4db2a + 299c058 commit c971bc3
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 21 deletions.
109 changes: 105 additions & 4 deletions oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009 - 2018 Deutsches Elektronen-Synchroton,
* Copyright (c) 2009 - 2024 Deutsches Elektronen-Synchroton,
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
*
* This library is free software; you can redistribute it and/or modify
Expand All @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class OncRpcClient implements AutoCloseable {
Expand Down Expand Up @@ -52,14 +53,19 @@ public OncRpcClient(InetSocketAddress socketAddress, int protocol) {
}

public OncRpcClient(InetSocketAddress socketAddress, int protocol, int localPort, IoStrategy ioStrategy, String serviceName) {
_socketAddress = socketAddress;
_rpcsvc = new OncRpcSvcBuilder()
this(socketAddress, new OncRpcSvcBuilder()
.withClientMode()
.withPort(localPort)
.withIpProtocolType(protocol)
.withIoStrategy(ioStrategy)
.withServiceName(serviceName)
.build();
.build());
}


private OncRpcClient(InetSocketAddress socketAddress, OncRpcSvc clientSvc) {
_socketAddress = socketAddress;
_rpcsvc = clientSvc;
}

public RpcTransport connect() throws IOException {
Expand All @@ -82,4 +88,99 @@ public RpcTransport connect(long timeout, TimeUnit timeUnit) throws IOException
public void close() throws IOException {
_rpcsvc.stop();
}

public static OncRpcClientBuilder newBuilder() {
return new OncRpcClientBuilder();
}

public static class OncRpcClientBuilder {

private final OncRpcSvcBuilder svcBuilder = new OncRpcSvcBuilder()
.withClientMode()
.withWorkerThreadIoStrategy()
.withSelectorThreadPoolSize(1)
.withWorkerThreadPoolSize(1)
.withoutAutoPublish();

private OncRpcClientBuilder() {
// no direct instantiation
}

public OncRpcClientBuilder withProtocol(int protocol) {
svcBuilder.withIpProtocolType(protocol);
return this;
}

public OncRpcClientBuilder withLocalPort(int localPort) {
svcBuilder.withPort(localPort);
return this;
}

public OncRpcClientBuilder withIoStrategy(IoStrategy ioStrategy) {
svcBuilder.withIoStrategy(ioStrategy);
return this;
}

public OncRpcClientBuilder withServiceName(String serviceName) {
svcBuilder.withServiceName(serviceName);
return this;
}

public OncRpcClientBuilder withWorkerThreadPoolSize(int size) {
svcBuilder.withWorkerThreadPoolSize(size);
return this;
}

public OncRpcClientBuilder withSelectorThreadPoolSize(int size) {
svcBuilder.withSelectorThreadPoolSize(size);
return this;
}

public OncRpcClientBuilder withWorkerThreadIoStrategy() {
svcBuilder.withWorkerThreadIoStrategy();
return this;
}

public OncRpcClientBuilder withRpcService(OncRpcProgram program, RpcDispatchable dispatchable) {
svcBuilder.withRpcService(program, dispatchable);
return this;
}

public OncRpcClientBuilder withWorkerThreadExecutionService(ExecutorService executorService) {
svcBuilder.withWorkerThreadExecutionService(executorService);
return this;
}

public OncRpcClientBuilder withTCP() {
svcBuilder.withTCP();
return this;
}

public OncRpcClientBuilder withUDP() {
svcBuilder.withUDP();
return this;
}

/**
* Build a new {@link OncRpcClient} instance.
*
* @param endpoint the socket address of the remote RPC server
* @return a new {@link OncRpcClient} instance
*/
public OncRpcClient build(InetSocketAddress endpoint) {
return new OncRpcClient(endpoint, svcBuilder.build());
}

/**
* Build a new {@link OncRpcClient} instance.
*
* @param endpoint the address of the remote RPC server
* @param port the port of the remote RPC server
* @return a new {@link OncRpcClient} instance
*/
public OncRpcClient build(InetAddress endpoint, int port) {
return build(new InetSocketAddress(endpoint, port));
}
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package org.dcache.oncrpc4j.rpc;

import org.dcache.oncrpc4j.rpc.OncRpcSvc;
import org.dcache.oncrpc4j.rpc.RpcCall;
import org.dcache.oncrpc4j.rpc.OncRpcProgram;
import org.dcache.oncrpc4j.rpc.OncRpcSvcBuilder;
import org.dcache.oncrpc4j.rpc.RpcDispatchable;
import org.dcache.oncrpc4j.rpc.RpcAuthTypeNone;
import org.dcache.oncrpc4j.rpc.net.IpProtocolType;
import org.dcache.oncrpc4j.xdr.XdrVoid;
import org.dcache.oncrpc4j.xdr.XdrString;
Expand Down Expand Up @@ -37,7 +31,7 @@ public class ClientServerTest {
private static final int LOST = 4;

private OncRpcSvc svc;
private OncRpcSvc clnt;
private OncRpcClient clnt;
private RpcCall clntCall;

@Before
Expand Down Expand Up @@ -90,18 +84,13 @@ public void setUp() throws IOException {
.build();
svc.start();

clnt = new OncRpcSvcBuilder()
.withoutAutoPublish()
clnt = OncRpcClient.newBuilder()
.withTCP()
.withClientMode()
.withWorkerThreadIoStrategy()
.withSelectorThreadPoolSize(1)
.withWorkerThreadPoolSize(1)
.withRpcService(new OncRpcProgram(PROGNUM, PROGVER), upper)
.withServiceName("clnt")
.build();
clnt.start();
RpcTransport t = clnt.connect(svc.getInetSocketAddress(IpProtocolType.TCP));
.build(svc.getInetSocketAddress(IpProtocolType.TCP));

RpcTransport t = clnt.connect();
clntCall = new RpcCall(PROGNUM, PROGVER, new RpcAuthTypeNone(), t);
}

Expand All @@ -111,7 +100,7 @@ public void tearDown() throws IOException {
svc.stop();
}
if (clnt != null) {
clnt.stop();
clnt.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,8 @@ public static void dumpClient(JrpcgenProgramInfo programInfo) {
out.println("import java.util.concurrent.TimeoutException;");
}
out.println();
out.println("import org.dcache.oncrpc4j.rpc.OncRpcClient.OncRpcClientBuilder;");
out.println();

out.println("/**");
out.println(" * The class <code>" + clientClass + "</code> implements the client stub proxy");
Expand Down Expand Up @@ -2035,6 +2037,30 @@ public static void dumpClient(JrpcgenProgramInfo programInfo) {
out.println(" }");
out.println();

out.println(" /**");
out.println(" * Constructs a <code>" + clientClass + "</code> client stub proxy object");
out.println(" * from which the " + programInfo.programId + " remote program can be accessed.");
out.println(" * @param host Internet address of host where to contact the remote program.");
out.println(" * @param port Port number at host where the remote program can be reached.");
out.println(" * @param auth {@link RpcAuth} to be used for RPC client authentication.");
out.println(" * @param program Remote program number.");
out.println(" * @param version Remote program version number.");
out.println(" * @param clientBuilder {@link org.dcache.oncrpc4j.rpc.OncRpcClient.OncRpcClientBuilder} to build the client");
out.println(" * @throws OncRpcException if an ONC/RPC error occurs.");
out.println(" * @throws IOException if an I/O error occurs.");
out.println(" */");
out.println(" public " + clientClass + "(InetAddress host, int port, RpcAuth auth, int program, int version, OncRpcClientBuilder clientBuilder)");
out.println(" throws OncRpcException, IOException {");
out.println(" rpcClient = clientBuilder.build(host, port);");
out.println(" try {");
out.println(" client = new RpcCall(program, version, auth, rpcClient.connect());");
out.println(" } catch (IOException e) {");
out.println(" rpcClient.close();");
out.println(" throw e;");
out.println(" } ");
out.println(" }");
out.println();

out.println(" /**");
out.println(" * Shutdown client connection.");
out.println(" *");
Expand Down

0 comments on commit c971bc3

Please sign in to comment.