Skip to content

Commit

Permalink
renamed to max connection pools and updated unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
jchenga committed Jan 4, 2025
1 parent 31e7ec2 commit e3e5578
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ interface AllocationStrategy<A extends AllocationStrategy<A>> {
final class Builder extends ConnectionPoolSpec<Builder> {

static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO;
static final int EXPECTED_CONNECTION_POOLS_DISABLED = -1;
static final int MAX_CONNECTION_POOLS = -1;

int expectedConnectionPools = EXPECTED_CONNECTION_POOLS_DISABLED;
int maxConnectionPools = MAX_CONNECTION_POOLS;

String name;
Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED;
Expand All @@ -420,7 +420,7 @@ private Builder(String name) {
this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval;
this.poolInactivity = copy.poolInactivity;
this.disposeTimeout = copy.disposeTimeout;
this.expectedConnectionPools = copy.expectedConnectionPools;
this.maxConnectionPools = copy.maxConnectionPools;
copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec)));
}

Expand Down Expand Up @@ -493,15 +493,15 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer<HostSpecif
}

/**
* Specifies the expected number of connection pools that the provider can create.
* Specifies the maximum number of connection pools that the provider can create.
* If the number of connection pools created exceeds this value, a warning message is logged.
* The value must be positive; otherwise, the connection pools check is ignored.
* The value must be positive or zero; otherwise, the connection pools check is ignored.
*
* @param expectedConnectionPools the number of connection pools expected to be created.
* @param maxConnectionPools the number of connection pools expected to be created.
* @return the current {@link Builder} instance with the updated configuration.
*/
public Builder expectedConnectionPools(int expectedConnectionPools) {
this.expectedConnectionPools = expectedConnectionPools;
public Builder maxConnectionPools(int maxConnectionPools) {
this.maxConnectionPools = maxConnectionPools;
return this;
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration inactivePoolDisposeInterval;
final Duration poolInactivity;
final Duration disposeTimeout;
final int expectedConnectionPools;
final int maxConnectionPools;
final AtomicInteger connectionPoolCount = new AtomicInteger(0);
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;
Expand All @@ -108,7 +108,7 @@ protected PooledConnectionProvider(Builder builder) {
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.expectedConnectionPools = builder.expectedConnectionPools;
this.maxConnectionPools = builder.maxConnectionPools;
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
Expand Down Expand Up @@ -136,10 +136,10 @@ public final Mono<? extends Connection> acquire(
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}

if (expectedConnectionPools > Builder.EXPECTED_CONNECTION_POOLS_DISABLED && connectionPoolCount.incrementAndGet() > expectedConnectionPools) {
if (maxConnectionPools > Builder.MAX_CONNECTION_POOLS && connectionPoolCount.incrementAndGet() > maxConnectionPools) {
if (log.isWarnEnabled()) {
log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(),
expectedConnectionPools);
maxConnectionPools);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,83 +618,65 @@ void sslExchangeRelativeGet() throws SSLException {
assertThat(responseString).isEqualTo("hello /foo");
}

@Test
void expectedConnectionPoolsEnabled() throws SSLException {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.build();


disposableServer =
createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = ConnectionProvider.builder("expected-connection-pool").expectedConnectionPools(1).build();

StepVerifier.create(
Flux.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

Loggers.resetLoggerFactory();


Mockito.verify(spyLogger).warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1));
assertThat(argumentCaptor.getValue()).isEqualTo("Connection pool creation limit exceeded: {} pools created, maximum expected is {}");

connectionProvider.dispose();
disposableServer.dispose();

}

@Test
void expectedConnectionPoolsNotEnabled() throws SSLException {
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.build();


disposableServer =
createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();
@ParameterizedTest
@ValueSource(booleans = {true, false})
void maxConnectionPools(boolean withMaxConnectionPools) throws SSLException {

ConnectionProvider connectionProvider = ConnectionProvider.builder("max-connection-pools").build();
try {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

StepVerifier.create(
Flux.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();
SslContext sslServer = SslContextBuilder
.forServer(ssc.certificate(), ssc.privateKey())
.build();

Loggers.resetLoggerFactory();
disposableServer = createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = withMaxConnectionPools ? ConnectionProvider
.builder("max-connection-pool")
.maxConnectionPools(1)
.build() : ConnectionProvider
.builder("max-connection-pools")
.build();

Mockito.verify(spyLogger, times(0)).warn(Mockito.eq("Connection pool creation limit exceeded: {} pools created, maximum expected is {}"), Mockito.eq(2), Mockito.eq(1));
StepVerifier
.create(Flux
.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

if (withMaxConnectionPools) {
Mockito
.verify(spyLogger)
.warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1));
assertThat(argumentCaptor.getValue()).isEqualTo(
"Connection pool creation limit exceeded: {} pools created, maximum expected is {}");
}
else {
Mockito
.verify(spyLogger, times(0))
.warn(Mockito.eq(
"Connection pool creation limit exceeded: {} pools created, maximum expected is {}"),
Mockito.eq(2),
Mockito.eq(1));

connectionProvider.dispose();
disposableServer.dispose();
}
}
finally {
disposableServer.dispose();
Loggers.resetLoggerFactory();
}

}

Expand Down

0 comments on commit e3e5578

Please sign in to comment.