Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration for setting maximum number of connection pools to be created #3566

Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -394,6 +394,9 @@ interface AllocationStrategy<A extends AllocationStrategy<A>> {
final class Builder extends ConnectionPoolSpec<Builder> {

static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO;
static final int EXPECTED_CONNECTION_POOLS_DISABLED = -1;
jchenga marked this conversation as resolved.
Show resolved Hide resolved

int expectedConnectionPools = EXPECTED_CONNECTION_POOLS_DISABLED;
jchenga marked this conversation as resolved.
Show resolved Hide resolved

String name;
Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED;
Expand All @@ -417,6 +420,7 @@ private Builder(String name) {
this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval;
this.poolInactivity = copy.poolInactivity;
this.disposeTimeout = copy.disposeTimeout;
this.expectedConnectionPools = copy.expectedConnectionPools;
jchenga marked this conversation as resolved.
Show resolved Hide resolved
copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec)));
}

Expand Down Expand Up @@ -488,6 +492,18 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer<HostSpecif
return this;
}

/**
* Specifies the expected number of connection pools that the provider can create.
* If the number of connection pools created exceeds this value, a warning message is logged.
* The value must be positive; otherwise, the connection pools check is ignored.
*
* @param expectedConnectionPools the number of connection pools expected to be created.
* @return the current {@link Builder} instance with the updated configuration.
*/
public Builder expectedConnectionPools(int expectedConnectionPools) {
this.expectedConnectionPools = expectedConnectionPools;
jchenga marked this conversation as resolved.
Show resolved Hide resolved
return this;
}
/**
* Builds new ConnectionProvider.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -90,6 +91,8 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration inactivePoolDisposeInterval;
final Duration poolInactivity;
final Duration disposeTimeout;
final int expectedConnectionPools;
jchenga marked this conversation as resolved.
Show resolved Hide resolved
final AtomicInteger connectionPoolCount = new AtomicInteger(0);
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

Expand All @@ -104,6 +107,7 @@ protected PooledConnectionProvider(Builder builder) {
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.expectedConnectionPools = builder.expectedConnectionPools;
jchenga marked this conversation as resolved.
Show resolved Hide resolved
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
Expand Down Expand Up @@ -131,6 +135,13 @@ public final Mono<? extends Connection> acquire(
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}

if (expectedConnectionPools > Builder.EXPECTED_CONNECTION_POOLS_DISABLED && connectionPoolCount.incrementAndGet() > expectedConnectionPools) {
if (log.isWarnEnabled()) {
log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(),
expectedConnectionPools);
jchenga marked this conversation as resolved.
Show resolved Hide resolved
}
}

boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null;
String id = metricsEnabled ? poolKey.hashCode() + "" : null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
violetagg marked this conversation as resolved.
Show resolved Hide resolved
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -119,6 +122,7 @@
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
Expand All @@ -142,6 +146,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.times;

/**
* This test class verifies {@link HttpClient}.
Expand Down Expand Up @@ -608,6 +613,97 @@ void sslExchangeRelativeGet() throws SSLException {
assertThat(responseString).isEqualTo("hello /foo");
}

@Test
jchenga marked this conversation as resolved.
Show resolved Hide resolved
void expectedConnectionPoolsEnabled() throws SSLException {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.build();


disposableServer =
createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = ConnectionProvider.builder("expected-connection-pool").expectedConnectionPools(1).build();
jchenga marked this conversation as resolved.
Show resolved Hide resolved

StepVerifier.create(
Flux.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

Loggers.resetLoggerFactory();
jchenga marked this conversation as resolved.
Show resolved Hide resolved


Mockito.verify(spyLogger).warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1));
assertThat(argumentCaptor.getValue()).isEqualTo("Connection pool creation limit exceeded: {} pools created, maximum expected is {}");

connectionProvider.dispose();
disposableServer.dispose();

}

@Test
void expectedConnectionPoolsNotEnabled() throws SSLException {
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.build();


disposableServer =
createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = ConnectionProvider.builder("expected-connection-pool").build();
jchenga marked this conversation as resolved.
Show resolved Hide resolved

StepVerifier.create(
Flux.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

Loggers.resetLoggerFactory();


Mockito.verify(spyLogger, times(0)).warn(Mockito.eq("Connection pool creation limit exceeded: {} pools created, maximum expected is {}"), Mockito.eq(2), Mockito.eq(1));

connectionProvider.dispose();
disposableServer.dispose();

}

private SslContext createClientSslContext() {
try {
return SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
}
catch (SSLException e) {
throw new RuntimeException(e);
}
}

@Test
void sslExchangeAbsoluteGet() throws SSLException {
SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
Expand Down
Loading