Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use admin client to confirm cluster is correctly started #24

Merged
merged 12 commits into from
Feb 16, 2023
1 change: 0 additions & 1 deletion impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@
*/
package io.kroxylicious.testing.kafka.common;

import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.nio.file.Files;
Expand All @@ -20,25 +33,13 @@
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.TestInfo;

import io.kroxylicious.testing.kafka.api.KafkaClusterProvisioningStrategy;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints.Endpoint;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;

@Builder(toBuilder = true)
@Getter
Expand Down Expand Up @@ -106,7 +107,6 @@ public static boolean supportsConstraint(Class<? extends Annotation> annotation)
}

public static KafkaClusterConfig fromConstraints(List<Annotation> annotations) {
System.Logger logger = System.getLogger(KafkaClusterProvisioningStrategy.class.getName());
var builder = builder();
builder.brokersNum(1);
boolean sasl = false;
Expand All @@ -126,7 +126,8 @@ public static KafkaClusterConfig fromConstraints(List<Annotation> annotations) {
tls = true;
try {
builder.brokerKeytoolCertificateGenerator(new KeytoolCertificateGenerator());
} catch (IOException e) {
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -168,8 +169,10 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo

var interBrokerEndpoint = kafkaEndpoints.getInterBrokerEndpoint(brokerNum);
var clientEndpoint = kafkaEndpoints.getClientEndpoint(brokerNum);
var anonEndpoint = kafkaEndpoints.getAnonEndpoint(brokerNum);

// - EXTERNAL: used for communications to/from consumers/producers
// - EXTERNAL: used for communications to/from consumers/producers optionally with authentication
// - ANON: used for communications to/from consumers/producers without authentication primarily for the extension to validate the cluster
// - INTERNAL: used for inter-broker communications (always no auth)
// - CONTROLLER: used for inter-broker controller communications (kraft - always no auth)

Expand All @@ -180,20 +183,25 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo
var advertisedListeners = new TreeMap<String, String>();

protocolMap.put("EXTERNAL", externalListenerTransport);
listeners.put("EXTERNAL", clientEndpoint.getBind().toString());
advertisedListeners.put("EXTERNAL", clientEndpoint.getConnect().toString());
listeners.put("EXTERNAL", clientEndpoint.listenAddress());
advertisedListeners.put("EXTERNAL", clientEndpoint.advertisedAddress());

protocolMap.put("ANON", SecurityProtocol.PLAINTEXT.name());
listeners.put("ANON", anonEndpoint.listenAddress());
advertisedListeners.put("ANON", anonEndpoint.advertisedAddress());

protocolMap.put("INTERNAL", SecurityProtocol.PLAINTEXT.name());
listeners.put("INTERNAL", interBrokerEndpoint.getBind().toString());
advertisedListeners.put("INTERNAL", interBrokerEndpoint.getConnect().toString());
listeners.put("INTERNAL", interBrokerEndpoint.listenAddress());
advertisedListeners.put("INTERNAL", interBrokerEndpoint.advertisedAddress());
putConfig(server, "inter.broker.listener.name", "INTERNAL");

if (isKraftMode()) {
putConfig(server, "node.id", Integer.toString(brokerNum)); // Required by Kafka 3.3 onwards.

var controllerEndpoint = kafkaEndpoints.getControllerEndpoint(brokerNum);
var quorumVoters = IntStream.range(0, kraftControllers)
.mapToObj(b -> String.format("%d@%s", b, kafkaEndpoints.getControllerEndpoint(b).getConnect().toString())).collect(Collectors.joining(","));
.mapToObj(controllerId -> String.format("%d@//%s", controllerId, kafkaEndpoints.getControllerEndpoint(controllerId).connectAddress()))
.collect(Collectors.joining(","));
putConfig(server, "controller.quorum.voters", quorumVoters);
putConfig(server, "controller.listener.names", "CONTROLLER");
protocolMap.put("CONTROLLER", SecurityProtocol.PLAINTEXT.name());
Expand Down Expand Up @@ -241,7 +249,9 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo
throw new RuntimeException("brokerKeytoolCertificateGenerator needs to be initialized when calling KafkaClusterConfig");
}
try {
brokerKeytoolCertificateGenerator.generateSelfSignedCertificateEntry("[email protected]", clientEndpoint.getConnect().getHost(), "KI", "RedHat", null, null,
brokerKeytoolCertificateGenerator.generateSelfSignedCertificateEntry("[email protected]", clientEndpoint.getConnect().getHost(), "Dev",
"Kroxylicious.io", null,
null,
"US");
if (clientKeytoolCertificateGenerator != null && Path.of(clientKeytoolCertificateGenerator.getCertFilePath()).toFile().exists()) {
server.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
Expand All @@ -265,8 +275,8 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo
// Disable delay during every re-balance
putConfig(server, "group.initial.rebalance.delay.ms", Integer.toString(0));

properties.add(new ConfigHolder(server, clientEndpoint.getConnect().getPort(),
String.format("%s:%d", clientEndpoint.getConnect().getHost(), clientEndpoint.getConnect().getPort()), brokerNum, kafkaKraftClusterId));
properties.add(new ConfigHolder(server, clientEndpoint.getConnect().getPort(), anonEndpoint.getConnect().getPort(),
clientEndpoint.connectAddress(), brokerNum, kafkaKraftClusterId));
}

return properties.stream();
Expand All @@ -279,34 +289,60 @@ private static void putConfig(Properties server, String key, String value) {
}
}

/**
 * Builds the bootstrap-servers string for the EXTERNAL (client) listener of every broker.
 *
 * @param endPointConfig source of per-broker endpoint information
 * @return comma-separated {@code host:port} list covering all brokers
 */
@NotNull
public String buildClientBootstrapServers(KafkaEndpoints endPointConfig) {
    return buildBootstrapServers(getBrokersNum(), brokerId -> endPointConfig.getClientEndpoint(brokerId));
}

/**
 * Builds the bootstrap-servers string for the ANON (unauthenticated) listener of every broker.
 *
 * @param endPointConfig source of per-broker endpoint information
 * @return comma-separated {@code host:port} list covering all brokers
 */
@NotNull
public String buildAnonBootstrapServers(KafkaEndpoints endPointConfig) {
    return buildBootstrapServers(getBrokersNum(), brokerId -> endPointConfig.getAnonEndpoint(brokerId));
}

/**
 * Builds the bootstrap-servers string for the CONTROLLER listener of every broker.
 *
 * @param kafkaEndpoints source of per-broker endpoint information
 * @return comma-separated {@code host:port} list covering all brokers
 */
@NotNull
public String buildControllerBootstrapServers(KafkaEndpoints kafkaEndpoints) {
    return buildBootstrapServers(getBrokersNum(), brokerId -> kafkaEndpoints.getControllerEndpoint(brokerId));
}

/**
 * Builds the bootstrap-servers string for the INTERNAL (inter-broker) listener of every broker.
 *
 * @param kafkaEndpoints source of per-broker endpoint information
 * @return comma-separated {@code host:port} list covering all brokers
 */
@NotNull
public String buildInterBrokerBootstrapServers(KafkaEndpoints kafkaEndpoints) {
    return buildBootstrapServers(getBrokersNum(), brokerId -> kafkaEndpoints.getInterBrokerEndpoint(brokerId));
}

/**
 * Builds client connection config targeting the ANON listener of the cluster.
 *
 * @param kafkaEndpoints source of per-broker endpoint information
 * @return Kafka client configuration map for anonymous (unauthenticated, plaintext) access
 */
public Map<String, Object> getAnonConnectConfigForCluster(KafkaEndpoints kafkaEndpoints) {
    String anonBootstrapServers = buildAnonBootstrapServers(kafkaEndpoints);
    // No credentials, security protocol, or SASL mechanism: the ANON listener is always plain.
    return getConnectConfigForCluster(anonBootstrapServers, null, null, null, null);
}

public Map<String, Object> getConnectConfigForCluster(String bootstrapServers) {
if (saslMechanism != null) {
Map<String, String> users = getUsers();
if (!users.isEmpty()) {
Map.Entry<String, String> first = users.entrySet().iterator().next();
return getConnectConfigForCluster(bootstrapServers, first.getKey(), first.getKey());
return getConnectConfigForCluster(bootstrapServers, first.getKey(), first.getKey(), getSecurityProtocol(), getSaslMechanism());
}
else {
return getConnectConfigForCluster(bootstrapServers, null, null);
return getConnectConfigForCluster(bootstrapServers, null, null, getSecurityProtocol(), getSaslMechanism());
}
}
else {
return getConnectConfigForCluster(bootstrapServers, null, null);
return getConnectConfigForCluster(bootstrapServers, null, null, getSecurityProtocol(), getSaslMechanism());
}
}

/**
 * Builds client connection config for the given bootstrap servers and credentials, using the
 * cluster's configured security protocol and SASL mechanism.
 *
 * @param bootstrapServers comma-separated {@code host:port} list
 * @param user SASL user name, or {@code null} when unauthenticated
 * @param password SASL password, or {@code null} when unauthenticated
 * @return Kafka client configuration map
 */
public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, String user, String password) {
    String protocol = getSecurityProtocol();
    String mechanism = getSaslMechanism();
    return getConnectConfigForCluster(bootstrapServers, user, password, protocol, mechanism);
}

public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, String user, String password, String securityProtocol, String saslMechanism) {
Map<String, Object> kafkaConfig = new HashMap<>();
String saslMechanism = getSaslMechanism();
String securityProtocol = getSecurityProtocol();

if (securityProtocol != null) {
kafkaConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);

if (securityProtocol.contains("SSL")) {
String clientTrustStoreFilePath;
String clientTrustStorePassword;
if(clientKeytoolCertificateGenerator != null) {
if (clientKeytoolCertificateGenerator != null) {
if (Path.of(clientKeytoolCertificateGenerator.getKeyStoreLocation()).toFile().exists()) {
// SSL client auth case
kafkaConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeytoolCertificateGenerator.getKeyStoreLocation());
Expand All @@ -315,7 +351,8 @@ public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, S
}
try {
clientKeytoolCertificateGenerator.generateTrustStore(brokerKeytoolCertificateGenerator.getCertFilePath(), "client");
} catch (GeneralSecurityException | IOException e) {
}
catch (GeneralSecurityException | IOException e) {
throw new RuntimeException(e);
}
clientTrustStoreFilePath = clientKeytoolCertificateGenerator.getTrustStoreLocation();
Expand All @@ -328,8 +365,10 @@ public Map<String, Object> getConnectConfigForCluster(String bootstrapServers, S
clientTrustStore = Paths.get(certsDirectory.toAbsolutePath().toString(), "kafka.truststore.jks");
certsDirectory.toFile().deleteOnExit();
clientTrustStore.toFile().deleteOnExit();
brokerKeytoolCertificateGenerator.generateTrustStore(brokerKeytoolCertificateGenerator.getCertFilePath(), "client", clientTrustStore.toAbsolutePath().toString());
} catch (GeneralSecurityException | IOException e) {
brokerKeytoolCertificateGenerator.generateTrustStore(brokerKeytoolCertificateGenerator.getCertFilePath(), "client",
clientTrustStore.toAbsolutePath().toString());
}
catch (GeneralSecurityException | IOException e) {
throw new RuntimeException(e);
}
clientTrustStoreFilePath = clientTrustStore.toAbsolutePath().toString();
Expand Down Expand Up @@ -371,11 +410,19 @@ public String clusterId() {
return isKraftMode() ? kafkaKraftClusterId : null;
}

/**
 * Joins the connect address of each broker's endpoint into a comma-separated bootstrap list.
 *
 * @param numBrokers number of brokers in the cluster; brokers are numbered {@code 0..numBrokers-1}
 * @param brokerEndpoint maps a broker id to its endpoint pair
 * @return comma-separated {@code host:port} list, one entry per broker
 */
private String buildBootstrapServers(Integer numBrokers, IntFunction<KafkaEndpoints.EndpointPair> brokerEndpoint) {
    StringBuilder bootstrap = new StringBuilder();
    for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
        if (brokerId > 0) {
            bootstrap.append(',');
        }
        bootstrap.append(brokerEndpoint.apply(brokerId).connectAddress());
    }
    return bootstrap.toString();
}

@Builder
@Getter
public static class ConfigHolder {
private final Properties properties;
private final Integer externalPort;
private final Integer anonPort;
private final String endpoint;
private final int brokerNum;
private final String kafkaKraftClusterId;
Expand All @@ -388,11 +435,23 @@ public interface KafkaEndpoints {
class EndpointPair {
private final Endpoint bind;
private final Endpoint connect;

public String connectAddress() {
return String.format("%s:%d", connect.host, connect.port);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The %d is locale-dependent, according to the "Number Localization Algorithm" section in Formatter Javadoc, which says:

Each digit character d in the string is replaced by a locale-specific digit computed relative to the current locale's zero digit z;

To avoid this I would use concatenation using +.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public char getZeroDigit()
Gets the character used for zero. Different for Arabic, etc.

Wow, I never knew that. I've written so much code that must fall on its face in the Arabic world.

}

public String listenAddress() {
return String.format("//%s:%d", bind.host, bind.port);
}

public String advertisedAddress() {
return String.format("//%s:%d", connect.host, connect.port);
}
}

@Builder
@Getter
public class Endpoint {
class Endpoint {
private final String host;
private final int port;

Expand All @@ -412,5 +471,7 @@ public String toString() {
EndpointPair getControllerEndpoint(int brokerId);

EndpointPair getClientEndpoint(int brokerId);

EndpointPair getAnonEndpoint(int brokerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
*/
package io.kroxylicious.testing.kafka.common;

import org.testcontainers.utility.DockerImageName;

import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.invm.InVMKafkaCluster;
import io.kroxylicious.testing.kafka.testcontainers.TestcontainersKafkaCluster;
import org.testcontainers.utility.DockerImageName;

import static java.lang.System.Logger.Level.INFO;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public KeytoolCertificateGenerator(String certFilePath, String trustStorePath) t
this.trustStoreFilePath = (trustStorePath != null) ? Path.of(trustStorePath) : Paths.get(certsDirectory.toAbsolutePath().toString(), "kafka.truststore.jks");

certsDirectory.toFile().deleteOnExit();
if(certFilePath == null) {
if (certFilePath == null) {
this.keyStoreFilePath.toFile().deleteOnExit();
}
if(trustStorePath == null) {
if (trustStorePath == null) {
this.trustStoreFilePath.toFile().deleteOnExit();
}
this.certFilePath.toFile().deleteOnExit();
Expand Down Expand Up @@ -79,7 +79,7 @@ public void generateTrustStore(String certFilePath, String alias) throws General

public void generateTrustStore(String certFilePath, String alias, String trustStoreFilePath)
throws GeneralSecurityException, IOException {
//keytool -import -trustcacerts -keystore truststore.jks -storepass password -noprompt -alias localhost -file cert.crt
// keytool -import -trustcacerts -keystore truststore.jks -storepass password -noprompt -alias localhost -file cert.crt
KeyStore keyStore = KeyStore.getInstance("JKS");
if (Path.of(trustStoreFilePath).toFile().exists()) {
keyStore.load(new FileInputStream(trustStoreFilePath), getPassword().toCharArray());
Expand Down Expand Up @@ -164,7 +164,7 @@ private void runCommand(List<String> commandParameters) throws IOException {
final String processOutput = (new BufferedReader(new InputStreamReader(process.getInputStream()))).lines()
.collect(Collectors.joining(" \\ "));
log.log(WARNING, "Error generating certificate, error output: {0}, normal output: {1}, " +
"commandline parameters: {2}",
"commandline parameters: {2}",
processError, processOutput, commandParameters);
throw new IOException(
"Keytool execution error: '" + processError + "', output: '" + processOutput + "'" + ", commandline parameters: " + commandParameters);
Expand Down
43 changes: 43 additions & 0 deletions impl/src/main/java/io/kroxylicious/testing/kafka/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,29 @@

package io.kroxylicious.testing.kafka.common;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.slf4j.Logger;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

import static org.slf4j.LoggerFactory.getLogger;

public class Utils {
private static final Logger log = getLogger(Utils.class);

/**
* Pre-allocate 1 or more ephemeral ports which are available for use.
*
Expand All @@ -32,4 +49,30 @@ public static Stream<Integer> preAllocateListeningPorts(int num) {
throw new UncheckedIOException(e);
}
}

/**
 * Blocks until a {@code describeCluster} call against the given connection reports exactly
 * {@code expectedBrokerCount} nodes, polling once per second, or fails when {@code timeout}
 * elapses first.
 *
 * @param connectionConfig Kafka admin client configuration (must include {@code bootstrap.servers})
 * @param timeout maximum time to wait, in units of {@code timeUnit}
 * @param timeUnit unit for {@code timeout}
 * @param expectedBrokerCount number of brokers that must be visible before returning
 */
public static void awaitExpectedBrokerCountInCluster(Map<String, Object> connectionConfig, int timeout, TimeUnit timeUnit, Integer expectedBrokerCount) {
    try (Admin admin = Admin.create(connectionConfig)) {
        Awaitility.await()
                .pollDelay(Duration.ZERO)
                .pollInterval(1, TimeUnit.SECONDS)
                .atMost(timeout, timeUnit)
                .ignoreExceptions()
                .until(() -> {
                    log.info("describing cluster: {}", connectionConfig.get("bootstrap.servers"));
                    final Collection<Node> nodes;
                    try {
                        // Bound the describe call itself so a hung broker cannot stall a poll
                        // past Awaitility's own deadline.
                        nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
                        log.info("got nodes: {}", nodes);
                        return nodes;
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt flag so the caller can observe the interruption.
                        Thread.currentThread().interrupt();
                        log.warn("interrupted while describing the cluster", e);
                    }
                    catch (ExecutionException e) {
                        log.warn("caught: {}", e.getMessage(), e);
                    }
                    catch (TimeoutException te) {
                        log.warn("Kafka timed out describing the cluster");
                    }
                    // Treat any failure as "no brokers seen yet"; Awaitility retries until timeout.
                    return Collections.emptyList();
                }, Matchers.hasSize(expectedBrokerCount));
    }
}
}
Loading