diff --git a/CHANGELOG.md b/CHANGELOG.md index c2b2d1bee4..8e420aa507 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Please enumerate **all user-facing** changes using format ` brokerStartPort: 9193 # <3> numberOfBrokerPorts: 3 # <4> - bindAddress: 192.168.0.1 # <5> + lowestTargetBrokerId: 1000 # <5> + bindAddress: 192.168.0.1 # <6> ---- <1> The hostname and port of the bootstrap that will be used by the Kafka Clients. <2> (Optional) The broker address pattern used to form the broker addresses. If not defined, it defaults to the hostname part of the `bootstrapAddress` and the port number allocated to the broker. <3> (Optional) The starting number for broker port range. Defaults to the port of the `bootstrapAddress` plus 1. <4> (Optional) The _maximum_ number of brokers of ports that are permitted. Defaults to 3. -<5> (Optional) The bind address used when binding the ports. If undefined, all network interfaces will be bound. +<5> (Optional) The lowest broker id of the target cluster. Defaults to 0. This should be the lowest https://kafka.apache.org/documentation/#brokerconfigs_node.id[`node.id`] (or https://kafka.apache.org/documentation/#brokerconfigs_broker.id[`broker.id`]) defined in the target cluster. +<6> (Optional) The bind address used when binding the ports. If undefined, all network interfaces will be bound. The `brokerAddressPattern` configuration parameter understands the replacement token `$(nodeId)`. It is optional. If present, it will be replaced by the https://kafka.apache.org/documentation/#brokerconfigs_node.id[`node.id`] (or @@ -97,7 +104,25 @@ NOTE: It is a responsibility for the deployer of Kroxylicious to ensure that the DNS names are resolvable and routable by the Kafka Client. The `numberOfBrokerPorts` imposes a maximum on the number of brokers that a Kafka Cluster can have. Set this value -to be as high as the maximum number of brokers that your operational rules allow for a Kafka Cluster. +to be as high as the maximum number of brokers that your operational rules allow for a Kafka Cluster. + +Note that each broker's id must be greater than or equal to `lowestTargetBrokerId`, and less than `lowestTargetBrokerId + numberOfBrokerPorts`. +The current strategy for mapping node ids to ports is `nodeId -> brokerStartPort + nodeId - lowestTargetBrokerId`. So a +configuration like: + +[source, yaml] +---- +clusterNetworkAddressConfigProvider: + type: PortPerBrokerClusterNetworkAddressConfigProvider + config: + bootstrapAddress: mycluster.kafka.com:9192 + brokerStartPort: 9193 + numberOfBrokerPorts: 3 + lowestTargetBrokerId: 1000 +---- + +can only map broker ids 1000, 1001 and 1002 to ports 9193, 9194 and 9195 respectively. You would have to reconfigure +`numberOfBrokerPorts` to accommodate new brokers being added to the cluster. ==== SniRouting scheme diff --git a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ExpositionIT.java b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ExpositionIT.java index ac7f96d775..0b6d5069fc 100644 --- a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ExpositionIT.java +++ b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ExpositionIT.java @@ -18,6 +18,7 @@ import java.util.stream.Stream; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -50,6 +51,7 @@ import io.kroxylicious.testing.kafka.common.BrokerCluster; import io.kroxylicious.testing.kafka.common.KeytoolCertificateGenerator; import io.kroxylicious.testing.kafka.common.SaslPlainAuth; +import io.kroxylicious.testing.kafka.common.ZooKeeperCluster; import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension; import edu.umd.cs.findbugs.annotations.NonNull; @@ -441,6 +443,32 @@ void targetClusterDynamicallyAddsBroker(@BrokerCluster KafkaCluster cluster) thr } } + // we currently cannot influence the node ids, so we start a 2 node cluster and shutdown node 0 + // cannot use KRaft as node 0 is a controller + @Test + void canConfigureLowestBrokerIdWithPortPerBroker(@ZooKeeperCluster @BrokerCluster(numBrokers = 2) KafkaCluster cluster, Admin admin) throws Exception { + cluster.removeBroker(0); + await().atMost(Duration.ofSeconds(5)).until(() -> admin.describeCluster().nodes().get(), + n -> n.size() == 1 && n.iterator().next().id() == 1); + var builder = new ConfigurationBuilder() + .addToVirtualClusters("demo", new VirtualClusterBuilder() + .withNewTargetCluster() + .withBootstrapServers(cluster.getBootstrapServers()) + .endTargetCluster() + .withClusterNetworkAddressConfigProvider( + new ClusterNetworkAddressConfigProviderDefinitionBuilder(PortPerBrokerClusterNetworkAddressConfigProvider.class.getName()) + .withConfig("bootstrapAddress", PROXY_ADDRESS) + .withConfig("lowestTargetBrokerId", 1) + .withConfig("numberOfBrokerPorts", 1) + .build()) + .build()); + + try (var tester = kroxyliciousTester(builder)) { + assertThat(cluster.getNumOfBrokers()).isEqualTo(1); + verifyAllBrokersAvailableViaProxy(tester, cluster); + } + } + @Test void targetClusterDynamicallyRemovesBroker(@BrokerCluster(numBrokers = 2) KafkaCluster cluster) throws Exception { var builder = new ConfigurationBuilder() diff --git a/kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProvider.java b/kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProvider.java index 599e7b21ff..994438659e 100644 --- a/kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProvider.java +++ b/kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProvider.java @@ -46,6 +46,7 @@ public class PortPerBrokerClusterNetworkAddressConfigProvider implements Cluster private final Set exclusivePorts; private final int brokerEndPortExclusive; private final int numberOfBrokerPorts; + private final int lowestTargetBrokerId; /** * Creates the provider. @@ -57,6 +58,7 @@ public PortPerBrokerClusterNetworkAddressConfigProvider(PortPerBrokerClusterNetw this.brokerAddressPattern = config.brokerAddressPattern; this.brokerStartPort = config.brokerStartPort; this.numberOfBrokerPorts = config.numberOfBrokerPorts; + this.lowestTargetBrokerId = config.lowestTargetBrokerId; this.brokerEndPortExclusive = brokerStartPort + numberOfBrokerPorts; var exclusivePorts = IntStream.range(brokerStartPort, brokerEndPortExclusive).boxed().collect(Collectors.toCollection(HashSet::new)); @@ -70,10 +72,9 @@ public HostPort getClusterBootstrapAddress() { } @Override - public HostPort getBrokerAddress(int nodeId) throws IllegalArgumentException { - int port = brokerStartPort + nodeId; - if (port >= brokerEndPortExclusive) { + int port = brokerStartPort + nodeId - lowestTargetBrokerId; + if (port < brokerStartPort || port >= brokerEndPortExclusive) { throw new IllegalArgumentException( "Cannot generate broker address for node id %d as port %d would fall outside port range %d-%d that is defined for provider with downstream bootstrap %s)" .formatted( @@ -94,7 +95,8 @@ public Set getExclusivePorts() { @Override public Map discoveryAddressMap() { - return IntStream.range(0, numberOfBrokerPorts).boxed().collect(Collectors.toMap(Function.identity(), this::getBrokerAddress)); + return IntStream.range(lowestTargetBrokerId, lowestTargetBrokerId + numberOfBrokerPorts).boxed() + .collect(Collectors.toMap(Function.identity(), this::getBrokerAddress)); } /** @@ -104,17 +106,20 @@ public static class PortPerBrokerClusterNetworkAddressConfigProviderConfig { private final HostPort bootstrapAddress; private final String brokerAddressPattern; private final int brokerStartPort; + private final int lowestTargetBrokerId; private final int numberOfBrokerPorts; public PortPerBrokerClusterNetworkAddressConfigProviderConfig(@JsonProperty(required = true) HostPort bootstrapAddress, @JsonProperty(required = false) String brokerAddressPattern, @JsonProperty(required = false) Integer brokerStartPort, + @JsonProperty(required = false, defaultValue = "0") Integer lowestTargetBrokerId, @JsonProperty(required = false, defaultValue = "3") Integer numberOfBrokerPorts) { Objects.requireNonNull(bootstrapAddress, "bootstrapAddress cannot be null"); this.bootstrapAddress = bootstrapAddress; this.brokerAddressPattern = brokerAddressPattern != null ? brokerAddressPattern : bootstrapAddress.host(); this.brokerStartPort = brokerStartPort != null ? brokerStartPort : (bootstrapAddress.port() + 1); + this.lowestTargetBrokerId = lowestTargetBrokerId != null ? lowestTargetBrokerId : 0; this.numberOfBrokerPorts = numberOfBrokerPorts != null ? numberOfBrokerPorts : 3; if (this.brokerAddressPattern.isBlank()) { diff --git a/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProviderTest.java b/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProviderTest.java index e68a1f4868..b66832b9e1 100644 --- a/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProviderTest.java +++ b/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/PortPerBrokerClusterNetworkAddressConfigProviderTest.java @@ -6,6 +6,8 @@ package io.kroxylicious.proxy.internal.clusternetworkaddressconfigprovider; +import java.util.Map; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -13,6 +15,8 @@ import org.junit.jupiter.params.provider.EmptySource; import org.junit.jupiter.params.provider.ValueSource; +import io.kroxylicious.proxy.service.HostPort; + import static io.kroxylicious.proxy.internal.clusternetworkaddressconfigprovider.PortPerBrokerClusterNetworkAddressConfigProvider.PortPerBrokerClusterNetworkAddressConfigProviderConfig; import static io.kroxylicious.proxy.service.HostPort.parse; import static org.assertj.core.api.Assertions.assertThat; @@ -25,7 +29,7 @@ class PortPerBrokerClusterNetworkAddressConfigProviderTest { void badBrokerPortDefinition(int brokerStartPort, int numberOfBrokerPorts) { Assertions.assertThrows(IllegalArgumentException.class, () -> { new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"), - "localhost", brokerStartPort, numberOfBrokerPorts); + "localhost", brokerStartPort, 0, numberOfBrokerPorts); }); } @@ -34,7 +38,7 @@ void badBrokerPortDefinition(int brokerStartPort, int numberOfBrokerPorts) { void bootstrapBrokerAddressCollision(int brokerStartPort, int numberOfBrokerPorts) { Assertions.assertThrows(IllegalArgumentException.class, () -> { new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"), - "localhost", brokerStartPort, numberOfBrokerPorts); + "localhost", brokerStartPort, 0, numberOfBrokerPorts); }); } @@ -43,14 +47,14 @@ void bootstrapBrokerAddressCollision(int brokerStartPort, int numberOfBrokerPort @EmptySource void invalidBrokerAddressPatterns(String input) { assertThrows(IllegalArgumentException.class, - () -> new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 5)); + () -> new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 0, 5)); } @ParameterizedTest @ValueSource(strings = { "localhost", "127.0.0.1", "[2001:db8::1]", "kafka.example.com", "mybroker$(nodeId)", "mybroker$(nodeId).example.com", "twice$(nodeId)allowed$(nodeId)too" }) void validBrokerAddressPatterns(String input) { - var config = new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 5); + var config = new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("good:1235"), input, 1, 0, 5); assertThat(config).isNotNull(); } @@ -59,7 +63,7 @@ void validBrokerAddressPatterns(String input) { void portsExhausted() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"), - "localhost", 1236, 1)); + "localhost", 1236, 0, 1)); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("localhost:1236")); Assertions.assertThrows(IllegalArgumentException.class, () -> { provider.getBrokerAddress(1); @@ -69,7 +73,7 @@ void portsExhausted() { @Test void defaultsBrokerPatternBasedOnBootstrapHost() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( - new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, 1236, 1237)); + new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, 1236, 0, 1237)); assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235")); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("mycluster:1236")); } @@ -77,7 +81,7 @@ void defaultsBrokerPatternBasedOnBootstrapHost() { @Test void defaultsBrokerStartPortBasedOnBootstrapPort() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( - new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 1237)); + new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 0, 1237)); assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235")); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("mycluster:1236")); } @@ -85,21 +89,41 @@ void defaultsBrokerStartPortBasedOnBootstrapPort() { @Test void defaultsNumberOfBrokerPorts() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( - new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, null)); + new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 0, null)); assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235")); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("mycluster:1236")); assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("mycluster:1237")); assertThat(provider.getBrokerAddress(2)).isEqualTo(parse("mycluster:1238")); + Map expectedDiscovery = Map.of(0, parse("mycluster:1236"), 1, parse("mycluster:1237"), 2, parse("mycluster:1238")); + assertThat(provider.discoveryAddressMap()).containsExactlyInAnyOrderEntriesOf(expectedDiscovery); Assertions.assertThrows(IllegalArgumentException.class, () -> { provider.getBrokerAddress(3); }); } + @Test + void lowestTargetBrokerIdConfigurable() { + var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( + new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("mycluster:1235"), null, null, 2, null)); + assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("mycluster:1235")); + assertThat(provider.getBrokerAddress(2)).isEqualTo(parse("mycluster:1236")); + assertThat(provider.getBrokerAddress(3)).isEqualTo(parse("mycluster:1237")); + assertThat(provider.getBrokerAddress(4)).isEqualTo(parse("mycluster:1238")); + Map expectedDiscovery = Map.of(2, parse("mycluster:1236"), 3, parse("mycluster:1237"), 4, parse("mycluster:1238")); + assertThat(provider.discoveryAddressMap()).containsExactlyInAnyOrderEntriesOf(expectedDiscovery); + Assertions.assertThrows(IllegalArgumentException.class, () -> { + provider.getBrokerAddress(5); + }); + Assertions.assertThrows(IllegalArgumentException.class, () -> { + provider.getBrokerAddress(1); + }); + } + @Test void definesExclusiveAndSharedCorrectly() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"), - "localhost", 1236, 2)); + "localhost", 1236, 0, 2)); assertThat(provider.getExclusivePorts()).containsExactlyInAnyOrder(1235, 1236, 1237); assertThat(provider.getSharedPorts()).isEmpty(); } @@ -108,7 +132,7 @@ void definesExclusiveAndSharedCorrectly() { void generatesBrokerAddresses() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("localhost:1235"), - "localhost", 1236, 3)); + "localhost", 1236, 0, 3)); assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("localhost:1235")); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("localhost:1236")); assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("localhost:1237")); @@ -119,7 +143,7 @@ void fullyQualifiedHostNames() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("cluster.kafka.example.com:1235"), "broker.kafka.example.com", 1236, - 1238)); + 0, 1238)); assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("cluster.kafka.example.com:1235")); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("broker.kafka.example.com:1236")); assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("broker.kafka.example.com:1237")); @@ -130,7 +154,7 @@ void fullyQualifiedHostNamesWithNodeInterpolation() { var provider = new PortPerBrokerClusterNetworkAddressConfigProvider( new PortPerBrokerClusterNetworkAddressConfigProviderConfig(parse("cluster.kafka.example.com:1235"), "broker$(nodeId).kafka.example.com", - 1236, 1238)); + 1236, 0, 1238)); assertThat(provider.getClusterBootstrapAddress()).isEqualTo(parse("cluster.kafka.example.com:1235")); assertThat(provider.getBrokerAddress(0)).isEqualTo(parse("broker0.kafka.example.com:1236")); assertThat(provider.getBrokerAddress(1)).isEqualTo(parse("broker1.kafka.example.com:1237"));