Skip to content

Commit

Permalink
feat : upgrade to spring boot 3.4 and polish tc config
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Dec 7, 2024
1 parent 5f6bb9f commit f264514
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package com.example.springbootkafkaavro.model;

import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
Expand Down
13 changes: 4 additions & 9 deletions kafka-avro/spring-boot-kafka-avro-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.6</version>
<version>3.4.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
Expand All @@ -18,8 +18,8 @@
<properties>
<java.version>21</java.version>
<avro.version>1.12.0</avro.version>
<confluent.version>7.8.0</confluent.version>
<springdoc-openapi.version>2.6.0</springdoc-openapi.version>
<confluent.version>7.7.2</confluent.version>
<springdoc-openapi.version>2.7.0</springdoc-openapi.version>

<spotless.version>2.43.0</spotless.version>
</properties>
Expand All @@ -43,11 +43,6 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.springdoc</groupId>
Expand Down Expand Up @@ -154,7 +149,7 @@
<configuration>
<java>
<googleJavaFormat>
<version>1.19.2</version>
<version>1.25.0</version>
<style>AOSP</style>
</googleJavaFormat>
</java>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkaavro.containers.KafkaContainersConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -23,7 +24,7 @@
"spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer",
"spring.kafka.properties.specific.avro.reader=true"
},
classes = TestSpringBootKafkaAvroProducerApplication.class)
classes = KafkaContainersConfig.class)
@AutoConfigureMockMvc
@Import(AvroKafkaListener.class)
@ExtendWith(OutputCaptureExtension.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,111 +1,13 @@
package com.example.springbootkafkaavro;

import com.example.springbootkafkaavro.containers.KafkaRaftWithExtraListenersContainer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import com.example.springbootkafkaavro.containers.KafkaContainersConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;

@TestConfiguration(proxyBeanMethods = false)
public class TestSpringBootKafkaAvroProducerApplication {

private static final String KAFKA_NETWORK = "kafka-network";

Network network = getNetwork();

static Network getNetwork() {
Network defaultDaprNetwork =
new Network() {
@Override
public String getId() {
return KAFKA_NETWORK;
}

@Override
public void close() {}

@Override
public Statement apply(Statement base, Description description) {
return null;
}
};

List<com.github.dockerjava.api.model.Network> networks =
DockerClientFactory.instance()
.client()
.listNetworksCmd()
.withNameFilter(KAFKA_NETWORK)
.exec();
if (networks.isEmpty()) {
Network.builder()
.createNetworkCmdModifier(cmd -> cmd.withName(KAFKA_NETWORK))
.build()
.getId();
}
return defaultDaprNetwork;
}

@Bean
@ServiceConnection
KafkaContainer kafkaContainer() {
return new KafkaRaftWithExtraListenersContainer("confluentinc/cp-kafka:7.7.1")
.withAdditionalListener(() -> "kafka:19092")
.withKraft()
.withNetwork(network)
.withNetworkAliases("kafka")
.withReuse(true);
}

@Bean
@DependsOn("kafkaContainer")
GenericContainer<?> schemaregistry(DynamicPropertyRegistry dynamicPropertyRegistry) {
GenericContainer<?> schemaRegistry =
new GenericContainer<>("confluentinc/cp-schema-registry:7.7.1")
.withExposedPorts(8085)
.withNetworkAliases("schemaregistry")
.withNetwork(network)
.withEnv(
"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
"PLAINTEXT://kafka:19092")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
.waitingFor(Wait.forHttp("/subjects"))
.withStartupTimeout(Duration.of(120, ChronoUnit.SECONDS))
.withLabel("com.testcontainers.desktop.service", "cp-schema-registry");
dynamicPropertyRegistry.add(
"spring.kafka.producer.properties.schema.registry.url",
() ->
"http://%s:%d"
.formatted(
schemaRegistry.getHost(),
schemaRegistry.getMappedPort(8085)));
dynamicPropertyRegistry.add(
"spring.kafka.properties.schema.registry.url",
() ->
"http://%s:%d"
.formatted(
schemaRegistry.getHost(),
schemaRegistry.getMappedPort(8085)));

return schemaRegistry;
}
class TestSpringBootKafkaAvroProducerApplication {

public static void main(String[] args) {
SpringApplication.from(SpringBootKafkaAvroProducerApplication::main)
.with(TestSpringBootKafkaAvroProducerApplication.class)
.with(KafkaContainersConfig.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.example.springbootkafkaavro.containers;

import java.time.Duration;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class KafkaContainersConfig {

private final Network network = Network.newNetwork();
private final String CONFLUENT_VERSION = "7.7.2";

@Bean
@ServiceConnection
ConfluentKafkaContainer kafkaContainer() {
ConfluentKafkaContainer confluentKafkaContainer =
new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_VERSION))
.withListener("tc-kafka:19092") // Internal alias and port
.withNetwork(network) // Shared network for communication
.withNetworkAliases("tc-kafka") // Alias to match Schema Registry
.withReuse(true);
confluentKafkaContainer.start();
return confluentKafkaContainer;
}

@Bean
@DependsOn("kafkaContainer")
GenericContainer<?> schemaRegistry() {
GenericContainer<?> schemaRegistry =
new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry").withTag(CONFLUENT_VERSION))
.withExposedPorts(8085)
.withNetworkAliases("schemaregistry") // Alias for Schema Registry
.withNetwork(network) // Use the same network as Kafka
.withEnv(
"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
"PLAINTEXT://tc-kafka:19092") // Match Kafka alias and port
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
.withStartupTimeout(Duration.ofSeconds(60));
schemaRegistry.start();
return schemaRegistry;
}

@Bean
DynamicPropertyRegistrar dynamicPropertyRegistrar(GenericContainer<?> schemaRegistry) {
return dynamicProperty -> {
dynamicProperty.add(
"spring.kafka.producer.properties.schema.registry.url",
() ->
"http://%s:%d"
.formatted(
schemaRegistry.getHost(),
schemaRegistry.getMappedPort(8085)));
dynamicProperty.add(
"spring.kafka.properties.schema.registry.url",
() ->
"http://%s:%d"
.formatted(
schemaRegistry.getHost(),
schemaRegistry.getMappedPort(8085)));
};
}
}

This file was deleted.

0 comments on commit f264514

Please sign in to comment.