Skip to content

Commit

Permalink
Merge pull request kroxylicious#1074 from robobario/configurable-firs…
Browse files Browse the repository at this point in the history
…t-node-id

Port-per-broker Exposition: make lowest broker id configurable
  • Loading branch information
robobario authored Mar 8, 2024
2 parents 470fb42 + 89c672f commit 287b464
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Please enumerate **all user-facing** changes using format `<githib issue/pr numb

## 0.5.0

* [#1074](https://github.com/kroxylicious/kroxylicious/pull/1074): Port-per-broker Exposition: make lowest broker id configurable
* [#1066](https://github.com/kroxylicious/kroxylicious/pull/1066): Log platform information on startup
* [#1050](https://github.com/kroxylicious/kroxylicious/pull/1050): Change AES GCM cipher to require a 256bit key
* [#1049](https://github.com/kroxylicious/kroxylicious/pull/1049): Add deprecated EnvelopeEncryption filter to ease migration to RecordEncryption filter
Expand Down
31 changes: 28 additions & 3 deletions docs/deploying.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ In the `PortPerBroker` scheme, Kroxylicious automatically opens a port for each
one port per broker of each target cluster. So, if you have two virtual clusters, each targeting a Kafka Cluster
of three brokers, Kroxylicious will bind eight ports in total.

`PortPerBroker` is designed to work best with simplistic configurations. It is preferable if the target cluster has
sequential, stable broker ids and a known minimum broker id (like 0,1,2 for a cluster of 3 brokers). It can work with
non-sequential broker ids, but you would have to expose `maxBrokerId - minBrokerId` ports, which could be a huge
number if your cluster included broker ids `0` and `20000`.

Kroxylicious monitors the broker topology of the target cluster at runtime. It will adjust the number of open ports
dynamically. If another broker is added to the Kafka Cluster, Kroxylicious will automatically open a port for it.
Similarly, if a broker is removed from the Kafka Cluster, Kroxylicious will automatically close the port that was
Expand All @@ -70,14 +75,16 @@ clusterNetworkAddressConfigProvider:
brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com # <2>
brokerStartPort: 9193 # <3>
numberOfBrokerPorts: 3 # <4>
bindAddress: 192.168.0.1 # <5>
lowestTargetBrokerId: 1000 # <5>
bindAddress: 192.168.0.1 # <6>
----
<1> The hostname and port of the bootstrap that will be used by the Kafka Clients.
<2> (Optional) The broker address pattern used to form the broker addresses. If not defined, it defaults to the
hostname part of the `bootstrapAddress` and the port number allocated to the broker.
<3> (Optional) The starting number for broker port range. Defaults to the port of the `bootstrapAddress` plus 1.
<4> (Optional) The _maximum_ number of brokers of ports that are permitted. Defaults to 3.
<5> (Optional) The bind address used when binding the ports. If undefined, all network interfaces will be bound.
<5> (Optional) The lowest broker id of the target cluster. Defaults to 0. This should be the lowest https://kafka.apache.org/documentation/#brokerconfigs_node.id[`node.id`] (or https://kafka.apache.org/documentation/#brokerconfigs_broker.id[`broker.id`]) defined in the target cluster.
<6> (Optional) The bind address used when binding the ports. If undefined, all network interfaces will be bound.

The `brokerAddressPattern` configuration parameter understands the replacement token `$(nodeId)`. It is optional.
If present, it will be replaced by the https://kafka.apache.org/documentation/#brokerconfigs_node.id[`node.id`] (or
Expand All @@ -97,7 +104,25 @@ NOTE: It is a responsibility for the deployer of Kroxylicious to ensure that the
DNS names are resolvable and routable by the Kafka Client.

The `numberOfBrokerPorts` imposes a maximum on the number of brokers that a Kafka Cluster can have. Set this value
to be as high as the maximum number of brokers that your operational rules allow for a Kafka Cluster.
to be as high as the maximum number of brokers that your operational rules allow for a Kafka Cluster.

Note that each broker's id must be greater than or equal to `lowestTargetBrokerId`, and less than `lowestTargetBrokerId + numberOfBrokerPorts`.
The current strategy for mapping node ids to ports is `nodeId -> brokerStartPort + nodeId - lowestTargetBrokerId`. So a
configuration like:

[source, yaml]
----
clusterNetworkAddressConfigProvider:
type: PortPerBrokerClusterNetworkAddressConfigProvider
config:
bootstrapAddress: mycluster.kafka.com:9192
brokerStartPort: 9193
numberOfBrokerPorts: 3
lowestTargetBrokerId: 1000
----

can only map broker ids 1000, 1001 and 1002 to ports 9193, 9194 and 9195 respectively. You would have to reconfigure
`numberOfBrokerPorts` to accommodate new brokers being added to the cluster.

==== SniRouting scheme

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.stream.Stream;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -50,6 +51,7 @@
import io.kroxylicious.testing.kafka.common.BrokerCluster;
import io.kroxylicious.testing.kafka.common.KeytoolCertificateGenerator;
import io.kroxylicious.testing.kafka.common.SaslPlainAuth;
import io.kroxylicious.testing.kafka.common.ZooKeeperCluster;
import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;

import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -441,6 +443,32 @@ void targetClusterDynamicallyAddsBroker(@BrokerCluster KafkaCluster cluster) thr
}
}

// we currently cannot influence the node ids, so we start a 2 node cluster and shutdown node 0
// cannot use KRaft as node 0 is a controller
@Test
void canConfigureLowestBrokerIdWithPortPerBroker(@ZooKeeperCluster @BrokerCluster(numBrokers = 2) KafkaCluster cluster, Admin admin) throws Exception {
cluster.removeBroker(0);
await().atMost(Duration.ofSeconds(5)).until(() -> admin.describeCluster().nodes().get(),
n -> n.size() == 1 && n.iterator().next().id() == 1);
var builder = new ConfigurationBuilder()
.addToVirtualClusters("demo", new VirtualClusterBuilder()
.withNewTargetCluster()
.withBootstrapServers(cluster.getBootstrapServers())
.endTargetCluster()
.withClusterNetworkAddressConfigProvider(
new ClusterNetworkAddressConfigProviderDefinitionBuilder(PortPerBrokerClusterNetworkAddressConfigProvider.class.getName())
.withConfig("bootstrapAddress", PROXY_ADDRESS)
.withConfig("lowestTargetBrokerId", 1)
.withConfig("numberOfBrokerPorts", 1)
.build())
.build());

try (var tester = kroxyliciousTester(builder)) {
assertThat(cluster.getNumOfBrokers()).isEqualTo(1);
verifyAllBrokersAvailableViaProxy(tester, cluster);
}
}

@Test
void targetClusterDynamicallyRemovesBroker(@BrokerCluster(numBrokers = 2) KafkaCluster cluster) throws Exception {
var builder = new ConfigurationBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class PortPerBrokerClusterNetworkAddressConfigProvider implements Cluster
private final Set<Integer> exclusivePorts;
private final int brokerEndPortExclusive;
private final int numberOfBrokerPorts;
private final int lowestTargetBrokerId;

/**
* Creates the provider.
Expand All @@ -57,6 +58,7 @@ public PortPerBrokerClusterNetworkAddressConfigProvider(PortPerBrokerClusterNetw
this.brokerAddressPattern = config.brokerAddressPattern;
this.brokerStartPort = config.brokerStartPort;
this.numberOfBrokerPorts = config.numberOfBrokerPorts;
this.lowestTargetBrokerId = config.lowestTargetBrokerId;
this.brokerEndPortExclusive = brokerStartPort + numberOfBrokerPorts;

var exclusivePorts = IntStream.range(brokerStartPort, brokerEndPortExclusive).boxed().collect(Collectors.toCollection(HashSet::new));
Expand All @@ -70,10 +72,9 @@ public HostPort getClusterBootstrapAddress() {
}

@Override

public HostPort getBrokerAddress(int nodeId) throws IllegalArgumentException {
int port = brokerStartPort + nodeId;
if (port >= brokerEndPortExclusive) {
int port = brokerStartPort + nodeId - lowestTargetBrokerId;
if (port < brokerStartPort || port >= brokerEndPortExclusive) {
throw new IllegalArgumentException(
"Cannot generate broker address for node id %d as port %d would fall outside port range %d-%d that is defined for provider with downstream bootstrap %s)"
.formatted(
Expand All @@ -94,7 +95,8 @@ public Set<Integer> getExclusivePorts() {

@Override
public Map<Integer, HostPort> discoveryAddressMap() {
return IntStream.range(0, numberOfBrokerPorts).boxed().collect(Collectors.toMap(Function.identity(), this::getBrokerAddress));
return IntStream.range(lowestTargetBrokerId, lowestTargetBrokerId + numberOfBrokerPorts).boxed()
.collect(Collectors.toMap(Function.identity(), this::getBrokerAddress));
}

/**
Expand All @@ -104,17 +106,20 @@ public static class PortPerBrokerClusterNetworkAddressConfigProviderConfig {
private final HostPort bootstrapAddress;
private final String brokerAddressPattern;
private final int brokerStartPort;
private final int lowestTargetBrokerId;
private final int numberOfBrokerPorts;

public PortPerBrokerClusterNetworkAddressConfigProviderConfig(@JsonProperty(required = true) HostPort bootstrapAddress,
@JsonProperty(required = false) String brokerAddressPattern,
@JsonProperty(required = false) Integer brokerStartPort,
@JsonProperty(required = false, defaultValue = "0") Integer lowestTargetBrokerId,
@JsonProperty(required = false, defaultValue = "3") Integer numberOfBrokerPorts) {
Objects.requireNonNull(bootstrapAddress, "bootstrapAddress cannot be null");

this.bootstrapAddress = bootstrapAddress;
this.brokerAddressPattern = brokerAddressPattern != null ? brokerAddressPattern : bootstrapAddress.host();
this.brokerStartPort = brokerStartPort != null ? brokerStartPort : (bootstrapAddress.port() + 1);
this.lowestTargetBrokerId = lowestTargetBrokerId != null ? lowestTargetBrokerId : 0;
this.numberOfBrokerPorts = numberOfBrokerPorts != null ? numberOfBrokerPorts : 3;

if (this.brokerAddressPattern.isBlank()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@

package io.kroxylicious.proxy.internal.clusternetworkaddressconfigprovider;

import java.util.Map;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EmptySource;
import org.junit.jupiter.params.provider.ValueSource;

import io.kroxylicious.proxy.service.HostPort;

import static io.kroxylicious.proxy.internal.clusternetworkaddressconfigprovider.PortPerBrokerClusterNetworkAddressConfigProvider.PortPerBrokerClusterNetworkAddressConfigProviderConfig;
import static io.kroxylicious.proxy.service.HostPort.parse;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -25,7 +29,7 @@ class PortPerBrokerClusterNetworkAddressConfigProviderTest {
void badBrokerPortDefinition(int brokerStartPort, int numberOfBrokerPorts) {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"),
"localhost", brokerStartPort, numberOfBrokerPorts);
"localhost", brokerStartPort, 0, numberOfBrokerPorts);
});
}

Expand All @@ -34,7 +38,7 @@ void badBrokerPortDefinition(int brokerStartPort, int numberOfBrokerPorts) {
void bootstrapBrokerAddressCollision(int brokerStartPort, int numberOfBrokerPorts) {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"),
"localhost", brokerStartPort, numberOfBrokerPorts);
"localhost", brokerStartPort, 0, numberOfBrokerPorts);
});
}

Expand All @@ -43,14 +47,14 @@ void bootstrapBrokerAddressCollision(int brokerStartPort, int numberOfBrokerPort
@EmptySource
void invalidBrokerAddressPatterns(String input) {
assertThrows(IllegalArgumentException.class,
() -> new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 5));
() -> new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 0, 5));
}

@ParameterizedTest
@ValueSource(strings = { "localhost", "127.0.0.1", "[2001:db8::1]", "kafka.example.com", "mybroker$(nodeId)", "mybroker$(nodeId).example.com",
"twice$(nodeId)allowed$(nodeId)too" })
void validBrokerAddressPatterns(String input) {
var config = new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 5);
var config = new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 0, 5);
assertThat(config).isNotNull();

}
Expand All @@ -59,7 +63,7 @@ void validBrokerAddressPatterns(String input) {
void portsExhausted() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"),
"localhost", 1236, 1));
"localhost", 1236, 0, 1));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("localhost:1236"));
Assertions.assertThrows(IllegalArgumentException.class, () -> {
provider.getBrokerAddress(1);
Expand All @@ -69,37 +73,57 @@ void portsExhausted() {
@Test
void defaultsBrokerPatternBasedOnBootstrapHost() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, 1236, 1237));
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, 1236, 0, 1237));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235"));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("mycluster:1236"));
}

@Test
void defaultsBrokerStartPortBasedOnBootstrapPort() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 1237));
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 0, 1237));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235"));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("mycluster:1236"));
}

@Test
void defaultsNumberOfBrokerPorts() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, null));
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 0, null));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235"));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("mycluster:1236"));
assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("mycluster:1237"));
assertThat(provider.getBrokerAddress(2)).isEqualTo(parse("mycluster:1238"));
Map<Integer, HostPort> expectedDiscovery = Map.of(0, parse("mycluster:1236"), 1, parse("mycluster:1237"), 2, parse("mycluster:1238"));
assertThat(provider.discoveryAddressMap()).containsExactlyInAnyOrderEntriesOf(expectedDiscovery);
Assertions.assertThrows(IllegalArgumentException.class, () -> {
provider.getBrokerAddress(3);
});
}

@Test
void lowestTargetBrokerIdConfigurable() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 2, null));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235"));
assertThat(provider.getBrokerAddress(2)).isEqualTo(parse("mycluster:1236"));
assertThat(provider.getBrokerAddress(3)).isEqualTo(parse("mycluster:1237"));
assertThat(provider.getBrokerAddress(4)).isEqualTo(parse("mycluster:1238"));
Map<Integer, HostPort> expectedDiscovery = Map.of(2, parse("mycluster:1236"), 3, parse("mycluster:1237"), 4, parse("mycluster:1238"));
assertThat(provider.discoveryAddressMap()).containsExactlyInAnyOrderEntriesOf(expectedDiscovery);
Assertions.assertThrows(IllegalArgumentException.class, () -> {
provider.getBrokerAddress(5);
});
Assertions.assertThrows(IllegalArgumentException.class, () -> {
provider.getBrokerAddress(1);
});
}

@Test
void definesExclusiveAndSharedCorrectly() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"),
"localhost", 1236, 2));
"localhost", 1236, 0, 2));
assertThat(provider.getExclusivePorts()).containsExactlyInAnyOrder(1235, 1236, 1237);
assertThat(provider.getSharedPorts()).isEmpty();
}
Expand All @@ -108,7 +132,7 @@ void definesExclusiveAndSharedCorrectly() {
void generatesBrokerAddresses() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"),
"localhost", 1236, 3));
"localhost", 1236, 0, 3));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("localhost:1235"));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("localhost:1236"));
assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("localhost:1237"));
Expand All @@ -119,7 +143,7 @@ void fullyQualifiedHostNames() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("cluster.kafka.example.com:1235"),
"broker.kafka.example.com", 1236,
1238));
0, 1238));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("cluster.kafka.example.com:1235"));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("broker.kafka.example.com:1236"));
assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("broker.kafka.example.com:1237"));
Expand All @@ -130,7 +154,7 @@ void fullyQualifiedHostNamesWithNodeInterpolation() {
var provider = new PortPerBrokerClusterNetworkAddressConfigProvider(
new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("cluster.kafka.example.com:1235"),
"broker$(nodeId).kafka.example.com",
1236, 1238));
1236, 0, 1238));
assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("cluster.kafka.example.com:1235"));
assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("broker0.kafka.example.com:1236"));
assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("broker1.kafka.example.com:1237"));
Expand Down

0 comments on commit 287b464

Please sign in to comment.