Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : enable and disable kafka listener #600

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.example.springbootkafkasample.controller;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.TopicInfo;
import com.example.springbootkafkasample.service.MessageService;
import com.example.springbootkafkasample.service.sender.Sender;
import java.util.List;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand All @@ -22,6 +25,17 @@ public MessageRestController(Sender sender, MessageService messageService) {
this.messageService = messageService;
}

@GetMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> getListeners() {
return ResponseEntity.ok(messageService.getListeners());
}

@PostMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> getListeners(
@RequestBody final KafkaListenerRequest kafkaListenerRequest) {
return ResponseEntity.ok(messageService.getListeners(kafkaListenerRequest));
}
rajadilipkolli marked this conversation as resolved.
Show resolved Hide resolved

@PostMapping("/messages")
public void sendMessage(@RequestBody MessageDTO messageDTO) {
this.sender.send(messageDTO);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.example.springbootkafkasample.dto;

public record KafkaListenerRequest(String containerId, Operation operation) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.example.springbootkafkasample.dto;

public enum Operation {
START,
STOP
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
package com.example.springbootkafkasample.service;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.dto.TopicInfo;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

private final KafkaAdmin kafkaAdmin;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public MessageService(KafkaAdmin kafkaAdmin) {
public MessageService(KafkaAdmin kafkaAdmin, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
this.kafkaAdmin = kafkaAdmin;
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}

public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
Expand Down Expand Up @@ -57,4 +65,25 @@ public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
topicPartitionCounts.sort(Comparator.comparing(TopicInfo::topicName));
return topicPartitionCounts;
}

public Map<String, Boolean> getListeners() {
return kafkaListenerEndpointRegistry.getListenerContainers().stream()
.collect(
Collectors.toMap(MessageListenerContainer::getListenerId, MessageListenerContainer::isRunning));
}

public Map<String, Boolean> getListeners(KafkaListenerRequest kafkaListenerRequest) {
if (kafkaListenerRequest.operation().equals(Operation.START)) {
Objects.requireNonNull(
kafkaListenerEndpointRegistry.getListenerContainer(kafkaListenerRequest.containerId()))
.start();
} else if (kafkaListenerRequest.operation().equals(Operation.STOP)) {
Objects.requireNonNull(
kafkaListenerEndpointRegistry.getListenerContainer(kafkaListenerRequest.containerId()))
.stop();
}
return kafkaListenerEndpointRegistry.getListenerContainers().stream()
.collect(
Collectors.toMap(MessageListenerContainer::getListenerId, MessageListenerContainer::isRunning));
}
rajadilipkolli marked this conversation as resolved.
Show resolved Hide resolved
rajadilipkolli marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_1;
import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.service.listener.Receiver2;
import java.time.Duration;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -67,15 +64,6 @@ void setUp() {
}
}

@Test
void sendAndReceiveData() throws InterruptedException {
template.send(TOPIC_TEST_1, UUID.randomUUID(), new MessageDTO(TOPIC_TEST_1, "foo"));
// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(45))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(9));
}

@Test
void topicsWithPartitionsCount() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.common.ContainerConfig;
import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.service.listener.Receiver2;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
Expand All @@ -23,7 +26,7 @@
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;

@SpringBootTest(classes = TestBootKafkaSampleApplication.class)
@SpringBootTest(classes = ContainerConfig.class)
@AutoConfigureMockMvc
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class KafkaSampleIntegrationTest {
Expand All @@ -48,7 +51,7 @@ void sendAndReceiveMessage() throws Exception {

// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(30))
.atMost(Duration.ofSeconds(15))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(7));
assertThat(receiver2.getDeadLetterLatch().getCount()).isEqualTo(1);
}
Expand Down Expand Up @@ -97,4 +100,63 @@ void topicsWithPartitionsCount() throws Exception {
.andExpect(jsonPath("$[6].partitionCount").value(32))
.andExpect(jsonPath("$[6].replicationCount").value(1));
}

@Test
void getListOfContainers() throws Exception {
this.mockMvc
.perform(get("/listeners"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
}

@Test
void stopAndStartContainers() throws Exception {
this.mockMvc
.perform(post("/listeners")
.content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
"org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(false))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
this.mockMvc
.perform(post("/listeners")
.content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
"org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.START)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
}
Comment on lines +125 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Reduce assertion duplication by extracting common verification logic

The test has duplicated assertions for verifying container states. Consider extracting a helper method to reduce duplication and improve maintainability.

+    private void assertContainerStates(ResultActions result, String targetContainer, boolean expectedState) 
+            throws Exception {
+        result
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+            .andExpect(jsonPath("$.size()").value(EXPECTED_CONTAINER_COUNT))
+            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
+                    .value(targetContainer.contains("#1-dlt") ? expectedState : true))
+            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
+                    .value(true))
+            // ... repeat for other containers
+    }
+
     @Test
     void stopAndStartContainers() throws Exception {
-        this.mockMvc
-                .perform(post("/listeners")
-                        .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
-                                "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
-                        .contentType(MediaType.APPLICATION_JSON))
-                .andExpect(/* ... duplicated assertions ... */);
+        assertContainerStates(
+            this.mockMvc
+                    .perform(post("/listeners")
+                            .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
+                                    "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
+                            .contentType(MediaType.APPLICATION_JSON)),
+            "#1-dlt",
+            false
+        );

Committable suggestion skipped: line range outside the PR's diff.

}
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
package com.example.springbootkafkasample;

import com.example.springbootkafkasample.common.ContainerConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class TestBootKafkaSampleApplication {

@Bean
@ServiceConnection
KafkaContainer kafkaContainer() {
KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.0"));
kafkaContainer.addEnv("KAFKA_NUM_PARTITIONS", "32");
return kafkaContainer;
}

public static void main(String[] args) {
SpringApplication.from(BootKafkaSampleApplication::main)
.with(TestBootKafkaSampleApplication.class)
.with(ContainerConfig.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.example.springbootkafkasample.common;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class ContainerConfig {

@Bean
@ServiceConnection
KafkaContainer kafkaContainer() {
KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.1"));
kafkaContainer.addEnv("KAFKA_NUM_PARTITIONS", "32");
return kafkaContainer;
}
}
Loading