Skip to content

Commit

Permalink
client(reconnect): export reconnect api.
Browse files Browse the repository at this point in the history
  • Loading branch information
CherishCai committed Jul 18, 2022
1 parent 4ba3df3 commit 094036a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
11 changes: 7 additions & 4 deletions rpc-client/src/main/java/io/opc/rpc/client/BaseOpcRpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void init(@Nonnull Properties properties) {
try {
final Endpoint poll = this.reconnectionSignal.poll(this.keepActive, TimeUnit.MICROSECONDS);
if (poll != null) {
this.reconnect(poll);
this.reconnect(poll, true);
}
} catch (Exception ignore) {
// ignore
Expand Down Expand Up @@ -189,12 +189,15 @@ protected void asyncSwitchServerExclude(@Nonnull Endpoint exclude) {
this.reconnectionSignal.offer(Endpoint.randomOneExclude(this.endpoints, exclude));
}

protected void reconnect(@Nonnull final Endpoint endpoint) {
protected void reconnect(@Nonnull final Endpoint endpoint, boolean retryOnFailed) {
Connection connection = this.connectToServer(endpoint);
if (connection == null) {
if (!retryOnFailed) {
throw new OpcConnectionException();
}
try {
// sleep x milliseconds to switch next server. first round delay 100ms, second round delay 200ms; max delay 5s.
Thread.sleep(Math.min(this.connRetryTimes.incrementAndGet() * 100L, 5000L));
Thread.sleep(Math.min(this.connRetryTimes.incrementAndGet() * 100L, this.keepActive));
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -244,7 +247,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
try {
ListenableFuture<Payload> future = opcGrpcServiceFutureStub.request(
PayloadObjectHelper.buildGrpcPayload(connectionInitRequest));
connectionInitResponse = PayloadObjectHelper.buildApiPayload(future.get(3000, TimeUnit.MILLISECONDS));
connectionInitResponse = PayloadObjectHelper.buildApiPayload(future.get(this.keepActive - 200, TimeUnit.MILLISECONDS));
} catch (Exception e) {
log.error("connectionInitRequest get error,requestId={}", connectionInitRequest.getRequestId(), e);
shutdownChanel(channel);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.opc.rpc.client;

import io.opc.rpc.api.Endpoint;
import io.opc.rpc.api.OpcRpcClient;
import io.opc.rpc.api.exception.OpcConnectionException;
import java.util.Properties;

/**
Expand All @@ -16,4 +18,12 @@ protected void doInit(Properties properties) {

}

/**
* Reconnect server.
* <p>Build a new connection and then close the old one.</p>
*/
public void reconnect() throws OpcConnectionException {
this.reconnect(Endpoint.randomOneExclude(this.endpoints, this.currentConnection.getEndpoint()), false);
}

}

0 comments on commit 094036a

Please sign in to comment.