Skip to content

Commit

Permalink
feat(RequestFuture): new request api RequestFuture.
Browse files Browse the repository at this point in the history
  • Loading branch information
CherishCai committed Jun 16, 2022
1 parent 20d39dc commit d7bf94b
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static OpcRpcClient getOpcRpcClient(Properties properties) {
while (!STOP.get()) {
final ClientTestClientRequest testClientRequest = new ClientTestClientRequest();
try {
rpcClient.asyncRequest(testClientRequest, new RequestCallback<ClientTestServerResponse>() {
rpcClient.requestAsync(testClientRequest, new RequestCallback<ClientTestServerResponse>() {
@Override
public Executor getExecutor() {
return ForkJoinPool.commonPool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private static void sendServerTestServerRequest(Connection connection) {
final ServerTestServerRequest testServerRequest = new ServerTestServerRequest();
testServerRequest.setServer(String.valueOf(System.currentTimeMillis()));
try {
connection.asyncRequest(testServerRequest);
connection.requestAsync(testServerRequest);
} catch (Exception e) {
// ignore
log.error("connection.asyncRequest error,{}", connection, e);
Expand Down
17 changes: 13 additions & 4 deletions rpc-api/src/main/java/io/opc/rpc/api/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ public interface Connection {
* @param response Response
* @throws OpcConnectionException OpcConnectionException
*/
void asyncResponse(@Nonnull io.opc.rpc.api.response.Response response) throws OpcConnectionException;
void responseAsync(@Nonnull io.opc.rpc.api.response.Response response) throws OpcConnectionException;

/**
* async send Request.
*
* @param request Request
* @throws OpcConnectionException OpcConnectionException
*/
default void asyncRequest(@Nonnull io.opc.rpc.api.request.Request request) throws OpcConnectionException {
asyncRequest(request, null);
default void requestAsync(@Nonnull io.opc.rpc.api.request.Request request) throws OpcConnectionException {
requestAsync(request, null);
}

/**
Expand All @@ -59,9 +59,18 @@ default void asyncRequest(@Nonnull io.opc.rpc.api.request.Request request) throw
* @param requestCallback RequestCallback<R extends Response>, null means do not care about is.
* @throws OpcConnectionException OpcConnectionException
*/
void asyncRequest(@Nonnull io.opc.rpc.api.request.Request request, @Nullable RequestCallback<? extends Response> requestCallback)
void requestAsync(@Nonnull io.opc.rpc.api.request.Request request, @Nullable RequestCallback<? extends Response> requestCallback)
throws OpcConnectionException;

/**
* async send Request. waiting a Response.
*
* @param request Request
* @return RequestFuture<T extends Response>
* @throws OpcConnectionException OpcConnectionException
*/
<T extends Response> RequestFuture<T> requestFuture(@Nonnull io.opc.rpc.api.request.Request request) throws OpcConnectionException;

/**
* close.
*/
Expand Down
11 changes: 10 additions & 1 deletion rpc-api/src/main/java/io/opc/rpc/api/OpcRpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,16 @@ void registerServerRequestHandler(Class<? extends ServerRequest> requestClass,
* @param requestCallback RequestCallback<R extends Response>, null means do not care about is.
* @throws OpcConnectionException OpcConnectionException
*/
void asyncRequest(@Nonnull ClientRequest request, @Nullable RequestCallback<? extends ServerResponse> requestCallback)
void requestAsync(@Nonnull ClientRequest request, @Nullable RequestCallback<? extends ServerResponse> requestCallback)
throws OpcConnectionException;

/**
* async send Request. waiting a Response.
*
* @param request Request
* @return RequestFuture<R extends ServerResponse>
* @throws OpcConnectionException OpcConnectionException
*/
RequestFuture<? extends ServerResponse> requestFuture(@Nonnull ClientRequest request) throws OpcConnectionException;

}
14 changes: 14 additions & 0 deletions rpc-api/src/main/java/io/opc/rpc/api/RequestFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.opc.rpc.api;

import io.opc.rpc.api.response.Response;
import java.util.concurrent.Future;

/**
* Future for request, who async waiting a Response.
*
* @author caihongwen
* @version Id: RequestFuture.java, v 0.1 2022年06月16日 14:05 caihongwen Exp $
*/
public interface RequestFuture<R extends Response> extends Future<R> {

}
24 changes: 18 additions & 6 deletions rpc-client/src/main/java/io/opc/rpc/client/BaseOpcRpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opc.rpc.api.OpcRpcClient;
import io.opc.rpc.api.OpcRpcStatus;
import io.opc.rpc.api.RequestCallback;
import io.opc.rpc.api.RequestFuture;
import io.opc.rpc.api.RequestHandler;
import io.opc.rpc.api.constant.Constants;
import io.opc.rpc.api.exception.ExceptionCode;
Expand Down Expand Up @@ -143,7 +144,7 @@ public void init(@Nonnull Properties properties) {
// last active timeout will do check with ServerDetectionClientRequest
if (this.currentConnection != null && ConnectionManager.isActiveTimeout(this.currentConnection, this.keepActive)) {
try {
this.currentConnection.asyncRequest(new ServerDetectionClientRequest());
this.currentConnection.requestAsync(new ServerDetectionClientRequest());
} catch (OpcConnectionException connEx) {
log.warn("[{}]requestBi ServerDetectionClientRequest connEx", this.currentConnection.getConnectionId(), connEx);
this.asyncSwitchServerExclude(this.currentConnection.getEndpoint());
Expand Down Expand Up @@ -253,7 +254,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
final ConnectionSetupClientRequest setupClientRequest = ConnectionSetupClientRequest.builder()
.clientName(this.clientName)
.labels(this.labels).build();
grpcConnection.asyncRequest(setupClientRequest);
grpcConnection.requestAsync(setupClientRequest);

log.info("connect to server success,connection={} with serverAddress={}", grpcConnection.getConnectionId(), endpoint.getAddress());
return grpcConnection;
Expand Down Expand Up @@ -299,14 +300,25 @@ public void registerServerRequestHandler(Class<? extends ServerRequest> requestC
}

@Override
public void asyncRequest(@Nonnull ClientRequest request, @Nullable RequestCallback<? extends ServerResponse> requestCallback)
public void requestAsync(@Nonnull ClientRequest request, @Nullable RequestCallback<? extends ServerResponse> requestCallback)
throws OpcConnectionException {
if (this.currentConnection == null) {
throw new OpcConnectionException(ExceptionCode.CONNECTION_ERROR);
} else if (!OpcRpcStatus.RUNNING.equals(this.rpcClientStatus.get())) {
throw new OpcConnectionException(ExceptionCode.CONNECTION_UNHEALTHY);
}
this.currentConnection.asyncRequest(request, requestCallback);
this.currentConnection.requestAsync(request, requestCallback);
}

@Override
public RequestFuture<ServerResponse> requestFuture(@Nonnull ClientRequest request) throws OpcConnectionException {

if (this.currentConnection == null) {
throw new OpcConnectionException(ExceptionCode.CONNECTION_ERROR);
} else if (!OpcRpcStatus.RUNNING.equals(this.rpcClientStatus.get())) {
throw new OpcConnectionException(ExceptionCode.CONNECTION_UNHEALTHY);
}
return this.currentConnection.requestFuture(request);
}

@Override
Expand Down Expand Up @@ -372,7 +384,7 @@ public void onNext(Payload value) {
final ConnectionResetClientResponse connectionResetResponse = new ConnectionResetClientResponse();
connectionResetResponse.setRequestId(connectionResetRequest.getRequestId());
// do response
grpcConnection.asyncResponse(connectionResetResponse);
grpcConnection.responseAsync(connectionResetResponse);
}
// ServerRequest
else if (payloadObj instanceof ServerRequest) {
Expand All @@ -385,7 +397,7 @@ else if (payloadObj instanceof ServerRequest) {
}
response.setRequestId(serverRequest.getRequestId());
// do response
grpcConnection.asyncResponse(response);
grpcConnection.responseAsync(response);
}
// ServerDetectionServerResponse
else if (payloadObj instanceof ServerDetectionServerResponse) {
Expand Down
85 changes: 85 additions & 0 deletions rpc-core/src/main/java/io/opc/rpc/core/RequestFutureTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.opc.rpc.core;

import io.opc.rpc.api.RequestCallback;
import io.opc.rpc.api.RequestFuture;
import io.opc.rpc.api.exception.OpcRpcRuntimeException;
import io.opc.rpc.api.response.ErrorResponse;
import io.opc.rpc.api.response.Response;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import lombok.AllArgsConstructor;

/**
* RequestFutureTask. Default impl for RequestFuture.
*
* @author caihongwen
* @version Id: RequestFutureTask.java, v 0.1 2022年06月16日 12:08 caihongwen Exp $
*/
public class RequestFutureTask<R extends Response> extends FutureTask<R> implements RequestFuture<R> {

protected String connectionId;

protected String requestId;

@SuppressWarnings("rawtypes")
private static final Callable EMPTY = () -> null;

public RequestFutureTask(@Nonnull String connectionId, @Nonnull String requestId) {
//noinspection unchecked
super(EMPTY);
this.connectionId = connectionId;
this.requestId = requestId;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
final boolean cancel = super.cancel(mayInterruptIfRunning);
RequestCallbackSupport.clearCallback(connectionId, requestId);
return cancel;
}

/**
* Only for Opc Inner.
*/
public InvokeByRequestCallback<R> buildInvokeByRequestCallback(@Nonnull Executor executor) {
return new InvokeByRequestCallback<>(executor, this);
}

@AllArgsConstructor
static class InvokeByRequestCallback<R extends Response> implements RequestCallback<R> {

private Executor executor;

private RequestFutureTask<R> futureTask;

@Override
public Executor getExecutor() {
return executor;
}

@Override
public long getTimeout() {
// means permanent
return TimeUnit.HOURS.toMillis(23);
}

@Override
public void onTimeout() {
// Should not be triggered.
}

@Override
public void onResponse(R response) {
futureTask.set(response);
}

@Override
public void onError(ErrorResponse errResp) {
futureTask.setException(new OpcRpcRuntimeException(errResp.getResultCode(), errResp.getMessage()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import io.opc.rpc.api.response.Response;
import io.opc.rpc.api.RequestCallback;
import io.opc.rpc.api.RequestFuture;
import io.opc.rpc.api.exception.OpcConnectionException;
import io.opc.rpc.api.response.Response;
import io.opc.rpc.core.grpc.auto.Payload;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
Expand All @@ -29,12 +31,22 @@ public ClientGrpcConnection(ManagedChannel channel, StreamObserver<Payload> biSt
}

@Override
public void asyncRequest(@Nonnull io.opc.rpc.api.request.Request request,
@Nullable RequestCallback<? extends Response> requestCallback) {
public void requestAsync(@Nonnull io.opc.rpc.api.request.Request request,
@Nullable RequestCallback<? extends Response> requestCallback) throws OpcConnectionException {

super.requestAsync(request, requestCallback);
// refreshActiveTime for client
this.refreshActiveTime();
}

@Override
public <T extends Response> RequestFuture<T> requestFuture(@Nonnull io.opc.rpc.api.request.Request request)
throws OpcConnectionException {

super.asyncRequest(request, requestCallback);
final RequestFuture<T> future = super.requestFuture(request);
// refreshActiveTime for client
this.refreshActiveTime();
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package io.opc.rpc.core.connection;

import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.opc.rpc.api.Connection;
import io.opc.rpc.api.RequestCallback;
import io.opc.rpc.api.RequestFuture;
import io.opc.rpc.api.exception.ExceptionCode;
import io.opc.rpc.api.exception.OpcConnectionException;
import io.opc.rpc.api.response.Response;
import io.opc.rpc.core.RequestCallbackSupport;
import io.opc.rpc.core.RequestFutureTask;
import io.opc.rpc.core.grpc.auto.Payload;
import io.opc.rpc.core.util.PayloadObjectHelper;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
Expand Down Expand Up @@ -51,12 +55,12 @@ protected void payloadNoAck(io.opc.rpc.api.Payload payload) throws OpcConnection
}

@Override
public void asyncResponse(@Nonnull Response response) throws OpcConnectionException {
public void responseAsync(@Nonnull Response response) throws OpcConnectionException {
this.payloadNoAck(response);
}

@Override
public void asyncRequest(@Nonnull io.opc.rpc.api.request.Request request,
public void requestAsync(@Nonnull io.opc.rpc.api.request.Request request,
@Nullable RequestCallback<? extends Response> requestCallback) throws OpcConnectionException {

// First async listening a Response with requestCallback(if not null).
Expand All @@ -74,6 +78,18 @@ public void asyncRequest(@Nonnull io.opc.rpc.api.request.Request request,
}
}

@Override
public <T extends Response> RequestFuture<T> requestFuture(@Nonnull io.opc.rpc.api.request.Request request)
throws OpcConnectionException {

final RequestFutureTask<T> future = new RequestFutureTask<>(this.getConnectionId(), request.getRequestId());
// maybe supply an executor
final Executor executor = MoreExecutors.directExecutor();
this.requestAsync(request, future.buildInvokeByRequestCallback(executor));

return future;
}

@Override
public void close() {
this.closeBiStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected MutableHandlerRegistry getFallbackHandlerRegistry() {
private void notifyActiveTimeoutConnectionClientDetection() {
for (Connection connection : BaseOpcRpcServer.this.connectionManager.getActiveTimeoutConnections(this.keepActive)) {
try {
connection.asyncRequest(new ClientDetectionServerRequest());
connection.requestAsync(new ClientDetectionServerRequest());
} catch (OpcConnectionException connEx) {
log.warn("[{}]Grpc requestBi ClientDetectionServerRequest connEx", connection.getConnectionId(), connEx);
BaseOpcRpcServer.this.connectionManager.removeAndClose(connection.getConnectionId());
Expand All @@ -175,7 +175,7 @@ protected void notifyAllConnectionResetServer() {
log.warn("[{}]Grpc requestBi ConnectionResetServerRequest", "notifyAllConnectionResetServer");
for (Connection connection : BaseOpcRpcServer.this.connectionManager.getConnections()) {
try {
connection.asyncRequest(connectionResetServerRequest);
connection.requestAsync(connectionResetServerRequest);
} catch (OpcConnectionException connEx) {
log.warn("[{}]Grpc requestBi ConnectionResetServerRequest connEx", connection.getConnectionId(), connEx);
} catch (Exception unknownEx) {
Expand Down Expand Up @@ -466,7 +466,7 @@ else if (payloadObj instanceof ErrorResponse) {
private void doResponseWithConnectionFirst(Response response) {
final Connection connection = BaseOpcRpcServer.this.connectionManager.getConnection(connectionId);
if (connection != null) {
connection.asyncResponse(response);
connection.responseAsync(response);
} else {
responseObserver.onNext(PayloadObjectHelper.buildGrpcPayload(response));
}
Expand Down

0 comments on commit d7bf94b

Please sign in to comment.