Skip to content

Commit

Permalink
Fix idle clients causing exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
quinchs committed Oct 15, 2023
1 parent a89822d commit 6e52959
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.edgedb.driver.binary.duplexers;

import com.edgedb.driver.ErrorCode;
import com.edgedb.driver.async.ChannelCompletableFuture;
import com.edgedb.driver.binary.packets.receivable.ErrorResponse;
import com.edgedb.driver.binary.packets.receivable.Receivable;
import com.edgedb.driver.binary.packets.sendables.Sendable;
import com.edgedb.driver.binary.packets.sendables.Terminate;
Expand Down Expand Up @@ -88,6 +90,16 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg)
var protocolMessage = (Receivable)msg;
logger.debug("Read fired, entering message lock, message type {}", protocolMessage.getMessageType());

if(
protocolMessage instanceof ErrorResponse && (
((ErrorResponse)protocolMessage).errorCode == ErrorCode.IDLE_SESSION_TIMEOUT_ERROR ||
((ErrorResponse)protocolMessage).errorCode == ErrorCode.IDLE_TRANSACTION_TIMEOUT_ERROR
)
) {
isConnected = false;
return;
}

int completeCount = 0;

try {
Expand Down Expand Up @@ -199,32 +211,48 @@ public ChannelDuplexer(EdgeDBBinaryClient client) {
private CompletionStage<Void> send1(Sendable packet, @Nullable Sendable @Nullable ... packets) {
logger.debug("Starting to send packets to {}, is connected? {}", channel, isConnected);

// return attachment to ready promise to "queue" to send if this client hasn't connected.
return this.channelHandler.whenReady().thenCompose((v) -> {
if(channel == null || !isConnected) {
return CompletableFuture.failedFuture(
new ConnectionFailedTemporarilyException("Cannot send message to a closed connection")
);
}
if(channel == null || !isConnected) {
return CompletableFuture.failedFuture(
new ConnectionFailedTemporarilyException("Cannot send message to a closed connection")
);
}

logger.debug("Beginning packet encoding and writing...");
var result = ChannelCompletableFuture.completeFrom(channel.write(packet));
logger.debug("Beginning packet encoding and writing...");
var result = ChannelCompletableFuture.completeFrom(channel.write(packet));

if(packets != null) {
for (var p : packets) {
result.thenCompose(channel.write(p));
}
if(packets != null) {
for (var p : packets) {
result.thenCompose(channel.write(p));
}
}

logger.debug("Flushing data...");
channel.flush();
logger.debug("Flush complete, returning write proxy task");
return result;
});
logger.debug("Flushing data...");
channel.flush();
logger.debug("Flush complete, returning write proxy task");
return result;
}

private CompletionStage<Void> send0(AtomicInteger attempts, Sendable packet, @Nullable Sendable... packets) {
return exceptionallyCompose(send1(packet, packets), e -> {
return exceptionallyCompose(this.channelHandler.whenReady().thenCompose(v -> {
if(!isConnected) {
logger.debug(
"Connection isn't open with a ready signal, reconnecting: {}/{}",
attempts.get(), client.getConfig().getMaxConnectionRetries()
);

if(attempts.get() >= client.getConfig().getMaxConnectionRetries()) {
return CompletableFuture.failedFuture(
new ConnectionFailedException("Failed to connect after " + attempts.get() + "attempts")
);
}

attempts.incrementAndGet();

return client.connect().thenCompose(n -> send0(attempts, packet, packets));
}

return send1(packet, packets);
}), e -> {
logger.debug("Caught failed send attempt");

if(e instanceof EdgeDBException && ((EdgeDBException)e).shouldRetry && !((EdgeDBException)e).shouldReconnect) {
Expand Down Expand Up @@ -307,7 +335,6 @@ public void reset() {
if(this.channel != null) {
channel.pipeline().fireUserEventTriggered("RESET");
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class EdgeDBBinaryClient extends BaseEdgeDBClient {
private boolean isIdle;
private final @NotNull Semaphore connectionSemaphore;
private final @NotNull Semaphore querySemaphore;
private final @NotNull CompletableFuture<Void> readyPromise;
private @NotNull CompletableFuture<Void> readyPromise;
private final CodecContext codecContext = new CodecContext(this);

public EdgeDBBinaryClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) {
Expand Down Expand Up @@ -904,6 +904,7 @@ private CompletionStage<Void> connectInternal() {
}

getDuplexer().reset();
this.readyPromise = new CompletableFuture<>();

return retryableConnect()
.thenApply(v -> {
Expand Down

0 comments on commit 6e52959

Please sign in to comment.