Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration for setting maximum number of connection pools to be created #3566

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ interface AllocationStrategy<A extends AllocationStrategy<A>> {
final class Builder extends ConnectionPoolSpec<Builder> {

static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO;
static final int MAX_CONNECTION_POOLS = -1;

int maxConnectionPools = MAX_CONNECTION_POOLS;

String name;
Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED;
Expand All @@ -417,6 +420,7 @@ private Builder(String name) {
this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval;
this.poolInactivity = copy.poolInactivity;
this.disposeTimeout = copy.disposeTimeout;
this.maxConnectionPools = copy.maxConnectionPools;
copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec)));
}

Expand Down Expand Up @@ -488,6 +492,18 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer<HostSpecif
return this;
}

/**
* Specifies the maximum number of connection pools that the provider can create.
* If the number of connection pools created exceeds this value, a warning message is logged.
* The value must be positive or zero; otherwise, the connection pools check is ignored.
jchenga marked this conversation as resolved.
Show resolved Hide resolved
*
* @param maxConnectionPools the number of connection pools expected to be created.
* @return the current {@link Builder} instance with the updated configuration.
*/
public Builder maxConnectionPools(int maxConnectionPools) {
this.maxConnectionPools = maxConnectionPools;
jchenga marked this conversation as resolved.
Show resolved Hide resolved
return this;
}
/**
* Builds new ConnectionProvider.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -91,6 +92,8 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration inactivePoolDisposeInterval;
final Duration poolInactivity;
final Duration disposeTimeout;
final int maxConnectionPools;
final AtomicInteger connectionPoolCount = new AtomicInteger(0);
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

Expand All @@ -105,6 +108,7 @@ protected PooledConnectionProvider(Builder builder) {
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.maxConnectionPools = builder.maxConnectionPools;
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
Expand Down Expand Up @@ -132,6 +136,13 @@ public final Mono<? extends Connection> acquire(
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}

if (maxConnectionPools > Builder.MAX_CONNECTION_POOLS && connectionPoolCount.incrementAndGet() > maxConnectionPools) {
if (log.isWarnEnabled()) {
log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(),
maxConnectionPools);
}
}

boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null;
String id = metricsEnabled ? poolKey.hashCode() + "" : null;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

Expand All @@ -125,6 +127,7 @@
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
Expand All @@ -148,6 +151,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.times;

/**
* This test class verifies {@link HttpClient}.
Expand Down Expand Up @@ -614,6 +618,79 @@ void sslExchangeRelativeGet() throws SSLException {
assertThat(responseString).isEqualTo("hello /foo");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void maxConnectionPools(boolean withMaxConnectionPools) throws SSLException {

try {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder
.forServer(ssc.certificate(), ssc.privateKey())
.build();

disposableServer = createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = withMaxConnectionPools ? ConnectionProvider
.builder("max-connection-pools")
.maxConnectionPools(1)
.build() : ConnectionProvider
.builder("max-connection-pools")
.build();

StepVerifier
.create(Flux
.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

if (withMaxConnectionPools) {
Mockito
.verify(spyLogger)
.warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1));
assertThat(argumentCaptor.getValue()).isEqualTo(
"Connection pool creation limit exceeded: {} pools created, maximum expected is {}");
}
else {
Mockito
.verify(spyLogger, times(0))
.warn(Mockito.eq(
"Connection pool creation limit exceeded: {} pools created, maximum expected is {}"),
Mockito.eq(2),
Mockito.eq(1));

}
}
finally {
disposableServer.dispose();
jchenga marked this conversation as resolved.
Show resolved Hide resolved
Loggers.resetLoggerFactory();
}

}

private SslContext createClientSslContext() {
try {
return SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
}
catch (SSLException e) {
throw new RuntimeException(e);
}
}

@Test
void sslExchangeAbsoluteGet() throws SSLException {
SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
Expand Down
Loading