Add an anonymous listener for the extension to ensure it can always dial into the cluster

Signed-off-by: Sam Barker <[email protected]>
SamBarker committed Feb 16, 2023
1 parent af8b4cf commit b92cf1b
Showing 5 changed files with 135 additions and 57 deletions.
KafkaClusterConfig.java
@@ -169,8 +169,10 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo

var interBrokerEndpoint = kafkaEndpoints.getInterBrokerEndpoint(brokerNum);
var clientEndpoint = kafkaEndpoints.getClientEndpoint(brokerNum);
+ var anonEndpoint = kafkaEndpoints.getAnonEndpoint(brokerNum);

- // - EXTERNAL: used for communications to/from consumers/producers
+ // - EXTERNAL: used for communications to/from consumers/producers, optionally with authentication
+ // - ANON: used for communications to/from consumers/producers without authentication, primarily for the extension to validate the cluster
// - INTERNAL: used for inter-broker communications (always no auth)
// - CONTROLLER: used for inter-broker controller communications (kraft - always no auth)
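For orientation, a minimal sketch of how these four listener roles could map onto broker properties, written in the same putConfig idiom as the surrounding hunk. The port numbers and the choice of SASL_PLAINTEXT for EXTERNAL are illustrative assumptions, not values taken from this commit:

// Illustrative sketch only; the ports and EXTERNAL's security protocol are assumptions.
putConfig(server, "listeners", "EXTERNAL://0.0.0.0:9093,ANON://0.0.0.0:9094,INTERNAL://0.0.0.0:9092");
putConfig(server, "listener.security.protocol.map", "EXTERNAL:SASL_PLAINTEXT,ANON:PLAINTEXT,INTERNAL:PLAINTEXT");
putConfig(server, "inter.broker.listener.name", "INTERNAL");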

@@ -272,8 +274,8 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo
// Disable delay during every re-balance
putConfig(server, "group.initial.rebalance.delay.ms", Integer.toString(0));

- properties.add(new ConfigHolder(server, clientEndpoint.getConnect().getPort(),
- String.format("%s:%d", clientEndpoint.getConnect().getHost(), clientEndpoint.getConnect().getPort()), brokerNum, kafkaKraftClusterId));
+ properties.add(new ConfigHolder(server, clientEndpoint.getConnect().getPort(), anonEndpoint.getConnect().getPort(),
+ clientEndpoint.connectAddress(), brokerNum, kafkaKraftClusterId));
}

return properties.stream();
@@ -291,6 +293,11 @@ public String buildClientBootstrapServers(KafkaEndpoints endPointConfig) {
return buildBootstrapServers(getBrokersNum(), endPointConfig::getClientEndpoint);
}

+ @NotNull
+ public String buildAnonBootstrapServers(KafkaEndpoints endPointConfig) {
+ return buildBootstrapServers(getBrokersNum(), endPointConfig::getAnonEndpoint);
+ }

@NotNull
public String buildControllerBootstrapServers(KafkaEndpoints kafkaEndpoints) {
return buildBootstrapServers(getBrokersNum(), kafkaEndpoints::getControllerEndpoint);
@@ -301,26 +308,32 @@ public String buildInterBrokerBootstrapServers(KafkaEndpoints kafkaEndpoints) {
return buildBootstrapServers(getBrokersNum(), kafkaEndpoints::getInterBrokerEndpoint);
}

+ public Map<String, Object> getAnonConnectConfigForCluster(KafkaEndpoints kafkaEndpoints) {
+ return getConnectConfigForCluster(buildAnonBootstrapServers(kafkaEndpoints), null, null, null, null);
+ }

public Map<String, Object> getConnectConfigForCluster(String bootstrapServers) {
if (saslMechanism != null) {
Map<String, String> users = getUsers();
if (!users.isEmpty()) {
Map.Entry<String, String> first = users.entrySet().iterator().next();
- return getConnectConfigForCluster(bootstrapServers, first.getKey(), first.getKey());
+ return getConnectConfigForCluster(bootstrapServers, first.getKey(), first.getKey(), getSecurityProtocol(), getSaslMechanism());
}
else {
- return getConnectConfigForCluster(bootstrapServers, null, null);
+ return getConnectConfigForCluster(bootstrapServers, null, null, getSecurityProtocol(), getSaslMechanism());
}
}
else {
- return getConnectConfigForCluster(bootstrapServers, null, null);
+ return getConnectConfigForCluster(bootstrapServers, null, null, getSecurityProtocol(), getSaslMechanism());
}
}

public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, String user, String password) {
+ return getConnectConfigForCluster(bootstrapServers, user, password, getSecurityProtocol(), getSaslMechanism());
+ }

+ public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, String user, String password, String securityProtocol, String saslMechanism) {
Map<String, Object> kafkaConfig = new HashMap<>();
- String saslMechanism = getSaslMechanism();
- String securityProtocol = getSecurityProtocol();

if (securityProtocol != null) {
kafkaConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
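A hypothetical usage sketch of the overloads introduced above; the anon variant is simply the five-argument form with every credential and security parameter nulled out:

// Hypothetical caller; both maps below are equivalent, unauthenticated configs.
Map<String, Object> anonConfig = clusterConfig.getAnonConnectConfigForCluster(kafkaEndpoints);
Map<String, Object> equivalent = clusterConfig.getConnectConfigForCluster(
        clusterConfig.buildAnonBootstrapServers(kafkaEndpoints), null, null, null, null);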
@@ -408,6 +421,7 @@ private String buildBootstrapServers(Integer numBrokers, IntFunction<KafkaEndpoi
public static class ConfigHolder {
private final Properties properties;
private final Integer externalPort;
+ private final Integer anonPort;
private final String endpoint;
private final int brokerNum;
private final String kafkaKraftClusterId;
@@ -428,14 +442,15 @@ public String connectAddress() {
public String listenAddress() {
return String.format("//%s:%d", bind.host, bind.port);
}

public String advertisedAddress() {
return String.format("//%s:%d", connect.host, connect.port);
}
}

@Builder
@Getter
- public class Endpoint {
+ class Endpoint {
private final String host;
private final int port;

@@ -455,5 +470,7 @@ public String toString() {
EndpointPair getControllerEndpoint(int brokerId);

EndpointPair getClientEndpoint(int brokerId);

+ EndpointPair getAnonEndpoint(int brokerId);
}
}
Utils.java
@@ -51,18 +51,18 @@ public static Stream<Integer> preAllocateListeningPorts(int num) {
}

public static void ensureExpectedBrokerCountInCluster(Map<String, Object> connectionConfig, int timeout, TimeUnit timeUnit, Integer expectedBrokerCount) {
- try(Admin admin = Admin.create(connectionConfig)) {
+ try (Admin admin = Admin.create(connectionConfig)) {
Awaitility.await()
.pollDelay(Duration.ZERO)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(timeout, timeUnit)
.ignoreExceptions()
.until(() -> {
log.debug("describing cluster: {}", connectionConfig.get("bootstrap.servers"));
log.info("describing cluster: {}", connectionConfig.get("bootstrap.servers"));
final Collection<Node> nodes;
try {
nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
log.debug("got nodes: {}", nodes);
log.info("got nodes: {}", nodes);
return nodes;
}
catch (InterruptedException | ExecutionException e) {
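Stripped of the Awaitility polling, the check above reduces to roughly this sketch (assuming Admin from org.apache.kafka.clients.admin and Node from org.apache.kafka.common, with connectionConfig and expectedBrokerCount in scope). Routing it through the new anon listener is what lets the probe succeed regardless of how the client listener's authentication is configured:

// Sketch of the polled readiness check: done once describeCluster
// reports the expected number of brokers.
try (Admin admin = Admin.create(connectionConfig)) {
    Collection<Node> nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
    boolean ready = nodes.size() == expectedBrokerCount;
}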
InVMKafkaCluster.java
@@ -13,7 +13,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
- import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -54,15 +53,22 @@ public InVMKafkaCluster(KafkaClusterConfig clusterConfig) {
tempDirectory = Files.createTempDirectory("kafka");
tempDirectory.toFile().deleteOnExit();

- // kraft mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port
- // zk mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port
- var numPorts = clusterConfig.getBrokersNum() * (clusterConfig.isKraftMode() ? 3 : 2) + (clusterConfig.isKraftMode() ? 0 : 1);
- LinkedList<Integer> ports = Utils.preAllocateListeningPorts(numPorts).collect(Collectors.toCollection(LinkedList::new));
+ // kraft mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port
+ // zk mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port
+ final List<Integer> externalPorts = Utils.preAllocateListeningPorts(clusterConfig.getBrokersNum()).collect(Collectors.toUnmodifiableList());
+ final List<Integer> anonPorts = Utils.preAllocateListeningPorts(clusterConfig.getBrokersNum()).collect(Collectors.toUnmodifiableList());
+ final List<Integer> interBrokerPorts = Utils.preAllocateListeningPorts(clusterConfig.getBrokersNum()).collect(Collectors.toUnmodifiableList());
+ final List<Integer> controllerPorts;
+ if (clusterConfig.isKraftMode()) {
+ controllerPorts = Utils.preAllocateListeningPorts(clusterConfig.getBrokersNum()).collect(Collectors.toUnmodifiableList());
+ }
+ else {
+ controllerPorts = Utils.preAllocateListeningPorts(1).collect(Collectors.toUnmodifiableList());
+ }

final Supplier<KafkaClusterConfig.KafkaEndpoints.Endpoint> zookeeperEndpointSupplier;
if (!clusterConfig.isKraftMode()) {
- var zookeeperPort = ports.pop();

+ final Integer zookeeperPort = controllerPorts.get(0);
zooFactory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", zookeeperPort), 1024);

var zoo = tempDirectory.resolve("zoo");
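Utils.preAllocateListeningPorts itself is not part of this diff; a common way to implement such a helper (an assumption about this repository's version, not its actual code) is to bind ephemeral server sockets, record their ports, and release them:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Assumed implementation sketch. Note the inherent race: another process
// could claim a port between close() and the broker binding it later.
public static Stream<Integer> preAllocateListeningPorts(int num) {
    List<Integer> ports = new ArrayList<>(num);
    List<ServerSocket> sockets = new ArrayList<>(num);
    try {
        for (int i = 0; i < num; i++) {
            ServerSocket socket = new ServerSocket(0); // port 0: kernel assigns a free port
            sockets.add(socket);
            ports.add(socket.getLocalPort());
        }
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    finally {
        for (ServerSocket socket : sockets) {
            try {
                socket.close(); // release so the broker can bind it
            }
            catch (IOException ignored) {
            }
        }
    }
    return ports.stream();
}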
@@ -80,28 +86,33 @@ public InVMKafkaCluster(KafkaClusterConfig clusterConfig) {
zookeeperEndpointSupplier = null;
}
kafkaEndpoints = new KafkaClusterConfig.KafkaEndpoints() {
- final List<Integer> clientPorts = ports.subList(0, clusterConfig.getBrokersNum());
- final List<Integer> interBrokerPorts = ports.subList(clusterConfig.getBrokersNum(), 2 * clusterConfig.getBrokersNum());
- final List<Integer> controllerPorts = ports.subList(clusterConfig.getBrokersNum() * 2, ports.size());

@Override
public EndpointPair getClientEndpoint(int brokerId) {
- var port = clientPorts.get(brokerId);
- return EndpointPair.builder().bind(new Endpoint("0.0.0.0", port)).connect(new Endpoint("localhost", port)).build();
+ return buildEndpointPair(externalPorts, brokerId);
}

+ @Override
+ public EndpointPair getAnonEndpoint(int brokerId) {
+ return buildEndpointPair(anonPorts, brokerId);
+ }

@Override
public EndpointPair getInterBrokerEndpoint(int brokerId) {
- var port = interBrokerPorts.get(brokerId);
- return EndpointPair.builder().bind(new Endpoint("0.0.0.0", port)).connect(new Endpoint("localhost", port)).build();
+ return buildEndpointPair(interBrokerPorts, brokerId);
}

@Override
public EndpointPair getControllerEndpoint(int brokerId) {
// TODO why can't we treat ZK as the controller port outside of kraft mode?
if (!clusterConfig.isKraftMode()) {
throw new IllegalStateException();
}
- var port = controllerPorts.get(brokerId);
+ return buildEndpointPair(controllerPorts, brokerId);
}

+ private EndpointPair buildEndpointPair(List<Integer> portRange, int brokerId) {
+ var port = portRange.get(brokerId);
+ return EndpointPair.builder().bind(new Endpoint("0.0.0.0", port)).connect(new Endpoint("localhost", port)).build();
+ }
};
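The bind/connect split that buildEndpointPair encodes (listen on all interfaces, advertise localhost) is consumed through the accessors defined earlier in this diff; a hypothetical illustration:

// Hypothetical illustration; listenAddress() and advertisedAddress() are the
// accessors shown in the KafkaClusterConfig.java hunks above.
var pair = kafkaEndpoints.getAnonEndpoint(0);
String listener = "ANON:" + pair.listenAddress();        // ANON://0.0.0.0:<port>
String advertised = "ANON:" + pair.advertisedAddress();  // ANON://localhost:<port>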
@@ -160,8 +171,7 @@ public void start() {

servers.stream().parallel().forEach(Server::startup);
// TODO expose timeout. Annotations? Provisioning Strategy duration?
- final String bootstrapServers = clusterConfig.buildClientBootstrapServers(kafkaEndpoints);
- Utils.ensureExpectedBrokerCountInCluster(clusterConfig.getConnectConfigForCluster(bootstrapServers), 120, TimeUnit.SECONDS, clusterConfig.getBrokersNum());
+ Utils.ensureExpectedBrokerCountInCluster(clusterConfig.getAnonConnectConfigForCluster(kafkaEndpoints), 120, TimeUnit.SECONDS, clusterConfig.getBrokersNum());

}

@@ -182,8 +192,7 @@ public Map<String, Object> getKafkaClientConfiguration() {

@Override
public Map<String, Object> getKafkaClientConfiguration(String user, String password) {
- return clusterConfig.getConnectConfigForCluster(getBootstrapServers(),
- user, password);
+ return clusterConfig.getConnectConfigForCluster(getBootstrapServers(), user, password);
}

@Override
TestcontainersKafkaCluster.java
@@ -5,6 +5,21 @@
*/
package io.kroxylicious.testing.kafka.testcontainers;

+ import com.github.dockerjava.api.command.InspectContainerResponse;
+ import io.kroxylicious.testing.kafka.api.KafkaCluster;
+ import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
+ import io.kroxylicious.testing.kafka.common.Utils;
+ import lombok.SneakyThrows;
+ import org.apache.kafka.common.config.SslConfigs;
+ import org.junit.jupiter.api.TestInfo;
+ import org.testcontainers.containers.GenericContainer;
+ import org.testcontainers.containers.Network;
+ import org.testcontainers.containers.output.OutputFrame;
+ import org.testcontainers.images.builder.Transferable;
+ import org.testcontainers.lifecycle.Startable;
+ import org.testcontainers.lifecycle.Startables;
+ import org.testcontainers.utility.DockerImageName;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
@@ -27,23 +42,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

- import org.apache.kafka.common.config.SslConfigs;
- import org.junit.jupiter.api.TestInfo;
- import org.testcontainers.containers.GenericContainer;
- import org.testcontainers.containers.Network;
- import org.testcontainers.containers.output.OutputFrame;
- import org.testcontainers.images.builder.Transferable;
- import org.testcontainers.lifecycle.Startable;
- import org.testcontainers.lifecycle.Startables;
- import org.testcontainers.utility.DockerImageName;

- import com.github.dockerjava.api.command.InspectContainerResponse;

- import io.kroxylicious.testing.kafka.api.KafkaCluster;
- import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
- import io.kroxylicious.testing.kafka.common.Utils;
- import lombok.SneakyThrows;

import static io.kroxylicious.testing.kafka.common.Utils.ensureExpectedBrokerCountInCluster;

/**
@@ -52,13 +50,14 @@
public class TestcontainersKafkaCluster implements Startable, KafkaCluster {

private static final System.Logger LOGGER = System.getLogger(TestcontainersKafkaCluster.class.getName());
- public static final int KAFKA_PORT = 9093;
+ public static final int CLIENT_PORT = 9093;
+ public static final int ANON_PORT = 9094;
public static final int ZOOKEEPER_PORT = 2181;
private static final String QUAY_KAFKA_IMAGE_REPO = "quay.io/ogunalp/kafka-native";
private static final String QUAY_ZOOKEEPER_IMAGE_REPO = "quay.io/ogunalp/zookeeper-native";
private static DockerImageName DEFAULT_KAFKA_IMAGE = DockerImageName.parse(QUAY_KAFKA_IMAGE_REPO + ":latest-snapshot");
private static DockerImageName DEFAULT_ZOOKEEPER_IMAGE = DockerImageName.parse(QUAY_ZOOKEEPER_IMAGE_REPO + ":latest-snapshot");
- private static final int READY_TIMEOUT_SECONDS = 120;
+ private static final int READY_TIMEOUT_SECONDS = 30;
private final DockerImageName kafkaImage;
private final DockerImageName zookeeperImage;
private final KafkaClusterConfig clusterConfig;
@@ -99,16 +98,22 @@ public TestcontainersKafkaCluster(DockerImageName kafkaImage, DockerImageName zo
this.zookeeper = new ZookeeperContainer(this.zookeeperImage)
.withName(name)
.withNetwork(network)
// .withEnv("QUARKUS_LOG_LEVEL", "DEBUG") // Enables org.apache.zookeeper logging too
.withEnv("QUARKUS_LOG_LEVEL", "DEBUG") // Enables org.apache.zookeeper logging too
.withNetworkAliases("zookeeper");
}

kafkaEndpoints = new KafkaClusterConfig.KafkaEndpoints() {
final List<Integer> clientPorts = Utils.preAllocateListeningPorts(clusterConfig.getBrokersNum()).collect(Collectors.toList());
+ final List<Integer> anonPorts = Utils.preAllocateListeningPorts(clusterConfig.getBrokersNum()).collect(Collectors.toList());

@Override
public EndpointPair getClientEndpoint(int brokerId) {
return EndpointPair.builder().bind(new Endpoint("0.0.0.0", 9093)).connect(new Endpoint("localhost", clientPorts.get(brokerId))).build();
return buildExposedEndpoint(brokerId, CLIENT_PORT, clientPorts);
}

+ @Override
+ public EndpointPair getAnonEndpoint(int brokerId) {
+ return buildExposedEndpoint(brokerId, ANON_PORT, anonPorts);
+ }

@Override
@@ -120,6 +125,13 @@ public EndpointPair getInterBrokerEndpoint(int brokerId) {
public EndpointPair getControllerEndpoint(int brokerId) {
return EndpointPair.builder().bind(new Endpoint("0.0.0.0", 9091)).connect(new Endpoint(String.format("broker-%d", brokerId), 9091)).build();
}

+ private EndpointPair buildExposedEndpoint(int brokerId, int internalPort, List<Integer> externalPortRange) {
+ return EndpointPair.builder()
+ .bind(new Endpoint("0.0.0.0", internalPort))
+ .connect(new Endpoint("localhost", externalPortRange.get(brokerId)))
+ .build();
+ }
};

Supplier<KafkaClusterConfig.KafkaEndpoints> endPointConfigSupplier = () -> kafkaEndpoints;
@@ -136,12 +148,13 @@ public EndpointPair getControllerEndpoint(int brokerId) {
copyHostKeyStoreToContainer(kafkaContainer, holder.getProperties(), SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);

kafkaContainer
// .withEnv("QUARKUS_LOG_LEVEL", "DEBUG") // Enables org.apache.kafka logging too
.withEnv("QUARKUS_LOG_LEVEL", "DEBUG") // Enables org.apache.kafka logging too
.withEnv("SERVER_PROPERTIES_FILE", "/cnf/server.properties")
.withEnv("SERVER_CLUSTER_ID", holder.getKafkaKraftClusterId())
.withCopyToContainer(Transferable.of(propertiesToBytes(holder.getProperties()), 0644), "/cnf/server.properties")
.withStartupTimeout(Duration.ofMinutes(2));
- kafkaContainer.addFixedExposedPort(holder.getExternalPort(), KAFKA_PORT);
+ kafkaContainer.addFixedExposedPort(holder.getExternalPort(), CLIENT_PORT);
+ kafkaContainer.addFixedExposedPort(holder.getAnonPort(), ANON_PORT);

if (!this.clusterConfig.isKraftMode()) {
kafkaContainer.dependsOn(this.zookeeper);
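The fixed host-port mapping matters because each broker advertises localhost plus its pre-allocated port, so the host-side port cannot be random. GenericContainer.addFixedExposedPort(int, int) is protected in Testcontainers, so a container subclass has to surface it; a hypothetical sketch (the repository's actual container class is not shown in this diff):

// Hypothetical subclass widening the protected addFixedExposedPort to public.
static class KafkaContainer extends GenericContainer<KafkaContainer> {
    KafkaContainer(DockerImageName imageName) {
        super(imageName);
    }

    @Override
    public void addFixedExposedPort(int hostPort, int containerPort) {
        super.addFixedExposedPort(hostPort, containerPort);
    }
}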
@@ -202,7 +215,7 @@ public void start() {
zookeeper.start();
}
Startables.deepStart(brokers.stream()).get(READY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- ensureExpectedBrokerCountInCluster(clusterConfig.getConnectConfigForCluster(getBootstrapServers()), READY_TIMEOUT_SECONDS, TimeUnit.SECONDS,
+ ensureExpectedBrokerCountInCluster(clusterConfig.getAnonConnectConfigForCluster(kafkaEndpoints), READY_TIMEOUT_SECONDS, TimeUnit.SECONDS,
clusterConfig.getBrokersNum());
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -236,8 +249,7 @@ public Map<String, Object> getKafkaClientConfiguration() {

@Override
public Map<String, Object> getKafkaClientConfiguration(String user, String password) {
- return clusterConfig.getConnectConfigForCluster(getBootstrapServers(),
- user, password);
+ return clusterConfig.getConnectConfigForCluster(getBootstrapServers(), user, password);
}

// In kraft mode, currently "Advertised listeners cannot be altered when using a Raft-based metadata quorum", so we