Skip to content

Commit

Permalink
feat : enable and disable kafka listener (#600)
Browse files Browse the repository at this point in the history
* feat : enable and disable kafka listener

* revert to old test case

* feat : implement code review comments
  • Loading branch information
rajadilipkolli authored Dec 2, 2024
1 parent f42d055 commit a024b8f
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.example.springbootkafkasample.config;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ProblemDetail;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;

@Order(Ordered.HIGHEST_PRECEDENCE)
@ControllerAdvice
class GlobalExceptionHandler {

@ExceptionHandler(MethodArgumentNotValidException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
ProblemDetail problemDetail =
ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request content.");
problemDetail.setTitle("Constraint Violation");
List<ApiValidationError> validationErrorsList = methodArgumentNotValidException.getAllErrors().stream()
.map(objectError -> {
FieldError fieldError = (FieldError) objectError;
return new ApiValidationError(
fieldError.getObjectName(),
fieldError.getField(),
fieldError.getRejectedValue(),
Objects.requireNonNull(fieldError.getDefaultMessage(), ""));
})
.sorted(Comparator.comparing(ApiValidationError::field))
.toList();
problemDetail.setProperty("violations", validationErrorsList);
return problemDetail;
}

@ExceptionHandler(IllegalArgumentException.class)
ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
ProblemDetail problemDetail = ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), ex.getMessage());
problemDetail.setTitle("Bad Request");
return problemDetail;
}

@ExceptionHandler(HttpMessageNotReadableException.class)
public ProblemDetail handleInvalidEnum(HttpMessageNotReadableException ex) {
if (ex.getMessage()
.contains("Cannot deserialize value of type `com.example.springbootkafkasample.dto.Operation`")) {
return ProblemDetail.forStatusAndDetail(
HttpStatusCode.valueOf(400), "Invalid operation value. Allowed values are: START, STOP.");
}
return ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request.");
}

record ApiValidationError(String object, String field, Object rejectedValue, String message) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.lang.NonNull;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

@Configuration
Expand All @@ -23,7 +24,7 @@ public KafkaConfig(LocalValidatorFactoryBean validator) {
}

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
public void configureKafkaListeners(@NonNull KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package com.example.springbootkafkasample.controller;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.TopicInfo;
import com.example.springbootkafkasample.service.MessageService;
import com.example.springbootkafkasample.service.sender.Sender;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand All @@ -22,6 +31,28 @@ public MessageRestController(Sender sender, MessageService messageService) {
this.messageService = messageService;
}

@GetMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> getListeners() {
return ResponseEntity.ok(messageService.getListenersState());
}

@Operation(summary = "Update the state of a Kafka listener")
@ApiResponses(
value = {
@ApiResponse(
responseCode = "200",
description = "Listener state updated successfully",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = Map.class))),
@ApiResponse(responseCode = "400", description = "Invalid request", content = @Content),
@ApiResponse(responseCode = "404", description = "Listener not found", content = @Content)
})
@PostMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> updateListenerState(
@RequestBody @Valid final KafkaListenerRequest kafkaListenerRequest) {
return ResponseEntity.ok(messageService.updateListenerState(kafkaListenerRequest));
}

