From 9b0bd4f0372a9ed2d2256a284250f5f10fca1004 Mon Sep 17 00:00:00 2001 From: Quin Lynch Date: Mon, 11 Dec 2023 18:00:22 -0400 Subject: [PATCH 1/2] add cleaners to clients --- .../java/com/edgedb/examples/Example.java | 2 +- .../com/edgedb/examples/GlobalsAndConfig.java | 12 +- .../main/java/com/edgedb/examples/Main.java | 9 +- .../com/edgedb/examples/GlobalsAndConfig.kt | 12 +- .../main/kotlin/com/edgedb/examples/Main.kt | 20 +-- .../scala-examples/src/main/scala/Main.scala | 2 - .../java/com/edgedb/driver/EdgeDBClient.java | 120 ++++++++++++++--- .../driver/clients/BaseEdgeDBClient.java | 13 +- .../driver/clients/EdgeDBBinaryClient.java | 5 +- .../driver/clients/EdgeDBHttpClient.java | 5 +- .../driver/clients/EdgeDBTCPClient.java | 15 +-- .../com/edgedb/driver/pooling/ClientPool.java | 127 ++++++++++++++++++ .../edgedb/driver/pooling/PoolContract.java | 42 ++++++ .../edgedb/driver/util/CleanerProvider.java | 12 ++ .../edgedb/driver/util/ClientPoolHolder.java | 106 --------------- 15 files changed, 332 insertions(+), 170 deletions(-) create mode 100644 src/driver/src/main/java/com/edgedb/driver/pooling/ClientPool.java create mode 100644 src/driver/src/main/java/com/edgedb/driver/pooling/PoolContract.java create mode 100644 src/driver/src/main/java/com/edgedb/driver/util/CleanerProvider.java delete mode 100644 src/driver/src/main/java/com/edgedb/driver/util/ClientPoolHolder.java diff --git a/examples/java-examples/src/main/java/com/edgedb/examples/Example.java b/examples/java-examples/src/main/java/com/edgedb/examples/Example.java index e6eabd47..9c5c33e1 100644 --- a/examples/java-examples/src/main/java/com/edgedb/examples/Example.java +++ b/examples/java-examples/src/main/java/com/edgedb/examples/Example.java @@ -5,5 +5,5 @@ import java.util.concurrent.CompletionStage; public interface Example { - CompletionStage run(EdgeDBClient client); + CompletionStage run(EdgeDBClient client) throws Exception; } \ No newline at end of file diff --git a/examples/java-examples/src/main/java/com/edgedb/examples/GlobalsAndConfig.java b/examples/java-examples/src/main/java/com/edgedb/examples/GlobalsAndConfig.java index 7d880cad..b5d8f935 100644 --- a/examples/java-examples/src/main/java/com/edgedb/examples/GlobalsAndConfig.java +++ b/examples/java-examples/src/main/java/com/edgedb/examples/GlobalsAndConfig.java @@ -13,16 +13,16 @@ public final class GlobalsAndConfig implements Example { private static final Logger logger = LoggerFactory.getLogger(GlobalsAndConfig.class); @Override - public CompletionStage run(EdgeDBClient client) { - var configuredClient = client + public CompletionStage run(EdgeDBClient client) throws Exception { + try(var configuredClient = client .withConfig(config -> config .withIdleTransactionTimeout(Duration.ZERO) .applyAccessPolicies(true)) .withGlobals(new HashMap<>(){{ put("current_user_id", UUID.randomUUID()); - }}); - - return configuredClient.queryRequiredSingle(UUID.class, "select global current_user_id") - .thenAccept(result -> logger.info("current_user_id global: {}", result)); + }})) { + return configuredClient.queryRequiredSingle(UUID.class, "select global current_user_id") + .thenAccept(result -> logger.info("current_user_id global: {}", result)); + } } } \ No newline at end of file diff --git a/examples/java-examples/src/main/java/com/edgedb/examples/Main.java b/examples/java-examples/src/main/java/com/edgedb/examples/Main.java index 0204e280..5f88ca08 100644 --- a/examples/java-examples/src/main/java/com/edgedb/examples/Main.java +++ b/examples/java-examples/src/main/java/com/edgedb/examples/Main.java @@ -1,19 +1,18 @@ package com.edgedb.examples; -import com.edgedb.driver.*; -import com.edgedb.driver.exceptions.EdgeDBException; +import com.edgedb.driver.EdgeDBClient; +import com.edgedb.driver.EdgeDBClientConfig; import com.edgedb.driver.namingstrategies.NamingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.function.Supplier; public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); - public static void main(String[] args) throws IOException, EdgeDBException { + public static void main(String[] args) throws Exception { var client = new EdgeDBClient(EdgeDBClientConfig.builder() .withNamingStrategy(NamingStrategy.snakeCase()) .useFieldSetters(true) @@ -24,7 +23,7 @@ public static void main(String[] args) throws IOException, EdgeDBException { logger.info("Examples complete"); - System.exit(0); + client.close(); } private static void runJavaExamples(EdgeDBClient client) { diff --git a/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/GlobalsAndConfig.kt b/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/GlobalsAndConfig.kt index 191ba4c6..bfc5b79c 100644 --- a/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/GlobalsAndConfig.kt +++ b/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/GlobalsAndConfig.kt @@ -21,11 +21,13 @@ class GlobalsAndConfig : Example { "current_user_id" to UUID.randomUUID() )) - val currentUserId = configuredClient.queryRequiredSingle( - UUID::class.java, - "SELECT GLOBAL current_user_id" - ).await() + configuredClient.use { + val currentUserId = configuredClient.queryRequiredSingle( + UUID::class.java, + "SELECT GLOBAL current_user_id" + ).await() - logger.info("Current user ID: {}", currentUserId) + logger.info("Current user ID: {}", currentUserId) + } } } \ No newline at end of file diff --git a/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt b/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt index b5928bb8..d7bff368 100644 --- a/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt +++ b/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt @@ -28,18 +28,18 @@ object Main { Transactions() ) - runBlocking { - for (example in examples) { - logger.info("Running Kotlin example {}...", example) - try { - example.runAsync(client) - logger.info("Kotlin example {} complete!", example) - } catch (x: Exception) { - logger.error("Failed to run Kotlin example {}", example, x) + client.use { + runBlocking { + for (example in examples) { + logger.info("Running Kotlin example {}...", example) + try { + example.runAsync(client) + logger.info("Kotlin example {} complete!", example) + } catch (x: Exception) { + logger.error("Failed to run Kotlin example {}", example, x) + } } } } - - exitProcess(0) } } \ No newline at end of file diff --git a/examples/scala-examples/src/main/scala/Main.scala b/examples/scala-examples/src/main/scala/Main.scala index 4a4e8b5d..b2f546a7 100644 --- a/examples/scala-examples/src/main/scala/Main.scala +++ b/examples/scala-examples/src/main/scala/Main.scala @@ -32,8 +32,6 @@ def main(): Unit = { Await.ready(runExample(logger, client, example), Duration.Inf) logger.info("Examples complete!") - - System.exit(0) } private def runExample(logger: Logger, client: EdgeDBClient, example: Example)(implicit context: ExecutionContext): Future[Unit] = { diff --git a/src/driver/src/main/java/com/edgedb/driver/EdgeDBClient.java b/src/driver/src/main/java/com/edgedb/driver/EdgeDBClient.java index 523778db..71a5bc6e 100644 --- a/src/driver/src/main/java/com/edgedb/driver/EdgeDBClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/EdgeDBClient.java @@ -5,9 +5,11 @@ import com.edgedb.driver.datatypes.Json; import com.edgedb.driver.exceptions.ConfigurationException; import com.edgedb.driver.exceptions.EdgeDBException; +import com.edgedb.driver.pooling.ClientPool; +import com.edgedb.driver.pooling.PoolContract; import com.edgedb.driver.state.Config; import com.edgedb.driver.state.Session; -import com.edgedb.driver.util.ClientPoolHolder; +import com.edgedb.driver.util.CleanerProvider; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -24,6 +26,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -58,11 +61,13 @@ public Duration age() { private final @NotNull ConcurrentLinkedQueue clients; private final EdgeDBConnection connection; private final EdgeDBClientConfig config; - private final ClientPoolHolder poolHolder; + private final ClientPool clientPool; private final ClientFactory clientFactory; private final Session session; private final int clientAvailability; + private final AtomicBoolean isClosed; + /** * Constructs a new {@linkplain EdgeDBClient}. * @param connection The connection parameters used to connect this client to EdgeDB. @@ -73,10 +78,20 @@ public EdgeDBClient(EdgeDBConnection connection, @NotNull EdgeDBClientConfig con this.clients = new ConcurrentLinkedQueue<>(); this.config = config; this.connection = connection; - this.poolHolder = new ClientPoolHolder(config.getPoolSize()); this.clientFactory = createClientFactory(); this.session = Session.DEFAULT; this.clientAvailability = config.getClientAvailability(); + this.isClosed = new AtomicBoolean(false); + + this.clientPool = new ClientPool(config.getPoolSize()); + this.clientPool.addShareholder(); + + CleanerProvider.getCleaner().register(this, new CleanerState( + this.clientPool, + this.clients, + this.clientCount, + this.isClosed + )); } /** @@ -111,10 +126,20 @@ private EdgeDBClient(@NotNull EdgeDBClient other, Session session) { this.clients = new ConcurrentLinkedQueue<>(); this.config = other.config; this.connection = other.connection; - this.poolHolder = other.poolHolder; this.clientFactory = other.clientFactory; this.session = session; this.clientAvailability = other.clientAvailability; + this.isClosed = new AtomicBoolean(false); + + this.clientPool = other.clientPool; + this.clientPool.addShareholder(); + + CleanerProvider.getCleaner().register(this, new CleanerState( + this.clientPool, + this.clients, + this.clientCount, + this.isClosed + )); } public int getClientCount() { @@ -327,15 +352,6 @@ public CompletionStage> queryJsonElements(@NotNull String query, @Nul ); } - @Override - public void close() throws Exception { - int count = clientCount.get(); - while(!clients.isEmpty() && count > 0) { - clients.poll().client.disconnect().toCompletableFuture().get(); - count = clientCount.decrementAndGet(); - } - } - private synchronized CompletionStage getClient() { logger.trace("polling cached clients..."); var cachedClient = clients.poll(); @@ -394,15 +410,15 @@ private synchronized void acceptClient(BaseEdgeDBClient client) { private synchronized @NotNull CompletionStage onClientReady(@NotNull BaseEdgeDBClient client) { var suggestedConcurrency = client.getSuggestedPoolConcurrency(); - suggestedConcurrency.ifPresent(this.poolHolder::resize); + suggestedConcurrency.ifPresent(this.clientPool::resize); return CompletableFuture.completedFuture(null); } private CompletionStage createClient() { - return this.poolHolder.acquireContract() + return this.clientPool.acquireContract() .thenApply(contract -> { - logger.trace("Contract acquired, remaining handles: {}", this.poolHolder.remaining()); + logger.trace("Contract acquired, remaining handles: {}", this.clientPool.remaining()); BaseEdgeDBClient client; try { client = clientFactory.create(this.connection, this.config, contract); @@ -417,9 +433,79 @@ private CompletionStage createClient() { .thenApply(client -> client.withSession(this.session)); } + @Override + public void close() throws Exception { + logger.debug("Cleaning from explicit close"); + if(!isClosed.compareAndSet(false, true)) { + logger.debug("Cleaning already preformed"); + return; + } + + int count = clientCount.get(); + while(!clients.isEmpty() && count > 0) { + clients.poll().client.disconnect().toCompletableFuture().get(); + count = clientCount.decrementAndGet(); + } + + if(this.clientPool.removeShareholder()) { + this.clientPool.close(); + } + + logger.debug("Cleaning complete"); + } + + private static class CleanerState implements Runnable { + private final ClientPool pool; + private final ConcurrentLinkedQueue clients; + private final AtomicInteger clientCount; + private final AtomicBoolean isClosed; + + public CleanerState( + ClientPool pool, + ConcurrentLinkedQueue clients, + AtomicInteger clientCount, + AtomicBoolean isClosed + ) { + this.pool = pool; + this.clients = clients; + this.clientCount = clientCount; + this.isClosed = isClosed; + } + + @Override + public void run() { + logger.debug("Running cleaning from garbage collection"); + if(!isClosed.compareAndSet(false, true)) { + logger.debug("Cleaning already preformed"); + return; + } + + int count = clientCount.get(); + while(!clients.isEmpty() && count > 0) { + try { + clients.poll().client.disconnect().toCompletableFuture().get(); + } catch (Exception x) { + logger.warn("Failed to disconnect client", x); + } + count = clientCount.decrementAndGet(); + } + + if(this.pool.removeShareholder()) { + try { + this.pool.close(); + } catch (Exception x) { + logger.warn("Failed to close client pool", x); + } + } + + logger.debug("Cleaning complete"); + } + } + + @FunctionalInterface private interface ClientFactory { - BaseEdgeDBClient create(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) + BaseEdgeDBClient create(EdgeDBConnection connection, EdgeDBClientConfig config, PoolContract poolContract) throws EdgeDBException; } } diff --git a/src/driver/src/main/java/com/edgedb/driver/clients/BaseEdgeDBClient.java b/src/driver/src/main/java/com/edgedb/driver/clients/BaseEdgeDBClient.java index 62ba6a24..3d201c60 100644 --- a/src/driver/src/main/java/com/edgedb/driver/clients/BaseEdgeDBClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/clients/BaseEdgeDBClient.java @@ -4,6 +4,7 @@ import com.edgedb.driver.EdgeDBConnection; import com.edgedb.driver.EdgeDBQueryable; import com.edgedb.driver.async.AsyncEvent; +import com.edgedb.driver.pooling.PoolContract; import com.edgedb.driver.state.Config; import com.edgedb.driver.state.Session; import org.jetbrains.annotations.NotNull; @@ -21,18 +22,22 @@ public abstract class BaseEdgeDBClient implements StatefulClient, EdgeDBQueryabl private final @NotNull AsyncEvent onReady; private final EdgeDBConnection connection; private final EdgeDBClientConfig config; - private final AutoCloseable poolHandle; + private final PoolContract poolContract; protected Session session; - public BaseEdgeDBClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) { + public BaseEdgeDBClient(EdgeDBConnection connection, EdgeDBClientConfig config, PoolContract poolContract) { this.connection = connection; this.config = config; this.session = new Session(); - this.poolHandle = poolHandle; + this.poolContract = poolContract; this.onReady = new AsyncEvent<>(); } + protected PoolContract getContract() { + return this.poolContract; + } + public void onReady(Function> handler) { this.onReady.add(handler); } @@ -100,6 +105,6 @@ public CompletionStage reconnect() { @Override public void close() throws Exception { - this.poolHandle.close(); + this.poolContract.close(); } } diff --git a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java index 9dc7c277..41fa3b11 100644 --- a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java @@ -15,6 +15,7 @@ import com.edgedb.driver.exceptions.EdgeDBErrorException; import com.edgedb.driver.exceptions.EdgeDBException; import com.edgedb.driver.exceptions.ResultCardinalityMismatchException; +import com.edgedb.driver.pooling.PoolContract; import io.netty.buffer.ByteBuf; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -46,8 +47,8 @@ public abstract class EdgeDBBinaryClient extends BaseEdgeDBClient { private @NotNull CompletableFuture readyPromise; private final CodecContext codecContext = new CodecContext(this); - public EdgeDBBinaryClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) { - super(connection, config, poolHandle); + public EdgeDBBinaryClient(EdgeDBConnection connection, EdgeDBClientConfig config, PoolContract poolContract) { + super(connection, config, poolContract); this.connectionSemaphore = new Semaphore(1); this.querySemaphore = new Semaphore(1); this.readyPromise = new CompletableFuture<>(); diff --git a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBHttpClient.java b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBHttpClient.java index 7c44ea71..6c8078f0 100644 --- a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBHttpClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBHttpClient.java @@ -8,6 +8,7 @@ import com.edgedb.driver.exceptions.ConnectionFailedException; import com.edgedb.driver.exceptions.EdgeDBException; import com.edgedb.driver.exceptions.ScramException; +import com.edgedb.driver.pooling.PoolContract; import com.edgedb.driver.util.Scram; import com.edgedb.driver.util.SslUtils; import org.jetbrains.annotations.Nullable; @@ -45,8 +46,8 @@ public final class EdgeDBHttpClient extends EdgeDBBinaryClient { private @Nullable URI authUri; private @Nullable URI execUri; - public EdgeDBHttpClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) throws EdgeDBException { - super(connection, config, poolHandle); + public EdgeDBHttpClient(EdgeDBConnection connection, EdgeDBClientConfig config, PoolContract poolContract) throws EdgeDBException { + super(connection, config, poolContract); this.duplexer = new HttpDuplexer(this); SSLContext context; try { diff --git a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java index bf4ae09a..ceca2a20 100644 --- a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java @@ -5,18 +5,16 @@ import com.edgedb.driver.binary.PacketSerializer; import com.edgedb.driver.binary.duplexers.ChannelDuplexer; import com.edgedb.driver.exceptions.ConnectionFailedTemporarilyException; +import com.edgedb.driver.pooling.PoolContract; import com.edgedb.driver.util.SslUtils; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.SslContextBuilder; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,20 +29,17 @@ public class EdgeDBTCPClient extends EdgeDBBinaryClient implements TransactableClient { private static final Logger logger = LoggerFactory.getLogger(EdgeDBTCPClient.class); - private static final NioEventLoopGroup NETTY_TCP_GROUP = new NioEventLoopGroup(); - private static final EventExecutorGroup DUPLEXER_GROUP = new DefaultEventExecutorGroup(8); - private final @NotNull ChannelDuplexer duplexer; private final Bootstrap bootstrap; private TransactionState transactionState; - public EdgeDBTCPClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) { - super(connection, config, poolHandle); + public EdgeDBTCPClient(EdgeDBConnection connection, EdgeDBClientConfig config, PoolContract poolContract) { + super(connection, config, poolContract); this.duplexer = new ChannelDuplexer(this); this.bootstrap = new Bootstrap() .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .group(NETTY_TCP_GROUP) + .group(poolContract.getPool().getNettyEventGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override @@ -77,7 +72,7 @@ protected void initChannel(@NotNull SocketChannel ch) throws Exception { PacketSerializer.createEncoder() ); - pipeline.addLast(DUPLEXER_GROUP, duplexer.channelHandler); + pipeline.addLast(poolContract.getPool().getDuplexerGroup(), duplexer.channelHandler); duplexer.init(ch); } diff --git a/src/driver/src/main/java/com/edgedb/driver/pooling/ClientPool.java b/src/driver/src/main/java/com/edgedb/driver/pooling/ClientPool.java new file mode 100644 index 00000000..fb6c2ab9 --- /dev/null +++ b/src/driver/src/main/java/com/edgedb/driver/pooling/ClientPool.java @@ -0,0 +1,127 @@ +package com.edgedb.driver.pooling; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public final class ClientPool implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(ClientPool.class); + + public NioEventLoopGroup nettyEventGroup; + public EventExecutorGroup duplexerGroup; + + private long size; + private final @NotNull ConcurrentLinkedQueue> queue; + + private final @NotNull AtomicLong count; + + private final @NotNull AtomicInteger shareholders; + + public ClientPool(int initialSize) { + this.size = initialSize; + this.queue = new ConcurrentLinkedQueue<>(); + this.count = new AtomicLong(size); + this.shareholders = new AtomicInteger(0); + } + + public int getShareholders() { + return this.shareholders.get(); + } + + public void addShareholder() { + logger.debug("Shareholder added, now at {}", this.shareholders.incrementAndGet()); + } + + public boolean removeShareholder() { + var c = this.shareholders.decrementAndGet(); + logger.debug("Shareholder removed, now at {}", c); + return c <= 0; + } + + public synchronized NioEventLoopGroup getNettyEventGroup() { + if(this.nettyEventGroup == null) { + this.nettyEventGroup = new NioEventLoopGroup(); + } + + return this.nettyEventGroup; + } + + public synchronized EventExecutorGroup getDuplexerGroup() { + if(this.duplexerGroup == null) { + this.duplexerGroup = new DefaultEventExecutorGroup(8); + } + + return this.duplexerGroup; + } + + public long remaining() { + return count.get(); + } + + public void resize(long newValue) { + if(newValue == this.size) { + return; + } + + count.getAndUpdate(v -> v + (newValue - size)); + this.size = newValue; + } + + public CompletionStage acquireContract() { + var c = count.decrementAndGet(); + + if(c < 0) { + count.compareAndSet(-1, 0); + + var promise = new CompletableFuture(); + + queue.add(promise); + + return promise.thenApply(v -> lendContract()); + } + else { + return CompletableFuture.completedFuture(lendContract()); + } + } + + private @NotNull PoolContract lendContract() { + return new PoolContract(this, this::completeContract); + } + + private void completeContract(PoolContract contract) { + logger.debug("Completing contract {}...", contract); + + if(queue.isEmpty()) { + logger.debug("Empty contract queue, incrementing count"); + count.incrementAndGet(); + return; + } + + + logger.debug("Polling queue and completing..."); + Objects.requireNonNull(queue.poll()).complete(null); + } + + @Override + public void close() throws Exception { + logger.debug("Closing client pool"); + + if(this.nettyEventGroup != null) { + this.nettyEventGroup.shutdownGracefully().addListener(f -> logger.debug("Client thread pool shutdown: {}", f.isSuccess())); + } + + if(this.duplexerGroup != null) { + this.duplexerGroup.shutdownGracefully().addListener(f -> logger.debug("Client duplex pool shutdown: {}", f.isSuccess())); + } + } +} diff --git a/src/driver/src/main/java/com/edgedb/driver/pooling/PoolContract.java b/src/driver/src/main/java/com/edgedb/driver/pooling/PoolContract.java new file mode 100644 index 00000000..aedeed7f --- /dev/null +++ b/src/driver/src/main/java/com/edgedb/driver/pooling/PoolContract.java @@ -0,0 +1,42 @@ +package com.edgedb.driver.pooling; + +import com.edgedb.driver.clients.BaseEdgeDBClient; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.function.Consumer; + +public class PoolContract implements AutoCloseable { + private final @NotNull ClientPool pool; + private final Consumer completer; + private @Nullable BaseEdgeDBClient client; + private @Nullable Consumer onComplete; + + public PoolContract(@NotNull ClientPool pool, Consumer completer) { + this.pool = pool; + this.completer = completer; + } + + public void register(BaseEdgeDBClient client, Consumer onComplete) { + this.client = client; + this.onComplete = onComplete; + } + + @Override + public void close() { + this.completer.accept(this); + + if(client != null && onComplete != null) { + onComplete.accept(client); + } + } + + @Override + public String toString() { + return String.format("Contract(client := %s|onComplete := %s)", client, onComplete); + } + + public @NotNull ClientPool getPool() { + return pool; + } +} diff --git a/src/driver/src/main/java/com/edgedb/driver/util/CleanerProvider.java b/src/driver/src/main/java/com/edgedb/driver/util/CleanerProvider.java new file mode 100644 index 00000000..526b4adb --- /dev/null +++ b/src/driver/src/main/java/com/edgedb/driver/util/CleanerProvider.java @@ -0,0 +1,12 @@ +package com.edgedb.driver.util; + +import java.lang.ref.Cleaner; + +public class CleanerProvider { + private static final Cleaner CLEANER = Cleaner.create(); + + public static Cleaner getCleaner() { + return CLEANER; + } + +} diff --git a/src/driver/src/main/java/com/edgedb/driver/util/ClientPoolHolder.java b/src/driver/src/main/java/com/edgedb/driver/util/ClientPoolHolder.java deleted file mode 100644 index 65a9069a..00000000 --- a/src/driver/src/main/java/com/edgedb/driver/util/ClientPoolHolder.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.edgedb.driver.util; - -import com.edgedb.driver.clients.BaseEdgeDBClient; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; - -public final class ClientPoolHolder { - private static final Logger logger = LoggerFactory.getLogger(ClientPoolHolder.class); - - private long size; - private final @NotNull ConcurrentLinkedQueue> queue; - - private final @NotNull AtomicLong count; - - public ClientPoolHolder(int initialSize) { - this.size = initialSize; - this.queue = new ConcurrentLinkedQueue<>(); - this.count = new AtomicLong(size); - } - - public long remaining() { - return this.size - count.get(); - } - - public void resize(long newValue) { - if(newValue == this.size) { - return; - } - - count.getAndUpdate(v -> v + (newValue - size)); - this.size = newValue; - } - - public CompletionStage acquireContract() { - var c = count.decrementAndGet(); - - if(c < 0) { - count.compareAndSet(-1, 0); - - var promise = new CompletableFuture(); - - queue.add(promise); - - return promise.thenApply(v -> lendContract()); - } - else { - return CompletableFuture.completedFuture(lendContract()); - } - } - - private @NotNull PoolContract lendContract() { - return new PoolContract(this::completeContract); - } - - private void completeContract(PoolContract contract) { - logger.debug("Completing contract {}...", contract); - - if(queue.isEmpty()) { - logger.debug("Empty contract queue, incrementing count"); - count.incrementAndGet(); - return; - } - - - logger.debug("Polling queue and completing..."); - Objects.requireNonNull(queue.poll()).complete(null); - } - - public static class PoolContract implements AutoCloseable { - private final Consumer completer; - private @Nullable BaseEdgeDBClient client; - private @Nullable Consumer onComplete; - - private PoolContract(Consumer completer) { - this.completer = completer; - } - - public void register(BaseEdgeDBClient client, Consumer onComplete) { - this.client = client; - this.onComplete = onComplete; - } - - @Override - public void close() { - this.completer.accept(this); - - if(client != null && onComplete != null) { - onComplete.accept(client); - } - } - - @Override - public String toString() { - return String.format("Contract(client := %s|onComplete := %s)", client, onComplete); - } - } -} From 1d4a9bd36d5543691232165aa88cd8b3d9be65d3 Mon Sep 17 00:00:00 2001 From: Quin Lynch Date: Thu, 14 Dec 2023 14:41:48 -0400 Subject: [PATCH 2/2] add explicit GC cycle in examples --- .../main/java/com/edgedb/examples/Main.java | 13 ++++++----- .../main/kotlin/com/edgedb/examples/Main.kt | 3 +++ examples/scala-examples/build.sbt | 2 +- .../scala-examples/src/main/scala/Main.scala | 23 +++++++++++-------- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/examples/java-examples/src/main/java/com/edgedb/examples/Main.java b/examples/java-examples/src/main/java/com/edgedb/examples/Main.java index 5f88ca08..36c5095f 100644 --- a/examples/java-examples/src/main/java/com/edgedb/examples/Main.java +++ b/examples/java-examples/src/main/java/com/edgedb/examples/Main.java @@ -13,17 +13,18 @@ public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); public static void main(String[] args) throws Exception { - var client = new EdgeDBClient(EdgeDBClientConfig.builder() + try (var client = new EdgeDBClient(EdgeDBClientConfig.builder() .withNamingStrategy(NamingStrategy.snakeCase()) .useFieldSetters(true) .build() - ).withModule("examples"); + ).withModule("examples")) { + runJavaExamples(client); - runJavaExamples(client); - - logger.info("Examples complete"); + logger.info("Examples complete"); + } - client.close(); + // run a GC cycle to ensure that any remaining dormant client instances get collected and closed. + System.gc(); } private static void runJavaExamples(EdgeDBClient client) { diff --git a/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt b/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt index d7bff368..168e54fe 100644 --- a/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt +++ b/examples/kotlin-examples/src/main/kotlin/com/edgedb/examples/Main.kt @@ -41,5 +41,8 @@ object Main { } } } + + // run a GC cycle to ensure that any remaining dormant client instances get collected and closed. + System.gc(); } } \ No newline at end of file diff --git a/examples/scala-examples/build.sbt b/examples/scala-examples/build.sbt index 90aa6c30..ec090c20 100644 --- a/examples/scala-examples/build.sbt +++ b/examples/scala-examples/build.sbt @@ -3,7 +3,7 @@ ThisBuild / version := "0.1.0-SNAPSHOT" ThisBuild / scalaVersion := "3.1.3" libraryDependencies ++= Seq( - "com.edgedb" % "driver" % "0.2.3" from "file:///" + System.getProperty("user.dir") + "/lib/com.edgedb.driver-0.2.3.jar", + "com.edgedb" % "driver" % "0.2.3" from "file:///" + System.getProperty("user.dir") + "/lib/com.edgedb.driver-0.4.3.jar", "ch.qos.logback" % "logback-classic" % "1.4.7", "ch.qos.logback" % "logback-core" % "1.4.7", "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1", diff --git a/examples/scala-examples/src/main/scala/Main.scala b/examples/scala-examples/src/main/scala/Main.scala index b2f546a7..e845d896 100644 --- a/examples/scala-examples/src/main/scala/Main.scala +++ b/examples/scala-examples/src/main/scala/Main.scala @@ -8,17 +8,12 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future, blocking} import scala.util.control.NonFatal import ExecutionContext.Implicits.global +import scala.util.Using @main def main(): Unit = { val logger = LoggerFactory.getLogger("Main") - val client = new EdgeDBClient(EdgeDBClientConfig.builder - .withNamingStrategy(NamingStrategy.snakeCase) - .useFieldSetters(true) - .build - ).withModule("examples") - val examples = List( AbstractTypes(), BasicQueryFunctions(), @@ -28,10 +23,20 @@ def main(): Unit = { Transactions() ) - for (example <- examples) - Await.ready(runExample(logger, client, example), Duration.Inf) + Using( + new EdgeDBClient(EdgeDBClientConfig.builder + .withNamingStrategy(NamingStrategy.snakeCase) + .useFieldSetters(true) + .build + ).withModule("examples")) { client => + for (example <- examples) + Await.ready(runExample(logger, client, example), Duration.Inf) + + logger.info("Examples complete!") + } - logger.info("Examples complete!") + // run a GC cycle to ensure that any remaining dormant client instances get collected and closed. + System.gc(); } private def runExample(logger: Logger, client: EdgeDBClient, example: Example)(implicit context: ExecutionContext): Future[Unit] = {