Skip to content

Commit

Permalink
Configuration for setting maximum number of connection pools to be cr…
Browse files Browse the repository at this point in the history
…eated (#3566)

Introduce a configuration option to specify the maximum number of connection pools a ConnectionProvider should create. Once the maximum number is exceeded, a warning message is logged.

Fixes #3318

Co-authored-by: Violeta Georgieva <[email protected]>
  • Loading branch information
jchenga and violetagg authored Jan 8, 2025
1 parent e39854d commit 963dd20
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ interface AllocationStrategy<A extends AllocationStrategy<A>> {
final class Builder extends ConnectionPoolSpec<Builder> {

static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO;
static final int MAX_CONNECTION_POOLS = -1;

int maxConnectionPools = MAX_CONNECTION_POOLS;

String name;
Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED;
Expand All @@ -417,6 +420,7 @@ private Builder(String name) {
this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval;
this.poolInactivity = copy.poolInactivity;
this.disposeTimeout = copy.disposeTimeout;
this.maxConnectionPools = copy.maxConnectionPools;
copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec)));
}

Expand Down Expand Up @@ -488,6 +492,21 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer<HostSpecif
return this;
}

/**
* Specifies the maximum number of connection pools that the provider can create.
* If the number of connection pools created exceeds this value, a warning message is logged.
* The value must be strictly positive or -1; otherwise, the connection pools check is ignored.
* Setting the configuration to -1 disables the setting.
* @param maxConnectionPools the maximum number of connection pools that can be created.
* @return the current {@link Builder} instance with the updated configuration.
*/
public Builder maxConnectionPools(int maxConnectionPools) {
if (maxConnectionPools != MAX_CONNECTION_POOLS && maxConnectionPools <= 0) {
throw new IllegalArgumentException("Maximum connection pools setting must be strictly positive.");
}
this.maxConnectionPools = maxConnectionPools;
return this;
}
/**
* Builds new ConnectionProvider.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -91,6 +92,8 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration inactivePoolDisposeInterval;
final Duration poolInactivity;
final Duration disposeTimeout;
final int maxConnectionPools;
final AtomicInteger connectionPoolCount = new AtomicInteger(0);
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

Expand All @@ -105,6 +108,7 @@ protected PooledConnectionProvider(Builder builder) {
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.maxConnectionPools = builder.maxConnectionPools;
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
Expand Down Expand Up @@ -132,6 +136,13 @@ public final Mono<? extends Connection> acquire(
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}

if (maxConnectionPools > Builder.MAX_CONNECTION_POOLS && connectionPoolCount.incrementAndGet() > maxConnectionPools) {
if (log.isWarnEnabled()) {
log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(),
maxConnectionPools);
}
}

boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null;
String id = metricsEnabled ? poolKey.hashCode() + "" : null;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

Expand All @@ -125,6 +127,7 @@
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
Expand All @@ -147,7 +150,9 @@
import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.times;

/**
* This test class verifies {@link HttpClient}.
Expand Down Expand Up @@ -614,6 +619,91 @@ void sslExchangeRelativeGet() throws SSLException {
assertThat(responseString).isEqualTo("hello /foo");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testMaxConnectionPools(boolean withMaxConnectionPools) throws SSLException {
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

ConnectionProvider connectionProvider = withMaxConnectionPools ? ConnectionProvider
.builder("max-connection-pools")
.maxConnectionPools(1)
.build() : ConnectionProvider
.builder("max-connection-pools")
.build();

try {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);

SslContext sslServer = SslContextBuilder
.forServer(ssc.certificate(), ssc.privateKey())
.build();

disposableServer = createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();



StepVerifier
.create(Flux
.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

if (withMaxConnectionPools) {
Mockito
.verify(spyLogger)
.warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1));
assertThat(argumentCaptor.getValue()).isEqualTo(
"Connection pool creation limit exceeded: {} pools created, maximum expected is {}");
}
else {
Mockito
.verify(spyLogger, times(0))
.warn(Mockito.eq(
"Connection pool creation limit exceeded: {} pools created, maximum expected is {}"),
Mockito.eq(2),
Mockito.eq(1));

}
}
finally {
Loggers.resetLoggerFactory();
connectionProvider.dispose();
}

}

@ParameterizedTest
@ValueSource(ints = {0, -2})
void testInvalidMaxConnectionPoolsSetting(int maxConnectionPools) {

assertThatIllegalArgumentException().isThrownBy(() -> ConnectionProvider
.builder("max-connection-pools")
.maxConnectionPools(maxConnectionPools));

}

private SslContext createClientSslContext() {
try {
return SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
}
catch (SSLException e) {
throw new RuntimeException(e);
}
}

@Test
void sslExchangeAbsoluteGet() throws SSLException {
SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
Expand Down

0 comments on commit 963dd20

Please sign in to comment.