@PostMapping("/messages")
public void sendMessage(@RequestBody MessageDTO messageDTO) {
this.sender.send(messageDTO);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.example.springbootkafkasample.dto;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

public record KafkaListenerRequest(
@NotBlank(message = "Container ID must not be blank") String containerId,
@NotNull(message = "Operation must not be null") Operation operation) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.example.springbootkafkasample.dto;

public enum Operation {
START,
STOP
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.example.springbootkafkasample.service;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.dto.TopicInfo;
import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -8,20 +10,25 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

private final KafkaAdmin kafkaAdmin;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public MessageService(KafkaAdmin kafkaAdmin) {
public MessageService(KafkaAdmin kafkaAdmin, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
this.kafkaAdmin = kafkaAdmin;
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}

public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
Expand Down Expand Up @@ -57,4 +64,29 @@ public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
topicPartitionCounts.sort(Comparator.comparing(TopicInfo::topicName));
return topicPartitionCounts;
}

public Map<String, Boolean> getListenersState() {
return kafkaListenerEndpointRegistry.getListenerContainers().stream()
.collect(
Collectors.toMap(MessageListenerContainer::getListenerId, MessageListenerContainer::isRunning));
}

public Map<String, Boolean> updateListenerState(KafkaListenerRequest kafkaListenerRequest) {
MessageListenerContainer listenerContainer =
kafkaListenerEndpointRegistry.getListenerContainer(kafkaListenerRequest.containerId());
if (listenerContainer == null) {
throw new IllegalArgumentException(
"Listener container with ID '" + kafkaListenerRequest.containerId() + "' not found");
}
if (kafkaListenerRequest.operation().equals(Operation.START)) {
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
} else if (kafkaListenerRequest.operation().equals(Operation.STOP)) {
if (listenerContainer.isRunning()) {
listenerContainer.stop();
}
}
return getListenersState();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.spring

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.UUIDSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.threads.virtual.enabled=true

spring.threads.virtual.enabled=true
spring.mvc.problemdetails.enabled=true
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_1;
import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.service.listener.Receiver2;
import java.time.Duration;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -67,15 +64,6 @@ void setUp() {
}
}

@Test
void sendAndReceiveData() throws InterruptedException {
template.send(TOPIC_TEST_1, UUID.randomUUID(), new MessageDTO(TOPIC_TEST_1, "foo"));
// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(45))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(9));
}

@Test
void topicsWithPartitionsCount() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.common.ContainerConfig;
import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.service.listener.Receiver2;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -23,7 +27,7 @@
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;

@SpringBootTest(classes = TestBootKafkaSampleApplication.class)
@SpringBootTest(classes = ContainerConfig.class)
@AutoConfigureMockMvc
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class KafkaSampleIntegrationTest {
Expand All @@ -40,6 +44,7 @@ class KafkaSampleIntegrationTest {
@Test
@Order(1)
void sendAndReceiveMessage() throws Exception {
long initialCount = receiver2.getLatch().getCount();
this.mockMvc
.perform(post("/messages")
.content(this.objectMapper.writeValueAsString(new MessageDTO("test_1", "junitTest")))
Expand All @@ -48,8 +53,8 @@ void sendAndReceiveMessage() throws Exception {

// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(7));
.atMost(Duration.ofSeconds(15))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(initialCount - 1));
assertThat(receiver2.getDeadLetterLatch().getCount()).isEqualTo(1);
}

Expand Down Expand Up @@ -97,4 +102,91 @@ void topicsWithPartitionsCount() throws Exception {
.andExpect(jsonPath("$[6].partitionCount").value(32))
.andExpect(jsonPath("$[6].replicationCount").value(1));
}

@Test
void getListOfContainers() throws Exception {
this.mockMvc
.perform(get("/listeners"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
}

@Test
void stopAndStartContainers() throws Exception {
this.mockMvc
.perform(post("/listeners")
.content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
"org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(false))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
this.mockMvc
.perform(post("/listeners")
.content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
"org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.START)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
}

@Test
void invalidContainerOperation() throws Exception {
this.mockMvc
.perform(post("/listeners")
.content(objectMapper.writeValueAsString(
new KafkaListenerRequest("invalid-container-id", Operation.STOP)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isBadRequest())
.andExpect(content().contentType(MediaType.APPLICATION_PROBLEM_JSON_VALUE));
}

@Test
void whenInvalidOperation_thenReturnsBadRequest() throws Exception {
String invalidRequest = "{ \"containerId\": \"myListener\", \"operation\": \"INVALID\" }";

mockMvc.perform(post("/listeners")
.contentType(MediaType.APPLICATION_JSON)
.content(invalidRequest))
.andExpect(status().isBadRequest())
.andExpect(content().contentType(MediaType.APPLICATION_PROBLEM_JSON_VALUE))
.andExpect(jsonPath("$.type", CoreMatchers.is("about:blank")))
.andExpect(jsonPath("$.title", CoreMatchers.is("Bad Request")))
.andExpect(jsonPath("$.status", CoreMatchers.is(400)))
.andExpect(jsonPath(
"$.detail", CoreMatchers.is("Invalid operation value. Allowed values are: START, STOP.")))
.andExpect(jsonPath("$.instance", CoreMatchers.is("/listeners")));
}
}
Loading

0 comments on commit a024b8f

Please sign in to comment.