From 963dd20b81d61b925fa3412791277254f912e2b9 Mon Sep 17 00:00:00 2001 From: Jack Cheng Date: Wed, 8 Jan 2025 01:54:58 -0800 Subject: [PATCH] Configuration for setting maximum number of connection pools to be created (#3566) Introduce a configuration option to specify the maximum number of connection pools a ConnectionProvider should create. Once the maximum number is exceeded, a warning message is logged. Fixes #3318 Co-authored-by: Violeta Georgieva --- .../netty/resources/ConnectionProvider.java | 19 ++++ .../resources/PooledConnectionProvider.java | 11 +++ .../netty/http/client/HttpClientTest.java | 92 ++++++++++++++++++- 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index 6a3dd1f6ed..a135340403 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -394,6 +394,9 @@ interface AllocationStrategy> { final class Builder extends ConnectionPoolSpec { static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO; + static final int MAX_CONNECTION_POOLS = -1; + + int maxConnectionPools = MAX_CONNECTION_POOLS; String name; Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED; @@ -417,6 +420,7 @@ private Builder(String name) { this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval; this.poolInactivity = copy.poolInactivity; this.disposeTimeout = copy.disposeTimeout; + this.maxConnectionPools = copy.maxConnectionPools; copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec))); } @@ -488,6 +492,21 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer implements final Duration inactivePoolDisposeInterval; final Duration poolInactivity; final Duration disposeTimeout; + final int maxConnectionPools; + final AtomicInteger connectionPoolCount = new AtomicInteger(0); final Map maxConnections = new HashMap<>(); Mono onDispose; @@ -105,6 +108,7 @@ protected PooledConnectionProvider(Builder builder) { this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval; this.poolInactivity = builder.poolInactivity; this.disposeTimeout = builder.disposeTimeout; + this.maxConnectionPools = builder.maxConnectionPools; this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock); for (Map.Entry> entry : builder.confPerRemoteHost.entrySet()) { poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout)); @@ -132,6 +136,13 @@ public final Mono acquire( log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress); } + if (maxConnectionPools > Builder.MAX_CONNECTION_POOLS && connectionPoolCount.incrementAndGet() > maxConnectionPools) { + if (log.isWarnEnabled()) { + log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(), + maxConnectionPools); + } + } + boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null; String id = metricsEnabled ? poolKey.hashCode() + "" : null; diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index d1edc37fe9..c6e6b7c2b3 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,6 +103,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -125,6 +127,7 @@ import reactor.netty.http.Http2SslContextSpec; import reactor.netty.http.HttpProtocol; import reactor.netty.http.HttpResources; +import reactor.netty.http.client.HttpClient.ResponseReceiver; import reactor.netty.http.server.HttpServer; import reactor.netty.resources.ConnectionPoolMetrics; import reactor.netty.resources.ConnectionProvider; @@ -147,7 +150,9 @@ import static io.netty.handler.codec.http.HttpHeaderValues.GZIP; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assumptions.assumeThat; +import static org.mockito.Mockito.times; /** * This test class verifies {@link HttpClient}. @@ -614,6 +619,91 @@ void sslExchangeRelativeGet() throws SSLException { assertThat(responseString).isEqualTo("hello /foo"); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testMaxConnectionPools(boolean withMaxConnectionPools) throws SSLException { + Logger spyLogger = Mockito.spy(log); + Loggers.useCustomLoggers(s -> spyLogger); + + ConnectionProvider connectionProvider = withMaxConnectionPools ? ConnectionProvider + .builder("max-connection-pools") + .maxConnectionPools(1) + .build() : ConnectionProvider + .builder("max-connection-pools") + .build(); + + try { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + + SslContext sslServer = SslContextBuilder + .forServer(ssc.certificate(), ssc.privateKey()) + .build(); + + disposableServer = createServer() + .secure(ssl -> ssl.sslContext(sslServer)) + .handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri()))) + .bindNow(); + + + + StepVerifier + .create(Flux + .range(1, 2) + .flatMap(i -> createClient(connectionProvider, disposableServer::address) + .secure(ssl -> ssl.sslContext(createClientSslContext())) + .get() + .uri("/foo") + .responseContent() + .aggregate() + .asString())) + .thenConsumeWhile(s -> true) + .verifyComplete(); + + if (withMaxConnectionPools) { + Mockito + .verify(spyLogger) + .warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1)); + assertThat(argumentCaptor.getValue()).isEqualTo( + "Connection pool creation limit exceeded: {} pools created, maximum expected is {}"); + } + else { + Mockito + .verify(spyLogger, times(0)) + .warn(Mockito.eq( + "Connection pool creation limit exceeded: {} pools created, maximum expected is {}"), + Mockito.eq(2), + Mockito.eq(1)); + + } + } + finally { + Loggers.resetLoggerFactory(); + connectionProvider.dispose(); + } + + } + + @ParameterizedTest + @ValueSource(ints = {0, -2}) + void testInvalidMaxConnectionPoolsSetting(int maxConnectionPools) { + + assertThatIllegalArgumentException().isThrownBy(() -> ConnectionProvider + .builder("max-connection-pools") + .maxConnectionPools(maxConnectionPools)); + + } + + private SslContext createClientSslContext() { + try { + return SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } + catch (SSLException e) { + throw new RuntimeException(e); + } + } + @Test void sslExchangeAbsoluteGet() throws SSLException { SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();