Skip to content

Commit

Permalink
Fix reconnection deadlock (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
quinchs authored Oct 20, 2023
1 parent 3be463b commit 4647a17
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,33 @@

import io.netty.channel.ChannelFuture;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class ChannelCompletableFuture extends CompletableFuture<Void> {
private static final Logger logger = LoggerFactory.getLogger(ChannelCompletableFuture.class);

public static @NotNull ChannelCompletableFuture completeFrom(@NotNull ChannelFuture future) {
var completableFuture = new ChannelCompletableFuture();
logger.debug("Registering {}", future.hashCode());

future.addListener((v) -> {
if(v.cause() != null) {
logger.debug("Failing from {}", future.hashCode());
completableFuture.completeExceptionally(v.cause());
return;
}

if(v.isCancelled()) {
logger.debug("Cancelling from {}", future.hashCode());
completableFuture.cancel(true);
return;
}

logger.debug("Completing from {}", future.hashCode());

completableFuture.complete(null);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ public ChannelHandler() {
channelActivePromise = new CompletableFuture<>();
}

/**
 * Discards the current channel-active promise and installs a fresh,
 * not-yet-completed one.
 *
 * <p>The previous promise is not completed by this method.
 */
public void reset() {
    logger.debug("Resetting channel handler");
    channelActivePromise = new CompletableFuture<>();
}

/**
 * Records that the channel is connected and completes the channel-active
 * promise with a {@code null} value.
 *
 * @param ctx the handler context passed to this callback; not used here
 */
@Override
public void channelActive(@NotNull ChannelHandlerContext ctx) {
    logger.debug("Channel active");

    isConnected = true;
    this.channelActivePromise.complete(null);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, @NotNull Object evt) {
if(evt.equals("RESET")) {
channelActivePromise = new CompletableFuture<>();
} else if (evt.equals("TIMEOUT")) {
logger.debug("event fired {}", evt);
if (evt.equals("TIMEOUT")) {
var exc = new TimeoutException("A message read process passed the configured message timeout");
for(var promise : readPromises) {
promise.completeExceptionally(exc);
Expand All @@ -83,26 +88,28 @@ public void userEventTriggered(ChannelHandlerContext ctx, @NotNull Object evt) {
/**
 * Clears the connected flag when the channel is reported inactive, then
 * logs the transition at debug level.
 *
 * @param ctx the handler context passed to this callback; not used here
 */
@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
    isConnected = false;

    logger.debug("Channel inactive");
}

@Override
public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) {
var protocolMessage = (Receivable)msg;
logger.debug("Read fired, entering message lock, message type {}", protocolMessage.getMessageType());

if(
protocolMessage instanceof ErrorResponse && (
((ErrorResponse)protocolMessage).errorCode == ErrorCode.IDLE_SESSION_TIMEOUT_ERROR ||
((ErrorResponse)protocolMessage).errorCode == ErrorCode.IDLE_TRANSACTION_TIMEOUT_ERROR
)
) {
logger.debug("Got idle disconnect message, marking as closed");
isConnected = false;
return;
}

int completeCount = 0;

try {
logger.debug("Read fired, entering message lock, message type {}", protocolMessage.getMessageType());
if(!messageEnqueueLock.tryLock(client.getConfig().getMessageTimeoutValue(), client.getConfig().getMessageTimeoutUnit())) {
ctx.fireUserEventTriggered("TIMEOUT");
return;
Expand Down Expand Up @@ -248,7 +255,10 @@ private CompletionStage<Void> send0(AtomicInteger attempts, Sendable packet, @Nu

attempts.incrementAndGet();

return client.connect().thenCompose(n -> send0(attempts, packet, packets));
return client.reconnect().thenCompose(n -> {
logger.debug("Reconnect complete, retrying send");
return send0(attempts, packet, packets);
});
}

return send1(packet, packets);
Expand Down Expand Up @@ -333,7 +343,7 @@ public void init(Channel channel) {
@Override
public void reset() {
if(this.channel != null) {
channel.pipeline().fireUserEventTriggered("RESET");
this.channelHandler.reset();
}
}

Expand All @@ -348,11 +358,14 @@ public CompletionStage<Void> disconnect() {
return CompletableFuture.completedFuture(null);
}

if(this.channel.isOpen()) {
if(this.isConnected) {
logger.debug("Sending terminate for disconnect");
return send(new Terminate())
.thenCompose(v -> ChannelCompletableFuture.completeFrom(this.channel.close()));
.thenCompose(v -> ChannelCompletableFuture.completeFrom(this.channel.disconnect()));
}

return ChannelCompletableFuture.completeFrom(this.channel.close());
logger.debug("Closing channel without terminating");

return ChannelCompletableFuture.completeFrom(this.channel.disconnect());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.edgedb.driver.state.Config;
import com.edgedb.driver.state.Session;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
Expand All @@ -15,6 +17,7 @@
import java.util.function.Function;

public abstract class BaseEdgeDBClient implements StatefulClient, EdgeDBQueryable, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(BaseEdgeDBClient.class);
private final @NotNull AsyncEvent<BaseEdgeDBClient> onReady;
private final EdgeDBConnection connection;
private final EdgeDBClientConfig config;
Expand Down Expand Up @@ -89,7 +92,10 @@ public EdgeDBClientConfig getConfig() {
public abstract CompletionStage<Void> disconnect();

public CompletionStage<Void> reconnect() {
return disconnect().thenCompose((v) -> connect());
return disconnect().thenCompose((v) -> {
logger.debug("Executing connection attempt from reconnect");
return connect();
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,11 @@ public CompletionStage<Void> connect() {
return CompletableFuture
.runAsync(() -> {
try {
this.connectionSemaphore.acquire();
logger.debug("Acquiring connection lock...");
if(!this.connectionSemaphore.tryAcquire(getConfig().getConnectionTimeoutValue(), getConfig().getConnectionTimeoutUnit())) {
logger.debug("Failed to acquire connection lock after timeout");
throw new CompletionException(new ConnectionFailedException("Connection failed to be established because of a already existing attempt"));
}
} catch (InterruptedException e) {
throw new CompletionException(e);
}
Expand All @@ -874,6 +878,7 @@ public CompletionStage<Void> connect() {
}

private CompletionStage<Void> doClientHandshake() {
logger.debug("Reading for handshake");
return getDuplexer().readNext()
.thenCompose(packet -> {
logger.debug("Processing handshake step with packet: {}", packet == null ? "NULL" : packet.getMessageType());
Expand All @@ -900,10 +905,13 @@ public CompletionStage<Void> disconnect() {
}

private CompletionStage<Void> connectInternal() {
logger.debug("Beginning to run connection logic");
if(getDuplexer().isConnected()) {
logger.debug("Already connected, ignoring connection attempt");
return CompletableFuture.completedFuture(null);
}

logger.debug("Resetting ready state");
getDuplexer().reset();
this.readyPromise = new CompletableFuture<>();

Expand All @@ -926,12 +934,17 @@ private CompletionStage<Void> connectInternal() {
connectionParams,
new ProtocolExtension[0]
))
.thenApply(v -> {
logger.debug("Sending handshake");
return v;
})
.thenCompose(getDuplexer()::send);
}

private CompletionStage<Void> retryableConnect() {
try {
return exceptionallyCompose(this.openConnection(), err -> {
logger.debug("Connection attempt failed", err);
if(err instanceof EdgeDBException && ((EdgeDBException)err).shouldReconnect) {
if(getConfig().getConnectionRetryMode() == ConnectionRetryMode.NEVER_RETRY) {
return CompletableFuture.failedFuture(new ConnectionFailedException(err));
Expand All @@ -955,6 +968,7 @@ private CompletionStage<Void> retryableConnect() {
});
}
catch (Exception x) {
logger.debug("Connection failed", x);
return CompletableFuture.failedFuture(x);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,23 @@ protected void setTransactionState(TransactionState state) {
protected CompletionStage<Void> openConnection() {
final var connection = getConnectionArguments();


try {
logger.debug("Opening connection from bootstrap");
return exceptionallyCompose(
ChannelCompletableFuture.completeFrom(
bootstrap.connect(
connection.getHostname(),
connection.getPort()
)
),
e -> {
if(e instanceof CompletionException && e.getCause() instanceof ConnectException) {
return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e));
}
), e -> {
logger.debug("Connection failed", e);

return CompletableFuture.failedFuture(e);
if(e instanceof CompletionException && e.getCause() instanceof ConnectException) {
return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e));
}
)

return CompletableFuture.failedFuture(e);
})
.orTimeout(getConfig().getConnectionTimeoutValue(), getConfig().getConnectionTimeoutUnit());
}
catch (Exception err) {
Expand Down

0 comments on commit 4647a17

Please sign in to comment.