Skip to content

Commit

Permalink
fix : issues with Integration Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Oct 29, 2024
1 parent f44ff26 commit 7cf7b21
Show file tree
Hide file tree
Showing 14 changed files with 153 additions and 104 deletions.
2 changes: 1 addition & 1 deletion kafka-dsl-integration/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.8'
services:

broker:
image: apache/kafka:3.8.1
image: apache/kafka-native:3.8.1
hostname: broker
ports:
- "9092:9092"
Expand Down
2 changes: 1 addition & 1 deletion kafka-dsl-integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
<configuration>
<java>
<palantirJavaFormat>
<version>2.47.0</version>
<version>2.50.0</version>
</palantirJavaFormat>
<importOrder />
<removeUnusedImports />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.example.integration.kafkadsl;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
class ContainerConfiguration {

@ServiceConnection
@Bean
KafkaContainer kafkaContainer() {
return new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.1"));
}

@Bean
DynamicPropertyRegistrar kafkaProperties(KafkaContainer kafkaContainer) {
return (properties) -> {
properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = TestKafkaDslApplication.class)
@SpringBootTest(classes = ContainerConfiguration.class)
class KafkaDslApplicationTests {

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,12 @@
package com.example.integration.kafkadsl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class TestKafkaDslApplication {

@Bean
@ServiceConnection
KafkaContainer kafkaContainer(DynamicPropertyRegistry dynamicPropertyRegistry) {
KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("apache/kafka").withTag("3.8.0"));
// Connect our Spring application to our Testcontainers Kafka instance
dynamicPropertyRegistry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

public static void main(String[] args) {
SpringApplication.from(KafkaDslApplication::main)
.with(TestKafkaDslApplication.class)
.with(ContainerConfiguration.class)
.run(args);
}
}
2 changes: 1 addition & 1 deletion kafka-reactor/boot-kafka-reactor-consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<configuration>
<java>
<palantirJavaFormat>
<version>2.47.0</version>
<version>2.50.0</version>
</palantirJavaFormat>
<importOrder/>
<removeUnusedImports/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.example.boot.kafka.reactor;

import com.example.boot.kafka.reactor.common.ContainerConfiguration;
import com.example.boot.kafka.reactor.common.TestKafkaProducer;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import com.example.boot.kafka.reactor.util.AppConstants;
import java.security.SecureRandom;
Expand All @@ -21,7 +23,7 @@
import reactor.kafka.sender.SenderRecord;
import reactor.test.StepVerifier;

@SpringBootTest(classes = TestBootKafkaReactorConsumerApplication.class)
@SpringBootTest(classes = {ContainerConfiguration.class, TestKafkaProducer.class})
@ActiveProfiles("test")
@AutoConfigureWebTestClient
class BootKafkaReactorConsumerApplicationTests {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,14 @@
package com.example.boot.kafka.reactor;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.boot.kafka.reactor.common.ContainerConfiguration;
import com.example.boot.kafka.reactor.common.TestKafkaProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

@TestConfiguration(proxyBeanMethods = false)
class TestBootKafkaReactorConsumerApplication {

private static final Logger log = LoggerFactory.getLogger(TestBootKafkaReactorConsumerApplication.class);

@Bean
@ServiceConnection
PostgreSQLContainer<?> postgresContainer() {
return new PostgreSQLContainer<>(DockerImageName.parse("postgres:17.0-alpine"));
}

@Bean
@ServiceConnection
KafkaContainer kafkaContainer(DynamicPropertyRegistry dynamicPropertyRegistry) {
KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("apache/kafka").withTag("3.8.0"));
dynamicPropertyRegistry.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

@Bean
KafkaSender<Integer, MessageDTO> reactiveKafkaSender(KafkaProperties properties) {
log.info("Creating Sender");
Map<String, Object> props = properties.buildProducerProperties(null);
return KafkaSender.create(SenderOptions.create(props));
}

public static void main(String[] args) {
SpringApplication.from(BootKafkaReactorConsumerApplication::main)
.with(TestBootKafkaReactorConsumerApplication.class)
.with(ContainerConfiguration.class, TestKafkaProducer.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.example.boot.kafka.reactor.common;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class ContainerConfiguration {

@Bean
@ServiceConnection
PostgreSQLContainer<?> postgresContainer() {
return new PostgreSQLContainer<>(DockerImageName.parse("postgres").withTag("17.0-alpine"));
}

@ServiceConnection
@Bean
KafkaContainer kafkaContainer() {
return new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.1"));
}

@Bean
DynamicPropertyRegistrar kafkaProperties(KafkaContainer kafkaContainer) {
return (properties) -> {
properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.example.boot.kafka.reactor.common;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

@TestConfiguration(proxyBeanMethods = false)
public class TestKafkaProducer {

private static final Logger log = LoggerFactory.getLogger(TestKafkaProducer.class);

@Bean
KafkaSender<Integer, MessageDTO> reactiveKafkaSender(KafkaProperties properties) {
log.info("Creating Sender");
Map<String, Object> props = properties.buildProducerProperties(null);
return KafkaSender.create(SenderOptions.create(props));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.example.boot.kafka.reactor;

import com.example.boot.kafka.reactor.common.ContainerConfiguration;
import com.example.boot.kafka.reactor.common.TestKafkaConsumer;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,7 +20,7 @@
import reactor.kafka.receiver.ReceiverOffset;
import reactor.test.StepVerifier;

@SpringBootTest(classes = TestBootKafkaReactorProducerApplication.class)
@SpringBootTest(classes = {ContainerConfiguration.class, TestKafkaConsumer.class})
@ActiveProfiles("test")
@AutoConfigureWebTestClient
class BootKafkaReactorProducerApplicationTests {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,13 @@
package com.example.boot.kafka.reactor;

import static com.example.boot.kafka.reactor.util.AppConstants.HELLO_TOPIC;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.boot.kafka.reactor.common.ContainerConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@TestConfiguration(proxyBeanMethods = false)
class TestBootKafkaReactorProducerApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(TestBootKafkaReactorProducerApplication.class);

@Bean
@ServiceConnection
KafkaContainer kafkaContainer(DynamicPropertyRegistry dynamicPropertyRegistry) {
KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.0"));
dynamicPropertyRegistry.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

@Bean
KafkaReceiver<Integer, MessageDTO> receiver(KafkaProperties properties) {
LOGGER.info("Creating receiver");
ReceiverOptions<Integer, MessageDTO> receiverOptions = ReceiverOptions.<Integer, MessageDTO>create(
properties.buildConsumerProperties(null))
.subscription(Collections.singleton(HELLO_TOPIC))
.addAssignListener(partitions -> LOGGER.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> LOGGER.debug("onPartitionsRevoked {}", partitions));

return KafkaReceiver.create(receiverOptions);
}

public static void main(String[] args) {
SpringApplication.from(BootKafkaReactorProducerApplication::main)
.with(TestBootKafkaReactorProducerApplication.class)
.with(ContainerConfiguration.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.example.boot.kafka.reactor.common;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class ContainerConfiguration {

@ServiceConnection
@Bean
KafkaContainer kafkaContainer() {
return new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.1"));
}

@Bean
DynamicPropertyRegistrar kafkaProperties(KafkaContainer kafkaContainer) {
return (properties) -> {
properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.example.boot.kafka.reactor.common;

import static com.example.boot.kafka.reactor.util.AppConstants.HELLO_TOPIC;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@TestConfiguration(proxyBeanMethods = false)
public class TestKafkaConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);

@Bean
KafkaReceiver<Integer, MessageDTO> receiver(KafkaProperties properties) {
LOGGER.info("Creating receiver");
ReceiverOptions<Integer, MessageDTO> receiverOptions = ReceiverOptions.<Integer, MessageDTO>create(
properties.buildConsumerProperties(null))
.subscription(Collections.singleton(HELLO_TOPIC))
.addAssignListener(partitions -> LOGGER.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> LOGGER.debug("onPartitionsRevoked {}", partitions));

return KafkaReceiver.create(receiverOptions);
}
}

0 comments on commit 7cf7b21

Please sign in to comment.